// mz_timely_util/operator.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Common operator transformations on timely streams and differential collections.
17
18use std::hash::{BuildHasher, Hash, Hasher};
19use std::marker::PhantomData;
20
21use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
22use differential_dataflow::containers::{Columnation, TimelyStack};
23use differential_dataflow::difference::{Multiply, Semigroup};
24use differential_dataflow::lattice::Lattice;
25use differential_dataflow::trace::{Batcher, Builder, Description};
26use differential_dataflow::{AsCollection, Collection, Hashable, VecCollection};
27use timely::container::{DrainContainer, PushInto};
28use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
29use timely::dataflow::operators::Capability;
30use timely::dataflow::operators::generic::builder_rc::{
31    OperatorBuilder as OperatorBuilderRc, OperatorBuilder,
32};
33use timely::dataflow::operators::generic::operator::{self, Operator};
34use timely::dataflow::operators::generic::{
35    InputHandleCore, OperatorInfo, OutputBuilder, OutputBuilderSession,
36};
37use timely::dataflow::{Scope, Stream, StreamVec};
38use timely::progress::{Antichain, Timestamp};
39use timely::{Container, ContainerBuilder, PartialOrder};
40
/// Extension methods for timely [`Stream`]s.
pub trait StreamExt<G, C1>
where
    C1: Container + DrainContainer + Clone + 'static,
    G: Scope,
{
    /// Like `timely::dataflow::operators::generic::operator::Operator::unary`,
    /// but the logic function can handle failures.
    ///
    /// Creates a new dataflow operator that partitions its input stream by a
    /// parallelization strategy `pact` and repeatedly invokes `logic`, the
    /// function returned by the function passed as `constructor`. The `logic`
    /// function can read from the input stream and write to either of two
    /// output streams, where the first output stream represents successful
    /// computations and the second output stream represents failed
    /// computations.
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputBuilderSession<'_, G::Timestamp, DCB>,
                    &mut OutputBuilderSession<'_, G::Timestamp, ECB>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Like [`timely::dataflow::operators::vec::Map::flat_map`], but `logic`
    /// is allowed to fail. The first returned stream will contain the
    /// successful applications of `logic`, while the second returned stream
    /// will contain the failed applications.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

    /// Blocks progress of the stream's frontier at `expiration` time.
    ///
    /// All input data is forwarded unchanged, but the operator retains a
    /// capability for `expiration`, so downstream operators can never observe
    /// a frontier at or beyond `expiration` until the dataflow (and with it
    /// the retained capability) is dropped.
    fn expire_stream_at(self, name: &str, expiration: G::Timestamp) -> Stream<G, C1>;
}
96
/// Extension methods for differential [`Collection`]s.
pub trait CollectionExt<G, D1, R>: Sized
where
    G: Scope,
    R: Semigroup,
{
    /// Creates a new empty collection in `scope`.
    fn empty(scope: &G) -> VecCollection<G, D1, R>;

    /// Like [`Collection::map`], but `logic` is allowed to fail. The first
    /// returned collection will contain successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    ///
    /// Callers need to specify the following type parameters:
    /// * `DCB`: The container builder for the `Ok` output.
    /// * `ECB`: The container builder for the `Err` output.
    fn map_fallible<DCB, ECB, D2, E, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (VecCollection<G, D2, R>, VecCollection<G, E, R>)
    where
        DCB: ContainerBuilder<Container = Vec<(D2, G::Timestamp, R)>>
            + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, G::Timestamp, R)>>
            + PushInto<(E, G::Timestamp, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        // Each record produces exactly one result, expressed as a one-element
        // iterator so `flat_map_fallible` can do the work.
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Like [`Collection::flat_map`], but `logic` is allowed to fail. The first
    /// returned collection will contain the successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (Collection<G, DCB::Container>, Collection<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

    /// Blocks progress of the collection's frontier at `expiration` time.
    ///
    /// All data passes through unchanged; see [`StreamExt::expire_stream_at`].
    fn expire_collection_at(self, name: &str, expiration: G::Timestamp) -> VecCollection<G, D1, R>;

    /// Replaces each record with another, with a new difference type.
    ///
    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
    /// and move the data into the difference component. This will allow differential dataflow to update in-place.
    fn explode_one<D2, R2, L>(self, logic: L) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice;

    /// Partitions the input into a monotonic collection and
    /// non-monotone exceptions, with respect to differences.
    ///
    /// Records with positive differences pass through unchanged; all other
    /// records are transformed by `into_err` and emitted on the exception
    /// collection.
    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (VecCollection<G, D1, R>, VecCollection<G, E, R>)
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed;

    /// Consolidates the collection if `must_consolidate` is `true` and leaves it
    /// untouched otherwise.
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;

    /// Consolidates the collection.
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;
}
201
impl<G, C1> StreamExt<G, C1> for Stream<G, C1>
where
    C1: Container + DrainContainer + Clone + 'static,
    G: Scope,
{
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputBuilderSession<'_, G::Timestamp, DCB>,
                    &mut OutputBuilderSession<'_, G::Timestamp, ECB>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        let mut builder = OperatorBuilderRc::new(name.into(), self.scope());
        // The operator is driven purely by data arrival; it needs no frontier
        // notifications of its own.
        builder.set_notify(false);

        let operator_info = builder.operator_info();

        let mut input = builder.new_input(self.clone(), pact);
        // First output carries successful records, second output carries
        // failed records.
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output = OutputBuilder::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);

        builder.build(move |mut capabilities| {
            // Hand a single capability to `constructor`.
            // NOTE(review): `build` provides one capability per output, and
            // this operator declares two outputs, so `pop` retains only the
            // last — confirm this matches the intended contract for the
            // capability passed to `constructor`.
            let capability = capabilities.pop().unwrap();
            let mut logic = constructor(capability, operator_info);
            move |_frontiers| {
                // Activate both outputs for the duration of one `logic` call.
                let mut ok_output_handle = ok_output.activate();
                let mut err_output_handle = err_output.activate();
                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
            }
        });

        (ok_stream, err_stream)
    }

    // XXX(guswynn): file an minimization bug report for the logic flat_map
    // false positive here
    // TODO(guswynn): remove this after https://github.com/rust-lang/rust-clippy/issues/8098 is
    // resolved. The `logic` `FnMut` needs to be borrowed in the `flat_map` call, not moved in
    // so the simple `|d1| logic(d1)` closure is load-bearing
    #[allow(clippy::redundant_closure)]
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
    {
        self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
            Box::new(move |input, ok_output, err_output| {
                input.for_each_time(|time, data| {
                    // One session per output; each drained input item is routed
                    // to the `Ok` or `Err` stream based on what `logic` returns.
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session_with_builder(&time);
                    for r in data
                        .flat_map(DrainContainer::drain)
                        .flat_map(|d1| logic(d1))
                    {
                        match r {
                            Ok(d2) => ok_session.give(d2),
                            Err(e) => err_session.give(e),
                        }
                    }
                })
            })
        })
    }

    fn expire_stream_at(self, name: &str, expiration: G::Timestamp) -> Stream<G, C1> {
        let name = format!("expire_stream_at({name})");
        self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
            // Retain a capability for the expiration time, which we'll only drop if the token
            // is dropped. Else, block progress at the expiration time to prevent downstream
            // operators from making any statement about expiration time or any following time.
            let cap = Some(cap.delayed(&expiration));
            let mut warned = false;
            move |(input, frontier), output| {
                // Keep the capability alive by moving it into the closure.
                let _ = &cap;
                let frontier = frontier.frontier();
                if !frontier.less_than(&expiration) && !warned {
                    // Here, we print a warning, not an error. The state is only a liveness
                    // concern, but not relevant for correctness. Additionally, a race between
                    // shutting down the dataflow and dropping the token can cause the dataflow
                    // to shut down before we drop the token.  This can happen when dropping
                    // the last remaining capability on a different worker.  We do not want to
                    // log an error every time this happens.

                    tracing::warn!(
                        name = name,
                        frontier = ?frontier,
                        expiration = ?expiration,
                        "frontier not less than expiration"
                    );
                    // Warn at most once per operator instance.
                    warned = true;
                }
                // Forward all data unchanged; only the frontier is held back.
                input.for_each(|time, data| {
                    let mut session = output.session(&time);
                    session.give_container(data);
                });
            }
        })
    }
}
324
325impl<G, D1, R> CollectionExt<G, D1, R> for VecCollection<G, D1, R>
326where
327    G: Scope,
328    G::Timestamp: Clone + 'static,
329    D1: Clone + 'static,
330    R: Semigroup + 'static,
331{
332    fn empty(scope: &G) -> VecCollection<G, D1, R> {
333        operator::empty(scope).as_collection()
334    }
335
336    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
337        self,
338        name: &str,
339        mut logic: L,
340    ) -> (Collection<G, DCB::Container>, Collection<G, ECB::Container>)
341    where
342        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
343        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
344        D2: Clone + 'static,
345        E: Clone + 'static,
346        I: IntoIterator<Item = Result<D2, E>>,
347        L: FnMut(D1) -> I + 'static,
348    {
349        let (ok_stream, err_stream) =
350            self.inner
351                .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
352                    logic(d1).into_iter().map(move |res| match res {
353                        Ok(d2) => Ok((d2, t.clone(), r.clone())),
354                        Err(e) => Err((e, t.clone(), r.clone())),
355                    })
356                });
357        (ok_stream.as_collection(), err_stream.as_collection())
358    }
359
360    fn expire_collection_at(self, name: &str, expiration: G::Timestamp) -> VecCollection<G, D1, R> {
361        self.inner
362            .expire_stream_at(name, expiration)
363            .as_collection()
364    }
365
366    fn explode_one<D2, R2, L>(
367        self,
368        mut logic: L,
369    ) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
370    where
371        D2: differential_dataflow::Data,
372        R2: Semigroup + Multiply<R>,
373        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
374        L: FnMut(D1) -> (D2, R2) + 'static,
375        G::Timestamp: Lattice,
376    {
377        self.inner
378            .clone()
379            .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
380                Pipeline,
381                "ExplodeOne",
382                move |_, _| {
383                    move |input, output| {
384                        input.for_each(|time, data| {
385                            output
386                                .session_with_builder(&time)
387                                .give_iterator(data.drain(..).map(|(x, t, d)| {
388                                    let (x, d2) = logic(x);
389                                    (x, t, d2.multiply(&d))
390                                }));
391                        });
392                    }
393                },
394            )
395            .as_collection()
396    }
397
398    fn ensure_monotonic<E, IE>(
399        self,
400        into_err: IE,
401    ) -> (VecCollection<G, D1, R>, VecCollection<G, E, R>)
402    where
403        E: Clone + 'static,
404        IE: Fn(D1, R) -> (E, R) + 'static,
405        R: num_traits::sign::Signed,
406    {
407        let (oks, errs) = self
408            .inner
409            .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
410                Box::new(move |input, ok_output, err_output| {
411                    input.for_each(|time, data| {
412                        let mut ok_session = ok_output.session(&time);
413                        let mut err_session = err_output.session(&time);
414                        for (x, t, d) in data.drain(..) {
415                            if d.is_positive() {
416                                ok_session.give((x, t, d))
417                            } else {
418                                let (e, d2) = into_err(x, d);
419                                err_session.give((e, t, d2))
420                            }
421                        }
422                    })
423                })
424            });
425        (oks.as_collection(), errs.as_collection())
426    }
427
428    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
429    where
430        D1: differential_dataflow::ExchangeData + Hash + Columnation,
431        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
432        G::Timestamp: Lattice + Ord + Columnation,
433        Ba: Batcher<
434                Input = Vec<((D1, ()), G::Timestamp, R)>,
435                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
436                Time = G::Timestamp,
437            > + 'static,
438    {
439        if must_consolidate {
440            // We employ AHash below instead of the default hasher in DD to obtain
441            // a better distribution of data to workers. AHash claims empirically
442            // both speed and high quality, according to
443            // https://github.com/tkaitchuck/aHash/blob/master/compare/readme.md.
444            // TODO(vmarcos): Consider here if it is worth it to spend the time to
445            // implement twisted tabulation hashing as proposed in Mihai Patrascu,
446            // Mikkel Thorup: Twisted Tabulation Hashing. SODA 2013: 209-228, available
447            // at https://epubs.siam.org/doi/epdf/10.1137/1.9781611973105.16. The latter
448            // would provide good bounds for balls-into-bins problems when the number of
449            // bins is small (as is our case), so we'd have a theoretical guarantee.
450            // NOTE: We fix the seeds of a RandomState instance explicity with the same
451            // seeds that would be given by `AHash` via ahash::AHasher::default() so as
452            // to avoid a different selection due to compile-time features being differently
453            // selected in other dependencies using `AHash` vis-à-vis cargo's strategy
454            // of unioning features.
455            // NOTE: Depending on target features, we may end up employing the fallback
456            // hasher of `AHash`, but it should be sufficient for our needs.
457            let random_state = ahash::RandomState::with_seeds(
458                0x243f_6a88_85a3_08d3,
459                0x1319_8a2e_0370_7344,
460                0xa409_3822_299f_31d0,
461                0x082e_fa98_ec4e_6c89,
462            );
463            let exchange = Exchange::new(move |update: &((D1, _), G::Timestamp, R)| {
464                let data = &(update.0).0;
465                let mut h = random_state.build_hasher();
466                data.hash(&mut h);
467                h.finish()
468            });
469            consolidate_pact::<Ba, _, _>(self.map(|k| (k, ())).inner, exchange, name)
470                .unary(Pipeline, "unpack consolidated", |_, _| {
471                    |input, output| {
472                        input.for_each(|time, data| {
473                            let mut session = output.session(&time);
474                            for ((k, ()), t, d) in
475                                data.iter().flatten().flat_map(|chunk| chunk.iter())
476                            {
477                                session.give((k.clone(), t.clone(), d.clone()))
478                            }
479                        })
480                    }
481                })
482                .as_collection()
483        } else {
484            self
485        }
486    }
487
488    fn consolidate_named<Ba>(self, name: &str) -> Self
489    where
490        D1: differential_dataflow::ExchangeData + Hash + Columnation,
491        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
492        G::Timestamp: Lattice + Ord + Columnation,
493        Ba: Batcher<
494                Input = Vec<((D1, ()), G::Timestamp, R)>,
495                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
496                Time = G::Timestamp,
497            > + 'static,
498    {
499        let exchange =
500            Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());
501
502        consolidate_pact::<Ba, _, _>(self.map(|k| (k, ())).inner, exchange, name)
503            .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
504                |input, output| {
505                    input.for_each(|time, data| {
506                        let mut session = output.session(&time);
507                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
508                        {
509                            session.give((k.clone(), t.clone(), d.clone()))
510                        }
511                    })
512                }
513            })
514            .as_collection()
515    }
516}
517
/// Aggregates the weights of equal records into at most one record.
///
/// Produces a stream of chains of records, partitioned according to `pact`. The
/// data is sorted according to `Ba`. For each timestamp, it produces at most one chain.
///
/// The data are accumulated in place, each held back until their timestamp has completed.
pub fn consolidate_pact<Ba, P, G>(
    stream: Stream<G, Ba::Input>,
    pact: P,
    name: &str,
) -> StreamVec<G, Vec<Ba::Output>>
where
    G: Scope,
    Ba: Batcher<Time = G::Timestamp> + 'static,
    Ba::Input: Container + Clone + 'static,
    Ba::Output: Clone,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
{
    // Acquire a logger for arrange events, which the batcher reports to.
    let logger = stream
        .scope()
        .logger_for("differential/arrange")
        .map(Into::into);
    stream.unary_frontier(pact, name, |_cap, info| {
        // The batcher accumulates and consolidates all pending updates.
        let mut batcher = Ba::new(logger, info.global_id);
        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
        // Input frontier observed on the previous invocation, so work only
        // happens when the frontier has moved.
        let mut prev_frontier = Antichain::from_elem(G::Timestamp::minimum());

        move |(input, frontier), output| {
            input.for_each(|cap, data| {
                // Retain a capability (for output port 0) covering this batch.
                capabilities.insert(cap.retain(0));
                batcher.push_container(data);
            });

            if prev_frontier.borrow() != frontier.frontier() {
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !frontier.less_equal(c.time()))
                {
                    let mut upper = Antichain::new(); // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !frontier.less_equal(capability.time()) {
                            // Assemble the upper bound on times we can commit with this capability.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            // send the batch to downstream consumers, empty or not.
                            let mut session = output.session(&capabilities.elements()[index]);
                            // Extract updates not in advance of `upper`.
                            let output =
                                batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
                            session.give(output);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            // Every pending update entered under some retained
                            // capability, so one must cover its time.
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                // Remember the frontier for the next invocation.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }
        }
    })
}
612
/// A builder that wraps a session for direct output to a stream.
///
/// Only [`Builder::seal`] is supported: it hands the batcher's consolidated
/// chain straight to the caller instead of constructing an actual batch.
/// Used by [`consolidate_pact`] when sealing its batcher.
struct ConsolidateBuilder<T, I> {
    // Carries the time and input type parameters without storing any data.
    _marker: PhantomData<(T, I)>,
}
617
impl<T, I> Builder for ConsolidateBuilder<T, I>
where
    T: Timestamp,
    I: Clone,
{
    type Input = I;
    type Time = T;
    type Output = Vec<I>;

    fn new() -> Self {
        Self {
            _marker: PhantomData,
        }
    }

    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
        // This builder holds no state, so capacity hints are ignored.
        Self::new()
    }

    fn push(&mut self, _chunk: &mut Self::Input) {
        // Only `seal` is supported; incremental pushes must never happen.
        unimplemented!("ConsolidateBuilder::push")
    }

    fn done(self, _: Description<Self::Time>) -> Self::Output {
        // Only `seal` is supported; `done` must never happen.
        unimplemented!("ConsolidateBuilder::done")
    }

    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
        // Hand the chain to the caller as-is, leaving an empty vector behind;
        // no batch structure is built.
        std::mem::take(chain)
    }
}
649
/// Merge the contents of multiple streams and combine the containers using a container builder.
pub trait ConcatenateFlatten<G: Scope, C: Container + DrainContainer> {
    /// Merge the contents of multiple streams and use the provided container builder to form
    /// output containers. Input containers are drained and their items are
    /// re-packed into output containers by the builder `CB`.
    ///
    /// # Examples
    /// ```
    /// use timely::container::CapacityContainerBuilder;
    /// use timely::dataflow::operators::{ToStream, Inspect};
    /// use mz_timely_util::operator::ConcatenateFlatten;
    ///
    /// timely::example(|scope| {
    ///
    ///     let streams = vec![(0..10).to_stream(scope),
    ///                        (0..10).to_stream(scope),
    ///                        (0..10).to_stream(scope)];
    ///
    ///     scope.concatenate_flatten::<_, CapacityContainerBuilder<Vec<_>>>(streams)
    ///          .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
    where
        I: IntoIterator<Item = Stream<G, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>;
}
676
677impl<G, C> ConcatenateFlatten<G, C> for Stream<G, C>
678where
679    G: Scope,
680    C: Container + DrainContainer + Clone + 'static,
681{
682    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
683    where
684        I: IntoIterator<Item = Stream<G, C>>,
685        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
686    {
687        self.scope()
688            .concatenate_flatten::<_, CB>(Some(Clone::clone(self)).into_iter().chain(sources))
689    }
690}
691
692impl<G, C> ConcatenateFlatten<G, C> for G
693where
694    G: Scope,
695    C: Container + DrainContainer,
696{
697    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
698    where
699        I: IntoIterator<Item = Stream<G, C>>,
700        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
701    {
702        let mut builder = OperatorBuilder::new("ConcatenateFlatten".to_string(), self.clone());
703        builder.set_notify(false);
704
705        // create new input handles for each input stream.
706        let mut handles = sources
707            .into_iter()
708            .map(|s| builder.new_input(s, Pipeline))
709            .collect::<Vec<_>>();
710
711        // create one output handle for the concatenated results.
712        let (output, result) = builder.new_output::<CB::Container>();
713        let mut output = OutputBuilder::<_, CB>::from(output);
714
715        builder.build(move |_capability| {
716            move |_frontier| {
717                let mut output = output.activate();
718                for handle in handles.iter_mut() {
719                    handle.for_each_time(|time, data| {
720                        output
721                            .session_with_builder(&time)
722                            .give_iterator(data.flat_map(DrainContainer::drain));
723                    })
724                }
725            }
726        });
727
728        result
729    }
730}