mz_persist_client/
critical.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Since capabilities and handles
11
12use std::fmt::Debug;
13use std::future::Future;
14use std::time::Duration;
15
16use differential_dataflow::difference::Semigroup;
17use differential_dataflow::lattice::Lattice;
18use mz_ore::instrument;
19use mz_ore::now::EpochMillis;
20use mz_persist_types::{Codec, Codec64, Opaque};
21use proptest_derive::Arbitrary;
22use serde::{Deserialize, Serialize};
23use timely::progress::{Antichain, Timestamp};
24use uuid::Uuid;
25
26use crate::internal::machine::Machine;
27use crate::internal::state::Since;
28use crate::stats::SnapshotStats;
29use crate::{GarbageCollector, ShardId, parse_id};
30
/// An opaque identifier for a reader of a persist durable TVC (aka shard).
///
/// The id wraps 16 raw bytes, which the `Display` and `Debug` impls render as
/// a UUID. Its `Display` form is the letter `c` followed by that UUID, e.g.
/// `c00000000-1234-5678-0000-000000000000`.
///
/// Per the `serde` attribute below, (de)serialization goes through `String`:
/// serializing uses the `From<CriticalReaderId> for String` impl and
/// deserializing uses the `TryFrom<String>` impl.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct CriticalReaderId(pub(crate) [u8; 16]);
35
36impl std::fmt::Display for CriticalReaderId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "c{}", Uuid::from_bytes(self.0))
39    }
40}
41
42impl std::fmt::Debug for CriticalReaderId {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(f, "CriticalReaderId({})", Uuid::from_bytes(self.0))
45    }
46}
47
48impl std::str::FromStr for CriticalReaderId {
49    type Err = String;
50
51    fn from_str(s: &str) -> Result<Self, Self::Err> {
52        parse_id('c', "CriticalReaderId", s).map(CriticalReaderId)
53    }
54}
55
56impl From<CriticalReaderId> for String {
57    fn from(reader_id: CriticalReaderId) -> Self {
58        reader_id.to_string()
59    }
60}
61
62impl TryFrom<String> for CriticalReaderId {
63    type Error = String;
64
65    fn try_from(s: String) -> Result<Self, Self::Error> {
66        s.parse()
67    }
68}
69
70impl CriticalReaderId {
71    /// Returns a random [CriticalReaderId] that is reasonably likely to have
72    /// never been generated before.
73    ///
74    /// This is intentionally public, unlike [crate::read::LeasedReaderId] and
75    /// [crate::write::WriterId], because [SinceHandle]s are expected to live
76    /// beyond process lifetimes.
77    pub fn new() -> Self {
78        CriticalReaderId(*Uuid::new_v4().as_bytes())
79    }
80}
81
/// A "capability" granting the ability to hold back the `since` frontier of a
/// shard.
///
/// In contrast to [crate::read::ReadHandle], which is time-leased, this handle
/// and its associated capability are not leased.
/// A SinceHandle does not release its capability when dropped.
/// This is less ergonomic,
/// but useful for "critical" since holds which must survive even lease timeouts.
///
/// **IMPORTANT**: The above means that if a SinceHandle is registered and then
/// lost, the shard's since will be permanently "stuck", forever preventing
/// logical compaction. Users are advised to durably record (preferably in code)
/// the intended [CriticalReaderId] _before_ registering a SinceHandle (in case
/// the process crashes at the wrong time).
///
/// All async methods on SinceHandle retry for as long as they are able, but the
/// returned [std::future::Future]s implement "cancel on drop" semantics. This
/// means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].
#[derive(Debug)]
pub struct SinceHandle<K: Codec, V: Codec, T, D, O> {
    // Shard state machine through which this handle issues its downgrades and
    // snapshot reads, and from which it reads the shard id and config.
    pub(crate) machine: Machine<K, V, T, D>,
    // Handed to `start_performing`, together with `machine`, to carry out the
    // maintenance returned by a downgrade.
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    // The critical reader on whose behalf this handle operates.
    pub(crate) reader_id: CriticalReaderId,

    // Latest `since` known for this reader: the value given at construction,
    // then whatever the most recent `compare_and_downgrade_since` reported.
    since: Antichain<T>,
    // Latest opaque value known for this reader, maintained like `since`.
    opaque: O,
    // Reading of the configured `now` function (milliseconds) taken right after
    // the most recent `compare_and_downgrade_since` call; starts out as
    // `EpochMillis::default()`. `maybe_compare_and_downgrade_since` uses it to
    // rate limit downgrades.
    last_downgrade_since: EpochMillis,
}
111
112impl<K, V, T, D, O> SinceHandle<K, V, T, D, O>
113where
114    K: Debug + Codec,
115    V: Debug + Codec,
116    T: Timestamp + Lattice + Codec64 + Sync,
117    D: Semigroup + Codec64 + Send + Sync,
118    O: Opaque + Codec64,
119{
120    pub(crate) fn new(
121        machine: Machine<K, V, T, D>,
122        gc: GarbageCollector<K, V, T, D>,
123        reader_id: CriticalReaderId,
124        since: Antichain<T>,
125        opaque: O,
126    ) -> Self {
127        SinceHandle {
128            machine,
129            gc,
130            reader_id,
131            since,
132            opaque,
133            last_downgrade_since: EpochMillis::default(),
134        }
135    }
136
137    /// This handle's shard id.
138    pub fn shard_id(&self) -> ShardId {
139        self.machine.shard_id()
140    }
141
142    /// This handle's `since` capability.
143    ///
144    /// This will always be greater or equal to the shard-global `since`.
145    pub fn since(&self) -> &Antichain<T> {
146        &self.since
147    }
148
149    /// This handle's `opaque`.
150    pub fn opaque(&self) -> &O {
151        &self.opaque
152    }
153
154    /// Attempts to forward the since capability of this handle to `new_since` iff
155    /// the opaque value of this handle's [CriticalReaderId] is `expected`, and
156    /// [Self::maybe_compare_and_downgrade_since] chooses to perform the downgrade.
157    ///
158    /// Users are expected to call this function frequently, but should not expect
159    /// `since` to be downgraded with each call -- this function is free to no-op
160    /// requests to perform rate-limiting for downstream services. A `None` is returned
161    /// for no-op requests, and `Some` is returned when downgrading since.
162    ///
163    /// When returning `Some(since)`, `since` will be set to the most recent value
164    /// known for this critical reader ID, and is guaranteed to be `!less_than(new_since)`.
165    ///
166    /// Because SinceHandles are expected to live beyond process lifetimes, it's
167    /// possible for the same [CriticalReaderId] to be used concurrently from
168    /// multiple processes (either intentionally or something like a zombie
169    /// process). To discover this, [Self::maybe_compare_and_downgrade_since] has
170    /// "compare and set" semantics over an opaque value. If the `expected` opaque
171    /// value does not match state, an `Err` is returned and the caller must decide
172    /// how to handle it (likely a retry or a `halt!`).
173    ///
174    /// If desired, users may use the opaque value to fence out concurrent access
175    /// of other [SinceHandle]s for a given [CriticalReaderId]. e.g.:
176    ///
177    /// ```rust,no_run
178    /// use timely::progress::Antichain;
179    /// use mz_persist_client::critical::SinceHandle;
180    /// use mz_persist_types::Codec64;
181    ///
182    /// # async fn example() {
183    /// let fencing_token: u64 = unimplemented!();
184    /// let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();
185    ///
186    /// let new_since: Antichain<u64> = unimplemented!();
187    /// let res = since
188    ///     .maybe_compare_and_downgrade_since(
189    ///         &since.opaque().clone(),
190    ///         (&fencing_token, &new_since),
191    ///     )
192    ///     .await;
193    ///
194    /// match res {
195    ///     Some(Ok(_)) => {
196    ///         // we downgraded since!
197    ///     }
198    ///     Some(Err(actual_fencing_token)) => {
199    ///         // compare `fencing_token` and `actual_fencing_token`, etc
200    ///     }
201    ///     None => {
202    ///         // no problem, we'll try again later
203    ///     }
204    /// }
205    /// # }
206    /// ```
207    ///
208    /// If fencing is not required and it's acceptable to have concurrent [SinceHandle] for
209    /// a given [CriticalReaderId], the opaque value can be given a default value and ignored:
210    ///
211    /// ```rust,no_run
212    /// use timely::progress::Antichain;
213    /// use mz_persist_client::critical::SinceHandle;
214    /// use mz_persist_types::Codec64;
215    ///
216    /// # async fn example() {
217    /// let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();
218    /// let new_since: Antichain<u64> = unimplemented!();
219    /// let res = since
220    ///     .maybe_compare_and_downgrade_since(
221    ///         &since.opaque().clone(),
222    ///         (&since.opaque().clone(), &new_since),
223    ///     )
224    ///     .await;
225    ///
226    /// match res {
227    ///     Some(Ok(_)) => {
228    ///         // woohoo!
229    ///     }
230    ///     Some(Err(_actual_opaque)) => {
231    ///         panic!("the opaque value should never change from the default");
232    ///     }
233    ///     None => {
234    ///         // no problem, we'll try again later
235    ///     }
236    /// };
237    /// # }
238    /// ```
239    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
240    pub async fn maybe_compare_and_downgrade_since(
241        &mut self,
242        expected: &O,
243        new: (&O, &Antichain<T>),
244    ) -> Option<Result<Antichain<T>, O>> {
245        let elapsed_since_last_downgrade = Duration::from_millis(
246            (self.machine.applier.cfg.now)().saturating_sub(self.last_downgrade_since),
247        );
248        if elapsed_since_last_downgrade >= self.machine.applier.cfg.critical_downgrade_interval {
249            Some(self.compare_and_downgrade_since(expected, new).await)
250        } else {
251            None
252        }
253    }
254
255    /// Forwards the since capability of this handle to `new_since` iff the opaque value of this
256    /// handle's [CriticalReaderId] is `expected`, and `new_since` is beyond the
257    /// current `since`.
258    ///
259    /// Users are expected to call this function only when a guaranteed downgrade is necessary. All
260    /// other downgrades should preferably go through [Self::maybe_compare_and_downgrade_since]
261    /// which will automatically rate limit the operations.
262    ///
263    /// When returning `Ok(since)`, `since` will be set to the most recent value known for this
264    /// critical reader ID, and is guaranteed to be `!less_than(new_since)`.
265    ///
266    /// Because SinceHandles are expected to live beyond process lifetimes, it's possible for the
267    /// same [CriticalReaderId] to be used concurrently from multiple processes (either
268    /// intentionally or something like a zombie process). To discover this,
269    /// [Self::compare_and_downgrade_since] has "compare and set" semantics over an opaque value.
270    /// If the `expected` opaque value does not match state, an `Err` is returned and the caller
271    /// must decide how to handle it (likely a retry or a `halt!`).
272    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
273    pub async fn compare_and_downgrade_since(
274        &mut self,
275        expected: &O,
276        new: (&O, &Antichain<T>),
277    ) -> Result<Antichain<T>, O> {
278        let (res, maintenance) = self
279            .machine
280            .compare_and_downgrade_since(&self.reader_id, expected, new)
281            .await;
282        self.last_downgrade_since = (self.machine.applier.cfg.now)();
283        maintenance.start_performing(&self.machine, &self.gc);
284        match res {
285            Ok(Since(since)) => {
286                self.since.clone_from(&since);
287                self.opaque.clone_from(new.0);
288                Ok(since)
289            }
290            Err((actual_opaque, since)) => {
291                self.since = since.0;
292                self.opaque.clone_from(&actual_opaque);
293                Err(actual_opaque)
294            }
295        }
296    }
297
298    /// Returns aggregate statistics about the contents of the shard TVC at the
299    /// given frontier.
300    ///
301    /// This command returns the contents of this shard as of `as_of` once they
302    /// are known. This may "block" (in an async-friendly way) if `as_of` is
303    /// greater or equal to the current `upper` of the shard. If `None` is given
304    /// for `as_of`, then the latest stats known by this process are used.
305    ///
306    /// The `Since` error indicates that the requested `as_of` cannot be served
307    /// (the caller has out of date information) and includes the smallest
308    /// `as_of` that would have been accepted.
309    pub fn snapshot_stats(
310        &self,
311        as_of: Option<Antichain<T>>,
312    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
313        let machine = self.machine.clone();
314        async move {
315            let batches = match as_of {
316                Some(as_of) => machine.snapshot(&as_of).await?,
317                None => machine.applier.all_batches(),
318            };
319            let num_updates = batches.iter().map(|b| b.len).sum();
320            Ok(SnapshotStats {
321                shard_id: machine.shard_id(),
322                num_updates,
323            })
324        }
325    }
326
    // Expiry temporarily removed.
    // If you'd like to stop this handle from holding back the since of the shard,
    // downgrade its since to the empty antichain (`[]`).
    // TODO(bkirwi): revert this when since behaviour on expiry has settled,
    // or all readers are associated with a critical handle.
332}
333
334#[cfg(test)]
335mod tests {
336    use std::str::FromStr;
337
338    use mz_dyncfg::ConfigUpdates;
339    use serde::{Deserialize, Serialize};
340    use serde_json::json;
341
342    use crate::tests::new_test_client;
343    use crate::{Diagnostics, PersistClient, ShardId};
344
345    use super::*;
346
347    #[mz_ore::test]
348    fn reader_id_human_readable_serde() {
349        #[derive(Debug, Serialize, Deserialize)]
350        struct Container {
351            reader_id: CriticalReaderId,
352        }
353
354        // roundtrip through json
355        let id =
356            CriticalReaderId::from_str("c00000000-1234-5678-0000-000000000000").expect("valid id");
357        assert_eq!(
358            id,
359            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
360                .expect("deserializable")
361        );
362
363        // deserialize a serialized string directly
364        assert_eq!(
365            id,
366            serde_json::from_str("\"c00000000-1234-5678-0000-000000000000\"")
367                .expect("deserializable")
368        );
369
370        // roundtrip id through a container type
371        let json = json!({ "reader_id": id });
372        assert_eq!(
373            "{\"reader_id\":\"c00000000-1234-5678-0000-000000000000\"}",
374            &json.to_string()
375        );
376        let container: Container = serde_json::from_value(json).expect("deserializable");
377        assert_eq!(container.reader_id, id);
378    }
379
380    #[mz_persist_proc::test(tokio::test)]
381    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
382    async fn rate_limit(dyncfgs: ConfigUpdates) {
383        let client = crate::tests::new_test_client(&dyncfgs).await;
384
385        let shard_id = crate::ShardId::new();
386
387        let mut since = client
388            .open_critical_since::<(), (), u64, i64, i64>(
389                shard_id,
390                CriticalReaderId::new(),
391                Diagnostics::for_tests(),
392            )
393            .await
394            .expect("codec mismatch");
395
396        assert_eq!(since.opaque(), &i64::initial());
397
398        since
399            .compare_and_downgrade_since(&i64::initial(), (&5, &Antichain::from_elem(0)))
400            .await
401            .unwrap();
402
403        // should not fire, since we just had a successful `compare_and_downgrade_since` call
404        let noop = since
405            .maybe_compare_and_downgrade_since(&5, (&5, &Antichain::from_elem(0)))
406            .await;
407
408        assert_eq!(noop, None);
409    }
410
411    // Verifies that the handle updates its view of the opaque token correctly
412    #[mz_persist_proc::test(tokio::test)]
413    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
414    async fn handle_opaque_token(dyncfgs: ConfigUpdates) {
415        let client = new_test_client(&dyncfgs).await;
416        let shard_id = ShardId::new();
417
418        let mut since = client
419            .open_critical_since::<(), (), u64, i64, i64>(
420                shard_id,
421                PersistClient::CONTROLLER_CRITICAL_SINCE,
422                Diagnostics::for_tests(),
423            )
424            .await
425            .expect("codec mismatch");
426
427        // The token must be initialized to the default value
428        assert_eq!(since.opaque(), &i64::MIN);
429
430        since
431            .compare_and_downgrade_since(&i64::MIN, (&5, &Antichain::from_elem(0)))
432            .await
433            .unwrap();
434
435        // Our view of the token must be updated now
436        assert_eq!(since.opaque(), &5);
437
438        let since2 = client
439            .open_critical_since::<(), (), u64, i64, i64>(
440                shard_id,
441                PersistClient::CONTROLLER_CRITICAL_SINCE,
442                Diagnostics::for_tests(),
443            )
444            .await
445            .expect("codec mismatch");
446
447        // The token should still be 5
448        assert_eq!(since2.opaque(), &5);
449    }
450}