// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Common operator transformations on timely streams and differential collections.

use std::hash::{BuildHasher, Hash, Hasher};
use std::marker::PhantomData;
use std::rc::Weak;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::containers::{Columnation, TimelyStack};
use differential_dataflow::difference::{Multiply, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Batcher, Builder, Description};
use differential_dataflow::{AsCollection, Collection, Hashable};
use timely::container::{ContainerBuilder, DrainContainer, PushInto};
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::builder_rc::{
    OperatorBuilder as OperatorBuilderRc, OperatorBuilder,
};
use timely::dataflow::operators::generic::operator::{self, Operator};
use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputHandleCore};
use timely::dataflow::{Scope, ScopeParent, Stream, StreamCore};
use timely::progress::{Antichain, Timestamp};
use timely::{Container, Data, PartialOrder};

/// A session with lifetime `'a` in a scope `G` with a container builder `CB`.
///
/// This is a shorthand, primarily for readability.
pub type SessionFor<'a, G, CB> = Session<
    'a,
    <G as ScopeParent>::Timestamp,
    CB,
    timely::dataflow::channels::pushers::Counter<
        <G as ScopeParent>::Timestamp,
        <CB as ContainerBuilder>::Container,
        Tee<<G as ScopeParent>::Timestamp, <CB as ContainerBuilder>::Container>,
    >,
>;

/// Extension methods for timely [`StreamCore`]s.
pub trait StreamExt<G, C1>
where
    C1: Container + DrainContainer,
    G: Scope,
{
    /// Like `timely::dataflow::operators::generic::operator::Operator::unary`,
    /// but the logic function can handle failures.
    ///
    /// Creates a new dataflow operator that partitions its input stream by a
    /// parallelization strategy `pact` and repeatedly invokes `logic`, the
    /// function returned by the function passed as `constructor`. The `logic`
    /// function can read from the input stream and write to either of two
    /// output streams, where the first output stream represents successful
    /// computations and the second output stream represents failed
    /// computations.
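    ///
    /// # Example
    ///
    /// A hedged sketch (not compiled as a doctest): it routes even numbers to
    /// the first output and odd numbers to the second. The variable `stream`,
    /// the container builder choices, and the closure shapes are illustrative
    /// assumptions, not part of this API's contract.
    ///
    /// ```ignore
    /// use timely::container::CapacityContainerBuilder;
    /// use timely::dataflow::channels::pact::Pipeline;
    ///
    /// // `stream` is assumed to be a `Stream<G, Vec<u64>>`.
    /// let (evens, odds) = stream.unary_fallible::<
    ///     CapacityContainerBuilder<Vec<u64>>,
    ///     CapacityContainerBuilder<Vec<u64>>,
    ///     _,
    ///     _,
    /// >(Pipeline, "SplitParity", |_capability, _info| {
    ///     Box::new(move |input, ok_output, err_output| {
    ///         input.for_each(|time, data| {
    ///             let mut ok_session = ok_output.session_with_builder(&time);
    ///             let mut err_session = err_output.session_with_builder(&time);
    ///             for x in data.drain() {
    ///                 if x % 2 == 0 {
    ///                     ok_session.push_into(x);
    ///                 } else {
    ///                     err_session.push_into(x);
    ///                 }
    ///             }
    ///         });
    ///     })
    /// });
    /// ```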
    fn unary_fallible<DCB, ECB, B, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputHandleCore<G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>,
                    &mut OutputHandleCore<G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Like [`timely::dataflow::operators::map::Map::flat_map`], but `logic`
    /// is allowed to fail. The first returned stream will contain the
    /// successful applications of `logic`, while the second returned stream
    /// will contain the failed applications.
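    ///
    /// # Example
    ///
    /// A hedged sketch (not compiled as a doctest), assuming `stream` carries
    /// `Vec<String>` containers; successes and failures go to vector outputs
    /// built by `CapacityContainerBuilder`s:
    ///
    /// ```ignore
    /// use timely::container::CapacityContainerBuilder;
    ///
    /// let (parsed, failed) = stream.flat_map_fallible::<
    ///     CapacityContainerBuilder<Vec<i64>>,
    ///     CapacityContainerBuilder<Vec<String>>,
    ///     _,
    ///     _,
    ///     _,
    ///     _,
    /// >("ParseInts", |text| Some(text.parse::<i64>().map_err(|_| text)));
    /// ```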
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        logic: L,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

    /// Block progress of the frontier at `expiration` time, unless the token is dropped.
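    ///
    /// # Example
    ///
    /// A hedged sketch (not compiled as a doctest): keep a strong reference to
    /// the token for as long as the dataflow may run, and drop it to release
    /// the held-back frontier. `stream` and `expiration` are assumed to exist
    /// in the caller's scope.
    ///
    /// ```ignore
    /// use std::rc::Rc;
    ///
    /// let token = Rc::new(());
    /// let expiring = stream.expire_stream_at("MyStream", expiration, Rc::downgrade(&token));
    /// // ... build the rest of the dataflow on `expiring` ...
    /// drop(token); // allow the frontier to advance past `expiration`
    /// ```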
    fn expire_stream_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> StreamCore<G, C1>;
}

/// Extension methods for differential [`Collection`]s.
pub trait CollectionExt<G, D1, R>
where
    G: Scope,
    R: Semigroup,
{
    /// Creates a new empty collection in `scope`.
    fn empty(scope: &G) -> Collection<G, D1, R>;

    /// Like [`Collection::map`], but `logic` is allowed to fail. The first
    /// returned collection will contain successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    ///
    /// Callers need to specify the following type parameters:
    /// * `DCB`: The container builder for the `Ok` output.
    /// * `ECB`: The container builder for the `Err` output.
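    ///
    /// # Example
    ///
    /// A hedged sketch (not compiled as a doctest), assuming `collection` is a
    /// `Collection<G, String, R>`:
    ///
    /// ```ignore
    /// use timely::container::CapacityContainerBuilder;
    ///
    /// let (parsed, failed) = collection.map_fallible::<
    ///     CapacityContainerBuilder<_>,
    ///     CapacityContainerBuilder<_>,
    ///     _,
    ///     _,
    ///     _,
    /// >("ParseInts", |text: String| text.parse::<i64>().map_err(|_| text));
    /// ```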
    fn map_fallible<DCB, ECB, D2, E, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (Collection<G, D2, R>, Collection<G, E, R>)
    where
        DCB: ContainerBuilder<Container = Vec<(D2, G::Timestamp, R)>>
            + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, G::Timestamp, R)>>
            + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Like [`Collection::flat_map`], but `logic` is allowed to fail. The first
    /// returned collection will contain the successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        logic: L,
    ) -> (
        Collection<G, D2, R, DCB::Container>,
        Collection<G, E, R, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

    /// Block progress of the frontier at `expiration` time, unless the token is dropped.
    fn expire_collection_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> Collection<G, D1, R>;

    /// Replaces each record with another, with a new difference type.
    ///
    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
    /// and move the data into the difference component. This allows differential dataflow to update in place.
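    ///
    /// # Example
    ///
    /// A hedged sketch (not compiled as a doctest): fold a per-record count
    /// into the difference component so that updates to the same key can
    /// consolidate. `counts` is a hypothetical input collection.
    ///
    /// ```ignore
    /// // Input:  Collection<G, (String, i64), Diff>
    /// // Output: Collection<G, String, _>, with the count multiplied into the difference.
    /// let summed = counts.explode_one(|(key, count)| (key, count));
    /// ```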
    fn explode_one<D2, R2, L>(&self, logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Data + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice;

    /// Partitions the input into a monotonic collection and non-monotonic
    /// exceptions, with respect to differences.
    ///
    /// The exceptions are transformed by `into_err`.
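    ///
    /// # Example
    ///
    /// A hedged sketch (not compiled as a doctest), assuming an `i64`
    /// difference type; offending records are rendered into error strings
    /// carrying a unit difference:
    ///
    /// ```ignore
    /// let (oks, errs) = collection.ensure_monotonic(|datum, diff| {
    ///     (format!("unexpected negative multiplicity {diff} of {datum:?}"), 1)
    /// });
    /// ```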
    fn ensure_monotonic<E, IE>(&self, into_err: IE) -> (Collection<G, D1, R>, Collection<G, E, R>)
    where
        E: Data,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed;

    /// Consolidates the collection if `must_consolidate` is `true` and leaves it
    /// untouched otherwise.
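    ///
    /// # Example
    ///
    /// A hedged sketch (not compiled as a doctest); `Ba` stands for a concrete
    /// [`Batcher`] implementation supplied by the caller, and the flag name is
    /// illustrative:
    ///
    /// ```ignore
    /// let maybe_consolidated =
    ///     collection.consolidate_named_if::<Ba>(input_may_be_unconsolidated, "PreArrange");
    /// ```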
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;

    /// Consolidates the collection.
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;
}

impl<G, C1> StreamExt<G, C1> for StreamCore<G, C1>
where
    C1: Container + DrainContainer,
    G: Scope,
{
    fn unary_fallible<DCB, ECB, B, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputHandleCore<G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>,
                    &mut OutputHandleCore<G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        let mut builder = OperatorBuilderRc::new(name.into(), self.scope());
        builder.set_notify(false);

        let operator_info = builder.operator_info();

        let mut input = builder.new_input(self, pact);
        let (mut ok_output, ok_stream) = builder.new_output();
        let (mut err_output, err_stream) = builder.new_output();

        builder.build(move |mut capabilities| {
            // `capabilities` should be a single-element vector.
            let capability = capabilities.pop().unwrap();
            let mut logic = constructor(capability, operator_info);
            move |_frontiers| {
                let mut ok_output_handle = ok_output.activate();
                let mut err_output_handle = err_output.activate();
                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
            }
        });

        (ok_stream, err_stream)
    }

    // XXX(guswynn): file a minimization bug report for the `logic` flat_map
    // false positive here.
    // TODO(guswynn): remove this after https://github.com/rust-lang/rust-clippy/issues/8098 is
    // resolved. The `logic` `FnMut` needs to be borrowed in the `flat_map` call, not moved in,
    // so the simple `|d1| logic(d1)` closure is load-bearing.
    #[allow(clippy::redundant_closure)]
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
    {
        self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
            Box::new(move |input, ok_output, err_output| {
                input.for_each(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session_with_builder(&time);
                    for r in data.drain().flat_map(|d1| logic(d1)) {
                        match r {
                            Ok(d2) => ok_session.push_into(d2),
                            Err(e) => err_session.push_into(e),
                        }
                    }
                })
            })
        })
    }

    fn expire_stream_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> StreamCore<G, C1> {
        let name = format!("expire_stream_at({name})");
        self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
            // Retain a capability for the expiration time, which we'll only drop if the token
            // is dropped. Otherwise, block progress at the expiration time to prevent downstream
            // operators from making any statement about the expiration time or any following time.
            let mut cap = Some(cap.delayed(&expiration));
            let mut warned = false;
            move |input, output| {
                if token.upgrade().is_none() {
                    // In shutdown, allow the frontier to propagate.
                    drop(cap.take());
                } else {
                    let frontier = input.frontier().frontier();
                    if !frontier.less_than(&expiration) && !warned {
                        // Here, we print a warning, not an error. The state is only a liveness
                        // concern, and not relevant for correctness. Additionally, a race between
                        // shutting down the dataflow and dropping the token can cause the dataflow
                        // to shut down before we drop the token. This can happen when dropping
                        // the last remaining capability on a different worker. We do not want to
                        // log an error every time this happens.
                        tracing::warn!(
                            name = name,
                            frontier = ?frontier,
                            expiration = ?expiration,
                            "frontier not less than expiration"
                        );
                        warned = true;
                    }
                }
                input.for_each(|time, data| {
                    let mut session = output.session(&time);
                    session.give_container(data);
                });
            }
        })
    }
}

impl<G, D1, R> CollectionExt<G, D1, R> for Collection<G, D1, R>
where
    G: Scope,
    G::Timestamp: Data,
    D1: Data,
    R: Semigroup + 'static,
{
    fn empty(scope: &G) -> Collection<G, D1, R> {
        operator::empty(scope).as_collection()
    }

    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (
        Collection<G, D2, R, DCB::Container>,
        Collection<G, E, R, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static,
    {
        let (ok_stream, err_stream) =
            self.inner
                .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
                    logic(d1).into_iter().map(move |res| match res {
                        Ok(d2) => Ok((d2, t.clone(), r.clone())),
                        Err(e) => Err((e, t.clone(), r.clone())),
                    })
                });
        (ok_stream.as_collection(), err_stream.as_collection())
    }

    fn expire_collection_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> Collection<G, D1, R> {
        self.inner
            .expire_stream_at(name, expiration, token)
            .as_collection()
    }

    fn explode_one<D2, R2, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Data + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice,
    {
        self.inner
            .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
                Pipeline,
                "ExplodeOne",
                move |_, _| {
                    move |input, output| {
                        input.for_each(|time, data| {
                            output
                                .session_with_builder(&time)
                                .give_iterator(data.drain(..).map(|(x, t, d)| {
                                    let (x, d2) = logic(x);
                                    (x, t, d2.multiply(&d))
                                }));
                        });
                    }
                },
            )
            .as_collection()
    }

    fn ensure_monotonic<E, IE>(&self, into_err: IE) -> (Collection<G, D1, R>, Collection<G, E, R>)
    where
        E: Data,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed,
    {
        let (oks, errs) = self
            .inner
            .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
                Box::new(move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);
                        for (x, t, d) in data.drain(..) {
                            if d.is_positive() {
                                ok_session.give((x, t, d))
                            } else {
                                let (e, d2) = into_err(x, d);
                                err_session.give((e, t, d2))
                            }
                        }
                    })
                })
            });
        (oks.as_collection(), errs.as_collection())
    }

    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static,
    {
        if must_consolidate {
            // We employ AHash below instead of the default hasher in DD to obtain
            // a better distribution of data to workers. AHash claims empirically
            // both speed and high quality, according to
            // https://github.com/tkaitchuck/aHash/blob/master/compare/readme.md.
            // TODO(vmarcos): Consider here if it is worth it to spend the time to
            // implement twisted tabulation hashing as proposed in Mihai Patrascu,
            // Mikkel Thorup: Twisted Tabulation Hashing. SODA 2013: 209-228, available
            // at https://epubs.siam.org/doi/epdf/10.1137/1.9781611973105.16. The latter
            // would provide good bounds for balls-into-bins problems when the number of
            // bins is small (as is our case), so we'd have a theoretical guarantee.
            // NOTE: We fix the seeds of a RandomState instance explicitly with the same
            // seeds that would be given by `AHash` via ahash::AHasher::default() so as
            // to avoid a different selection due to compile-time features being differently
            // selected in other dependencies using `AHash` vis-à-vis cargo's strategy
            // of unioning features.
            // NOTE: Depending on target features, we may end up employing the fallback
            // hasher of `AHash`, but it should be sufficient for our needs.
            let random_state = ahash::RandomState::with_seeds(
                0x243f_6a88_85a3_08d3,
                0x1319_8a2e_0370_7344,
                0xa409_3822_299f_31d0,
                0x082e_fa98_ec4e_6c89,
            );
            let exchange = Exchange::new(move |update: &((D1, _), G::Timestamp, R)| {
                let data = &(update.0).0;
                let mut h = random_state.build_hasher();
                data.hash(&mut h);
                h.finish()
            });
            consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
                .unary(Pipeline, "unpack consolidated", |_, _| {
                    |input, output| {
                        input.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for ((k, ()), t, d) in
                                data.iter().flatten().flat_map(|chunk| chunk.iter())
                            {
                                session.give((k.clone(), t.clone(), d.clone()))
                            }
                        })
                    }
                })
                .as_collection()
        } else {
            self
        }
    }

    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static,
    {
        let exchange =
            Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());

        consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
            .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
                |input, output| {
                    input.for_each(|time, data| {
                        let mut session = output.session(&time);
                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            session.give((k.clone(), t.clone(), d.clone()))
                        }
                    })
                }
            })
            .as_collection()
    }
}

/// Aggregates the weights of equal records into at most one record.
///
/// Produces a stream of chains of records, partitioned according to `pact`. The
/// data is sorted according to `Ba`. For each timestamp, it produces at most one chain.
///
/// The data are accumulated in place, each update held back until its timestamp has completed.
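///
/// # Example
///
/// A hedged sketch (not compiled as a doctest), mirroring how
/// [`CollectionExt::consolidate_named`] uses this function; `Ba` stands for a
/// concrete [`Batcher`] implementation and `K`, `T`, `R` for the data, time,
/// and difference types:
///
/// ```ignore
/// let exchange = Exchange::new(|update: &((K, ()), T, R)| (update.0).0.hashed());
/// let chains = consolidate_pact::<Ba, _, _>(&collection.map(|k| (k, ())).inner, exchange, name);
/// ```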
pub fn consolidate_pact<Ba, P, G>(
    stream: &StreamCore<G, Ba::Input>,
    pact: P,
    name: &str,
) -> Stream<G, Vec<Ba::Output>>
where
    G: Scope,
    Ba: Batcher<Time = G::Timestamp> + 'static,
    Ba::Input: Container + Clone + 'static,
    Ba::Output: Container + Clone,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
{
    stream.unary_frontier(pact, name, |_cap, info| {
        // Acquire a logger for arrange events.
        let logger = stream
            .scope()
            .logger_for("differential/arrange")
            .map(Into::into);

        let mut batcher = Ba::new(logger, info.global_id);
        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
        let mut prev_frontier = Antichain::from_elem(G::Timestamp::minimum());

        move |input, output| {
            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
                batcher.push_container(data);
            });

            if prev_frontier.borrow() != input.frontier().frontier() {
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !input.frontier().less_equal(c.time()))
                {
                    let mut upper = Antichain::new(); // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !input.frontier().less_equal(capability.time()) {
                            // Assemble the upper bound on times we can commit with this capability.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in input.frontier().frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Send the batch to downstream consumers, empty or not.
                            let mut session = output.session(&capabilities.elements()[index]);
                            // Extract updates not in advance of `upper`.
                            let output =
                                batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
                            session.give(output);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                prev_frontier.clear();
                prev_frontier.extend(input.frontier().frontier().iter().cloned());
            }
        }
    })
}

/// A batch builder whose `seal` passes the batcher's chain through unchanged,
/// for direct output to a stream.
struct ConsolidateBuilder<T, I> {
    _marker: PhantomData<(T, I)>,
}

impl<T, I> Builder for ConsolidateBuilder<T, I>
where
    T: Timestamp,
    I: Container,
{
    type Input = I;
    type Time = T;
    type Output = Vec<I>;

    fn new() -> Self {
        Self {
            _marker: PhantomData,
        }
    }

    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
        Self::new()
    }

    fn push(&mut self, _chunk: &mut Self::Input) {
        unimplemented!("ConsolidateBuilder::push")
    }

    fn done(self, _: Description<Self::Time>) -> Self::Output {
        unimplemented!("ConsolidateBuilder::done")
    }

    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
        std::mem::take(chain)
    }
}

/// Merge the contents of multiple streams and combine the containers using a container builder.
pub trait ConcatenateFlatten<G: Scope, C: Container + DrainContainer> {
    /// Merge the contents of multiple streams and use the provided container builder to form
    /// output containers.
    ///
    /// # Examples
    /// ```
    /// use timely::container::CapacityContainerBuilder;
    /// use timely::dataflow::operators::{ToStream, Inspect};
    /// use mz_timely_util::operator::ConcatenateFlatten;
    ///
    /// timely::example(|scope| {
    ///
    ///     let streams = vec![(0..10).to_stream(scope),
    ///                        (0..10).to_stream(scope),
    ///                        (0..10).to_stream(scope)];
    ///
    ///     scope.concatenate_flatten::<_, CapacityContainerBuilder<Vec<_>>>(streams)
    ///          .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn concatenate_flatten<I, CB>(&self, sources: I) -> StreamCore<G, CB::Container>
    where
        I: IntoIterator<Item = StreamCore<G, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>;
}

impl<G, C> ConcatenateFlatten<G, C> for StreamCore<G, C>
where
    G: Scope,
    C: Container + DrainContainer,
{
    fn concatenate_flatten<I, CB>(&self, sources: I) -> StreamCore<G, CB::Container>
    where
        I: IntoIterator<Item = StreamCore<G, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
    {
        let clone = self.clone();
        self.scope()
            .concatenate_flatten::<_, CB>(Some(clone).into_iter().chain(sources))
    }
}

impl<G, C> ConcatenateFlatten<G, C> for G
where
    G: Scope,
    C: Container + DrainContainer,
{
    fn concatenate_flatten<I, CB>(&self, sources: I) -> StreamCore<G, CB::Container>
    where
        I: IntoIterator<Item = StreamCore<G, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
    {
        let mut builder = OperatorBuilder::new("ConcatenateFlatten".to_string(), self.clone());
        builder.set_notify(false);

        // create new input handles for each input stream.
        let mut handles = sources
            .into_iter()
            .map(|s| builder.new_input(&s, Pipeline))
            .collect::<Vec<_>>();

        // create one output handle for the concatenated results.
        let (mut output, result) = builder.new_output::<CB>();

        builder.build(move |_capability| {
            move |_frontier| {
                let mut output = output.activate();
                for handle in handles.iter_mut() {
                    handle.for_each(|time, data| {
                        output
                            .session_with_builder(&time)
                            .give_iterator(data.drain());
                    })
                }
            }
        });

        result
    }
}