Skip to main content

mz_timely_util/
temporal.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Utilities to efficiently store future updates.
17
18use std::collections::BTreeMap;
19
20use mz_ore::cast::CastFrom;
21use timely::progress::Timestamp;
22use timely::progress::frontier::AntichainRef;
23
24/// Timestamp extension for timestamps that can advance by `2^exponent`.
25///
26/// Most likely, this is only relevant for totally ordered timestamps.
27pub trait BucketTimestamp: Timestamp {
28    /// The number of bits in the timestamp.
29    const DOMAIN: usize = size_of::<Self>() * 8;
30    /// Advance this timestamp by `2^exponent`. Returns `None` if the
31    /// timestamp would overflow.
32    fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self>;
33}
34
35/// A type that can be split into two parts based on a timestamp.
36pub trait Bucket: Sized {
37    /// The timestamp type associated with this storage.
38    type Timestamp: BucketTimestamp;
39    /// Split self in two, based on the timestamp. The result is a pair of self, where the first
40    /// element contains all data with a timestamp strictly less than `timestamp`, and the second
41    /// all other data.
42    fn split(self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self);
43}
44
/// A half-open `[start, end)` range of timestamps.
///
/// `end` is `None` when the range is unbounded above, i.e., the bucket covers the rest of
/// the domain.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BucketRange<T> {
    /// The lower bound (inclusive).
    pub start: T,
    /// The upper bound (exclusive), or `None` if the range has no upper bound.
    end: Option<T>,
}

impl<T: PartialOrd> BucketRange<T> {
    /// Returns `true` if `time` lies within the range, that is `start <= time` and, when an
    /// upper bound exists, `time < end`.
    pub fn contains(&self, time: &T) -> bool {
        *time >= self.start
            && match &self.end {
                // No upper bound: everything at or after `start` is contained.
                None => true,
                Some(end) => *time < *end,
            }
    }
}
58
59/// A sorted list of buckets, representing data bucketed by timestamp.
60///
61/// Bucket chains support three main APIs: finding buckets for a given timestamp, peeling
62/// off buckets up to a frontier, and restoring the chain property. All operations aim to be
63/// amortized logarithmic in the number of outstanding timestamps.
64///
65/// We achieve this by storing buckets of increasing size for timestamps that are further out
66/// in the future. At the same time, in a well-formed chain, adjacent buckets span a time range at
67/// most factor 4 different. Factor 4 means that we can skip at most one bucket size for adjacent
68/// buckets, limiting the amount of work we need to do when peeling off buckets or restoring the
69/// chain property.
70///
71/// A bucket chain is well-formed if all buckets are within two bits of each other, with an imaginary
72/// bucket of -2 bits at the start. A chain does not need to be well-formed at all times, and supports
73/// peeling and finding even if not well-formed. However, `peel` might need to split more buckets to
74/// extract the desired data.
75///
76/// The `restore` method can be used to restore the chain property. It needs to be called repeatedly
77/// with a positive amount of fuel while the remaining fuel after the call is non-positive. This
78/// allows the caller to control the amount of work done in a single call and interleave it with
79/// other work.
80#[derive(Debug)]
81pub struct BucketChain<S: Bucket> {
82    content: BTreeMap<S::Timestamp, (u32, S)>,
83}
84
85impl<S: Bucket> BucketChain<S> {
86    /// Construct a new bucket chain. Spans the whole time domain.
87    #[inline]
88    pub fn new(storage: S) -> Self {
89        let bits = S::Timestamp::DOMAIN.try_into().expect("Must fit");
90        Self {
91            // The initial bucket starts at the minimum timestamp and spans the whole domain.
92            content: BTreeMap::from([(Timestamp::minimum(), (bits, storage))]),
93        }
94    }
95
96    /// Find the time range for the bucket that contains data for time `timestamp`.
97    /// Returns `None` if there is no bucket for the requested time.
98    /// Only times that haven't been peeled can still be found.
99    ///
100    /// The bounds are only valid until the next call to `peel` or `restore`.
101    #[inline]
102    pub fn range_of(&self, timestamp: &S::Timestamp) -> Option<BucketRange<S::Timestamp>> {
103        let (time, (bits, _)) = self.content.range(..=timestamp).next_back()?;
104        Some(BucketRange {
105            start: time.clone(),
106            end: time.advance_by_power_of_two(*bits),
107        })
108    }
109
110    /// Find the bucket that contains data for time `timestamp`. Returns a reference to the bucket,
111    /// or `None` if there is no bucket for the requested time.
112    ///
113    /// Only times that haven't been peeled can still be found.
114    #[inline]
115    pub fn find(&self, timestamp: &S::Timestamp) -> Option<&S> {
116        self.content
117            .range(..=timestamp)
118            .next_back()
119            .map(|(_, (_, storage))| storage)
120    }
121
122    /// Find the bucket that contains data for time `timestamp`. Returns a mutable reference to
123    /// the bucket, or `None` if there is no bucket for the requested time.
124    ///
125    /// Only times that haven't been peeled can still be found.
126    #[inline]
127    pub fn find_mut(&mut self, timestamp: &S::Timestamp) -> Option<&mut S> {
128        self.content
129            .range_mut(..=timestamp)
130            .next_back()
131            .map(|(_, (_, storage))| storage)
132    }
133
134    /// Peel off all data up to `frontier`, where the returned buckets contain all
135    /// data strictly less than the frontier.
136    #[inline]
137    pub fn peel(&mut self, frontier: AntichainRef<S::Timestamp>) -> Vec<S> {
138        let mut peeled = vec![];
139        // While there are buckets, and the frontier is not less than the lowest offset, peel off
140        while let Some(min_entry) = self.content.first_entry()
141            && !frontier.less_equal(min_entry.key())
142        {
143            let (offset, (bits, storage)) = self.content.pop_first().expect("must exist");
144            let upper = offset.advance_by_power_of_two(bits);
145
146            // Split the bucket if it spans the frontier.
147            if upper.is_none() && !frontier.is_empty()
148                || upper.is_some() && frontier.less_than(&upper.unwrap())
149            {
150                // We need to split the bucket, no matter how much fuel we have.
151                self.split_and_insert(&mut 0, bits, offset, storage);
152            } else {
153                // Bucket is ready to go.
154                peeled.push(storage);
155            }
156        }
157        peeled
158    }
159
160    /// Restore the chain property by splitting buckets as necessary.
161    ///
162    /// The chain is well-formed if all buckets are within two bits of each other, with an imaginary
163    /// bucket of -2 bits just before the smallest bucket.
164    #[inline]
165    pub fn restore(&mut self, fuel: &mut i64) {
166        // Fast path: the chain is already well-formed. Avoids rebuilding the map below.
167        let mut last_bits = -2_isize;
168        let well_formed = self.content.values().all(|(bits, _)| {
169            let ok = isize::cast_from(*bits) <= last_bits + 2;
170            last_bits = isize::cast_from(*bits);
171            ok
172        });
173        if well_formed {
174            return;
175        }
176
177        // We could write this in terms of a cursor API, but it's not stable yet. Instead, we
178        // allocate a new map and move elements over.
179        let mut new = BTreeMap::default();
180        let mut last_bits = -2;
181        while *fuel > 0
182            && let Some((time, (bits, storage))) = self.content.pop_first()
183        {
184            // Insert bucket if the size is correct.
185            if isize::cast_from(bits) <= last_bits + 2 {
186                new.insert(time, (bits, storage));
187                last_bits = isize::cast_from(bits);
188            } else {
189                // Otherwise, we need to split it.
190                self.split_and_insert(fuel, bits, time, storage);
191            }
192        }
193        // Move remaining elements if we ran out of fuel.
194        new.append(&mut self.content);
195        self.content = new;
196    }
197
198    /// Returns `true` if the chain is empty. This means there are no outstanding times left.
199    #[inline(always)]
200    pub fn is_empty(&self) -> bool {
201        self.content.is_empty()
202    }
203
204    /// The number of buckets in the chain.
205    #[inline(always)]
206    pub fn len(&self) -> usize {
207        self.content.len()
208    }
209
210    /// Split the bucket specified by `(bits, offset, storage)` and insert the new buckets.
211    /// Updates `fuel`.
212    ///
213    /// Panics if the bucket cannot be split, i.e, it covers 0 bits.
214    #[inline(always)]
215    fn split_and_insert(&mut self, fuel: &mut i64, bits: u32, offset: S::Timestamp, storage: S) {
216        let bits = bits - 1;
217        let midpoint = offset.advance_by_power_of_two(bits).expect("must exist");
218        let (bot, top) = storage.split(&midpoint, fuel);
219        self.content.insert(offset, (bits, bot));
220        self.content.insert(midpoint, (bits, top));
221    }
222}
223
224#[cfg(test)]
225mod tests {
226    use super::*;
227
228    impl BucketTimestamp for u8 {
229        fn advance_by_power_of_two(&self, bits: u32) -> Option<Self> {
230            self.checked_add(1_u8.checked_shl(bits)?)
231        }
232    }
233
234    impl BucketTimestamp for u64 {
235        fn advance_by_power_of_two(&self, bits: u32) -> Option<Self> {
236            self.checked_add(1_u64.checked_shl(bits)?)
237        }
238    }
239
240    struct TestStorage<T> {
241        inner: Vec<T>,
242    }
243
244    impl<T: std::fmt::Debug> std::fmt::Debug for TestStorage<T> {
245        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246            self.inner.fmt(f)
247        }
248    }
249
250    impl<T: BucketTimestamp> Bucket for TestStorage<T> {
251        type Timestamp = T;
252        fn split(self, timestamp: &T, fuel: &mut i64) -> (Self, Self) {
253            *fuel = fuel.saturating_sub(self.inner.len().try_into().expect("must fit"));
254            let (left, right) = self.inner.into_iter().partition(|d| *d < *timestamp);
255            (Self { inner: left }, Self { inner: right })
256        }
257    }
258
259    fn collect_and_sort<T: BucketTimestamp>(peeled: Vec<TestStorage<T>>) -> Vec<T> {
260        let mut collected: Vec<_> = peeled
261            .iter()
262            .flat_map(|b| b.inner.iter().cloned())
263            .collect();
264        collected.sort();
265        collected
266    }
267
268    #[mz_ore::test]
269    fn test_bucket_chain_empty_peel_all() {
270        let mut chain = BucketChain::new(TestStorage::<u8> { inner: vec![] });
271        let mut fuel = 1000;
272        chain.restore(&mut fuel);
273        assert!(fuel > 0);
274        let peeled = chain.peel(AntichainRef::new(&[]));
275        assert!(collect_and_sort(peeled).is_empty());
276        assert!(chain.is_empty());
277    }
278
279    #[mz_ore::test]
280    fn test_bucket_chain_u8() {
281        let mut chain = BucketChain::new(TestStorage::<u8> {
282            inner: (0..=255).collect(),
283        });
284        let mut fuel = -1;
285        while fuel <= 0 {
286            fuel = 100;
287            chain.restore(&mut fuel);
288        }
289        let peeled = chain.peel(AntichainRef::new(&[1]));
290        assert_eq!(peeled.len(), 1);
291        assert_eq!(peeled[0].inner[0], 0);
292        assert!(collect_and_sort(peeled).into_iter().eq(0..1));
293        let mut fuel = 1000;
294        chain.restore(&mut fuel);
295        assert!(fuel > 0);
296        let peeled = chain.peel(AntichainRef::new(&[63]));
297        let mut fuel = 1000;
298        chain.restore(&mut fuel);
299        assert!(fuel > 0);
300        assert!(collect_and_sort(peeled).into_iter().eq(1..63));
301        let peeled = chain.peel(AntichainRef::new(&[65]));
302        let mut fuel = 1000;
303        chain.restore(&mut fuel);
304        assert!(fuel > 0);
305        assert!(collect_and_sort(peeled).into_iter().eq(63..65));
306        let peeled = chain.peel(AntichainRef::new(&[]));
307        let mut fuel = 1000;
308        chain.restore(&mut fuel);
309        assert!(fuel > 0);
310        assert!(collect_and_sort(peeled).into_iter().eq(65..=255));
311    }
312
313    /// Test a chain with 10M disjoint elements.
314    #[mz_ore::test]
315    #[cfg_attr(miri, ignore)] // slow
316    fn test_bucket_10m() {
317        let limit = 10_000_000;
318
319        let mut chain = BucketChain::new(TestStorage::<u64> { inner: Vec::new() });
320        let mut fuel = 1000;
321        chain.restore(&mut fuel);
322        assert!(fuel > 0);
323
324        let now = 1739276664_u64;
325
326        let peeled = chain.peel(AntichainRef::new(&[now]));
327        let mut fuel = 1000;
328        chain.restore(&mut fuel);
329        assert!(fuel > 0);
330        let peeled = collect_and_sort(peeled);
331        assert!(peeled.is_empty());
332
333        for i in now..now + limit {
334            chain.find_mut(&i).expect("must exist").inner.push(i);
335        }
336
337        let mut offset = now;
338        let step = 1000;
339        while offset < now + limit {
340            let peeled = chain.peel(AntichainRef::new(&[offset + step]));
341            assert!(
342                collect_and_sort(peeled)
343                    .into_iter()
344                    .eq(offset..offset + step)
345            );
346            offset += step;
347            let mut fuel = 1000;
348            chain.restore(&mut fuel);
349        }
350    }
351
352    #[mz_ore::test]
353    fn test_range_of() {
354        let chain = BucketChain::new(TestStorage::<u8> { inner: vec![] });
355        let range = chain.range_of(&0).unwrap();
356        assert!(range.contains(&0));
357        assert!(range.contains(&255));
358    }
359}