// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Common operator transformations on timely streams and differential collections.

use std::future::Future;
use std::hash::{BuildHasher, Hash, Hasher};
use std::marker::PhantomData;
use std::rc::Weak;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::containers::{Columnation, TimelyStack};
use differential_dataflow::difference::{Multiply, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEventBuilder;
use differential_dataflow::trace::{Batcher, Builder, Description};
use differential_dataflow::{AsCollection, Collection, Hashable};
use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::channels::ContainerBytes;
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
use timely::dataflow::operators::generic::operator::{self, Operator};
use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputHandleCore};
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::progress::{Antichain, Timestamp};
use timely::{Container, Data, PartialOrder};

use crate::builder_async::{
    AsyncInputHandle, AsyncOutputHandle, ConnectedToOne, Disconnected,
    OperatorBuilder as OperatorBuilderAsync,
};

/// Extension methods for timely [`StreamCore`]s.
pub trait StreamExt<G, C1>
where
    C1: Container,
    G: Scope,
{
    /// Like `timely::dataflow::operators::generic::operator::Operator::unary`,
    /// but the logic function can handle failures.
    ///
    /// Creates a new dataflow operator that partitions its input stream by a
    /// parallelization strategy `pact` and repeatedly invokes `logic`, the
    /// function returned by the function passed as `constructor`. The `logic`
    /// function can read from the input stream and write to either of two
    /// output streams, where the first output stream represents successful
    /// computations and the second output stream represents failed
    /// computations.
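    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), modeled on the internal
    /// `flat_map_fallible` implementation below, that routes even numbers to the
    /// first output and odd numbers to the second. The operator name and closure
    /// are illustrative only:
    ///
    /// ```ignore
    /// use timely::container::{CapacityContainerBuilder, PushInto};
    /// use timely::dataflow::channels::pact::Pipeline;
    ///
    /// type CB = CapacityContainerBuilder<Vec<u64>>;
    /// let (evens, odds) = stream.unary_fallible::<CB, CB, _, _>(
    ///     Pipeline,
    ///     "PartitionParity",
    ///     |_capability, _info| {
    ///         Box::new(move |input, ok_output, err_output| {
    ///             input.for_each(|time, data| {
    ///                 let mut ok_session = ok_output.session_with_builder(&time);
    ///                 let mut err_session = err_output.session_with_builder(&time);
    ///                 for x in data.drain() {
    ///                     if x % 2 == 0 {
    ///                         ok_session.push_into(x);
    ///                     } else {
    ///                         err_session.push_into(x);
    ///                     }
    ///                 }
    ///             })
    ///         })
    ///     },
    /// );
    /// ```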
    fn unary_fallible<DCB, ECB, B, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputHandleCore<G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>,
                    &mut OutputHandleCore<G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Creates a new dataflow operator that partitions its input stream by a
    /// parallelization strategy `pact` and repeatedly schedules `logic`, the future
    /// returned by the function passed as `constructor`. The `logic` future can read
    /// from the input stream and write to the output stream.
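    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) of the usual event loop,
    /// assuming the `Event` type and handle methods from [`crate::builder_async`];
    /// names are illustrative only:
    ///
    /// ```ignore
    /// use timely::container::CapacityContainerBuilder;
    /// use timely::dataflow::channels::pact::Pipeline;
    /// use mz_timely_util::builder_async::Event;
    ///
    /// let passthrough = stream.unary_async::<CapacityContainerBuilder<Vec<u64>>, _, _, _>(
    ///     Pipeline,
    ///     "Passthrough".to_string(),
    ///     |capability, _info, mut input, output| async move {
    ///         // Hold no capability beyond those delivered with the data.
    ///         drop(capability);
    ///         while let Some(event) = input.next().await {
    ///             match event {
    ///                 Event::Data(cap, mut data) => output.give_container(&cap, &mut data),
    ///                 Event::Progress(_frontier) => {}
    ///             }
    ///         }
    ///     },
    /// );
    /// ```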
    fn unary_async<CB, P, B, BFut>(
        &self,
        pact: P,
        name: String,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
            AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>,
            AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
        ) -> BFut,
        BFut: Future + 'static,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Creates a new dataflow operator that partitions its input streams by the
    /// parallelization strategies `pact1` and `pact2` and repeatedly schedules `logic`,
    /// the future returned by the function passed as `constructor`. The `logic` future
    /// can read from both input streams and write to the output stream.
    fn binary_async<C2, CB, P1, P2, B, BFut>(
        &self,
        other: &StreamCore<G, C2>,
        pact1: P1,
        pact2: P2,
        name: String,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        C2: Container + 'static,
        CB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
            AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>,
            AsyncInputHandle<G::Timestamp, C2, ConnectedToOne>,
            AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
        ) -> BFut,
        BFut: Future + 'static,
        P1: ParallelizationContract<G::Timestamp, C1>,
        P2: ParallelizationContract<G::Timestamp, C2>;

    /// Creates a new dataflow operator that partitions its input stream by a
    /// parallelization strategy `pact` and repeatedly schedules `logic`, which can
    /// read from the input stream and inspect the frontier at the input.
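    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) that logs progress updates and
    /// discards data, assuming the `Event` type from [`crate::builder_async`]; names
    /// are illustrative only:
    ///
    /// ```ignore
    /// use timely::dataflow::channels::pact::Pipeline;
    /// use mz_timely_util::builder_async::Event;
    ///
    /// stream.sink_async(Pipeline, "ProgressLogger".to_string(), |_info, mut input| async move {
    ///     while let Some(event) = input.next().await {
    ///         match event {
    ///             Event::Data(_cap, data) => drop(data),
    ///             Event::Progress(frontier) => println!("frontier advanced to {frontier:?}"),
    ///         }
    ///     }
    /// });
    /// ```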
    fn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B)
    where
        B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut,
        BFut: Future + 'static,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Like [`timely::dataflow::operators::map::Map::map`], but `logic`
    /// is allowed to fail. The first returned stream will contain the
    /// successful applications of `logic`, while the second returned stream
    /// will contain the failed applications.
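    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) using checked division, assuming
    /// a stream of `(numerator, denominator)` pairs; names are illustrative only:
    ///
    /// ```ignore
    /// use timely::container::CapacityContainerBuilder;
    ///
    /// type CB<T> = CapacityContainerBuilder<Vec<T>>;
    /// let (quotients, errors) = stream.map_fallible::<CB<u64>, CB<String>, _, _, _>(
    ///     "CheckedDiv",
    ///     |(numerator, denominator): (u64, u64)| {
    ///         numerator
    ///             .checked_div(denominator)
    ///             .ok_or_else(|| "division by zero".to_string())
    ///     },
    /// );
    /// ```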
    fn map_fallible<DCB, ECB, D2, E, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        L: for<'a> FnMut(C1::Item<'a>) -> Result<D2, E> + 'static,
    {
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Like [`timely::dataflow::operators::map::Map::flat_map`], but `logic`
    /// is allowed to fail. The first returned stream will contain the
    /// successful applications of `logic`, while the second returned stream
    /// will contain the failed applications.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        logic: L,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;

    /// Block progress of the frontier at `expiration` time, unless the token is dropped.
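    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest): the frontier is blocked at the
    /// expiration time until `token` is dropped. `expiration_time` and the operator
    /// name are illustrative only:
    ///
    /// ```ignore
    /// use std::rc::Rc;
    ///
    /// let token = Rc::new(());
    /// let expiring = stream.expire_stream_at(
    ///     "MyDataflow",
    ///     expiration_time,
    ///     Rc::downgrade(&token),
    /// );
    /// // ... later, dropping `token` releases the held-back capability:
    /// drop(token);
    /// ```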
    fn expire_stream_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> StreamCore<G, C1>;

    /// Take a Timely stream and convert it to a Differential stream, where each diff
    /// is the given `unit` and each time is the current Timely timestamp.
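    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) that turns a stream of records
    /// into update triples with diff `1`; names are illustrative only:
    ///
    /// ```ignore
    /// use timely::container::CapacityContainerBuilder;
    ///
    /// let updates = stream
    ///     .pass_through::<CapacityContainerBuilder<Vec<_>>, _>("AsUpdates", 1i64);
    /// ```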
    fn pass_through<CB, R>(&self, name: &str, unit: R) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder + for<'a> PushInto<(C1::Item<'a>, G::Timestamp, R)>,
        R: Data;

    /// Wraps the stream with an operator that passes through all received inputs as long as the
    /// provided token can be upgraded. Once the token cannot be upgraded anymore, all data flowing
    /// into the operator is dropped.
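    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); dropping the strong reference
    /// turns the operator into a sink that discards its input:
    ///
    /// ```ignore
    /// use std::rc::Rc;
    ///
    /// let token = Rc::new(());
    /// let gated = stream.with_token(Rc::downgrade(&token));
    /// // While `token` lives, `gated` mirrors `stream`; after `drop(token)` it is empty.
    /// ```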
    fn with_token(&self, token: Weak<()>) -> StreamCore<G, C1>;

    /// Distributes the data of the stream to all workers in a round-robin fashion.
    fn distribute(&self) -> StreamCore<G, C1>
    where
        C1: ContainerBytes + Send;
}

/// Extension methods for differential [`Collection`]s.
pub trait CollectionExt<G, D1, R>
where
    G: Scope,
    R: Semigroup,
{
    /// Creates a new empty collection in `scope`.
    fn empty(scope: &G) -> Collection<G, D1, R>;

    /// Like [`Collection::map`], but `logic` is allowed to fail. The first
    /// returned collection will contain successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    ///
    /// Callers need to specify the following type parameters:
    /// * `DCB`: The container builder for the `Ok` output.
    /// * `ECB`: The container builder for the `Err` output.
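    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); `CB` abbreviates a builder for
    /// update triples and is illustrative only:
    ///
    /// ```ignore
    /// use timely::container::CapacityContainerBuilder;
    ///
    /// type CB<D, T, R> = CapacityContainerBuilder<Vec<(D, T, R)>>;
    /// let (oks, errs) = collection.map_fallible::<CB<i64, _, _>, CB<String, _, _>, _, _, _>(
    ///     "CheckedNeg",
    ///     |x: i64| x.checked_neg().ok_or_else(|| "overflow".to_string()),
    /// );
    /// ```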
    fn map_fallible<DCB, ECB, D2, E, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (Collection<G, D2, R>, Collection<G, E, R>)
    where
        DCB: ContainerBuilder<Container = Vec<(D2, G::Timestamp, R)>>
            + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder<Container = Vec<(E, G::Timestamp, R)>>
            + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        self.flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |record| Some(logic(record)))
    }

    /// Like [`Collection::flat_map`], but `logic` is allowed to fail. The first
    /// returned collection will contain the successful applications of `logic`,
    /// while the second returned collection will contain the failed
    /// applications.
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        logic: L,
    ) -> (
        Collection<G, D2, R, DCB::Container>,
        Collection<G, E, R, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;

    /// Block progress of the frontier at `expiration` time, unless the token is dropped.
    fn expire_collection_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> Collection<G, D1, R>;

    /// Replaces each record with another, with a new difference type.
    ///
    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
    /// and move the data into the difference component. This will allow differential dataflow to update in-place.
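    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming a collection of
    /// `(account, amount)` records with `i64` diffs; the amounts move into the diff
    /// component, where they accumulate per account:
    ///
    /// ```ignore
    /// let balances = transactions.explode_one(|(account, amount): (String, i64)| (account, amount));
    /// ```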
    fn explode_one<D2, R2, L>(&self, logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Data + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice;

    /// Partitions the input into a collection of records with positive differences
    /// and a collection of exceptions for records with non-positive differences.
    ///
    /// The exceptions are transformed by `into_err`.
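    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming an `i64` diff type;
    /// records with non-positive diffs are turned into error messages:
    ///
    /// ```ignore
    /// let (monotonic, errors) = collection.ensure_monotonic(|record, diff| {
    ///     (format!("unexpected retraction of {record:?} with diff {diff}"), 1)
    /// });
    /// ```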
    fn ensure_monotonic<E, IE>(&self, into_err: IE) -> (Collection<G, D1, R>, Collection<G, E, R>)
    where
        E: Data,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed;

    /// Wraps the collection with an operator that passes through all received inputs as long as
    /// the provided token can be upgraded. Once the token cannot be upgraded anymore, all data
    /// flowing into the operator is dropped.
    fn with_token(&self, token: Weak<()>) -> Collection<G, D1, R>;

    /// Consolidates the collection if `must_consolidate` is `true` and leaves it
    /// untouched otherwise.
    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;

    /// Consolidates the collection.
    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static;
}

impl<G, C1> StreamExt<G, C1> for StreamCore<G, C1>
where
    C1: Container + Data,
    G: Scope,
{
    fn unary_fallible<DCB, ECB, B, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder,
        ECB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandleCore<G::Timestamp, C1, P::Puller>,
                    &mut OutputHandleCore<G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>,
                    &mut OutputHandleCore<G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        let mut builder = OperatorBuilderRc::new(name.into(), self.scope());
        builder.set_notify(false);

        let operator_info = builder.operator_info();

        let mut input = builder.new_input(self, pact);
        let (mut ok_output, ok_stream) = builder.new_output();
        let (mut err_output, err_stream) = builder.new_output();

        builder.build(move |mut capabilities| {
            // `capabilities` should be a single-element vector.
            let capability = capabilities.pop().unwrap();
            let mut logic = constructor(capability, operator_info);
            move |_frontiers| {
                let mut ok_output_handle = ok_output.activate();
                let mut err_output_handle = err_output.activate();
                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
            }
        });

        (ok_stream, err_stream)
    }

    fn unary_async<CB, P, B, BFut>(
        &self,
        pact: P,
        name: String,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
            AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>,
            AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
        ) -> BFut,
        BFut: Future + 'static,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        let mut builder = OperatorBuilderAsync::new(name, self.scope());
        let operator_info = builder.operator_info();

        let (output, stream) = builder.new_output();
        let input = builder.new_input_for(self, pact, &output);

        builder.build(move |mut capabilities| {
            // `capabilities` should be a single-element vector.
            let capability = capabilities.pop().unwrap();
            constructor(capability, operator_info, input, output)
        });

        stream
    }

    fn binary_async<C2, CB, P1, P2, B, BFut>(
        &self,
        other: &StreamCore<G, C2>,
        pact1: P1,
        pact2: P2,
        name: String,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        C2: Container + 'static,
        CB: ContainerBuilder,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
            AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>,
            AsyncInputHandle<G::Timestamp, C2, ConnectedToOne>,
            AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
        ) -> BFut,
        BFut: Future + 'static,
        P1: ParallelizationContract<G::Timestamp, C1>,
        P2: ParallelizationContract<G::Timestamp, C2>,
    {
        let mut builder = OperatorBuilderAsync::new(name, self.scope());
        let operator_info = builder.operator_info();

        let (output, stream) = builder.new_output();
        let input1 = builder.new_input_for(self, pact1, &output);
        let input2 = builder.new_input_for(other, pact2, &output);

        builder.build(move |mut capabilities| {
            // `capabilities` should be a single-element vector.
            let capability = capabilities.pop().unwrap();
            constructor(capability, operator_info, input1, input2, output)
        });

        stream
    }

    /// Creates a new dataflow operator that partitions its input stream by a
    /// parallelization strategy `pact` and repeatedly schedules `logic`, which can
    /// read from the input stream and inspect the frontier at the input.
    fn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B)
    where
        B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut,
        BFut: Future + 'static,
        P: ParallelizationContract<G::Timestamp, C1>,
    {
        let mut builder = OperatorBuilderAsync::new(name, self.scope());
        let operator_info = builder.operator_info();

        let input = builder.new_disconnected_input(self, pact);

        builder.build(move |_capabilities| constructor(operator_info, input));
    }

    // XXX(guswynn): file a minimization bug report for the `logic` flat_map
    // false positive here
    // TODO(guswynn): remove this after https://github.com/rust-lang/rust-clippy/issues/8098 is
    // resolved. The `logic` `FnMut` needs to be borrowed in the `flat_map` call, not moved in,
    // so the simple `|d1| logic(d1)` closure is load-bearing.
    #[allow(clippy::redundant_closure)]
    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
    where
        DCB: ContainerBuilder + PushInto<D2>,
        ECB: ContainerBuilder + PushInto<E>,
        I: IntoIterator<Item = Result<D2, E>>,
        L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
    {
        self.unary_fallible::<DCB, ECB, _, _>(Pipeline, name, move |_, _| {
            Box::new(move |input, ok_output, err_output| {
                input.for_each(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session_with_builder(&time);
                    for r in data.drain().flat_map(|d1| logic(d1)) {
                        match r {
                            Ok(d2) => ok_session.push_into(d2),
                            Err(e) => err_session.push_into(e),
                        }
                    }
                })
            })
        })
    }

    fn expire_stream_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> StreamCore<G, C1> {
        let name = format!("expire_stream_at({name})");
        self.unary_frontier(Pipeline, &name.clone(), move |cap, _| {
            // Retain a capability for the expiration time, which we'll only drop if the token
            // is dropped. Else, block progress at the expiration time to prevent downstream
            // operators from making any statement about expiration time or any following time.
            let mut cap = Some(cap.delayed(&expiration));
            let mut warned = false;
            move |input, output| {
                if token.upgrade().is_none() {
                    // In shutdown, allow to propagate.
                    drop(cap.take());
                } else {
                    let frontier = input.frontier().frontier();
                    if !frontier.less_than(&expiration) && !warned {
                        // Here, we print a warning, not an error. The state is only a liveness
                        // concern and not relevant for correctness. Additionally, a race between
                        // shutting down the dataflow and dropping the token can cause the dataflow
                        // to shut down before we drop the token. This can happen when dropping
                        // the last remaining capability on a different worker. We do not want to
                        // log an error every time this happens.

                        tracing::warn!(
                            name = name,
                            frontier = ?frontier,
                            expiration = ?expiration,
                            "frontier not less than expiration"
                        );
                        warned = true;
                    }
                }
                input.for_each(|time, data| {
                    let mut session = output.session(&time);
                    session.give_container(data);
                });
            }
        })
    }

    fn pass_through<CB, R>(&self, name: &str, unit: R) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder + for<'a> PushInto<(C1::Item<'a>, G::Timestamp, R)>,
        R: Data,
    {
        self.unary::<CB, _, _, _>(Pipeline, name, move |_, _| {
            move |input, output| {
                input.for_each(|cap, data| {
                    let mut session = output.session_with_builder(&cap);
                    session.give_iterator(
                        data.drain()
                            .map(|payload| (payload, cap.time().clone(), unit.clone())),
                    );
                });
            }
        })
    }

    fn with_token(&self, token: Weak<()>) -> StreamCore<G, C1> {
        self.unary(Pipeline, "WithToken", move |_cap, _info| {
            move |input, output| {
                input.for_each(|cap, data| {
                    if token.upgrade().is_some() {
                        output.session(&cap).give_container(data);
                    }
                });
            }
        })
    }

    fn distribute(&self) -> StreamCore<G, C1>
    where
        C1: ContainerBytes + Send,
    {
        self.unary(crate::pact::Distribute, "Distribute", move |_, _| {
            move |input, output| {
                input.for_each(|time, data| {
                    output.session(&time).give_container(data);
                });
            }
        })
    }
}

impl<G, D1, R> CollectionExt<G, D1, R> for Collection<G, D1, R>
where
    G: Scope,
    G::Timestamp: Data,
    D1: Data,
    R: Semigroup + 'static,
{
    fn empty(scope: &G) -> Collection<G, D1, R> {
        operator::empty(scope).as_collection()
    }

    fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (
        Collection<G, D2, R, DCB::Container>,
        Collection<G, E, R, ECB::Container>,
    )
    where
        DCB: ContainerBuilder + PushInto<(D2, G::Timestamp, R)>,
        ECB: ContainerBuilder + PushInto<(E, G::Timestamp, R)>,
        D2: Data,
        E: Data,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static,
    {
        let (ok_stream, err_stream) =
            self.inner
                .flat_map_fallible::<DCB, ECB, _, _, _, _>(name, move |(d1, t, r)| {
                    logic(d1).into_iter().map(move |res| match res {
                        Ok(d2) => Ok((d2, t.clone(), r.clone())),
                        Err(e) => Err((e, t.clone(), r.clone())),
                    })
                });
        (ok_stream.as_collection(), err_stream.as_collection())
    }

    fn expire_collection_at(
        &self,
        name: &str,
        expiration: G::Timestamp,
        token: Weak<()>,
    ) -> Collection<G, D1, R> {
        self.inner
            .expire_stream_at(name, expiration, token)
            .as_collection()
    }

    fn explode_one<D2, R2, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: differential_dataflow::Data,
        R2: Semigroup + Multiply<R>,
        <R2 as Multiply<R>>::Output: Data + Semigroup,
        L: FnMut(D1) -> (D2, R2) + 'static,
        G::Timestamp: Lattice,
    {
        self.inner
            .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
                Pipeline,
                "ExplodeOne",
                move |_, _| {
                    move |input, output| {
                        input.for_each(|time, data| {
                            output
                                .session_with_builder(&time)
                                .give_iterator(data.drain(..).map(|(x, t, d)| {
                                    let (x, d2) = logic(x);
                                    (x, t, d2.multiply(&d))
                                }));
                        });
                    }
                },
            )
            .as_collection()
    }

    fn ensure_monotonic<E, IE>(&self, into_err: IE) -> (Collection<G, D1, R>, Collection<G, E, R>)
    where
        E: Data,
        IE: Fn(D1, R) -> (E, R) + 'static,
        R: num_traits::sign::Signed,
    {
        let (oks, errs) = self
            .inner
            .unary_fallible(Pipeline, "EnsureMonotonic", move |_, _| {
                Box::new(move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);
                        for (x, t, d) in data.drain(..) {
                            if d.is_positive() {
                                ok_session.give((x, t, d))
                            } else {
                                let (e, d2) = into_err(x, d);
                                err_session.give((e, t, d2))
                            }
                        }
                    })
                })
            });
        (oks.as_collection(), errs.as_collection())
    }

    fn with_token(&self, token: Weak<()>) -> Collection<G, D1, R> {
        self.inner.with_token(token).as_collection()
    }

    fn consolidate_named_if<Ba>(self, must_consolidate: bool, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static,
    {
        if must_consolidate {
            // We employ AHash below instead of the default hasher in DD to obtain
            // a better distribution of data to workers. AHash claims empirically
            // both speed and high quality, according to
            // https://github.com/tkaitchuck/aHash/blob/master/compare/readme.md.
            // TODO(vmarcos): Consider here if it is worth it to spend the time to
            // implement twisted tabulation hashing as proposed in Mihai Patrascu,
            // Mikkel Thorup: Twisted Tabulation Hashing. SODA 2013: 209-228, available
            // at https://epubs.siam.org/doi/epdf/10.1137/1.9781611973105.16. The latter
            // would provide good bounds for balls-into-bins problems when the number of
            // bins is small (as is our case), so we'd have a theoretical guarantee.
            // NOTE: We fix the seeds of a RandomState instance explicitly with the same
            // seeds that would be given by `AHash` via ahash::AHasher::default() so as
            // to avoid a different selection due to compile-time features being differently
            // selected in other dependencies using `AHash` vis-à-vis cargo's strategy
            // of unioning features.
            // NOTE: Depending on target features, we may end up employing the fallback
            // hasher of `AHash`, but it should be sufficient for our needs.
            let random_state = ahash::RandomState::with_seeds(
                0x243f_6a88_85a3_08d3,
                0x1319_8a2e_0370_7344,
                0xa409_3822_299f_31d0,
                0x082e_fa98_ec4e_6c89,
            );
            let exchange = Exchange::new(move |update: &((D1, _), G::Timestamp, R)| {
                let data = &(update.0).0;
                let mut h = random_state.build_hasher();
                data.hash(&mut h);
                h.finish()
            });
            consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
                .unary(Pipeline, "unpack consolidated", |_, _| {
                    |input, output| {
                        input.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for ((k, ()), t, d) in
                                data.iter().flatten().flat_map(|chunk| chunk.iter())
                            {
                                session.give((k.clone(), t.clone(), d.clone()))
                            }
                        })
                    }
                })
                .as_collection()
        } else {
            self
        }
    }

    fn consolidate_named<Ba>(self, name: &str) -> Self
    where
        D1: differential_dataflow::ExchangeData + Hash + Columnation,
        R: Semigroup + differential_dataflow::ExchangeData + Columnation,
        G::Timestamp: Lattice + Ord + Columnation,
        Ba: Batcher<
                Input = Vec<((D1, ()), G::Timestamp, R)>,
                Output = TimelyStack<((D1, ()), G::Timestamp, R)>,
                Time = G::Timestamp,
            > + 'static,
    {
        let exchange =
            Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());

        consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
            .unary(Pipeline, &format!("Unpack {name}"), |_, _| {
                |input, output| {
                    input.for_each(|time, data| {
                        let mut session = output.session(&time);
                        for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            session.give((k.clone(), t.clone(), d.clone()))
                        }
                    })
                }
            })
            .as_collection()
    }
}

/// Creates a new async data stream source for a scope.
///
/// The source is defined by a name and a constructor which takes a default capability and an
/// output handle and returns a future. The future is then repeatedly scheduled, and is expected
/// to eventually send data and to downgrade and release its capability.
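///
/// # Example
///
/// A minimal sketch (not compiled as a doctest) that emits one record and then drops its
/// capability, assuming the output handle methods from [`crate::builder_async`]; names are
/// illustrative only:
///
/// ```ignore
/// use timely::container::CapacityContainerBuilder;
///
/// let numbers = source_async::<_, CapacityContainerBuilder<Vec<u64>>, _, _>(
///     scope,
///     "OneShot".to_string(),
///     |capability, _info, output| async move {
///         output.give(&capability, 42);
///         // Dropping the capability lets the frontier advance to the empty antichain.
///         drop(capability);
///     },
/// );
/// ```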
pub fn source_async<G: Scope, CB, B, BFut>(
    scope: &G,
    name: String,
    constructor: B,
) -> StreamCore<G, CB::Container>
where
    CB: ContainerBuilder,
    B: FnOnce(
        Capability<G::Timestamp>,
        OperatorInfo,
        AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
    ) -> BFut,
    BFut: Future + 'static,
{
    let mut builder = OperatorBuilderAsync::new(name, scope.clone());
    let operator_info = builder.operator_info();

    let (output, stream) = builder.new_output();

    builder.build(move |mut capabilities| {
        // `capabilities` should be a single-element vector.
        let capability = capabilities.pop().unwrap();
        constructor(capability, operator_info, output)
    });

    stream
}

/// Aggregates the weights of equal records into at most one record.
///
/// Produces a stream of chains of records, partitioned according to `pact`. The
/// data is sorted according to `Ba`. For each timestamp, it produces at most one chain.
///
/// The data is accumulated in place, and each update is held back until its timestamp
/// has completed.
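///
/// # Example
///
/// A minimal sketch (not compiled as a doctest). `MyBatcher` stands in for a concrete
/// [`Batcher`] whose `Input` matches the stream's container type:
///
/// ```ignore
/// use timely::dataflow::channels::pact::Pipeline;
///
/// // `updates` is a `StreamCore<G, Vec<((K, ()), T, R)>>` of update batches.
/// let chains = consolidate_pact::<MyBatcher, _, _>(&updates, Pipeline, "Consolidate");
/// ```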
pub fn consolidate_pact<Ba, P, G>(
    stream: &StreamCore<G, Ba::Input>,
    pact: P,
    name: &str,
) -> Stream<G, Vec<Ba::Output>>
where
    G: Scope,
    Ba: Batcher<Time = G::Timestamp> + 'static,
    Ba::Input: Container + Clone + 'static,
    Ba::Output: Container + Clone,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
{
    stream.unary_frontier(pact, name, |_cap, info| {
        // Acquire a logger for arrange events.
        let logger = {
            let scope = stream.scope();
            let register = scope.log_register();
            register
                .get::<DifferentialEventBuilder>("differential/arrange")
                .map(Into::into)
        };

        let mut batcher = Ba::new(logger, info.global_id);
        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
        let mut prev_frontier = Antichain::from_elem(G::Timestamp::minimum());

        move |input, output| {
            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
                batcher.push_container(data);
            });

            if prev_frontier.borrow() != input.frontier().frontier() {
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !input.frontier().less_equal(c.time()))
                {
                    let mut upper = Antichain::new(); // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !input.frontier().less_equal(capability.time()) {
                            // Assemble the upper bound on times we can commit with this capability.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in input.frontier().frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            // send the batch to downstream consumers, empty or not.
                            let mut session = output.session(&capabilities.elements()[index]);
                            // Extract updates not in advance of `upper`.
                            let output =
                                batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
                            session.give(output);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }

                prev_frontier.clear();
                prev_frontier.extend(input.frontier().frontier().iter().cloned());
            }
        }
    })
}

/// A builder that wraps a session for direct output to a stream.
struct ConsolidateBuilder<T, I> {
    _marker: PhantomData<(T, I)>,
}

impl<T, I> Builder for ConsolidateBuilder<T, I>
where
    T: Timestamp,
    I: Container,
{
    type Input = I;
    type Time = T;
    type Output = Vec<I>;

    fn new() -> Self {
        Self {
            _marker: PhantomData,
        }
    }

    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
        Self::new()
    }

    fn push(&mut self, _chunk: &mut Self::Input) {
        unimplemented!("ConsolidateBuilder::push")
    }

    fn done(self, _: Description<Self::Time>) -> Self::Output {
        unimplemented!("ConsolidateBuilder::done")
    }

    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
        std::mem::take(chain)
    }
}