// mz_timely_util/operator.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Common operator transformations on timely streams and differential collections.
17
18use std::hash::{BuildHasher, Hash, Hasher};
19use std::marker::PhantomData;
20
21use columnation::Columnation;
22use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
23use differential_dataflow::difference::{Multiply, Semigroup};
24use differential_dataflow::lattice::Lattice;
25use differential_dataflow::trace::{Batcher, Builder, Description};
26use differential_dataflow::{AsCollection, Collection, Hashable, VecCollection};
27use timely::container::{DrainContainer, PushInto};
28use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
29use timely::dataflow::operators::Capability;
30use timely::dataflow::operators::generic::builder_rc::{
31    OperatorBuilder as OperatorBuilderRc, OperatorBuilder,
32};
33use timely::dataflow::operators::generic::operator::{self, Operator};
34use timely::dataflow::operators::generic::{
35    InputHandleCore, OperatorInfo, OutputBuilder, OutputBuilderSession,
36};
37use timely::dataflow::{Scope, Stream, StreamVec};
38use timely::progress::operate::FrontierInterest;
39use timely::progress::{Antichain, Timestamp};
40use timely::{Container, ContainerBuilder, PartialOrder};
41
42use crate::columnation::ColumnationStack;
43
44/// Extension methods for timely [`Stream`]s.
/// Extension methods for timely [`Stream`]s.
pub trait StreamExt<'scope, T, C1>
where
    T: Timestamp,
    C1: Container + DrainContainer + Clone + 'static,
{
    /// Like `timely::dataflow::operators::generic::operator::Operator::unary`,
    /// but the logic function can handle failures.
    ///
    /// Creates a new dataflow operator that partitions its input stream by a
    /// parallelization strategy `pact` and repeatedly invokes `logic`, the
    /// function returned by the function passed as `constructor`. The `logic`
    /// function can read from the input stream and write to either of two
    /// output streams, where the first output stream represents successful
    /// computations and the second output stream represents failed
    /// computations.
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<T>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<T, C1, P::Puller>,
                    &mut OutputBuilderSession<'_, T, DCB>,
                    &mut OutputBuilderSession<'_, T, ECB>,
                ) + 'static,
        >,
        P: ParallelizationContract<T, C1>;

    /// Like [`timely::dataflow::operators::vec::Map::flat_map`], but `logic`
    /// is allowed to fail. The first returned stream will contain the
    /// successful applications of `logic`, while the second returned stream
    /// will contain the failed applications.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

    /// Block progress of the frontier at `expiration` time.
    ///
    /// The returned stream carries the same data as `self`, but its frontier
    /// never advances beyond `expiration` (the implementation holds back a
    /// capability for that time).
    fn expire_stream_at(self, name: &str, expiration: T) -> Stream<'scope, T, C1>;
}
105
106/// Extension methods for differential [`Collection`]s.
/// Extension methods for differential [`Collection`]s.
pub trait CollectionExt<'scope, T, D1, R>: Sized
where
    T: Timestamp,
    R: Semigroup,
{
    /// Creates a new empty collection in `scope`.
    fn empty(scope: Scope<'scope, T>) -> VecCollection<'scope, T, D1, R>;

    /// Like [`Collection::map`], but `logic` is allowed to fail. The first
    /// returned collection will contain successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    ///
    /// Callers need to specify the following type parameters:
    /// * `DCB`: The container builder for the `Ok` output.
    /// * `ECB`: The container builder for the `Err` output.
    fn map_fallible<DCB, ECB, D2, E, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (
        VecCollection<'scope, T, D2, R>,
        VecCollection<'scope, T, E, R>,
    )
    where
        DCB: ContainerBuilder<Container = Vec<(D2, T, R)>> + PushInto<(D2, T, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, T, R)>> + PushInto<(E, T, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        // Defer to `flat_map_fallible`, treating each single result as a
        // one-element iterator.
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Like [`Collection::flat_map`], but `logic` is allowed to fail. The first
    /// returned collection will contain the successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (
        Collection<'scope, T, DCB::Container>,
        Collection<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, T, R)>,
        ECB: ContainerBuilder + PushInto<(E, T, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

    /// Block progress of the frontier at `expiration` time.
    fn expire_collection_at(self, name: &str, expiration: T) -> VecCollection<'scope, T, D1, R>;

    /// Replaces each record with another, with a new difference type.
    ///
    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
    /// and move the data into the difference component. This will allow differential dataflow to update in-place.
    fn explode_one<D2, R2, L>(
        self,
        logic: L,
    ) -> VecCollection<'scope, T, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        T: Lattice;

    /// Partitions the input into a monotonic collection and
    /// non-monotone exceptions, with respect to differences.
    ///
    /// The exceptions are transformed by `into_err`.
    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (
        VecCollection<'scope, T, D1, R>,
        VecCollection<'scope, T, E, R>,
    )
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed;

    /// Consolidates the collection if `must_consolidate` is `true` and leaves it
    /// untouched otherwise.
    ///
    /// `Ba` selects the batcher used to sort and accumulate updates.
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), T, R)>,
                Output = ColumnationStack<((D1, ()), T, R)>,
                Time = T,
            > + 'static;

    /// Consolidates the collection.
    ///
    /// `Ba` selects the batcher used to sort and accumulate updates.
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), T, R)>,
                Output = ColumnationStack<((D1, ()), T, R)>,
                Time = T,
            > + 'static;
}
220
impl<'scope, T, C1> StreamExt<'scope, T, C1> for Stream<'scope, T, C1>
where
    T: Timestamp,
    C1: Container + DrainContainer + Clone + 'static,
{
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<T>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<T, C1, P::Puller>,
                    &mut OutputBuilderSession<'_, T, DCB>,
                    &mut OutputBuilderSession<'_, T, ECB>,
                ) + 'static,
        >,
        P: ParallelizationContract<T, C1>,
    {
        // Hand-roll the operator: the convenience `Operator` methods only
        // support a single output, but we need separate `Ok` and `Err`
        // output streams.
        let mut builder = OperatorBuilderRc::new(name.into(), self.clone().scope());

        let operator_info = builder.operator_info();

        let mut input = builder.new_input(self.clone(), pact);
        // The operator never consults the input frontier, so opt out of
        // frontier tracking for input 0.
        builder.set_notify_for(0, FrontierInterest::Never);
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output = OutputBuilder::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);

        builder.build(move |mut capabilities| {
            // `capabilities` should be a single-element vector.
            let capability = capabilities.pop().unwrap();
            let mut logic = constructor(capability, operator_info);
            move |_frontiers| {
                // Re-activate both outputs on every scheduling and hand the
                // handles to the user-provided logic.
                let mut ok_output_handle = ok_output.activate();
                let mut err_output_handle = err_output.activate();
                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
            }
        });

        (ok_stream, err_stream)
    }

    // XXX(guswynn): file an minimization bug report for the logic flat_map
    // false positive here
    // TODO(guswynn): remove this after https://github.com/rust-lang/rust-clippy/issues/8098 is
    // resolved. The `logic` `FnMut` needs to be borrowed in the `flat_map` call, not moved in
    // so the simple `|d1| logic(d1)` closure is load-bearing
    #[allow(clippy::redundant_closure)]
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
    {
        self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
            Box::new(move |input, ok_output, err_output| {
                input.for_each_time(|time, data| {
                    // One session per output and timestamp; each result is
                    // routed to the matching session.
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session_with_builder(&time);
                    for r in data
                        .flat_map(DrainContainer::drain)
                        .flat_map(|d1| logic(d1))
                    {
                        match r {
                            Ok(d2) => ok_session.give(d2),
                            Err(e) => err_session.give(e),
                        }
                    }
                })
            })
        })
    }

    fn expire_stream_at(self, name: &str, expiration: T) -> Stream<'scope, T, C1> {
        let name = format!("expire_stream_at({name})");
        // NOTE: the `name.clone()` is load-bearing: `name` itself is moved
        // into the `move` closure below, so a temporary clone is borrowed as
        // the operator name.
        self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
            // Retain a capability for the expiration time, which we'll only drop if the token
            // is dropped. Else, block progress at the expiration time to prevent downstream
            // operators from making any statement about expiration time or any following time.
            let cap = Some(cap.delayed(&expiration));
            let mut warned = false;
            move |(input, frontier), output| {
                let _ = &cap;
                let frontier = frontier.frontier();
                if !frontier.less_than(&expiration) && !warned {
                    // Here, we print a warning, not an error. The state is only a liveness
                    // concern, but not relevant for correctness. Additionally, a race between
                    // shutting down the dataflow and dropping the token can cause the dataflow
                    // to shut down before we drop the token.  This can happen when dropping
                    // the last remaining capability on a different worker.  We do not want to
                    // log an error every time this happens.

                    tracing::warn!(
                        name = name,
                        frontier = ?frontier,
                        expiration = ?expiration,
                        "frontier not less than expiration"
                    );
                    warned = true;
                }
                // Pass all data through unchanged at its original time.
                input.for_each(|time, data| {
                    let mut session = output.session(&time);
                    session.give_container(data);
                });
            }
        })
    }
}
349
impl<'scope, T, D1, R> CollectionExt<'scope, T, D1, R> for VecCollection<'scope, T, D1, R>
where
    T: Timestamp + Clone + 'static,
    D1: Clone + 'static,
    R: Semigroup + 'static,
{
    fn empty(scope: Scope<'scope, T>) -> VecCollection<'scope, T, D1, R> {
        operator::empty(scope).as_collection()
    }

    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (
        Collection<'scope, T, DCB::Container>,
        Collection<'scope, T, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, T, R)>,
        ECB: ContainerBuilder + PushInto<(E, T, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static,
    {
        // Lift `logic` from data to (data, time, diff) triples, cloning time
        // and diff into every produced record, then defer to the stream-level
        // implementation.
        let (ok_stream, err_stream) =
            self.inner
                .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
                    logic(d1).into_iter().map(move |res| match res {
                        Ok(d2) => Ok((d2, t.clone(), r.clone())),
                        Err(e) => Err((e, t.clone(), r.clone())),
                    })
                });
        (ok_stream.as_collection(), err_stream.as_collection())
    }

    fn expire_collection_at(self, name: &str, expiration: T) -> VecCollection<'scope, T, D1, R> {
        self.inner
            .expire_stream_at(name, expiration)
            .as_collection()
    }

    fn explode_one<D2, R2, L>(
        self,
        mut logic: L,
    ) -> VecCollection<'scope, T, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        T: Lattice,
    {
        // Use a consolidating container builder so equal (data, time) pairs
        // produced here are combined before flowing downstream.
        self.inner
            .clone()
            .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
                Pipeline,
                "ExplodeOne",
                move |_, _| {
                    move |input, output| {
                        input.for_each(|time, data| {
                            output
                                .session_with_builder(&time)
                                .give_iterator(data.drain(..).map(|(x, t, d)| {
                                    // Multiply the new difference produced by
                                    // `logic` into the record's existing diff.
                                    let (x, d2) = logic(x);
                                    (x, t, d2.multiply(&d))
                                }));
                        });
                    }
                },
            )
            .as_collection()
    }

    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (
        VecCollection<'scope, T, D1, R>,
        VecCollection<'scope, T, E, R>,
    )
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed,
    {
        let (oks, errs) = self
            .inner
            .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
                Box::new(move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);
                        for (x, t, d) in data.drain(..) {
                            // Positive differences pass through unchanged;
                            // non-positive ones are rewritten by `into_err`.
                            if d.is_positive() {
                                ok_session.give((x, t, d))
                            } else {
                                let (e, d2) = into_err(x, d);
                                err_session.give((e, t, d2))
                            }
                        }
                    })
                })
            });
        (oks.as_collection(), errs.as_collection())
    }

    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), T, R)>,
                Output = ColumnationStack<((D1, ()), T, R)>,
                Time = T,
            > + 'static,
    {
        if must_consolidate {
            // We employ AHash below instead of the default hasher in DD to obtain
            // a better distribution of data to workers. AHash claims empirically
            // both speed and high quality, according to
            // https://github.com/tkaitchuck/aHash/blob/master/compare/readme.md.
            // TODO(vmarcos): Consider here if it is worth it to spend the time to
            // implement twisted tabulation hashing as proposed in Mihai Patrascu,
            // Mikkel Thorup: Twisted Tabulation Hashing. SODA 2013: 209-228, available
            // at https://epubs.siam.org/doi/epdf/10.1137/1.9781611973105.16. The latter
            // would provide good bounds for balls-into-bins problems when the number of
            // bins is small (as is our case), so we'd have a theoretical guarantee.
            // NOTE: We fix the seeds of a RandomState instance explicity with the same
            // seeds that would be given by `AHash` via ahash::AHasher::default() so as
            // to avoid a different selection due to compile-time features being differently
            // selected in other dependencies using `AHash` vis-à-vis cargo's strategy
            // of unioning features.
            // NOTE: Depending on target features, we may end up employing the fallback
            // hasher of `AHash`, but it should be sufficient for our needs.
            let random_state = ahash::RandomState::with_seeds(
                0x243f_6a88_85a3_08d3,
                0x1319_8a2e_0370_7344,
                0xa409_3822_299f_31d0,
                0x082e_fa98_ec4e_6c89,
            );
            // Route records by the hash of their data component only (the
            // timestamp and diff play no role in placement).
            let exchange = Exchange::new(move |update: &((D1, _), T, R)| {
                let data = &(update.0).0;
                let mut h = random_state.build_hasher();
                data.hash(&mut h);
                h.finish()
            });
            // Pair each datum with a unit value so it fits the batcher's
            // `(key, value)` input shape, consolidate, then unpack the chains
            // of batches back into individual updates.
            consolidate_pact::<Ba, _>(self.map(|k| (k, ())).inner, exchange, name)
                .unary(Pipeline, "unpack consolidated", |_, _| {
                    |input, output| {
                        input.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for ((k, ()), t, d) in
                                data.iter().flatten().flat_map(|chunk| chunk.iter())
                            {
                                session.give((k.clone(), t.clone(), d.clone()))
                            }
                        })
                    }
                })
                .as_collection()
        } else {
            self
        }
    }

    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        T: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), T, R)>,
                Output = ColumnationStack<((D1, ()), T, R)>,
                Time = T,
            > + 'static,
    {
        // Unlike `consolidate_named_if`, this uses DD's default `Hashable`
        // routing rather than the seeded AHash distribution.
        let exchange = Exchange::new(move |update: &((D1, ()), T, R)| (update.0).0.hashed());

        consolidate_pact::<Ba, _>(self.map(|k| (k, ())).inner, exchange, name)
            .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
                |input, output| {
                    input.for_each(|time, data| {
                        let mut session = output.session(&time);
                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            session.give((k.clone(), t.clone(), d.clone()))
                        }
                    })
                }
            })
            .as_collection()
    }
}
546
/// Aggregates the weights of equal records into at most one record.
///
/// Produces a stream of chains of records, partitioned according to `pact`. The
/// data is sorted according to `Ba`. For each timestamp, it produces at most one chain.
///
/// The data are accumulated in place, each held back until their timestamp has completed.
pub fn consolidate_pact<'scope, Ba, P>(
    stream: Stream<'scope, Ba::Time, Ba::Input>,
    pact: P,
    name: &str,
) -> StreamVec<'scope, Ba::Time, Vec<Ba::Output>>
where
    Ba: Batcher + 'static,
    Ba::Input: Container + Clone + 'static,
    Ba::Output: Clone,
    P: ParallelizationContract<Ba::Time, Ba::Input>,
{
    // Acquire a logger for arrange events, so the batcher's activity shows
    // up in the standard differential logging stream.
    let logger = stream
        .scope()
        .worker()
        .logger_for("differential/arrange")
        .map(Into::into);
    stream.unary_frontier(pact, name, |_cap, info| {
        let mut batcher = Ba::new(logger, info.global_id);
        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<Ba::Time>>::new();
        // The input frontier observed on the previous invocation; sealing is
        // only attempted when the frontier has changed.
        let mut prev_frontier = Antichain::from_elem(Ba::Time::minimum());

        move |(input, frontier), output| {
            // Absorb all arriving updates into the batcher, remembering a
            // capability for each message's time.
            input.for_each(|cap, data| {
                capabilities.insert(cap.retain(0));
                batcher.push_container(data);
            });

            if prev_frontier.borrow() != frontier.frontier() {
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !frontier.less_equal(c.time()))
                {
                    let mut upper = Antichain::new(); // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !frontier.less_equal(capability.time()) {
                            // Assemble the upper bound on times we can commit with this capabilities.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            // send the batch to downstream consumers, empty or not.
                            let mut session = output.session(&capabilities.elements()[index]);
                            // Extract updates not in advance of `upper`.
                            let output =
                                batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
                            session.give(output);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            // Every pending update must be covered by some held
                            // capability; anything else is a logic error.
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                // Remember the frontier for the change check above.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }
        }
    })
}
641
/// A builder that wraps a session for direct output to a stream.
///
/// Used as the `Builder` type parameter of `Batcher::seal` in
/// [`consolidate_pact`]; its `seal` merely hands back the sealed chain
/// unchanged. It holds no state of its own.
struct ConsolidateBuilder<T, I> {
    // Carries the `T` (time) and `I` (input chunk) type parameters without
    // storing any data.
    _marker: PhantomData<(T, I)>,
}
646
647impl<T, I> Builder for ConsolidateBuilder<T, I>
648where
649    T: Timestamp,
650    I: Clone,
651{
652    type Input = I;
653    type Time = T;
654    type Output = Vec<I>;
655
656    fn new() -> Self {
657        Self {
658            _marker: PhantomData,
659        }
660    }
661
662    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
663        Self::new()
664    }
665
666    fn push(&mut self, _chunk: &mut Self::Input) {
667        unimplemented!("ConsolidateBuilder::push")
668    }
669
670    fn done(self, _: Description<Self::Time>) -> Self::Output {
671        unimplemented!("ConsolidateBuilder::done")
672    }
673
674    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
675        std::mem::take(chain)
676    }
677}
678
/// Merge the contents of multiple streams and combine the containers using a container builder.
pub trait ConcatenateFlatten<'scope, T: Timestamp, C: Container + DrainContainer> {
    /// Merge the contents of multiple streams and use the provided container builder to form
    /// output containers.
    ///
    /// # Examples
    /// ```
    /// use timely::container::CapacityContainerBuilder;
    /// use timely::dataflow::operators::{ToStream, Inspect};
    /// use mz_timely_util::operator::ConcatenateFlatten;
    ///
    /// timely::example(|scope| {
    ///
    ///     let streams: Vec<timely::dataflow::StreamVec<_, i32>> =
    ///         vec![(0..10).to_stream(scope),
    ///              (0..10).to_stream(scope),
    ///              (0..10).to_stream(scope)];
    ///
    ///     scope.concatenate_flatten::<_, CapacityContainerBuilder<Vec<i32>>>(streams)
    ///          .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
    where
        I: IntoIterator<Item = Stream<'scope, T, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>;
}
706
707impl<'scope, T, C> ConcatenateFlatten<'scope, T, C> for Stream<'scope, T, C>
708where
709    T: Timestamp,
710    C: Container + DrainContainer + Clone + 'static,
711{
712    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
713    where
714        I: IntoIterator<Item = Stream<'scope, T, C>>,
715        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
716    {
717        self.scope()
718            .concatenate_flatten::<_, CB>(Some(Clone::clone(self)).into_iter().chain(sources))
719    }
720}
721
impl<'scope, T, C> ConcatenateFlatten<'scope, T, C> for Scope<'scope, T>
where
    T: Timestamp,
    C: Container + DrainContainer,
{
    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<'scope, T, CB::Container>
    where
        I: IntoIterator<Item = Stream<'scope, T, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
    {
        let mut builder = OperatorBuilder::new("ConcatenateFlatten".to_string(), self.clone());

        // create new input handles for each input stream.
        let mut handles = sources
            .into_iter()
            .map(|s| builder.new_input(s, Pipeline))
            .collect::<Vec<_>>();
        // The operator never inspects frontiers, so opt out of frontier
        // tracking for every input.
        for i in 0..handles.len() {
            builder.set_notify_for(i, FrontierInterest::Never);
        }

        // create one output handle for the concatenated results.
        let (output, result) = builder.new_output::<CB::Container>();
        let mut output = OutputBuilder::<_, CB>::from(output);

        builder.build(move |_capability| {
            move |_frontier| {
                let mut output = output.activate();
                // Drain every ready input and forward all items at their
                // original timestamps, letting `CB` re-chunk them into output
                // containers.
                for handle in handles.iter_mut() {
                    handle.for_each_time(|time, data| {
                        output
                            .session_with_builder(&time)
                            .give_iterator(data.flat_map(DrainContainer::drain));
                    })
                }
            }
        });

        result
    }
}
763
/// A trait for containers that can be cleared.
pub trait ClearContainer {
    /// Clear the contents of the container.
    ///
    /// Implementations should retain any backing allocation where possible so
    /// the container can be reused.
    fn clear(&mut self);
}
769
770impl<T> ClearContainer for Vec<T> {
771    fn clear(&mut self) {
772        Vec::clear(self)
773    }
774}