mz_persist_client/critical.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Since capabilities and handles
11
12use std::fmt::Debug;
13use std::future::Future;
14use std::time::Duration;
15
16use differential_dataflow::difference::Monoid;
17use differential_dataflow::lattice::Lattice;
18use mz_ore::now::EpochMillis;
19use mz_ore::{instrument, soft_assert_eq_or_log};
20use mz_persist_types::{Codec, Codec64};
21use proptest_derive::Arbitrary;
22use serde::ser::SerializeTupleStruct;
23use serde::{Deserialize, Serialize, Serializer};
24use timely::progress::{Antichain, Timestamp};
25use uuid::Uuid;
26
27use crate::error::InvalidUsage;
28use crate::internal::machine::Machine;
29use crate::internal::state::Since;
30use crate::stats::SnapshotStats;
31use crate::{GarbageCollector, ShardId, parse_id};
32
33/// An opaque identifier for a reader of a persist durable TVC (aka shard).
34#[derive(
35 Arbitrary,
36 Clone,
37 PartialEq,
38 Eq,
39 PartialOrd,
40 Ord,
41 Hash,
42 Serialize,
43 Deserialize
44)]
45#[serde(try_from = "String", into = "String")]
46pub struct CriticalReaderId(pub(crate) [u8; 16]);
47
48impl std::fmt::Display for CriticalReaderId {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 write!(f, "c{}", Uuid::from_bytes(self.0))
51 }
52}
53
54impl std::fmt::Debug for CriticalReaderId {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 write!(f, "CriticalReaderId({})", Uuid::from_bytes(self.0))
57 }
58}
59
60impl std::str::FromStr for CriticalReaderId {
61 type Err = String;
62
63 fn from_str(s: &str) -> Result<Self, Self::Err> {
64 parse_id("c", "CriticalReaderId", s).map(CriticalReaderId)
65 }
66}
67
68impl From<CriticalReaderId> for String {
69 fn from(reader_id: CriticalReaderId) -> Self {
70 reader_id.to_string()
71 }
72}
73
74impl TryFrom<String> for CriticalReaderId {
75 type Error = String;
76
77 fn try_from(s: String) -> Result<Self, Self::Error> {
78 s.parse()
79 }
80}
81
82impl CriticalReaderId {
83 /// Returns a random [CriticalReaderId] that is reasonably likely to have
84 /// never been generated before.
85 ///
86 /// This is intentionally public, unlike [crate::read::LeasedReaderId] and
87 /// [crate::write::WriterId], because [SinceHandle]s are expected to live
88 /// beyond process lifetimes.
89 pub fn new() -> Self {
90 CriticalReaderId(*Uuid::new_v4().as_bytes())
91 }
92}
93
94/// An opaque fencing token used in compare_and_downgrade_since.
95#[derive(Arbitrary, Debug, Clone, PartialEq)]
96pub struct Opaque(pub(crate) String, pub(crate) [u8; 8]);
97
98impl Serialize for Opaque {
99 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
100 where
101 S: Serializer,
102 {
103 let mut serializer = serializer.serialize_tuple_struct("Opaque", 2)?;
104 serializer.serialize_field(&self.0)?;
105 serializer.serialize_field(&u64::from_le_bytes(self.1))?;
106 serializer.end()
107 }
108}
109
110impl Opaque {
111 /// The name of the codec used to encode this token.
112 pub fn codec_name(&self) -> &str {
113 &self.0
114 }
115
116 /// Decode this token as the given type.
117 pub fn decode<T: Codec64>(&self) -> T {
118 soft_assert_eq_or_log!(T::codec_name(), self.0);
119 T::decode(self.1)
120 }
121
122 /// Encode the given 64-bit type as an opaque token. This records both the name of the type/encoding
123 /// and the 64-bit value itself.
124 pub fn encode<T: Codec64>(value: &T) -> Self {
125 Self(T::codec_name(), T::encode(value))
126 }
127}
128
129/// A "capability" granting the ability to hold back the `since` frontier of a
130/// shard.
131///
132/// In contrast to [crate::read::ReadHandle], which is time-leased, this handle
133/// and its associated capability are not leased.
134/// A SinceHandle does not release its capability when dropped.
135/// This is less ergonomic,
136/// but useful for "critical" since holds which must survive even lease timeouts.
137///
138/// **IMPORTANT**: The above means that if a SinceHandle is registered and then
139/// lost, the shard's since will be permanently "stuck", forever preventing
140/// logical compaction. Users are advised to durably record (preferably in code)
141/// the intended [CriticalReaderId] _before_ registering a SinceHandle (in case
142/// the process crashes at the wrong time).
143///
144/// All async methods on SinceHandle retry for as long as they are able, but the
145/// returned [std::future::Future]s implement "cancel on drop" semantics. This
146/// means that callers can add a timeout using [tokio::time::timeout] or
147/// [tokio::time::timeout_at].
148#[derive(Debug)]
149pub struct SinceHandle<K: Codec, V: Codec, T, D> {
150 pub(crate) machine: Machine<K, V, T, D>,
151 pub(crate) gc: GarbageCollector<K, V, T, D>,
152 pub(crate) reader_id: CriticalReaderId,
153
154 since: Antichain<T>,
155 opaque: Opaque,
156 last_downgrade_since: EpochMillis,
157}
158
159impl<K, V, T, D> SinceHandle<K, V, T, D>
160where
161 K: Debug + Codec,
162 V: Debug + Codec,
163 T: Timestamp + Lattice + Codec64 + Sync,
164 D: Monoid + Codec64 + Send + Sync,
165{
166 pub(crate) fn new(
167 machine: Machine<K, V, T, D>,
168 gc: GarbageCollector<K, V, T, D>,
169 reader_id: CriticalReaderId,
170 since: Antichain<T>,
171 opaque: Opaque,
172 ) -> Self {
173 SinceHandle {
174 machine,
175 gc,
176 reader_id,
177 since,
178 opaque,
179 last_downgrade_since: EpochMillis::default(),
180 }
181 }
182
183 /// This handle's shard id.
184 pub fn shard_id(&self) -> ShardId {
185 self.machine.shard_id()
186 }
187
188 /// This handle's `since` capability.
189 ///
190 /// This will always be greater or equal to the shard-global `since`.
191 pub fn since(&self) -> &Antichain<T> {
192 &self.since
193 }
194
195 /// This handle's `opaque`.
196 pub fn opaque(&self) -> &Opaque {
197 &self.opaque
198 }
199
200 /// Attempts to forward the since capability of this handle to `new_since` iff
201 /// the opaque value of this handle's [CriticalReaderId] is `expected`, and
202 /// [Self::maybe_compare_and_downgrade_since] chooses to perform the downgrade.
203 ///
204 /// Users are expected to call this function frequently, but should not expect
205 /// `since` to be downgraded with each call -- this function is free to no-op
206 /// requests to perform rate-limiting for downstream services. A `None` is returned
207 /// for no-op requests, and `Some` is returned when downgrading since.
208 ///
209 /// When returning `Some(since)`, `since` will be set to the most recent value
210 /// known for this critical reader ID, and is guaranteed to be `!less_than(new_since)`.
211 ///
212 /// Because SinceHandles are expected to live beyond process lifetimes, it's
213 /// possible for the same [CriticalReaderId] to be used concurrently from
214 /// multiple processes (either intentionally or something like a zombie
215 /// process). To discover this, [Self::maybe_compare_and_downgrade_since] has
216 /// "compare and set" semantics over an opaque value. If the `expected` opaque
217 /// value does not match state, an `Err` is returned and the caller must decide
218 /// how to handle it (likely a retry or a `halt!`).
219 ///
220 /// If desired, users may use the opaque value to fence out concurrent access
221 /// of other [SinceHandle]s for a given [CriticalReaderId]. e.g.:
222 ///
223 /// ```rust,no_run
224 /// use timely::progress::Antichain;
225 /// use mz_persist_client::critical::{SinceHandle, Opaque};
226 /// use mz_persist_types::Codec64;
227 ///
228 /// # async fn example() {
229 /// let fencing_token: Opaque = unimplemented!();
230 /// let mut since: SinceHandle<String, String, u64, i64> = unimplemented!();
231 ///
232 /// let new_since: Antichain<u64> = unimplemented!();
233 /// let res = since
234 /// .maybe_compare_and_downgrade_since(
235 /// &since.opaque().clone(),
236 /// (&fencing_token, &new_since),
237 /// )
238 /// .await;
239 ///
240 /// match res {
241 /// Some(Ok(_)) => {
242 /// // we downgraded since!
243 /// }
244 /// Some(Err(actual_fencing_token)) => {
245 /// // compare `fencing_token` and `actual_fencing_token`, etc
246 /// }
247 /// None => {
248 /// // no problem, we'll try again later
249 /// }
250 /// }
251 /// # }
252 /// ```
253 ///
254 /// If fencing is not required and it's acceptable to have concurrent [SinceHandle] for
255 /// a given [CriticalReaderId], the opaque value can be given a default value and ignored:
256 ///
257 /// ```rust,no_run
258 /// use timely::progress::Antichain;
259 /// use mz_persist_client::critical::SinceHandle;
260 /// use mz_persist_types::Codec64;
261 ///
262 /// # async fn example() {
263 /// let mut since: SinceHandle<String, String, u64, i64> = unimplemented!();
264 /// let new_since: Antichain<u64> = unimplemented!();
265 /// let res = since
266 /// .maybe_compare_and_downgrade_since(
267 /// &since.opaque().clone(),
268 /// (&since.opaque().clone(), &new_since),
269 /// )
270 /// .await;
271 ///
272 /// match res {
273 /// Some(Ok(_)) => {
274 /// // woohoo!
275 /// }
276 /// Some(Err(_actual_opaque)) => {
277 /// panic!("the opaque value should never change from the default");
278 /// }
279 /// None => {
280 /// // no problem, we'll try again later
281 /// }
282 /// };
283 /// # }
284 /// ```
285 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
286 pub async fn maybe_compare_and_downgrade_since(
287 &mut self,
288 expected: &Opaque,
289 new: (&Opaque, &Antichain<T>),
290 ) -> Option<Result<Antichain<T>, Opaque>> {
291 let elapsed_since_last_downgrade = Duration::from_millis(
292 (self.machine.applier.cfg.now)().saturating_sub(self.last_downgrade_since),
293 );
294 if elapsed_since_last_downgrade >= self.machine.applier.cfg.critical_downgrade_interval {
295 Some(self.compare_and_downgrade_since(expected, new).await)
296 } else {
297 None
298 }
299 }
300
301 /// Forwards the since capability of this handle to `new_since` iff the opaque value of this
302 /// handle's [CriticalReaderId] is `expected`, and `new_since` is beyond the
303 /// current `since`.
304 ///
305 /// Users are expected to call this function only when a guaranteed downgrade is necessary. All
306 /// other downgrades should preferably go through [Self::maybe_compare_and_downgrade_since]
307 /// which will automatically rate limit the operations.
308 ///
309 /// When returning `Ok(since)`, `since` will be set to the most recent value known for this
310 /// critical reader ID, and is guaranteed to be `!less_than(new_since)`.
311 ///
312 /// Because SinceHandles are expected to live beyond process lifetimes, it's possible for the
313 /// same [CriticalReaderId] to be used concurrently from multiple processes (either
314 /// intentionally or something like a zombie process). To discover this,
315 /// [Self::compare_and_downgrade_since] has "compare and set" semantics over an opaque value.
316 /// If the `expected` opaque value does not match state, an `Err` is returned and the caller
317 /// must decide how to handle it (likely a retry or a `halt!`).
318 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
319 pub async fn compare_and_downgrade_since(
320 &mut self,
321 expected: &Opaque,
322 new: (&Opaque, &Antichain<T>),
323 ) -> Result<Antichain<T>, Opaque> {
324 let (res, maintenance) = self
325 .machine
326 .compare_and_downgrade_since(&self.reader_id, expected, new)
327 .await;
328 self.last_downgrade_since = (self.machine.applier.cfg.now)();
329 maintenance.start_performing(&self.machine, &self.gc);
330 match res {
331 Ok(Since(since)) => {
332 self.since.clone_from(&since);
333 self.opaque.clone_from(new.0);
334 Ok(since)
335 }
336 Err((actual_opaque, since)) => {
337 self.since = since.0;
338 self.opaque.clone_from(&actual_opaque);
339 Err(actual_opaque)
340 }
341 }
342 }
343
344 /// Returns aggregate statistics about the contents of the shard TVC at the
345 /// given frontier.
346 ///
347 /// This command returns the contents of this shard as of `as_of` once they
348 /// are known. This may "block" (in an async-friendly way) if `as_of` is
349 /// greater or equal to the current `upper` of the shard. If `None` is given
350 /// for `as_of`, then the latest stats known by this process are used.
351 ///
352 /// The `Since` error indicates that the requested `as_of` cannot be served
353 /// (the caller has out of date information) and includes the smallest
354 /// `as_of` that would have been accepted.
355 pub fn snapshot_stats(
356 &self,
357 as_of: Option<Antichain<T>>,
358 ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
359 let machine = self.machine.clone();
360 async move {
361 let batches = match as_of {
362 Some(as_of) => machine.snapshot(&as_of).await?,
363 None => machine.applier.all_batches(),
364 };
365 let num_updates = batches.iter().map(|b| b.len).sum();
366 Ok(SnapshotStats {
367 shard_id: machine.shard_id(),
368 num_updates,
369 })
370 }
371 }
372
373 /// Upgrade the version associated with this shard, applying any state migrations. This
374 /// is irrevocable and will fence out old versions: it should only be run only once Materialize
375 /// has fully committed to the new version.
376 pub async fn upgrade_version(&self) -> Result<(), InvalidUsage<T>> {
377 match self.machine.upgrade_version().await {
378 Ok(maintenance) => {
379 let () = maintenance.perform(&self.machine, &self.gc).await;
380 Ok(())
381 }
382 Err(version) => Err(InvalidUsage::IncompatibleVersion { version }),
383 }
384 }
385
386 // Expiry temporarily removed.
387 // If you'd like to stop this handle from holding back the since of the shard,
388 // downgrade it to [].
389 // TODO(bkirwi): revert this when since behaviour on expiry has settled,
390 // or all readers are associated with a critical handle.
391}
392
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_dyncfg::ConfigUpdates;
    use serde::{Deserialize, Serialize};
    use serde_json::json;

    use crate::tests::new_test_client;
    use crate::{Diagnostics, PersistClient, ShardId};

    use super::*;

    /// A `CriticalReaderId` round-trips through serde in its human-readable
    /// string form, both standalone and as a struct field.
    #[mz_ore::test]
    fn reader_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            reader_id: CriticalReaderId,
        }

        let id =
            CriticalReaderId::from_str("c00000000-1234-5678-0000-000000000000").expect("valid id");

        // Roundtrip through a `serde_json::Value`.
        let as_value = serde_json::to_value(id.clone()).expect("serializable");
        let from_value: CriticalReaderId =
            serde_json::from_value(as_value).expect("deserializable");
        assert_eq!(id, from_value);

        // Deserialize straight from a serialized JSON string.
        let from_string: CriticalReaderId =
            serde_json::from_str("\"c00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable");
        assert_eq!(id, from_string);

        // Roundtrip through a containing struct.
        let json = json!({ "reader_id": id });
        assert_eq!(
            json.to_string(),
            "{\"reader_id\":\"c00000000-1234-5678-0000-000000000000\"}"
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.reader_id, id);
    }

    /// Immediately after a `compare_and_downgrade_since`, the rate-limited
    /// `maybe_compare_and_downgrade_since` declines to act.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn rate_limit(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();
        let initial_token = Opaque::encode(&i64::MIN);
        let next_token = Opaque::encode(&5i64);

        let mut handle = client
            .open_critical_since::<(), (), u64, i64>(
                shard_id,
                CriticalReaderId::new(),
                initial_token.clone(),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");
        assert_eq!(handle.opaque(), &initial_token);

        handle
            .compare_and_downgrade_since(&initial_token, (&next_token, &Antichain::from_elem(0)))
            .await
            .unwrap();

        // The unconditional call above reset the rate-limit clock, so this must no-op.
        let outcome = handle
            .maybe_compare_and_downgrade_since(&next_token, (&next_token, &Antichain::from_elem(0)))
            .await;
        assert_eq!(outcome, None);
    }

    /// The handle tracks the opaque token across a successful downgrade, and a
    /// freshly opened handle for the same reader observes the stored token
    /// rather than the initial value it was opened with.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn handle_opaque_token(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();
        let initial_token = Opaque::encode(&i64::MIN);
        let next_token = Opaque::encode(&5i64);

        let mut first = client
            .open_critical_since::<(), (), u64, i64>(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                initial_token.clone(),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");
        // A brand-new reader starts out with the token it was opened with.
        assert_eq!(first.opaque(), &initial_token);

        first
            .compare_and_downgrade_since(&initial_token, (&next_token, &Antichain::from_elem(0)))
            .await
            .unwrap();
        // The handle's cached token follows the successful swap.
        assert_eq!(first.opaque(), &next_token);

        let second = client
            .open_critical_since::<(), (), u64, i64>(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                initial_token,
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");
        // Re-opening the same reader reports the stored token, not the one passed in.
        assert_eq!(second.opaque(), &next_token);
    }
}