Skip to main content

mz_persist_client/
critical.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Since capabilities and handles
11
12use std::fmt::Debug;
13use std::future::Future;
14use std::time::Duration;
15
16use differential_dataflow::difference::Monoid;
17use differential_dataflow::lattice::Lattice;
18use mz_ore::instrument;
19use mz_ore::now::EpochMillis;
20use mz_persist_types::{Codec, Codec64, Opaque};
21use proptest_derive::Arbitrary;
22use serde::{Deserialize, Serialize};
23use timely::progress::{Antichain, Timestamp};
24use uuid::Uuid;
25
26use crate::error::InvalidUsage;
27use crate::internal::machine::Machine;
28use crate::internal::state::Since;
29use crate::stats::SnapshotStats;
30use crate::{GarbageCollector, ShardId, parse_id};
31
32/// An opaque identifier for a reader of a persist durable TVC (aka shard).
33#[derive(
34    Arbitrary,
35    Clone,
36    PartialEq,
37    Eq,
38    PartialOrd,
39    Ord,
40    Hash,
41    Serialize,
42    Deserialize
43)]
44#[serde(try_from = "String", into = "String")]
45pub struct CriticalReaderId(pub(crate) [u8; 16]);
46
47impl std::fmt::Display for CriticalReaderId {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        write!(f, "c{}", Uuid::from_bytes(self.0))
50    }
51}
52
53impl std::fmt::Debug for CriticalReaderId {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        write!(f, "CriticalReaderId({})", Uuid::from_bytes(self.0))
56    }
57}
58
59impl std::str::FromStr for CriticalReaderId {
60    type Err = String;
61
62    fn from_str(s: &str) -> Result<Self, Self::Err> {
63        parse_id("c", "CriticalReaderId", s).map(CriticalReaderId)
64    }
65}
66
67impl From<CriticalReaderId> for String {
68    fn from(reader_id: CriticalReaderId) -> Self {
69        reader_id.to_string()
70    }
71}
72
73impl TryFrom<String> for CriticalReaderId {
74    type Error = String;
75
76    fn try_from(s: String) -> Result<Self, Self::Error> {
77        s.parse()
78    }
79}
80
81impl CriticalReaderId {
82    /// Returns a random [CriticalReaderId] that is reasonably likely to have
83    /// never been generated before.
84    ///
85    /// This is intentionally public, unlike [crate::read::LeasedReaderId] and
86    /// [crate::write::WriterId], because [SinceHandle]s are expected to live
87    /// beyond process lifetimes.
88    pub fn new() -> Self {
89        CriticalReaderId(*Uuid::new_v4().as_bytes())
90    }
91}
92
93/// A "capability" granting the ability to hold back the `since` frontier of a
94/// shard.
95///
96/// In contrast to [crate::read::ReadHandle], which is time-leased, this handle
97/// and its associated capability are not leased.
98/// A SinceHandle does not release its capability when dropped.
99/// This is less ergonomic,
100/// but useful for "critical" since holds which must survive even lease timeouts.
101///
102/// **IMPORTANT**: The above means that if a SinceHandle is registered and then
103/// lost, the shard's since will be permanently "stuck", forever preventing
104/// logical compaction. Users are advised to durably record (preferably in code)
105/// the intended [CriticalReaderId] _before_ registering a SinceHandle (in case
106/// the process crashes at the wrong time).
107///
108/// All async methods on SinceHandle retry for as long as they are able, but the
109/// returned [std::future::Future]s implement "cancel on drop" semantics. This
110/// means that callers can add a timeout using [tokio::time::timeout] or
111/// [tokio::time::timeout_at].
112#[derive(Debug)]
113pub struct SinceHandle<K: Codec, V: Codec, T, D, O> {
114    pub(crate) machine: Machine<K, V, T, D>,
115    pub(crate) gc: GarbageCollector<K, V, T, D>,
116    pub(crate) reader_id: CriticalReaderId,
117
118    since: Antichain<T>,
119    opaque: O,
120    last_downgrade_since: EpochMillis,
121}
122
123impl<K, V, T, D, O> SinceHandle<K, V, T, D, O>
124where
125    K: Debug + Codec,
126    V: Debug + Codec,
127    T: Timestamp + Lattice + Codec64 + Sync,
128    D: Monoid + Codec64 + Send + Sync,
129    O: Opaque + Codec64,
130{
131    pub(crate) fn new(
132        machine: Machine<K, V, T, D>,
133        gc: GarbageCollector<K, V, T, D>,
134        reader_id: CriticalReaderId,
135        since: Antichain<T>,
136        opaque: O,
137    ) -> Self {
138        SinceHandle {
139            machine,
140            gc,
141            reader_id,
142            since,
143            opaque,
144            last_downgrade_since: EpochMillis::default(),
145        }
146    }
147
148    /// This handle's shard id.
149    pub fn shard_id(&self) -> ShardId {
150        self.machine.shard_id()
151    }
152
153    /// This handle's `since` capability.
154    ///
155    /// This will always be greater or equal to the shard-global `since`.
156    pub fn since(&self) -> &Antichain<T> {
157        &self.since
158    }
159
160    /// This handle's `opaque`.
161    pub fn opaque(&self) -> &O {
162        &self.opaque
163    }
164
165    /// Attempts to forward the since capability of this handle to `new_since` iff
166    /// the opaque value of this handle's [CriticalReaderId] is `expected`, and
167    /// [Self::maybe_compare_and_downgrade_since] chooses to perform the downgrade.
168    ///
169    /// Users are expected to call this function frequently, but should not expect
170    /// `since` to be downgraded with each call -- this function is free to no-op
171    /// requests to perform rate-limiting for downstream services. A `None` is returned
172    /// for no-op requests, and `Some` is returned when downgrading since.
173    ///
174    /// When returning `Some(since)`, `since` will be set to the most recent value
175    /// known for this critical reader ID, and is guaranteed to be `!less_than(new_since)`.
176    ///
177    /// Because SinceHandles are expected to live beyond process lifetimes, it's
178    /// possible for the same [CriticalReaderId] to be used concurrently from
179    /// multiple processes (either intentionally or something like a zombie
180    /// process). To discover this, [Self::maybe_compare_and_downgrade_since] has
181    /// "compare and set" semantics over an opaque value. If the `expected` opaque
182    /// value does not match state, an `Err` is returned and the caller must decide
183    /// how to handle it (likely a retry or a `halt!`).
184    ///
185    /// If desired, users may use the opaque value to fence out concurrent access
186    /// of other [SinceHandle]s for a given [CriticalReaderId]. e.g.:
187    ///
188    /// ```rust,no_run
189    /// use timely::progress::Antichain;
190    /// use mz_persist_client::critical::SinceHandle;
191    /// use mz_persist_types::Codec64;
192    ///
193    /// # async fn example() {
194    /// let fencing_token: u64 = unimplemented!();
195    /// let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();
196    ///
197    /// let new_since: Antichain<u64> = unimplemented!();
198    /// let res = since
199    ///     .maybe_compare_and_downgrade_since(
200    ///         &since.opaque().clone(),
201    ///         (&fencing_token, &new_since),
202    ///     )
203    ///     .await;
204    ///
205    /// match res {
206    ///     Some(Ok(_)) => {
207    ///         // we downgraded since!
208    ///     }
209    ///     Some(Err(actual_fencing_token)) => {
210    ///         // compare `fencing_token` and `actual_fencing_token`, etc
211    ///     }
212    ///     None => {
213    ///         // no problem, we'll try again later
214    ///     }
215    /// }
216    /// # }
217    /// ```
218    ///
219    /// If fencing is not required and it's acceptable to have concurrent [SinceHandle] for
220    /// a given [CriticalReaderId], the opaque value can be given a default value and ignored:
221    ///
222    /// ```rust,no_run
223    /// use timely::progress::Antichain;
224    /// use mz_persist_client::critical::SinceHandle;
225    /// use mz_persist_types::Codec64;
226    ///
227    /// # async fn example() {
228    /// let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();
229    /// let new_since: Antichain<u64> = unimplemented!();
230    /// let res = since
231    ///     .maybe_compare_and_downgrade_since(
232    ///         &since.opaque().clone(),
233    ///         (&since.opaque().clone(), &new_since),
234    ///     )
235    ///     .await;
236    ///
237    /// match res {
238    ///     Some(Ok(_)) => {
239    ///         // woohoo!
240    ///     }
241    ///     Some(Err(_actual_opaque)) => {
242    ///         panic!("the opaque value should never change from the default");
243    ///     }
244    ///     None => {
245    ///         // no problem, we'll try again later
246    ///     }
247    /// };
248    /// # }
249    /// ```
250    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
251    pub async fn maybe_compare_and_downgrade_since(
252        &mut self,
253        expected: &O,
254        new: (&O, &Antichain<T>),
255    ) -> Option<Result<Antichain<T>, O>> {
256        let elapsed_since_last_downgrade = Duration::from_millis(
257            (self.machine.applier.cfg.now)().saturating_sub(self.last_downgrade_since),
258        );
259        if elapsed_since_last_downgrade >= self.machine.applier.cfg.critical_downgrade_interval {
260            Some(self.compare_and_downgrade_since(expected, new).await)
261        } else {
262            None
263        }
264    }
265
266    /// Forwards the since capability of this handle to `new_since` iff the opaque value of this
267    /// handle's [CriticalReaderId] is `expected`, and `new_since` is beyond the
268    /// current `since`.
269    ///
270    /// Users are expected to call this function only when a guaranteed downgrade is necessary. All
271    /// other downgrades should preferably go through [Self::maybe_compare_and_downgrade_since]
272    /// which will automatically rate limit the operations.
273    ///
274    /// When returning `Ok(since)`, `since` will be set to the most recent value known for this
275    /// critical reader ID, and is guaranteed to be `!less_than(new_since)`.
276    ///
277    /// Because SinceHandles are expected to live beyond process lifetimes, it's possible for the
278    /// same [CriticalReaderId] to be used concurrently from multiple processes (either
279    /// intentionally or something like a zombie process). To discover this,
280    /// [Self::compare_and_downgrade_since] has "compare and set" semantics over an opaque value.
281    /// If the `expected` opaque value does not match state, an `Err` is returned and the caller
282    /// must decide how to handle it (likely a retry or a `halt!`).
283    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
284    pub async fn compare_and_downgrade_since(
285        &mut self,
286        expected: &O,
287        new: (&O, &Antichain<T>),
288    ) -> Result<Antichain<T>, O> {
289        let (res, maintenance) = self
290            .machine
291            .compare_and_downgrade_since(&self.reader_id, expected, new)
292            .await;
293        self.last_downgrade_since = (self.machine.applier.cfg.now)();
294        maintenance.start_performing(&self.machine, &self.gc);
295        match res {
296            Ok(Since(since)) => {
297                self.since.clone_from(&since);
298                self.opaque.clone_from(new.0);
299                Ok(since)
300            }
301            Err((actual_opaque, since)) => {
302                self.since = since.0;
303                self.opaque.clone_from(&actual_opaque);
304                Err(actual_opaque)
305            }
306        }
307    }
308
309    /// Returns aggregate statistics about the contents of the shard TVC at the
310    /// given frontier.
311    ///
312    /// This command returns the contents of this shard as of `as_of` once they
313    /// are known. This may "block" (in an async-friendly way) if `as_of` is
314    /// greater or equal to the current `upper` of the shard. If `None` is given
315    /// for `as_of`, then the latest stats known by this process are used.
316    ///
317    /// The `Since` error indicates that the requested `as_of` cannot be served
318    /// (the caller has out of date information) and includes the smallest
319    /// `as_of` that would have been accepted.
320    pub fn snapshot_stats(
321        &self,
322        as_of: Option<Antichain<T>>,
323    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
324        let machine = self.machine.clone();
325        async move {
326            let batches = match as_of {
327                Some(as_of) => machine.snapshot(&as_of).await?,
328                None => machine.applier.all_batches(),
329            };
330            let num_updates = batches.iter().map(|b| b.len).sum();
331            Ok(SnapshotStats {
332                shard_id: machine.shard_id(),
333                num_updates,
334            })
335        }
336    }
337
338    /// Upgrade the version associated with this shard, applying any state migrations. This
339    /// is irrevocable and will fence out old versions: it should only be run only once Materialize
340    /// has fully committed to the new version.
341    pub async fn upgrade_version(&self) -> Result<(), InvalidUsage<T>> {
342        match self.machine.upgrade_version().await {
343            Ok(maintenance) => {
344                let () = maintenance.perform(&self.machine, &self.gc).await;
345                Ok(())
346            }
347            Err(version) => Err(InvalidUsage::IncompatibleVersion { version }),
348        }
349    }
350
351    // Expiry temporarily removed.
352    // If you'd like to stop this handle from holding back the since of the shard,
353    // downgrade it to [].
354    // TODO(bkirwi): revert this when since behaviour on expiry has settled,
355    // or all readers are associated with a critical handle.
356}
357
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_dyncfg::ConfigUpdates;
    use serde::{Deserialize, Serialize};
    use serde_json::json;

    use crate::tests::new_test_client;
    use crate::{Diagnostics, PersistClient, ShardId};

    use super::*;

    // Verifies that ids (de)serialize as their human-readable string form.
    #[mz_ore::test]
    fn reader_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            reader_id: CriticalReaderId,
        }

        let id =
            CriticalReaderId::from_str("c00000000-1234-5678-0000-000000000000").expect("valid id");

        // roundtrip through json
        let as_value = serde_json::to_value(id.clone()).expect("serializable");
        let from_value: CriticalReaderId =
            serde_json::from_value(as_value).expect("deserializable");
        assert_eq!(id, from_value);

        // deserialize a serialized string directly
        let from_text: CriticalReaderId =
            serde_json::from_str("\"c00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable");
        assert_eq!(id, from_text);

        // roundtrip id through a container type
        let json = json!({ "reader_id": id });
        assert_eq!(
            "{\"reader_id\":\"c00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.reader_id, id);
    }

    // Verifies that a rate-limited downgrade right after a forced one is a no-op.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn rate_limit(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();

        let mut since = client
            .open_critical_since::<(), (), u64, i64, i64>(
                shard_id,
                CriticalReaderId::new(),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        let initial_opaque = i64::initial();
        assert_eq!(since.opaque(), &initial_opaque);

        let frontier = Antichain::from_elem(0);
        since
            .compare_and_downgrade_since(&initial_opaque, (&5, &frontier))
            .await
            .unwrap();

        // should not fire, since we just had a successful `compare_and_downgrade_since` call
        let noop = since
            .maybe_compare_and_downgrade_since(&5, (&5, &frontier))
            .await;

        assert_eq!(noop, None);
    }

    // Verifies that the handle updates its view of the opaque token correctly
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn handle_opaque_token(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();

        let mut first_handle = client
            .open_critical_since::<(), (), u64, i64, i64>(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        // The token must be initialized to the default value
        assert_eq!(first_handle.opaque(), &i64::MIN);

        let frontier = Antichain::from_elem(0);
        first_handle
            .compare_and_downgrade_since(&i64::MIN, (&5, &frontier))
            .await
            .unwrap();

        // Our view of the token must be updated now
        assert_eq!(first_handle.opaque(), &5);

        // A second handle opened for the same reader id sees the stored token.
        let second_handle = client
            .open_critical_since::<(), (), u64, i64, i64>(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        // The token should still be 5
        assert_eq!(second_handle.opaque(), &5);
    }
}