timely/dataflow/operators/generic/
builder_rc.rs1use std::rc::Rc;
4use std::cell::RefCell;
5use std::default::Default;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::progress::operate::SharedProgress;
9use crate::progress::frontier::{Antichain, MutableAntichain};
10
11use crate::Container;
12use crate::dataflow::{Scope, Stream};
13use crate::dataflow::channels::pushers::Counter as PushCounter;
14use crate::dataflow::channels::pushers;
15use crate::dataflow::channels::pact::ParallelizationContract;
16use crate::dataflow::channels::pullers::Counter as PullCounter;
17use crate::dataflow::operators::capability::Capability;
18use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle};
19use crate::dataflow::operators::generic::operator_info::OperatorInfo;
20use crate::dataflow::operators::generic::builder_raw::OperatorShape;
21use crate::progress::operate::{FrontierInterest, PortConnectivity};
22
23use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
24
25#[derive(Debug)]
27pub struct OperatorBuilder<'scope, T: Timestamp> {
28 builder: OperatorBuilderRaw<'scope, T>,
29 frontier: Vec<MutableAntichain<T>>,
30 consumed: Vec<Rc<RefCell<ChangeBatch<T>>>>,
31 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
32 summaries: Vec<Rc<RefCell<PortConnectivity<<T as Timestamp>::Summary>>>>,
34 produced: Vec<Rc<RefCell<ChangeBatch<T>>>>,
35}
36
37impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
38
39 pub fn new(name: String, scope: Scope<'scope, T>) -> Self {
41 OperatorBuilder {
42 builder: OperatorBuilderRaw::new(name, scope),
43 frontier: Vec::new(),
44 consumed: Vec::new(),
45 internal: Rc::new(RefCell::new(Vec::new())),
46 summaries: Vec::new(),
47 produced: Vec::new(),
48 }
49 }
50
51 pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
53 self.builder.set_notify_for(input, notify);
54 }
55
56 pub fn new_input<C: Container, P>(&mut self, stream: Stream<'scope, T, C>, pact: P) -> InputHandleCore<T, C, P::Puller>
58 where
59 P: ParallelizationContract<T, C> {
60
61 let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
62 self.new_input_connection(stream, pact, connection)
63 }
64
65 pub fn new_input_connection<C: Container, P, I>(&mut self, stream: Stream<'scope, T, C>, pact: P, connection: I) -> InputHandleCore<T, C, P::Puller>
74 where
75 P: ParallelizationContract<T, C>,
76 I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)> + Clone,
77 {
78 let puller = self.builder.new_input_connection(stream, pact, connection.clone());
79
80 let input = PullCounter::new(puller);
81 self.frontier.push(MutableAntichain::new());
82 self.consumed.push(Rc::clone(input.consumed()));
83
84 let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
85 self.summaries.push(Rc::clone(&shared_summary));
86
87 new_input_handle(input, Rc::clone(&self.internal), shared_summary)
88 }
89
90 pub fn new_output<C: Container>(&mut self) -> (pushers::Output<T, C>, Stream<'scope, T, C>) {
92 let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
93 self.new_output_connection(connection)
94 }
95
96 pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (pushers::Output<T, C>, Stream<'scope, T, C>)
105 where
106 I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)> + Clone,
107 {
108 let new_output = self.shape().outputs();
109 let (tee, stream) = self.builder.new_output_connection(connection.clone());
110
111 let internal = Rc::new(RefCell::new(ChangeBatch::new()));
112 self.internal.borrow_mut().push(Rc::clone(&internal));
113
114 let counter = PushCounter::new(tee);
115 self.produced.push(Rc::clone(counter.produced()));
116
117 for (input, entry) in connection {
118 self.summaries[input].borrow_mut().add_port(new_output, entry);
119 }
120
121 (pushers::Output::new(counter, internal, new_output), stream)
122 }
123
124 pub fn build<B, L>(self, constructor: B)
126 where
127 B: FnOnce(Vec<Capability<T>>) -> L,
128 L: FnMut(&[MutableAntichain<T>])+'static
129 {
130 self.build_reschedule(|caps| {
131 let mut logic = constructor(caps);
132 move |frontier| { logic(frontier); false }
133 })
134 }
135
136 pub fn build_reschedule<B, L>(self, constructor: B)
148 where
149 B: FnOnce(Vec<Capability<T>>) -> L,
150 L: FnMut(&[MutableAntichain<T>])->bool+'static
151 {
152 self.build_reschedule_boxed(Box::new(|caps| -> Box<dyn FnMut(&[MutableAntichain<T>])->bool> { Box::new(constructor(caps)) }));
153 }
154
155 pub fn build_reschedule_boxed<'a>(self, constructor: Box<dyn FnOnce(Vec<Capability<T>>) -> Box<dyn FnMut(&[MutableAntichain<T>])->bool> + 'a>) {
160 self.build_reschedule_typed(constructor);
161 }
162
163 pub fn build_reschedule_typed<B, L>(self, constructor: B)
172 where
173 B: FnOnce(Vec<Capability<T>>) -> L,
174 L: FnMut(&[MutableAntichain<T>])->bool+'static
175 {
176 let mut logic = constructor(self.mint_capabilities());
177
178 let mut bookkeeping = ProgressBookkeeping {
179 frontier: self.frontier,
180 consumed: self.consumed,
181 internal: self.internal,
182 produced: self.produced,
183 };
184
185 let raw_logic =
186 move |progress: &mut SharedProgress<T>| {
187 bookkeeping.drain_frontiers(progress);
188 let result = logic(bookkeeping.frontiers());
189 bookkeeping.publish_progress(progress);
190 result
191 };
192
193 self.builder.build_typed(raw_logic);
194 }
195
196 fn mint_capabilities(&self) -> Vec<Capability<T>> {
201 let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
202 for batch in self.internal.borrow().iter() {
203 capabilities.push(Capability::new(T::minimum(), Rc::clone(batch)));
204 batch.borrow_mut().clear();
206 }
207 capabilities
208 }
209
210 pub fn index(&self) -> usize { self.builder.index() }
212
213 pub fn global(&self) -> usize { self.builder.global() }
215
216 pub fn shape(&self) -> &OperatorShape { self.builder.shape() }
218
219 pub fn operator_info(&self) -> OperatorInfo { self.builder.operator_info() }
221}
222
223
224struct ProgressBookkeeping<T: Timestamp> {
230 frontier: Vec<MutableAntichain<T>>,
231 consumed: Vec<Rc<RefCell<ChangeBatch<T>>>>,
232 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
233 produced: Vec<Rc<RefCell<ChangeBatch<T>>>>,
234}
235
236impl<T: Timestamp> ProgressBookkeeping<T> {
237 #[inline(always)] fn frontiers(&self) -> &[MutableAntichain<T>] { &self.frontier[..] }
239
240 fn drain_frontiers(&mut self, progress: &mut SharedProgress<T>) {
242 for (progress, frontier) in progress.frontiers.iter_mut().zip(self.frontier.iter_mut()) {
243 frontier.update_iter(progress.drain());
244 }
245 }
246
247 fn publish_progress(&self, progress: &mut SharedProgress<T>) {
249 for (progress, consumed) in progress.consumeds.iter_mut().zip(self.consumed.iter()) {
251 consumed.borrow_mut().drain_into(progress);
252 }
253
254 let self_internal_borrow = self.internal.borrow_mut();
256 for index in 0 .. self_internal_borrow.len() {
257 let mut borrow = self_internal_borrow[index].borrow_mut();
258 progress.internals[index].extend(borrow.drain());
259 }
260
261 for (progress, produced) in progress.produceds.iter_mut().zip(self.produced.iter()) {
263 produced.borrow_mut().drain_into(progress);
264 }
265 }
266}
267
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::generic::OutputBuilder;
    use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

    // Opens a session on each output using the capability at the *other* output's
    // position; the test is expected to panic.
    #[test]
    #[should_panic]
    fn incorrect_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            // The streams are kept alive (but unused) for the duration of the closure.
            let (raw_first, _first_stream) = builder.new_output::<Vec<()>>();
            let (raw_second, _second_stream) = builder.new_output::<Vec<()>>();
            let mut first = OutputBuilder::from(raw_first);
            let mut second = OutputBuilder::from(raw_second);

            builder.build(move |caps| {
                move |_frontiers| {
                    let mut first_handle = first.activate();
                    let mut second_handle = second.activate();

                    // Deliberately crossed: capability 0 with the second output,
                    // capability 1 with the first.
                    second_handle.session(&caps[0]);
                    first_handle.session(&caps[1]);
                }
            });
        })
    }

    // Opens a session on each output using the capability at the same position,
    // then drops all capabilities; the test is expected to complete without panicking.
    #[test]
    fn correct_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            // The streams are kept alive (but unused) for the duration of the closure.
            let (raw_first, _first_stream) = builder.new_output::<Vec<()>>();
            let (raw_second, _second_stream) = builder.new_output::<Vec<()>>();
            let mut first = OutputBuilder::from(raw_first);
            let mut second = OutputBuilder::from(raw_second);

            builder.build(move |mut caps| {
                move |_frontiers| {
                    let mut first_handle = first.activate();
                    let mut second_handle = second.activate();

                    // Only act while the capabilities are still held, i.e. until the
                    // first invocation clears them.
                    if !caps.is_empty() {
                        first_handle.session(&caps[0]);
                        second_handle.session(&caps[1]);

                        caps.clear();
                    }
                }
            });

            "Hello".to_owned()
        });
    }
}