// mz_compute/extensions/temporal_bucket.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Utilities and stream extensions for temporal bucketing.
12use std::marker::PhantomData;
13
14use differential_dataflow::Hashable;
15use differential_dataflow::difference::Semigroup;
16use differential_dataflow::lattice::Lattice;
17use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
18use differential_dataflow::trace::{Batcher, Builder, Description};
19use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker, ColumnationStack};
20use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
21use timely::container::PushInto;
22use timely::dataflow::channels::pact::Exchange;
23use timely::dataflow::operators::Operator;
24use timely::dataflow::{Stream, StreamVec};
25use timely::order::TotalOrder;
26use timely::progress::{Antichain, PathSummary, Timestamp};
27use timely::{ContainerBuilder, ExchangeData, PartialOrder};
28
29use crate::typedefs::MzData;
30
/// Sort outstanding updates into a [`BucketChain`], and reveal data not in advance of the input
/// frontier. Retains a capability at the last input frontier to retain the right to produce data
/// at times between the last input frontier and the current input frontier.
///
/// Type parameters:
/// * `T` — the scope's timestamp type.
/// * `O` — the update type the output container builder accepts.
pub trait TemporalBucketing<'scope, T: Timestamp, O> {
    /// Construct a new stream that stores updates into a [`BucketChain`] and reveals data
    /// not in advance of the frontier. Data that is within `threshold` distance of the input
    /// frontier or the `as_of` is passed through without being stored in the chain.
    ///
    /// * `as_of` — frontier joined with the input frontier when deciding what to reveal.
    /// * `threshold` — path summary applied to the joined frontier; updates closer than this
    ///   to the frontier bypass the chain entirely.
    fn bucket<CB>(
        self,
        as_of: Antichain<T>,
        threshold: T::Summary,
    ) -> Stream<'scope, T, CB::Container>
    where
        CB: ContainerBuilder + PushInto<O>;
}
46
/// Implementation for streams in scopes where timestamps define a total order.
impl<'scope, T, D> TemporalBucketing<'scope, T, (D, T, mz_repr::Diff)>
    for StreamVec<'scope, T, (D, T, mz_repr::Diff)>
where
    T: Timestamp + ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice,
    D: ExchangeData + MzData + Ord + Clone + std::fmt::Debug + Hashable,
{
    fn bucket<CB>(
        self,
        as_of: Antichain<T>,
        threshold: T::Summary,
    ) -> Stream<'scope, T, CB::Container>
    where
        CB: ContainerBuilder + PushInto<(D, T, mz_repr::Diff)>,
    {
        let scope = self.scope();
        // Logger handed to the merge batchers that back the bucket chain.
        let logger = scope
            .worker()
            .logger_for("differential/arrange")
            .map(Into::into);

        // Exchange updates by the hash of their datum, so all updates for a datum
        // arrive at the same worker.
        let pact = Exchange::new(|(d, _, _): &(D, T, mz_repr::Diff)| d.hashed().into());
        self.unary_frontier::<CB, _, _, _>(pact, "Temporal delay", |cap, info| {
            let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
            // Used to reschedule this operator when chain maintenance runs out of fuel.
            let activator = scope.activator_for(info.address);

            // Cap tracking the lower bound of potentially outstanding data.
            let mut cap = Some(cap);

            // Buffer for data to be inserted into the chain.
            let mut buffer = Vec::new();

            move |(input, frontier), output| {
                // The upper frontier is the join of the input frontier and the `as_of` frontier,
                // with the `threshold` summary applied to it.
                let mut upper = Antichain::new();
                for time1 in &frontier.frontier() {
                    for time2 in as_of.elements() {
                        // TODO: Use `join_assign` if we ever use a timestamp with allocations.
                        if let Some(time) = threshold.results_in(&time1.join(time2)) {
                            upper.insert(time);
                        }
                    }
                }

                input.for_each_time(|time, data| {
                    let mut session = output.session_with_builder(&time);
                    for data in data {
                        // Skip data that is about to be revealed: anything not beyond `upper`
                        // is forwarded immediately at the input's own capability.
                        let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
                        session.give_iterator(pass_through);

                        // Sort data by time, then drain it into a buffer that contains data for a
                        // single bucket. We scan the data for ranges of time that fall into the same
                        // bucket so we can push batches of data at once.
                        data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));

                        let mut drain = data.drain(..);
                        if let Some((datum, time, diff)) = drain.next() {
                            let mut range = chain.range_of(&time).expect("Must exist");
                            buffer.push((datum, time, diff));
                            for (datum, time, diff) in drain {
                                // If we have a range, check if the time is not within it.
                                if !range.contains(&time) {
                                    // If the time is outside the range, push the current buffer
                                    // to the chain and reset the range.
                                    if !buffer.is_empty() {
                                        let bucket =
                                            chain.find_mut(&range.start).expect("Must exist");
                                        bucket.inner.push_container(&mut buffer);
                                        buffer.clear();
                                    }
                                    range = chain.range_of(&time).expect("Must exist");
                                }
                                buffer.push((datum, time, diff));
                            }

                            // Handle leftover data in the buffer.
                            if !buffer.is_empty() {
                                let bucket = chain.find_mut(&range.start).expect("Must exist");
                                bucket.inner.push_container(&mut buffer);
                                buffer.clear();
                            }
                        }
                    }
                });

                // Check for data that is ready to be revealed.
                let peeled = chain.peel(upper.borrow());
                if let Some(cap) = cap.as_ref() {
                    let mut session = output.session_with_builder(cap);
                    for stack in peeled.into_iter().flat_map(|x| x.done()) {
                        // TODO: If we have a columnar merge batcher, cloning won't be necessary.
                        session.give_iterator(stack.iter().cloned());
                    }
                } else {
                    // If we don't have a cap, we should not have any data to reveal.
                    assert_eq!(
                        peeled.into_iter().flat_map(|x| x.done()).next(),
                        None,
                        "Unexpected data revealed without a cap."
                    );
                }

                // Downgrade the cap to the current input frontier. Dropping the cap once either
                // frontier is empty allows downstream operators to shut down.
                if frontier.is_empty() || upper.is_empty() {
                    cap = None;
                } else if let Some(cap) = cap.as_mut() {
                    // TODO: This assumes that the time is total ordered.
                    cap.downgrade(&upper[0]);
                }

                // Maintain the bucket chain by restoring it with fuel.
                let mut fuel = 1_000_000;
                chain.restore(&mut fuel);
                if fuel <= 0 {
                    // If we run out of fuel, we activate the operator to continue processing.
                    activator.activate();
                }
            }
        })
    }
}
170
/// A wrapper around `MergeBatcher` that implements the `Storage` trait for bucketing.
struct MergeBatcherWrapper<D, T, R>
where
    D: MzData + Ord + Clone,
    T: MzData + Ord + PartialOrder + Clone,
    R: MzData + Semigroup + Default,
{
    // Logger passed on to merge batchers created later (e.g. when splitting buckets).
    logger: Option<differential_dataflow::logging::Logger>,
    // Operator id forwarded to new merge batchers for logging attribution.
    operator_id: usize,
    // The wrapped merge batcher holding the not-yet-revealed updates.
    inner: MergeBatcher<Vec<(D, T, R)>, ColumnationChunker<(D, T, R)>, ColInternalMerger<D, T, R>>,
}
182
183impl<D, T, R> MergeBatcherWrapper<D, T, R>
184where
185    D: MzData + Ord + Clone,
186    T: MzData + Ord + PartialOrder + Clone + Timestamp,
187    R: MzData + Semigroup + Default,
188{
189    /// Construct a new `MergeBatcherWrapper` with the given logger and operator ID.
190    fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
191        Self {
192            logger: logger.clone(),
193            operator_id,
194            inner: MergeBatcher::new(logger, operator_id),
195        }
196    }
197
198    /// Reveal the contents of the `MergeBatcher`, returning a vector of `ColumnationStack`s.
199    fn done(mut self) -> Vec<ColumnationStack<(D, T, R)>> {
200        self.inner.seal::<CapturingBuilder<_, _>>(Antichain::new())
201    }
202}
203
204impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
205where
206    D: MzData + Ord + Clone + 'static,
207    T: MzData + Ord + PartialOrder + Clone + 'static + BucketTimestamp,
208    R: MzData + Semigroup + Default,
209{
210    type Timestamp = T;
211
212    fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
213        // The implementation isn't tuned for performance. We should not bounce in and out of
214        // different containers when not needed. The merge batcher we use can only accept
215        // vectors as inputs, but not any other container type.
216        // TODO: Allow the merge batcher to accept more generic containers.
217        let upper = Antichain::from_elem(timestamp.clone());
218        let mut lower = Self::new(self.logger.clone(), self.operator_id);
219        let mut buffer = Vec::new();
220        for chunk in self.inner.seal::<CapturingBuilder<_, _>>(upper) {
221            *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit"));
222            // TODO: Avoid this cloning.
223            buffer.extend(chunk.into_iter().cloned());
224            lower.inner.push_container(&mut buffer);
225            buffer.clear();
226        }
227        (lower, self)
228    }
229}
230
231struct CapturingBuilder<D, T>(D, PhantomData<T>);
232
233impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
234    type Input = D;
235    type Time = T;
236    type Output = Vec<D>;
237
238    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
239        // Not needed for this implementation.
240        unimplemented!()
241    }
242
243    fn push(&mut self, _chunk: &mut Self::Input) {
244        // Not needed for this implementation.
245        unimplemented!()
246    }
247
248    fn done(self, _description: Description<Self::Time>) -> Self::Output {
249        // Not needed for this implementation.
250        unimplemented!()
251    }
252
253    #[inline]
254    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
255        std::mem::take(chain)
256    }
257}