Skip to main content

timely/dataflow/operators/generic/
builder_raw.rs

//! Types to build operators with general shapes.
//!
//! These types expose some raw timely interfaces and, while public so that others can build on them,
//! they require some sophistication to use correctly. I recommend checking out `builder_rc.rs` for
//! an interface that is intentionally harder to misuse.
6
7use std::default::Default;
8use std::rc::Rc;
9use std::cell::RefCell;
10
11use crate::scheduling::{Schedule, Activations};
12
13use crate::progress::{Source, Target};
14use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
16use crate::Container;
17use crate::dataflow::{Stream, Scope, OperatorSlot};
18use crate::dataflow::channels::pushers::Tee;
19use crate::dataflow::channels::pact::ParallelizationContract;
20use crate::dataflow::operators::generic::operator_info::OperatorInfo;
21
22/// Contains type-free information about the operator properties.
23#[derive(Debug)]
24pub struct OperatorShape {
25    name: String,   // A meaningful name for the operator.
26    notify: Vec<FrontierInterest>,   // Per-input frontier interest.
27    peers: usize,   // The total number of workers in the computation. Needed to initialize pointstamp counts with the correct magnitude.
28    inputs: usize,  // The number of input ports.
29    outputs: usize, // The number of output ports.
30}
31
32/// Core data for the structure of an operator, minus scope and logic.
33impl OperatorShape {
34    fn new(name: String, peers: usize) -> Self {
35        OperatorShape {
36            name,
37            notify: Vec::new(),
38            peers,
39            inputs: 0,
40            outputs: 0,
41        }
42    }
43
44    /// The number of inputs of this operator
45    pub fn inputs(&self) -> usize { self.inputs }
46
47    /// The number of outputs of this operator
48    pub fn outputs(&self) -> usize { self.outputs }
49}
50
51/// Builds operators with generic shape.
52#[derive(Debug)]
53pub struct OperatorBuilder<'scope, T: Timestamp> {
54    scope: Scope<'scope, T>,
55    slot: OperatorSlot<'scope, T>,
56    address: Rc<[usize]>,    // path to the operator (ending with index).
57    shape: OperatorShape,
58    summary: Connectivity<<T as Timestamp>::Summary>,
59}
60
61impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
62
63    /// Allocates a new generic operator builder from its containing scope.
64    pub fn new(name: String, scope: Scope<'scope, T>) -> Self {
65
66        let slot = scope.reserve_operator();
67        let address = slot.addr();
68        let peers = scope.peers();
69
70        OperatorBuilder {
71            scope,
72            slot,
73            address,
74            shape: OperatorShape::new(name, peers),
75            summary: vec![],
76        }
77    }
78
79    /// The operator's scope-local index.
80    pub fn index(&self) -> usize { self.slot.index() }
81
82    /// The operator's worker-unique identifier.
83    pub fn global(&self) -> usize { self.slot.identifier() }
84
85    /// Return a reference to the operator's shape
86    pub fn shape(&self) -> &OperatorShape { &self.shape }
87
88    /// Sets frontier interest for a specific input.
89    pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
90        self.shape.notify[input] = notify;
91    }
92
93    /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
94    pub fn new_input<C: Container, P>(&mut self, stream: Stream<'scope, T, C>, pact: P) -> P::Puller
95    where
96        P: ParallelizationContract<T, C>
97    {
98        let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default())));
99        self.new_input_connection(stream, pact, connection)
100    }
101
102    /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
103    pub fn new_input_connection<C: Container, P, I>(&mut self, stream: Stream<'scope, T, C>, pact: P, connection: I) -> P::Puller
104    where
105        P: ParallelizationContract<T, C>,
106        I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)>,
107    {
108        let channel_id = self.scope.worker().new_identifier();
109        let logging = self.scope.worker().logging();
110        let (sender, receiver) = pact.connect(self.scope.worker(), channel_id, Rc::clone(&self.address), logging);
111        let target = Target::new(self.slot.index(), self.shape.inputs);
112        stream.connect_to(target, sender, channel_id);
113
114        self.shape.inputs += 1;
115        self.shape.notify.push(FrontierInterest::Always);
116        let connectivity: PortConnectivity<_> = connection.into_iter().collect();
117        assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
118        self.summary.push(connectivity);
119
120        receiver
121    }
122
123    /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
124    pub fn new_output<C: Container>(&mut self) -> (Tee<T, C>, Stream<'scope, T, C>) {
125        let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default())));
126        self.new_output_connection(connection)
127    }
128
129    /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
130    pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (Tee<T, C>, Stream<'scope, T, C>)
131    where
132        I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)>,
133    {
134        let new_output = self.shape.outputs;
135        self.shape.outputs += 1;
136        let (target, registrar) = Tee::new();
137        let source = Source::new(self.slot.index(), new_output);
138        let stream = Stream::new(source, registrar, self.scope);
139
140        for (input, entry) in connection {
141            self.summary[input].add_port(new_output, entry);
142        }
143
144        (target, stream)
145    }
146
147    /// Creates an operator implementation from supplied logic constructor.
148    ///
149    /// Boxes the closure to avoid per-closure monomorphization based on `L`.
150    /// For the fully generic (non-boxing) path, see [`build_typed`].
151    pub fn build<L>(self, logic: L)
152    where
153        L: FnMut(&mut SharedProgress<T>)->bool+'static
154    {
155        self.build_boxed(Box::new(logic));
156    }
157
158    /// Creates an operator implementation from pre-boxed logic.
159    ///
160    /// This method exists primarily to force the `Box<dyn ...>` coercion, which
161    /// can otherwise easily be `Box<L>` for specialized `L` instead.
162    pub fn build_boxed(self, logic: Box<dyn FnMut(&mut SharedProgress<T>)->bool>) {
163        self.build_typed(logic);
164    }
165
166    /// Like `build_reschedule`, but specialized to the closure type `L`.
167    ///
168    /// This method is instantiated once per distinct `L`, and one should be
169    /// mindful of monomorphization bloat. Callers with many distinct closures
170    /// should consider erasing their variation, for example via `Box<dyn ...>`,
171    /// as demonstrated in [`build`].
172    pub fn build_typed<L>(self, logic: L)
173    where
174        L: FnMut(&mut SharedProgress<T>)->bool+'static
175    {
176        let inputs = self.shape.inputs;
177        let outputs = self.shape.outputs;
178
179        let operator = OperatorCore {
180            shape: self.shape,
181            address: self.address,
182            activations: self.scope.activations(),
183            logic,
184            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
185            summary: self.summary,
186        };
187
188        self.slot.install(Box::new(operator));
189    }
190
191    /// Information describing the operator.
192    pub fn operator_info(&self) -> OperatorInfo {
193        OperatorInfo::new(self.index(), self.global(), Rc::clone(&self.address))
194    }
195}
196
197struct OperatorCore<T, L>
198where
199    T: Timestamp,
200    L: FnMut(&mut SharedProgress<T>)->bool+'static,
201{
202    shape: OperatorShape,
203    address: Rc<[usize]>,
204    logic: L,
205    shared_progress: Rc<RefCell<SharedProgress<T>>>,
206    activations: Rc<RefCell<Activations>>,
207    summary: Connectivity<T::Summary>,
208}
209
210impl<T, L> Schedule for OperatorCore<T, L>
211where
212    T: Timestamp,
213    L: FnMut(&mut SharedProgress<T>)->bool+'static,
214{
215    fn name(&self) -> &str { &self.shape.name }
216    fn path(&self) -> &[usize] { &self.address[..] }
217    fn schedule(&mut self) -> bool {
218        let shared_progress = &mut *self.shared_progress.borrow_mut();
219        (self.logic)(shared_progress)
220    }
221}
222
223impl<T, L> Operate<T> for OperatorCore<T, L>
224where
225    T: Timestamp,
226    L: FnMut(&mut SharedProgress<T>)->bool+'static,
227{
228    fn inputs(&self) -> usize { self.shape.inputs }
229    fn outputs(&self) -> usize { self.shape.outputs }
230
231    // announce internal topology as fully connected, and hold all default capabilities.
232    fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
233
234        // Request the operator to be scheduled at least once.
235        self.activations.borrow_mut().activate(&self.address[..]);
236
237        // by default, we reserve a capability for each output port at `Default::default()`.
238        self.shared_progress
239            .borrow_mut()
240            .internals
241            .iter_mut()
242            .for_each(|output| output.update(T::minimum(), self.shape.peers as i64));
243
244        (self.summary.clone(), Rc::clone(&self.shared_progress), self)
245    }
246
247    fn notify_me(&self) -> &[FrontierInterest] { &self.shape.notify }
248}