1use std::hash::{BuildHasher, Hash, Hasher};
19use std::marker::PhantomData;
20
21use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
22use differential_dataflow::containers::{Columnation, TimelyStack};
23use differential_dataflow::difference::{Multiply, Semigroup};
24use differential_dataflow::lattice::Lattice;
25use differential_dataflow::trace::{Batcher, Builder, Description};
26use differential_dataflow::{AsCollection, Collection, Hashable, VecCollection};
27use timely::container::{DrainContainer, PushInto};
28use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
29use timely::dataflow::operators::Capability;
30use timely::dataflow::operators::generic::builder_rc::{
31 OperatorBuilder as OperatorBuilderRc, OperatorBuilder,
32};
33use timely::dataflow::operators::generic::operator::{self, Operator};
34use timely::dataflow::operators::generic::{
35 InputHandleCore, OperatorInfo, OutputBuilder, OutputBuilderSession,
36};
37use timely::dataflow::{Scope, Stream, StreamVec};
38use timely::progress::operate::FrontierInterest;
39use timely::progress::{Antichain, Timestamp};
40use timely::{Container, ContainerBuilder, PartialOrder};
41
/// Extension methods for timely [`Stream`]s.
pub trait StreamExt<G, C1>
where
    C1: Container + DrainContainer + Clone + 'static,
    G: Scope,
{
    /// Like [`Operator::unary`], but the operator logic writes to two output
    /// streams: one for successfully processed data and one for errors.
    ///
    /// `constructor` receives the operator's initial capability and
    /// [`OperatorInfo`] and returns the closure that is invoked on each
    /// scheduling, reading from the input handle and writing to the ok/err
    /// output sessions.
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                &mut OutputBuilderSession<'_, G::Timestamp, DCB>,
                &mut OutputBuilderSession<'_, G::Timestamp, ECB>,
            ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Applies `logic` to each item of the stream, splitting the produced
    /// `Ok` results into the first returned stream and the `Err` results
    /// into the second.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

    /// Forwards the stream unchanged while holding a capability delayed to
    /// `expiration`, and logs a warning (once) when the input frontier is no
    /// longer strictly less than `expiration`.
    fn expire_stream_at(self, name: &str, expiration: G::Timestamp) -> Stream<G, C1>;
}
97
/// Extension methods for differential [`Collection`]s.
pub trait CollectionExt<G, D1, R>: Sized
where
    G: Scope,
    R: Semigroup,
{
    /// Creates a new empty collection in `scope`.
    fn empty(scope: &G) -> VecCollection<G, D1, R>;

    /// Applies `logic` to each record, splitting `Ok` results into the first
    /// returned collection and `Err` results into the second.
    ///
    /// Provided in terms of [`CollectionExt::flat_map_fallible`] by wrapping
    /// each result in a singleton iterator.
    fn map_fallible<DCB, ECB, D2, E, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (VecCollection<G, D2, R>, VecCollection<G, E, R>)
    where
        DCB: ContainerBuilder<Container = Vec<(D2, G::Timestamp, R)>>
            + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, G::Timestamp, R)>>
            + PushInto<(E, G::Timestamp, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Applies `logic` to each record, producing any number of `Ok`/`Err`
    /// results that are split into the two returned collections.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (Collection<G, DCB::Container>, Collection<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

    /// Collection analogue of [`StreamExt::expire_stream_at`], applied to the
    /// collection's inner stream.
    fn expire_collection_at(self, name: &str, expiration: G::Timestamp) -> VecCollection<G, D1, R>;

    /// Maps each record to a new datum and a diff, multiplying the produced
    /// diff into the record's existing diff.
    fn explode_one<D2, R2, L>(self, logic: L) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice;

    /// Splits the collection into records with positive diffs (first result)
    /// and records with non-positive diffs, converted to errors by
    /// `into_err` (second result).
    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (VecCollection<G, D1, R>, VecCollection<G, E, R>)
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed;

    /// Consolidates the collection under `name` if `must_consolidate` is
    /// true; otherwise returns the collection unchanged.
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), G::Timestamp, R)>,
            Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
            Time = G::Timestamp,
        > + 'static;

    /// Consolidates the collection using the batcher `Ba`, naming the
    /// operator after `name`.
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), G::Timestamp, R)>,
            Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
            Time = G::Timestamp,
        > + 'static;
}
202
impl<G, C1> StreamExt<G, C1> for Stream<G, C1>
where
    C1: Container + DrainContainer + Clone + 'static,
    G: Scope,
{
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                &mut OutputBuilderSession<'_, G::Timestamp, DCB>,
                &mut OutputBuilderSession<'_, G::Timestamp, ECB>,
            ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        // Hand-roll the operator so that two output streams can be attached.
        let mut builder = OperatorBuilderRc::new(name.into(), self.scope());

        let operator_info = builder.operator_info();

        let mut input = builder.new_input(self.clone(), pact);
        // The logic is driven purely by arriving data; this operator declares
        // no interest in frontier progress on input 0.
        builder.set_notify_for(0, FrontierInterest::Never);
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output = OutputBuilder::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);

        builder.build(move |mut capabilities| {
            // NOTE(review): only the last initial capability is retained and
            // handed to `constructor`; presumably the capabilities for both
            // outputs carry the same time -- confirm.
            let capability = capabilities.pop().unwrap();
            let mut logic = constructor(capability, operator_info);
            move |_frontiers| {
                // Activate both outputs and run the user logic once per
                // scheduling.
                let mut ok_output_handle = ok_output.activate();
                let mut err_output_handle = err_output.activate();
                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
            }
        });

        (ok_stream, err_stream)
    }

    #[allow(clippy::redundant_closure)]
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
    {
        self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
            Box::new(move |input, ok_output, err_output| {
                input.for_each_time(|time, data| {
                    // Open one session per output at the input's time.
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session_with_builder(&time);
                    // Drain every input container and apply `logic` to each
                    // item, routing each result to the matching output.
                    for r in data
                        .flat_map(DrainContainer::drain)
                        .flat_map(|d1| logic(d1))
                    {
                        match r {
                            Ok(d2) => ok_session.give(d2),
                            Err(e) => err_session.give(e),
                        }
                    }
                })
            })
        })
    }

    fn expire_stream_at(self, name: &str, expiration: G::Timestamp) -> Stream<G, C1> {
        let name = format!("expire_stream_at({name})");
        self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
            // Hold a capability delayed to `expiration` for the lifetime of
            // the operator; it is never dropped, so the operator's output
            // frontier cannot advance beyond `expiration`.
            let cap = Some(cap.delayed(&expiration));
            let mut warned = false;
            move |(input, frontier), output| {
                // Touch `cap` so it stays captured (and thus held) by this
                // closure.
                let _ = &cap;
                let frontier = frontier.frontier();
                // Warn once when the input frontier reaches or passes the
                // expiration time.
                if !frontier.less_than(&expiration) && !warned {
                    tracing::warn!(
                        name = name,
                        frontier = ?frontier,
                        expiration = ?expiration,
                        "frontier not less than expiration"
                    );
                    warned = true;
                }
                // Data passes through unchanged.
                input.for_each(|time, data| {
                    let mut session = output.session(&time);
                    session.give_container(data);
                });
            }
        })
    }
}
325
impl<G, D1, R> CollectionExt<G, D1, R> for VecCollection<G, D1, R>
where
    G: Scope,
    G::Timestamp: Clone + 'static,
    D1: Clone + 'static,
    R: Semigroup + 'static,
{
    fn empty(scope: &G) -> VecCollection<G, D1, R> {
        operator::empty(scope).as_collection()
    }

    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (Collection<G, DCB::Container>, Collection<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static,
    {
        // Delegate to the stream-level operator, re-attaching the record's
        // timestamp and diff to every produced `Ok`/`Err` item.
        let (ok_stream, err_stream) =
            self.inner
                .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
                    logic(d1).into_iter().map(move |res| match res {
                        Ok(d2) => Ok((d2, t.clone(), r.clone())),
                        Err(e) => Err((e, t.clone(), r.clone())),
                    })
                });
        (ok_stream.as_collection(), err_stream.as_collection())
    }

    fn expire_collection_at(self, name: &str, expiration: G::Timestamp) -> VecCollection<G, D1, R> {
        self.inner
            .expire_stream_at(name, expiration)
            .as_collection()
    }

    fn explode_one<D2, R2, L>(
        self,
        mut logic: L,
    ) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice,
    {
        // Map each record to a new datum and multiply the produced diff into
        // the existing one, consolidating output containers as they fill.
        self.inner
            .clone()
            .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
                Pipeline,
                "ExplodeOne",
                move |_, _| {
                    move |input, output| {
                        input.for_each(|time, data| {
                            output
                                .session_with_builder(&time)
                                .give_iterator(data.drain(..).map(|(x, t, d)| {
                                    let (x, d2) = logic(x);
                                    (x, t, d2.multiply(&d))
                                }));
                        });
                    }
                },
            )
            .as_collection()
    }

    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (VecCollection<G, D1, R>, VecCollection<G, E, R>)
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed,
    {
        let (oks, errs) = self
            .inner
            .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
                Box::new(move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);
                        for (x, t, d) in data.drain(..) {
                            // Positive diffs pass through; non-positive ones
                            // are converted into errors by `into_err`.
                            if d.is_positive() {
                                ok_session.give((x, t, d))
                            } else {
                                let (e, d2) = into_err(x, d);
                                err_session.give((e, t, d2))
                            }
                        }
                    })
                })
            });
        (oks.as_collection(), errs.as_collection())
    }

    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), G::Timestamp, R)>,
            Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
            Time = G::Timestamp,
        > + 'static,
    {
        if must_consolidate {
            // Fixed (arbitrary) seeds: every worker must compute the same
            // hash for a datum so the `Exchange` contract routes all updates
            // for that datum to the same worker for consolidation.
            let random_state = ahash::RandomState::with_seeds(
                0x243f_6a88_85a3_08d3,
                0x1319_8a2e_0370_7344,
                0xa409_3822_299f_31d0,
                0x082e_fa98_ec4e_6c89,
            );
            let exchange = Exchange::new(move |update: &((D1, _), G::Timestamp, R)| {
                // Route by the datum only, ignoring the unit value, time,
                // and diff.
                let data = &(update.0).0;
                let mut h = random_state.build_hasher();
                data.hash(&mut h);
                h.finish()
            });
            // Consolidate on `(datum, ())` keys, then unpack the chunked
            // output back into owned `(datum, time, diff)` triples.
            consolidate_pact::<Ba, _, _>(self.map(|k| (k, ())).inner, exchange, name)
                .unary(Pipeline, "unpack consolidated", |_, _| {
                    |input, output| {
                        input.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for ((k, ()), t, d) in
                                data.iter().flatten().flat_map(|chunk| chunk.iter())
                            {
                                session.give((k.clone(), t.clone(), d.clone()))
                            }
                        })
                    }
                })
                .as_collection()
        } else {
            self
        }
    }

    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
            Input = Vec<((D1, ()), G::Timestamp, R)>,
            Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
            Time = G::Timestamp,
        > + 'static,
    {
        // Route by the datum's default hash (`Hashable::hashed`), ignoring
        // the unit value, time, and diff.
        let exchange =
            Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());

        // Consolidate on `(datum, ())` keys, then unpack the chunked output
        // back into owned `(datum, time, diff)` triples.
        consolidate_pact::<Ba, _, _>(self.map(|k| (k, ())).inner, exchange, name)
            .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
                |input, output| {
                    input.for_each(|time, data| {
                        let mut session = output.session(&time);
                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            session.give((k.clone(), t.clone(), d.clone()))
                        }
                    })
                }
            })
            .as_collection()
    }
}
518
/// Consolidates `stream` using the batcher `Ba`, routing updates with the
/// provided parallelization contract `pact`.
///
/// Updates are accumulated in the batcher and, whenever the input frontier
/// advances past a retained capability, the ready portion is sealed through
/// [`ConsolidateBuilder`] and emitted as a single `Vec<Ba::Output>` item.
pub fn consolidate_pact<Ba, P, G>(
    stream: Stream<G, Ba::Input>,
    pact: P,
    name: &str,
) -> StreamVec<G, Vec<Ba::Output>>
where
    G: Scope,
    Ba: Batcher<Time = G::Timestamp> + 'static,
    Ba::Input: Container + Clone + 'static,
    Ba::Output: Clone,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
{
    let logger = stream
        .scope()
        .logger_for("differential/arrange")
        .map(Into::into);
    stream.unary_frontier(pact, name, |_cap, info| {
        // Accumulates incoming updates until the frontier allows sealing.
        let mut batcher = Ba::new(logger, info.global_id);
        // Capabilities for times at which batches may still be emitted.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
        // Input frontier observed on the previous invocation; sealing work
        // only happens when the frontier has changed since then.
        let mut prev_frontier = Antichain::from_elem(G::Timestamp::minimum());

        move |(input, frontier), output| {
            input.for_each(|cap, data| {
                // Retain the capability for output port 0 and feed the data
                // into the batcher.
                capabilities.insert(cap.retain(0));
                batcher.push_container(data);
            });

            if prev_frontier.borrow() != frontier.frontier() {
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !frontier.less_equal(c.time()))
                {
                    // At least one retained capability's time has been passed
                    // by the input frontier: seal and emit what is ready.
                    let mut upper = Antichain::new(); for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !frontier.less_equal(capability.time()) {
                            // Seal up to the input frontier joined with all
                            // capabilities retained after this one, so that
                            // updates at still-open times remain in the
                            // batcher.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            let mut session = output.session(&capabilities.elements()[index]);
                            let output =
                                batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
                            session.give(output);
                        }
                    }

                    // Downgrade the retained capabilities to the times the
                    // batcher still holds updates for; every such time must
                    // be covered by some existing capability.
                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                // Remember the frontier for the change check above.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }
        }
    })
}
613
/// A batch [`Builder`] used by [`consolidate_pact`] whose `seal` hands back
/// the batcher's ready chain of containers unmodified; only the chain-based
/// `seal` entry point is supported (`push` and `done` are unimplemented).
struct ConsolidateBuilder<T, I> {
    // Carries the `Time` and `Input` type parameters without storing data.
    _marker: PhantomData<(T, I)>,
}
618
619impl<T, I> Builder for ConsolidateBuilder<T, I>
620where
621 T: Timestamp,
622 I: Clone,
623{
624 type Input = I;
625 type Time = T;
626 type Output = Vec<I>;
627
628 fn new() -> Self {
629 Self {
630 _marker: PhantomData,
631 }
632 }
633
634 fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
635 Self::new()
636 }
637
638 fn push(&mut self, _chunk: &mut Self::Input) {
639 unimplemented!("ConsolidateBuilder::push")
640 }
641
642 fn done(self, _: Description<Self::Time>) -> Self::Output {
643 unimplemented!("ConsolidateBuilder::done")
644 }
645
646 fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
647 std::mem::take(chain)
648 }
649}
650
/// Merging of streams with flattening of their containers.
pub trait ConcatenateFlatten<G: Scope, C: Container + DrainContainer> {
    /// Merges the contents of all `sources` into a single output stream,
    /// draining each input container and rebuilding output containers
    /// through the container builder `CB`.
    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
    where
        I: IntoIterator<Item = Stream<G, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>;
}
677
678impl<G, C> ConcatenateFlatten<G, C> for Stream<G, C>
679where
680 G: Scope,
681 C: Container + DrainContainer + Clone + 'static,
682{
683 fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
684 where
685 I: IntoIterator<Item = Stream<G, C>>,
686 CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
687 {
688 self.scope()
689 .concatenate_flatten::<_, CB>(Some(Clone::clone(self)).into_iter().chain(sources))
690 }
691}
692
693impl<G, C> ConcatenateFlatten<G, C> for G
694where
695 G: Scope,
696 C: Container + DrainContainer,
697{
698 fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
699 where
700 I: IntoIterator<Item = Stream<G, C>>,
701 CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
702 {
703 let mut builder = OperatorBuilder::new("ConcatenateFlatten".to_string(), self.clone());
704
705 let mut handles = sources
707 .into_iter()
708 .map(|s| builder.new_input(s, Pipeline))
709 .collect::<Vec<_>>();
710 for i in 0..handles.len() {
711 builder.set_notify_for(i, FrontierInterest::Never);
712 }
713
714 let (output, result) = builder.new_output::<CB::Container>();
716 let mut output = OutputBuilder::<_, CB>::from(output);
717
718 builder.build(move |_capability| {
719 move |_frontier| {
720 let mut output = output.activate();
721 for handle in handles.iter_mut() {
722 handle.for_each_time(|time, data| {
723 output
724 .session_with_builder(&time)
725 .give_iterator(data.flat_map(DrainContainer::drain));
726 })
727 }
728 }
729 });
730
731 result
732 }
733}