timely/dataflow/operators/generic/
builder_raw.rs1use std::default::Default;
8use std::rc::Rc;
9use std::cell::RefCell;
10
11use crate::scheduling::{Schedule, Activations};
12
13use crate::progress::{Source, Target};
14use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15use crate::progress::operate::{Connectivity, PortConnectivity};
16use crate::Container;
17use crate::dataflow::{StreamCore, Scope};
18use crate::dataflow::channels::pushers::Tee;
19use crate::dataflow::channels::pact::ParallelizationContract;
20use crate::dataflow::operators::generic::operator_info::OperatorInfo;
21
/// Type-free description of an operator: its name, its notification flag,
/// the peer count, and how many input and output ports it has.
#[derive(Debug)]
pub struct OperatorShape {
    /// Name of the operator; handed out by `Schedule::name` once built.
    name: String,
    /// Value reported by `Operate::notify_me`; starts out `true` and can be
    /// changed through `OperatorBuilder::set_notify`.
    notify: bool,
    /// Peer count taken from the scope when the builder is created
    /// (presumably the number of workers; confirm against `Scope::peers`).
    peers: usize,
    /// Number of input ports added so far.
    inputs: usize,
    /// Number of output ports added so far.
    outputs: usize,
}
31
32impl OperatorShape {
34    fn new(name: String, peers: usize) -> Self {
35        OperatorShape {
36            name,
37            notify: true,
38            peers,
39            inputs: 0,
40            outputs: 0,
41        }
42    }
43
44    pub fn inputs(&self) -> usize {
46        self.inputs
47    }
48
49    pub fn outputs(&self) -> usize {
51        self.outputs
52    }
53}
54
55#[derive(Debug)]
57pub struct OperatorBuilder<G: Scope> {
58    scope: G,
59    index: usize,
60    global: usize,
61    address: Rc<[usize]>,    shape: OperatorShape,
63    summary: Connectivity<<G::Timestamp as Timestamp>::Summary>,
64}
65
66impl<G: Scope> OperatorBuilder<G> {
67
68    pub fn new(name: String, mut scope: G) -> Self {
70
71        let global = scope.new_identifier();
72        let index = scope.allocate_operator_index();
73        let address = scope.addr_for_child(index);
74        let peers = scope.peers();
75
76        OperatorBuilder {
77            scope,
78            index,
79            global,
80            address,
81            shape: OperatorShape::new(name, peers),
82            summary: vec![],
83        }
84    }
85
86    pub fn index(&self) -> usize {
88        self.index
89    }
90
91    pub fn global(&self) -> usize {
93        self.global
94    }
95
96    pub fn shape(&self) -> &OperatorShape {
98        &self.shape
99    }
100
101    pub fn set_notify(&mut self, notify: bool) {
103        self.shape.notify = notify;
104    }
105
106    pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> P::Puller
108    where
109        P: ParallelizationContract<G::Timestamp, C>
110    {
111        let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default())));
112        self.new_input_connection(stream, pact, connection)
113    }
114
115    pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> P::Puller
117    where
118        P: ParallelizationContract<G::Timestamp, C>,
119        I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
120    {
121        let channel_id = self.scope.new_identifier();
122        let logging = self.scope.logging();
123        let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging);
124        let target = Target::new(self.index, self.shape.inputs);
125        stream.connect_to(target, sender, channel_id);
126
127        self.shape.inputs += 1;
128        let connectivity: PortConnectivity<_> = connection.into_iter().collect();
129        assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
130        self.summary.push(connectivity);
131
132        receiver
133    }
134
135    pub fn new_output<C: Container>(&mut self) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
137
138        let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default())));
139        self.new_output_connection(connection)
140    }
141
142    pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (Tee<G::Timestamp, C>, StreamCore<G, C>)
144    where
145        I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
146    {
147        let new_output = self.shape.outputs;
148        self.shape.outputs += 1;
149        let (targets, registrar) = Tee::<G::Timestamp,C>::new();
150        let source = Source::new(self.index, new_output);
151        let stream = StreamCore::new(source, registrar, self.scope.clone());
152
153        for (input, entry) in connection {
154            self.summary[input].add_port(new_output, entry);
155        }
156
157        (targets, stream)
158    }
159
160    pub fn build<L>(mut self, logic: L)
162    where
163        L: FnMut(&mut SharedProgress<G::Timestamp>)->bool+'static
164    {
165        let inputs = self.shape.inputs;
166        let outputs = self.shape.outputs;
167
168        let operator = OperatorCore {
169            shape: self.shape,
170            address: self.address,
171            activations: self.scope.activations(),
172            logic,
173            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
174            summary: self.summary,
175        };
176
177        self.scope.add_operator_with_indices(Box::new(operator), self.index, self.global);
178    }
179
180    pub fn operator_info(&self) -> OperatorInfo {
182        OperatorInfo::new(self.index, self.global, Rc::clone(&self.address))
183    }
184}
185
186struct OperatorCore<T, L>
187where
188    T: Timestamp,
189    L: FnMut(&mut SharedProgress<T>)->bool+'static,
190{
191    shape: OperatorShape,
192    address: Rc<[usize]>,
193    logic: L,
194    shared_progress: Rc<RefCell<SharedProgress<T>>>,
195    activations: Rc<RefCell<Activations>>,
196    summary: Connectivity<T::Summary>,
197}
198
199impl<T, L> Schedule for OperatorCore<T, L>
200where
201    T: Timestamp,
202    L: FnMut(&mut SharedProgress<T>)->bool+'static,
203{
204    fn name(&self) -> &str { &self.shape.name }
205    fn path(&self) -> &[usize] { &self.address[..] }
206    fn schedule(&mut self) -> bool {
207        let shared_progress = &mut *self.shared_progress.borrow_mut();
208        (self.logic)(shared_progress)
209    }
210}
211
212impl<T, L> Operate<T> for OperatorCore<T, L>
213where
214    T: Timestamp,
215    L: FnMut(&mut SharedProgress<T>)->bool+'static,
216{
217    fn inputs(&self) -> usize { self.shape.inputs }
218    fn outputs(&self) -> usize { self.shape.outputs }
219
220    fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {
222
223        self.activations.borrow_mut().activate(&self.address[..]);
225
226        self.shared_progress
228            .borrow_mut()
229            .internals
230            .iter_mut()
231            .for_each(|output| output.update(T::minimum(), self.shape.peers as i64));
232
233        (self.summary.clone(), Rc::clone(&self.shared_progress))
234    }
235
236    fn set_external_summary(&mut self) {
238        self.schedule();
240    }
241
242    fn notify_me(&self) -> bool { self.shape.notify }
243}