use std::future::Future;
use std::hash::{BuildHasher, Hash, Hasher};
use std::marker::PhantomData;
use std::rc::Weak;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::containers::{Columnation, TimelyStack};
use differential_dataflow::difference::{Multiply, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEventBuilder;
use differential_dataflow::trace::{Batcher, Builder, Description};
use differential_dataflow::{AsCollection, Collection, Hashable};
use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::channels::ContainerBytes;
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
use timely::dataflow::operators::generic::operator::{self, Operator};
use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputHandleCore};
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::progress::{Antichain, Timestamp};
use timely::{Container, Data, PartialOrder};

use crate::builder_async::{
    AsyncInputHandle, AsyncOutputHandle, ConnectedToOne, Disconnected,
    OperatorBuilder as OperatorBuilderAsync,
};

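/// Extension methods for timely [`StreamCore`]s.
///
/// A minimal usage sketch for [`StreamExt::map_fallible`] (not from the original sources):
/// it assumes a `StreamCore<G, Vec<String>>` named `texts` and uses timely's
/// `CapacityContainerBuilder` for both the ok and the error output.
///
/// ```ignore
/// use timely::container::CapacityContainerBuilder;
///
/// // Split `texts` into successfully parsed integers and parse-error messages.
/// let (parsed, errors) = texts
///     .map_fallible::<CapacityContainerBuilder<Vec<i64>>, CapacityContainerBuilder<Vec<String>>, _, _, _>(
///         "ParseInts",
///         |text| text.parse::<i64>().map_err(|err| err.to_string()),
///     );
/// ```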
pub trait StreamExt<G, C1>
where
    C1: Container,
    G: Scope,
{
    /// Like `Operator::unary`, but the operator logic can write to two outputs: one for
    /// successfully processed data and one for errors.
    fn unary_fallible<DCB, ECB, B, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputHandleCore<G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>,
                    &mut OutputHandleCore<G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Creates a new dataflow operator with one input and one output, whose logic is the
    /// async future returned by `constructor`.
    fn unary_async<CB, P, B, BFut>(
        &self,
        pact: P,
        name: String,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
            AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>,
            AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
        ) -> BFut,
        BFut: Future + 'static,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Creates a new dataflow operator with two inputs and one output, whose logic is the
    /// async future returned by `constructor`.
    fn binary_async<C2, CB, P1, P2, B, BFut>(
        &self,
        other: &StreamCore<G, C2>,
        pact1: P1,
        pact2: P2,
        name: String,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        C2: Container + 'static,
        CB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
            AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>,
            AsyncInputHandle<G::Timestamp, C2, ConnectedToOne>,
            AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
        ) -> BFut,
        BFut: Future + 'static,
        P1: ParallelizationContract<G::Timestamp, C1>,
        P2: ParallelizationContract<G::Timestamp, C2>;

    /// Creates a new dataflow operator that consumes this stream without producing an
    /// output, running the async future returned by `constructor`.
    fn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B)
    where
        B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut,
        BFut: Future + 'static,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Like `map`, but `logic` can fail: successful results go to the first returned
    /// stream and errors to the second.
    fn map_fallible<DCB, ECB, D2, E, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        L: for<'a> FnMut(C1::Item<'a>) -> Result<D2, E> + 'static,
    {
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Like `flat_map`, but `logic` can fail: successful results go to the first
    /// returned stream and errors to the second.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        logic: L,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

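    /// Holds back a capability for `expiration`, preventing downstream frontiers from
    /// advancing past it, until `token` is dropped. Warns once if the input frontier
    /// reaches `expiration` while the token is still live.
    ///
    /// A minimal usage sketch; `stream`, `expiration`, and the surrounding dataflow are
    /// assumed, not part of this crate:
    ///
    /// ```ignore
    /// use std::rc::Rc;
    ///
    /// let token = Rc::new(());
    /// let guarded = stream.expire_stream_at("my_stream", expiration, Rc::downgrade(&token));
    /// // ... later, dropping the token releases the capability held at `expiration`.
    /// drop(token);
    /// ```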
    fn expire_stream_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> StreamCore<G, C1>;

    /// Forwards the stream, attaching the batch timestamp and the given `unit` payload
    /// to each item, using the supplied container builder for the output.
    fn pass_through<CB, R>(&self, name: &str, unit: R) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder + for<'a> PushInto<(C1::Item<'a>, G::Timestamp, R)>,
        R: Data;

    /// Forwards the stream as long as the given token can still be upgraded; data
    /// arriving after the token is dropped is discarded.
    fn with_token(&self, token: Weak<()>) -> StreamCore<G, C1>;

    /// Distributes the data of the stream across all workers.
    fn distribute(&self) -> StreamCore<G, C1>
    where
        C1: ContainerBytes + Send;
}

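/// Extension methods for differential [`Collection`]s.
///
/// The fallible mapping methods mirror those on [`StreamExt`], but operate on
/// `(data, time, diff)` updates and return ok/error [`Collection`]s instead of raw streams.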
pub trait CollectionExt<G, D1, R>
where
    G: Scope,
    R: Semigroup,
{
    /// Creates a new empty collection in `scope`.
    fn empty(scope: &G) -> Collection<G, D1, R>;

    /// Like `map`, but `logic` can fail: successful results go to the first returned
    /// collection and errors to the second.
    fn map_fallible<DCB, ECB, D2, E, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (Collection<G, D2, R>, Collection<G, E, R>)
    where
        DCB: ContainerBuilder<Container = Vec<(D2, G::Timestamp, R)>>
            + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, G::Timestamp, R)>>
            + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Like `flat_map`, but `logic` can fail: successful results go to the first
    /// returned collection and errors to the second.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        logic: L,
    ) -> (
        Collection<G, D2, R, DCB::Container>,
        Collection<G, E, R, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

    /// Like [`StreamExt::expire_stream_at`], but for collections.
    fn expire_collection_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> Collection<G, D1, R>;

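    /// Maps each record to a new record and a weight, multiplying the weight into the
    /// record's difference.
    ///
    /// A minimal usage sketch, assuming a `Collection<G, (String, i64), i64>` named
    /// `weighted_keys` (identifiers are illustrative):
    ///
    /// ```ignore
    /// // Scale each key's multiplicity by its associated weight.
    /// let keys = weighted_keys.explode_one(|(key, weight)| (key, weight));
    /// ```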
    fn explode_one<D2, R2, L>(&self, logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Data + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice;

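    /// Splits the collection into records with positive differences and records with
    /// non-positive differences, turning the latter into errors via `into_err`.
    ///
    /// A minimal usage sketch, assuming string data and an `i64` difference type
    /// (identifiers are illustrative):
    ///
    /// ```ignore
    /// let (oks, errs) = collection.ensure_monotonic(|datum, diff| {
    ///     (format!("unexpected negative multiplicity {diff} for {datum:?}"), 1)
    /// });
    /// ```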
    fn ensure_monotonic<E, IE>(&self, into_err: IE) -> (Collection<G, D1, R>, Collection<G, E, R>)
    where
        E: Data,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed;

    /// Forwards the collection as long as the given token can still be upgraded; data
    /// arriving after the token is dropped is discarded.
    fn with_token(&self, token: Weak<()>) -> Collection<G, D1, R>;

    /// Consolidates the collection with the named operator and the batcher `Ba` if
    /// `must_consolidate` is true; otherwise returns the collection unchanged.
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;

    /// Consolidates the collection with the named operator and the batcher `Ba`.
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;
}

impl<G, C1> StreamExt<G, C1> for StreamCore<G, C1>
where
    C1: Container + Data,
    G: Scope,
{
    fn unary_fallible<DCB, ECB, B, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputHandleCore<G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>,
                    &mut OutputHandleCore<G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        let mut builder = OperatorBuilderRc::new(name.into(), self.scope());
        builder.set_notify(false);

        let operator_info = builder.operator_info();

        let mut input = builder.new_input(self, pact);
        let (mut ok_output, ok_stream) = builder.new_output();
        let (mut err_output, err_stream) = builder.new_output();

        builder.build(move |mut capabilities| {
            // `build` provides one capability per output; the constructor only needs one.
            let capability = capabilities.pop().unwrap();
            let mut logic = constructor(capability, operator_info);
            move |_frontiers| {
                let mut ok_output_handle = ok_output.activate();
                let mut err_output_handle = err_output.activate();
                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
            }
        });

        (ok_stream, err_stream)
    }

    fn unary_async<CB, P, B, BFut>(
        &self,
        pact: P,
        name: String,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
            AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>,
            AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
        ) -> BFut,
        BFut: Future + 'static,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        let mut builder = OperatorBuilderAsync::new(name, self.scope());
        let operator_info = builder.operator_info();

        let (output, stream) = builder.new_output();
        let input = builder.new_input_for(self, pact, &output);

        builder.build(move |mut capabilities| {
            // Take the capability for the single output.
            let capability = capabilities.pop().unwrap();
            constructor(capability, operator_info, input, output)
        });

        stream
    }

    fn binary_async<C2, CB, P1, P2, B, BFut>(
        &self,
        other: &StreamCore<G, C2>,
        pact1: P1,
        pact2: P2,
        name: String,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        C2: Container + 'static,
        CB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
            AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>,
            AsyncInputHandle<G::Timestamp, C2, ConnectedToOne>,
            AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
        ) -> BFut,
        BFut: Future + 'static,
        P1: ParallelizationContract<G::Timestamp, C1>,
        P2: ParallelizationContract<G::Timestamp, C2>,
    {
        let mut builder = OperatorBuilderAsync::new(name, self.scope());
        let operator_info = builder.operator_info();

        let (output, stream) = builder.new_output();
        let input1 = builder.new_input_for(self, pact1, &output);
        let input2 = builder.new_input_for(other, pact2, &output);

        builder.build(move |mut capabilities| {
            // Take the capability for the single output.
            let capability = capabilities.pop().unwrap();
            constructor(capability, operator_info, input1, input2, output)
        });

        stream
    }

    fn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B)
    where
        B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut,
        BFut: Future + 'static,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        let mut builder = OperatorBuilderAsync::new(name, self.scope());
        let operator_info = builder.operator_info();

        let input = builder.new_disconnected_input(self, pact);

        builder.build(move |_capabilities| constructor(operator_info, input));
    }

    #[allow(clippy::redundant_closure)]
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
    {
        self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
            Box::new(move |input, ok_output, err_output| {
                input.for_each(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session_with_builder(&time);
                    for r in data.drain().flat_map(|d1| logic(d1)) {
                        match r {
                            Ok(d2) => ok_session.push_into(d2),
                            Err(e) => err_session.push_into(e),
                        }
                    }
                })
            })
        })
    }

    fn expire_stream_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> StreamCore<G, C1> {
        let name = format!("expire_stream_at({name})");
        self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
            // Retain a capability for the expiration time; it is released once the token
            // is dropped.
            let mut cap = Some(cap.delayed(&expiration));
            let mut warned = false;
            move |input, output| {
                if token.upgrade().is_none() {
                    // The token was dropped; release the capability.
                    drop(cap.take());
                } else {
                    let frontier = input.frontier().frontier();
                    if !frontier.less_than(&expiration) && !warned {
                        // The input frontier reached the expiration time while the token
                        // is still live; warn once.
                        tracing::warn!(
                            name = name,
                            frontier = ?frontier,
                            expiration = ?expiration,
                            "frontier not less than expiration"
                        );
                        warned = true;
                    }
                }
                input.for_each(|time, data| {
                    let mut session = output.session(&time);
                    session.give_container(data);
                });
            }
        })
    }

    fn pass_through<CB, R>(&self, name: &str, unit: R) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder + for<'a> PushInto<(C1::Item<'a>, G::Timestamp, R)>,
        R: Data,
    {
        self.unary::<CB, _, _, _>(Pipeline, name, move |_, _| {
            move |input, output| {
                input.for_each(|cap, data| {
                    let mut session = output.session_with_builder(&cap);
                    session.give_iterator(
                        data.drain()
                            .map(|payload| (payload, cap.time().clone(), unit.clone())),
                    );
                });
            }
        })
    }

    fn with_token(&self, token: Weak<()>) -> StreamCore<G, C1> {
        self.unary(Pipeline, "WithToken", move |_cap, _info| {
            move |input, output| {
                input.for_each(|cap, data| {
                    if token.upgrade().is_some() {
                        output.session(&cap).give_container(data);
                    }
                });
            }
        })
    }

    fn distribute(&self) -> StreamCore<G, C1>
    where
        C1: ContainerBytes + Send,
    {
        self.unary(crate::pact::Distribute, "Distribute", move |_, _| {
            move |input, output| {
                input.for_each(|time, data| {
                    output.session(&time).give_container(data);
                });
            }
        })
    }
}

impl<G, D1, R> CollectionExt<G, D1, R> for Collection<G, D1, R>
where
    G: Scope,
    G::Timestamp: Data,
    D1: Data,
    R: Semigroup + 'static,
{
    fn empty(scope: &G) -> Collection<G, D1, R> {
        operator::empty(scope).as_collection()
    }

    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (
        Collection<G, D2, R, DCB::Container>,
        Collection<G, E, R, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static,
    {
        let (ok_stream, err_stream) =
            self.inner
                .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
                    logic(d1).into_iter().map(move |res| match res {
                        Ok(d2) => Ok((d2, t.clone(), r.clone())),
                        Err(e) => Err((e, t.clone(), r.clone())),
                    })
                });
        (ok_stream.as_collection(), err_stream.as_collection())
    }

    fn expire_collection_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> Collection<G, D1, R> {
        self.inner
            .expire_stream_at(name, expiration, token)
            .as_collection()
    }

    fn explode_one<D2, R2, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Data + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice,
    {
        self.inner
            .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
                Pipeline,
                "ExplodeOne",
                move |_, _| {
                    move |input, output| {
                        input.for_each(|time, data| {
                            output
                                .session_with_builder(&time)
                                .give_iterator(data.drain(..).map(|(x, t, d)| {
                                    let (x, d2) = logic(x);
                                    (x, t, d2.multiply(&d))
                                }));
                        });
                    }
                },
            )
            .as_collection()
    }

    fn ensure_monotonic<E, IE>(&self, into_err: IE) -> (Collection<G, D1, R>, Collection<G, E, R>)
    where
        E: Data,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed,
    {
        let (oks, errs) = self
            .inner
            .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
                Box::new(move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);
                        for (x, t, d) in data.drain(..) {
                            if d.is_positive() {
                                ok_session.give((x, t, d))
                            } else {
                                let (e, d2) = into_err(x, d);
                                err_session.give((e, t, d2))
                            }
                        }
                    })
                })
            });
        (oks.as_collection(), errs.as_collection())
    }

    fn with_token(&self, token: Weak<()>) -> Collection<G, D1, R> {
        self.inner.with_token(token).as_collection()
    }

    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static,
    {
        if must_consolidate {
            // Use a hasher with fixed seeds so that all workers route a given key to the
            // same worker. The seeds are arbitrary fixed constants.
            let random_state = ahash::RandomState::with_seeds(
                0x243f_6a88_85a3_08d3,
                0x1319_8a2e_0370_7344,
                0xa409_3822_299f_31d0,
                0x082e_fa98_ec4e_6c89,
            );
            let exchange = Exchange::new(move |update: &((D1, _), G::Timestamp, R)| {
                let data = &(update.0).0;
                let mut h = random_state.build_hasher();
                data.hash(&mut h);
                h.finish()
            });
            consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
                .unary(Pipeline, "unpack consolidated", |_, _| {
                    |input, output| {
                        input.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for ((k, ()), t, d) in
                                data.iter().flatten().flat_map(|chunk| chunk.iter())
                            {
                                session.give((k.clone(), t.clone(), d.clone()))
                            }
                        })
                    }
                })
                .as_collection()
        } else {
            self
        }
    }

    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static,
    {
        let exchange =
            Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());

        consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
            .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
                |input, output| {
                    input.for_each(|time, data| {
                        let mut session = output.session(&time);
                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            session.give((k.clone(), t.clone(), d.clone()))
                        }
                    })
                }
            })
            .as_collection()
    }
}

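/// Creates a new dataflow source operator whose logic is the async future returned by
/// `constructor`, and returns the stream it produces.
///
/// A minimal skeleton, assuming timely's `CapacityContainerBuilder` and an enclosing
/// `scope`; the body only marks where the async logic goes and does not emit any data:
///
/// ```ignore
/// use timely::container::CapacityContainerBuilder;
///
/// let stream = source_async::<_, CapacityContainerBuilder<Vec<u64>>, _, _>(
///     scope,
///     "ExampleSource".to_string(),
///     |capability, _info, output| async move {
///         // Produce data through `output` while holding `capability`; dropping the
///         // capability lets the source's frontier become empty.
///         drop((capability, output));
///     },
/// );
/// ```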
pub fn source_async<G: Scope, CB, B, BFut>(
    scope: &G,
    name: String,
    constructor: B,
) -> StreamCore<G, CB::Container>
where
    CB: ContainerBuilder,
    B: FnOnce(
        Capability<G::Timestamp>,
        OperatorInfo,
        AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
    ) -> BFut,
    BFut: Future + 'static,
{
    let mut builder = OperatorBuilderAsync::new(name, scope.clone());
    let operator_info = builder.operator_info();

    let (output, stream) = builder.new_output();

    builder.build(move |mut capabilities| {
        // Take the capability for the single output.
        let capability = capabilities.pop().unwrap();
        constructor(capability, operator_info, output)
    });

    stream
}

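/// Consolidates the updates of `stream` using the batcher `Ba` and the parallelization
/// contract `pact`.
///
/// Incoming containers are fed to the batcher. Whenever the input frontier advances past
/// a held capability, the batcher is sealed up to an upper bound derived from the input
/// frontier and the remaining capabilities, and the resulting chain of consolidated
/// chunks is emitted at that capability as a single `Vec<Ba::Output>`.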
pub fn consolidate_pact<Ba, P, G>(
    stream: &StreamCore<G, Ba::Input>,
    pact: P,
    name: &str,
) -> Stream<G, Vec<Ba::Output>>
where
    G: Scope,
    Ba: Batcher<Time = G::Timestamp> + 'static,
    Ba::Input: Container + Clone + 'static,
    Ba::Output: Container + Clone,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
{
    stream.unary_frontier(pact, name, |_cap, info| {
        let logger = {
            let scope = stream.scope();
            let register = scope.log_register();
            register
                .get::<DifferentialEventBuilder>("differential/arrange")
                .map(Into::into)
        };

        let mut batcher = Ba::new(logger, info.global_id);
        // Capabilities for data currently held in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
        let mut prev_frontier = Antichain::from_elem(G::Timestamp::minimum());

        move |input, output| {
            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
                batcher.push_container(data);
            });

            if prev_frontier.borrow() != input.frontier().frontier() {
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !input.frontier().less_equal(c.time()))
                {
                    let mut upper = Antichain::new();
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !input.frontier().less_equal(capability.time()) {
                            // Assemble the upper bound for this seal: the input frontier,
                            // together with the times of all later-retained capabilities.
                            upper.clear();
                            for time in input.frontier().frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            let mut session = output.session(&capabilities.elements()[index]);
                            let output =
                                batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
                            session.give(output);
                        }
                    }

                    // Downgrade the retained capabilities to the batcher's frontier.
                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                prev_frontier.clear();
                prev_frontier.extend(input.frontier().frontier().iter().cloned());
            }
        }
    })
}

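/// A batch [`Builder`] that simply hands back the consolidated input chain.
///
/// [`consolidate_pact`] uses it with [`Batcher::seal`] to obtain the consolidated chunks
/// directly instead of assembling them into a trace batch; only the chain-level `seal`
/// is supported, the incremental `push`/`done` path is not.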
struct ConsolidateBuilder<T, I> {
    _marker: PhantomData<(T, I)>,
}

impl<T, I> Builder for ConsolidateBuilder<T, I>
where
    T: Timestamp,
    I: Container,
{
    type Input = I;
    type Time = T;
    type Output = Vec<I>;

    fn new() -> Self {
        Self {
            _marker: PhantomData,
        }
    }

    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
        Self::new()
    }

    fn push(&mut self, _chunk: &mut Self::Input) {
        unimplemented!("ConsolidateBuilder::push")
    }

    fn done(self, _: Description<Self::Time>) -> Self::Output {
        unimplemented!("ConsolidateBuilder::done")
    }

    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
        std::mem::take(chain)
    }
}