use std::hash::{BuildHasher, Hash, Hasher};
use std::marker::PhantomData;
use std::rc::Weak;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::containers::{Columnation, TimelyStack};
use differential_dataflow::difference::{Multiply, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Batcher, Builder, Description};
use differential_dataflow::{AsCollection, Collection, Hashable};
use timely::container::{ContainerBuilder, DrainContainer, PushInto};
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
use timely::dataflow::operators::generic::operator::{self, Operator};
use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputHandleCore};
use timely::dataflow::{Scope, ScopeParent, Stream, StreamCore};
use timely::progress::{Antichain, Timestamp};
use timely::{Container, Data, PartialOrder};

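/// A [`Session`] that pushes containers built by the container builder `CB`
/// into a [`Tee`] for the timestamp of scope `G`.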
pub type SessionFor<'a, G, CB> = Session<
    'a,
    <G as ScopeParent>::Timestamp,
    CB,
    timely::dataflow::channels::pushers::Counter<
        <G as ScopeParent>::Timestamp,
        <CB as ContainerBuilder>::Container,
        Tee<<G as ScopeParent>::Timestamp, <CB as ContainerBuilder>::Container>,
    >,
>;

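/// Extension methods for timely [`StreamCore`]s.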
pub trait StreamExt<G, C1>
where
    C1: Container + DrainContainer,
    G: Scope,
{
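    /// Like [`Operator::unary`], but with an additional error output stream.
    /// The `constructor` receives the operator's initial capability and info
    /// and returns the per-schedule logic, which is handed the input plus one
    /// output handle for successful data and one for errors.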
    fn unary_fallible<DCB, ECB, B, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputHandleCore<G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>,
                    &mut OutputHandleCore<G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>;

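    /// Like timely's `flat_map`, but `logic` returns an iterator of
    /// [`Result`]s: `Ok` items are routed to the first returned stream and
    /// `Err` items to the second.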
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        logic: L,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

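    /// Passes the stream through unchanged while holding back a capability
    /// for the `expiration` time. The capability is released once `token`
    /// can no longer be upgraded; if the input frontier reaches `expiration`
    /// while the token is still alive, the operator logs a warning (once).
    ///
    /// A usage sketch (the stream, timestamp, and names are illustrative):
    ///
    /// ```ignore
    /// let token = std::rc::Rc::new(());
    /// let guarded =
    ///     stream.expire_stream_at("source", expiration_ts, std::rc::Rc::downgrade(&token));
    /// // Dropping `token` later releases the capability held at `expiration_ts`.
    /// ```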
    fn expire_stream_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> StreamCore<G, C1>;
}

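/// Extension methods for differential [`Collection`]s.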
pub trait CollectionExt<G, D1, R>
where
    G: Scope,
    R: Semigroup,
{
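    /// Creates a new empty collection in `scope`.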
    fn empty(scope: &G) -> Collection<G, D1, R>;

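    /// Like [`Collection::map`], but `logic` can return errors: `Ok` records
    /// flow into the first output collection and `Err` records into the
    /// second.
    ///
    /// A minimal usage sketch. It assumes `Vec`-backed
    /// [`CapacityContainerBuilder`](timely::container::CapacityContainerBuilder)s
    /// for both outputs; the collection `strings` and the operator name are
    /// illustrative:
    ///
    /// ```ignore
    /// use timely::container::CapacityContainerBuilder as CB;
    ///
    /// let (parsed, errs) = strings.map_fallible::<CB<Vec<_>>, CB<Vec<_>>, _, _, _>(
    ///     "ParseInts",
    ///     |s: String| s.parse::<i64>().map_err(|e| e.to_string()),
    /// );
    /// ```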
    fn map_fallible<DCB, ECB, D2, E, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (Collection<G, D2, R>, Collection<G, E, R>)
    where
        DCB: ContainerBuilder<Container = Vec<(D2, G::Timestamp, R)>>
            + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, G::Timestamp, R)>>
            + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

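    /// Like [`Collection::flat_map`], but `logic` returns an iterator of
    /// [`Result`]s. `Ok` items keep the input record's time and diff and flow
    /// into the first output collection, `Err` items into the second.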
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        logic: L,
    ) -> (
        Collection<G, D2, R, DCB::Container>,
        Collection<G, E, R, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

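    /// Collection analogue of [`StreamExt::expire_stream_at`]: passes the
    /// collection through unchanged while holding back a capability for
    /// `expiration` until `token` is dropped.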
    fn expire_collection_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> Collection<G, D1, R>;

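    /// Like [`Collection::explode`], but for `logic` that produces exactly
    /// one output record per input record. The produced diff `R2` is
    /// multiplied with the input diff `R`, and the output is consolidated as
    /// it is built.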
    fn explode_one<D2, R2, L>(&self, logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Data + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice;

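    /// Partitions the collection by the sign of its diffs: records with
    /// positive multiplicity pass through unchanged, while all others are
    /// turned into errors by `into_err`, which maps the offending record and
    /// its diff to an error datum plus the diff to emit it with.
    ///
    /// A sketch of a typical `into_err` (the error message format and the
    /// emitted diff of `1` are illustrative):
    ///
    /// ```ignore
    /// let (oks, errs) = collection.ensure_monotonic(|datum, diff| {
    ///     (format!("non-monotonic input: {datum:?} with diff {diff:?}"), 1)
    /// });
    /// ```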
    fn ensure_monotonic<E, IE>(&self, into_err: IE) -> (Collection<G, D1, R>, Collection<G, E, R>)
    where
        E: Data,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed;

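    /// Consolidates the collection like
    /// [`consolidate_named`](Self::consolidate_named), but only if
    /// `must_consolidate` is true; otherwise returns the collection
    /// unchanged. When consolidating, records are exchanged by a fixed-seed
    /// hash so the distribution is deterministic.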
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;

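    /// Consolidates the collection under `name`, like differential's
    /// `consolidate`, but using the provided batcher `Ba` to sort and combine
    /// updates.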
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;
}

impl<G, C1> StreamExt<G, C1> for StreamCore<G, C1>
where
    C1: Container + DrainContainer,
    G: Scope,
{
    fn unary_fallible<DCB, ECB, B, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputHandleCore<G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>,
                    &mut OutputHandleCore<G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        let mut builder = OperatorBuilderRc::new(name.into(), self.scope());
        builder.set_notify(false);

        let operator_info = builder.operator_info();

        let mut input = builder.new_input(self, pact);
        let (mut ok_output, ok_stream) = builder.new_output();
        let (mut err_output, err_stream) = builder.new_output();

        builder.build(move |mut capabilities| {
            // `build` hands us one capability per output, all at the minimum
            // time, so handing any one of them to the constructor suffices.
            let capability = capabilities.pop().unwrap();
            let mut logic = constructor(capability, operator_info);
            move |_frontiers| {
                let mut ok_output_handle = ok_output.activate();
                let mut err_output_handle = err_output.activate();
                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
            }
        });

        (ok_stream, err_stream)
    }

    #[allow(clippy::redundant_closure)]
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
    {
        self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
            Box::new(move |input, ok_output, err_output| {
                input.for_each(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session_with_builder(&time);
                    for r in data.drain().flat_map(|d1| logic(d1)) {
                        match r {
                            Ok(d2) => ok_session.push_into(d2),
                            Err(e) => err_session.push_into(e),
                        }
                    }
                })
            })
        })
    }

    fn expire_stream_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> StreamCore<G, C1> {
        let name = format!("expire_stream_at({name})");
        self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
            // Hold back a capability for the expiration time. Releasing it
            // once the token is dropped unblocks dataflow shutdown.
            let mut cap = Some(cap.delayed(&expiration));
            let mut warned = false;
            move |input, output| {
                if token.upgrade().is_none() {
                    // The token was dropped: release the expiration capability.
                    drop(cap.take());
                } else {
                    let frontier = input.frontier().frontier();
                    if !frontier.less_than(&expiration) && !warned {
                        // The input frontier is no longer strictly less than the
                        // expiration time while the token is still alive. Warn
                        // once rather than on every invocation.
                        tracing::warn!(
                            name = name,
                            frontier = ?frontier,
                            expiration = ?expiration,
                            "frontier not less than expiration"
                        );
                        warned = true;
                    }
                }
                // In all cases, pass the input data through unchanged.
                input.for_each(|time, data| {
                    let mut session = output.session(&time);
                    session.give_container(data);
                });
            }
        })
    }
}

impl<G, D1, R> CollectionExt<G, D1, R> for Collection<G, D1, R>
where
    G: Scope,
    G::Timestamp: Data,
    D1: Data,
    R: Semigroup + 'static,
{
    fn empty(scope: &G) -> Collection<G, D1, R> {
        operator::empty(scope).as_collection()
    }

    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (
        Collection<G, D2, R, DCB::Container>,
        Collection<G, E, R, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static,
    {
        let (ok_stream, err_stream) =
            self.inner
                .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
                    logic(d1).into_iter().map(move |res| match res {
                        Ok(d2) => Ok((d2, t.clone(), r.clone())),
                        Err(e) => Err((e, t.clone(), r.clone())),
                    })
                });
        (ok_stream.as_collection(), err_stream.as_collection())
    }

    fn expire_collection_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> Collection<G, D1, R> {
        self.inner
            .expire_stream_at(name, expiration, token)
            .as_collection()
    }

    fn explode_one<D2, R2, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Data + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice,
    {
        self.inner
            .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
                Pipeline,
                "ExplodeOne",
                move |_, _| {
                    move |input, output| {
                        input.for_each(|time, data| {
                            output
                                .session_with_builder(&time)
                                .give_iterator(data.drain(..).map(|(x, t, d)| {
                                    let (x, d2) = logic(x);
                                    (x, t, d2.multiply(&d))
                                }));
                        });
                    }
                },
            )
            .as_collection()
    }

    fn ensure_monotonic<E, IE>(&self, into_err: IE) -> (Collection<G, D1, R>, Collection<G, E, R>)
    where
        E: Data,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed,
    {
        let (oks, errs) = self
            .inner
            .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
                Box::new(move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);
                        for (x, t, d) in data.drain(..) {
                            if d.is_positive() {
                                ok_session.give((x, t, d))
                            } else {
                                let (e, d2) = into_err(x, d);
                                err_session.give((e, t, d2))
                            }
                        }
                    })
                })
            });
        (oks.as_collection(), errs.as_collection())
    }

    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static,
    {
        if must_consolidate {
            // Hash with fixed seeds so that every worker routes a given record
            // to the same destination, independent of process or restart.
            let random_state = ahash::RandomState::with_seeds(
                0x243f_6a88_85a3_08d3,
                0x1319_8a2e_0370_7344,
                0xa409_3822_299f_31d0,
                0x082e_fa98_ec4e_6c89,
            );
            let exchange = Exchange::new(move |update: &((D1, _), G::Timestamp, R)| {
                let data = &(update.0).0;
                let mut h = random_state.build_hasher();
                data.hash(&mut h);
                h.finish()
            });
            consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
                .unary(Pipeline, "unpack consolidated", |_, _| {
                    |input, output| {
                        input.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for ((k, ()), t, d) in
                                data.iter().flatten().flat_map(|chunk| chunk.iter())
                            {
                                session.give((k.clone(), t.clone(), d.clone()))
                            }
                        })
                    }
                })
                .as_collection()
        } else {
            self
        }
    }

    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static,
    {
        let exchange =
            Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());

        consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
            .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
                |input, output| {
                    input.for_each(|time, data| {
                        let mut session = output.session(&time);
                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            session.give((k.clone(), t.clone(), d.clone()))
                        }
                    })
                }
            })
            .as_collection()
    }
}

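/// Consolidates a stream of update containers, distributing the data
/// according to `pact`. Updates are accumulated in a batcher of type `Ba`,
/// and whenever the input frontier advances past a held capability, the
/// now-complete updates are sealed into a chain of consolidated containers
/// and emitted as a single output record.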
pub fn consolidate_pact<Ba, P, G>(
    stream: &StreamCore<G, Ba::Input>,
    pact: P,
    name: &str,
) -> Stream<G, Vec<Ba::Output>>
where
    G: Scope,
    Ba: Batcher<Time = G::Timestamp> + 'static,
    Ba::Input: Container + Clone + 'static,
    Ba::Output: Container + Clone,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
{
    stream.unary_frontier(pact, name, |_cap, info| {
        // Log batcher activity to the same sink the differential arrangement
        // operators use.
        let logger = stream
            .scope()
            .logger_for("differential/arrange")
            .map(Into::into);

        let mut batcher = Ba::new(logger, info.global_id);
        // Capabilities for the times of the updates currently in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
        let mut prev_frontier = Antichain::from_elem(G::Timestamp::minimum());

        move |input, output| {
            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
                batcher.push_container(data);
            });

            if prev_frontier.borrow() != input.frontier().frontier() {
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !input.frontier().less_equal(c.time()))
                {
                    // The frontier advanced past at least one held capability:
                    // seal and ship all updates that are now complete.
                    let mut upper = Antichain::new(); // re-used allocation for sealing batches
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !input.frontier().less_equal(capability.time()) {
                            // Assemble the upper bound for this batch: the input
                            // frontier joined with the times of the capabilities
                            // not yet processed.
                            upper.clear();
                            for time in input.frontier().frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            let mut session = output.session(&capabilities.elements()[index]);
                            // Extract the consolidated chain up to `upper` and ship it.
                            let batch =
                                batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
                            session.give(batch);
                        }
                    }

                    // Downgrade the held capabilities to the batcher's frontier,
                    // which covers the times of all remaining updates.
                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                prev_frontier.clear();
                prev_frontier.extend(input.frontier().frontier().iter().cloned());
            }
        }
    })
}

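/// A batch [`Builder`] used by [`consolidate_pact`] solely for its `seal`
/// implementation, which takes ownership of the batcher's sorted chain of
/// containers rather than assembling a trace batch. The remaining methods
/// are not reached on this code path and are deliberately unimplemented.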
struct ConsolidateBuilder<T, I> {
    _marker: PhantomData<(T, I)>,
}

impl<T, I> Builder for ConsolidateBuilder<T, I>
where
    T: Timestamp,
    I: Container,
{
    type Input = I;
    type Time = T;
    type Output = Vec<I>;

    fn new() -> Self {
        Self {
            _marker: PhantomData,
        }
    }

    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
        Self::new()
    }

    fn push(&mut self, _chunk: &mut Self::Input) {
        unimplemented!("ConsolidateBuilder::push")
    }

    fn done(self, _: Description<Self::Time>) -> Self::Output {
        unimplemented!("ConsolidateBuilder::done")
    }

    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
        std::mem::take(chain)
    }
}

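/// Extension trait for concatenating streams while flattening their
/// containers through a container builder.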
pub trait ConcatenateFlatten<G: Scope, C: Container + DrainContainer> {
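    /// Merges the contents of all input streams into a single output stream,
    /// draining each input container and rebuilding output containers via
    /// `CB`.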
    fn concatenate_flatten<I, CB>(&self, sources: I) -> StreamCore<G, CB::Container>
    where
        I: IntoIterator<Item = StreamCore<G, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>;
}

impl<G, C> ConcatenateFlatten<G, C> for StreamCore<G, C>
where
    G: Scope,
    C: Container + DrainContainer,
{
    fn concatenate_flatten<I, CB>(&self, sources: I) -> StreamCore<G, CB::Container>
    where
        I: IntoIterator<Item = StreamCore<G, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
    {
        let clone = self.clone();
        self.scope()
            .concatenate_flatten::<_, CB>(Some(clone).into_iter().chain(sources))
    }
}

impl<G, C> ConcatenateFlatten<G, C> for G
where
    G: Scope,
    C: Container + DrainContainer,
{
    fn concatenate_flatten<I, CB>(&self, sources: I) -> StreamCore<G, CB::Container>
    where
        I: IntoIterator<Item = StreamCore<G, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
    {
        let mut builder = OperatorBuilderRc::new("ConcatenateFlatten".to_string(), self.clone());
        builder.set_notify(false);

        // Create a pipeline input for each source stream.
        let mut handles = sources
            .into_iter()
            .map(|s| builder.new_input(&s, Pipeline))
            .collect::<Vec<_>>();

        // A single output, assembled through the container builder `CB`.
        let (mut output, result) = builder.new_output::<CB>();

        builder.build(move |_capability| {
            move |_frontier| {
                let mut output = output.activate();
                for handle in handles.iter_mut() {
                    handle.for_each(|time, data| {
                        output
                            .session_with_builder(&time)
                            .give_iterator(data.drain());
                    })
                }
            }
        });

        result
    }
}