1use std::hash::{BuildHasher, Hash, Hasher};
19use std::marker::PhantomData;
20
21use columnation::Columnation;
22use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
23use differential_dataflow::difference::{Multiply, Semigroup};
24use differential_dataflow::lattice::Lattice;
25use differential_dataflow::trace::{Batcher, Builder, Description};
26use differential_dataflow::{AsCollection, Collection, Hashable, VecCollection};
27use timely::container::{DrainContainer, PushInto};
28use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
29use timely::dataflow::operators::Capability;
30use timely::dataflow::operators::generic::builder_rc::{
31 OperatorBuilder as OperatorBuilderRc, OperatorBuilder,
32};
33use timely::dataflow::operators::generic::operator::{self, Operator};
34use timely::dataflow::operators::generic::{
35 InputHandleCore, OperatorInfo, OutputBuilder, OutputBuilderSession,
36};
37use timely::dataflow::{Scope, Stream, StreamVec};
38use timely::progress::operate::FrontierInterest;
39use timely::progress::{Antichain, Timestamp};
40use timely::{Container, ContainerBuilder, PartialOrder};
41
42use crate::columnation::ColumnationStack;
43
/// Extension methods for timely [`Stream`]s.
pub trait StreamExt<'scope, T, C1>
where
    T: Timestamp,
    C1: Container + DrainContainer + Clone + 'static,
{
    /// Like timely's `unary`, but the operator logic writes to two outputs:
    /// a "success" stream whose containers are built by `DCB` and an "error"
    /// stream whose containers are built by `ECB`.
    ///
    /// `constructor` receives the operator's initial capability and operator
    /// info and returns the per-activation logic, which is handed the input
    /// handle plus one output builder session per output.
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<T>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                &mut InputHandleCore<T, C1, P::Puller>,
                &mut OutputBuilderSession<'_, T, DCB>,
                &mut OutputBuilderSession<'_, T, ECB>,
            ) + 'static,
        >,
        P: ParallelizationContract<T, C1>;

    /// Applies `logic` to each item of the stream, routing produced `Ok`
    /// values to the first returned stream and `Err` values to the second.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

    /// Passes the stream through unchanged while retaining a capability for
    /// `expiration`, which keeps the output frontier from advancing past that
    /// time. Logs a warning (once) when the input frontier is no longer less
    /// than `expiration`.
    fn expire_stream_at(self, name: &str, expiration: T) -> Stream<'scope, T, C1>;
}
105
/// Extension methods for differential dataflow [`Collection`]s.
pub trait CollectionExt<'scope, T, D1, R>: Sized
where
    T: Timestamp,
    R: Semigroup,
{
    /// Creates a new empty collection in `scope`.
    fn empty(scope: Scope<'scope, T>) -> VecCollection<'scope, T, D1, R>;

    /// Like [`CollectionExt::flat_map_fallible`], but for `logic` that returns
    /// exactly one `Result` per input record. `Ok` records flow to the first
    /// returned collection and `Err` records to the second.
    fn map_fallible<DCB, ECB, D2, E, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (
        VecCollection<'scope, T, D2, R>,
        VecCollection<'scope, T, E, R>,
    )
    where
        DCB: ContainerBuilder<Container = Vec<(D2, T, R)>> + PushInto<(D2, T, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, T, R)>> + PushInto<(E, T, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        // Wrap the single `Result` in a one-element iterator and defer to
        // `flat_map_fallible`.
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Applies `logic` to each record, splitting the produced `Ok` items into
    /// the first returned collection and the `Err` items into the second.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (
        Collection<'scope, T, DCB::Container>,
        Collection<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, T, R)>,
        ECB: ContainerBuilder + PushInto<(E, T, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

    /// Wraps the collection with an operator that passes updates through
    /// unchanged but warns once its input frontier reaches `expiration`; see
    /// [`StreamExt::expire_stream_at`].
    fn expire_collection_at(self, name: &str, expiration: T) -> VecCollection<'scope, T, D1, R>;

    /// Like differential's `explode`, but for `logic` that produces exactly
    /// one `(data, diff)` pair per record. The produced diff is multiplied
    /// with the record's existing diff.
    fn explode_one<D2, R2, L>(
        self,
        logic: L,
    ) -> VecCollection<'scope, T, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        T: Lattice;

    /// Splits the collection into updates with positive diffs (first returned
    /// collection) and all other updates, converted to errors via `into_err`
    /// (second returned collection).
    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (
        VecCollection<'scope, T, D1, R>,
        VecCollection<'scope, T, E, R>,
    )
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed;

    /// Consolidates the collection as [`CollectionExt::consolidate_named`]
    /// does, but only when `must_consolidate` is true; otherwise returns
    /// `self` unchanged.
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), T, R)>,
            Output = ColumnationStack<((D1, ()), T, R)>,
            Time = T,
        > + 'static;

    /// Consolidates the collection in an operator named `name`, exchanging
    /// updates by a hash of their data and batching them with the batcher
    /// `Ba`.
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), T, R)>,
            Output = ColumnationStack<((D1, ()), T, R)>,
            Time = T,
        > + 'static;
}
220
impl<'scope, T, C1> StreamExt<'scope, T, C1> for Stream<'scope, T, C1>
where
    T: Timestamp,
    C1: Container + DrainContainer + Clone + 'static,
{
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<T>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                &mut InputHandleCore<T, C1, P::Puller>,
                &mut OutputBuilderSession<'_, T, DCB>,
                &mut OutputBuilderSession<'_, T, ECB>,
            ) + 'static,
        >,
        P: ParallelizationContract<T, C1>,
    {
        let mut builder = OperatorBuilderRc::new(name.into(), self.scope());

        let operator_info = builder.operator_info();

        let mut input = builder.new_input(self.clone(), pact);
        // The operator logic is purely data-driven; it never needs to be
        // scheduled on input frontier progress.
        builder.set_notify_for(0, FrontierInterest::Never);
        // One output for successes, one for errors.
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output = OutputBuilder::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);

        builder.build(move |mut capabilities| {
            // Keep a single initial capability to seed the constructor.
            let capability = capabilities.pop().unwrap();
            let mut logic = constructor(capability, operator_info);
            move |_frontiers| {
                // Activate both outputs for this scheduling and hand them,
                // with the input, to the user-provided logic.
                let mut ok_output_handle = ok_output.activate();
                let mut err_output_handle = err_output.activate();
                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
            }
        });

        (ok_stream, err_stream)
    }

    #[allow(clippy::redundant_closure)]
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
    {
        self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
            Box::new(move |input, ok_output, err_output| {
                input.for_each_time(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session_with_builder(&time);
                    // Drain every input container at this time and route each
                    // produced result to the appropriate output session.
                    for r in data
                        .flat_map(DrainContainer::drain)
                        .flat_map(|d1| logic(d1))
                    {
                        match r {
                            Ok(d2) => ok_session.give(d2),
                            Err(e) => err_session.give(e),
                        }
                    }
                })
            })
        })
    }

    fn expire_stream_at(self, name: &str, expiration: T) -> Stream<'scope, T, C1> {
        let name = format!("expire_stream_at({name})");
        // `name` is moved into the operator closure below, so the operator
        // itself gets a clone.
        self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
            // Hold a capability for `expiration` for the operator's lifetime,
            // which keeps the output frontier from advancing past it.
            let cap = Some(cap.delayed(&expiration));
            let mut warned = false;
            move |(input, frontier), output| {
                // Reference `cap` so it stays owned (and alive) in the closure.
                let _ = &cap;
                let frontier = frontier.frontier();
                if !frontier.less_than(&expiration) && !warned {
                    // Warn only once per operator instance.
                    tracing::warn!(
                        name = name,
                        frontier = ?frontier,
                        expiration = ?expiration,
                        "frontier not less than expiration"
                    );
                    warned = true;
                }
                // Forward all data unchanged.
                input.for_each(|time, data| {
                    let mut session = output.session(&time);
                    session.give_container(data);
                });
            }
        })
    }
}
349
impl<'scope, T, D1, R> CollectionExt<'scope, T, D1, R> for VecCollection<'scope, T, D1, R>
where
    T: Timestamp + Clone + 'static,
    D1: Clone + 'static,
    R: Semigroup + 'static,
{
    fn empty(scope: Scope<'scope, T>) -> VecCollection<'scope, T, D1, R> {
        operator::empty(scope).as_collection()
    }

    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (
        Collection<'scope, T, DCB::Container>,
        Collection<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, T, R)>,
        ECB: ContainerBuilder + PushInto<(E, T, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static,
    {
        // Delegate to the stream-level `flat_map_fallible`, re-attaching the
        // record's timestamp and diff to every `Ok`/`Err` item it yields.
        let (ok_stream, err_stream) =
            self.inner
                .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
                    logic(d1).into_iter().map(move |res| match res {
                        Ok(d2) => Ok((d2, t.clone(), r.clone())),
                        Err(e) => Err((e, t.clone(), r.clone())),
                    })
                });
        (ok_stream.as_collection(), err_stream.as_collection())
    }

    fn expire_collection_at(self, name: &str, expiration: T) -> VecCollection<'scope, T, D1, R> {
        // Expiration operates on the underlying stream of update triples.
        self.inner
            .expire_stream_at(name, expiration)
            .as_collection()
    }

    fn explode_one<D2, R2, L>(
        self,
        mut logic: L,
    ) -> VecCollection<'scope, T, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        T: Lattice,
    {
        // Map each update through `logic` and multiply the produced diff with
        // the update's existing diff, consolidating output containers as they
        // are built.
        self.inner
            .clone()
            .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
                Pipeline,
                "ExplodeOne",
                move |_, _| {
                    move |input, output| {
                        input.for_each(|time, data| {
                            output
                                .session_with_builder(&time)
                                .give_iterator(data.drain(..).map(|(x, t, d)| {
                                    let (x, d2) = logic(x);
                                    (x, t, d2.multiply(&d))
                                }));
                        });
                    }
                },
            )
            .as_collection()
    }

    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (
        VecCollection<'scope, T, D1, R>,
        VecCollection<'scope, T, E, R>,
    )
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed,
    {
        let (oks, errs) = self
            .inner
            .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
                Box::new(move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);
                        for (x, t, d) in data.drain(..) {
                            // Positive diffs pass through unchanged; every
                            // other diff is converted into an error update.
                            if d.is_positive() {
                                ok_session.give((x, t, d))
                            } else {
                                let (e, d2) = into_err(x, d);
                                err_session.give((e, t, d2))
                            }
                        }
                    })
                })
            });
        (oks.as_collection(), errs.as_collection())
    }

    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Ord + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), T, R)>,
            Output = ColumnationStack<((D1, ()), T, R)>,
            Time = T,
        > + 'static,
    {
        if must_consolidate {
            // Fixed (arbitrary) seeds so every worker computes the same hash
            // for the same datum; the exchange below relies on that to route
            // equal records to the same worker for consolidation.
            let random_state = ahash::RandomState::with_seeds(
                0x243f_6a88_85a3_08d3,
                0x1319_8a2e_0370_7344,
                0xa409_3822_299f_31d0,
                0x082e_fa98_ec4e_6c89,
            );
            let exchange = Exchange::new(move |update: &((D1, _), T, R)| {
                let data = &(update.0).0;
                let mut h = random_state.build_hasher();
                data.hash(&mut h);
                h.finish()
            });
            // Consolidate `(data, ())` pairs, then unpack the sealed chunks
            // back into a stream of owned update triples.
            consolidate_pact::<Ba, _>(self.map(|k| (k, ())).inner, exchange, name)
                .unary(Pipeline, "unpack consolidated", |_, _| {
                    |input, output| {
                        input.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for ((k, ()), t, d) in
                                data.iter().flatten().flat_map(|chunk| chunk.iter())
                            {
                                session.give((k.clone(), t.clone(), d.clone()))
                            }
                        })
                    }
                })
                .as_collection()
        } else {
            self
        }
    }

    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Ord + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), T, R)>,
            Output = ColumnationStack<((D1, ()), T, R)>,
            Time = T,
        > + 'static,
    {
        // Route updates by the default hash of their data so equal records
        // land on the same worker before consolidation.
        let exchange = Exchange::new(move |update: &((D1, ()), T, R)| (update.0).0.hashed());

        consolidate_pact::<Ba, _>(self.map(|k| (k, ())).inner, exchange, name)
            .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
                |input, output| {
                    input.for_each(|time, data| {
                        let mut session = output.session(&time);
                        // Flatten the sealed chunk structure back into owned
                        // update triples.
                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            session.give((k.clone(), t.clone(), d.clone()))
                        }
                    })
                }
            })
            .as_collection()
    }
}
546
/// Consolidates a stream of updates using the batcher `Ba`, after exchanging
/// the updates according to `pact`.
///
/// Input data is absorbed into the batcher as it arrives. Whenever the input
/// frontier advances past a retained capability, the corresponding prefix of
/// updates is sealed into output chunks and emitted at that capability's time.
pub fn consolidate_pact<'scope, Ba, P>(
    stream: Stream<'scope, Ba::Time, Ba::Input>,
    pact: P,
    name: &str,
) -> StreamVec<'scope, Ba::Time, Vec<Ba::Output>>
where
    Ba: Batcher + 'static,
    Ba::Input: Container + Clone + 'static,
    Ba::Output: Clone,
    P: ParallelizationContract<Ba::Time, Ba::Input>,
{
    let logger = stream
        .scope()
        .worker()
        .logger_for("differential/arrange")
        .map(Into::into);
    stream.unary_frontier(pact, name, |_cap, info| {
        let mut batcher = Ba::new(logger, info.global_id);
        // Capabilities at which we may still emit sealed batches.
        let mut capabilities = Antichain::<Capability<Ba::Time>>::new();
        // The input frontier observed on the previous invocation; lets us
        // skip the sealing logic when the frontier has not moved.
        let mut prev_frontier = Antichain::from_elem(Ba::Time::minimum());

        move |(input, frontier), output| {
            input.for_each(|cap, data| {
                // Retain the capability (for output port 0) so sealed batches
                // can be emitted at this time later.
                capabilities.insert(cap.retain(0));
                batcher.push_container(data);
            });

            if prev_frontier.borrow() != frontier.frontier() {
                // Only seal if at least one retained capability has been
                // overtaken by the input frontier.
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !frontier.less_equal(c.time()))
                {
                    let mut upper = Antichain::new();
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !frontier.less_equal(capability.time()) {
                            // The batch's upper bound is the input frontier
                            // joined with the times of all capabilities we
                            // have not yet processed in this loop.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Seal everything up to `upper` and ship it at
                            // this capability's time.
                            let mut session = output.session(&capabilities.elements()[index]);
                            let output =
                                batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
                            session.give(output);
                        }
                    }

                    // Downgrade the retained capabilities to the times the
                    // batcher still holds updates for, reusing any existing
                    // capability that is less or equal to each such time.
                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            // Every time the batcher holds must be covered by
                            // some retained capability; anything else is a bug.
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                // Remember the frontier we just processed.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }
        }
    })
}
641
/// A `Builder` used by [`consolidate_pact`] to seal batches: rather than
/// building a trace batch, it hands back the consolidated chain of containers
/// as-is. Holds no data itself.
struct ConsolidateBuilder<T, I> {
    _marker: PhantomData<(T, I)>,
}
646
647impl<T, I> Builder for ConsolidateBuilder<T, I>
648where
649 T: Timestamp,
650 I: Clone,
651{
652 type Input = I;
653 type Time = T;
654 type Output = Vec<I>;
655
656 fn new() -> Self {
657 Self {
658 _marker: PhantomData,
659 }
660 }
661
662 fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
663 Self::new()
664 }
665
666 fn push(&mut self, _chunk: &mut Self::Input) {
667 unimplemented!("ConsolidateBuilder::push")
668 }
669
670 fn done(self, _: Description<Self::Time>) -> Self::Output {
671 unimplemented!("ConsolidateBuilder::done")
672 }
673
674 fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
675 std::mem::take(chain)
676 }
677}
678
/// Merging of streams whose containers are flattened item-by-item into a
/// single output stream built by a caller-chosen [`ContainerBuilder`].
pub trait ConcatenateFlatten<'scope, T: Timestamp, C: Container + DrainContainer> {
    /// Merges `sources` (and, for streams, `self`) into one output stream,
    /// draining every incoming container and rebuilding items through `CB`.
    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
    where
        I: IntoIterator<Item = Stream<'scope, T, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>;
}
706
707impl<'scope, T, C> ConcatenateFlatten<'scope, T, C> for Stream<'scope, T, C>
708where
709 T: Timestamp,
710 C: Container + DrainContainer + Clone + 'static,
711{
712 fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
713 where
714 I: IntoIterator<Item = Stream<'scope, T, C>>,
715 CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
716 {
717 self.scope()
718 .concatenate_flatten::<_, CB>(Some(Clone::clone(self)).into_iter().chain(sources))
719 }
720}
721
722impl<'scope, T, C> ConcatenateFlatten<'scope, T, C> for Scope<'scope, T>
723where
724 T: Timestamp,
725 C: Container + DrainContainer,
726{
727 fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
728 where
729 I: IntoIterator<Item = Stream<'scope, T, C>>,
730 CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
731 {
732 let mut builder = OperatorBuilder::new("ConcatenateFlatten".to_string(), self.clone());
733
734 let mut handles = sources
736 .into_iter()
737 .map(|s| builder.new_input(s, Pipeline))
738 .collect::<Vec<_>>();
739 for i in 0..handles.len() {
740 builder.set_notify_for(i, FrontierInterest::Never);
741 }
742
743 let (output, result) = builder.new_output::<CB::Container>();
745 let mut output = OutputBuilder::<_, CB>::from(output);
746
747 builder.build(move |_capability| {
748 move |_frontier| {
749 let mut output = output.activate();
750 for handle in handles.iter_mut() {
751 handle.for_each_time(|time, data| {
752 output
753 .session_with_builder(&time)
754 .give_iterator(data.flat_map(DrainContainer::drain));
755 })
756 }
757 }
758 });
759
760 result
761 }
762}
763
/// A container whose contents can be discarded.
pub trait ClearContainer {
    /// Removes all elements from the container.
    fn clear(&mut self);
}
769
770impl<T> ClearContainer for Vec<T> {
771 fn clear(&mut self) {
772 Vec::clear(self)
773 }
774}