Skip to main content

mz_compute/extensions/
temporal_bucket.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Utilities and stream extensions for temporal bucketing.
11
12use columnar::{Columnar, Index, Len};
13use differential_dataflow::Hashable;
14use differential_dataflow::difference::Semigroup;
15use differential_dataflow::lattice::Lattice;
16use differential_dataflow::trace::Batcher;
17use mz_timely_util::columnar::Column;
18use mz_timely_util::columnar::batcher::ColumnChunker;
19use mz_timely_util::columnar::merge_batcher::ColumnMergeBatcher;
20use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
21use timely::Accountable;
22use timely::container::PushInto;
23use timely::dataflow::channels::pact::Exchange;
24use timely::dataflow::operators::Operator;
25use timely::dataflow::{Stream, StreamVec};
26use timely::order::TotalOrder;
27use timely::progress::{Antichain, PathSummary, Timestamp};
28use timely::{ContainerBuilder, ExchangeData, PartialOrder};
29
30use crate::typedefs::MzData;
31
32/// Sort outstanding updates into a [`BucketChain`], and reveal data not in advance of the input
33/// frontier. Retains a capability at the last input frontier to retain the right to produce data
34/// at times between the last input frontier and the current input frontier.
35pub trait TemporalBucketing<'scope, T: Timestamp, O> {
36    /// Construct a new stream that stores updates into a [`BucketChain`] and reveals data
37    /// not in advance of the frontier. Data that is within `threshold` distance of the input
38    /// frontier or the `as_of` is passed through without being stored in the chain.
39    fn bucket<CB>(
40        self,
41        as_of: Antichain<T>,
42        threshold: T::Summary,
43    ) -> Stream<'scope, T, CB::Container>
44    where
45        CB: ContainerBuilder + PushInto<O>;
46}
47
48/// Implementation for streams in scopes where timestamps define a total order.
49impl<'scope, T, D> TemporalBucketing<'scope, T, (D, T, mz_repr::Diff)>
50    for StreamVec<'scope, T, (D, T, mz_repr::Diff)>
51where
52    T: Timestamp + Default + ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice,
53    D: ExchangeData + MzData + Ord + Clone + std::fmt::Debug + Hashable,
54{
55    fn bucket<CB>(
56        self,
57        as_of: Antichain<T>,
58        threshold: T::Summary,
59    ) -> Stream<'scope, T, CB::Container>
60    where
61        CB: ContainerBuilder + PushInto<(D, T, mz_repr::Diff)>,
62    {
63        let scope = self.scope();
64        let logger = scope
65            .worker()
66            .logger_for("differential/arrange")
67            .map(Into::into);
68
69        let pact = Exchange::new(|(d, _, _): &(D, T, mz_repr::Diff)| d.hashed().into());
70        self.unary_frontier::<CB, _, _, _>(pact, "Temporal delay", |cap, info| {
71            let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
72            let activator = scope.activator_for(info.address);
73
74            // Cap tracking the lower bound of potentially outstanding data.
75            let mut cap = Some(cap);
76
77            // Buffer for data to be inserted into the chain.
78            let mut buffer = Vec::new();
79
80            move |(input, frontier), output| {
81                // The upper frontier is the join of the input frontier and the `as_of` frontier,
82                // with the `threshold` summary applied to it.
83                let mut upper = Antichain::new();
84                for time1 in &frontier.frontier() {
85                    for time2 in as_of.elements() {
86                        // TODO: Use `join_assign` if we ever use a timestamp with allocations.
87                        if let Some(time) = threshold.results_in(&time1.join(time2)) {
88                            upper.insert(time);
89                        }
90                    }
91                }
92
93                input.for_each_time(|time, data| {
94                    let mut session = output.session_with_builder(&time);
95                    for data in data {
96                        // Skip data that is about to be revealed.
97                        let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
98                        session.give_iterator(pass_through);
99
100                        // Sort data by time, then drain it into a buffer that contains data for a
101                        // single bucket. We scan the data for ranges of time that fall into the same
102                        // bucket so we can push batches of data at once.
103                        data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));
104
105                        let mut drain = data.drain(..);
106                        if let Some((datum, time, diff)) = drain.next() {
107                            let mut range = chain.range_of(&time).expect("Must exist");
108                            buffer.push((datum, time, diff));
109                            for (datum, time, diff) in drain {
110                                // If we have a range, check if the time is not within it.
111                                if !range.contains(&time) {
112                                    // If the time is outside the range, push the current buffer
113                                    // to the chain and reset the range.
114                                    if !buffer.is_empty() {
115                                        let bucket =
116                                            chain.find_mut(&range.start).expect("Must exist");
117                                        bucket.push_container(&mut buffer);
118                                        buffer.clear();
119                                    }
120                                    range = chain.range_of(&time).expect("Must exist");
121                                }
122                                buffer.push((datum, time, diff));
123                            }
124
125                            // Handle leftover data in the buffer.
126                            if !buffer.is_empty() {
127                                let bucket = chain.find_mut(&range.start).expect("Must exist");
128                                bucket.push_container(&mut buffer);
129                                buffer.clear();
130                            }
131                        }
132                    }
133                });
134
135                // Check for data that is ready to be revealed.
136                let peeled = chain.peel(upper.borrow());
137                if let Some(cap) = cap.as_ref() {
138                    let mut session = output.session_with_builder(cap);
139                    for chunk in peeled.into_iter().flat_map(|x| x.done()) {
140                        // The columnar merge batcher hands back `Column` chunks; the output
141                        // builder consumes owned `(D, T, Diff)` tuples, so reconstitute each
142                        // record from its columnar reference.
143                        session.give_iterator(
144                            chunk
145                                .borrow()
146                                .into_index_iter()
147                                .map(<(D, T, mz_repr::Diff)>::into_owned),
148                        );
149                    }
150                } else {
151                    // If we don't have a cap, we should not have any data to reveal.
152                    assert!(
153                        peeled
154                            .into_iter()
155                            .flat_map(|x| x.done())
156                            .all(|chunk| chunk.record_count() == 0),
157                        "Unexpected data revealed without a cap."
158                    );
159                }
160
161                // Downgrade the cap to the current input frontier.
162                if frontier.is_empty() || upper.is_empty() {
163                    cap = None;
164                } else if let Some(cap) = cap.as_mut() {
165                    // TODO: This assumes that the time is total ordered.
166                    cap.downgrade(&upper[0]);
167                }
168
169                // Maintain the bucket chain by restoring it with fuel.
170                let mut fuel = 1_000_000;
171                chain.restore(&mut fuel);
172                if fuel <= 0 {
173                    // If we run out of fuel, we activate the operator to continue processing.
174                    activator.activate();
175                }
176            }
177        })
178    }
179}
180
181/// A wrapper around [`ColumnMergeBatcher`] that implements the bucketing API.
182///
183/// This is the same columnar-native merge batcher (`Col2ValPagedBatcher`) the
184/// default arrangement uses, so the bucket chain and arrangements share a single
185/// merge-batcher implementation. The batcher consumes pre-chunked, consolidated
186/// [`Column`] input, so this wrapper carries a [`ColumnChunker`] that sorts and
187/// consolidates the `Vec` input into the [`Column`] chunks the batcher consumes.
188struct MergeBatcherWrapper<D, T, R>
189where
190    D: MzData + Ord + Clone,
191    T: MzData + Ord + PartialOrder + Clone,
192    R: MzData + Semigroup + Default,
193{
194    logger: Option<differential_dataflow::logging::Logger>,
195    operator_id: usize,
196    chunker: ColumnChunker<(D, T, R)>,
197    inner: ColumnMergeBatcher<D, T, R>,
198}
199
200impl<D, T, R> MergeBatcherWrapper<D, T, R>
201where
202    D: MzData + Ord + Clone + 'static,
203    T: MzData + Ord + PartialOrder + Clone + Default + Timestamp,
204    R: MzData + Semigroup + Default + 'static + for<'a> Semigroup<columnar::Ref<'a, R>>,
205    for<'a> columnar::Ref<'a, R>: Ord,
206    for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
207    for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
208    for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
209    for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
210{
211    /// Construct a new `MergeBatcherWrapper` with the given logger and operator ID.
212    fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
213        Self {
214            logger: logger.clone(),
215            operator_id,
216            chunker: ColumnChunker::default(),
217            inner: ColumnMergeBatcher::new(logger, operator_id),
218        }
219    }
220
221    /// Consolidate `buffer` through the chunker and feed any complete chunks to
222    /// the batcher. Empties `buffer`, retaining its capacity.
223    fn push_container(&mut self, buffer: &mut Vec<(D, T, R)>) {
224        use timely::container::{ContainerBuilder as _, PushInto as _};
225        if buffer.is_empty() {
226            return;
227        }
228        // The chunker consumes `Column` input, so stage the `Vec` updates into a
229        // raw `Column` first; the chunker then sorts and consolidates them.
230        let mut raw: Column<(D, T, R)> = Default::default();
231        for update in buffer.drain(..) {
232            raw.push_into(&update);
233        }
234        self.chunker.push_into(&mut raw);
235        while let Some(chunk) = self.chunker.extract() {
236            self.inner.push_into(std::mem::take(chunk));
237        }
238    }
239
240    /// Flush any partial chunk still held by the chunker into the batcher.
241    fn flush(&mut self) {
242        use timely::container::ContainerBuilder as _;
243        while let Some(chunk) = self.chunker.finish() {
244            self.inner.push_into(std::mem::take(chunk));
245        }
246    }
247
248    /// Reveal the contents of the merge batcher, returning a vector of `Column` chunks.
249    fn done(mut self) -> Vec<Column<(D, T, R)>> {
250        self.flush();
251        let (chain, _description) = self.inner.seal(Antichain::new());
252        chain
253    }
254}
255
256impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
257where
258    D: MzData + Ord + Clone + 'static,
259    T: MzData + Ord + PartialOrder + Clone + Default + 'static + BucketTimestamp,
260    R: MzData + Semigroup + Default + 'static + for<'a> Semigroup<columnar::Ref<'a, R>>,
261    for<'a> columnar::Ref<'a, R>: Ord,
262    for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
263    for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
264    for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
265    for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
266{
267    type Timestamp = T;
268
269    fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
270        // The implementation isn't tuned for performance: we round-trip the sealed
271        // chunks back through a `Vec` and re-chunk them into the lower batcher.
272        // TODO: Split the batcher's chains directly without re-chunking.
273        self.flush();
274        let upper = Antichain::from_elem(timestamp.clone());
275        let mut lower = Self::new(self.logger.clone(), self.operator_id);
276        let mut buffer = Vec::new();
277        let (chain, _description) = self.inner.seal(upper);
278        for chunk in chain {
279            *fuel = fuel.saturating_sub(chunk.record_count());
280            // TODO: Avoid this cloning.
281            buffer.extend(
282                chunk
283                    .borrow()
284                    .into_index_iter()
285                    .map(<(D, T, R)>::into_owned),
286            );
287            lower.push_container(&mut buffer);
288            buffer.clear();
289        }
290        (lower, self)
291    }
292}