timely/dataflow/operators/generic/
builder_rc.rs

1//! Types to build operators with general shapes.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::default::Default;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::progress::operate::SharedProgress;
9use crate::progress::frontier::{Antichain, MutableAntichain};
10
11use crate::Container;
12use crate::container::ContainerBuilder;
13use crate::dataflow::{Scope, StreamCore};
14use crate::dataflow::channels::pushers::Tee;
15use crate::dataflow::channels::pushers::Counter as PushCounter;
16use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
17use crate::dataflow::channels::pact::ParallelizationContract;
18use crate::dataflow::channels::pullers::Counter as PullCounter;
19use crate::dataflow::operators::capability::Capability;
20use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
21use crate::dataflow::operators::generic::operator_info::OperatorInfo;
22use crate::dataflow::operators::generic::builder_raw::OperatorShape;
23use crate::progress::operate::PortConnectivity;
24
25use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
26
/// Builds operators with generic shape.
///
/// Wraps a raw operator builder and records, for each input and output added
/// through it, the shared progress state that `build_reschedule` later moves
/// into the operator's `SharedProgress`.
#[derive(Debug)]
pub struct OperatorBuilder<G: Scope> {
    /// The underlying raw builder that inputs, outputs, and the final logic are handed to.
    builder: OperatorBuilderRaw<G>,
    /// One frontier per input, in the order inputs were added; updated before each invocation of the logic.
    frontier: Vec<MutableAntichain<G::Timestamp>>,
    /// One shared change batch per input, obtained from that input's pull counter.
    consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
    /// One shared change batch per output; the list itself is also shared with every input handle,
    /// and is used to create the initial capabilities when the operator is built.
    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
    /// For each input, a shared list of summaries to each output.
    summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
    /// One shared change batch per output, obtained from that output's push counter.
    produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
}
38
39impl<G: Scope> OperatorBuilder<G> {
40
41    /// Allocates a new generic operator builder from its containing scope.
42    pub fn new(name: String, scope: G) -> Self {
43        OperatorBuilder {
44            builder: OperatorBuilderRaw::new(name, scope),
45            frontier: Vec::new(),
46            consumed: Vec::new(),
47            internal: Rc::new(RefCell::new(Vec::new())),
48            summaries: Vec::new(),
49            produced: Vec::new(),
50        }
51    }
52
    /// Indicates whether the operator requires frontier information.
    ///
    /// The flag is forwarded unchanged to the underlying raw builder.
    pub fn set_notify(&mut self, notify: bool) {
        self.builder.set_notify(notify);
    }
57
58    /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
59    pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> InputHandleCore<G::Timestamp, C, P::Puller>
60    where
61        P: ParallelizationContract<G::Timestamp, C> {
62
63        let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
64        self.new_input_connection(stream, pact, connection)
65    }
66
67    /// Adds a new input with connection information to a generic operator builder, returning the `Pull` implementor to use.
68    ///
69    /// The `connection` parameter contains promises made by the operator for each of the existing *outputs*, that any timestamp
70    /// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary`
71    /// greater or equal to some element of the corresponding antichain in `connection`.
72    ///
73    /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
74    /// antichain indicating that there is no connection from the input to the output.
75    pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> InputHandleCore<G::Timestamp, C, P::Puller>
76    where
77        P: ParallelizationContract<G::Timestamp, C>,
78        I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
79    {
80        let puller = self.builder.new_input_connection(stream, pact, connection.clone());
81
82        let input = PullCounter::new(puller);
83        self.frontier.push(MutableAntichain::new());
84        self.consumed.push(Rc::clone(input.consumed()));
85
86        let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
87        self.summaries.push(Rc::clone(&shared_summary));
88
89        new_input_handle(input, Rc::clone(&self.internal), shared_summary)
90    }
91
92    /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
93    pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
94        let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
95        self.new_output_connection(connection)
96    }
97
98    /// Adds a new output with connection information to a generic operator builder, returning the `Push` implementor to use.
99    ///
100    /// The `connection` parameter contains promises made by the operator for each of the existing *inputs*, that any timestamp
101    /// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary`
102    /// greater or equal to some element of the corresponding antichain in `connection`.
103    ///
104    /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
105    /// antichain indicating that there is no connection from the input to the output.
106    pub fn new_output_connection<CB: ContainerBuilder, I>(&mut self, connection: I) -> (
107        OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
108        StreamCore<G, CB::Container>
109    )
110    where
111        I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
112    {
113        let new_output = self.shape().outputs();
114        let (tee, stream) = self.builder.new_output_connection(connection.clone());
115
116        let internal = Rc::new(RefCell::new(ChangeBatch::new()));
117        self.internal.borrow_mut().push(Rc::clone(&internal));
118
119        let mut buffer = PushBuffer::new(PushCounter::new(tee));
120        self.produced.push(Rc::clone(buffer.inner().produced()));
121
122        for (input, entry) in connection {
123            self.summaries[input].borrow_mut().add_port(new_output, entry);
124        }
125
126        (OutputWrapper::new(buffer, internal, new_output), stream)
127    }
128
129    /// Creates an operator implementation from supplied logic constructor.
130    pub fn build<B, L>(self, constructor: B)
131    where
132        B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
133        L: FnMut(&[MutableAntichain<G::Timestamp>])+'static
134    {
135        self.build_reschedule(|caps| {
136            let mut logic = constructor(caps);
137            move |frontier| { logic(frontier); false }
138        })
139    }
140
141    /// Creates an operator implementation from supplied logic constructor.
142    ///
143    /// Unlike `build`, the supplied closure can indicate if the operator
144    /// should be considered incomplete. The `build` method indicates that
145    /// the operator is never incomplete and can be shut down at the system's
146    /// discretion.
147    pub fn build_reschedule<B, L>(self, constructor: B)
148    where
149        B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
150        L: FnMut(&[MutableAntichain<G::Timestamp>])->bool+'static
151    {
152        // create capabilities, discard references to their creation.
153        let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
154        for batch in self.internal.borrow().iter() {
155            capabilities.push(Capability::new(G::Timestamp::minimum(), Rc::clone(batch)));
156            // Discard evidence of creation, as we are assumed to start with one.
157            batch.borrow_mut().clear();
158        }
159
160        let mut logic = constructor(capabilities);
161
162        let mut self_frontier = self.frontier;
163        let self_consumed = self.consumed;
164        let self_internal = self.internal;
165        let self_produced = self.produced;
166
167        let raw_logic =
168        move |progress: &mut SharedProgress<G::Timestamp>| {
169
170            // drain frontier changes
171            for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) {
172                frontier.update_iter(progress.drain());
173            }
174
175            // invoke supplied logic
176            let result = logic(&self_frontier[..]);
177
178            // move batches of consumed changes.
179            for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) {
180                consumed.borrow_mut().drain_into(progress);
181            }
182
183            // move batches of internal changes.
184            let self_internal_borrow = self_internal.borrow_mut();
185            for index in 0 .. self_internal_borrow.len() {
186                let mut borrow = self_internal_borrow[index].borrow_mut();
187                progress.internals[index].extend(borrow.drain());
188            }
189
190            // move batches of produced changes.
191            for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) {
192                produced.borrow_mut().drain_into(progress);
193            }
194
195            result
196        };
197
198        self.builder.build(raw_logic);
199    }
200
    /// Gets the identifier assigned to the operator being constructed.
    ///
    /// The value is whatever the underlying raw builder reports as its index.
    pub fn index(&self) -> usize {
        self.builder.index()
    }
205
    /// The operator's worker-unique identifier.
    ///
    /// The value is whatever the underlying raw builder reports as its global identifier.
    pub fn global(&self) -> usize {
        self.builder.global()
    }
210
    /// Returns a reference to the operator's shape, as tracked by the underlying raw builder.
    ///
    /// Within this type the shape is consulted for the current number of inputs and outputs.
    pub fn shape(&self) -> &OperatorShape {
        self.builder.shape()
    }
215
    /// Creates operator info for the operator.
    ///
    /// The info is produced by the underlying raw builder.
    pub fn operator_info(&self) -> OperatorInfo {
        self.builder.operator_info()
    }
220}
221
222
#[cfg(test)]
mod tests {
    use crate::container::CapacityContainerBuilder;

    #[test]
    #[should_panic]
    fn incorrect_capabilities() {

        // This tests that if we attempt to use a capability associated with the
        // wrong output, there is a run-time assertion. The test passes only if
        // running the dataflow panics.

        use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

        crate::example(|scope| {

            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            // No inputs are attached; the operator only has two outputs.
            let (mut output1, _stream1) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();
            let (mut output2, _stream2) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();

            builder.build(move |capabilities| {
                move |_frontiers| {

                    let mut output_handle1 = output1.activate();
                    let mut output_handle2 = output2.activate();

                    // NOTE: Using incorrect capabilities here: the capabilities are
                    // deliberately swapped between the two outputs.
                    output_handle2.session(&capabilities[0]);
                    output_handle1.session(&capabilities[1]);
                }
            });
        })
    }

    #[test]
    fn correct_capabilities() {

        // This tests that if we attempt to use capabilities with the correct outputs
        // there is no runtime assertion.

        use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

        crate::example(|scope| {

            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            // No inputs are attached; the operator only has two outputs.
            let (mut output1, _stream1) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();
            let (mut output2, _stream2) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();

            builder.build(move |mut capabilities| {
                move |_frontiers| {

                    let mut output_handle1 = output1.activate();
                    let mut output_handle2 = output2.activate();

                    // The logic may be invoked again after the capabilities have been
                    // dropped below; skip the sessions in that case.
                    if !capabilities.is_empty() {

                        // NOTE: Using correct capabilities here: capability `i` is used
                        // with output `i`.
                        output_handle1.session(&capabilities[0]);
                        output_handle2.session(&capabilities[1]);

                        // Drop both capabilities so they are used only once.
                        capabilities.clear();
                    }
                }
            });

            // The value returned from the dataflow construction closure.
            "Hello".to_owned()
        });
    }
}