timely/dataflow/operators/generic/
builder_rc.rs1use std::rc::Rc;
4use std::cell::RefCell;
5use std::default::Default;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::progress::operate::SharedProgress;
9use crate::progress::frontier::{Antichain, MutableAntichain};
10
11use crate::Container;
12use crate::container::ContainerBuilder;
13use crate::dataflow::{Scope, StreamCore};
14use crate::dataflow::channels::pushers::Tee;
15use crate::dataflow::channels::pushers::Counter as PushCounter;
16use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
17use crate::dataflow::channels::pact::ParallelizationContract;
18use crate::dataflow::channels::pullers::Counter as PullCounter;
19use crate::dataflow::operators::capability::Capability;
20use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
21use crate::dataflow::operators::generic::operator_info::OperatorInfo;
22use crate::dataflow::operators::generic::builder_raw::OperatorShape;
23use crate::progress::operate::PortConnectivity;
24
25use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
26
27#[derive(Debug)]
29pub struct OperatorBuilder<G: Scope> {
30 builder: OperatorBuilderRaw<G>,
31 frontier: Vec<MutableAntichain<G::Timestamp>>,
32 consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
33 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
34 summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
36 produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
37}
38
39impl<G: Scope> OperatorBuilder<G> {
40
41 pub fn new(name: String, scope: G) -> Self {
43 OperatorBuilder {
44 builder: OperatorBuilderRaw::new(name, scope),
45 frontier: Vec::new(),
46 consumed: Vec::new(),
47 internal: Rc::new(RefCell::new(Vec::new())),
48 summaries: Vec::new(),
49 produced: Vec::new(),
50 }
51 }
52
53 pub fn set_notify(&mut self, notify: bool) {
55 self.builder.set_notify(notify);
56 }
57
58 pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> InputHandleCore<G::Timestamp, C, P::Puller>
60 where
61 P: ParallelizationContract<G::Timestamp, C> {
62
63 let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
64 self.new_input_connection(stream, pact, connection)
65 }
66
67 pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> InputHandleCore<G::Timestamp, C, P::Puller>
76 where
77 P: ParallelizationContract<G::Timestamp, C>,
78 I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
79 {
80 let puller = self.builder.new_input_connection(stream, pact, connection.clone());
81
82 let input = PullCounter::new(puller);
83 self.frontier.push(MutableAntichain::new());
84 self.consumed.push(Rc::clone(input.consumed()));
85
86 let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
87 self.summaries.push(Rc::clone(&shared_summary));
88
89 new_input_handle(input, Rc::clone(&self.internal), shared_summary)
90 }
91
92 pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
94 let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
95 self.new_output_connection(connection)
96 }
97
98 pub fn new_output_connection<CB: ContainerBuilder, I>(&mut self, connection: I) -> (
107 OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
108 StreamCore<G, CB::Container>
109 )
110 where
111 I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
112 {
113 let new_output = self.shape().outputs();
114 let (tee, stream) = self.builder.new_output_connection(connection.clone());
115
116 let internal = Rc::new(RefCell::new(ChangeBatch::new()));
117 self.internal.borrow_mut().push(Rc::clone(&internal));
118
119 let mut buffer = PushBuffer::new(PushCounter::new(tee));
120 self.produced.push(Rc::clone(buffer.inner().produced()));
121
122 for (input, entry) in connection {
123 self.summaries[input].borrow_mut().add_port(new_output, entry);
124 }
125
126 (OutputWrapper::new(buffer, internal, new_output), stream)
127 }
128
129 pub fn build<B, L>(self, constructor: B)
131 where
132 B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
133 L: FnMut(&[MutableAntichain<G::Timestamp>])+'static
134 {
135 self.build_reschedule(|caps| {
136 let mut logic = constructor(caps);
137 move |frontier| { logic(frontier); false }
138 })
139 }
140
141 pub fn build_reschedule<B, L>(self, constructor: B)
148 where
149 B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
150 L: FnMut(&[MutableAntichain<G::Timestamp>])->bool+'static
151 {
152 let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
154 for batch in self.internal.borrow().iter() {
155 capabilities.push(Capability::new(G::Timestamp::minimum(), Rc::clone(batch)));
156 batch.borrow_mut().clear();
158 }
159
160 let mut logic = constructor(capabilities);
161
162 let mut self_frontier = self.frontier;
163 let self_consumed = self.consumed;
164 let self_internal = self.internal;
165 let self_produced = self.produced;
166
167 let raw_logic =
168 move |progress: &mut SharedProgress<G::Timestamp>| {
169
170 for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) {
172 frontier.update_iter(progress.drain());
173 }
174
175 let result = logic(&self_frontier[..]);
177
178 for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) {
180 consumed.borrow_mut().drain_into(progress);
181 }
182
183 let self_internal_borrow = self_internal.borrow_mut();
185 for index in 0 .. self_internal_borrow.len() {
186 let mut borrow = self_internal_borrow[index].borrow_mut();
187 progress.internals[index].extend(borrow.drain());
188 }
189
190 for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) {
192 produced.borrow_mut().drain_into(progress);
193 }
194
195 result
196 };
197
198 self.builder.build(raw_logic);
199 }
200
201 pub fn index(&self) -> usize {
203 self.builder.index()
204 }
205
206 pub fn global(&self) -> usize {
208 self.builder.global()
209 }
210
211 pub fn shape(&self) -> &OperatorShape {
213 self.builder.shape()
214 }
215
216 pub fn operator_info(&self) -> OperatorInfo {
218 self.builder.operator_info()
219 }
220}
221
222
#[cfg(test)]
mod tests {
    use crate::container::CapacityContainerBuilder;
    use super::OperatorBuilder;

    /// Opening an output session with a capability minted for a different output must panic.
    #[test]
    #[should_panic]
    fn incorrect_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            let (mut output1, _stream1) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();
            let (mut output2, _stream2) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();

            builder.build(move |capabilities| {
                move |_frontiers| {
                    let mut output_handle1 = output1.activate();
                    let mut output_handle2 = output2.activate();

                    // Capabilities arrive in output order, so these pairings are deliberately swapped.
                    output_handle2.session(&capabilities[0]);
                    output_handle1.session(&capabilities[1]);
                }
            });
        })
    }

    /// Opening each output's session with its own capability succeeds.
    #[test]
    fn correct_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Success".to_owned(), scope.clone());

            let (mut output1, _stream1) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();
            let (mut output2, _stream2) = builder.new_output::<CapacityContainerBuilder<Vec<()>>>();

            builder.build(move |mut capabilities| {
                move |_frontiers| {
                    let mut output_handle1 = output1.activate();
                    let mut output_handle2 = output2.activate();

                    // Use the capabilities once, then drop them so the operator can complete.
                    if !capabilities.is_empty() {
                        output_handle1.session(&capabilities[0]);
                        output_handle2.session(&capabilities[1]);

                        capabilities.clear();
                    }
                }
            });

            "Hello".to_owned()
        });
    }
}