// mz_timely_util/operator.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Common operator transformations on timely streams and differential collections.
18use std::hash::{BuildHasher, Hash, Hasher};
19use std::marker::PhantomData;
20
21use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
22use differential_dataflow::containers::{Columnation, TimelyStack};
23use differential_dataflow::difference::{Multiply, Semigroup};
24use differential_dataflow::lattice::Lattice;
25use differential_dataflow::trace::{Batcher, Builder, Description};
26use differential_dataflow::{AsCollection, Collection, Hashable, VecCollection};
27use timely::container::{DrainContainer, PushInto};
28use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
29use timely::dataflow::operators::Capability;
30use timely::dataflow::operators::generic::builder_rc::{
31    OperatorBuilder as OperatorBuilderRc, OperatorBuilder,
32};
33use timely::dataflow::operators::generic::operator::{self, Operator};
34use timely::dataflow::operators::generic::{
35    InputHandleCore, OperatorInfo, OutputBuilder, OutputBuilderSession,
36};
37use timely::dataflow::{Scope, Stream, StreamVec};
38use timely::progress::operate::FrontierInterest;
39use timely::progress::{Antichain, Timestamp};
40use timely::{Container, ContainerBuilder, PartialOrder};
41
/// Extension methods for timely [`Stream`]s.
pub trait StreamExt<G, C1>
where
    C1: Container + DrainContainer + Clone + 'static,
    G: Scope,
{
    /// Like `timely::dataflow::operators::generic::operator::Operator::unary`,
    /// but the logic function can handle failures.
    ///
    /// Creates a new dataflow operator that partitions its input stream by a
    /// parallelization strategy `pact` and repeatedly invokes `logic`, the
    /// function returned by the function passed as `constructor`. The `logic`
    /// function can read to the input stream and write to either of two output
    /// streams, where the first output stream represents successful
    /// computations and the second output stream represents failed
    /// computations.
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        // `DCB` builds containers for the first ("Ok") output stream.
        DCB: ContainerBuilder,
        // `ECB` builds containers for the second ("Err") output stream.
        ECB: ContainerBuilder,
        // `constructor` receives the operator's initial capability and info and
        // returns the per-invocation `logic` closure.
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputBuilderSession<'_, G::Timestamp, DCB>,
                    &mut OutputBuilderSession<'_, G::Timestamp, ECB>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Like [`timely::dataflow::operators::vec::Map::flat_map`], but `logic`
    /// is allowed to fail. The first returned stream will contain the
    /// successful applications of `logic`, while the second returned stream
    /// will contain the failed applications.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        // `logic` maps each borrowed input item to an iterator of results.
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

    /// Block progress of the frontier at `expiration` time.
    fn expire_stream_at(self, name: &str, expiration: G::Timestamp) -> Stream<G, C1>;
}
97
/// Extension methods for differential [`Collection`]s.
pub trait CollectionExt<G, D1, R>: Sized
where
    G: Scope,
    R: Semigroup,
{
    /// Creates a new empty collection in `scope`.
    fn empty(scope: &G) -> VecCollection<G, D1, R>;

    /// Like [`Collection::map`], but `logic` is allowed to fail. The first
    /// returned collection will contain successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    ///
    /// Callers need to specify the following type parameters:
    /// * `DCB`: The container builder for the `Ok` output.
    /// * `ECB`: The container builder for the `Err` output.
    fn map_fallible<DCB, ECB, D2, E, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (VecCollection<G, D2, R>, VecCollection<G, E, R>)
    where
        DCB: ContainerBuilder<Container = Vec<(D2, G::Timestamp, R)>>
            + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, G::Timestamp, R)>>
            + PushInto<(E, G::Timestamp, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        // Implemented in terms of `flat_map_fallible`, wrapping each result in
        // a single-element iterator.
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Like [`Collection::flat_map`], but `logic` is allowed to fail. The first
    /// returned collection will contain the successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        logic: L,
    ) -> (Collection<G, DCB::Container>, Collection<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

    /// Block progress of the frontier at `expiration` time.
    fn expire_collection_at(self, name: &str, expiration: G::Timestamp) -> VecCollection<G, D1, R>;

    /// Replaces each record with another, with a new difference type.
    ///
    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
    /// and move the data into the difference component. This will allow differential dataflow to update in-place.
    fn explode_one<D2, R2, L>(self, logic: L) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice;

    /// Partitions the input into a monotonic collection and
    /// non-monotone exceptions, with respect to differences.
    ///
    /// The exceptions are transformed by `into_err`.
    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (VecCollection<G, D1, R>, VecCollection<G, E, R>)
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        // `Signed` is needed so we can test whether a diff is positive.
        R: num_traits::sign::Signed;

    /// Consolidates the collection if `must_consolidate` is `true` and leaves it
    /// untouched otherwise.
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        // `Ba` is the batcher used to sort and accumulate updates; keys are
        // paired with a unit value to fit the batcher's `(key, val)` shape.
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;

    /// Consolidates the collection.
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;
}
202
impl<G, C1> StreamExt<G, C1> for Stream<G, C1>
where
    C1: Container + DrainContainer + Clone + 'static,
    G: Scope,
{
    fn unary_fallible<DCB, ECB, B, P>(
        self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputBuilderSession<'_, G::Timestamp, DCB>,
                    &mut OutputBuilderSession<'_, G::Timestamp, ECB>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        // Build a custom operator with one input and two outputs.
        let mut builder = OperatorBuilderRc::new(name.into(), self.scope());

        let operator_info = builder.operator_info();

        let mut input = builder.new_input(self.clone(), pact);
        // The operator never inspects the input frontier, so opt out of
        // frontier tracking for input 0.
        builder.set_notify_for(0, FrontierInterest::Never);
        // Output 0 carries successful results, output 1 carries errors.
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output = OutputBuilder::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);

        builder.build(move |mut capabilities| {
            // `capabilities` should be a single-element vector.
            let capability = capabilities.pop().unwrap();
            let mut logic = constructor(capability, operator_info);
            move |_frontiers| {
                // Activate both outputs for this scheduling and hand control
                // to the user-provided logic.
                let mut ok_output_handle = ok_output.activate();
                let mut err_output_handle = err_output.activate();
                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
            }
        });

        (ok_stream, err_stream)
    }

    // XXX(guswynn): file an minimization bug report for the logic flat_map
    // false positive here
    // TODO(guswynn): remove this after https://github.com/rust-lang/rust-clippy/issues/8098 is
    // resolved. The `logic` `FnMut` needs to be borrowed in the `flat_map` call, not moved in
    // so the simple `|d1| logic(d1)` closure is load-bearing
    #[allow(clippy::redundant_closure)]
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (Stream<G, DCB::Container>, Stream<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
    {
        self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
            Box::new(move |input, ok_output, err_output| {
                input.for_each_time(|time, data| {
                    // One session per output, both at the input's timestamp.
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session_with_builder(&time);
                    // Drain every container for this time and route each
                    // produced result to the matching output.
                    for r in data
                        .flat_map(DrainContainer::drain)
                        .flat_map(|d1| logic(d1))
                    {
                        match r {
                            Ok(d2) => ok_session.give(d2),
                            Err(e) => err_session.give(e),
                        }
                    }
                })
            })
        })
    }

    fn expire_stream_at(self, name: &str, expiration: G::Timestamp) -> Stream<G, C1> {
        let name = format!("expire_stream_at({name})");
        self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
            // Retain a capability for the expiration time, which we'll only drop if the token
            // is dropped. Else, block progress at the expiration time to prevent downstream
            // operators from making any statement about expiration time or any following time.
            let cap = Some(cap.delayed(&expiration));
            let mut warned = false;
            move |(input, frontier), output| {
                // Keep `cap` captured (and thus held) by this closure.
                let _ = &cap;
                let frontier = frontier.frontier();
                if !frontier.less_than(&expiration) && !warned {
                    // Here, we print a warning, not an error. The state is only a liveness
                    // concern, but not relevant for correctness. Additionally, a race between
                    // shutting down the dataflow and dropping the token can cause the dataflow
                    // to shut down before we drop the token.  This can happen when dropping
                    // the last remaining capability on a different worker.  We do not want to
                    // log an error every time this happens.

                    tracing::warn!(
                        name = name,
                        frontier = ?frontier,
                        expiration = ?expiration,
                        "frontier not less than expiration"
                    );
                    // Warn at most once per operator instance.
                    warned = true;
                }
                // Pass all data through unchanged; only progress is blocked.
                input.for_each(|time, data| {
                    let mut session = output.session(&time);
                    session.give_container(data);
                });
            }
        })
    }
}
325
impl<G, D1, R> CollectionExt<G, D1, R> for VecCollection<G, D1, R>
where
    G: Scope,
    G::Timestamp: Clone + 'static,
    D1: Clone + 'static,
    R: Semigroup + 'static,
{
    fn empty(scope: &G) -> VecCollection<G, D1, R> {
        // An empty stream reinterpreted as a collection of updates.
        operator::empty(scope).as_collection()
    }

    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        self,
        name: &str,
        mut logic: L,
    ) -> (Collection<G, DCB::Container>, Collection<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Clone + 'static,
        E: Clone + 'static,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static,
    {
        // Delegate to the stream-level implementation, reattaching the
        // timestamp and diff to every produced `Ok`/`Err` datum.
        let (ok_stream, err_stream) =
            self.inner
                .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
                    logic(d1).into_iter().map(move |res| match res {
                        Ok(d2) => Ok((d2, t.clone(), r.clone())),
                        Err(e) => Err((e, t.clone(), r.clone())),
                    })
                });
        (ok_stream.as_collection(), err_stream.as_collection())
    }

    fn expire_collection_at(self, name: &str, expiration: G::Timestamp) -> VecCollection<G, D1, R> {
        // Expiration operates on the underlying stream of updates.
        self.inner
            .expire_stream_at(name, expiration)
            .as_collection()
    }

    fn explode_one<D2, R2, L>(
        self,
        mut logic: L,
    ) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Clone + 'static + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice,
    {
        self.inner
            .clone()
            .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
                Pipeline,
                "ExplodeOne",
                move |_, _| {
                    move |input, output| {
                        input.for_each(|time, data| {
                            output
                                .session_with_builder(&time)
                                .give_iterator(data.drain(..).map(|(x, t, d)| {
                                    let (x, d2) = logic(x);
                                    // Multiply the extracted diff by the
                                    // original diff to preserve multiplicities.
                                    (x, t, d2.multiply(&d))
                                }));
                        });
                    }
                },
            )
            .as_collection()
    }

    fn ensure_monotonic<E, IE>(
        self,
        into_err: IE,
    ) -> (VecCollection<G, D1, R>, VecCollection<G, E, R>)
    where
        E: Clone + 'static,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed,
    {
        let (oks, errs) = self
            .inner
            .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
                Box::new(move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);
                        // Updates with positive diffs pass through; everything
                        // else is converted into an error by `into_err`.
                        for (x, t, d) in data.drain(..) {
                            if d.is_positive() {
                                ok_session.give((x, t, d))
                            } else {
                                let (e, d2) = into_err(x, d);
                                err_session.give((e, t, d2))
                            }
                        }
                    })
                })
            });
        (oks.as_collection(), errs.as_collection())
    }

    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static,
    {
        if must_consolidate {
            // We employ AHash below instead of the default hasher in DD to obtain
            // a better distribution of data to workers. AHash claims empirically
            // both speed and high quality, according to
            // https://github.com/tkaitchuck/aHash/blob/master/compare/readme.md.
            // TODO(vmarcos): Consider here if it is worth it to spend the time to
            // implement twisted tabulation hashing as proposed in Mihai Patrascu,
            // Mikkel Thorup: Twisted Tabulation Hashing. SODA 2013: 209-228, available
            // at https://epubs.siam.org/doi/epdf/10.1137/1.9781611973105.16. The latter
            // would provide good bounds for balls-into-bins problems when the number of
            // bins is small (as is our case), so we'd have a theoretical guarantee.
            // NOTE: We fix the seeds of a RandomState instance explicity with the same
            // seeds that would be given by `AHash` via ahash::AHasher::default() so as
            // to avoid a different selection due to compile-time features being differently
            // selected in other dependencies using `AHash` vis-à-vis cargo's strategy
            // of unioning features.
            // NOTE: Depending on target features, we may end up employing the fallback
            // hasher of `AHash`, but it should be sufficient for our needs.
            let random_state = ahash::RandomState::with_seeds(
                0x243f_6a88_85a3_08d3,
                0x1319_8a2e_0370_7344,
                0xa409_3822_299f_31d0,
                0x082e_fa98_ec4e_6c89,
            );
            // Route each update by the hash of its data component, so equal
            // records land on the same worker and can be consolidated there.
            let exchange = Exchange::new(move |update: &((D1, _), G::Timestamp, R)| {
                let data = &(update.0).0;
                let mut h = random_state.build_hasher();
                data.hash(&mut h);
                h.finish()
            });
            // Consolidate `(key, ())` pairs, then unpack the resulting chains
            // of sorted chunks back into a plain collection of keys.
            consolidate_pact::<Ba, _, _>(self.map(|k| (k, ())).inner, exchange, name)
                .unary(Pipeline, "unpack consolidated", |_, _| {
                    |input, output| {
                        input.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for ((k, ()), t, d) in
                                data.iter().flatten().flat_map(|chunk| chunk.iter())
                            {
                                session.give((k.clone(), t.clone(), d.clone()))
                            }
                        })
                    }
                })
                .as_collection()
        } else {
            self
        }
    }

    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static,
    {
        // Unlike `consolidate_named_if`, this uses DD's default `Hashable`
        // routing rather than a seeded AHash instance.
        let exchange =
            Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());

        consolidate_pact::<Ba, _, _>(self.map(|k| (k, ())).inner, exchange, name)
            .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
                |input, output| {
                    input.for_each(|time, data| {
                        let mut session = output.session(&time);
                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            session.give((k.clone(), t.clone(), d.clone()))
                        }
                    })
                }
            })
            .as_collection()
    }
}
518
/// Aggregates the weights of equal records into at most one record.
///
/// Produces a stream of chains of records, partitioned according to `pact`. The
/// data is sorted according to `Ba`. For each timestamp, it produces at most one chain.
///
/// The data are accumulated in place, each held back until their timestamp has completed.
pub fn consolidate_pact<Ba, P, G>(
    stream: Stream<G, Ba::Input>,
    pact: P,
    name: &str,
) -> StreamVec<G, Vec<Ba::Output>>
where
    G: Scope,
    Ba: Batcher<Time = G::Timestamp> + 'static,
    Ba::Input: Container + Clone + 'static,
    Ba::Output: Clone,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
{
    // Acquire a logger for arrange events.
    let logger = stream
        .scope()
        .logger_for("differential/arrange")
        .map(Into::into);
    stream.unary_frontier(pact, name, |_cap, info| {
        // The batcher accumulates and sorts incoming updates.
        let mut batcher = Ba::new(logger, info.global_id);
        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
        // Last observed input frontier; lets us skip all sealing work when the
        // frontier has not changed since the previous invocation.
        let mut prev_frontier = Antichain::from_elem(G::Timestamp::minimum());

        move |(input, frontier), output| {
            // Absorb all available input, retaining the capabilities we will
            // later need to emit sealed batches at those times.
            input.for_each(|cap, data| {
                capabilities.insert(cap.retain(0));
                batcher.push_container(data);
            });

            if prev_frontier.borrow() != frontier.frontier() {
                // Only seal if at least one held capability is no longer in
                // advance of the input frontier (i.e. its time is complete).
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !frontier.less_equal(c.time()))
                {
                    let mut upper = Antichain::new(); // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !frontier.less_equal(capability.time()) {
                            // Assemble the upper bound on times we can commit with this capabilities.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            // send the batch to downstream consumers, empty or not.
                            let mut session = output.session(&capabilities.elements()[index]);
                            // Extract updates not in advance of `upper`.
                            let output =
                                batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
                            session.give(output);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            // Invariant violation: every remaining batcher time
                            // must be coverable by some held capability.
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                // Remember the frontier for the change check next invocation.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }
        }
    })
}
613
/// A builder that wraps a session for direct output to a stream.
///
/// Carries no state of its own; it exists only so `consolidate_pact` can use
/// `Batcher::seal` to hand back the raw chain of sealed chunks.
struct ConsolidateBuilder<T, I> {
    // Anchors the `T` (time) and `I` (input chunk) type parameters without
    // storing any values of those types.
    _marker: PhantomData<(T, I)>,
}
618
619impl<T, I> Builder for ConsolidateBuilder<T, I>
620where
621    T: Timestamp,
622    I: Clone,
623{
624    type Input = I;
625    type Time = T;
626    type Output = Vec<I>;
627
628    fn new() -> Self {
629        Self {
630            _marker: PhantomData,
631        }
632    }
633
634    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
635        Self::new()
636    }
637
638    fn push(&mut self, _chunk: &mut Self::Input) {
639        unimplemented!("ConsolidateBuilder::push")
640    }
641
642    fn done(self, _: Description<Self::Time>) -> Self::Output {
643        unimplemented!("ConsolidateBuilder::done")
644    }
645
646    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
647        std::mem::take(chain)
648    }
649}
650
/// Merge the contents of multiple streams and combine the containers using a container builder.
pub trait ConcatenateFlatten<G: Scope, C: Container + DrainContainer> {
    /// Merge the contents of multiple streams and use the provided container builder to form
    /// output containers.
    ///
    /// # Examples
    /// ```
    /// use timely::container::CapacityContainerBuilder;
    /// use timely::dataflow::operators::{ToStream, Inspect};
    /// use mz_timely_util::operator::ConcatenateFlatten;
    ///
    /// timely::example(|scope| {
    ///
    ///     let streams = vec![(0..10).to_stream(scope),
    ///                        (0..10).to_stream(scope),
    ///                        (0..10).to_stream(scope)];
    ///
    ///     scope.concatenate_flatten::<_, CapacityContainerBuilder<Vec<_>>>(streams)
    ///          .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
    where
        I: IntoIterator<Item = Stream<G, C>>,
        // `CB` must accept every borrowed item of the input containers.
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>;
}
677
678impl<G, C> ConcatenateFlatten<G, C> for Stream<G, C>
679where
680    G: Scope,
681    C: Container + DrainContainer + Clone + 'static,
682{
683    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
684    where
685        I: IntoIterator<Item = Stream<G, C>>,
686        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
687    {
688        self.scope()
689            .concatenate_flatten::<_, CB>(Some(Clone::clone(self)).into_iter().chain(sources))
690    }
691}
692
impl<G, C> ConcatenateFlatten<G, C> for G
where
    G: Scope,
    C: Container + DrainContainer,
{
    fn concatenate_flatten<I, CB>(&self, sources: I) -> Stream<G, CB::Container>
    where
        I: IntoIterator<Item = Stream<G, C>>,
        CB: ContainerBuilder + for<'a> PushInto<C::Item<'a>>,
    {
        let mut builder = OperatorBuilder::new("ConcatenateFlatten".to_string(), self.clone());

        // create new input handles for each input stream.
        let mut handles = sources
            .into_iter()
            .map(|s| builder.new_input(s, Pipeline))
            .collect::<Vec<_>>();
        // The operator never inspects input frontiers; disable notifications
        // on all inputs to avoid needless progress-tracking work.
        for i in 0..handles.len() {
            builder.set_notify_for(i, FrontierInterest::Never);
        }

        // create one output handle for the concatenated results.
        let (output, result) = builder.new_output::<CB::Container>();
        let mut output = OutputBuilder::<_, CB>::from(output);

        builder.build(move |_capability| {
            move |_frontier| {
                let mut output = output.activate();
                // Drain every ready input, forwarding its items into an
                // output session at the originating timestamp.
                for handle in handles.iter_mut() {
                    handle.for_each_time(|time, data| {
                        output
                            .session_with_builder(&time)
                            .give_iterator(data.flat_map(DrainContainer::drain));
                    })
                }
            }
        });

        result
    }
}
733}