timely/dataflow/operators/core/
unordered_input.rs1use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::Container;
7use crate::container::{ContainerBuilder, CapacityContainerBuilder};
8
9use crate::scheduling::{Schedule, ActivateOnDrop};
10
11use crate::progress::{Operate, operate::SharedProgress, Timestamp};
12use crate::progress::Source;
13use crate::progress::ChangeBatch;
14use crate::progress::operate::Connectivity;
15use crate::dataflow::channels::pushers::{Counter, Tee};
16use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession};
17
18use crate::dataflow::operators::{ActivateCapability, Capability};
19
20use crate::dataflow::{Scope, StreamCore};
21
22pub trait UnorderedInput<G: Scope> {
24 fn new_unordered_input<CB: ContainerBuilder>(&mut self) -> ((UnorderedHandle<G::Timestamp, CB>, ActivateCapability<G::Timestamp>), StreamCore<G, CB::Container>);
80}
81
82impl<G: Scope> UnorderedInput<G> for G {
83 fn new_unordered_input<CB: ContainerBuilder>(&mut self) -> ((UnorderedHandle<G::Timestamp, CB>, ActivateCapability<G::Timestamp>), StreamCore<G, CB::Container>) {
84
85 let (output, registrar) = Tee::<G::Timestamp, CB::Container>::new();
86 let internal = Rc::new(RefCell::new(ChangeBatch::new()));
87 let cap = Capability::new(G::Timestamp::minimum(), Rc::clone(&internal));
89 let counter = Counter::new(output);
90 let produced = Rc::clone(counter.produced());
91 let peers = self.peers();
92
93 let index = self.allocate_operator_index();
94 let address = self.addr_for_child(index);
95
96 let cap = ActivateCapability::new(cap, Rc::clone(&address), self.activations());
97
98 let helper = UnorderedHandle::new(counter);
99
100 self.add_operator_with_index(Box::new(UnorderedOperator {
101 name: "UnorderedInput".to_owned(),
102 address,
103 shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
104 internal,
105 produced,
106 peers,
107 }), index);
108
109 ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone()))
110 }
111}
112
113struct UnorderedOperator<T:Timestamp> {
114 name: String,
115 address: Rc<[usize]>,
116 shared_progress: Rc<RefCell<SharedProgress<T>>>,
117 internal: Rc<RefCell<ChangeBatch<T>>>,
118 produced: Rc<RefCell<ChangeBatch<T>>>,
119 peers: usize,
120}
121
122impl<T:Timestamp> Schedule for UnorderedOperator<T> {
123 fn name(&self) -> &str { &self.name }
124 fn path(&self) -> &[usize] { &self.address[..] }
125 fn schedule(&mut self) -> bool {
126 let shared_progress = &mut *self.shared_progress.borrow_mut();
127 self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]);
128 self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
129 false
130 }
131}
132
133impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
134 fn inputs(&self) -> usize { 0 }
135 fn outputs(&self) -> usize { 1 }
136
137 fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
138 let mut borrow = self.internal.borrow_mut();
139 for (time, count) in borrow.drain() {
140 self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));
141 }
142 (Vec::new(), Rc::clone(&self.shared_progress))
143 }
144
145 fn notify_me(&self) -> bool { false }
146}
147
148#[derive(Debug)]
150pub struct UnorderedHandle<T: Timestamp, CB: ContainerBuilder> {
151 buffer: PushBuffer<T, CB, Counter<T, CB::Container, Tee<T, CB::Container>>>,
152}
153
154impl<T: Timestamp, CB: ContainerBuilder> UnorderedHandle<T, CB> {
155 fn new(pusher: Counter<T, CB::Container, Tee<T, CB::Container>>) -> UnorderedHandle<T, CB> {
156 UnorderedHandle {
157 buffer: PushBuffer::new(pusher),
158 }
159 }
160
161 #[inline]
163 pub fn session_with_builder(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CB, Counter<T, CB::Container, Tee<T, CB::Container>>>> {
164 ActivateOnDrop::new(self.buffer.autoflush_session_with_builder(cap.capability.clone()), Rc::clone(&cap.address), Rc::clone(&cap.activations))
165 }
166}
167
168impl<T: Timestamp, C: Container> UnorderedHandle<T, CapacityContainerBuilder<C>> {
169 #[inline]
171 pub fn session(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CapacityContainerBuilder<C>, Counter<T, C, Tee<T, C>>>> {
172 self.session_with_builder(cap)
173 }
174}