1use std::hash::{BuildHasher, Hash, Hasher};
19use std::marker::PhantomData;
20
21use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
22use differential_dataflow::containers::{Columnation, TimelyStack};
23use differential_dataflow::difference::{Multiply, Semigroup};
24use differential_dataflow::lattice::Lattice;
25use differential_dataflow::trace::{Batcher, Builder, Description};
26use differential_dataflow::{AsCollection, Collection, Hashable, VecCollection};
27use timely::container::{DrainContainer, PushInto};
28use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
29use timely::dataflow::operators::Capability;
30use timely::dataflow::operators::generic::builder_rc::{
31 OperatorBuilder as OperatorBuilderRc, OperatorBuilder,
32};
33use timely::dataflow::operators::generic::operator::{self, Operator};
34use timely::dataflow::operators::generic::{
35 InputHandleCore, OperatorInfo, OutputBuilder, OutputBuilderSession,
36};
37use timely::dataflow::{Scope, Stream, StreamVec};
38use timely::progress::{Antichain, Timestamp};
39use timely::{Container, ContainerBuilder, PartialOrder};
40
/// Extension methods for timely [`Stream`]s.
pub trait StreamExt<G, C1>
where
    C1: Container + DrainContainer + Clone + 'static,
    G: Scope,
{
    /// Like timely's `unary`, but with a second output for errors.
    ///
    /// `constructor` is called once with the operator's initial capability and
    /// [`OperatorInfo`]; it returns the per-scheduling logic, which reads from
    /// the input handle and writes to the ok and error output sessions.
    /// Returns the ok stream and the error stream, in that order.
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                &mut OutputBuilderSession<'_, G::Timestamp, DCB>,
                &mut OutputBuilderSession<'_, G::Timestamp, ECB>,
            ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Like a flat map, but `logic` yields `Result`s: `Ok` items flow into the
    /// first returned stream and `Err` items into the second.
    ///
    /// `name` identifies the operator in the dataflow graph.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

    /// Passes the stream through unchanged while holding a capability delayed
    /// to `expiration`, and logs a warning (once) if the input frontier stops
    /// being strictly less than `expiration`. See the impl for details.
    fn expire_stream_at(self, name: &str, expiration: G::Timestamp) -> Stream<G, C1>;
}
96
/// Extension methods for differential [`Collection`]s.
pub trait CollectionExt<G, D1, R>: Sized
where
    G: Scope,
    R: Semigroup,
{
    /// Creates a new empty collection in `scope`.
    fn empty(scope: &G) -> VecCollection<G, D1, R>;

    /// Like `Collection::map`, but `logic` may fail.
    ///
    /// `Ok` results flow into the first returned collection, `Err` results
    /// into the second. `name` identifies the operator in the dataflow graph.
    fn map_fallible<DCB, ECB, D2, E, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (VecCollection<G, D2, R>, VecCollection<G, E, R>)
    where
        DCB: ContainerBuilder<Container = Vec<(D2, G::Timestamp, R)>>
            + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, G::Timestamp, R)>>
            + PushInto<(E, G::Timestamp, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        // Delegate to `flat_map_fallible`, wrapping each result in a
        // single-element iterator.
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Like `Collection::flat_map`, but `logic` may fail.
    ///
    /// `Ok` items flow into the first returned collection, `Err` items into
    /// the second.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (Collection<G, DCB::Container>, Collection<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

    /// Collection analogue of [`StreamExt::expire_stream_at`]: passes the
    /// collection through unchanged while watching for the frontier to reach
    /// `expiration`.
    fn expire_collection_at(self, name: &str, expiration: G::Timestamp) -> VecCollection<G, D1, R>;

    /// Maps each record to a new datum plus a diff, multiplying the produced
    /// diff into the record's existing diff.
    fn explode_one<D2, R2, L>(self, logic: L) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice;

    /// Partitions the collection into records with positive diffs (first
    /// output) and records with non-positive diffs, mapped through `into_err`
    /// (second output).
    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (VecCollection<G, D1, R>, VecCollection<G, E, R>)
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed;

    /// Consolidates the collection using batcher `Ba` if `must_consolidate`
    /// is true; otherwise returns the collection unchanged.
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), G::Timestamp, R)>,
            Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
            Time = G::Timestamp,
        > + 'static;

    /// Consolidates the collection using batcher `Ba`, naming the operator
    /// after `name`.
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), G::Timestamp, R)>,
            Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
            Time = G::Timestamp,
        > + 'static;
}
201
impl<G, C1> StreamExt<G, C1> for Stream<G, C1>
where
    C1: Container + DrainContainer + Clone + 'static,
    G: Scope,
{
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                &mut OutputBuilderSession<'_, G::Timestamp, DCB>,
                &mut OutputBuilderSession<'_, G::Timestamp, ECB>,
            ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        let mut builder = OperatorBuilderRc::new(name.into(), self.scope());
        // The operator reacts to data only; it needs no frontier
        // notifications.
        builder.set_notify(false);

        let operator_info = builder.operator_info();

        let mut input = builder.new_input(self.clone(), pact);
        // First output carries successes, second output carries errors.
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output = OutputBuilder::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);

        builder.build(move |mut capabilities| {
            // `build` hands us one capability per output; we pass the last
            // popped one to the constructor and drop the rest.
            let capability = capabilities.pop().unwrap();
            let mut logic = constructor(capability, operator_info);
            move |_frontiers| {
                // Activate both outputs for this scheduling, then run the
                // user-provided logic.
                let mut ok_output_handle = ok_output.activate();
                let mut err_output_handle = err_output.activate();
                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
            }
        });

        (ok_stream, err_stream)
    }

    // The `|d1| logic(d1)` closure below is flagged as redundant by clippy but
    // is required to reborrow the captured `logic` inside the iterator chain.
    #[allow(clippy::redundant_closure)]
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
    {
        self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
            Box::new(move |input, ok_output, err_output| {
                input.for_each_time(|time, data| {
                    // One session per output for this timestamp; route each
                    // produced item by its `Result` variant.
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session_with_builder(&time);
                    for r in data
                        .flat_map(DrainContainer::drain)
                        .flat_map(|d1| logic(d1))
                    {
                        match r {
                            Ok(d2) => ok_session.give(d2),
                            Err(e) => err_session.give(e),
                        }
                    }
                })
            })
        })
    }

    fn expire_stream_at(self, name: &str, expiration: G::Timestamp) -> Stream<G, C1> {
        let name = format!("expire_stream_at({name})");
        self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
            // Hold a capability delayed to `expiration` for the operator's
            // lifetime, which keeps the output frontier from advancing past
            // `expiration`.
            let cap = Some(cap.delayed(&expiration));
            let mut warned = false;
            move |(input, frontier), output| {
                // Touch `cap` so the closure keeps it captured (and alive).
                let _ = &cap;
                let frontier = frontier.frontier();
                if !frontier.less_than(&expiration) && !warned {
                    // The input frontier is no longer strictly less than
                    // `expiration`; warn exactly once.
                    tracing::warn!(
                        name = name,
                        frontier = ?frontier,
                        expiration = ?expiration,
                        "frontier not less than expiration"
                    );
                    warned = true;
                }
                // Forward all input data unchanged.
                input.for_each(|time, data| {
                    let mut session = output.session(&time);
                    session.give_container(data);
                });
            }
        })
    }
}
324
impl<G, D1, R> CollectionExt<G, D1, R> for VecCollection<G, D1, R>
where
    G: Scope,
    G::Timestamp: Clone + 'static,
    D1: Clone + 'static,
    R: Semigroup + 'static,
{
    fn empty(scope: &G) -> VecCollection<G, D1, R> {
        operator::empty(scope).as_collection()
    }

    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (Collection<G, DCB::Container>, Collection<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static,
    {
        // Run `logic` on the datum and re-attach the record's time and diff
        // to every produced item (cloned once per item).
        let (ok_stream, err_stream) =
            self.inner
                .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
                    logic(d1).into_iter().map(move |res| match res {
                        Ok(d2) => Ok((d2, t.clone(), r.clone())),
                        Err(e) => Err((e, t.clone(), r.clone())),
                    })
                });
        (ok_stream.as_collection(), err_stream.as_collection())
    }

    fn expire_collection_at(self, name: &str, expiration: G::Timestamp) -> VecCollection<G, D1, R> {
        // Delegate to the stream-level implementation.
        self.inner
            .expire_stream_at(name, expiration)
            .as_collection()
    }

    fn explode_one<D2, R2, L>(
        self,
        mut logic: L,
    ) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice,
    {
        // `ConsolidatingContainerBuilder` consolidates updates within each
        // output container as they are built.
        self.inner
            .clone()
            .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
                Pipeline,
                "ExplodeOne",
                move |_, _| {
                    move |input, output| {
                        input.for_each(|time, data| {
                            output
                                .session_with_builder(&time)
                                // Map each datum and multiply the produced
                                // diff into the existing diff.
                                .give_iterator(data.drain(..).map(|(x, t, d)| {
                                    let (x, d2) = logic(x);
                                    (x, t, d2.multiply(&d))
                                }));
                        });
                    }
                },
            )
            .as_collection()
    }

    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (VecCollection<G, D1, R>, VecCollection<G, E, R>)
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed,
    {
        let (oks, errs) = self
            .inner
            .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
                Box::new(move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);
                        for (x, t, d) in data.drain(..) {
                            // Positive diffs pass through; all other diffs are
                            // turned into errors via `into_err`.
                            if d.is_positive() {
                                ok_session.give((x, t, d))
                            } else {
                                let (e, d2) = into_err(x, d);
                                err_session.give((e, t, d2))
                            }
                        }
                    })
                })
            });
        (oks.as_collection(), errs.as_collection())
    }

    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), G::Timestamp, R)>,
            Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
            Time = G::Timestamp,
        > + 'static,
    {
        if must_consolidate {
            // Fixed, arbitrary seeds so that all workers compute the same
            // hash for a given datum: the exchange routing below must agree
            // across workers for equal data to meet at one worker.
            let random_state = ahash::RandomState::with_seeds(
                0x243f_6a88_85a3_08d3,
                0x1319_8a2e_0370_7344,
                0xa409_3822_299f_31d0,
                0x082e_fa98_ec4e_6c89,
            );
            // Route each update by the hash of its datum (the key part of the
            // `(datum, ())` pair).
            let exchange = Exchange::new(move |update: &((D1, _), G::Timestamp, R)| {
                let data = &(update.0).0;
                let mut h = random_state.build_hasher();
                data.hash(&mut h);
                h.finish()
            });
            // Pair each datum with a unit value to match the batcher's
            // key-value input shape, consolidate, then unpack the chunked
            // `TimelyStack` output back into individual `Vec` updates.
            consolidate_pact::<Ba, _, _>(self.map(|k| (k, ())).inner, exchange, name)
                .unary(Pipeline, "unpack consolidated", |_, _| {
                    |input, output| {
                        input.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for ((k, ()), t, d) in
                                data.iter().flatten().flat_map(|chunk| chunk.iter())
                            {
                                session.give((k.clone(), t.clone(), d.clone()))
                            }
                        })
                    }
                })
                .as_collection()
        } else {
            self
        }
    }

    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), G::Timestamp, R)>,
            Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
            Time = G::Timestamp,
        > + 'static,
    {
        // Route each update by its datum's default hash.
        let exchange =
            Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());

        // Same pipeline as `consolidate_named_if`: pad with a unit value,
        // consolidate, then unpack the chunked output into `Vec` updates.
        consolidate_pact::<Ba, _, _>(self.map(|k| (k, ())).inner, exchange, name)
            .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
                |input, output| {
                    input.for_each(|time, data| {
                        let mut session = output.session(&time);
                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            session.give((k.clone(), t.clone(), d.clone()))
                        }
                    })
                }
            })
            .as_collection()
    }
}
517
/// Consolidates a stream of updates using the batcher `Ba`, after exchanging
/// the data according to `pact`.
///
/// Incoming containers are absorbed into the batcher; whenever the input
/// frontier advances past a held capability, the batcher is sealed up to a
/// suitable upper bound and the resulting chain of `Ba::Output` containers is
/// emitted at that capability. With an appropriate exchange contract, equal
/// records meet at the same worker and are consolidated there.
pub fn consolidate_pact<Ba, P, G>(
    stream: Stream<G, Ba::Input>,
    pact: P,
    name: &str,
) -> StreamVec<G, Vec<Ba::Output>>
where
    G: Scope,
    Ba: Batcher<Time = G::Timestamp> + 'static,
    Ba::Input: Container + Clone + 'static,
    Ba::Output: Clone,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
{
    // Wire the batcher up to the differential arrangement logger, if the
    // scope has one registered.
    let logger = stream
        .scope()
        .logger_for("differential/arrange")
        .map(Into::into);
    stream.unary_frontier(pact, name, |_cap, info| {
        // Accumulates all pending (not yet sealed) updates.
        let mut batcher = Ba::new(logger, info.global_id);
        // Capabilities for times at which the operator may still emit output.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
        // Last observed input frontier, used to detect frontier changes.
        let mut prev_frontier = Antichain::from_elem(G::Timestamp::minimum());

        move |(input, frontier), output| {
            input.for_each(|cap, data| {
                // Retain a capability (for output port 0) for each input
                // batch, and absorb the data into the batcher.
                capabilities.insert(cap.retain(0));
                batcher.push_container(data);
            });

            // Only do work when the input frontier has actually changed.
            if prev_frontier.borrow() != frontier.frontier() {
                // Seal only if at least one held capability is no longer in
                // advance of the frontier.
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !frontier.less_equal(c.time()))
                {
                    let mut upper = Antichain::new();
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !frontier.less_equal(capability.time()) {
                            // Upper bound for this seal: the new input
                            // frontier, joined with the times of the
                            // remaining (later) capabilities.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Emit everything the batcher has below `upper`
                            // at this capability's time.
                            let mut session = output.session(&capabilities.elements()[index]);
                            let output =
                                batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
                            session.give(output);
                        }
                    }

                    // Downgrade the held capabilities to the times the
                    // batcher may still emit (its frontier). Every such time
                    // must be coverable by an existing capability.
                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                // Remember the frontier we just processed.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }
        }
    })
}
612
/// A batch [`Builder`] that passes the sealed chain through unchanged instead
/// of constructing a trace batch.
///
/// Used by [`consolidate_pact`] to extract the consolidated containers from a
/// [`Batcher`]'s `seal` call.
struct ConsolidateBuilder<T, I> {
    // Carries the time and input types without storing any data.
    _marker: PhantomData<(T, I)>,
}
617
618impl<T, I> Builder for ConsolidateBuilder<T, I>
619where
620 T: Timestamp,
621 I: Clone,
622{
623 type Input = I;
624 type Time = T;
625 type Output = Vec<I>;
626
627 fn new() -> Self {
628 Self {
629 _marker: PhantomData,
630 }
631 }
632
633 fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
634 Self::new()
635 }
636
637 fn push(&mut self, _chunk: &mut Self::Input) {
638 unimplemented!("ConsolidateBuilder::push")
639 }
640
641 fn done(self, _: Description<Self::Time>) -> Self::Output {
642 unimplemented!("ConsolidateBuilder::done")
643 }
644
645 fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
646 std::mem::take(chain)
647 }
648}
649
/// Extension trait for merging several streams into one while rebuilding the
/// containers through a [`ContainerBuilder`].
pub trait ConcatenateFlatten<G: Scope, C: Container + DrainContainer> {
    /// Merges the contents of the given source streams into a single output
    /// stream, draining each input container and pushing its items through a
    /// `CB` builder. (The implementation on [`Stream`] also includes `self`
    /// as a source.)
    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
    where
        I: IntoIterator<Item = Stream<G, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>;
}
676
677impl<G, C> ConcatenateFlatten<G, C> for Stream<G, C>
678where
679 G: Scope,
680 C: Container + DrainContainer + Clone + 'static,
681{
682 fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
683 where
684 I: IntoIterator<Item = Stream<G, C>>,
685 CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
686 {
687 self.scope()
688 .concatenate_flatten::<_, CB>(Some(Clone::clone(self)).into_iter().chain(sources))
689 }
690}
691
692impl<G, C> ConcatenateFlatten<G, C> for G
693where
694 G: Scope,
695 C: Container + DrainContainer,
696{
697 fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
698 where
699 I: IntoIterator<Item = Stream<G, C>>,
700 CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
701 {
702 let mut builder = OperatorBuilder::new("ConcatenateFlatten".to_string(), self.clone());
703 builder.set_notify(false);
704
705 let mut handles = sources
707 .into_iter()
708 .map(|s| builder.new_input(s, Pipeline))
709 .collect::<Vec<_>>();
710
711 let (output, result) = builder.new_output::<CB::Container>();
713 let mut output = OutputBuilder::<_, CB>::from(output);
714
715 builder.build(move |_capability| {
716 move |_frontier| {
717 let mut output = output.activate();
718 for handle in handles.iter_mut() {
719 handle.for_each_time(|time, data| {
720 output
721 .session_with_builder(&time)
722 .give_iterator(data.flat_map(DrainContainer::drain));
723 })
724 }
725 }
726 });
727
728 result
729 }
730}