Skip to main content

mz_timely_util/
operator.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Common operator transformations on timely streams and differential collections.
17
18use std::hash::{BuildHasher, Hash, Hasher};
19
20use columnation::Columnation;
21use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
22use differential_dataflow::difference::{Multiply, Semigroup};
23use differential_dataflow::lattice::Lattice;
24use differential_dataflow::trace::Batcher;
25use differential_dataflow::{AsCollection, Collection, Hashable, VecCollection};
26use timely::container::{DrainContainer, PushInto};
27use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
28use timely::dataflow::operators::Capability;
29use timely::dataflow::operators::generic::builder_rc::{
30    OperatorBuilder as OperatorBuilderRc, OperatorBuilder,
31};
32use timely::dataflow::operators::generic::operator::{self, Operator};
33use timely::dataflow::operators::generic::{
34    InputHandleCore, OperatorInfo, OutputBuilder, OutputBuilderSession,
35};
36use timely::dataflow::{Scope, Stream, StreamVec};
37use timely::progress::operate::FrontierInterest;
38use timely::progress::{Antichain, Timestamp};
39use timely::{Container, ContainerBuilder, PartialOrder};
40
41use crate::columnation::{ColumnationChunker, ColumnationStack};
42
/// Extension methods for timely [`Stream`]s.
///
/// Implemented for streams whose containers can be drained item by item
/// (`C1: DrainContainer`).
pub trait StreamExt<'scope, T, C1>
where
    T: Timestamp,
    C1: Container + DrainContainer + Clone + 'static,
{
    /// Like `timely::dataflow::operators::generic::operator::Operator::unary`,
    /// but the logic function can handle failures.
    ///
    /// Creates a new dataflow operator that partitions its input stream by a
    /// parallelization strategy `pact` and repeatedly invokes `logic`, the
    /// function returned by the function passed as `constructor`. The `logic`
    /// function can read from the input stream and write to either of two output
    /// streams, where the first output stream represents successful
    /// computations and the second output stream represents failed
    /// computations.
    ///
    /// `constructor` is called once, with a capability and the operator's
    /// [`OperatorInfo`], and returns the boxed `logic`. `DCB` and `ECB` are the
    /// container builders of the success and failure outputs, respectively.
    ///
    /// Returns `(ok_stream, err_stream)`.
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<T>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<T, C1, P::Puller>,
                    &mut OutputBuilderSession<'_, T, DCB>,
                    &mut OutputBuilderSession<'_, T, ECB>,
                ) + 'static,
        >,
        P: ParallelizationContract<T, C1>;

    /// Like [`timely::dataflow::operators::vec::Map::flat_map`], but `logic`
    /// is allowed to fail. The first returned stream will contain the
    /// successful applications of `logic`, while the second returned stream
    /// will contain the failed applications.
    ///
    /// `logic` is applied to every item drained from the input containers.
    /// Each `Ok` it yields is emitted on the first stream, and each `Err` on the
    /// second, at the time of the container the item came from. Records are
    /// not exchanged between workers (the implementation uses `Pipeline`).
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

    /// Block progress of the frontier at `expiration` time.
    ///
    /// Data is forwarded unchanged. The operator holds a capability for
    /// `expiration` for as long as it exists, so its output frontier cannot
    /// advance beyond `expiration`. The operator is named
    /// `expire_stream_at({name})`. A warning is logged (once) when the input
    /// frontier is not less than `expiration`.
    fn expire_stream_at(self, name: &str, expiration: T) -> Stream<'scope, T, C1>;
}
104
/// Extension methods for differential [`Collection`]s.
pub trait CollectionExt<'scope, T, D1, R>: Sized
where
    T: Timestamp,
    R: Semigroup,
{
    /// Creates a new empty collection in `scope`.
    fn empty(scope: Scope<'scope, T>) -> VecCollection<'scope, T, D1, R>;

    /// Like [`Collection::map`], but `logic` is allowed to fail. The first
    /// returned collection will contain successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    ///
    /// Callers need to specify the following type parameters:
    /// * `DCB`: The container builder for the `Ok` output.
    /// * `ECB`: The container builder for the `Err` output.
    ///
    /// Implemented in terms of [`CollectionExt::flat_map_fallible`].
    fn map_fallible<DCB, ECB, D2, E, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (
        VecCollection<'scope, T, D2, R>,
        VecCollection<'scope, T, E, R>,
    )
    where
        DCB: ContainerBuilder<Container = Vec<(D2, T, R)>> + PushInto<(D2, T, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, T, R)>> + PushInto<(E, T, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        // `Some(_)` acts as a one-element iterator, turning the map into a flat map.
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Like [`Collection::flat_map`], but `logic` is allowed to fail. The first
    /// returned collection will contain the successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    ///
    /// Every record produced from an input update carries that update's time
    /// and difference.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (
        Collection<'scope, T, DCB::Container>,
        Collection<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, T, R)>,
        ECB: ContainerBuilder + PushInto<(E, T, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

    /// Block progress of the frontier at `expiration` time.
    ///
    /// Collection counterpart of [`StreamExt::expire_stream_at`], applied to the
    /// collection's inner stream.
    fn expire_collection_at(self, name: &str, expiration: T) -> VecCollection<'scope, T, D1, R>;

    /// Replaces each record with another, with a new difference type.
    ///
    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
    /// and move the data into the difference component. This will allow differential dataflow to update in-place.
    ///
    /// For an update `(x, t, d)` where `logic(x) == (y, d2)`, the output update is
    /// `(y, t, d2.multiply(&d))`. Output containers are built with a
    /// `ConsolidatingContainerBuilder`.
    fn explode_one<D2, R2, L>(
        self,
        logic: L,
    ) -> VecCollection<'scope, T, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        T: Lattice;

    /// Partitions the input into a monotonic collection and
    /// non-monotone exceptions, with respect to differences.
    ///
    /// The exceptions are transformed by `into_err`.
    ///
    /// Only updates with a strictly positive difference are kept. Every other
    /// update, including one with a zero difference, is handed to `into_err`.
    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (
        VecCollection<'scope, T, D1, R>,
        VecCollection<'scope, T, E, R>,
    )
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed;

    /// Consolidates the collection if `must_consolidate` is `true` and leaves it
    /// untouched otherwise.
    ///
    /// When consolidating, data is exchanged between workers using an `AHash`
    /// hasher with fixed seeds, rather than the default `Hashable::hashed`.
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Columnation,
        Ba: Batcher<Time = T, Output = ColumnationStack<((D1, ()), T, R)>> + 'static;

    /// Consolidates the collection.
    ///
    /// Data is exchanged between workers by `Hashable::hashed` of each record.
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Columnation,
        Ba: Batcher<Time = T, Output = ColumnationStack<((D1, ()), T, R)>> + 'static;
}
211
212impl<'scope, T, C1> StreamExt<'scope, T, C1> for Stream<'scope, T, C1>
213where
214    T: Timestamp,
215    C1: Container + DrainContainer + Clone + 'static,
216{
217    fn unary_fallible<DCB, ECB, B, P>(
218        self,
219        pact: P,
220        name: &str,
221        constructor: B,
222    ) -> (
223        Stream<'scope, T, DCB::Container>,
224        Stream<'scope, T, ECB::Container>,
225    )
226    where
227        DCB: ContainerBuilder,
228        ECB: ContainerBuilder,
229        B: FnOnce(
230            Capability<T>,
231            OperatorInfo,
232        ) -> Box<
233            dyn FnMut(
234                    &mut InputHandleCore<T, C1, P::Puller>,
235                    &mut OutputBuilderSession<'_, T, DCB>,
236                    &mut OutputBuilderSession<'_, T, ECB>,
237                ) + 'static,
238        >,
239        P: ParallelizationContract<T, C1>,
240    {
241        let mut builder = OperatorBuilderRc::new(name.into(), self.scope());
242
243        let operator_info = builder.operator_info();
244
245        let mut input = builder.new_input(self.clone(), pact);
246        builder.set_notify_for(0, FrontierInterest::Never);
247        let (ok_output, ok_stream) = builder.new_output();
248        let mut ok_output = OutputBuilder::from(ok_output);
249        let (err_output, err_stream) = builder.new_output();
250        let mut err_output = OutputBuilder::from(err_output);
251
252        builder.build(move |mut capabilities| {
253            // `capabilities` should be a single-element vector.
254            let capability = capabilities.pop().unwrap();
255            let mut logic = constructor(capability, operator_info);
256            move |_frontiers| {
257                let mut ok_output_handle = ok_output.activate();
258                let mut err_output_handle = err_output.activate();
259                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
260            }
261        });
262
263        (ok_stream, err_stream)
264    }
265
266    // XXX(guswynn): file an minimization bug report for the logic flat_map
267    // false positive here
268    // TODO(guswynn): remove this after https://github.com/rust-lang/rust-clippy/issues/8098 is
269    // resolved. The `logic` `FnMut` needs to be borrowed in the `flat_map` call, not moved in
270    // so the simple `|d1| logic(d1)` closure is load-bearing
271    #[allow(clippy::redundant_closure)]
272    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
273        self,
274        name: &str,
275        mut logic: L,
276    ) -> (
277        Stream<'scope, T, DCB::Container>,
278        Stream<'scope, T, ECB::Container>,
279    )
280    where
281        DCB: ContainerBuilder + PushInto<D2>,
282        ECB: ContainerBuilder + PushInto<E>,
283        I: IntoIterator<Item = Result<D2, E>>,
284        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
285    {
286        self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
287            Box::new(move |input, ok_output, err_output| {
288                input.for_each_time(|time, data| {
289                    let mut ok_session = ok_output.session_with_builder(&time);
290                    let mut err_session = err_output.session_with_builder(&time);
291                    for r in data
292                        .flat_map(DrainContainer::drain)
293                        .flat_map(|d1| logic(d1))
294                    {
295                        match r {
296                            Ok(d2) => ok_session.give(d2),
297                            Err(e) => err_session.give(e),
298                        }
299                    }
300                })
301            })
302        })
303    }
304
305    fn expire_stream_at(self, name: &str, expiration: T) -> Stream<'scope, T, C1> {
306        let name = format!("expire_stream_at({name})");
307        self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
308            // Retain a capability for the expiration time, which we'll only drop if the token
309            // is dropped. Else, block progress at the expiration time to prevent downstream
310            // operators from making any statement about expiration time or any following time.
311            let cap = Some(cap.delayed(&expiration));
312            let mut warned = false;
313            move |(input, frontier), output| {
314                let _ = &cap;
315                let frontier = frontier.frontier();
316                if !frontier.less_than(&expiration) && !warned {
317                    // Here, we print a warning, not an error. The state is only a liveness
318                    // concern, but not relevant for correctness. Additionally, a race between
319                    // shutting down the dataflow and dropping the token can cause the dataflow
320                    // to shut down before we drop the token.  This can happen when dropping
321                    // the last remaining capability on a different worker.  We do not want to
322                    // log an error every time this happens.
323
324                    tracing::warn!(
325                        name = name,
326                        frontier = ?frontier,
327                        expiration = ?expiration,
328                        "frontier not less than expiration"
329                    );
330                    warned = true;
331                }
332                input.for_each(|time, data| {
333                    let mut session = output.session(&time);
334                    session.give_container(data);
335                });
336            }
337        })
338    }
339}
340
341impl<'scope, T, D1, R> CollectionExt<'scope, T, D1, R> for VecCollection<'scope, T, D1, R>
342where
343    T: Timestamp + Clone + 'static,
344    D1: Clone + 'static,
345    R: Semigroup + 'static,
346{
347    fn empty(scope: Scope<'scope, T>) -> VecCollection<'scope, T, D1, R> {
348        operator::empty(scope).as_collection()
349    }
350
351    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
352        self,
353        name: &str,
354        mut logic: L,
355    ) -> (
356        Collection<'scope, T, DCB::Container>,
357        Collection<'scope, T, ECB::Container>,
358    )
359    where
360        DCB: ContainerBuilder + PushInto<(D2, T, R)>,
361        ECB: ContainerBuilder + PushInto<(E, T, R)>,
362        D2: Clone + 'static,
363        E: Clone + 'static,
364        I: IntoIterator<Item = Result<D2, E>>,
365        L: FnMut(D1) -> I + 'static,
366    {
367        let (ok_stream, err_stream) =
368            self.inner
369                .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
370                    logic(d1).into_iter().map(move |res| match res {
371                        Ok(d2) => Ok((d2, t.clone(), r.clone())),
372                        Err(e) => Err((e, t.clone(), r.clone())),
373                    })
374                });
375        (ok_stream.as_collection(), err_stream.as_collection())
376    }
377
378    fn expire_collection_at(self, name: &str, expiration: T) -> VecCollection<'scope, T, D1, R> {
379        self.inner
380            .expire_stream_at(name, expiration)
381            .as_collection()
382    }
383
384    fn explode_one<D2, R2, L>(
385        self,
386        mut logic: L,
387    ) -> VecCollection<'scope, T, D2, <R2 as Multiply<R>>::Output>
388    where
389        D2: differential_dataflow::Data,
390        R2: Semigroup + Multiply<R>,
391        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
392        L: FnMut(D1) -> (D2, R2) + 'static,
393        T: Lattice,
394    {
395        self.inner
396            .clone()
397            .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
398                Pipeline,
399                "ExplodeOne",
400                move |_, _| {
401                    move |input, output| {
402                        input.for_each(|time, data| {
403                            output
404                                .session_with_builder(&time)
405                                .give_iterator(data.drain(..).map(|(x, t, d)| {
406                                    let (x, d2) = logic(x);
407                                    (x, t, d2.multiply(&d))
408                                }));
409                        });
410                    }
411                },
412            )
413            .as_collection()
414    }
415
416    fn ensure_monotonic<E, IE>(
417        self,
418        into_err: IE,
419    ) -> (
420        VecCollection<'scope, T, D1, R>,
421        VecCollection<'scope, T, E, R>,
422    )
423    where
424        E: Clone + 'static,
425        IE: Fn(D1, R) -> (E, R) + 'static,
426        R: num_traits::sign::Signed,
427    {
428        let (oks, errs) = self
429            .inner
430            .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
431                Box::new(move |input, ok_output, err_output| {
432                    input.for_each(|time, data| {
433                        let mut ok_session = ok_output.session(&time);
434                        let mut err_session = err_output.session(&time);
435                        for (x, t, d) in data.drain(..) {
436                            if d.is_positive() {
437                                ok_session.give((x, t, d))
438                            } else {
439                                let (e, d2) = into_err(x, d);
440                                err_session.give((e, t, d2))
441                            }
442                        }
443                    })
444                })
445            });
446        (oks.as_collection(), errs.as_collection())
447    }
448
449    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
450    where
451        D1: differential_dataflow::ExchangeData + Hash + Columnation,
452        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
453        T: Lattice + Ord + Columnation,
454        Ba: Batcher<Time = T, Output = ColumnationStack<((D1, ()), T, R)>> + 'static,
455    {
456        if must_consolidate {
457            // We employ AHash below instead of the default hasher in DD to obtain
458            // a better distribution of data to workers. AHash claims empirically
459            // both speed and high quality, according to
460            // https://github.com/tkaitchuck/aHash/blob/master/compare/readme.md.
461            // TODO(vmarcos): Consider here if it is worth it to spend the time to
462            // implement twisted tabulation hashing as proposed in Mihai Patrascu,
463            // Mikkel Thorup: Twisted Tabulation Hashing. SODA 2013: 209-228, available
464            // at https://epubs.siam.org/doi/epdf/10.1137/1.9781611973105.16. The latter
465            // would provide good bounds for balls-into-bins problems when the number of
466            // bins is small (as is our case), so we'd have a theoretical guarantee.
467            // NOTE: We fix the seeds of a RandomState instance explicity with the same
468            // seeds that would be given by `AHash` via ahash::AHasher::default() so as
469            // to avoid a different selection due to compile-time features being differently
470            // selected in other dependencies using `AHash` vis-à-vis cargo's strategy
471            // of unioning features.
472            // NOTE: Depending on target features, we may end up employing the fallback
473            // hasher of `AHash`, but it should be sufficient for our needs.
474            let random_state = ahash::RandomState::with_seeds(
475                0x243f_6a88_85a3_08d3,
476                0x1319_8a2e_0370_7344,
477                0xa409_3822_299f_31d0,
478                0x082e_fa98_ec4e_6c89,
479            );
480            let exchange = Exchange::new(move |update: &((D1, _), T, R)| {
481                let data = &(update.0).0;
482                let mut h = random_state.build_hasher();
483                data.hash(&mut h);
484                h.finish()
485            });
486            consolidate_pact::<ColumnationChunker<((D1, ()), T, R)>, Ba, _, _>(
487                self.map(|k| (k, ())).inner,
488                exchange,
489                name,
490            )
491            .unary(Pipeline, "unpack consolidated", |_, _| {
492                |input, output| {
493                    input.for_each(|time, data| {
494                        let mut session = output.session(&time);
495                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
496                        {
497                            session.give((k.clone(), t.clone(), d.clone()))
498                        }
499                    })
500                }
501            })
502            .as_collection()
503        } else {
504            self
505        }
506    }
507
508    fn consolidate_named<Ba>(self, name: &str) -> Self
509    where
510        D1: differential_dataflow::ExchangeData + Hash + Columnation,
511        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
512        T: Lattice + Ord + Columnation,
513        Ba: Batcher<Time = T, Output = ColumnationStack<((D1, ()), T, R)>> + 'static,
514    {
515        let exchange = Exchange::new(move |update: &((D1, ()), T, R)| (update.0).0.hashed());
516
517        consolidate_pact::<ColumnationChunker<((D1, ()), T, R)>, Ba, _, _>(
518            self.map(|k| (k, ())).inner,
519            exchange,
520            name,
521        )
522        .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
523            |input, output| {
524                input.for_each(|time, data| {
525                    let mut session = output.session(&time);
526                    for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
527                        session.give((k.clone(), t.clone(), d.clone()))
528                    }
529                })
530            }
531        })
532        .as_collection()
533    }
534}
535
/// Aggregates the weights of equal records into at most one record.
///
/// Produces a stream of chains of records, partitioned according to `pact`. The
/// data is sorted according to `Ba`. For each timestamp, it produces at most one chain.
///
/// The data are accumulated in place, each held back until their timestamp has completed.
///
/// Input containers are first pushed into a chunker of type `Chu`, whose
/// extracted chunks are handed to the batcher `Ba`. Whenever the input frontier
/// changes, the chunker is flushed. Then, for every retained capability that the
/// frontier has passed, the batcher is sealed and the resulting chain is emitted
/// at that capability, even if it is empty. `name` is the operator name.
pub fn consolidate_pact<'scope, Chu, Ba, C, P>(
    stream: Stream<'scope, Ba::Time, C>,
    pact: P,
    name: &str,
) -> StreamVec<'scope, Ba::Time, Vec<Ba::Output>>
where
    Ba: Batcher + 'static,
    Chu: ContainerBuilder<Container = Ba::Output> + for<'a> PushInto<&'a mut C> + 'static,
    C: Container + Clone + 'static,
    Ba::Output: Clone,
    P: ParallelizationContract<Ba::Time, C>,
{
    // Acquire a logger for arrange events; it is handed to the batcher below.
    let logger = stream
        .scope()
        .worker()
        .logger_for("differential/arrange")
        .map(Into::into);
    stream.unary_frontier(pact, name, |_cap, info| {
        // The batcher sorts and accumulates updates until they are sealed.
        let mut batcher = Ba::new(logger, info.global_id);
        // The chunker consolidates raw input containers into the chunks the
        // batcher consumes.
        let mut chunker = Chu::default();
        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<Ba::Time>>::new();
        // Last observed input frontier; sealing only happens when it changes.
        let mut prev_frontier = Antichain::from_elem(Ba::Time::minimum());

        move |(input, frontier), output| {
            // Ingest all available input, retaining a capability for each batch's time.
            input.for_each(|cap, data| {
                capabilities.insert(cap.retain(0));
                chunker.push_into(data);
                while let Some(chunk) = chunker.extract() {
                    batcher.push_into(std::mem::take(chunk));
                }
            });

            if prev_frontier.borrow() != frontier.frontier() {
                // Flush any data the chunker is still accumulating into the
                // batcher before we seal.
                while let Some(chunk) = chunker.finish() {
                    batcher.push_into(std::mem::take(chunk));
                }

                // Only do work if some retained capability is no longer in
                // advance of the input frontier.
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !frontier.less_equal(c.time()))
                {
                    let mut upper = Antichain::new(); // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !frontier.less_equal(capability.time()) {
                            // Assemble the upper bound on times we can commit with this capabilities.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            // send the batch to downstream consumers, empty or not.
                            let mut session = output.session(&capabilities.elements()[index]);
                            // Extract updates not in advance of `upper`.
                            let (chain, _description) = batcher.seal(upper.clone());
                            session.give(chain);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            // Every update still held by the batcher arrived with a
                            // retained capability, so one must cover its time.
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }
        }
    })
}
642
/// Merge the contents of multiple streams and combine the containers using a container builder.
pub trait ConcatenateFlatten<'scope, T: Timestamp, C: Container + DrainContainer> {
    /// Merge the contents of multiple streams and use the provided container builder to form
    /// output containers.
    ///
    /// `sources` are the streams to merge. `CB` is the container builder that receives
    /// every item drained from the input containers and forms the output containers.
    ///
    /// # Examples
    /// ```
    /// use timely::container::CapacityContainerBuilder;
    /// use timely::dataflow::operators::{ToStream, Inspect};
    /// use mz_timely_util::operator::ConcatenateFlatten;
    ///
    /// timely::example(|scope| {
    ///
    ///     let streams: Vec<timely::dataflow::StreamVec<_, i32>> =
    ///         vec![(0..10).to_stream(scope),
    ///              (0..10).to_stream(scope),
    ///              (0..10).to_stream(scope)];
    ///
    ///     scope.concatenate_flatten::<_, CapacityContainerBuilder<Vec<i32>>>(streams)
    ///          .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
    where
        I: IntoIterator<Item = Stream<'scope, T, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>;
}
670
671impl<'scope, T, C> ConcatenateFlatten<'scope, T, C> for Stream<'scope, T, C>
672where
673    T: Timestamp,
674    C: Container + DrainContainer + Clone + 'static,
675{
676    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
677    where
678        I: IntoIterator<Item = Stream<'scope, T, C>>,
679        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
680    {
681        self.scope()
682            .concatenate_flatten::<_, CB>(Some(Clone::clone(self)).into_iter().chain(sources))
683    }
684}
685
686impl<'scope, T, C> ConcatenateFlatten<'scope, T, C> for Scope<'scope, T>
687where
688    T: Timestamp,
689    C: Container + DrainContainer,
690{
691    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
692    where
693        I: IntoIterator<Item = Stream<'scope, T, C>>,
694        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
695    {
696        let mut builder = OperatorBuilder::new("ConcatenateFlatten".to_string(), self.clone());
697
698        // create new input handles for each input stream.
699        let mut handles = sources
700            .into_iter()
701            .map(|s| builder.new_input(s, Pipeline))
702            .collect::<Vec<_>>();
703        for i in 0..handles.len() {
704            builder.set_notify_for(i, FrontierInterest::Never);
705        }
706
707        // create one output handle for the concatenated results.
708        let (output, result) = builder.new_output::<CB::Container>();
709        let mut output = OutputBuilder::<_, CB>::from(output);
710
711        builder.build(move |_capability| {
712            move |_frontier| {
713                let mut output = output.activate();
714                for handle in handles.iter_mut() {
715                    handle.for_each_time(|time, data| {
716                        output
717                            .session_with_builder(&time)
718                            .give_iterator(data.flat_map(DrainContainer::drain));
719                    })
720                }
721            }
722        });
723
724        result
725    }
726}
727
/// Containers whose contents can be removed in place.
pub trait ClearContainer {
    /// Removes every element from the container.
    fn clear(&mut self);
}

/// For `Vec`, clearing drops all elements and leaves the allocated capacity as is.
impl<T> ClearContainer for Vec<T> {
    fn clear(&mut self) {
        self.truncate(0);
    }
}