mz_compute/extensions/temporal_bucket.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Utilities and stream extensions for temporal bucketing.

use std::marker::PhantomData;

use differential_dataflow::Hashable;
use differential_dataflow::containers::TimelyStack;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
use differential_dataflow::trace::{Batcher, Builder, Description};
use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, StreamCore};
use timely::order::TotalOrder;
use timely::progress::{Antichain, PathSummary, Timestamp};
use timely::{Data, ExchangeData, PartialOrder};

use crate::typedefs::MzData;

/// Sort outstanding updates into a [`BucketChain`], and reveal data not in advance of the input
/// frontier. Retains a capability at the last input frontier to preserve the right to produce
/// data at times between the last and the current input frontier.
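///
/// A minimal usage sketch (hypothetical; the input `stream`, the `u64`
/// timestamp, and the choice of container builder are assumptions, not taken
/// from this crate):
///
/// ```ignore
/// use timely::container::CapacityContainerBuilder;
/// use timely::progress::Antichain;
///
/// // Buffer updates in a bucket chain, passing through anything within one
/// // tick of the input frontier (or the as_of) without storing it.
/// let delayed = stream.bucket::<CapacityContainerBuilder<Vec<_>>>(
///     Antichain::from_elem(0u64), // as_of
///     1,                          // threshold: one tick beyond the frontier
/// );
/// ```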
pub trait TemporalBucketing<G: Scope, O> {
    /// Construct a new stream that stores updates into a [`BucketChain`] and reveals data
    /// not in advance of the frontier. Data that is within `threshold` distance of the input
    /// frontier or the `as_of` is passed through without being stored in the chain.
    fn bucket<CB>(
        &self,
        as_of: Antichain<G::Timestamp>,
        threshold: <G::Timestamp as Timestamp>::Summary,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder + PushInto<O>;
}

/// Implementation for streams in scopes where timestamps define a total order.
impl<G, D> TemporalBucketing<G, (D, G::Timestamp, mz_repr::Diff)>
    for StreamCore<G, Vec<(D, G::Timestamp, mz_repr::Diff)>>
where
    G: Scope<Timestamp: ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice>,
    D: ExchangeData + MzData + Ord + std::fmt::Debug + Hashable,
{
    fn bucket<CB>(
        &self,
        as_of: Antichain<G::Timestamp>,
        threshold: <G::Timestamp as Timestamp>::Summary,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder + PushInto<(D, G::Timestamp, mz_repr::Diff)>,
    {
        let scope = self.scope();
        let logger = scope.logger_for("differential/arrange").map(Into::into);

        let pact = Exchange::new(|(d, _, _): &(D, G::Timestamp, mz_repr::Diff)| d.hashed().into());
        self.unary_frontier::<CB, _, _, _>(pact, "Temporal delay", |cap, info| {
            let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
            let activator = scope.activator_for(info.address);

            // Cap tracking the lower bound of potentially outstanding data.
            let mut cap = Some(cap);

            // Buffer for data to be inserted into the chain.
            let mut buffer = Vec::new();

            move |input, output| {
                // The upper frontier is the join of the input frontier and the `as_of` frontier,
                // with the `threshold` summary applied to it.
                let mut upper = Antichain::new();
                for time1 in &input.frontier().frontier() {
                    for time2 in as_of.elements() {
                        // TODO: Use `join_assign` if we ever use a timestamp with allocations.
                        if let Some(time) = threshold.results_in(&time1.join(time2)) {
                            upper.insert(time);
                        }
                    }
                }
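
                // For example, with a u64 timestamp, input frontier {10}, as_of {5},
                // and a threshold summary of 3: the join of 10 and 5 is 10, and
                // `results_in` yields 13, so `upper` becomes {13} and updates at
                // times earlier than 13 are revealed or passed through.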

                while let Some((time, data)) = input.next() {
                    // Pass through data that is about to be revealed, instead of storing it.
                    let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
                    output
                        .session_with_builder(&time)
                        .give_iterator(pass_through);

                    // Sort data by time, then drain it into a buffer that contains data for a
                    // single bucket. We scan the data for ranges of time that fall into the same
                    // bucket so we can push batches of data at once.
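                    // For example (with illustrative bucket bounds): sorted times
                    // [1, 2, 7, 8] with bucket ranges [0, 4) and [4, 16) result in
                    // two pushes, one for the run {1, 2} and one for {7, 8}.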
                    data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));

                    let mut drain = data.drain(..);
                    if let Some((datum, time, diff)) = drain.next() {
                        let mut range = chain.range_of(&time).expect("Must exist");
                        buffer.push((datum, time, diff));
                        for (datum, time, diff) in drain {
                            // Check whether the time falls outside the current bucket's range.
                            if !range.contains(&time) {
                                // If the time is outside the range, push the current buffer
                                // to the chain and reset the range.
                                if !buffer.is_empty() {
                                    let bucket = chain.find_mut(&range.start).expect("Must exist");
                                    bucket.inner.push_container(&mut buffer);
                                    buffer.clear();
                                }
                                range = chain.range_of(&time).expect("Must exist");
                            }
                            buffer.push((datum, time, diff));
                        }

                        // Handle leftover data in the buffer.
                        if !buffer.is_empty() {
                            let bucket = chain.find_mut(&range.start).expect("Must exist");
                            bucket.inner.push_container(&mut buffer);
                            buffer.clear();
                        }
                    }
                }

                // Check for data that is ready to be revealed.
                let peeled = chain.peel(upper.borrow());
                if let Some(cap) = cap.as_ref() {
                    let mut session = output.session_with_builder(cap);
                    for stack in peeled.into_iter().flat_map(|x| x.done()) {
                        // TODO: If we have a columnar merge batcher, cloning won't be necessary.
                        session.give_iterator(stack.iter().cloned());
                    }
                } else {
                    // If we don't have a cap, we should not have any data to reveal.
                    assert_eq!(
                        peeled.into_iter().flat_map(|x| x.done()).next(),
                        None,
                        "Unexpected data revealed without a cap."
                    );
                }

                // Downgrade the cap to the current input frontier.
                if input.frontier().is_empty() || upper.is_empty() {
                    cap = None;
                } else if let Some(cap) = cap.as_mut() {
                    // TODO: This assumes that the timestamp is totally ordered.
                    cap.downgrade(&upper[0]);
                }

                // Maintain the bucket chain by restoring it with fuel.
                let mut fuel = 1_000_000;
                chain.restore(&mut fuel);
                if fuel <= 0 {
                    // If we ran out of fuel, activate the operator to continue processing.
                    activator.activate();
                }
            }
        })
    }
}

/// A wrapper around `MergeBatcher` that implements the [`Bucket`] trait so it can serve as
/// storage for a [`BucketChain`].
struct MergeBatcherWrapper<D, T, R>
where
    D: MzData + Ord,
    T: MzData + Ord + PartialOrder + Clone,
    R: MzData + Semigroup + Default,
{
    logger: Option<differential_dataflow::logging::Logger>,
    operator_id: usize,
    inner: MergeBatcher<Vec<(D, T, R)>, ColumnationChunker<(D, T, R)>, ColMerger<D, T, R>>,
}

impl<D, T, R> MergeBatcherWrapper<D, T, R>
where
    D: MzData + Ord + Clone,
    T: MzData + Ord + PartialOrder + Clone + Timestamp,
    R: MzData + Semigroup + Default,
{
    /// Construct a new `MergeBatcherWrapper` with the given logger and operator ID.
    fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
        Self {
            logger: logger.clone(),
            operator_id,
            inner: MergeBatcher::new(logger, operator_id),
        }
    }

    /// Reveal the contents of the `MergeBatcher`, returning a vector of `TimelyStack`s.
    fn done(mut self) -> Vec<TimelyStack<(D, T, R)>> {
        self.inner.seal::<CapturingBuilder<_, _>>(Antichain::new())
    }
}

impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
where
    D: MzData + Ord + Data,
    T: MzData + Ord + PartialOrder + Data + BucketTimestamp,
    R: MzData + Semigroup + Default,
{
    type Timestamp = T;

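    /// Split the batcher at `timestamp`: updates at times not in advance of
    /// `timestamp` are sealed into a new batcher, which is returned first,
    /// while `self` retains the remaining updates.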
    fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
        // The implementation isn't tuned for performance. We should not bounce in and out of
        // different containers when not needed, but the merge batcher we use can only accept
        // vectors as input, not any other container type.
        // TODO: Allow the merge batcher to accept more generic containers.
        let upper = Antichain::from_elem(timestamp.clone());
        let mut lower = Self::new(self.logger.clone(), self.operator_id);
        let mut buffer = Vec::new();
        for chunk in self.inner.seal::<CapturingBuilder<_, _>>(upper) {
            *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit"));
            // TODO: Avoid this cloning.
            buffer.extend(chunk.into_iter().cloned());
            lower.inner.push_container(&mut buffer);
            buffer.clear();
        }
        (lower, self)
    }
}

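/// A [`Builder`] that captures the chain of containers handed to [`Builder::seal`] instead of
/// constructing a real batch. Only `seal` is implemented; the incremental construction methods
/// are unreachable in this use.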
struct CapturingBuilder<D, T>(D, PhantomData<T>);

impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
    type Input = D;
    type Time = T;
    type Output = Vec<D>;

    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
        // Not needed for this implementation.
        unimplemented!()
    }

    fn push(&mut self, _chunk: &mut Self::Input) {
        // Not needed for this implementation.
        unimplemented!()
    }

    fn done(self, _description: Description<Self::Time>) -> Self::Output {
        // Not needed for this implementation.
        unimplemented!()
    }

    #[inline]
    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
        std::mem::take(chain)
    }
}