mz_storage_types/read_policy.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use derivative::Derivative;
13use itertools::Itertools;
14use mz_repr::TimestampManipulation;
15use serde::Serialize;
16use timely::progress::frontier::AntichainRef;
17use timely::progress::{Antichain, Timestamp};
18
19/// Compaction policies for collections maintained by `Controller`.
20///
21/// NOTE(benesch): this might want to live somewhere besides the storage crate,
22/// because it is fundamental to both storage and compute.
23#[derive(Clone, Derivative, Serialize)]
24#[derivative(Debug)]
25pub enum ReadPolicy<T> {
26 /// No-one has yet requested a `ReadPolicy` from us, which means that we can
27 /// still change the implied_capability/the collection since if we need
28 /// to.
29 NoPolicy { initial_since: Antichain<T> },
30 /// Maintain the collection as valid from this frontier onward.
31 ValidFrom(Antichain<T>),
32 /// Maintain the collection as valid from a function of the write frontier.
33 ///
34 /// This function will only be re-evaluated when the write frontier changes.
35 /// If the intended behavior is to change in response to external signals,
36 /// consider using the `ValidFrom` variant to manually pilot compaction.
37 ///
38 /// The `Arc` makes the function cloneable.
39 LagWriteFrontier(
40 #[derivative(Debug = "ignore")]
41 #[serde(skip)]
42 Arc<dyn Fn(AntichainRef<T>) -> Antichain<T> + Send + Sync>,
43 ),
44 /// Allows one to express multiple read policies, taking the least of
45 /// the resulting frontiers.
46 Multiple(Vec<ReadPolicy<T>>),
47}
48
49impl<T> ReadPolicy<T>
50where
51 T: Timestamp + TimestampManipulation,
52{
53 /// Creates a read policy that lags the write frontier "by one".
54 pub fn step_back() -> Self {
55 Self::LagWriteFrontier(Arc::new(move |upper| {
56 if upper.is_empty() {
57 Antichain::from_elem(Timestamp::minimum())
58 } else {
59 let stepped_back = upper
60 .to_owned()
61 .into_iter()
62 .map(|time| {
63 if time == T::minimum() {
64 time
65 } else {
66 time.step_back().unwrap()
67 }
68 })
69 .collect_vec();
70 stepped_back.into()
71 }
72 }))
73 }
74}
75
76impl ReadPolicy<mz_repr::Timestamp> {
77 /// Creates a read policy that lags the write frontier by the indicated amount, rounded down to (at most) the specified value.
78 /// The rounding down is done to reduce the number of changes the capability undergoes.
79 pub fn lag_writes_by(lag: mz_repr::Timestamp, max_granularity: mz_repr::Timestamp) -> Self {
80 Self::LagWriteFrontier(Arc::new(move |upper| {
81 if upper.is_empty() {
82 Antichain::from_elem(Timestamp::minimum())
83 } else {
84 // Subtract the lag from the time, and then round down to a multiple of `granularity` to cut chatter.
85 let mut time = upper[0];
86 if lag != mz_repr::Timestamp::default() {
87 time = time.saturating_sub(lag);
88 // It makes little sense to refuse to compact if the user genuinely
89 // sets a smaller compaction window than the default, so honor it here.
90 let granularity = std::cmp::min(lag, max_granularity);
91 time = time.saturating_sub(time % granularity);
92 }
93 Antichain::from_elem(time)
94 }
95 }))
96 }
97}
98
99impl<T: Timestamp> ReadPolicy<T> {
100 pub fn frontier(&self, write_frontier: AntichainRef<T>) -> Antichain<T> {
101 match self {
102 ReadPolicy::NoPolicy { initial_since } => initial_since.clone(),
103 ReadPolicy::ValidFrom(frontier) => frontier.clone(),
104 ReadPolicy::LagWriteFrontier(logic) => logic(write_frontier),
105 ReadPolicy::Multiple(policies) => {
106 let mut frontier = Antichain::new();
107 for policy in policies.iter() {
108 for time in policy.frontier(write_frontier).iter() {
109 frontier.insert(time.clone());
110 }
111 }
112 frontier
113 }
114 }
115 }
116}