mz_timely_util/
temporal.rs1use std::collections::BTreeMap;
19
20use mz_ore::cast::CastFrom;
21use timely::progress::Timestamp;
22use timely::progress::frontier::AntichainRef;
23
24pub trait BucketTimestamp: Timestamp {
28 const DOMAIN: usize = size_of::<Self>() * 8;
30 fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self>;
33}
34
35pub trait Bucket: Sized {
37 type Timestamp: BucketTimestamp;
39 fn split(self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self);
43}
44
/// The half-open range of timestamps `[start, end)` covered by a single bucket.
///
/// An `end` of `None` means the range has no upper bound, i.e. it extends to
/// the end of the timestamp domain.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BucketRange<T> {
    /// The first timestamp in the range (inclusive).
    pub start: T,
    /// The exclusive upper bound, or `None` if the range is unbounded above.
    end: Option<T>,
}
51
52impl<T: PartialOrd> BucketRange<T> {
53 pub fn contains(&self, time: &T) -> bool {
55 *time >= self.start && self.end.as_ref().is_none_or(|end| *time < *end)
56 }
57}
58
59#[derive(Debug)]
81pub struct BucketChain<S: Bucket> {
82 content: BTreeMap<S::Timestamp, (u32, S)>,
83}
84
85impl<S: Bucket> BucketChain<S> {
86 #[inline]
88 pub fn new(storage: S) -> Self {
89 let bits = S::Timestamp::DOMAIN.try_into().expect("Must fit");
90 Self {
91 content: BTreeMap::from([(Timestamp::minimum(), (bits, storage))]),
93 }
94 }
95
96 #[inline]
102 pub fn range_of(&self, timestamp: &S::Timestamp) -> Option<BucketRange<S::Timestamp>> {
103 let (time, (bits, _)) = self.content.range(..=timestamp).next_back()?;
104 Some(BucketRange {
105 start: time.clone(),
106 end: time.advance_by_power_of_two(*bits),
107 })
108 }
109
110 #[inline]
115 pub fn find(&self, timestamp: &S::Timestamp) -> Option<&S> {
116 self.content
117 .range(..=timestamp)
118 .next_back()
119 .map(|(_, (_, storage))| storage)
120 }
121
122 #[inline]
127 pub fn find_mut(&mut self, timestamp: &S::Timestamp) -> Option<&mut S> {
128 self.content
129 .range_mut(..=timestamp)
130 .next_back()
131 .map(|(_, (_, storage))| storage)
132 }
133
134 #[inline]
137 pub fn peel(&mut self, frontier: AntichainRef<S::Timestamp>) -> Vec<S> {
138 let mut peeled = vec![];
139 while let Some(min_entry) = self.content.first_entry()
141 && !frontier.less_equal(min_entry.key())
142 {
143 let (offset, (bits, storage)) = self.content.pop_first().expect("must exist");
144 let upper = offset.advance_by_power_of_two(bits);
145
146 if upper.is_none() && !frontier.is_empty()
148 || upper.is_some() && frontier.less_than(&upper.unwrap())
149 {
150 self.split_and_insert(&mut 0, bits, offset, storage);
152 } else {
153 peeled.push(storage);
155 }
156 }
157 peeled
158 }
159
160 #[inline]
165 pub fn restore(&mut self, fuel: &mut i64) {
166 let mut last_bits = -2_isize;
168 let well_formed = self.content.values().all(|(bits, _)| {
169 let ok = isize::cast_from(*bits) <= last_bits + 2;
170 last_bits = isize::cast_from(*bits);
171 ok
172 });
173 if well_formed {
174 return;
175 }
176
177 let mut new = BTreeMap::default();
180 let mut last_bits = -2;
181 while *fuel > 0
182 && let Some((time, (bits, storage))) = self.content.pop_first()
183 {
184 if isize::cast_from(bits) <= last_bits + 2 {
186 new.insert(time, (bits, storage));
187 last_bits = isize::cast_from(bits);
188 } else {
189 self.split_and_insert(fuel, bits, time, storage);
191 }
192 }
193 new.append(&mut self.content);
195 self.content = new;
196 }
197
198 #[inline(always)]
200 pub fn is_empty(&self) -> bool {
201 self.content.is_empty()
202 }
203
204 #[inline(always)]
206 pub fn len(&self) -> usize {
207 self.content.len()
208 }
209
210 #[inline(always)]
215 fn split_and_insert(&mut self, fuel: &mut i64, bits: u32, offset: S::Timestamp, storage: S) {
216 let bits = bits - 1;
217 let midpoint = offset.advance_by_power_of_two(bits).expect("must exist");
218 let (bot, top) = storage.split(&midpoint, fuel);
219 self.content.insert(offset, (bits, bot));
220 self.content.insert(midpoint, (bits, top));
221 }
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    impl BucketTimestamp for u8 {
        fn advance_by_power_of_two(&self, bits: u32) -> Option<Self> {
            self.checked_add(1_u8.checked_shl(bits)?)
        }
    }

    impl BucketTimestamp for u64 {
        fn advance_by_power_of_two(&self, bits: u32) -> Option<Self> {
            self.checked_add(1_u64.checked_shl(bits)?)
        }
    }

    /// Bucket storage that holds its timestamps directly.
    struct TestStorage<T> {
        inner: Vec<T>,
    }

    impl<T: std::fmt::Debug> std::fmt::Debug for TestStorage<T> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            self.inner.fmt(f)
        }
    }

    impl<T: BucketTimestamp> Bucket for TestStorage<T> {
        type Timestamp = T;

        /// Partitions the stored timestamps at `timestamp`, charging one unit of
        /// fuel per element.
        fn split(self, timestamp: &T, fuel: &mut i64) -> (Self, Self) {
            *fuel = fuel.saturating_sub(self.inner.len().try_into().expect("must fit"));
            let (left, right) = self.inner.into_iter().partition(|d| *d < *timestamp);
            (Self { inner: left }, Self { inner: right })
        }
    }

    /// Flattens the peeled buckets into one sorted vector of timestamps.
    fn collect_and_sort<T: BucketTimestamp>(peeled: Vec<TestStorage<T>>) -> Vec<T> {
        // `peeled` is owned, so move the elements out rather than cloning them.
        let mut collected: Vec<_> = peeled.into_iter().flat_map(|b| b.inner).collect();
        collected.sort();
        collected
    }

    #[mz_ore::test]
    fn test_bucket_chain_empty_peel_all() {
        let mut chain = BucketChain::new(TestStorage::<u8> { inner: vec![] });
        let mut fuel = 1000;
        chain.restore(&mut fuel);
        assert!(fuel > 0);
        let peeled = chain.peel(AntichainRef::new(&[]));
        assert!(collect_and_sort(peeled).is_empty());
        assert!(chain.is_empty());
    }

    #[mz_ore::test]
    fn test_bucket_chain_u8() {
        let mut chain = BucketChain::new(TestStorage::<u8> {
            inner: (0..=255).collect(),
        });
        // Restore in small increments until a call finishes with fuel left over.
        let mut fuel = -1;
        while fuel <= 0 {
            fuel = 100;
            chain.restore(&mut fuel);
        }
        let peeled = chain.peel(AntichainRef::new(&[1]));
        assert_eq!(peeled.len(), 1);
        assert_eq!(peeled[0].inner[0], 0);
        assert!(collect_and_sort(peeled).into_iter().eq(0..1));
        let mut fuel = 1000;
        chain.restore(&mut fuel);
        assert!(fuel > 0);
        let peeled = chain.peel(AntichainRef::new(&[63]));
        let mut fuel = 1000;
        chain.restore(&mut fuel);
        assert!(fuel > 0);
        assert!(collect_and_sort(peeled).into_iter().eq(1..63));
        let peeled = chain.peel(AntichainRef::new(&[65]));
        let mut fuel = 1000;
        chain.restore(&mut fuel);
        assert!(fuel > 0);
        assert!(collect_and_sort(peeled).into_iter().eq(63..65));
        let peeled = chain.peel(AntichainRef::new(&[]));
        let mut fuel = 1000;
        chain.restore(&mut fuel);
        assert!(fuel > 0);
        assert!(collect_and_sort(peeled).into_iter().eq(65..=255));
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // pushes 10M elements; too slow under Miri
    fn test_bucket_10m() {
        let limit = 10_000_000;

        let mut chain = BucketChain::new(TestStorage::<u64> { inner: Vec::new() });
        let mut fuel = 1000;
        chain.restore(&mut fuel);
        assert!(fuel > 0);

        let now = 1739276664_u64;

        let peeled = chain.peel(AntichainRef::new(&[now]));
        let mut fuel = 1000;
        chain.restore(&mut fuel);
        assert!(fuel > 0);
        let peeled = collect_and_sort(peeled);
        assert!(peeled.is_empty());

        for i in now..now + limit {
            chain.find_mut(&i).expect("must exist").inner.push(i);
        }

        let mut offset = now;
        let step = 1000;
        while offset < now + limit {
            let peeled = chain.peel(AntichainRef::new(&[offset + step]));
            assert!(
                collect_and_sort(peeled)
                    .into_iter()
                    .eq(offset..offset + step)
            );
            offset += step;
            let mut fuel = 1000;
            chain.restore(&mut fuel);
        }
    }

    #[mz_ore::test]
    fn test_range_of() {
        let mut chain = BucketChain::new(TestStorage::<u8> { inner: vec![] });
        let range = chain.range_of(&0).unwrap();
        assert!(range.contains(&0));
        assert!(range.contains(&255));

        // After restoring, the first bucket covers exactly one timestamp.
        let mut fuel = 1000;
        chain.restore(&mut fuel);
        assert!(fuel > 0);
        let range = chain.range_of(&0).unwrap();
        assert_eq!(range.start, 0);
        assert!(range.contains(&0));
        assert!(!range.contains(&1));

        // The last bucket is [128, 256), whose end overflows u8, so it is
        // unbounded above.
        let range = chain.range_of(&255).unwrap();
        assert_eq!(range.start, 128);
        assert!(range.contains(&255));
        assert!(!range.contains(&127));
    }
}