Skip to main content

mz_persist_client/
critical.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Since capabilities and handles
11
12use std::fmt::Debug;
13use std::future::Future;
14use std::time::Duration;
15
16use differential_dataflow::difference::Monoid;
17use differential_dataflow::lattice::Lattice;
18use mz_ore::now::EpochMillis;
19use mz_ore::{instrument, soft_assert_eq_or_log};
20use mz_persist_types::{Codec, Codec64};
21use proptest_derive::Arbitrary;
22use serde::ser::SerializeTupleStruct;
23use serde::{Deserialize, Serialize, Serializer};
24use timely::progress::{Antichain, Timestamp};
25use uuid::Uuid;
26
27use crate::error::InvalidUsage;
28use crate::internal::machine::Machine;
29use crate::internal::state::Since;
30use crate::stats::SnapshotStats;
31use crate::{GarbageCollector, ShardId, parse_id};
32
33/// An opaque identifier for a reader of a persist durable TVC (aka shard).
34#[derive(
35    Arbitrary,
36    Clone,
37    PartialEq,
38    Eq,
39    PartialOrd,
40    Ord,
41    Hash,
42    Serialize,
43    Deserialize
44)]
45#[serde(try_from = "String", into = "String")]
46pub struct CriticalReaderId(pub(crate) [u8; 16]);
47
48impl std::fmt::Display for CriticalReaderId {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        write!(f, "c{}", Uuid::from_bytes(self.0))
51    }
52}
53
54impl std::fmt::Debug for CriticalReaderId {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        write!(f, "CriticalReaderId({})", Uuid::from_bytes(self.0))
57    }
58}
59
60impl std::str::FromStr for CriticalReaderId {
61    type Err = String;
62
63    fn from_str(s: &str) -> Result<Self, Self::Err> {
64        parse_id("c", "CriticalReaderId", s).map(CriticalReaderId)
65    }
66}
67
68impl From<CriticalReaderId> for String {
69    fn from(reader_id: CriticalReaderId) -> Self {
70        reader_id.to_string()
71    }
72}
73
74impl TryFrom<String> for CriticalReaderId {
75    type Error = String;
76
77    fn try_from(s: String) -> Result<Self, Self::Error> {
78        s.parse()
79    }
80}
81
82impl CriticalReaderId {
83    /// Returns a random [CriticalReaderId] that is reasonably likely to have
84    /// never been generated before.
85    ///
86    /// This is intentionally public, unlike [crate::read::LeasedReaderId] and
87    /// [crate::write::WriterId], because [SinceHandle]s are expected to live
88    /// beyond process lifetimes.
89    pub fn new() -> Self {
90        CriticalReaderId(*Uuid::new_v4().as_bytes())
91    }
92}
93
94/// An opaque fencing token used in compare_and_downgrade_since.
95#[derive(Arbitrary, Debug, Clone, PartialEq)]
96pub struct Opaque(pub(crate) String, pub(crate) [u8; 8]);
97
98impl Serialize for Opaque {
99    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
100    where
101        S: Serializer,
102    {
103        let mut serializer = serializer.serialize_tuple_struct("Opaque", 2)?;
104        serializer.serialize_field(&self.0)?;
105        serializer.serialize_field(&u64::from_le_bytes(self.1))?;
106        serializer.end()
107    }
108}
109
110impl Opaque {
111    /// The name of the codec used to encode this token.
112    pub fn codec_name(&self) -> &str {
113        &self.0
114    }
115
116    /// Decode this token as the given type.
117    pub fn decode<T: Codec64>(&self) -> T {
118        soft_assert_eq_or_log!(T::codec_name(), self.0);
119        T::decode(self.1)
120    }
121
122    /// Encode the given 64-bit type as an opaque token. This records both the name of the type/encoding
123    /// and the 64-bit value itself.
124    pub fn encode<T: Codec64>(value: &T) -> Self {
125        Self(T::codec_name(), T::encode(value))
126    }
127}
128
129/// A "capability" granting the ability to hold back the `since` frontier of a
130/// shard.
131///
132/// In contrast to [crate::read::ReadHandle], which is time-leased, this handle
133/// and its associated capability are not leased.
134/// A SinceHandle does not release its capability when dropped.
135/// This is less ergonomic,
136/// but useful for "critical" since holds which must survive even lease timeouts.
137///
138/// **IMPORTANT**: The above means that if a SinceHandle is registered and then
139/// lost, the shard's since will be permanently "stuck", forever preventing
140/// logical compaction. Users are advised to durably record (preferably in code)
141/// the intended [CriticalReaderId] _before_ registering a SinceHandle (in case
142/// the process crashes at the wrong time).
143///
144/// All async methods on SinceHandle retry for as long as they are able, but the
145/// returned [std::future::Future]s implement "cancel on drop" semantics. This
146/// means that callers can add a timeout using [tokio::time::timeout] or
147/// [tokio::time::timeout_at].
148#[derive(Debug)]
149pub struct SinceHandle<K: Codec, V: Codec, T, D> {
150    pub(crate) machine: Machine<K, V, T, D>,
151    pub(crate) gc: GarbageCollector<K, V, T, D>,
152    pub(crate) reader_id: CriticalReaderId,
153
154    since: Antichain<T>,
155    opaque: Opaque,
156    last_downgrade_since: EpochMillis,
157}
158
159impl<K, V, T, D> SinceHandle<K, V, T, D>
160where
161    K: Debug + Codec,
162    V: Debug + Codec,
163    T: Timestamp + Lattice + Codec64 + Sync,
164    D: Monoid + Codec64 + Send + Sync,
165{
166    pub(crate) fn new(
167        machine: Machine<K, V, T, D>,
168        gc: GarbageCollector<K, V, T, D>,
169        reader_id: CriticalReaderId,
170        since: Antichain<T>,
171        opaque: Opaque,
172    ) -> Self {
173        SinceHandle {
174            machine,
175            gc,
176            reader_id,
177            since,
178            opaque,
179            last_downgrade_since: EpochMillis::default(),
180        }
181    }
182
183    /// This handle's shard id.
184    pub fn shard_id(&self) -> ShardId {
185        self.machine.shard_id()
186    }
187
188    /// This handle's `since` capability.
189    ///
190    /// This will always be greater or equal to the shard-global `since`.
191    pub fn since(&self) -> &Antichain<T> {
192        &self.since
193    }
194
195    /// This handle's `opaque`.
196    pub fn opaque(&self) -> &Opaque {
197        &self.opaque
198    }
199
200    /// Attempts to forward the since capability of this handle to `new_since` iff
201    /// the opaque value of this handle's [CriticalReaderId] is `expected`, and
202    /// [Self::maybe_compare_and_downgrade_since] chooses to perform the downgrade.
203    ///
204    /// Users are expected to call this function frequently, but should not expect
205    /// `since` to be downgraded with each call -- this function is free to no-op
206    /// requests to perform rate-limiting for downstream services. A `None` is returned
207    /// for no-op requests, and `Some` is returned when downgrading since.
208    ///
209    /// When returning `Some(since)`, `since` will be set to the most recent value
210    /// known for this critical reader ID, and is guaranteed to be `!less_than(new_since)`.
211    ///
212    /// Because SinceHandles are expected to live beyond process lifetimes, it's
213    /// possible for the same [CriticalReaderId] to be used concurrently from
214    /// multiple processes (either intentionally or something like a zombie
215    /// process). To discover this, [Self::maybe_compare_and_downgrade_since] has
216    /// "compare and set" semantics over an opaque value. If the `expected` opaque
217    /// value does not match state, an `Err` is returned and the caller must decide
218    /// how to handle it (likely a retry or a `halt!`).
219    ///
220    /// If desired, users may use the opaque value to fence out concurrent access
221    /// of other [SinceHandle]s for a given [CriticalReaderId]. e.g.:
222    ///
223    /// ```rust,no_run
224    /// use timely::progress::Antichain;
225    /// use mz_persist_client::critical::{SinceHandle, Opaque};
226    /// use mz_persist_types::Codec64;
227    ///
228    /// # async fn example() {
229    /// let fencing_token: Opaque = unimplemented!();
230    /// let mut since: SinceHandle<String, String, u64, i64> = unimplemented!();
231    ///
232    /// let new_since: Antichain<u64> = unimplemented!();
233    /// let res = since
234    ///     .maybe_compare_and_downgrade_since(
235    ///         &since.opaque().clone(),
236    ///         (&fencing_token, &new_since),
237    ///     )
238    ///     .await;
239    ///
240    /// match res {
241    ///     Some(Ok(_)) => {
242    ///         // we downgraded since!
243    ///     }
244    ///     Some(Err(actual_fencing_token)) => {
245    ///         // compare `fencing_token` and `actual_fencing_token`, etc
246    ///     }
247    ///     None => {
248    ///         // no problem, we'll try again later
249    ///     }
250    /// }
251    /// # }
252    /// ```
253    ///
254    /// If fencing is not required and it's acceptable to have concurrent [SinceHandle] for
255    /// a given [CriticalReaderId], the opaque value can be given a default value and ignored:
256    ///
257    /// ```rust,no_run
258    /// use timely::progress::Antichain;
259    /// use mz_persist_client::critical::SinceHandle;
260    /// use mz_persist_types::Codec64;
261    ///
262    /// # async fn example() {
263    /// let mut since: SinceHandle<String, String, u64, i64> = unimplemented!();
264    /// let new_since: Antichain<u64> = unimplemented!();
265    /// let res = since
266    ///     .maybe_compare_and_downgrade_since(
267    ///         &since.opaque().clone(),
268    ///         (&since.opaque().clone(), &new_since),
269    ///     )
270    ///     .await;
271    ///
272    /// match res {
273    ///     Some(Ok(_)) => {
274    ///         // woohoo!
275    ///     }
276    ///     Some(Err(_actual_opaque)) => {
277    ///         panic!("the opaque value should never change from the default");
278    ///     }
279    ///     None => {
280    ///         // no problem, we'll try again later
281    ///     }
282    /// };
283    /// # }
284    /// ```
285    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
286    pub async fn maybe_compare_and_downgrade_since(
287        &mut self,
288        expected: &Opaque,
289        new: (&Opaque, &Antichain<T>),
290    ) -> Option<Result<Antichain<T>, Opaque>> {
291        let elapsed_since_last_downgrade = Duration::from_millis(
292            (self.machine.applier.cfg.now)().saturating_sub(self.last_downgrade_since),
293        );
294        if elapsed_since_last_downgrade >= self.machine.applier.cfg.critical_downgrade_interval {
295            Some(self.compare_and_downgrade_since(expected, new).await)
296        } else {
297            None
298        }
299    }
300
301    /// Forwards the since capability of this handle to `new_since` iff the opaque value of this
302    /// handle's [CriticalReaderId] is `expected`, and `new_since` is beyond the
303    /// current `since`.
304    ///
305    /// Users are expected to call this function only when a guaranteed downgrade is necessary. All
306    /// other downgrades should preferably go through [Self::maybe_compare_and_downgrade_since]
307    /// which will automatically rate limit the operations.
308    ///
309    /// When returning `Ok(since)`, `since` will be set to the most recent value known for this
310    /// critical reader ID, and is guaranteed to be `!less_than(new_since)`.
311    ///
312    /// Because SinceHandles are expected to live beyond process lifetimes, it's possible for the
313    /// same [CriticalReaderId] to be used concurrently from multiple processes (either
314    /// intentionally or something like a zombie process). To discover this,
315    /// [Self::compare_and_downgrade_since] has "compare and set" semantics over an opaque value.
316    /// If the `expected` opaque value does not match state, an `Err` is returned and the caller
317    /// must decide how to handle it (likely a retry or a `halt!`).
318    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
319    pub async fn compare_and_downgrade_since(
320        &mut self,
321        expected: &Opaque,
322        new: (&Opaque, &Antichain<T>),
323    ) -> Result<Antichain<T>, Opaque> {
324        let (res, maintenance) = self
325            .machine
326            .compare_and_downgrade_since(&self.reader_id, expected, new)
327            .await;
328        self.last_downgrade_since = (self.machine.applier.cfg.now)();
329        maintenance.start_performing(&self.machine, &self.gc);
330        match res {
331            Ok(Since(since)) => {
332                self.since.clone_from(&since);
333                self.opaque.clone_from(new.0);
334                Ok(since)
335            }
336            Err((actual_opaque, since)) => {
337                self.since = since.0;
338                self.opaque.clone_from(&actual_opaque);
339                Err(actual_opaque)
340            }
341        }
342    }
343
344    /// Returns aggregate statistics about the contents of the shard TVC at the
345    /// given frontier.
346    ///
347    /// This command returns the contents of this shard as of `as_of` once they
348    /// are known. This may "block" (in an async-friendly way) if `as_of` is
349    /// greater or equal to the current `upper` of the shard. If `None` is given
350    /// for `as_of`, then the latest stats known by this process are used.
351    ///
352    /// The `Since` error indicates that the requested `as_of` cannot be served
353    /// (the caller has out of date information) and includes the smallest
354    /// `as_of` that would have been accepted.
355    pub fn snapshot_stats(
356        &self,
357        as_of: Option<Antichain<T>>,
358    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
359        let machine = self.machine.clone();
360        async move {
361            let batches = match as_of {
362                Some(as_of) => machine.snapshot(&as_of).await?,
363                None => machine.applier.all_batches(),
364            };
365            let num_updates = batches.iter().map(|b| b.len).sum();
366            Ok(SnapshotStats {
367                shard_id: machine.shard_id(),
368                num_updates,
369            })
370        }
371    }
372
373    /// Upgrade the version associated with this shard, applying any state migrations. This
374    /// is irrevocable and will fence out old versions: it should only be run only once Materialize
375    /// has fully committed to the new version.
376    pub async fn upgrade_version(&self) -> Result<(), InvalidUsage<T>> {
377        match self.machine.upgrade_version().await {
378            Ok(maintenance) => {
379                let () = maintenance.perform(&self.machine, &self.gc).await;
380                Ok(())
381            }
382            Err(version) => Err(InvalidUsage::IncompatibleVersion { version }),
383        }
384    }
385
386    // Expiry temporarily removed.
387    // If you'd like to stop this handle from holding back the since of the shard,
388    // downgrade it to [].
389    // TODO(bkirwi): revert this when since behaviour on expiry has settled,
390    // or all readers are associated with a critical handle.
391}
392
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_dyncfg::ConfigUpdates;
    use serde::{Deserialize, Serialize};
    use serde_json::json;

    use crate::tests::new_test_client;
    use crate::{Diagnostics, PersistClient, ShardId};

    use super::*;

    // Checks that a `CriticalReaderId` serializes to, and deserializes from,
    // its human-readable string form.
    #[mz_ore::test]
    fn reader_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            reader_id: CriticalReaderId,
        }

        let id =
            CriticalReaderId::from_str("c00000000-1234-5678-0000-000000000000").expect("valid id");

        // roundtrip through json
        let as_value = serde_json::to_value(id.clone()).expect("serializable");
        let from_value: CriticalReaderId =
            serde_json::from_value(as_value).expect("deserializable");
        assert_eq!(id, from_value);

        // deserialize a serialized string directly
        let from_text: CriticalReaderId =
            serde_json::from_str("\"c00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable");
        assert_eq!(id, from_text);

        // roundtrip id through a container type
        let json = json!({ "reader_id": id });
        let rendered = json.to_string();
        assert_eq!(
            "{\"reader_id\":\"c00000000-1234-5678-0000-000000000000\"}",
            &rendered
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.reader_id, id);
    }

    // Checks that the rate-limited downgrade is a no-op right after an
    // unconditional downgrade.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn rate_limit(dyncfgs: ConfigUpdates) {
        let initial_token = Opaque::encode(&i64::MIN);
        let next_token = Opaque::encode(&5i64);
        let frontier = Antichain::from_elem(0u64);

        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();

        let mut since = client
            .open_critical_since::<(), (), u64, i64>(
                shard_id,
                CriticalReaderId::new(),
                initial_token.clone(),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        assert_eq!(since.opaque(), &initial_token);

        since
            .compare_and_downgrade_since(&initial_token, (&next_token, &frontier))
            .await
            .unwrap();

        // should not fire, since we just had a successful `compare_and_downgrade_since` call
        let noop = since
            .maybe_compare_and_downgrade_since(&next_token, (&next_token, &frontier))
            .await;

        assert_eq!(noop, None);
    }

    // Verifies that the handle updates its view of the opaque token correctly
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn handle_opaque_token(dyncfgs: ConfigUpdates) {
        let default_token = Opaque::encode(&i64::MIN);
        let updated_token = Opaque::encode(&5i64);
        let frontier = Antichain::from_elem(0u64);

        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();

        let mut since = client
            .open_critical_since::<(), (), u64, i64>(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                default_token.clone(),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        // The token must be initialized to the default value
        assert_eq!(since.opaque(), &default_token);

        since
            .compare_and_downgrade_since(&default_token, (&updated_token, &frontier))
            .await
            .unwrap();

        // Our view of the token must be updated now
        assert_eq!(since.opaque(), &updated_token);

        // Re-open the same reader with the default token.
        let since2 = client
            .open_critical_since::<(), (), u64, i64>(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                default_token,
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        // The token should still be 5
        assert_eq!(since2.opaque(), &updated_token);
    }
}