Skip to main content

mz_compute/extensions/
temporal_bucket.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Utilities and stream extensions for temporal bucketing.
11
12use differential_dataflow::Hashable;
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::trace::Batcher;
16use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
17use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker, ColumnationStack};
18use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
19use timely::container::PushInto;
20use timely::dataflow::channels::pact::Exchange;
21use timely::dataflow::operators::Operator;
22use timely::dataflow::{Stream, StreamVec};
23use timely::order::TotalOrder;
24use timely::progress::{Antichain, PathSummary, Timestamp};
25use timely::{ContainerBuilder, ExchangeData, PartialOrder};
26
27use crate::typedefs::MzData;
28
29/// Sort outstanding updates into a [`BucketChain`], and reveal data not in advance of the input
30/// frontier. Retains a capability at the last input frontier to retain the right to produce data
31/// at times between the last input frontier and the current input frontier.
32pub trait TemporalBucketing<'scope, T: Timestamp, O> {
33    /// Construct a new stream that stores updates into a [`BucketChain`] and reveals data
34    /// not in advance of the frontier. Data that is within `threshold` distance of the input
35    /// frontier or the `as_of` is passed through without being stored in the chain.
36    fn bucket<CB>(
37        self,
38        as_of: Antichain<T>,
39        threshold: T::Summary,
40    ) -> Stream<'scope, T, CB::Container>
41    where
42        CB: ContainerBuilder + PushInto<O>;
43}
44
/// Implementation for streams in scopes where timestamps define a total order.
///
/// The `TotalOrder` bound is what lets the operator treat `upper` as having at most one
/// element when it downgrades its retained capability (via `upper[0]`).
impl<'scope, T, D> TemporalBucketing<'scope, T, (D, T, mz_repr::Diff)>
    for StreamVec<'scope, T, (D, T, mz_repr::Diff)>
where
    T: Timestamp + ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice,
    D: ExchangeData + MzData + Ord + Clone + std::fmt::Debug + Hashable,
{
    /// Build the "Temporal delay" operator.
    ///
    /// Updates are exchanged between workers by the hash of their data. On each activation
    /// the operator:
    /// 1. computes `upper`: for each element of the input frontier and of `as_of`, their join
    ///    advanced by `threshold`;
    /// 2. emits updates whose time is not in advance of `upper` immediately, at the time of
    ///    the batch they arrived in, and sorts the remaining updates into the bucket chain;
    /// 3. peels the chain at `upper` and emits the peeled updates at the retained capability;
    /// 4. drops the retained capability once the input frontier or `upper` is empty, and
    ///    otherwise downgrades it to `upper`;
    /// 5. maintains the chain with a fixed fuel budget, and reactivates itself if the budget
    ///    is used up.
    ///
    /// If `as_of` is empty, `upper` is always empty, so every update is emitted directly.
    ///
    /// # Panics
    ///
    /// Panics if the chain has no bucket covering an update's time. Also panics if peeling
    /// yields data after the retained capability has been dropped.
    fn bucket<CB>(
        self,
        as_of: Antichain<T>,
        threshold: T::Summary,
    ) -> Stream<'scope, T, CB::Container>
    where
        CB: ContainerBuilder + PushInto<(D, T, mz_repr::Diff)>,
    {
        let scope = self.scope();
        // The batchers backing the chain log through the worker's "differential/arrange"
        // logger, if one is registered.
        let logger = scope
            .worker()
            .logger_for("differential/arrange")
            .map(Into::into);

        // Route every update to a worker chosen by the hash of its data.
        let pact = Exchange::new(|(d, _, _): &(D, T, mz_repr::Diff)| d.hashed().into());
        self.unary_frontier::<CB, _, _, _>(pact, "Temporal delay", |cap, info| {
            let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
            // Used to reschedule this operator when chain maintenance runs out of fuel.
            let activator = scope.activator_for(info.address);

            // Cap tracking the lower bound of potentially outstanding data.
            let mut cap = Some(cap);

            // Buffer for data to be inserted into the chain.
            let mut buffer = Vec::new();

            move |(input, frontier), output| {
                // The upper frontier is the join of the input frontier and the `as_of` frontier,
                // with the `threshold` summary applied to it. Joins for which `results_in`
                // returns `None` contribute nothing, so `upper` may end up empty.
                let mut upper = Antichain::new();
                for time1 in &frontier.frontier() {
                    for time2 in as_of.elements() {
                        // TODO: Use `join_assign` if we ever use a timestamp with allocations.
                        if let Some(time) = threshold.results_in(&time1.join(time2)) {
                            upper.insert(time);
                        }
                    }
                }

                input.for_each_time(|time, data| {
                    let mut session = output.session_with_builder(&time);
                    for data in data {
                        // Skip data that is about to be revealed. Updates whose time is not in
                        // advance of `upper` are removed from `data` and emitted right away at
                        // this batch's time. With an empty `upper`, every update matches.
                        let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
                        session.give_iterator(pass_through);

                        // Sort data by time, then drain it into a buffer that contains data for a
                        // single bucket. We scan the data for ranges of time that fall into the same
                        // bucket so we can push batches of data at once.
                        data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));

                        let mut drain = data.drain(..);
                        if let Some((datum, time, diff)) = drain.next() {
                            // NOTE(review): assumes the chain has a bucket for every time that is
                            // in advance of `upper`. Presumably this holds because the chain was
                            // last peeled at an earlier (smaller) `upper`; confirm against
                            // `BucketChain`.
                            let mut range = chain.range_of(&time).expect("Must exist");
                            buffer.push((datum, time, diff));
                            for (datum, time, diff) in drain {
                                // If we have a range, check if the time is not within it.
                                if !range.contains(&time) {
                                    // If the time is outside the range, push the current buffer
                                    // to the chain and reset the range.
                                    // NOTE(review): `buffer` always holds at least one update at
                                    // this point, so this check is always true.
                                    if !buffer.is_empty() {
                                        let bucket =
                                            chain.find_mut(&range.start).expect("Must exist");
                                        bucket.push_container(&mut buffer);
                                        buffer.clear();
                                    }
                                    range = chain.range_of(&time).expect("Must exist");
                                }
                                buffer.push((datum, time, diff));
                            }

                            // Handle leftover data in the buffer.
                            if !buffer.is_empty() {
                                let bucket = chain.find_mut(&range.start).expect("Must exist");
                                bucket.push_container(&mut buffer);
                                buffer.clear();
                            }
                        }
                    }
                });

                // Check for data that is ready to be revealed. This must happen before the cap
                // is downgraded below, because peeled data is emitted at the cap's time.
                let peeled = chain.peel(upper.borrow());
                if let Some(cap) = cap.as_ref() {
                    let mut session = output.session_with_builder(cap);
                    for stack in peeled.into_iter().flat_map(|x| x.done()) {
                        // TODO: If we have a columnar merge batcher, cloning won't be necessary.
                        session.give_iterator(stack.iter().cloned());
                    }
                } else {
                    // If we don't have a cap, we should not have any data to reveal.
                    assert_eq!(
                        peeled.into_iter().flat_map(|x| x.done()).next(),
                        None,
                        "Unexpected data revealed without a cap."
                    );
                }

                // Downgrade the cap to the current input frontier.
                if frontier.is_empty() || upper.is_empty() {
                    cap = None;
                } else if let Some(cap) = cap.as_mut() {
                    // TODO: This assumes that the time is total ordered.
                    // Under `TotalOrder` a non-empty antichain has exactly one element.
                    cap.downgrade(&upper[0]);
                }

                // Maintain the bucket chain by restoring it with fuel. `restore` presumably
                // charges fuel for the work it does; `MergeBatcherWrapper::split`, for example,
                // charges one unit per update it moves.
                let mut fuel = 1_000_000;
                chain.restore(&mut fuel);
                if fuel <= 0 {
                    // If we run out of fuel, we activate the operator to continue processing.
                    activator.activate();
                }
            }
        })
    }
}
168
169/// A wrapper around `MergeBatcher` that implements the `Storage` trait for bucketing.
170///
171/// The merge batcher consumes pre-chunked input, so this wrapper carries a
172/// [`ColumnationChunker`] that consolidates the `Vec` input into the
173/// [`ColumnationStack`] chunks the batcher consumes.
174struct MergeBatcherWrapper<D, T, R>
175where
176    D: MzData + Ord + Clone,
177    T: MzData + Ord + PartialOrder + Clone,
178    R: MzData + Semigroup + Default,
179{
180    logger: Option<differential_dataflow::logging::Logger>,
181    operator_id: usize,
182    chunker: ColumnationChunker<(D, T, R)>,
183    inner: MergeBatcher<ColInternalMerger<D, T, R>>,
184}
185
186impl<D, T, R> MergeBatcherWrapper<D, T, R>
187where
188    D: MzData + Ord + Clone + 'static,
189    T: MzData + Ord + PartialOrder + Clone + Timestamp,
190    R: MzData + Semigroup + Default + 'static,
191{
192    /// Construct a new `MergeBatcherWrapper` with the given logger and operator ID.
193    fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
194        Self {
195            logger: logger.clone(),
196            operator_id,
197            chunker: ColumnationChunker::default(),
198            inner: MergeBatcher::new(logger, operator_id),
199        }
200    }
201
202    /// Consolidate `buffer` through the chunker and feed any complete chunks to
203    /// the batcher.
204    fn push_container(&mut self, buffer: &mut Vec<(D, T, R)>) {
205        use timely::container::ContainerBuilder as _;
206        self.chunker.push_into(buffer);
207        while let Some(chunk) = self.chunker.extract() {
208            self.inner.push_into(std::mem::take(chunk));
209        }
210    }
211
212    /// Flush any partial chunk still held by the chunker into the batcher.
213    fn flush(&mut self) {
214        use timely::container::ContainerBuilder as _;
215        while let Some(chunk) = self.chunker.finish() {
216            self.inner.push_into(std::mem::take(chunk));
217        }
218    }
219
220    /// Reveal the contents of the `MergeBatcher`, returning a vector of `ColumnationStack`s.
221    fn done(mut self) -> Vec<ColumnationStack<(D, T, R)>> {
222        self.flush();
223        let (chain, _description) = self.inner.seal(Antichain::new());
224        chain
225    }
226}
227
228impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
229where
230    D: MzData + Ord + Clone + 'static,
231    T: MzData + Ord + PartialOrder + Clone + 'static + BucketTimestamp,
232    R: MzData + Semigroup + Default + 'static,
233{
234    type Timestamp = T;
235
236    fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
237        // The implementation isn't tuned for performance. We should not bounce in and out of
238        // different containers when not needed. The merge batcher we use can only accept
239        // vectors as inputs, but not any other container type.
240        // TODO: Allow the merge batcher to accept more generic containers.
241        self.flush();
242        let upper = Antichain::from_elem(timestamp.clone());
243        let mut lower = Self::new(self.logger.clone(), self.operator_id);
244        let mut buffer = Vec::new();
245        let (chain, _description) = self.inner.seal(upper);
246        for chunk in chain {
247            *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit"));
248            // TODO: Avoid this cloning.
249            buffer.extend(chunk.into_iter().cloned());
250            lower.push_container(&mut buffer);
251            buffer.clear();
252        }
253        (lower, self)
254    }
255}