timely/dataflow/operators/generic/
builder_raw.rs1use std::default::Default;
8use std::rc::Rc;
9use std::cell::RefCell;
10
11use crate::scheduling::{Schedule, Activations};
12
13use crate::progress::{Source, Target};
14use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
16use crate::Container;
17use crate::dataflow::{Stream, Scope};
18use crate::dataflow::channels::pushers::Tee;
19use crate::dataflow::channels::pact::ParallelizationContract;
20use crate::dataflow::operators::generic::operator_info::OperatorInfo;
21
22#[derive(Debug)]
24pub struct OperatorShape {
25 name: String, notify: Vec<FrontierInterest>, peers: usize, inputs: usize, outputs: usize, }
31
32impl OperatorShape {
34 fn new(name: String, peers: usize) -> Self {
35 OperatorShape {
36 name,
37 notify: Vec::new(),
38 peers,
39 inputs: 0,
40 outputs: 0,
41 }
42 }
43
44 pub fn inputs(&self) -> usize { self.inputs }
46
47 pub fn outputs(&self) -> usize { self.outputs }
49}
50
51#[derive(Debug)]
53pub struct OperatorBuilder<G: Scope> {
54 scope: G,
55 index: usize,
56 global: usize,
57 address: Rc<[usize]>, shape: OperatorShape,
59 summary: Connectivity<<G::Timestamp as Timestamp>::Summary>,
60}
61
62impl<G: Scope> OperatorBuilder<G> {
63
64 pub fn new(name: String, mut scope: G) -> Self {
66
67 let global = scope.new_identifier();
68 let index = scope.allocate_operator_index();
69 let address = scope.addr_for_child(index);
70 let peers = scope.peers();
71
72 OperatorBuilder {
73 scope,
74 index,
75 global,
76 address,
77 shape: OperatorShape::new(name, peers),
78 summary: vec![],
79 }
80 }
81
82 pub fn index(&self) -> usize { self.index }
84
85 pub fn global(&self) -> usize { self.global }
87
88 pub fn shape(&self) -> &OperatorShape { &self.shape }
90
91 pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
93 self.shape.notify[input] = notify;
94 }
95
96 pub fn new_input<C: Container, P>(&mut self, stream: Stream<G, C>, pact: P) -> P::Puller
98 where
99 P: ParallelizationContract<G::Timestamp, C>
100 {
101 let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default())));
102 self.new_input_connection(stream, pact, connection)
103 }
104
105 pub fn new_input_connection<C: Container, P, I>(&mut self, stream: Stream<G, C>, pact: P, connection: I) -> P::Puller
107 where
108 P: ParallelizationContract<G::Timestamp, C>,
109 I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
110 {
111 let channel_id = self.scope.new_identifier();
112 let logging = self.scope.logging();
113 let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging);
114 let target = Target::new(self.index, self.shape.inputs);
115 stream.connect_to(target, sender, channel_id);
116
117 self.shape.inputs += 1;
118 self.shape.notify.push(FrontierInterest::Always);
119 let connectivity: PortConnectivity<_> = connection.into_iter().collect();
120 assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
121 self.summary.push(connectivity);
122
123 receiver
124 }
125
126 pub fn new_output<C: Container>(&mut self) -> (Tee<G::Timestamp, C>, Stream<G, C>) {
128 let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default())));
129 self.new_output_connection(connection)
130 }
131
132 pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (Tee<G::Timestamp, C>, Stream<G, C>)
134 where
135 I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
136 {
137 let new_output = self.shape.outputs;
138 self.shape.outputs += 1;
139 let (target, registrar) = Tee::new();
140 let source = Source::new(self.index, new_output);
141 let stream = Stream::new(source, registrar, self.scope.clone());
142
143 for (input, entry) in connection {
144 self.summary[input].add_port(new_output, entry);
145 }
146
147 (target, stream)
148 }
149
150 pub fn build<L>(mut self, logic: L)
152 where
153 L: FnMut(&mut SharedProgress<G::Timestamp>)->bool+'static
154 {
155 let inputs = self.shape.inputs;
156 let outputs = self.shape.outputs;
157
158 let operator = OperatorCore {
159 shape: self.shape,
160 address: self.address,
161 activations: self.scope.activations(),
162 logic,
163 shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
164 summary: self.summary,
165 };
166
167 self.scope.add_operator_with_indices(Box::new(operator), self.index, self.global);
168 }
169
170 pub fn operator_info(&self) -> OperatorInfo {
172 OperatorInfo::new(self.index, self.global, Rc::clone(&self.address))
173 }
174}
175
176struct OperatorCore<T, L>
177where
178 T: Timestamp,
179 L: FnMut(&mut SharedProgress<T>)->bool+'static,
180{
181 shape: OperatorShape,
182 address: Rc<[usize]>,
183 logic: L,
184 shared_progress: Rc<RefCell<SharedProgress<T>>>,
185 activations: Rc<RefCell<Activations>>,
186 summary: Connectivity<T::Summary>,
187}
188
189impl<T, L> Schedule for OperatorCore<T, L>
190where
191 T: Timestamp,
192 L: FnMut(&mut SharedProgress<T>)->bool+'static,
193{
194 fn name(&self) -> &str { &self.shape.name }
195 fn path(&self) -> &[usize] { &self.address[..] }
196 fn schedule(&mut self) -> bool {
197 let shared_progress = &mut *self.shared_progress.borrow_mut();
198 (self.logic)(shared_progress)
199 }
200}
201
202impl<T, L> Operate<T> for OperatorCore<T, L>
203where
204 T: Timestamp,
205 L: FnMut(&mut SharedProgress<T>)->bool+'static,
206{
207 fn inputs(&self) -> usize { self.shape.inputs }
208 fn outputs(&self) -> usize { self.shape.outputs }
209
210 fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
212
213 self.activations.borrow_mut().activate(&self.address[..]);
215
216 self.shared_progress
218 .borrow_mut()
219 .internals
220 .iter_mut()
221 .for_each(|output| output.update(T::minimum(), self.shape.peers as i64));
222
223 (self.summary.clone(), Rc::clone(&self.shared_progress), self)
224 }
225
226 fn notify_me(&self) -> &[FrontierInterest] { &self.shape.notify }
227}