timely/dataflow/operators/generic/
builder_raw.rs1use std::default::Default;
8use std::rc::Rc;
9use std::cell::RefCell;
10
11use crate::scheduling::{Schedule, Activations};
12
13use crate::progress::{Source, Target};
14use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
16use crate::Container;
17use crate::dataflow::{Stream, Scope, OperatorSlot};
18use crate::dataflow::channels::pushers::Tee;
19use crate::dataflow::channels::pact::ParallelizationContract;
20use crate::dataflow::operators::generic::operator_info::OperatorInfo;
21
22#[derive(Debug)]
24pub struct OperatorShape {
25 name: String, notify: Vec<FrontierInterest>, peers: usize, inputs: usize, outputs: usize, }
31
32impl OperatorShape {
34 fn new(name: String, peers: usize) -> Self {
35 OperatorShape {
36 name,
37 notify: Vec::new(),
38 peers,
39 inputs: 0,
40 outputs: 0,
41 }
42 }
43
44 pub fn inputs(&self) -> usize { self.inputs }
46
47 pub fn outputs(&self) -> usize { self.outputs }
49}
50
51#[derive(Debug)]
53pub struct OperatorBuilder<'scope, T: Timestamp> {
54 scope: Scope<'scope, T>,
55 slot: OperatorSlot<'scope, T>,
56 address: Rc<[usize]>, shape: OperatorShape,
58 summary: Connectivity<<T as Timestamp>::Summary>,
59}
60
61impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
62
63 pub fn new(name: String, scope: Scope<'scope, T>) -> Self {
65
66 let slot = scope.reserve_operator();
67 let address = slot.addr();
68 let peers = scope.peers();
69
70 OperatorBuilder {
71 scope,
72 slot,
73 address,
74 shape: OperatorShape::new(name, peers),
75 summary: vec![],
76 }
77 }
78
79 pub fn index(&self) -> usize { self.slot.index() }
81
82 pub fn global(&self) -> usize { self.slot.identifier() }
84
85 pub fn shape(&self) -> &OperatorShape { &self.shape }
87
88 pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
90 self.shape.notify[input] = notify;
91 }
92
93 pub fn new_input<C: Container, P>(&mut self, stream: Stream<'scope, T, C>, pact: P) -> P::Puller
95 where
96 P: ParallelizationContract<T, C>
97 {
98 let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default())));
99 self.new_input_connection(stream, pact, connection)
100 }
101
102 pub fn new_input_connection<C: Container, P, I>(&mut self, stream: Stream<'scope, T, C>, pact: P, connection: I) -> P::Puller
104 where
105 P: ParallelizationContract<T, C>,
106 I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)>,
107 {
108 let channel_id = self.scope.worker().new_identifier();
109 let logging = self.scope.worker().logging();
110 let (sender, receiver) = pact.connect(self.scope.worker(), channel_id, Rc::clone(&self.address), logging);
111 let target = Target::new(self.slot.index(), self.shape.inputs);
112 stream.connect_to(target, sender, channel_id);
113
114 self.shape.inputs += 1;
115 self.shape.notify.push(FrontierInterest::Always);
116 let connectivity: PortConnectivity<_> = connection.into_iter().collect();
117 assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
118 self.summary.push(connectivity);
119
120 receiver
121 }
122
123 pub fn new_output<C: Container>(&mut self) -> (Tee<T, C>, Stream<'scope, T, C>) {
125 let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default())));
126 self.new_output_connection(connection)
127 }
128
129 pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (Tee<T, C>, Stream<'scope, T, C>)
131 where
132 I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)>,
133 {
134 let new_output = self.shape.outputs;
135 self.shape.outputs += 1;
136 let (target, registrar) = Tee::new();
137 let source = Source::new(self.slot.index(), new_output);
138 let stream = Stream::new(source, registrar, self.scope);
139
140 for (input, entry) in connection {
141 self.summary[input].add_port(new_output, entry);
142 }
143
144 (target, stream)
145 }
146
147 pub fn build<L>(self, logic: L)
152 where
153 L: FnMut(&mut SharedProgress<T>)->bool+'static
154 {
155 self.build_boxed(Box::new(logic));
156 }
157
158 pub fn build_boxed(self, logic: Box<dyn FnMut(&mut SharedProgress<T>)->bool>) {
163 self.build_typed(logic);
164 }
165
166 pub fn build_typed<L>(self, logic: L)
173 where
174 L: FnMut(&mut SharedProgress<T>)->bool+'static
175 {
176 let inputs = self.shape.inputs;
177 let outputs = self.shape.outputs;
178
179 let operator = OperatorCore {
180 shape: self.shape,
181 address: self.address,
182 activations: self.scope.activations(),
183 logic,
184 shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
185 summary: self.summary,
186 };
187
188 self.slot.install(Box::new(operator));
189 }
190
191 pub fn operator_info(&self) -> OperatorInfo {
193 OperatorInfo::new(self.index(), self.global(), Rc::clone(&self.address))
194 }
195}
196
197struct OperatorCore<T, L>
198where
199 T: Timestamp,
200 L: FnMut(&mut SharedProgress<T>)->bool+'static,
201{
202 shape: OperatorShape,
203 address: Rc<[usize]>,
204 logic: L,
205 shared_progress: Rc<RefCell<SharedProgress<T>>>,
206 activations: Rc<RefCell<Activations>>,
207 summary: Connectivity<T::Summary>,
208}
209
210impl<T, L> Schedule for OperatorCore<T, L>
211where
212 T: Timestamp,
213 L: FnMut(&mut SharedProgress<T>)->bool+'static,
214{
215 fn name(&self) -> &str { &self.shape.name }
216 fn path(&self) -> &[usize] { &self.address[..] }
217 fn schedule(&mut self) -> bool {
218 let shared_progress = &mut *self.shared_progress.borrow_mut();
219 (self.logic)(shared_progress)
220 }
221}
222
223impl<T, L> Operate<T> for OperatorCore<T, L>
224where
225 T: Timestamp,
226 L: FnMut(&mut SharedProgress<T>)->bool+'static,
227{
228 fn inputs(&self) -> usize { self.shape.inputs }
229 fn outputs(&self) -> usize { self.shape.outputs }
230
231 fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
233
234 self.activations.borrow_mut().activate(&self.address[..]);
236
237 self.shared_progress
239 .borrow_mut()
240 .internals
241 .iter_mut()
242 .for_each(|output| output.update(T::minimum(), self.shape.peers as i64));
243
244 (self.summary.clone(), Rc::clone(&self.shared_progress), self)
245 }
246
247 fn notify_me(&self) -> &[FrontierInterest] { &self.shape.notify }
248}