timely/dataflow/operators/generic/
builder_rc.rs

1//! Types to build operators with general shapes.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::default::Default;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::progress::operate::SharedProgress;
9use crate::progress::frontier::{Antichain, MutableAntichain};
10
11use crate::Container;
12use crate::dataflow::{Scope, StreamCore};
13use crate::dataflow::channels::pushers::Counter as PushCounter;
14use crate::dataflow::channels::pushers;
15use crate::dataflow::channels::pact::ParallelizationContract;
16use crate::dataflow::channels::pullers::Counter as PullCounter;
17use crate::dataflow::operators::capability::Capability;
18use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle};
19use crate::dataflow::operators::generic::operator_info::OperatorInfo;
20use crate::dataflow::operators::generic::builder_raw::OperatorShape;
21use crate::progress::operate::PortConnectivity;
22
23use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
24
25/// Builds operators with generic shape.
26#[derive(Debug)]
27pub struct OperatorBuilder<G: Scope> {
28    builder: OperatorBuilderRaw<G>,
29    frontier: Vec<MutableAntichain<G::Timestamp>>,
30    consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
31    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
32    /// For each input, a shared list of summaries to each output.
33    summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
34    produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
35}
36
37impl<G: Scope> OperatorBuilder<G> {
38
39    /// Allocates a new generic operator builder from its containing scope.
40    pub fn new(name: String, scope: G) -> Self {
41        OperatorBuilder {
42            builder: OperatorBuilderRaw::new(name, scope),
43            frontier: Vec::new(),
44            consumed: Vec::new(),
45            internal: Rc::new(RefCell::new(Vec::new())),
46            summaries: Vec::new(),
47            produced: Vec::new(),
48        }
49    }
50
51    /// Indicates whether the operator requires frontier information.
52    pub fn set_notify(&mut self, notify: bool) {
53        self.builder.set_notify(notify);
54    }
55
56    /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
57    pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> InputHandleCore<G::Timestamp, C, P::Puller>
58    where
59        P: ParallelizationContract<G::Timestamp, C> {
60
61        let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
62        self.new_input_connection(stream, pact, connection)
63    }
64
65    /// Adds a new input with connection information to a generic operator builder, returning the `Pull` implementor to use.
66    ///
67    /// The `connection` parameter contains promises made by the operator for each of the existing *outputs*, that any timestamp
68    /// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary`
69    /// greater or equal to some element of the corresponding antichain in `connection`.
70    ///
71    /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
72    /// antichain indicating that there is no connection from the input to the output.
73    pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> InputHandleCore<G::Timestamp, C, P::Puller>
74    where
75        P: ParallelizationContract<G::Timestamp, C>,
76        I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
77    {
78        let puller = self.builder.new_input_connection(stream, pact, connection.clone());
79
80        let input = PullCounter::new(puller);
81        self.frontier.push(MutableAntichain::new());
82        self.consumed.push(Rc::clone(input.consumed()));
83
84        let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
85        self.summaries.push(Rc::clone(&shared_summary));
86
87        new_input_handle(input, Rc::clone(&self.internal), shared_summary)
88    }
89
90    /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
91    pub fn new_output<C: Container>(&mut self) -> (pushers::Output<G::Timestamp, C>, StreamCore<G, C>) {
92        let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
93        self.new_output_connection(connection)
94    }
95
96    /// Adds a new output with connection information to a generic operator builder, returning the `Push` implementor to use.
97    ///
98    /// The `connection` parameter contains promises made by the operator for each of the existing *inputs*, that any timestamp
99    /// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary`
100    /// greater or equal to some element of the corresponding antichain in `connection`.
101    ///
102    /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
103    /// antichain indicating that there is no connection from the input to the output.
104    pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (
105        pushers::Output<G::Timestamp, C>,
106        StreamCore<G, C>,
107    )
108    where
109        I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
110    {
111        let new_output = self.shape().outputs();
112        let (tee, stream) = self.builder.new_output_connection(connection.clone());
113
114        let internal = Rc::new(RefCell::new(ChangeBatch::new()));
115        self.internal.borrow_mut().push(Rc::clone(&internal));
116
117        let counter = PushCounter::new(tee);
118        self.produced.push(Rc::clone(counter.produced()));
119
120        for (input, entry) in connection {
121            self.summaries[input].borrow_mut().add_port(new_output, entry);
122        }
123
124        (pushers::Output::new(counter, internal, new_output), stream)
125    }
126
127    /// Creates an operator implementation from supplied logic constructor.
128    pub fn build<B, L>(self, constructor: B)
129    where
130        B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
131        L: FnMut(&[MutableAntichain<G::Timestamp>])+'static
132    {
133        self.build_reschedule(|caps| {
134            let mut logic = constructor(caps);
135            move |frontier| { logic(frontier); false }
136        })
137    }
138
139    /// Creates an operator implementation from supplied logic constructor.
140    ///
141    /// Unlike `build`, the supplied closure can indicate if the operator
142    /// should be considered incomplete. The `build` method indicates that
143    /// the operator is never incomplete and can be shut down at the system's
144    /// discretion.
145    pub fn build_reschedule<B, L>(self, constructor: B)
146    where
147        B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
148        L: FnMut(&[MutableAntichain<G::Timestamp>])->bool+'static
149    {
150        // create capabilities, discard references to their creation.
151        let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
152        for batch in self.internal.borrow().iter() {
153            capabilities.push(Capability::new(G::Timestamp::minimum(), Rc::clone(batch)));
154            // Discard evidence of creation, as we are assumed to start with one.
155            batch.borrow_mut().clear();
156        }
157
158        let mut logic = constructor(capabilities);
159
160        let mut self_frontier = self.frontier;
161        let self_consumed = self.consumed;
162        let self_internal = self.internal;
163        let self_produced = self.produced;
164
165        let raw_logic =
166        move |progress: &mut SharedProgress<G::Timestamp>| {
167
168            // drain frontier changes
169            for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) {
170                frontier.update_iter(progress.drain());
171            }
172
173            // invoke supplied logic
174            let result = logic(&self_frontier[..]);
175
176            // move batches of consumed changes.
177            for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) {
178                consumed.borrow_mut().drain_into(progress);
179            }
180
181            // move batches of internal changes.
182            let self_internal_borrow = self_internal.borrow_mut();
183            for index in 0 .. self_internal_borrow.len() {
184                let mut borrow = self_internal_borrow[index].borrow_mut();
185                progress.internals[index].extend(borrow.drain());
186            }
187
188            // move batches of produced changes.
189            for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) {
190                produced.borrow_mut().drain_into(progress);
191            }
192
193            result
194        };
195
196        self.builder.build(raw_logic);
197    }
198
199    /// Get the identifier assigned to the operator being constructed
200    pub fn index(&self) -> usize {
201        self.builder.index()
202    }
203
204    /// The operator's worker-unique identifier.
205    pub fn global(&self) -> usize {
206        self.builder.global()
207    }
208
209    /// Return a reference to the operator's shape
210    pub fn shape(&self) -> &OperatorShape {
211        self.builder.shape()
212    }
213
214    /// Creates operator info for the operator.
215    pub fn operator_info(&self) -> OperatorInfo {
216        self.builder.operator_info()
217    }
218}
219
220
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::generic::OutputBuilder;
    use super::OperatorBuilder;

    /// Opening a session with a capability minted for a different output must trip a
    /// run-time assertion.
    #[test]
    #[should_panic]
    fn incorrect_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            let (raw_first, _first_stream) = builder.new_output::<Vec<()>>();
            let (raw_second, _second_stream) = builder.new_output::<Vec<()>>();
            let mut first = OutputBuilder::from(raw_first);
            let mut second = OutputBuilder::from(raw_second);

            builder.build(move |caps| {
                move |_frontiers| {
                    let mut first_handle = first.activate();
                    let mut second_handle = second.activate();

                    // Deliberately swapped: capability 0 belongs to the first output.
                    second_handle.session(&caps[0]);
                    first_handle.session(&caps[1]);
                }
            });
        })
    }

    /// Pairing each capability with the output it was minted for must not assert.
    #[test]
    fn correct_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            let (raw_first, _first_stream) = builder.new_output::<Vec<()>>();
            let (raw_second, _second_stream) = builder.new_output::<Vec<()>>();
            let mut first = OutputBuilder::from(raw_first);
            let mut second = OutputBuilder::from(raw_second);

            builder.build(move |mut caps| {
                move |_frontiers| {
                    let mut first_handle = first.activate();
                    let mut second_handle = second.activate();

                    // Capabilities are released after the first run; later runs do nothing.
                    if caps.is_empty() {
                        return;
                    }

                    // Each capability goes to the output it was created for.
                    first_handle.session(&caps[0]);
                    second_handle.session(&caps[1]);

                    caps.clear();
                }
            });

            "Hello".to_owned()
        });
    }
}
295}