mz_timely_util/
temporal.rs1use std::collections::BTreeMap;
19
20use mz_ore::cast::CastFrom;
21use timely::progress::Timestamp;
22use timely::progress::frontier::AntichainRef;
23
24pub trait BucketTimestamp: Timestamp {
28 const DOMAIN: usize = size_of::<Self>() * 8;
30 fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self>;
33}
34
35pub trait Bucket: Sized {
37 type Timestamp: BucketTimestamp;
39 fn split(self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self);
43}
44
/// The span of timestamps covered by a single bucket.
///
/// The span is half-open: `start` belongs to it, `end` does not.
pub struct BucketRange<T> {
    /// First timestamp covered by the bucket (inclusive).
    pub start: T,
    /// First timestamp past the bucket (exclusive). `None` means the bucket
    /// has no upper bound.
    end: Option<T>,
}

impl<T: PartialOrd> BucketRange<T> {
    /// Reports whether `time` falls inside this range.
    ///
    /// A value that is not comparable with `start` (or with `end`, when one
    /// is present) is treated as outside the range.
    pub fn contains(&self, time: &T) -> bool {
        if *time >= self.start {
            match &self.end {
                // Bounded above: the upper bound itself is excluded.
                Some(end) => *time < *end,
                // Unbounded above: everything from `start` onwards matches.
                None => true,
            }
        } else {
            false
        }
    }
}
58
59#[derive(Debug)]
81pub struct BucketChain<S: Bucket> {
82 content: BTreeMap<S::Timestamp, (u32, S)>,
83}
84
85impl<S: Bucket> BucketChain<S> {
86 #[inline]
88 pub fn new(storage: S) -> Self {
89 let bits = S::Timestamp::DOMAIN.try_into().expect("Must fit");
90 Self {
91 content: BTreeMap::from([(Timestamp::minimum(), (bits, storage))]),
93 }
94 }
95
96 #[inline]
102 pub fn range_of(&self, timestamp: &S::Timestamp) -> Option<BucketRange<S::Timestamp>> {
103 let (time, (bits, _)) = self.content.range(..=timestamp).next_back()?;
104 Some(BucketRange {
105 start: time.clone(),
106 end: time.advance_by_power_of_two(*bits),
107 })
108 }
109
110 #[inline]
115 pub fn find(&self, timestamp: &S::Timestamp) -> Option<&S> {
116 self.content
117 .range(..=timestamp)
118 .next_back()
119 .map(|(_, (_, storage))| storage)
120 }
121
122 #[inline]
127 pub fn find_mut(&mut self, timestamp: &S::Timestamp) -> Option<&mut S> {
128 self.content
129 .range_mut(..=timestamp)
130 .next_back()
131 .map(|(_, (_, storage))| storage)
132 }
133
134 #[inline]
137 pub fn peel(&mut self, frontier: AntichainRef<S::Timestamp>) -> Vec<S> {
138 let mut peeled = vec![];
139 while let Some(min_entry) = self.content.first_entry()
141 && !frontier.less_equal(min_entry.key())
142 {
143 let (offset, (bits, storage)) = self.content.pop_first().expect("must exist");
144 let upper = offset.advance_by_power_of_two(bits);
145
146 if upper.is_none() && !frontier.is_empty()
148 || upper.is_some() && frontier.less_than(&upper.unwrap())
149 {
150 self.split_and_insert(&mut 0, bits, offset, storage);
152 } else {
153 peeled.push(storage);
155 }
156 }
157 peeled
158 }
159
160 #[inline]
165 pub fn restore(&mut self, fuel: &mut i64) {
166 let mut last_bits = -2_isize;
168 let well_formed = self.content.values().all(|(bits, _)| {
169 let ok = isize::cast_from(*bits) <= last_bits + 2;
170 last_bits = isize::cast_from(*bits);
171 ok
172 });
173 if well_formed {
174 return;
175 }
176
177 let mut new = BTreeMap::default();
180 let mut last_bits = -2;
181 while *fuel > 0
182 && let Some((time, (bits, storage))) = self.content.pop_first()
183 {
184 if isize::cast_from(bits) <= last_bits + 2 {
186 new.insert(time, (bits, storage));
187 last_bits = isize::cast_from(bits);
188 } else {
189 self.split_and_insert(fuel, bits, time, storage);
191 }
192 }
193 new.append(&mut self.content);
195 self.content = new;
196 }
197
198 #[inline(always)]
200 pub fn is_empty(&self) -> bool {
201 self.content.is_empty()
202 }
203
204 #[inline(always)]
206 pub fn len(&self) -> usize {
207 self.content.len()
208 }
209
210 #[inline]
212 pub fn buckets(&self) -> impl Iterator<Item = &S> {
213 self.content.values().map(|(_, storage)| storage)
214 }
215
216 #[inline]
218 pub fn buckets_mut(&mut self) -> impl Iterator<Item = &mut S> {
219 self.content.values_mut().map(|(_, storage)| storage)
220 }
221
222 #[inline(always)]
227 fn split_and_insert(&mut self, fuel: &mut i64, bits: u32, offset: S::Timestamp, storage: S) {
228 let bits = bits - 1;
229 let midpoint = offset.advance_by_power_of_two(bits).expect("must exist");
230 let (bot, top) = storage.split(&midpoint, fuel);
231 self.content.insert(offset, (bits, bot));
232 self.content.insert(midpoint, (bits, top));
233 }
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    impl BucketTimestamp for u8 {
        fn advance_by_power_of_two(&self, bits: u32) -> Option<Self> {
            // `checked_shl` fails for shifts as wide as the type,
            // `checked_add` fails when the sum overflows.
            let width = 1_u8.checked_shl(bits)?;
            self.checked_add(width)
        }
    }

    impl BucketTimestamp for u64 {
        fn advance_by_power_of_two(&self, bits: u32) -> Option<Self> {
            let width = 1_u64.checked_shl(bits)?;
            self.checked_add(width)
        }
    }

    /// Bucket storage for tests: a plain list of timestamps.
    struct TestStorage<T> {
        inner: Vec<T>,
    }

    impl<T: std::fmt::Debug> std::fmt::Debug for TestStorage<T> {
        /// Formats the storage exactly like its inner vector.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            std::fmt::Debug::fmt(&self.inner, f)
        }
    }

    impl<T: BucketTimestamp> Bucket for TestStorage<T> {
        type Timestamp = T;

        /// Charges one unit of fuel per stored element, then separates the
        /// elements before `timestamp` from the rest.
        fn split(self, timestamp: &T, fuel: &mut i64) -> (Self, Self) {
            let cost: i64 = self.inner.len().try_into().expect("must fit");
            *fuel = fuel.saturating_sub(cost);
            let (below, at_or_above): (Vec<T>, Vec<T>) = self
                .inner
                .into_iter()
                .partition(|item| *item < *timestamp);
            (Self { inner: below }, Self { inner: at_or_above })
        }
    }

    /// Flattens the peeled buckets into one sorted list of timestamps.
    fn collect_and_sort<T: BucketTimestamp>(peeled: Vec<TestStorage<T>>) -> Vec<T> {
        let mut collected = Vec::new();
        for bucket in &peeled {
            collected.extend(bucket.inner.iter().cloned());
        }
        collected.sort();
        collected
    }

    /// Restores `chain` with a budget of 1000 and asserts that some fuel is
    /// left afterwards.
    fn restore_with_spare_fuel<S: Bucket>(chain: &mut BucketChain<S>) {
        let mut fuel = 1000;
        chain.restore(&mut fuel);
        assert!(fuel > 0);
    }

    #[mz_ore::test]
    fn test_bucket_chain_empty_peel_all() {
        let mut chain = BucketChain::new(TestStorage::<u8> { inner: vec![] });
        restore_with_spare_fuel(&mut chain);
        // An empty frontier peels every bucket.
        let peeled = chain.peel(AntichainRef::new(&[]));
        assert!(collect_and_sort(peeled).is_empty());
        assert!(chain.is_empty());
    }

    #[mz_ore::test]
    fn test_bucket_chain_u8() {
        let mut chain = BucketChain::new(TestStorage::<u8> {
            inner: (0..=255).collect(),
        });
        // Restore in rounds of 100 fuel until one round ends with fuel left.
        loop {
            let mut fuel = 100;
            chain.restore(&mut fuel);
            if fuel > 0 {
                break;
            }
        }

        let peeled = chain.peel(AntichainRef::new(&[1]));
        assert_eq!(peeled.len(), 1);
        assert_eq!(peeled[0].inner[0], 0);
        assert!(collect_and_sort(peeled).into_iter().eq(0..1));
        restore_with_spare_fuel(&mut chain);

        let peeled = chain.peel(AntichainRef::new(&[63]));
        restore_with_spare_fuel(&mut chain);
        assert!(collect_and_sort(peeled).into_iter().eq(1..63));

        let peeled = chain.peel(AntichainRef::new(&[65]));
        restore_with_spare_fuel(&mut chain);
        assert!(collect_and_sort(peeled).into_iter().eq(63..65));

        let peeled = chain.peel(AntichainRef::new(&[]));
        restore_with_spare_fuel(&mut chain);
        assert!(collect_and_sort(peeled).into_iter().eq(65..=255));
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_bucket_10m() {
        let limit = 10_000_000;

        let mut chain = BucketChain::new(TestStorage::<u64> { inner: Vec::new() });
        restore_with_spare_fuel(&mut chain);

        let now = 1739276664_u64;

        let peeled = chain.peel(AntichainRef::new(&[now]));
        restore_with_spare_fuel(&mut chain);
        let peeled = collect_and_sort(peeled);
        assert!(peeled.is_empty());

        let end = now + limit;
        for time in now..end {
            chain.find_mut(&time).expect("must exist").inner.push(time);
        }

        let step = 1000;
        let mut offset = now;
        while offset < end {
            let next = offset + step;
            let peeled = chain.peel(AntichainRef::new(&[next]));
            assert!(collect_and_sort(peeled).into_iter().eq(offset..next));
            offset = next;
            let mut fuel = 1000;
            chain.restore(&mut fuel);
        }
    }

    #[mz_ore::test]
    fn test_range_of() {
        let chain = BucketChain::new(TestStorage::<u8> { inner: vec![] });
        // A fresh chain has one bucket that spans the whole `u8` domain.
        let range = chain.range_of(&0).unwrap();
        assert!(range.contains(&0));
        assert!(range.contains(&255));
    }
}