mz_adapter_types/
compaction.rs1use std::num::TryFromIntError;
11use std::time::Duration;
12
13use mz_repr::{Timestamp, TimestampManipulation};
14use mz_storage_types::read_policy::ReadPolicy;
15use serde::Serialize;
16use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
17
18const DEFAULT_LOGICAL_COMPACTION_WINDOW_MILLIS: u64 = 1000;
21
22pub const DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION: Duration =
23 Duration::from_millis(DEFAULT_LOGICAL_COMPACTION_WINDOW_MILLIS);
24
25const DEFAULT_LOGICAL_COMPACTION_WINDOW_TS: Timestamp =
27 Timestamp::new(DEFAULT_LOGICAL_COMPACTION_WINDOW_MILLIS);
28
29pub const SINCE_GRANULARITY: mz_repr::Timestamp = mz_repr::Timestamp::new(1000);
34
35#[derive(Clone, Default, Copy, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize)]
38pub enum CompactionWindow {
39 #[default]
41 Default,
42 DisableCompaction,
44 Duration(Timestamp),
46}
47
48impl CompactionWindow {
49 pub fn lag_from(&self, from: Timestamp) -> Timestamp {
50 let lag = match self {
51 CompactionWindow::Default => DEFAULT_LOGICAL_COMPACTION_WINDOW_TS,
52 CompactionWindow::DisableCompaction => return Timestamp::minimum(),
53 CompactionWindow::Duration(d) => *d,
54 };
55 from.saturating_sub(lag)
56 }
57
58 pub fn comparable_timestamp(&self) -> Timestamp {
60 match self {
61 CompactionWindow::Default => DEFAULT_LOGICAL_COMPACTION_WINDOW_TS,
62 CompactionWindow::DisableCompaction => Timestamp::maximum(),
63 CompactionWindow::Duration(d) => *d,
64 }
65 }
66}
67
68impl From<CompactionWindow> for ReadPolicy<Timestamp> {
69 fn from(value: CompactionWindow) -> Self {
70 let time = match value {
71 CompactionWindow::Default => DEFAULT_LOGICAL_COMPACTION_WINDOW_TS,
72 CompactionWindow::Duration(time) => time,
73 CompactionWindow::DisableCompaction => {
74 return ReadPolicy::ValidFrom(Antichain::from_elem(Timestamp::minimum()));
75 }
76 };
77 ReadPolicy::lag_writes_by(time, SINCE_GRANULARITY)
78 }
79}
80
81impl TryFrom<Duration> for CompactionWindow {
82 type Error = TryFromIntError;
83
84 fn try_from(value: Duration) -> Result<Self, Self::Error> {
85 Ok(Self::Duration(value.try_into()?))
86 }
87}