mz_timely_util/
temporal.rs1use std::collections::BTreeMap;
19
20use mz_ore::cast::CastFrom;
21use timely::progress::Timestamp;
22use timely::progress::frontier::AntichainRef;
23
24pub trait BucketTimestamp: Timestamp {
28 const DOMAIN: usize = size_of::<Self>() * 8;
30 fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self>;
33}
34
35pub trait Bucket: Sized {
37 type Timestamp: BucketTimestamp;
39 fn split(self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self);
43}
44
45#[derive(Debug)]
67pub struct BucketChain<S: Bucket> {
68 content: BTreeMap<S::Timestamp, (u32, S)>,
69}
70
71impl<S: Bucket> BucketChain<S> {
72 #[inline]
74 pub fn new(storage: S) -> Self {
75 let bits = S::Timestamp::DOMAIN.try_into().expect("Must fit");
76 Self {
77 content: BTreeMap::from([(Timestamp::minimum(), (bits, storage))]),
79 }
80 }
81
82 #[inline]
89 pub fn range_of(&self, timestamp: &S::Timestamp) -> Option<std::ops::Range<S::Timestamp>> {
90 let (time, (bits, _)) = self.content.range(..=timestamp).next_back()?;
91 let top = time
92 .advance_by_power_of_two(bits.saturating_sub(1))
93 .expect("must exist");
94 Some(time.clone()..top)
95 }
96
97 #[inline]
102 pub fn find(&self, timestamp: &S::Timestamp) -> Option<&S> {
103 self.content
104 .range(..=timestamp)
105 .next_back()
106 .map(|(_, (_, storage))| storage)
107 }
108
109 #[inline]
114 pub fn find_mut(&mut self, timestamp: &S::Timestamp) -> Option<&mut S> {
115 self.content
116 .range_mut(..=timestamp)
117 .next_back()
118 .map(|(_, (_, storage))| storage)
119 }
120
121 #[inline]
124 pub fn peel(&mut self, frontier: AntichainRef<S::Timestamp>) -> Vec<S> {
125 let mut peeled = vec![];
126 while let Some(min_entry) = self.content.first_entry()
128 && !frontier.less_equal(min_entry.key())
129 {
130 let (offset, (bits, storage)) = self.content.pop_first().expect("must exist");
131 let upper = offset.advance_by_power_of_two(bits);
132
133 if upper.is_none() && !frontier.is_empty()
135 || upper.is_some() && frontier.less_than(&upper.unwrap())
136 {
137 self.split_and_insert(&mut 0, bits, offset, storage);
139 } else {
140 peeled.push(storage);
142 }
143 }
144 peeled
145 }
146
147 #[inline]
152 pub fn restore(&mut self, fuel: &mut i64) {
153 let mut new = BTreeMap::default();
156 let mut last_bits = -2;
157 while *fuel > 0
158 && let Some((time, (bits, storage))) = self.content.pop_first()
159 {
160 if isize::cast_from(bits) <= last_bits + 2 {
162 new.insert(time, (bits, storage));
163 last_bits = isize::cast_from(bits);
164 } else {
165 self.split_and_insert(fuel, bits, time, storage);
167 }
168 }
169 new.append(&mut self.content);
171 self.content = new;
172 }
173
174 #[inline(always)]
176 pub fn is_empty(&self) -> bool {
177 self.content.is_empty()
178 }
179
180 #[inline(always)]
182 pub fn len(&self) -> usize {
183 self.content.len()
184 }
185
186 #[inline(always)]
191 fn split_and_insert(&mut self, fuel: &mut i64, bits: u32, offset: S::Timestamp, storage: S) {
192 let bits = bits - 1;
193 let midpoint = offset.advance_by_power_of_two(bits).expect("must exist");
194 let (bot, top) = storage.split(&midpoint, fuel);
195 self.content.insert(offset, (bits, bot));
196 self.content.insert(midpoint, (bits, top));
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;

    impl BucketTimestamp for u8 {
        fn advance_by_power_of_two(&self, bits: u32) -> Option<Self> {
            let step = 1_u8.checked_shl(bits)?;
            self.checked_add(step)
        }
    }

    impl BucketTimestamp for u64 {
        fn advance_by_power_of_two(&self, bits: u32) -> Option<Self> {
            let step = 1_u64.checked_shl(bits)?;
            self.checked_add(step)
        }
    }

    /// Bucket storage for tests: a plain list of timestamps.
    struct TestStorage<T> {
        inner: Vec<T>,
    }

    /// Formats the storage exactly like its inner vector.
    impl<T: std::fmt::Debug> std::fmt::Debug for TestStorage<T> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            std::fmt::Debug::fmt(&self.inner, f)
        }
    }

    impl<T: BucketTimestamp> Bucket for TestStorage<T> {
        type Timestamp = T;

        /// Charges one unit of fuel per stored element and partitions the elements into
        /// those before `timestamp` and the rest.
        fn split(self, timestamp: &T, fuel: &mut i64) -> (Self, Self) {
            let cost: i64 = self.inner.len().try_into().expect("must fit");
            *fuel = fuel.saturating_sub(cost);
            let (before, rest) = self
                .inner
                .into_iter()
                .partition(|element| *element < *timestamp);
            (Self { inner: before }, Self { inner: rest })
        }
    }

    /// Flattens the peeled buckets into one sorted list of timestamps.
    fn collect_and_sort<T: BucketTimestamp>(peeled: Vec<TestStorage<T>>) -> Vec<T> {
        let mut collected = Vec::new();
        for bucket in &peeled {
            collected.extend(bucket.inner.iter().cloned());
        }
        collected.sort();
        collected
    }

    /// Runs `restore` once with `budget` units of fuel and returns the fuel left over.
    fn restore_with<S: Bucket>(chain: &mut BucketChain<S>, budget: i64) -> i64 {
        let mut fuel = budget;
        chain.restore(&mut fuel);
        fuel
    }

    #[mz_ore::test]
    fn test_bucket_chain_empty_peel_all() {
        let mut chain = BucketChain::new(TestStorage::<u8> { inner: vec![] });
        assert!(restore_with(&mut chain, 1000) > 0);

        let peeled = chain.peel(AntichainRef::new(&[]));
        assert!(collect_and_sort(peeled).is_empty());
        assert!(chain.is_empty());
    }

    #[mz_ore::test]
    fn test_bucket_chain_u8() {
        let mut chain = BucketChain::new(TestStorage::<u8> {
            inner: (0..=255).collect(),
        });

        // Restore in rounds of 100 fuel until a round finishes with fuel to spare.
        loop {
            if restore_with(&mut chain, 100) > 0 {
                break;
            }
        }

        let peeled = chain.peel(AntichainRef::new(&[1]));
        assert_eq!(peeled.len(), 1);
        assert_eq!(peeled[0].inner[0], 0);
        assert_eq!(collect_and_sort(peeled), (0..1).collect::<Vec<u8>>());
        assert!(restore_with(&mut chain, 1000) > 0);

        let peeled = chain.peel(AntichainRef::new(&[63]));
        assert!(restore_with(&mut chain, 1000) > 0);
        assert_eq!(collect_and_sort(peeled), (1..63).collect::<Vec<u8>>());

        let peeled = chain.peel(AntichainRef::new(&[65]));
        assert!(restore_with(&mut chain, 1000) > 0);
        assert_eq!(collect_and_sort(peeled), (63..65).collect::<Vec<u8>>());

        let peeled = chain.peel(AntichainRef::new(&[]));
        assert!(restore_with(&mut chain, 1000) > 0);
        assert_eq!(collect_and_sort(peeled), (65..=255).collect::<Vec<u8>>());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_bucket_10m() {
        let limit: u64 = 10_000_000;
        let step: u64 = 1000;
        let now = 1739276664_u64;

        let mut chain = BucketChain::new(TestStorage::<u64> { inner: Vec::new() });
        assert!(restore_with(&mut chain, 1000) > 0);

        // Nothing has been inserted yet, so peeling up to `now` yields no data.
        let peeled = chain.peel(AntichainRef::new(&[now]));
        assert!(restore_with(&mut chain, 1000) > 0);
        assert!(collect_and_sort(peeled).is_empty());

        for time in now..now + limit {
            chain.find_mut(&time).expect("must exist").inner.push(time);
        }

        // Peel the data back out in windows of `step` timestamps.
        let mut lower = now;
        while lower < now + limit {
            let upper = lower + step;
            let peeled = chain.peel(AntichainRef::new(&[upper]));
            assert_eq!(
                collect_and_sort(peeled),
                (lower..upper).collect::<Vec<u64>>()
            );
            lower = upper;
            restore_with(&mut chain, 1000);
        }
    }
}