timely/dataflow/operators/generic/
builder_rc.rs1use std::rc::Rc;
4use std::cell::RefCell;
5use std::default::Default;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::progress::operate::SharedProgress;
9use crate::progress::frontier::{Antichain, MutableAntichain};
10
11use crate::Container;
12use crate::dataflow::{Scope, StreamCore};
13use crate::dataflow::channels::pushers::Counter as PushCounter;
14use crate::dataflow::channels::pushers;
15use crate::dataflow::channels::pact::ParallelizationContract;
16use crate::dataflow::channels::pullers::Counter as PullCounter;
17use crate::dataflow::operators::capability::Capability;
18use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle};
19use crate::dataflow::operators::generic::operator_info::OperatorInfo;
20use crate::dataflow::operators::generic::builder_raw::OperatorShape;
21use crate::progress::operate::PortConnectivity;
22
23use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
24
25#[derive(Debug)]
27pub struct OperatorBuilder<G: Scope> {
28 builder: OperatorBuilderRaw<G>,
29 frontier: Vec<MutableAntichain<G::Timestamp>>,
30 consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
31 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
32 summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
34 produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
35}
36
37impl<G: Scope> OperatorBuilder<G> {
38
39 pub fn new(name: String, scope: G) -> Self {
41 OperatorBuilder {
42 builder: OperatorBuilderRaw::new(name, scope),
43 frontier: Vec::new(),
44 consumed: Vec::new(),
45 internal: Rc::new(RefCell::new(Vec::new())),
46 summaries: Vec::new(),
47 produced: Vec::new(),
48 }
49 }
50
51 pub fn set_notify(&mut self, notify: bool) {
53 self.builder.set_notify(notify);
54 }
55
56 pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> InputHandleCore<G::Timestamp, C, P::Puller>
58 where
59 P: ParallelizationContract<G::Timestamp, C> {
60
61 let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
62 self.new_input_connection(stream, pact, connection)
63 }
64
65 pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> InputHandleCore<G::Timestamp, C, P::Puller>
74 where
75 P: ParallelizationContract<G::Timestamp, C>,
76 I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
77 {
78 let puller = self.builder.new_input_connection(stream, pact, connection.clone());
79
80 let input = PullCounter::new(puller);
81 self.frontier.push(MutableAntichain::new());
82 self.consumed.push(Rc::clone(input.consumed()));
83
84 let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
85 self.summaries.push(Rc::clone(&shared_summary));
86
87 new_input_handle(input, Rc::clone(&self.internal), shared_summary)
88 }
89
90 pub fn new_output<C: Container>(&mut self) -> (pushers::Output<G::Timestamp, C>, StreamCore<G, C>) {
92 let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
93 self.new_output_connection(connection)
94 }
95
96 pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (
105 pushers::Output<G::Timestamp, C>,
106 StreamCore<G, C>,
107 )
108 where
109 I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
110 {
111 let new_output = self.shape().outputs();
112 let (tee, stream) = self.builder.new_output_connection(connection.clone());
113
114 let internal = Rc::new(RefCell::new(ChangeBatch::new()));
115 self.internal.borrow_mut().push(Rc::clone(&internal));
116
117 let counter = PushCounter::new(tee);
118 self.produced.push(Rc::clone(counter.produced()));
119
120 for (input, entry) in connection {
121 self.summaries[input].borrow_mut().add_port(new_output, entry);
122 }
123
124 (pushers::Output::new(counter, internal, new_output), stream)
125 }
126
127 pub fn build<B, L>(self, constructor: B)
129 where
130 B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
131 L: FnMut(&[MutableAntichain<G::Timestamp>])+'static
132 {
133 self.build_reschedule(|caps| {
134 let mut logic = constructor(caps);
135 move |frontier| { logic(frontier); false }
136 })
137 }
138
139 pub fn build_reschedule<B, L>(self, constructor: B)
146 where
147 B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
148 L: FnMut(&[MutableAntichain<G::Timestamp>])->bool+'static
149 {
150 let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
152 for batch in self.internal.borrow().iter() {
153 capabilities.push(Capability::new(G::Timestamp::minimum(), Rc::clone(batch)));
154 batch.borrow_mut().clear();
156 }
157
158 let mut logic = constructor(capabilities);
159
160 let mut self_frontier = self.frontier;
161 let self_consumed = self.consumed;
162 let self_internal = self.internal;
163 let self_produced = self.produced;
164
165 let raw_logic =
166 move |progress: &mut SharedProgress<G::Timestamp>| {
167
168 for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) {
170 frontier.update_iter(progress.drain());
171 }
172
173 let result = logic(&self_frontier[..]);
175
176 for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) {
178 consumed.borrow_mut().drain_into(progress);
179 }
180
181 let self_internal_borrow = self_internal.borrow_mut();
183 for index in 0 .. self_internal_borrow.len() {
184 let mut borrow = self_internal_borrow[index].borrow_mut();
185 progress.internals[index].extend(borrow.drain());
186 }
187
188 for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) {
190 produced.borrow_mut().drain_into(progress);
191 }
192
193 result
194 };
195
196 self.builder.build(raw_logic);
197 }
198
199 pub fn index(&self) -> usize {
201 self.builder.index()
202 }
203
204 pub fn global(&self) -> usize {
206 self.builder.global()
207 }
208
209 pub fn shape(&self) -> &OperatorShape {
211 self.builder.shape()
212 }
213
214 pub fn operator_info(&self) -> OperatorInfo {
216 self.builder.operator_info()
217 }
218}
219
220
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::generic::OutputBuilder;
    use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

    /// Opening a session on each output with the capability that was minted
    /// for the *other* output is expected to panic.
    #[test]
    #[should_panic]
    fn incorrect_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            let (raw_first, _first_stream) = builder.new_output::<Vec<()>>();
            let (raw_second, _second_stream) = builder.new_output::<Vec<()>>();
            let mut first = OutputBuilder::from(raw_first);
            let mut second = OutputBuilder::from(raw_second);

            builder.build(move |capabilities| {
                move |_frontiers| {
                    let mut first_handle = first.activate();
                    let mut second_handle = second.activate();

                    // Deliberately crossed: capabilities are handed out in
                    // output order, so each is used with the wrong output.
                    second_handle.session(&capabilities[0]);
                    first_handle.session(&capabilities[1]);
                }
            });
        })
    }

    /// Opening a session on each output with its own capability must not
    /// panic; the capabilities are dropped after their first use.
    #[test]
    fn correct_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            let (raw_first, _first_stream) = builder.new_output::<Vec<()>>();
            let (raw_second, _second_stream) = builder.new_output::<Vec<()>>();
            let mut first = OutputBuilder::from(raw_first);
            let mut second = OutputBuilder::from(raw_second);

            builder.build(move |mut capabilities| {
                move |_frontiers| {
                    let mut first_handle = first.activate();
                    let mut second_handle = second.activate();

                    // Nothing left to do once the capabilities have been released.
                    if capabilities.is_empty() {
                        return;
                    }

                    // Matched: capability `i` belongs to output `i`.
                    first_handle.session(&capabilities[0]);
                    second_handle.session(&capabilities[1]);

                    capabilities.clear();
                }
            });

            "Hello".to_owned()
        });
    }
}