1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
// Copyright Materialize, Inc. and contributors. All rights reserved.
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::sync::Arc;
use derivative::Derivative;
use itertools::Itertools;
use mz_repr::TimestampManipulation;
use serde::Serialize;
use timely::progress::frontier::AntichainRef;
use timely::progress::{Antichain, Timestamp};
/// Compaction policies for collections maintained by `Controller`.
/// NOTE(benesch): this might want to live somewhere besides the storage crate,
/// because it is fundamental to both storage and compute.
#[derive(Clone, Derivative, Serialize)]
pub enum ReadPolicy<T> {
/// No-one has yet requested a `ReadPolicy` from us, which means that we can
/// still change the implied_capability/the collection since if we need
/// to.
NoPolicy { initial_since: Antichain<T> },
/// Maintain the collection as valid from this frontier onward.
/// Maintain the collection as valid from a function of the write frontier.
/// This function will only be re-evaluated when the write frontier changes.
/// If the intended behavior is to change in response to external signals,
/// consider using the `ValidFrom` variant to manually pilot compaction.
/// The `Arc` makes the function cloneable.
#[derivative(Debug = "ignore")]
Arc<dyn Fn(AntichainRef<T>) -> Antichain<T> + Send + Sync>,
/// Allows one to express multiple read policies, taking the least of
/// the resulting frontiers.
impl<T> ReadPolicy<T>
T: Timestamp + TimestampManipulation,
/// Creates a read policy that lags the write frontier "by one".
pub fn step_back() -> Self {
Self::LagWriteFrontier(Arc::new(move |upper| {
if upper.is_empty() {
} else {
let stepped_back = upper
.map(|time| {
if time == T::minimum() {
} else {
impl ReadPolicy<mz_repr::Timestamp> {
/// Creates a read policy that lags the write frontier by the indicated amount, rounded down to (at most) the specified value.
/// The rounding down is done to reduce the number of changes the capability undergoes.
pub fn lag_writes_by(lag: mz_repr::Timestamp, max_granularity: mz_repr::Timestamp) -> Self {
Self::LagWriteFrontier(Arc::new(move |upper| {
if upper.is_empty() {
} else {
// Subtract the lag from the time, and then round down to a multiple of `granularity` to cut chatter.
let mut time = upper[0];
if lag != mz_repr::Timestamp::default() {
time = time.saturating_sub(lag);
// It makes little sense to refuse to compact if the user genuinely
// sets a smaller compaction window than the default, so honor it here.
let granularity = std::cmp::min(lag, max_granularity);
time = time.saturating_sub(time % granularity);
impl<T: Timestamp> ReadPolicy<T> {
pub fn frontier(&self, write_frontier: AntichainRef<T>) -> Antichain<T> {
match self {
ReadPolicy::NoPolicy { initial_since } => initial_since.clone(),
ReadPolicy::ValidFrom(frontier) => frontier.clone(),
ReadPolicy::LagWriteFrontier(logic) => logic(write_frontier),
ReadPolicy::Multiple(policies) => {
let mut frontier = Antichain::new();
for policy in policies.iter() {
for time in policy.frontier(write_frontier).iter() {