mz_persist_client/
critical.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Since capabilities and handles
11
12use std::fmt::Debug;
13use std::future::Future;
14use std::time::Duration;
15
16use differential_dataflow::difference::Monoid;
17use differential_dataflow::lattice::Lattice;
18use mz_ore::instrument;
19use mz_ore::now::EpochMillis;
20use mz_persist_types::{Codec, Codec64, Opaque};
21use proptest_derive::Arbitrary;
22use serde::{Deserialize, Serialize};
23use timely::progress::{Antichain, Timestamp};
24use uuid::Uuid;
25
26use crate::error::InvalidUsage;
27use crate::internal::machine::Machine;
28use crate::internal::state::Since;
29use crate::stats::SnapshotStats;
30use crate::{GarbageCollector, ShardId, parse_id};
31
32/// An opaque identifier for a reader of a persist durable TVC (aka shard).
33#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
34#[serde(try_from = "String", into = "String")]
35pub struct CriticalReaderId(pub(crate) [u8; 16]);
36
37impl std::fmt::Display for CriticalReaderId {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "c{}", Uuid::from_bytes(self.0))
40    }
41}
42
43impl std::fmt::Debug for CriticalReaderId {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        write!(f, "CriticalReaderId({})", Uuid::from_bytes(self.0))
46    }
47}
48
49impl std::str::FromStr for CriticalReaderId {
50    type Err = String;
51
52    fn from_str(s: &str) -> Result<Self, Self::Err> {
53        parse_id("c", "CriticalReaderId", s).map(CriticalReaderId)
54    }
55}
56
57impl From<CriticalReaderId> for String {
58    fn from(reader_id: CriticalReaderId) -> Self {
59        reader_id.to_string()
60    }
61}
62
63impl TryFrom<String> for CriticalReaderId {
64    type Error = String;
65
66    fn try_from(s: String) -> Result<Self, Self::Error> {
67        s.parse()
68    }
69}
70
71impl CriticalReaderId {
72    /// Returns a random [CriticalReaderId] that is reasonably likely to have
73    /// never been generated before.
74    ///
75    /// This is intentionally public, unlike [crate::read::LeasedReaderId] and
76    /// [crate::write::WriterId], because [SinceHandle]s are expected to live
77    /// beyond process lifetimes.
78    pub fn new() -> Self {
79        CriticalReaderId(*Uuid::new_v4().as_bytes())
80    }
81}
82
83/// A "capability" granting the ability to hold back the `since` frontier of a
84/// shard.
85///
86/// In contrast to [crate::read::ReadHandle], which is time-leased, this handle
87/// and its associated capability are not leased.
88/// A SinceHandle does not release its capability when dropped.
89/// This is less ergonomic,
90/// but useful for "critical" since holds which must survive even lease timeouts.
91///
92/// **IMPORTANT**: The above means that if a SinceHandle is registered and then
93/// lost, the shard's since will be permanently "stuck", forever preventing
94/// logical compaction. Users are advised to durably record (preferably in code)
95/// the intended [CriticalReaderId] _before_ registering a SinceHandle (in case
96/// the process crashes at the wrong time).
97///
98/// All async methods on SinceHandle retry for as long as they are able, but the
99/// returned [std::future::Future]s implement "cancel on drop" semantics. This
100/// means that callers can add a timeout using [tokio::time::timeout] or
101/// [tokio::time::timeout_at].
102#[derive(Debug)]
103pub struct SinceHandle<K: Codec, V: Codec, T, D, O> {
104    pub(crate) machine: Machine<K, V, T, D>,
105    pub(crate) gc: GarbageCollector<K, V, T, D>,
106    pub(crate) reader_id: CriticalReaderId,
107
108    since: Antichain<T>,
109    opaque: O,
110    last_downgrade_since: EpochMillis,
111}
112
113impl<K, V, T, D, O> SinceHandle<K, V, T, D, O>
114where
115    K: Debug + Codec,
116    V: Debug + Codec,
117    T: Timestamp + Lattice + Codec64 + Sync,
118    D: Monoid + Codec64 + Send + Sync,
119    O: Opaque + Codec64,
120{
121    pub(crate) fn new(
122        machine: Machine<K, V, T, D>,
123        gc: GarbageCollector<K, V, T, D>,
124        reader_id: CriticalReaderId,
125        since: Antichain<T>,
126        opaque: O,
127    ) -> Self {
128        SinceHandle {
129            machine,
130            gc,
131            reader_id,
132            since,
133            opaque,
134            last_downgrade_since: EpochMillis::default(),
135        }
136    }
137
138    /// This handle's shard id.
139    pub fn shard_id(&self) -> ShardId {
140        self.machine.shard_id()
141    }
142
143    /// This handle's `since` capability.
144    ///
145    /// This will always be greater or equal to the shard-global `since`.
146    pub fn since(&self) -> &Antichain<T> {
147        &self.since
148    }
149
150    /// This handle's `opaque`.
151    pub fn opaque(&self) -> &O {
152        &self.opaque
153    }
154
155    /// Attempts to forward the since capability of this handle to `new_since` iff
156    /// the opaque value of this handle's [CriticalReaderId] is `expected`, and
157    /// [Self::maybe_compare_and_downgrade_since] chooses to perform the downgrade.
158    ///
159    /// Users are expected to call this function frequently, but should not expect
160    /// `since` to be downgraded with each call -- this function is free to no-op
161    /// requests to perform rate-limiting for downstream services. A `None` is returned
162    /// for no-op requests, and `Some` is returned when downgrading since.
163    ///
164    /// When returning `Some(since)`, `since` will be set to the most recent value
165    /// known for this critical reader ID, and is guaranteed to be `!less_than(new_since)`.
166    ///
167    /// Because SinceHandles are expected to live beyond process lifetimes, it's
168    /// possible for the same [CriticalReaderId] to be used concurrently from
169    /// multiple processes (either intentionally or something like a zombie
170    /// process). To discover this, [Self::maybe_compare_and_downgrade_since] has
171    /// "compare and set" semantics over an opaque value. If the `expected` opaque
172    /// value does not match state, an `Err` is returned and the caller must decide
173    /// how to handle it (likely a retry or a `halt!`).
174    ///
175    /// If desired, users may use the opaque value to fence out concurrent access
176    /// of other [SinceHandle]s for a given [CriticalReaderId]. e.g.:
177    ///
178    /// ```rust,no_run
179    /// use timely::progress::Antichain;
180    /// use mz_persist_client::critical::SinceHandle;
181    /// use mz_persist_types::Codec64;
182    ///
183    /// # async fn example() {
184    /// let fencing_token: u64 = unimplemented!();
185    /// let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();
186    ///
187    /// let new_since: Antichain<u64> = unimplemented!();
188    /// let res = since
189    ///     .maybe_compare_and_downgrade_since(
190    ///         &since.opaque().clone(),
191    ///         (&fencing_token, &new_since),
192    ///     )
193    ///     .await;
194    ///
195    /// match res {
196    ///     Some(Ok(_)) => {
197    ///         // we downgraded since!
198    ///     }
199    ///     Some(Err(actual_fencing_token)) => {
200    ///         // compare `fencing_token` and `actual_fencing_token`, etc
201    ///     }
202    ///     None => {
203    ///         // no problem, we'll try again later
204    ///     }
205    /// }
206    /// # }
207    /// ```
208    ///
209    /// If fencing is not required and it's acceptable to have concurrent [SinceHandle] for
210    /// a given [CriticalReaderId], the opaque value can be given a default value and ignored:
211    ///
212    /// ```rust,no_run
213    /// use timely::progress::Antichain;
214    /// use mz_persist_client::critical::SinceHandle;
215    /// use mz_persist_types::Codec64;
216    ///
217    /// # async fn example() {
218    /// let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();
219    /// let new_since: Antichain<u64> = unimplemented!();
220    /// let res = since
221    ///     .maybe_compare_and_downgrade_since(
222    ///         &since.opaque().clone(),
223    ///         (&since.opaque().clone(), &new_since),
224    ///     )
225    ///     .await;
226    ///
227    /// match res {
228    ///     Some(Ok(_)) => {
229    ///         // woohoo!
230    ///     }
231    ///     Some(Err(_actual_opaque)) => {
232    ///         panic!("the opaque value should never change from the default");
233    ///     }
234    ///     None => {
235    ///         // no problem, we'll try again later
236    ///     }
237    /// };
238    /// # }
239    /// ```
240    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
241    pub async fn maybe_compare_and_downgrade_since(
242        &mut self,
243        expected: &O,
244        new: (&O, &Antichain<T>),
245    ) -> Option<Result<Antichain<T>, O>> {
246        let elapsed_since_last_downgrade = Duration::from_millis(
247            (self.machine.applier.cfg.now)().saturating_sub(self.last_downgrade_since),
248        );
249        if elapsed_since_last_downgrade >= self.machine.applier.cfg.critical_downgrade_interval {
250            Some(self.compare_and_downgrade_since(expected, new).await)
251        } else {
252            None
253        }
254    }
255
256    /// Forwards the since capability of this handle to `new_since` iff the opaque value of this
257    /// handle's [CriticalReaderId] is `expected`, and `new_since` is beyond the
258    /// current `since`.
259    ///
260    /// Users are expected to call this function only when a guaranteed downgrade is necessary. All
261    /// other downgrades should preferably go through [Self::maybe_compare_and_downgrade_since]
262    /// which will automatically rate limit the operations.
263    ///
264    /// When returning `Ok(since)`, `since` will be set to the most recent value known for this
265    /// critical reader ID, and is guaranteed to be `!less_than(new_since)`.
266    ///
267    /// Because SinceHandles are expected to live beyond process lifetimes, it's possible for the
268    /// same [CriticalReaderId] to be used concurrently from multiple processes (either
269    /// intentionally or something like a zombie process). To discover this,
270    /// [Self::compare_and_downgrade_since] has "compare and set" semantics over an opaque value.
271    /// If the `expected` opaque value does not match state, an `Err` is returned and the caller
272    /// must decide how to handle it (likely a retry or a `halt!`).
273    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
274    pub async fn compare_and_downgrade_since(
275        &mut self,
276        expected: &O,
277        new: (&O, &Antichain<T>),
278    ) -> Result<Antichain<T>, O> {
279        let (res, maintenance) = self
280            .machine
281            .compare_and_downgrade_since(&self.reader_id, expected, new)
282            .await;
283        self.last_downgrade_since = (self.machine.applier.cfg.now)();
284        maintenance.start_performing(&self.machine, &self.gc);
285        match res {
286            Ok(Since(since)) => {
287                self.since.clone_from(&since);
288                self.opaque.clone_from(new.0);
289                Ok(since)
290            }
291            Err((actual_opaque, since)) => {
292                self.since = since.0;
293                self.opaque.clone_from(&actual_opaque);
294                Err(actual_opaque)
295            }
296        }
297    }
298
299    /// Returns aggregate statistics about the contents of the shard TVC at the
300    /// given frontier.
301    ///
302    /// This command returns the contents of this shard as of `as_of` once they
303    /// are known. This may "block" (in an async-friendly way) if `as_of` is
304    /// greater or equal to the current `upper` of the shard. If `None` is given
305    /// for `as_of`, then the latest stats known by this process are used.
306    ///
307    /// The `Since` error indicates that the requested `as_of` cannot be served
308    /// (the caller has out of date information) and includes the smallest
309    /// `as_of` that would have been accepted.
310    pub fn snapshot_stats(
311        &self,
312        as_of: Option<Antichain<T>>,
313    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
314        let machine = self.machine.clone();
315        async move {
316            let batches = match as_of {
317                Some(as_of) => machine.snapshot(&as_of).await?,
318                None => machine.applier.all_batches(),
319            };
320            let num_updates = batches.iter().map(|b| b.len).sum();
321            Ok(SnapshotStats {
322                shard_id: machine.shard_id(),
323                num_updates,
324            })
325        }
326    }
327
328    /// Upgrade the version associated with this shard, applying any state migrations. This
329    /// is irrevocable and will fence out old versions: it should only be run only once Materialize
330    /// has fully committed to the new version.
331    pub async fn upgrade_version(&self) -> Result<(), InvalidUsage<T>> {
332        match self.machine.upgrade_version().await {
333            Ok(maintenance) => {
334                let () = maintenance.perform(&self.machine, &self.gc).await;
335                Ok(())
336            }
337            Err(version) => Err(InvalidUsage::IncompatibleVersion { version }),
338        }
339    }
340
341    // Expiry temporarily removed.
342    // If you'd like to stop this handle from holding back the since of the shard,
343    // downgrade it to [].
344    // TODO(bkirwi): revert this when since behaviour on expiry has settled,
345    // or all readers are associated with a critical handle.
346}
347
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_dyncfg::ConfigUpdates;
    use serde::{Deserialize, Serialize};
    use serde_json::json;

    use crate::tests::new_test_client;
    use crate::{Diagnostics, PersistClient, ShardId};

    use super::*;

    // Verifies that a CriticalReaderId serializes to, and deserializes from,
    // its human-readable string form.
    #[mz_ore::test]
    fn reader_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            reader_id: CriticalReaderId,
        }

        let id =
            CriticalReaderId::from_str("c00000000-1234-5678-0000-000000000000").expect("valid id");

        // roundtrip through json
        let as_value = serde_json::to_value(id.clone()).expect("serializable");
        let roundtripped: CriticalReaderId =
            serde_json::from_value(as_value).expect("deserializable");
        assert_eq!(id, roundtripped);

        // deserialize a serialized string directly
        let parsed: CriticalReaderId =
            serde_json::from_str("\"c00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable");
        assert_eq!(id, parsed);

        // roundtrip id through a container type
        let json = json!({ "reader_id": id });
        assert_eq!(
            "{\"reader_id\":\"c00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.reader_id, id);
    }

    // Verifies that a rate-limited downgrade issued right after a successful
    // downgrade is turned into a no-op
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn rate_limit(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();

        let mut since = client
            .open_critical_since::<(), (), u64, i64, i64>(
                shard_id,
                CriticalReaderId::new(),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        let initial = i64::initial();
        assert_eq!(since.opaque(), &initial);

        let frontier = Antichain::from_elem(0);
        since
            .compare_and_downgrade_since(&initial, (&5, &frontier))
            .await
            .unwrap();

        // should not fire, since we just had a successful `compare_and_downgrade_since` call
        let noop = since
            .maybe_compare_and_downgrade_since(&5, (&5, &frontier))
            .await;

        assert_eq!(noop, None);
    }

    // Verifies that the handle updates its view of the opaque token correctly
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn handle_opaque_token(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();

        let mut since = client
            .open_critical_since::<(), (), u64, i64, i64>(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        // The token must be initialized to the default value
        assert_eq!(since.opaque(), &i64::MIN);

        let frontier = Antichain::from_elem(0);
        since
            .compare_and_downgrade_since(&i64::MIN, (&5, &frontier))
            .await
            .unwrap();

        // Our view of the token must be updated now
        assert_eq!(since.opaque(), &5);

        // A second handle for the same reader id must observe the stored token
        let since2 = client
            .open_critical_since::<(), (), u64, i64, i64>(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        // The token should still be 5
        assert_eq!(since2.opaque(), &5);
    }
}