mz_storage_types/
read_holds.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt::Debug;
11use std::sync::Arc;
12
13use mz_repr::GlobalId;
14use thiserror::Error;
15use timely::PartialOrder;
16use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
17use tokio::sync::mpsc::UnboundedSender;
18use tokio::sync::mpsc::error::SendError;
19
20pub type ChangeTx<T> = Arc<
21    dyn Fn(GlobalId, ChangeBatch<T>) -> Result<(), SendError<(GlobalId, ChangeBatch<T>)>>
22        + Send
23        + Sync,
24>;
25
26/// Token that represents a hold on a collection. This prevents the since of the
27/// collection from progressing beyond the hold. In other words, it cannot
28/// become true that our hold is `less_than` the since.
29///
30/// This [ReadHold] is safe to drop. The installed read hold will be returned to
31/// the issuer behind the scenes.
32pub struct ReadHold<T: TimelyTimestamp> {
33    /// Identifies that collection that we have a hold on.
34    id: GlobalId,
35
36    /// The times at which we hold.
37    since: Antichain<T>,
38
39    /// For communicating changes to this read hold back to whoever issued it.
40    change_tx: ChangeTx<T>,
41}
42
43impl<T: TimelyTimestamp> Debug for ReadHold<T> {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.debug_struct("ReadHold")
46            .field("id", &self.id)
47            .field("since", &self.since)
48            .finish_non_exhaustive()
49    }
50}
51
52/// Errors for manipulating read holds.
53#[derive(Error, Debug)]
54pub enum ReadHoldDowngradeError<T> {
55    /// The new frontier is not beyond the current since.
56    #[error("since violation: new frontier {frontier:?} is not beyond current since {since:?}")]
57    SinceViolation {
58        /// The frontier to downgrade to.
59        frontier: Antichain<T>,
60        /// The since of the collection.
61        since: Antichain<T>,
62    },
63}
64
65impl<T: TimelyTimestamp> ReadHold<T> {
66    pub fn new(id: GlobalId, since: Antichain<T>, change_tx: ChangeTx<T>) -> Self {
67        Self {
68            id,
69            since,
70            change_tx,
71        }
72    }
73
74    pub fn with_channel(
75        id: GlobalId,
76        since: Antichain<T>,
77        channel_tx: UnboundedSender<(GlobalId, ChangeBatch<T>)>,
78    ) -> Self {
79        let tx = Arc::new(move |id, changes| channel_tx.send((id, changes)));
80        Self::new(id, since, tx)
81    }
82
83    /// Returns the [GlobalId] of the collection that this [ReadHold] is for.
84    pub fn id(&self) -> GlobalId {
85        self.id
86    }
87
88    /// Returns the frontier at which this [ReadHold] is holding back the since
89    /// of the collection identified by `id`. This does not mean that the
90    /// overall since of the collection is what we report here. Only that it is
91    /// _at least_ held back to the reported frontier by this read hold.
92    pub fn since(&self) -> &Antichain<T> {
93        &self.since
94    }
95
96    /// Merges `other` into `self`, keeping the overall read hold.
97    ///
98    /// # Panics
99    ///
100    /// Panics when trying to merge a [ReadHold] for a different collection
101    /// (different [GlobalId]).
102    pub fn merge_assign(&mut self, mut other: ReadHold<T>) {
103        assert_eq!(
104            self.id, other.id,
105            "can only merge ReadHolds for the same ID"
106        );
107
108        let mut changes = ChangeBatch::new();
109        changes.extend(self.since.iter().map(|t| (t.clone(), -1)));
110        changes.extend(other.since.iter().map(|t| (t.clone(), -1)));
111
112        // It's very important that we clear the since of other. Otherwise, it's
113        // Drop impl would try and drop it again, by sending another ChangeBatch
114        // on drop.
115        let other_since = std::mem::take(&mut other.since);
116
117        self.since.extend(other_since);
118
119        // Record the new requirements, which we're guaranteed to be possible
120        // because we're only retracing the two merged sinces together with this
121        // in one go.
122        changes.extend(self.since.iter().map(|t| (t.clone(), 1)));
123
124        match (self.change_tx)(self.id, changes) {
125            Ok(_) => (),
126            Err(e) => {
127                panic!("cannot merge ReadHold: {}", e);
128            }
129        }
130    }
131
132    /// Downgrades `self` to the given `frontier`. Returns `Err` when the new
133    /// frontier is `less_than` the frontier at which this [ReadHold] is
134    /// holding.
135    pub fn try_downgrade(
136        &mut self,
137        frontier: Antichain<T>,
138    ) -> Result<(), ReadHoldDowngradeError<T>> {
139        if PartialOrder::less_than(&frontier, &self.since) {
140            return Err(ReadHoldDowngradeError::SinceViolation {
141                frontier,
142                since: self.since.clone(),
143            });
144        }
145
146        let mut changes = ChangeBatch::new();
147
148        changes.extend(self.since.iter().map(|t| (t.clone(), -1)));
149        changes.extend(frontier.iter().map(|t| (t.clone(), 1)));
150        self.since = frontier;
151
152        if !changes.is_empty() {
153            // If the other side already hung up, that's ok.
154            let _ = (self.change_tx)(self.id, changes);
155        }
156
157        Ok(())
158    }
159
160    /// Release this read hold.
161    pub fn release(&mut self) {
162        self.try_downgrade(Antichain::new())
163            .expect("known to succeed");
164    }
165}
166
167impl<T: TimelyTimestamp> Clone for ReadHold<T> {
168    fn clone(&self) -> Self {
169        if self.id.is_user() {
170            tracing::trace!("cloning ReadHold on {}: {:?}", self.id, self.since);
171        }
172
173        // Let the other end know.
174        let mut changes = ChangeBatch::new();
175
176        changes.extend(self.since.iter().map(|t| (t.clone(), 1)));
177
178        if !changes.is_empty() {
179            // We do care about sending here. If the other end hung up we don't
180            // really have a read hold anymore.
181            match (self.change_tx)(self.id.clone(), changes) {
182                Ok(_) => (),
183                Err(e) => {
184                    panic!("cannot clone ReadHold: {}", e);
185                }
186            }
187        }
188
189        Self {
190            id: self.id.clone(),
191            since: self.since.clone(),
192            change_tx: Arc::clone(&self.change_tx),
193        }
194    }
195}
196
197impl<T: TimelyTimestamp> Drop for ReadHold<T> {
198    fn drop(&mut self) {
199        if self.id.is_user() {
200            tracing::trace!("dropping ReadHold on {}: {:?}", self.id, self.since);
201        }
202
203        self.release();
204    }
205}
206
207#[derive(Error, Debug)]
208pub enum ReadHoldError {
209    #[error("collection does not exist: {0}")]
210    CollectionMissing(GlobalId),
211    #[error("desired read hold frontier is not beyond the since of collection: {0}")]
212    SinceViolation(GlobalId),
213}