1use std::hash::{BuildHasher, Hash, Hasher};
19
20use columnation::Columnation;
21use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
22use differential_dataflow::difference::{Multiply, Semigroup};
23use differential_dataflow::lattice::Lattice;
24use differential_dataflow::trace::Batcher;
25use differential_dataflow::{AsCollection, Collection, Hashable, VecCollection};
26use timely::container::{DrainContainer, PushInto};
27use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
28use timely::dataflow::operators::Capability;
29use timely::dataflow::operators::generic::builder_rc::{
30 OperatorBuilder as OperatorBuilderRc, OperatorBuilder,
31};
32use timely::dataflow::operators::generic::operator::{self, Operator};
33use timely::dataflow::operators::generic::{
34 InputHandleCore, OperatorInfo, OutputBuilder, OutputBuilderSession,
35};
36use timely::dataflow::{Scope, Stream, StreamVec};
37use timely::progress::operate::FrontierInterest;
38use timely::progress::{Antichain, Timestamp};
39use timely::{Container, ContainerBuilder, PartialOrder};
40
41use crate::columnation::{ColumnationChunker, ColumnationStack};
42
/// Extension methods for timely [`Stream`]s whose containers can be drained.
///
/// The descriptions below follow the implementation for `Stream` provided in
/// this file.
pub trait StreamExt<'scope, T, C1>
where
    T: Timestamp,
    C1: Container + DrainContainer + Clone + 'static,
{
    /// Builds a single-input operator named `name` that has two outputs.
    ///
    /// `pact` is used for the operator's input. `constructor` is invoked once
    /// with a capability and the operator's [`OperatorInfo`], and must return
    /// the boxed operator logic. That logic is called with the input handle and
    /// an activated session handle for each of the two outputs.
    ///
    /// Returns the two output streams, in the order in which the logic receives
    /// their handles (the implementation in this file names them "ok" and
    /// "err").
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<T>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<T, C1, P::Puller>,
                    &mut OutputBuilderSession<'_, T, DCB>,
                    &mut OutputBuilderSession<'_, T, ECB>,
                ) + 'static,
        >,
        P: ParallelizationContract<T, C1>;

    /// Applies `logic` to every item drained from the input containers and
    /// routes each produced `Result` to one of two outputs.
    ///
    /// `Ok` values are given to the first returned stream and `Err` values to
    /// the second, both at the time of the input message they came from. The
    /// operator is named `name`.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

    /// Forwards the stream unchanged through an operator that keeps a
    /// capability delayed to `expiration` alive.
    ///
    /// The implementation in this file logs a single `tracing` warning the
    /// first time the input frontier is observed to be not less than
    /// `expiration`. `name` is embedded in the operator name as
    /// `expire_stream_at({name})`.
    fn expire_stream_at(self, name: &str, expiration: T) -> Stream<'scope, T, C1>;
}
104
/// Extension methods for differential dataflow collections.
///
/// The descriptions below follow the implementation for `VecCollection`
/// provided in this file.
pub trait CollectionExt<'scope, T, D1, R>: Sized
where
    T: Timestamp,
    R: Semigroup,
{
    /// Creates a collection in `scope` backed by an empty stream.
    fn empty(scope: Scope<'scope, T>) -> VecCollection<'scope, T, D1, R>;

    /// Applies the fallible `logic` to every record and splits the results.
    ///
    /// This is [`CollectionExt::flat_map_fallible`] with each call to `logic`
    /// producing exactly one `Result`: `Ok` values form the first returned
    /// collection and `Err` values the second. The operator is named `name`.
    fn map_fallible<DCB, ECB, D2, E, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (
        VecCollection<'scope, T, D2, R>,
        VecCollection<'scope, T, E, R>,
    )
    where
        DCB: ContainerBuilder<Container = Vec<(D2, T, R)>> + PushInto<(D2, T, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, T, R)>> + PushInto<(E, T, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        // `Some(..)` is a one-element iterator, which satisfies the
        // `IntoIterator<Item = Result<..>>` bound of `flat_map_fallible`.
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Applies `logic` to the data component of every update and splits the
    /// produced `Result`s into two collections.
    ///
    /// Every produced value keeps the time and difference of the update it was
    /// derived from. `Ok` values form the first returned collection and `Err`
    /// values the second. The operator is named `name`.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (
        Collection<'scope, T, DCB::Container>,
        Collection<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, T, R)>,
        ECB: ContainerBuilder + PushInto<(E, T, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

    /// Collection counterpart of [`StreamExt::expire_stream_at`]: applies it to
    /// the collection's inner stream and wraps the result as a collection.
    fn expire_collection_at(self, name: &str, expiration: T) -> VecCollection<'scope, T, D1, R>;

    /// Replaces every update `(x, t, d)` with `(x2, t, d2.multiply(&d))`, where
    /// `(x2, d2)` is the result of `logic(x)`.
    ///
    /// The implementation in this file emits its output through a
    /// `ConsolidatingContainerBuilder`.
    fn explode_one<D2, R2, L>(
        self,
        logic: L,
    ) -> VecCollection<'scope, T, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        T: Lattice;

    /// Splits the collection by the sign of each update's difference.
    ///
    /// Updates whose difference `is_positive()` are passed through to the first
    /// returned collection. Every other update is converted with `into_err`
    /// (which receives the data and the difference) and emitted on the second
    /// collection at the original time.
    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (
        VecCollection<'scope, T, D1, R>,
        VecCollection<'scope, T, E, R>,
    )
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed;

    /// Consolidates the collection only when `must_consolidate` is true;
    /// otherwise returns `self` unchanged.
    ///
    /// When consolidating, the implementation in this file exchanges updates by
    /// an `ahash` hash of the data (with fixed seeds) and routes them through
    /// [`consolidate_pact`] using batcher `Ba`, under the operator name `name`.
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Columnation,
        Ba: Batcher<Time = T, Output = ColumnationStack<((D1, ()), T, R)>> + 'static;

    /// Consolidates the collection through [`consolidate_pact`] using batcher
    /// `Ba`, under the operator name `name`.
    ///
    /// The implementation in this file exchanges updates by the `hashed()`
    /// value of their data.
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Columnation,
        Ba: Batcher<Time = T, Output = ColumnationStack<((D1, ()), T, R)>> + 'static;
}
211
212impl<'scope, T, C1> StreamExt<'scope, T, C1> for Stream<'scope, T, C1>
213where
214 T: Timestamp,
215 C1: Container + DrainContainer + Clone + 'static,
216{
217 fn unary_fallible<DCB, ECB, B, P>(
218 self,
219 pact: P,
220 name: &str,
221 constructor: B,
222 ) -> (
223 Stream<'scope, T, DCB::Container>,
224 Stream<'scope, T, ECB::Container>,
225 )
226 where
227 DCB: ContainerBuilder,
228 ECB: ContainerBuilder,
229 B: FnOnce(
230 Capability<T>,
231 OperatorInfo,
232 ) -> Box<
233 dyn FnMut(
234 &mut InputHandleCore<T, C1, P::Puller>,
235 &mut OutputBuilderSession<'_, T, DCB>,
236 &mut OutputBuilderSession<'_, T, ECB>,
237 ) + 'static,
238 >,
239 P: ParallelizationContract<T, C1>,
240 {
241 let mut builder = OperatorBuilderRc::new(name.into(), self.scope());
242
243 let operator_info = builder.operator_info();
244
245 let mut input = builder.new_input(self.clone(), pact);
246 builder.set_notify_for(0, FrontierInterest::Never);
247 let (ok_output, ok_stream) = builder.new_output();
248 let mut ok_output = OutputBuilder::from(ok_output);
249 let (err_output, err_stream) = builder.new_output();
250 let mut err_output = OutputBuilder::from(err_output);
251
252 builder.build(move |mut capabilities| {
253 let capability = capabilities.pop().unwrap();
255 let mut logic = constructor(capability, operator_info);
256 move |_frontiers| {
257 let mut ok_output_handle = ok_output.activate();
258 let mut err_output_handle = err_output.activate();
259 logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
260 }
261 });
262
263 (ok_stream, err_stream)
264 }
265
266 #[allow(clippy::redundant_closure)]
272 fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
273 self,
274 name: &str,
275 mut logic: L,
276 ) -> (
277 Stream<'scope, T, DCB::Container>,
278 Stream<'scope, T, ECB::Container>,
279 )
280 where
281 DCB: ContainerBuilder + PushInto<D2>,
282 ECB: ContainerBuilder + PushInto<E>,
283 I: IntoIterator<Item = Result<D2, E>>,
284 L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
285 {
286 self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
287 Box::new(move |input, ok_output, err_output| {
288 input.for_each_time(|time, data| {
289 let mut ok_session = ok_output.session_with_builder(&time);
290 let mut err_session = err_output.session_with_builder(&time);
291 for r in data
292 .flat_map(DrainContainer::drain)
293 .flat_map(|d1| logic(d1))
294 {
295 match r {
296 Ok(d2) => ok_session.give(d2),
297 Err(e) => err_session.give(e),
298 }
299 }
300 })
301 })
302 })
303 }
304
305 fn expire_stream_at(self, name: &str, expiration: T) -> Stream<'scope, T, C1> {
306 let name = format!("expire_stream_at({name})");
307 self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
308 let cap = Some(cap.delayed(&expiration));
312 let mut warned = false;
313 move |(input, frontier), output| {
314 let _ = ∩
315 let frontier = frontier.frontier();
316 if !frontier.less_than(&expiration) && !warned {
317 tracing::warn!(
325 name = name,
326 frontier = ?frontier,
327 expiration = ?expiration,
328 "frontier not less than expiration"
329 );
330 warned = true;
331 }
332 input.for_each(|time, data| {
333 let mut session = output.session(&time);
334 session.give_container(data);
335 });
336 }
337 })
338 }
339}
340
341impl<'scope, T, D1, R> CollectionExt<'scope, T, D1, R> for VecCollection<'scope, T, D1, R>
342where
343 T: Timestamp + Clone + 'static,
344 D1: Clone + 'static,
345 R: Semigroup + 'static,
346{
347 fn empty(scope: Scope<'scope, T>) -> VecCollection<'scope, T, D1, R> {
348 operator::empty(scope).as_collection()
349 }
350
351 fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
352 self,
353 name: &str,
354 mut logic: L,
355 ) -> (
356 Collection<'scope, T, DCB::Container>,
357 Collection<'scope, T, ECB::Container>,
358 )
359 where
360 DCB: ContainerBuilder + PushInto<(D2, T, R)>,
361 ECB: ContainerBuilder + PushInto<(E, T, R)>,
362 D2: Clone + 'static,
363 E: Clone + 'static,
364 I: IntoIterator<Item = Result<D2, E>>,
365 L: FnMut(D1) -> I + 'static,
366 {
367 let (ok_stream, err_stream) =
368 self.inner
369 .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
370 logic(d1).into_iter().map(move |res| match res {
371 Ok(d2) => Ok((d2, t.clone(), r.clone())),
372 Err(e) => Err((e, t.clone(), r.clone())),
373 })
374 });
375 (ok_stream.as_collection(), err_stream.as_collection())
376 }
377
378 fn expire_collection_at(self, name: &str, expiration: T) -> VecCollection<'scope, T, D1, R> {
379 self.inner
380 .expire_stream_at(name, expiration)
381 .as_collection()
382 }
383
384 fn explode_one<D2, R2, L>(
385 self,
386 mut logic: L,
387 ) -> VecCollection<'scope, T, D2, <R2 as Multiply<R>>::Output>
388 where
389 D2: differential_dataflow::Data,
390 R2: Semigroup + Multiply<R>,
391 <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
392 L: FnMut(D1) -> (D2, R2) + 'static,
393 T: Lattice,
394 {
395 self.inner
396 .clone()
397 .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
398 Pipeline,
399 "ExplodeOne",
400 move |_, _| {
401 move |input, output| {
402 input.for_each(|time, data| {
403 output
404 .session_with_builder(&time)
405 .give_iterator(data.drain(..).map(|(x, t, d)| {
406 let (x, d2) = logic(x);
407 (x, t, d2.multiply(&d))
408 }));
409 });
410 }
411 },
412 )
413 .as_collection()
414 }
415
416 fn ensure_monotonic<E, IE>(
417 self,
418 into_err: IE,
419 ) -> (
420 VecCollection<'scope, T, D1, R>,
421 VecCollection<'scope, T, E, R>,
422 )
423 where
424 E: Clone + 'static,
425 IE: Fn(D1, R) -> (E, R) + 'static,
426 R: num_traits::sign::Signed,
427 {
428 let (oks, errs) = self
429 .inner
430 .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
431 Box::new(move |input, ok_output, err_output| {
432 input.for_each(|time, data| {
433 let mut ok_session = ok_output.session(&time);
434 let mut err_session = err_output.session(&time);
435 for (x, t, d) in data.drain(..) {
436 if d.is_positive() {
437 ok_session.give((x, t, d))
438 } else {
439 let (e, d2) = into_err(x, d);
440 err_session.give((e, t, d2))
441 }
442 }
443 })
444 })
445 });
446 (oks.as_collection(), errs.as_collection())
447 }
448
449 fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
450 where
451 D1: differential_dataflow::ExchangeData + Hash + Columnation,
452 R: Semigroup + differential_dataflow::ExchangeData + Columnation,
453 T: Lattice + Ord + Columnation,
454 Ba: Batcher<Time = T, Output = ColumnationStack<((D1, ()), T, R)>> + 'static,
455 {
456 if must_consolidate {
457 let random_state = ahash::RandomState::with_seeds(
475 0x243f_6a88_85a3_08d3,
476 0x1319_8a2e_0370_7344,
477 0xa409_3822_299f_31d0,
478 0x082e_fa98_ec4e_6c89,
479 );
480 let exchange = Exchange::new(move |update: &((D1, _), T, R)| {
481 let data = &(update.0).0;
482 let mut h = random_state.build_hasher();
483 data.hash(&mut h);
484 h.finish()
485 });
486 consolidate_pact::<ColumnationChunker<((D1, ()), T, R)>, Ba, _, _>(
487 self.map(|k| (k, ())).inner,
488 exchange,
489 name,
490 )
491 .unary(Pipeline, "unpack consolidated", |_, _| {
492 |input, output| {
493 input.for_each(|time, data| {
494 let mut session = output.session(&time);
495 for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
496 {
497 session.give((k.clone(), t.clone(), d.clone()))
498 }
499 })
500 }
501 })
502 .as_collection()
503 } else {
504 self
505 }
506 }
507
508 fn consolidate_named<Ba>(self, name: &str) -> Self
509 where
510 D1: differential_dataflow::ExchangeData + Hash + Columnation,
511 R: Semigroup + differential_dataflow::ExchangeData + Columnation,
512 T: Lattice + Ord + Columnation,
513 Ba: Batcher<Time = T, Output = ColumnationStack<((D1, ()), T, R)>> + 'static,
514 {
515 let exchange = Exchange::new(move |update: &((D1, ()), T, R)| (update.0).0.hashed());
516
517 consolidate_pact::<ColumnationChunker<((D1, ()), T, R)>, Ba, _, _>(
518 self.map(|k| (k, ())).inner,
519 exchange,
520 name,
521 )
522 .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
523 |input, output| {
524 input.for_each(|time, data| {
525 let mut session = output.session(&time);
526 for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
527 session.give((k.clone(), t.clone(), d.clone()))
528 }
529 })
530 }
531 })
532 .as_collection()
533 }
534}
535
/// Builds an operator named `name` that feeds `stream` through a batcher and
/// emits the sealed output chains whenever the input frontier changes.
///
/// * `stream` - the input stream of containers `C`.
/// * `pact` - parallelization contract used for the operator's input.
/// * `name` - name given to the operator.
///
/// Type parameters: `Chu` is the container builder that turns incoming
/// containers into chunks of type `Ba::Output`; `Ba` is the batcher that
/// accumulates those chunks and is sealed to produce output.
///
/// Returns a stream whose records are the chains (`Vec<Ba::Output>`) returned
/// by `Ba::seal`.
///
/// # Panics
///
/// Panics with "failed to find capability" if, after sealing, no retained
/// capability has a time less than or equal to an element of the batcher's
/// frontier.
pub fn consolidate_pact<'scope, Chu, Ba, C, P>(
    stream: Stream<'scope, Ba::Time, C>,
    pact: P,
    name: &str,
) -> StreamVec<'scope, Ba::Time, Vec<Ba::Output>>
where
    Ba: Batcher + 'static,
    Chu: ContainerBuilder<Container = Ba::Output> + for<'a> PushInto<&'a mut C> + 'static,
    C: Container + Clone + 'static,
    Ba::Output: Clone,
    P: ParallelizationContract<Ba::Time, C>,
{
    // Optional logger registered under the "differential/arrange" name; it is
    // handed to the batcher on construction.
    let logger = stream
        .scope()
        .worker()
        .logger_for("differential/arrange")
        .map(Into::into);
    stream.unary_frontier(pact, name, |_cap, info| {
        let mut batcher = Ba::new(logger, info.global_id);
        // Converts incoming containers into chunks the batcher accepts.
        let mut chunker = Chu::default();
        // Capabilities retained from received messages and not yet retired.
        let mut capabilities = Antichain::<Capability<Ba::Time>>::new();
        // The input frontier as of the previous frontier change; starts at the
        // minimum time.
        let mut prev_frontier = Antichain::from_elem(Ba::Time::minimum());

        move |(input, frontier), output| {
            input.for_each(|cap, data| {
                // Remember a capability for this message's time (retained for
                // output index 0) and move every completed chunk to the batcher.
                capabilities.insert(cap.retain(0));
                chunker.push_into(data);
                while let Some(chunk) = chunker.extract() {
                    batcher.push_into(std::mem::take(chunk));
                }
            });

            // Only do sealing work when the input frontier has moved.
            if prev_frontier.borrow() != frontier.frontier() {
                // Flush chunks still buffered in the chunker into the batcher.
                while let Some(chunk) = chunker.finish() {
                    batcher.push_into(std::mem::take(chunk));
                }

                // Proceed only if at least one held capability's time is no
                // longer covered by the input frontier.
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !frontier.less_equal(c.time()))
                {
                    // Scratch antichain, cleared and refilled per capability.
                    let mut upper = Antichain::new();
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !frontier.less_equal(capability.time()) {
                            // The seal bound combines the input frontier with
                            // the times of all capabilities that come after
                            // this one in the list.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Seal the batcher at `upper` and send the
                            // resulting chain at this capability's time.
                            let mut session = output.session(&capabilities.elements()[index]);
                            let (chain, _description) = batcher.seal(upper.clone());
                            session.give(chain);
                        }
                    }

                    // Rebuild the capability set from the batcher's frontier:
                    // for each frontier time, delay an existing capability
                    // whose time is less than or equal to it.
                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                // Record the frontier so the next invocation can detect change.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }
        }
    })
}
642
/// Merges several streams into one while flattening their containers.
pub trait ConcatenateFlatten<'scope, T: Timestamp, C: Container + DrainContainer> {
    /// Combines the items of all `sources` into a single stream whose
    /// containers are produced by the container builder `CB`.
    ///
    /// Items are drained from the input containers and pushed into `CB`, so the
    /// output container type may differ from the input container type `C`.
    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
    where
        I: IntoIterator<Item = Stream<'scope, T, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>;
}
670
671impl<'scope, T, C> ConcatenateFlatten<'scope, T, C> for Stream<'scope, T, C>
672where
673 T: Timestamp,
674 C: Container + DrainContainer + Clone + 'static,
675{
676 fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
677 where
678 I: IntoIterator<Item = Stream<'scope, T, C>>,
679 CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
680 {
681 self.scope()
682 .concatenate_flatten::<_, CB>(Some(Clone::clone(self)).into_iter().chain(sources))
683 }
684}
685
686impl<'scope, T, C> ConcatenateFlatten<'scope, T, C> for Scope<'scope, T>
687where
688 T: Timestamp,
689 C: Container + DrainContainer,
690{
691 fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
692 where
693 I: IntoIterator<Item = Stream<'scope, T, C>>,
694 CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
695 {
696 let mut builder = OperatorBuilder::new("ConcatenateFlatten".to_string(), self.clone());
697
698 let mut handles = sources
700 .into_iter()
701 .map(|s| builder.new_input(s, Pipeline))
702 .collect::<Vec<_>>();
703 for i in 0..handles.len() {
704 builder.set_notify_for(i, FrontierInterest::Never);
705 }
706
707 let (output, result) = builder.new_output::<CB::Container>();
709 let mut output = OutputBuilder::<_, CB>::from(output);
710
711 builder.build(move |_capability| {
712 move |_frontier| {
713 let mut output = output.activate();
714 for handle in handles.iter_mut() {
715 handle.for_each_time(|time, data| {
716 output
717 .session_with_builder(&time)
718 .give_iterator(data.flat_map(DrainContainer::drain));
719 })
720 }
721 }
722 });
723
724 result
725 }
726}
727
/// A type that offers a `clear` operation on itself.
///
/// The `Vec<T>` implementation in this file forwards to `Vec::clear`.
pub trait ClearContainer {
    /// Clears `self` in place.
    fn clear(&mut self);
}
733
734impl<T> ClearContainer for Vec<T> {
735 fn clear(&mut self) {
736 Vec::clear(self)
737 }
738}