// mz_compute/extensions/temporal_bucket.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Utilities and stream extensions for temporal bucketing.
use std::marker::PhantomData;

use differential_dataflow::Hashable;
use differential_dataflow::containers::TimelyStack;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
use differential_dataflow::trace::implementations::merge_batcher::container::ColInternalMerger;
use differential_dataflow::trace::{Batcher, Builder, Description};
use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
use timely::container::PushInto;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream, StreamVec};
use timely::order::TotalOrder;
use timely::progress::{Antichain, PathSummary, Timestamp};
use timely::{ContainerBuilder, ExchangeData, PartialOrder};

use crate::typedefs::MzData;
32
/// Sort outstanding updates into a [`BucketChain`], and reveal data not in advance of the input
/// frontier. Retains a capability at the last input frontier to retain the right to produce data
/// at times between the last input frontier and the current input frontier.
pub trait TemporalBucketing<G: Scope, O> {
    /// Construct a new stream that stores updates into a [`BucketChain`] and reveals data
    /// not in advance of the frontier. Data that is within `threshold` distance of the input
    /// frontier or the `as_of` is passed through without being stored in the chain.
    ///
    /// * `as_of`: frontier joined with the input frontier to determine which updates are
    ///   close enough to be passed through rather than buffered.
    /// * `threshold`: a path summary applied to the joined frontier; times it maps past the
    ///   resulting bound are buffered in the chain, all others flow straight through.
    ///
    /// `CB` is the container builder used to assemble output containers from updates of type `O`.
    fn bucket<CB>(
        self,
        as_of: Antichain<G::Timestamp>,
        threshold: <G::Timestamp as Timestamp>::Summary,
    ) -> Stream<G, CB::Container>
    where
        CB: ContainerBuilder + PushInto<O>;
}
48
/// Implementation for streams in scopes where timestamps define a total order.
impl<G, D> TemporalBucketing<G, (D, G::Timestamp, mz_repr::Diff)>
    for StreamVec<G, (D, G::Timestamp, mz_repr::Diff)>
where
    G: Scope<Timestamp: ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice>,
    D: ExchangeData + MzData + Ord + Clone + std::fmt::Debug + Hashable,
{
    fn bucket<CB>(
        self,
        as_of: Antichain<G::Timestamp>,
        threshold: <G::Timestamp as Timestamp>::Summary,
    ) -> Stream<G, CB::Container>
    where
        CB: ContainerBuilder + PushInto<(D, G::Timestamp, mz_repr::Diff)>,
    {
        let scope = self.scope();
        // Reuse the arrangement logger so the wrapped merge batcher reports its activity
        // through the existing "differential/arrange" logging channel.
        let logger = scope.logger_for("differential/arrange").map(Into::into);

        // Route updates by the hash of their data so each worker owns a disjoint key space.
        let pact = Exchange::new(|(d, _, _): &(D, G::Timestamp, mz_repr::Diff)| d.hashed().into());
        self.unary_frontier::<CB, _, _, _>(pact, "Temporal delay", |cap, info| {
            // Chain of per-time-range buckets, each backed by a merge batcher.
            let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
            // Used to reschedule ourselves when chain maintenance runs out of fuel below.
            let activator = scope.activator_for(info.address);

            // Cap tracking the lower bound of potentially outstanding data.
            let mut cap = Some(cap);

            // Buffer for data to be inserted into the chain.
            let mut buffer = Vec::new();

            move |(input, frontier), output| {
                // The upper frontier is the join of the input frontier and the `as_of` frontier,
                // with the `threshold` summary applied to it.
                let mut upper = Antichain::new();
                for time1 in &frontier.frontier() {
                    for time2 in as_of.elements() {
                        // TODO: Use `join_assign` if we ever use a timestamp with allocations.
                        // NOTE(review): `results_in` returning `None` silently drops that
                        // candidate bound; presumably that only happens near the end of time.
                        if let Some(time) = threshold.results_in(&time1.join(time2)) {
                            upper.insert(time);
                        }
                    }
                }

                input.for_each_time(|time, data| {
                    let mut session = output.session_with_builder(&time);
                    for data in data {
                        // Skip data that is about to be revealed.
                        // Times not greater or equal to `upper` are forwarded immediately at
                        // the input capability instead of being stored in the chain.
                        let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
                        session.give_iterator(pass_through);

                        // Sort data by time, then drain it into a buffer that contains data for a
                        // single bucket. We scan the data for ranges of time that fall into the same
                        // bucket so we can push batches of data at once.
                        data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));

                        let mut drain = data.drain(..);
                        if let Some((datum, time, diff)) = drain.next() {
                            let mut range = chain.range_of(&time).expect("Must exist");
                            buffer.push((datum, time, diff));
                            for (datum, time, diff) in drain {
                                // If we have a range, check if the time is not within it.
                                if !range.contains(&time) {
                                    // If the time is outside the range, push the current buffer
                                    // to the chain and reset the range.
                                    if !buffer.is_empty() {
                                        let bucket =
                                            chain.find_mut(&range.start).expect("Must exist");
                                        bucket.inner.push_container(&mut buffer);
                                        buffer.clear();
                                    }
                                    range = chain.range_of(&time).expect("Must exist");
                                }
                                buffer.push((datum, time, diff));
                            }

                            // Handle leftover data in the buffer.
                            if !buffer.is_empty() {
                                let bucket = chain.find_mut(&range.start).expect("Must exist");
                                bucket.inner.push_container(&mut buffer);
                                buffer.clear();
                            }
                        }
                    }
                });

                // Check for data that is ready to be revealed.
                // `peel` detaches all buckets strictly before `upper`; their contents are
                // emitted at the retained capability.
                let peeled = chain.peel(upper.borrow());
                if let Some(cap) = cap.as_ref() {
                    let mut session = output.session_with_builder(cap);
                    for stack in peeled.into_iter().flat_map(|x| x.done()) {
                        // TODO: If we have a columnar merge batcher, cloning won't be necessary.
                        session.give_iterator(stack.iter().cloned());
                    }
                } else {
                    // If we don't have a cap, we should not have any data to reveal.
                    assert_eq!(
                        peeled.into_iter().flat_map(|x| x.done()).next(),
                        None,
                        "Unexpected data revealed without a cap."
                    );
                }

                // Downgrade the cap to the current input frontier.
                // Drop the capability entirely once no more input (or no more output times)
                // can arrive, which lets downstream frontiers advance to the end.
                if frontier.is_empty() || upper.is_empty() {
                    cap = None;
                } else if let Some(cap) = cap.as_mut() {
                    // TODO: This assumes that the time is total ordered.
                    cap.downgrade(&upper[0]);
                }

                // Maintain the bucket chain by restoring it with fuel.
                let mut fuel = 1_000_000;
                chain.restore(&mut fuel);
                if fuel <= 0 {
                    // If we run out of fuel, we activate the operator to continue processing.
                    activator.activate();
                }
            }
        })
    }
}
169
/// A wrapper around `MergeBatcher` that implements the `Storage` trait for bucketing.
struct MergeBatcherWrapper<D, T, R>
where
    D: MzData + Ord + Clone,
    T: MzData + Ord + PartialOrder + Clone,
    R: MzData + Semigroup + Default,
{
    /// Logger handed to every freshly-created inner batcher (kept so `split` can
    /// construct new wrappers with the same logging configuration).
    logger: Option<differential_dataflow::logging::Logger>,
    /// Operator id forwarded to the inner batcher, likewise retained for `split`.
    operator_id: usize,
    /// The merge batcher holding the (data, time, diff) updates of this bucket.
    inner: MergeBatcher<Vec<(D, T, R)>, ColumnationChunker<(D, T, R)>, ColInternalMerger<D, T, R>>,
}
181
182impl<D, T, R> MergeBatcherWrapper<D, T, R>
183where
184    D: MzData + Ord + Clone,
185    T: MzData + Ord + PartialOrder + Clone + Timestamp,
186    R: MzData + Semigroup + Default,
187{
188    /// Construct a new `MergeBatcherWrapper` with the given logger and operator ID.
189    fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
190        Self {
191            logger: logger.clone(),
192            operator_id,
193            inner: MergeBatcher::new(logger, operator_id),
194        }
195    }
196
197    /// Reveal the contents of the `MergeBatcher`, returning a vector of `TimelyStack`s.
198    fn done(mut self) -> Vec<TimelyStack<(D, T, R)>> {
199        self.inner.seal::<CapturingBuilder<_, _>>(Antichain::new())
200    }
201}
202
impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
where
    D: MzData + Ord + Clone + 'static,
    T: MzData + Ord + PartialOrder + Clone + 'static + BucketTimestamp,
    R: MzData + Semigroup + Default,
{
    type Timestamp = T;

    /// Split this bucket at `timestamp`, returning `(lower, upper)` where `lower` holds the
    /// updates revealed by sealing at `timestamp` and `upper` (the remainder of `self`) keeps
    /// the rest. `fuel` is decremented by the number of updates moved, so callers can bound
    /// the work done per invocation.
    fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
        // The implementation isn't tuned for performance. We should not bounce in and out of
        // different containers when not needed. The merge batcher we use can only accept
        // vectors as inputs, but not any other container type.
        // TODO: Allow the merge batcher to accept more generic containers.
        let upper = Antichain::from_elem(timestamp.clone());
        let mut lower = Self::new(self.logger.clone(), self.operator_id);
        let mut buffer = Vec::new();
        for chunk in self.inner.seal::<CapturingBuilder<_, _>>(upper) {
            *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit"));
            // TODO: Avoid this cloning.
            buffer.extend(chunk.into_iter().cloned());
            lower.inner.push_container(&mut buffer);
            buffer.clear();
        }
        (lower, self)
    }
}
229
/// A no-op [`Builder`] whose `seal` simply hands back the sealed chunks unchanged; used to
/// extract a merge batcher's contents without building real batches.
struct CapturingBuilder<D, T>(D, PhantomData<T>);
231
232impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
233    type Input = D;
234    type Time = T;
235    type Output = Vec<D>;
236
237    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
238        // Not needed for this implementation.
239        unimplemented!()
240    }
241
242    fn push(&mut self, _chunk: &mut Self::Input) {
243        // Not needed for this implementation.
244        unimplemented!()
245    }
246
247    fn done(self, _description: Description<Self::Time>) -> Self::Output {
248        // Not needed for this implementation.
249        unimplemented!()
250    }
251
252    #[inline]
253    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
254        std::mem::take(chain)
255    }
256}