1//! Types to build operators with general shapes.
23use std::rc::Rc;
4use std::cell::RefCell;
5use std::default::Default;
67use crate::progress::{ChangeBatch, Timestamp};
8use crate::progress::operate::SharedProgress;
9use crate::progress::frontier::{Antichain, MutableAntichain};
1011use crate::Container;
12use crate::container::ContainerBuilder;
13use crate::dataflow::{Scope, StreamCore};
14use crate::dataflow::channels::pushers::Tee;
15use crate::dataflow::channels::pushers::Counter as PushCounter;
16use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
17use crate::dataflow::channels::pact::ParallelizationContract;
18use crate::dataflow::channels::pullers::Counter as PullCounter;
19use crate::dataflow::operators::capability::Capability;
20use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
21use crate::dataflow::operators::generic::operator_info::OperatorInfo;
22use crate::dataflow::operators::generic::builder_raw::OperatorShape;
23use crate::progress::operate::PortConnectivity;
24use crate::logging::TimelyLogger as Logger;
2526use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
2728/// Builds operators with generic shape.
29#[derive(Debug)]
30pub struct OperatorBuilder<G: Scope> {
31 builder: OperatorBuilderRaw<G>,
32 frontier: Vec<MutableAntichain<G::Timestamp>>,
33 consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
34 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
35/// For each input, a shared list of summaries to each output.
36summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
37 produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
38 logging: Option<Logger>,
39}
4041impl<G: Scope> OperatorBuilder<G> {
4243/// Allocates a new generic operator builder from its containing scope.
44pub fn new(name: String, scope: G) -> Self {
45let logging = scope.logging();
46 OperatorBuilder {
47 builder: OperatorBuilderRaw::new(name, scope),
48 frontier: Vec::new(),
49 consumed: Vec::new(),
50 internal: Rc::new(RefCell::new(Vec::new())),
51 summaries: Vec::new(),
52 produced: Vec::new(),
53 logging,
54 }
55 }
5657/// Indicates whether the operator requires frontier information.
58pub fn set_notify(&mut self, notify: bool) {
59self.builder.set_notify(notify);
60 }
6162/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
63pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> InputHandleCore<G::Timestamp, C, P::Puller>
64where
65P: ParallelizationContract<G::Timestamp, C> {
6667let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
68self.new_input_connection(stream, pact, connection)
69 }
7071/// Adds a new input with connection information to a generic operator builder, returning the `Pull` implementor to use.
72 ///
73 /// The `connection` parameter contains promises made by the operator for each of the existing *outputs*, that any timestamp
74 /// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary`
75 /// greater or equal to some element of the corresponding antichain in `connection`.
76 ///
77 /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
78 /// antichain indicating that there is no connection from the input to the output.
79pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> InputHandleCore<G::Timestamp, C, P::Puller>
80where
81P: ParallelizationContract<G::Timestamp, C>,
82 I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
83 {
84let puller = self.builder.new_input_connection(stream, pact, connection.clone());
8586let input = PullCounter::new(puller);
87self.frontier.push(MutableAntichain::new());
88self.consumed.push(Rc::clone(input.consumed()));
8990let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
91self.summaries.push(Rc::clone(&shared_summary));
9293 new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone())
94 }
9596/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
97pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
98let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
99self.new_output_connection(connection)
100 }
101102/// Adds a new output with connection information to a generic operator builder, returning the `Push` implementor to use.
103 ///
104 /// The `connection` parameter contains promises made by the operator for each of the existing *inputs*, that any timestamp
105 /// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary`
106 /// greater or equal to some element of the corresponding antichain in `connection`.
107 ///
108 /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
109 /// antichain indicating that there is no connection from the input to the output.
110pub fn new_output_connection<CB: ContainerBuilder, I>(&mut self, connection: I) -> (
111 OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
112 StreamCore<G, CB::Container>
113 )
114where
115I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
116 {
117let new_output = self.shape().outputs();
118let (tee, stream) = self.builder.new_output_connection(connection.clone());
119120let internal = Rc::new(RefCell::new(ChangeBatch::new()));
121self.internal.borrow_mut().push(Rc::clone(&internal));
122123let mut buffer = PushBuffer::new(PushCounter::new(tee));
124self.produced.push(Rc::clone(buffer.inner().produced()));
125126for (input, entry) in connection {
127self.summaries[input].borrow_mut().add_port(new_output, entry);
128 }
129130 (OutputWrapper::new(buffer, internal), stream)
131 }
132133/// Creates an operator implementation from supplied logic constructor.
134pub fn build<B, L>(self, constructor: B)
135where
136B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
137 L: FnMut(&[MutableAntichain<G::Timestamp>])+'static
138{
139self.build_reschedule(|caps| {
140let mut logic = constructor(caps);
141move |frontier| { logic(frontier); false }
142 })
143 }
144145/// Creates an operator implementation from supplied logic constructor.
146 ///
147 /// Unlike `build`, the supplied closure can indicate if the operator
148 /// should be considered incomplete. The `build` method indicates that
149 /// the operator is never incomplete and can be shut down at the system's
150 /// discretion.
151pub fn build_reschedule<B, L>(self, constructor: B)
152where
153B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
154 L: FnMut(&[MutableAntichain<G::Timestamp>])->bool+'static
155{
156// create capabilities, discard references to their creation.
157let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
158for batch in self.internal.borrow().iter() {
159 capabilities.push(Capability::new(G::Timestamp::minimum(), Rc::clone(batch)));
160// Discard evidence of creation, as we are assumed to start with one.
161batch.borrow_mut().clear();
162 }
163164let mut logic = constructor(capabilities);
165166let mut self_frontier = self.frontier;
167let self_consumed = self.consumed;
168let self_internal = self.internal;
169let self_produced = self.produced;
170171let raw_logic =
172move |progress: &mut SharedProgress<G::Timestamp>| {
173174// drain frontier changes
175for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) {
176 frontier.update_iter(progress.drain());
177 }
178179// invoke supplied logic
180let result = logic(&self_frontier[..]);
181182// move batches of consumed changes.
183for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) {
184 consumed.borrow_mut().drain_into(progress);
185 }
186187// move batches of internal changes.
188let self_internal_borrow = self_internal.borrow_mut();
189for index in 0 .. self_internal_borrow.len() {
190let mut borrow = self_internal_borrow[index].borrow_mut();
191 progress.internals[index].extend(borrow.drain());
192 }
193194// move batches of produced changes.
195for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) {
196 produced.borrow_mut().drain_into(progress);
197 }
198199 result
200 };
201202self.builder.build(raw_logic);
203 }
204205/// Get the identifier assigned to the operator being constructed
206pub fn index(&self) -> usize {
207self.builder.index()
208 }
209210/// The operator's worker-unique identifier.
211pub fn global(&self) -> usize {
212self.builder.global()
213 }
214215/// Return a reference to the operator's shape
216pub fn shape(&self) -> &OperatorShape {
217self.builder.shape()
218 }
219220/// Creates operator info for the operator.
221pub fn operator_info(&self) -> OperatorInfo {
222self.builder.operator_info()
223 }
224}
#[cfg(test)]
mod tests {
    use crate::container::CapacityContainerBuilder;
    use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

    /// Opening a session on an output with a capability that belongs to a
    /// different output is expected to trip a run-time assertion.
    #[test]
    #[should_panic]
    fn incorrect_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            // Two outputs and no inputs; the streams themselves are unused.
            let (mut first_output, _first_stream) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();
            let (mut second_output, _second_stream) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();

            builder.build(move |capabilities| {
                move |_frontiers| {
                    let mut first_handle = first_output.activate();
                    let mut second_handle = second_output.activate();

                    // NOTE: Deliberately crossing capabilities and outputs here.
                    second_handle.session(&capabilities[0]);
                    first_handle.session(&capabilities[1]);
                }
            });
        })
    }

    /// Opening sessions with each output's own capability is expected to
    /// proceed without any run-time assertion.
    #[test]
    fn correct_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            // Two outputs and no inputs; the streams themselves are unused.
            let (mut first_output, _first_stream) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();
            let (mut second_output, _second_stream) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();

            builder.build(move |mut capabilities| {
                move |_frontiers| {
                    let mut first_handle = first_output.activate();
                    let mut second_handle = second_output.activate();

                    // Only the first invocation has capabilities left to use.
                    if !capabilities.is_empty() {
                        // NOTE: Each output is paired with its own capability here.
                        first_handle.session(&capabilities[0]);
                        second_handle.session(&capabilities[1]);

                        capabilities.clear();
                    }
                }
            });

            "Hello".to_owned()
        });
    }
}