mz_storage_types/
read_policy.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use derivative::Derivative;
13use itertools::Itertools;
14use mz_repr::TimestampManipulation;
15use serde::Serialize;
16use timely::progress::frontier::AntichainRef;
17use timely::progress::{Antichain, Timestamp};
18
/// Compaction policies for collections maintained by `Controller`.
///
/// A policy is turned into a concrete read frontier by `ReadPolicy::frontier`,
/// which is handed the current write frontier of the collection.
///
/// NOTE(benesch): this might want to live somewhere besides the storage crate,
/// because it is fundamental to both storage and compute.
#[derive(Clone, Derivative, Serialize)]
#[derivative(Debug)]
pub enum ReadPolicy<T> {
    /// No one has requested a `ReadPolicy` from us yet, which means that we
    /// can still change the implied capability (i.e. the collection's since)
    /// if we need to.
    ///
    /// Evaluating this policy yields `initial_since` unchanged.
    NoPolicy { initial_since: Antichain<T> },
    /// Maintain the collection as valid from this frontier onward.
    ///
    /// Evaluating this policy yields the stored frontier, regardless of the
    /// write frontier.
    ValidFrom(Antichain<T>),
    /// Maintain the collection as valid from a function of the write frontier.
    ///
    /// This function will only be re-evaluated when the write frontier changes.
    /// If the intended behavior is to change in response to external signals,
    /// consider using the `ValidFrom` variant to manually pilot compaction.
    ///
    /// The `Arc` makes the function cloneable. The function itself is left out
    /// of both the `Debug` output and the serialized form.
    LagWriteFrontier(
        #[derivative(Debug = "ignore")]
        #[serde(skip)]
        Arc<dyn Fn(AntichainRef<T>) -> Antichain<T> + Send + Sync>,
    ),
    /// Allows one to express multiple read policies, taking the least of
    /// the resulting frontiers.
    ///
    /// Concretely, every element of every sub-policy's frontier is inserted
    /// into a single antichain, which is the result.
    Multiple(Vec<ReadPolicy<T>>),
}
48
49impl<T> ReadPolicy<T>
50where
51    T: Timestamp + TimestampManipulation,
52{
53    /// Creates a read policy that lags the write frontier "by one".
54    pub fn step_back() -> Self {
55        Self::LagWriteFrontier(Arc::new(move |upper| {
56            if upper.is_empty() {
57                Antichain::from_elem(Timestamp::minimum())
58            } else {
59                let stepped_back = upper
60                    .to_owned()
61                    .into_iter()
62                    .map(|time| {
63                        if time == T::minimum() {
64                            time
65                        } else {
66                            time.step_back().unwrap()
67                        }
68                    })
69                    .collect_vec();
70                stepped_back.into()
71            }
72        }))
73    }
74}
75
76impl ReadPolicy<mz_repr::Timestamp> {
77    /// Creates a read policy that lags the write frontier by the indicated amount, rounded down to (at most) the specified value.
78    /// The rounding down is done to reduce the number of changes the capability undergoes.
79    pub fn lag_writes_by(lag: mz_repr::Timestamp, max_granularity: mz_repr::Timestamp) -> Self {
80        Self::LagWriteFrontier(Arc::new(move |upper| {
81            if upper.is_empty() {
82                Antichain::from_elem(Timestamp::minimum())
83            } else {
84                // Subtract the lag from the time, and then round down to a multiple of `granularity` to cut chatter.
85                let mut time = upper[0];
86                if lag != mz_repr::Timestamp::default() {
87                    time = time.saturating_sub(lag);
88                    // It makes little sense to refuse to compact if the user genuinely
89                    // sets a smaller compaction window than the default, so honor it here.
90                    let granularity = std::cmp::min(lag, max_granularity);
91                    time = time.saturating_sub(time % granularity);
92                }
93                Antichain::from_elem(time)
94            }
95        }))
96    }
97}
98
99impl<T: Timestamp> ReadPolicy<T> {
100    pub fn frontier(&self, write_frontier: AntichainRef<T>) -> Antichain<T> {
101        match self {
102            ReadPolicy::NoPolicy { initial_since } => initial_since.clone(),
103            ReadPolicy::ValidFrom(frontier) => frontier.clone(),
104            ReadPolicy::LagWriteFrontier(logic) => logic(write_frontier),
105            ReadPolicy::Multiple(policies) => {
106                let mut frontier = Antichain::new();
107                for policy in policies.iter() {
108                    for time in policy.frontier(write_frontier).iter() {
109                        frontier.insert(time.clone());
110                    }
111                }
112                frontier
113            }
114        }
115    }
116}