Skip to main content

timely/dataflow/operators/generic/
builder_rc.rs

1//! Types to build operators with general shapes.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::default::Default;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::progress::operate::SharedProgress;
9use crate::progress::frontier::{Antichain, MutableAntichain};
10
11use crate::Container;
12use crate::dataflow::{Scope, Stream};
13use crate::dataflow::channels::pushers::Counter as PushCounter;
14use crate::dataflow::channels::pushers;
15use crate::dataflow::channels::pact::ParallelizationContract;
16use crate::dataflow::channels::pullers::Counter as PullCounter;
17use crate::dataflow::operators::capability::Capability;
18use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle};
19use crate::dataflow::operators::generic::operator_info::OperatorInfo;
20use crate::dataflow::operators::generic::builder_raw::OperatorShape;
21use crate::progress::operate::{FrontierInterest, PortConnectivity};
22
23use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
24
25/// Builds operators with generic shape.
26#[derive(Debug)]
27pub struct OperatorBuilder<'scope, T: Timestamp> {
28    builder: OperatorBuilderRaw<'scope, T>,
29    frontier: Vec<MutableAntichain<T>>,
30    consumed: Vec<Rc<RefCell<ChangeBatch<T>>>>,
31    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
32    /// For each input, a shared list of summaries to each output.
33    summaries: Vec<Rc<RefCell<PortConnectivity<<T as Timestamp>::Summary>>>>,
34    produced: Vec<Rc<RefCell<ChangeBatch<T>>>>,
35}
36
37impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
38
39    /// Allocates a new generic operator builder from its containing scope.
40    pub fn new(name: String, scope: Scope<'scope, T>) -> Self {
41        OperatorBuilder {
42            builder: OperatorBuilderRaw::new(name, scope),
43            frontier: Vec::new(),
44            consumed: Vec::new(),
45            internal: Rc::new(RefCell::new(Vec::new())),
46            summaries: Vec::new(),
47            produced: Vec::new(),
48        }
49    }
50
51    /// Sets frontier interest for a specific input.
52    pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
53        self.builder.set_notify_for(input, notify);
54    }
55
56    /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
57    pub fn new_input<C: Container, P>(&mut self, stream: Stream<'scope, T, C>, pact: P) -> InputHandleCore<T, C, P::Puller>
58    where
59        P: ParallelizationContract<T, C> {
60
61        let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
62        self.new_input_connection(stream, pact, connection)
63    }
64
65    /// Adds a new input with connection information to a generic operator builder, returning the `Pull` implementor to use.
66    ///
67    /// The `connection` parameter contains promises made by the operator for each of the existing *outputs*, that any timestamp
68    /// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary`
69    /// greater or equal to some element of the corresponding antichain in `connection`.
70    ///
71    /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
72    /// antichain indicating that there is no connection from the input to the output.
73    pub fn new_input_connection<C: Container, P, I>(&mut self, stream: Stream<'scope, T, C>, pact: P, connection: I) -> InputHandleCore<T, C, P::Puller>
74    where
75        P: ParallelizationContract<T, C>,
76        I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)> + Clone,
77    {
78        let puller = self.builder.new_input_connection(stream, pact, connection.clone());
79
80        let input = PullCounter::new(puller);
81        self.frontier.push(MutableAntichain::new());
82        self.consumed.push(Rc::clone(input.consumed()));
83
84        let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
85        self.summaries.push(Rc::clone(&shared_summary));
86
87        new_input_handle(input, Rc::clone(&self.internal), shared_summary)
88    }
89
90    /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
91    pub fn new_output<C: Container>(&mut self) -> (pushers::Output<T, C>, Stream<'scope, T, C>) {
92        let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
93        self.new_output_connection(connection)
94    }
95
96    /// Adds a new output with connection information to a generic operator builder, returning the `Push` implementor to use.
97    ///
98    /// The `connection` parameter contains promises made by the operator for each of the existing *inputs*, that any timestamp
99    /// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary`
100    /// greater or equal to some element of the corresponding antichain in `connection`.
101    ///
102    /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
103    /// antichain indicating that there is no connection from the input to the output.
104    pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (pushers::Output<T, C>, Stream<'scope, T, C>)
105    where
106        I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)> + Clone,
107    {
108        let new_output = self.shape().outputs();
109        let (tee, stream) = self.builder.new_output_connection(connection.clone());
110
111        let internal = Rc::new(RefCell::new(ChangeBatch::new()));
112        self.internal.borrow_mut().push(Rc::clone(&internal));
113
114        let counter = PushCounter::new(tee);
115        self.produced.push(Rc::clone(counter.produced()));
116
117        for (input, entry) in connection {
118            self.summaries[input].borrow_mut().add_port(new_output, entry);
119        }
120
121        (pushers::Output::new(counter, internal, new_output), stream)
122    }
123
124    /// Creates an operator implementation from supplied logic constructor.
125    pub fn build<B, L>(self, constructor: B)
126    where
127        B: FnOnce(Vec<Capability<T>>) -> L,
128        L: FnMut(&[MutableAntichain<T>])+'static
129    {
130        self.build_reschedule(|caps| {
131            let mut logic = constructor(caps);
132            move |frontier| { logic(frontier); false }
133        })
134    }
135
136    /// Creates an operator implementation from supplied logic constructor.
137    ///
138    /// Unlike `build`, the supplied closure can indicate if the operator
139    /// should be considered incomplete. A not-incomplete operator will be
140    /// shut down if it has empty input frontiers and holds no capabilities.
141    /// Flagging oneself as incomplete is most commonly used by operators
142    /// that manage external resources like file writes or transactions that
143    /// must complete before the operator should be shut down.
144    ///
145    /// This method boxes `B` and `L` and delegates to [`build_reschedule_boxed`].
146    /// For the fully generic (non-boxing) path, see [`build_reschedule_typed`].
147    pub fn build_reschedule<B, L>(self, constructor: B)
148    where
149        B: FnOnce(Vec<Capability<T>>) -> L,
150        L: FnMut(&[MutableAntichain<T>])->bool+'static
151    {
152        self.build_reschedule_boxed(Box::new(|caps| -> Box<dyn FnMut(&[MutableAntichain<T>])->bool> { Box::new(constructor(caps)) }));
153    }
154
155    /// Like `build_reschedule`, but with a pre-boxed constructor.
156    ///
157    /// This method exists primarily to force the `Box<dyn ...>` coercions, which
158    /// can otherwise easily be `Box<B>` or `Box<L>` for specialized `B` and `L` instead.
159    pub fn build_reschedule_boxed<'a>(self, constructor: Box<dyn FnOnce(Vec<Capability<T>>) -> Box<dyn FnMut(&[MutableAntichain<T>])->bool> + 'a>) {
160        self.build_reschedule_typed(constructor);
161    }
162
163    /// Like `build_reschedule`, but specialized to the closure types `B` and `L`.
164    ///
165    /// This method is instantiated once per distinct `(B, L)` pair, and one
166    /// should be mindful of monomorphization bloat. Callers with many closures
167    /// should consider erasing their variation, for example via `Box<dyn ...>`.
168    ///
169    /// This method calls `build_typed` directly using a new closure, mirroring
170    /// the variation in `L`, rather than forcing it to be reboxed via `build`.
171    pub fn build_reschedule_typed<B, L>(self, constructor: B)
172    where
173        B: FnOnce(Vec<Capability<T>>) -> L,
174        L: FnMut(&[MutableAntichain<T>])->bool+'static
175    {
176        let mut logic = constructor(self.mint_capabilities());
177
178        let mut bookkeeping = ProgressBookkeeping {
179            frontier: self.frontier,
180            consumed: self.consumed,
181            internal: self.internal,
182            produced: self.produced,
183        };
184
185        let raw_logic =
186        move |progress: &mut SharedProgress<T>| {
187            bookkeeping.drain_frontiers(progress);
188            let result = logic(bookkeeping.frontiers());
189            bookkeeping.publish_progress(progress);
190            result
191        };
192
193        self.builder.build_typed(raw_logic);
194    }
195
196    /// Create initial capabilities, one per output, and clear their creation evidence.
197    ///
198    /// This method is specifically outlined from `Self::build_reschedule_typed` to avoid
199    /// monomorphization bloat, as it depends only on `T`, not on the closures.
200    fn mint_capabilities(&self) -> Vec<Capability<T>> {
201        let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
202        for batch in self.internal.borrow().iter() {
203            capabilities.push(Capability::new(T::minimum(), Rc::clone(batch)));
204            // Discard evidence of creation, as we are assumed to start with one.
205            batch.borrow_mut().clear();
206        }
207        capabilities
208    }
209
210    /// Get the identifier assigned to the operator being constructed
211    pub fn index(&self) -> usize { self.builder.index() }
212
213    /// The operator's worker-unique identifier.
214    pub fn global(&self) -> usize { self.builder.global() }
215
216    /// Return a reference to the operator's shape
217    pub fn shape(&self) -> &OperatorShape { self.builder.shape() }
218
219    /// Creates operator info for the operator.
220    pub fn operator_info(&self) -> OperatorInfo { self.builder.operator_info() }
221}
222
223
224/// Progress-tracking state that is independent of operator logic.
225///
226/// Extracted so that `drain_frontiers` and `publish_progress` are monomorphized
227/// once per timestamp type `T`, rather than once per closure type passed to
228/// `build_reschedule`.
229struct ProgressBookkeeping<T: Timestamp> {
230    frontier: Vec<MutableAntichain<T>>,
231    consumed: Vec<Rc<RefCell<ChangeBatch<T>>>>,
232    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
233    produced: Vec<Rc<RefCell<ChangeBatch<T>>>>,
234}
235
236impl<T: Timestamp> ProgressBookkeeping<T> {
237    /// The current input frontiers, for passing to operator logic.
238    #[inline(always)] fn frontiers(&self) -> &[MutableAntichain<T>] { &self.frontier[..] }
239
240    /// Drain incoming frontier changes from `SharedProgress` into our local antichains.
241    fn drain_frontiers(&mut self, progress: &mut SharedProgress<T>) {
242        for (progress, frontier) in progress.frontiers.iter_mut().zip(self.frontier.iter_mut()) {
243            frontier.update_iter(progress.drain());
244        }
245    }
246
247    /// Publish consumed, internal, and produced changes back to `SharedProgress`.
248    fn publish_progress(&self, progress: &mut SharedProgress<T>) {
249        // move batches of consumed changes.
250        for (progress, consumed) in progress.consumeds.iter_mut().zip(self.consumed.iter()) {
251            consumed.borrow_mut().drain_into(progress);
252        }
253
254        // move batches of internal changes.
255        let self_internal_borrow = self.internal.borrow_mut();
256        for index in 0 .. self_internal_borrow.len() {
257            let mut borrow = self_internal_borrow[index].borrow_mut();
258            progress.internals[index].extend(borrow.drain());
259        }
260
261        // move batches of produced changes.
262        for (progress, produced) in progress.produceds.iter_mut().zip(self.produced.iter()) {
263            produced.borrow_mut().drain_into(progress);
264        }
265    }
266}
267
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::generic::OutputBuilder;
    use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

    /// Opening a session on one output with a capability minted for a different
    /// output must trip a run-time assertion.
    #[test]
    #[should_panic]
    fn incorrect_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            let (first_raw, _first_stream) = builder.new_output::<Vec<()>>();
            let (second_raw, _second_stream) = builder.new_output::<Vec<()>>();
            let mut first = OutputBuilder::from(first_raw);
            let mut second = OutputBuilder::from(second_raw);

            builder.build(move |caps| {
                move |_frontiers| {
                    let mut first_handle = first.activate();
                    let mut second_handle = second.activate();

                    // Deliberately swapped: capability 0 belongs to the first output.
                    second_handle.session(&caps[0]);
                    first_handle.session(&caps[1]);
                }
            });
        })
    }

    /// Opening each output's session with that output's own capability must
    /// not trigger any run-time assertion.
    #[test]
    fn correct_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            let (first_raw, _first_stream) = builder.new_output::<Vec<()>>();
            let (second_raw, _second_stream) = builder.new_output::<Vec<()>>();
            let mut first = OutputBuilder::from(first_raw);
            let mut second = OutputBuilder::from(second_raw);

            builder.build(move |mut caps| {
                move |_frontiers| {
                    let mut first_handle = first.activate();
                    let mut second_handle = second.activate();

                    // Capabilities are only used on the first invocation; afterwards
                    // the vector has been emptied and nothing happens.
                    let held = std::mem::take(&mut caps);
                    if !held.is_empty() {
                        // Matching capability to output.
                        first_handle.session(&held[0]);
                        second_handle.session(&held[1]);
                    }
                }
            });

            "Hello".to_owned()
        });
    }
}