1use std::cmp::Ordering;
19use std::fmt::{self, Debug};
20use std::hash::Hash;
21
22use differential_dataflow::containers::CopyRegion;
23use serde::{Deserialize, Serialize};
24use timely::order::Product;
25use timely::progress::Antichain;
26use timely::progress::timestamp::{PathSummary, Refines, Timestamp};
27use timely::{ExchangeData, PartialOrder};
28use uuid::Uuid;
29
30use mz_ore::cast::CastFrom;
31
32#[derive(
50 Debug,
51 Copy,
52 Clone,
53 PartialEq,
54 Eq,
55 PartialOrd,
56 Ord,
57 Hash,
58 Serialize,
59 Deserialize
60)]
61pub struct Partitioned<P, T>(Product<Interval<P>, T>);
62
63impl<P: Clone + PartialOrd, T> Partitioned<P, T> {
64 pub fn new_singleton(partition: P, timestamp: T) -> Self {
66 let interval = Interval {
67 lower: partition.clone(),
68 upper: partition,
69 };
70 Self(Product::new(interval, timestamp))
71 }
72
73 pub fn new_range(lower: P, upper: P, timestamp: T) -> Self {
75 assert!(lower <= upper, "invalid range bounds");
76 Self(Product::new(Interval { lower, upper }, timestamp))
77 }
78
79 pub fn interval(&self) -> &Interval<P> {
81 &self.0.outer
82 }
83
84 pub fn timestamp(&self) -> &T {
86 &self.0.inner
87 }
88
89 pub fn timestamp_mut(&mut self) -> &mut T {
90 &mut self.0.inner
91 }
92}
93
94impl<P: Clone + PartialOrd + Step, T: Clone> Partitioned<P, T> {
95 pub fn split(&self, point: &P) -> (Option<Self>, Option<Self>) {
98 let (before, after) = self.interval().split(point);
99 let mapper = |interval| Self(Product::new(interval, self.timestamp().clone()));
100 (before.map(mapper), after.map(mapper))
101 }
102}
103
104impl<P: fmt::Display, T: fmt::Display> fmt::Display for Partitioned<P, T> {
105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
106 f.write_str("(")?;
107 self.0.outer.fmt(f)?;
108 f.write_str(", ")?;
109 self.0.inner.fmt(f)?;
110 f.write_str(")")?;
111 Ok(())
112 }
113}
114
115impl<P, T: Timestamp> Timestamp for Partitioned<P, T>
116where
117 P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
118{
119 type Summary = ();
120 fn minimum() -> Self {
121 Self(Timestamp::minimum())
122 }
123}
124impl<P, T: Timestamp> Refines<()> for Partitioned<P, T>
125where
126 P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
127{
128 fn to_inner(_other: ()) -> Self {
129 Self::minimum()
130 }
131
132 fn to_outer(self) {}
133
134 fn summarize(_path: Self::Summary) {}
135}
136
137impl<P: Ord + Eq, T: PartialOrder> PartialOrder for Partitioned<P, T> {
138 #[inline]
139 fn less_equal(&self, other: &Self) -> bool {
140 self.0.less_equal(&other.0)
141 }
142}
143impl<P: Clone, T: Timestamp> PathSummary<Partitioned<P, T>> for () {
144 #[inline]
145 fn results_in(&self, src: &Partitioned<P, T>) -> Option<Partitioned<P, T>> {
146 Some(src.clone())
147 }
148
149 #[inline]
150 fn followed_by(&self, _other: &Self) -> Option<Self> {
151 Some(())
152 }
153}
154
155impl<P: Copy, T: Copy> columnation::Columnation for Partitioned<P, T> {
156 type InnerRegion = CopyRegion<Partitioned<P, T>>;
157}
158
/// Types that have a smallest and a largest value.
pub trait Extrema {
    /// Returns the smallest value of the type.
    fn minimum() -> Self;
    /// Returns the largest value of the type.
    fn maximum() -> Self;
}
166
167impl Extrema for u64 {
168 fn minimum() -> Self {
169 Self::MIN
170 }
171 fn maximum() -> Self {
172 Self::MAX
173 }
174}
175
176impl Extrema for i32 {
177 fn minimum() -> Self {
178 Self::MIN
179 }
180 fn maximum() -> Self {
181 Self::MAX
182 }
183}
184
185impl Extrema for Uuid {
186 fn minimum() -> Self {
187 Self::nil()
188 }
189 fn maximum() -> Self {
190 Self::max()
191 }
192}
193
/// Types whose values can be stepped backwards and forwards by a whole number of steps.
pub trait Step: Sized {
    /// Returns the value `count` steps before `self`, or `None` if there is no such value.
    fn backward_checked(&self, count: usize) -> Option<Self>;

    /// Returns the value `count` steps after `self`, or `None` if there is no such value.
    fn forward_checked(&self, count: usize) -> Option<Self>;
}
204
205impl Step for Uuid {
206 fn backward_checked(&self, count: usize) -> Option<Self> {
207 self.as_u128()
208 .checked_sub(u128::cast_from(count))
209 .map(Self::from_u128)
210 }
211 fn forward_checked(&self, count: usize) -> Option<Self> {
212 self.as_u128()
213 .checked_add(u128::cast_from(count))
214 .map(Self::from_u128)
215 }
216}
217
218#[derive(
219 Debug,
220 Copy,
221 Clone,
222 PartialEq,
223 Eq,
224 PartialOrd,
225 Ord,
226 Hash,
227 Serialize,
228 Deserialize
229)]
230pub struct Interval<P> {
232 pub lower: P,
233 pub upper: P,
234}
235
236impl<P: Eq> Interval<P> {
237 pub fn singleton(&self) -> Option<&P> {
239 if self.lower == self.upper {
240 Some(&self.lower)
241 } else {
242 None
243 }
244 }
245}
246
247impl<P: PartialOrd> Interval<P> {
248 pub fn contains(&self, other: &P) -> bool {
249 self.lower <= *other && *other <= self.upper
250 }
251}
252
253impl<P: Step + PartialOrd + Clone> Interval<P> {
254 pub fn split(&self, point: &P) -> (Option<Self>, Option<Self>) {
257 let before = match point.backward_checked(1) {
258 Some(bef) if self.lower <= bef => Some(Interval {
259 lower: self.lower.clone(),
260 upper: bef,
261 }),
262 _ => None,
263 };
264 let after = match point.forward_checked(1) {
265 Some(aft) if self.upper >= aft => Some(Interval {
266 lower: aft,
267 upper: self.upper.clone(),
268 }),
269 _ => None,
270 };
271 (before, after)
272 }
273}
274
275impl<P: Ord + Eq> PartialOrder for Interval<P> {
276 #[inline]
277 fn less_equal(&self, other: &Self) -> bool {
278 self.lower <= other.lower && other.upper <= self.upper
279 }
280}
281
282impl<P: Clone> PathSummary<Interval<P>> for () {
283 #[inline]
284 fn results_in(&self, src: &Interval<P>) -> Option<Interval<P>> {
285 Some(src.clone())
286 }
287
288 #[inline]
289 fn followed_by(&self, _other: &Self) -> Option<Self> {
290 Some(())
291 }
292}
293
294impl<P> Timestamp for Interval<P>
295where
296 P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
297{
298 type Summary = ();
299
300 #[inline]
301 fn minimum() -> Self {
302 Self {
303 lower: P::minimum(),
304 upper: P::maximum(),
305 }
306 }
307}
308
309impl<P: fmt::Display> fmt::Display for Interval<P> {
310 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311 f.write_str("[")?;
312 self.lower.fmt(f)?;
313 f.write_str(", ")?;
314 self.upper.fmt(f)?;
315 f.write_str("]")?;
316 Ok(())
317 }
318}
319
/// A wrapper that reverses the ordering of the wrapped type.
///
/// The `PartialOrder`, `PartialOrd` and `Ord` implementations for this type all
/// compare the wrapped values with the operands swapped.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Reverse<T>(pub T);
326
327impl<T: PartialOrder> PartialOrder for Reverse<T> {
328 #[inline]
329 fn less_equal(&self, other: &Self) -> bool {
330 PartialOrder::less_equal(&other.0, &self.0)
331 }
332}
333impl<T: PartialOrd> PartialOrd for Reverse<T> {
334 #[inline]
335 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
336 other.0.partial_cmp(&self.0)
337 }
338}
339
340impl<T: Ord> Ord for Reverse<T> {
341 #[inline]
342 fn cmp(&self, other: &Self) -> Ordering {
343 other.0.cmp(&self.0)
344 }
345}
346
#[cfg(test)]
mod test {
    use timely::progress::Antichain;

    use super::*;

    /// Checks the partial order on individual `Partitioned` values.
    #[mz_ore::test]
    fn basic_properties() {
        // The minimum spans every partition at the minimum timestamp, and is
        // less-or-equal (but not strictly less than) itself.
        let minimum: Partitioned<u64, u64> = Partitioned::minimum();
        assert_eq!(minimum, Partitioned::new_range(0, u64::MAX, 0));
        assert!(PartialOrder::less_equal(&minimum, &minimum));
        assert!(!PartialOrder::less_than(&minimum, &minimum));

        // Three pieces with disjoint partition ranges at the same timestamp: no two of
        // them are comparable, in either direction.
        let lower = Partitioned::new_range(0, 9, 0);
        let partition10 = Partitioned::new_singleton(10, 0);
        let upper = Partitioned::new_range(11, u64::MAX, 0);
        assert!(!PartialOrder::less_equal(&lower, &partition10));
        assert!(!PartialOrder::less_equal(&partition10, &lower));
        assert!(!PartialOrder::less_equal(&lower, &upper));
        assert!(!PartialOrder::less_equal(&upper, &lower));
        assert!(!PartialOrder::less_equal(&partition10, &upper));
        assert!(!PartialOrder::less_equal(&upper, &partition10));

        // A single partition inside `lower`'s range is strictly greater than `lower`...
        let partition5 = Partitioned::new_singleton(5, 0);
        assert!(PartialOrder::less_than(&lower, &partition5));
        // ...and incomparable with `upper`, whose range does not include it.
        assert!(!PartialOrder::less_equal(&upper, &partition5));
        assert!(!PartialOrder::less_equal(&partition5, &upper));

        // The same holds for a sub-range inside `lower`'s range.
        let sub_range = Partitioned::new_range(2, 4, 0);
        assert!(PartialOrder::less_than(&lower, &sub_range));
        assert!(!PartialOrder::less_equal(&upper, &sub_range));
        assert!(!PartialOrder::less_equal(&sub_range, &upper));

        // Every element is less-or-equal itself.
        assert!(PartialOrder::less_equal(&lower, &lower));
        assert!(PartialOrder::less_equal(&partition5, &partition5));
        assert!(PartialOrder::less_equal(&upper, &upper));
    }

    /// Checks how `Partitioned` values behave as elements of an `Antichain`.
    #[mz_ore::test]
    fn antichain_properties() {
        let mut frontier = Antichain::new();

        // Elements with disjoint partition ranges are all retained.
        frontier.extend([
            Partitioned::new_range(0, 9, 5),
            Partitioned::new_singleton(10, 5),
            Partitioned::new_range(11, u64::MAX, 5),
        ]);
        assert_eq!(frontier.len(), 3);

        // Inserting one element that spans every partition at an earlier timestamp
        // collapses the frontier to just that element.
        frontier.insert(Partitioned::new_range(0, u64::MAX, 4));
        assert_eq!(
            frontier,
            Antichain::from_elem(Partitioned::new_range(0, u64::MAX, 4))
        );

        // A frontier where partition 10 sits at time 10 and every other partition at
        // time 5.
        let frontier = Antichain::from_iter([
            Partitioned::new_range(0, 9, 5),
            Partitioned::new_singleton(10, 10),
            Partitioned::new_range(11, u64::MAX, 5),
        ]);

        // Partition 10 at time 11 is beyond the frontier.
        assert!(frontier.less_than(&Partitioned::new_singleton(10, 11)));
        // Partition 0 at time 6 is beyond the frontier.
        assert!(frontier.less_than(&Partitioned::new_singleton(0, 6)));
        // Partition 0 at time 4 is not beyond the frontier.
        assert!(!frontier.less_than(&Partitioned::new_singleton(0, 4)));
        // Partitions 3 through 5 at time 6 are beyond the frontier.
        assert!(frontier.less_than(&Partitioned::new_range(3, 5, 6)));
        // Partitions 3 through 5 at time 4 are not beyond the frontier.
        assert!(!frontier.less_than(&Partitioned::new_range(3, 5, 4)));
    }
}
429
430pub fn refine_antichain<T: Timestamp, Inner: Timestamp + Refines<T>>(
434 frontier: &Antichain<T>,
435) -> Antichain<Inner> {
436 Antichain::from_iter(frontier.iter().map(|t| Refines::to_inner(t.clone())))
437}