mz_compute/extensions/temporal_bucket.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Utilities and stream extensions for temporal bucketing.

use std::marker::PhantomData;

use differential_dataflow::containers::TimelyStack;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
use differential_dataflow::trace::{Batcher, Builder, Description};
use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, StreamCore};
use timely::order::TotalOrder;
use timely::progress::{Antichain, PathSummary, Timestamp};
use timely::{Data, PartialOrder};

use crate::typedefs::MzData;
/// Sort outstanding updates into a [`BucketChain`], and reveal data not in advance of the input
/// frontier. Holds a capability at the last input frontier to preserve the right to produce data
/// at times between the last and the current input frontier.
pub trait TemporalBucketing<G: Scope, O> {
    /// Construct a new stream that stores updates into a [`BucketChain`] and reveals data
    /// not in advance of the frontier. Data that is within `threshold` distance of the input
    /// frontier or the `as_of` is passed through without being stored in the chain.
    fn bucket<CB>(
        &self,
        as_of: Antichain<G::Timestamp>,
        threshold: <G::Timestamp as Timestamp>::Summary,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder + PushInto<O>;
}
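// A minimal usage sketch (hypothetical stream, types, and threshold; not part of
// this file). Assumes a scope timestamped with `mz_repr::Timestamp`, whose path
// summary advances times by a fixed amount:
//
// ```ignore
// use timely::container::CapacityContainerBuilder;
//
// // Updates within 1s of the frontier (or `as_of`) pass through immediately; the
// // rest are held in the bucket chain until the frontier catches up to them.
// let delayed = stream
//     .bucket::<CapacityContainerBuilder<Vec<(Row, Timestamp, Diff)>>>(as_of, 1_000.into());
// ```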

/// Implementation for streams in scopes where timestamps define a total order.
impl<G, D> TemporalBucketing<G, (D, G::Timestamp, mz_repr::Diff)>
    for StreamCore<G, Vec<(D, G::Timestamp, mz_repr::Diff)>>
where
    G: Scope,
    G::Timestamp: Data + MzData + BucketTimestamp + TotalOrder + Lattice,
    D: Data + MzData + Ord + std::fmt::Debug,
{
    fn bucket<CB>(
        &self,
        as_of: Antichain<G::Timestamp>,
        threshold: <G::Timestamp as Timestamp>::Summary,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder + PushInto<(D, G::Timestamp, mz_repr::Diff)>,
    {
        let scope = self.scope();
        let logger = scope.logger_for("differential/arrange").map(Into::into);

        self.unary_frontier::<CB, _, _, _>(Pipeline, "Temporal delay", |cap, info| {
            let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
            let activator = scope.activator_for(info.address);

            // Cap tracking the lower bound of potentially outstanding data.
            let mut cap = Some(cap);

            // Buffer for data to be inserted into the chain.
            let mut buffer = Vec::new();

            move |input, output| {
                // The upper frontier is the join of the input frontier and the `as_of` frontier,
                // with the `threshold` summary applied to it.
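                // For example (illustrative values): with input frontier `{5}`, `as_of` `{3}`,
                // and a `threshold` summary that advances times by `10`, the upper becomes
                // `{join(5, 3) + 10} = {15}`; updates at times less than `15` will be revealed.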
                let mut upper = Antichain::new();
                for time1 in &input.frontier().frontier() {
                    for time2 in as_of.elements() {
                        // TODO: Use `join_assign` if we ever use a timestamp with allocations.
                        if let Some(time) = threshold.results_in(&time1.join(time2)) {
                            upper.insert(time);
                        }
                    }
                }

                while let Some((time, data)) = input.next() {
                    // Pass through data that is about to be revealed, instead of storing
                    // it in the chain.
                    let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
                    output
                        .session_with_builder(&time)
                        .give_iterator(pass_through);

                    // Sort data by time, then drain it into a buffer that contains data for a
                    // single bucket. We scan the data for ranges of time that fall into the same
                    // bucket so we can push batches of data at once.
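                    // For instance (illustrative ranges): sorted times `[2, 3, 7, 9]` against
                    // bucket ranges `[0, 4)` and `[4, 16)` produce two pushes: `[2, 3]` into
                    // the first bucket and `[7, 9]` into the second.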
                    data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));

                    let mut drain = data.drain(..);
                    if let Some((datum, time, diff)) = drain.next() {
                        let mut range = chain.range_of(&time).expect("Must exist");
                        buffer.push((datum, time, diff));
                        for (datum, time, diff) in drain {
                            // Check whether the time falls outside the current bucket's range.
                            if !range.contains(&time) {
                                // The time belongs to a different bucket: flush the current
                                // buffer into its bucket and switch to the new range.
                                if !buffer.is_empty() {
                                    let bucket = chain.find_mut(&range.start).expect("Must exist");
                                    bucket.inner.push_container(&mut buffer);
                                    buffer.clear();
                                }
                                range = chain.range_of(&time).expect("Must exist");
                            }
                            buffer.push((datum, time, diff));
                        }

                        // Handle leftover data in the buffer.
                        if !buffer.is_empty() {
                            let bucket = chain.find_mut(&range.start).expect("Must exist");
                            bucket.inner.push_container(&mut buffer);
                            buffer.clear();
                        }
                    }
                }

                // Check for data that is ready to be revealed.
                let peeled = chain.peel(upper.borrow());
                if let Some(cap) = cap.as_ref() {
                    let mut session = output.session_with_builder(cap);
                    for stack in peeled.into_iter().flat_map(|x| x.done()) {
                        // TODO: If we have a columnar merge batcher, cloning won't be necessary.
                        session.give_iterator(stack.iter().cloned());
                    }
                } else {
                    // If we don't have a cap, we should not have any data to reveal.
                    assert_eq!(
                        peeled.into_iter().flat_map(|x| x.done()).next(),
                        None,
                        "Unexpected data revealed without a cap."
                    );
                }

                // Downgrade the cap to the new upper, or drop it once no further output is possible.
                if input.frontier().is_empty() || upper.is_empty() {
                    cap = None;
                } else if let Some(cap) = cap.as_mut() {
                    // TODO: This assumes that the timestamp is totally ordered.
                    cap.downgrade(&upper[0]);
                }

                // Maintain the bucket chain by restoring it with fuel.
                let mut fuel = 1_000_000;
                chain.restore(&mut fuel);
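                // `MergeBatcherWrapper::split` below charges one unit of fuel per update moved.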
                if fuel <= 0 {
                    // If we run out of fuel, we activate the operator to continue processing.
                    activator.activate();
                }
            }
        })
    }
}

/// A wrapper around `MergeBatcher` that implements the [`Bucket`] trait for temporal bucketing.
struct MergeBatcherWrapper<D, T, R>
where
    D: MzData + Ord,
    T: MzData + Ord + PartialOrder + Clone,
    R: MzData + Semigroup + Default,
{
    logger: Option<differential_dataflow::logging::Logger>,
    operator_id: usize,
    inner: MergeBatcher<Vec<(D, T, R)>, ColumnationChunker<(D, T, R)>, ColMerger<D, T, R>>,
}

impl<D, T, R> MergeBatcherWrapper<D, T, R>
where
    D: MzData + Ord + Clone,
    T: MzData + Ord + PartialOrder + Clone + Timestamp,
    R: MzData + Semigroup + Default,
{
    /// Construct a new `MergeBatcherWrapper` with the given logger and operator ID.
    fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
        Self {
            logger: logger.clone(),
            operator_id,
            inner: MergeBatcher::new(logger, operator_id),
        }
    }

    /// Reveal the contents of the `MergeBatcher`, returning a vector of `TimelyStack`s.
    fn done(mut self) -> Vec<TimelyStack<(D, T, R)>> {
        self.inner.seal::<CapturingBuilder<_, _>>(Antichain::new())
    }
}

impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
where
    D: MzData + Ord + Data,
    T: MzData + Ord + PartialOrder + Data + BucketTimestamp,
    R: MzData + Semigroup + Default,
{
    type Timestamp = T;

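    /// Split the contents at `timestamp`, returning the updates not in advance of
    /// `timestamp` and the remaining updates, in that order. Decrements `fuel` by
    /// the number of updates moved.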
    fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
        // This implementation isn't tuned for performance: it bounces data in and out
        // of containers more often than necessary, because the merge batcher we use
        // can only accept vectors as input, not any other container type.
        // TODO: Allow the merge batcher to accept more generic containers.
        let upper = Antichain::from_elem(timestamp.clone());
        let mut lower = Self::new(self.logger.clone(), self.operator_id);
        let mut buffer = Vec::new();
        for chunk in self.inner.seal::<CapturingBuilder<_, _>>(upper) {
            *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit"));
            // TODO: Avoid this cloning.
            buffer.extend(chunk.into_iter().cloned());
            lower.inner.push_container(&mut buffer);
            buffer.clear();
        }
        (lower, self)
    }
}

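/// A [`Builder`] that doesn't build batches: `seal` simply takes ownership of the
/// batcher's chain of chunks, which lets [`MergeBatcherWrapper`] extract a
/// `MergeBatcher`'s contents as-is. Its remaining methods are never called.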
struct CapturingBuilder<D, T>(D, PhantomData<T>);

impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
    type Input = D;
    type Time = T;
    type Output = Vec<D>;

    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
        // Not needed for this implementation.
        unimplemented!()
    }

    fn push(&mut self, _chunk: &mut Self::Input) {
        // Not needed for this implementation.
        unimplemented!()
    }

    fn done(self, _description: Description<Self::Time>) -> Self::Output {
        // Not needed for this implementation.
        unimplemented!()
    }

    #[inline]
    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
        std::mem::take(chain)
    }
}