Skip to main content

timely/dataflow/operators/generic/
builder_raw.rs

//! Types to build operators with general shapes.
//!
//! These types expose some raw timely interfaces, and while public so that others can build on them,
//! they require some sophistication to use correctly. I recommend checking out `builder_rc.rs` for
//! an interface that is intentionally harder to misuse.
6
7use std::default::Default;
8use std::rc::Rc;
9use std::cell::RefCell;
10
11use crate::scheduling::{Schedule, Activations};
12
13use crate::progress::{Source, Target};
14use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
16use crate::Container;
17use crate::dataflow::{Stream, Scope};
18use crate::dataflow::channels::pushers::Tee;
19use crate::dataflow::channels::pact::ParallelizationContract;
20use crate::dataflow::operators::generic::operator_info::OperatorInfo;
21
22/// Contains type-free information about the operator properties.
23#[derive(Debug)]
24pub struct OperatorShape {
25    name: String,   // A meaningful name for the operator.
26    notify: Vec<FrontierInterest>,   // Per-input frontier interest.
27    peers: usize,   // The total number of workers in the computation. Needed to initialize pointstamp counts with the correct magnitude.
28    inputs: usize,  // The number of input ports.
29    outputs: usize, // The number of output ports.
30}
31
32/// Core data for the structure of an operator, minus scope and logic.
33impl OperatorShape {
34    fn new(name: String, peers: usize) -> Self {
35        OperatorShape {
36            name,
37            notify: Vec::new(),
38            peers,
39            inputs: 0,
40            outputs: 0,
41        }
42    }
43
44    /// The number of inputs of this operator
45    pub fn inputs(&self) -> usize { self.inputs }
46
47    /// The number of outputs of this operator
48    pub fn outputs(&self) -> usize { self.outputs }
49}
50
51/// Builds operators with generic shape.
52#[derive(Debug)]
53pub struct OperatorBuilder<G: Scope> {
54    scope: G,
55    index: usize,
56    global: usize,
57    address: Rc<[usize]>,    // path to the operator (ending with index).
58    shape: OperatorShape,
59    summary: Connectivity<<G::Timestamp as Timestamp>::Summary>,
60}
61
62impl<G: Scope> OperatorBuilder<G> {
63
64    /// Allocates a new generic operator builder from its containing scope.
65    pub fn new(name: String, mut scope: G) -> Self {
66
67        let global = scope.new_identifier();
68        let index = scope.allocate_operator_index();
69        let address = scope.addr_for_child(index);
70        let peers = scope.peers();
71
72        OperatorBuilder {
73            scope,
74            index,
75            global,
76            address,
77            shape: OperatorShape::new(name, peers),
78            summary: vec![],
79        }
80    }
81
82    /// The operator's scope-local index.
83    pub fn index(&self) -> usize { self.index }
84
85    /// The operator's worker-unique identifier.
86    pub fn global(&self) -> usize { self.global }
87
88    /// Return a reference to the operator's shape
89    pub fn shape(&self) -> &OperatorShape { &self.shape }
90
91    /// Sets frontier interest for a specific input.
92    pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
93        self.shape.notify[input] = notify;
94    }
95
96    /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
97    pub fn new_input<C: Container, P>(&mut self, stream: Stream<G, C>, pact: P) -> P::Puller
98    where
99        P: ParallelizationContract<G::Timestamp, C>
100    {
101        let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default())));
102        self.new_input_connection(stream, pact, connection)
103    }
104
105    /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
106    pub fn new_input_connection<C: Container, P, I>(&mut self, stream: Stream<G, C>, pact: P, connection: I) -> P::Puller
107    where
108        P: ParallelizationContract<G::Timestamp, C>,
109        I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
110    {
111        let channel_id = self.scope.new_identifier();
112        let logging = self.scope.logging();
113        let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging);
114        let target = Target::new(self.index, self.shape.inputs);
115        stream.connect_to(target, sender, channel_id);
116
117        self.shape.inputs += 1;
118        self.shape.notify.push(FrontierInterest::Always);
119        let connectivity: PortConnectivity<_> = connection.into_iter().collect();
120        assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
121        self.summary.push(connectivity);
122
123        receiver
124    }
125
126    /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
127    pub fn new_output<C: Container>(&mut self) -> (Tee<G::Timestamp, C>, Stream<G, C>) {
128        let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default())));
129        self.new_output_connection(connection)
130    }
131
132    /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
133    pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (Tee<G::Timestamp, C>, Stream<G, C>)
134    where
135        I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
136    {
137        let new_output = self.shape.outputs;
138        self.shape.outputs += 1;
139        let (target, registrar) = Tee::new();
140        let source = Source::new(self.index, new_output);
141        let stream = Stream::new(source, registrar, self.scope.clone());
142
143        for (input, entry) in connection {
144            self.summary[input].add_port(new_output, entry);
145        }
146
147        (target, stream)
148    }
149
150    /// Creates an operator implementation from supplied logic constructor.
151    pub fn build<L>(mut self, logic: L)
152    where
153        L: FnMut(&mut SharedProgress<G::Timestamp>)->bool+'static
154    {
155        let inputs = self.shape.inputs;
156        let outputs = self.shape.outputs;
157
158        let operator = OperatorCore {
159            shape: self.shape,
160            address: self.address,
161            activations: self.scope.activations(),
162            logic,
163            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
164            summary: self.summary,
165        };
166
167        self.scope.add_operator_with_indices(Box::new(operator), self.index, self.global);
168    }
169
170    /// Information describing the operator.
171    pub fn operator_info(&self) -> OperatorInfo {
172        OperatorInfo::new(self.index, self.global, Rc::clone(&self.address))
173    }
174}
175
176struct OperatorCore<T, L>
177where
178    T: Timestamp,
179    L: FnMut(&mut SharedProgress<T>)->bool+'static,
180{
181    shape: OperatorShape,
182    address: Rc<[usize]>,
183    logic: L,
184    shared_progress: Rc<RefCell<SharedProgress<T>>>,
185    activations: Rc<RefCell<Activations>>,
186    summary: Connectivity<T::Summary>,
187}
188
189impl<T, L> Schedule for OperatorCore<T, L>
190where
191    T: Timestamp,
192    L: FnMut(&mut SharedProgress<T>)->bool+'static,
193{
194    fn name(&self) -> &str { &self.shape.name }
195    fn path(&self) -> &[usize] { &self.address[..] }
196    fn schedule(&mut self) -> bool {
197        let shared_progress = &mut *self.shared_progress.borrow_mut();
198        (self.logic)(shared_progress)
199    }
200}
201
202impl<T, L> Operate<T> for OperatorCore<T, L>
203where
204    T: Timestamp,
205    L: FnMut(&mut SharedProgress<T>)->bool+'static,
206{
207    fn inputs(&self) -> usize { self.shape.inputs }
208    fn outputs(&self) -> usize { self.shape.outputs }
209
210    // announce internal topology as fully connected, and hold all default capabilities.
211    fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
212
213        // Request the operator to be scheduled at least once.
214        self.activations.borrow_mut().activate(&self.address[..]);
215
216        // by default, we reserve a capability for each output port at `Default::default()`.
217        self.shared_progress
218            .borrow_mut()
219            .internals
220            .iter_mut()
221            .for_each(|output| output.update(T::minimum(), self.shape.peers as i64));
222
223        (self.summary.clone(), Rc::clone(&self.shared_progress), self)
224    }
225
226    fn notify_me(&self) -> &[FrontierInterest] { &self.shape.notify }
227}