mz_persist_client/critical.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Since capabilities and handles
11
12use std::fmt::Debug;
13use std::future::Future;
14use std::time::Duration;
15
16use differential_dataflow::difference::Semigroup;
17use differential_dataflow::lattice::Lattice;
18use mz_ore::instrument;
19use mz_ore::now::EpochMillis;
20use mz_persist_types::{Codec, Codec64, Opaque};
21use proptest_derive::Arbitrary;
22use serde::{Deserialize, Serialize};
23use timely::progress::{Antichain, Timestamp};
24use uuid::Uuid;
25
26use crate::internal::machine::Machine;
27use crate::internal::state::Since;
28use crate::stats::SnapshotStats;
29use crate::{GarbageCollector, ShardId, parse_id};
30
31/// An opaque identifier for a reader of a persist durable TVC (aka shard).
32#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
33#[serde(try_from = "String", into = "String")]
34pub struct CriticalReaderId(pub(crate) [u8; 16]);
35
36impl std::fmt::Display for CriticalReaderId {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 write!(f, "c{}", Uuid::from_bytes(self.0))
39 }
40}
41
42impl std::fmt::Debug for CriticalReaderId {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 write!(f, "CriticalReaderId({})", Uuid::from_bytes(self.0))
45 }
46}
47
48impl std::str::FromStr for CriticalReaderId {
49 type Err = String;
50
51 fn from_str(s: &str) -> Result<Self, Self::Err> {
52 parse_id('c', "CriticalReaderId", s).map(CriticalReaderId)
53 }
54}
55
56impl From<CriticalReaderId> for String {
57 fn from(reader_id: CriticalReaderId) -> Self {
58 reader_id.to_string()
59 }
60}
61
62impl TryFrom<String> for CriticalReaderId {
63 type Error = String;
64
65 fn try_from(s: String) -> Result<Self, Self::Error> {
66 s.parse()
67 }
68}
69
70impl CriticalReaderId {
71 /// Returns a random [CriticalReaderId] that is reasonably likely to have
72 /// never been generated before.
73 ///
74 /// This is intentionally public, unlike [crate::read::LeasedReaderId] and
75 /// [crate::write::WriterId], because [SinceHandle]s are expected to live
76 /// beyond process lifetimes.
77 pub fn new() -> Self {
78 CriticalReaderId(*Uuid::new_v4().as_bytes())
79 }
80}
81
82/// A "capability" granting the ability to hold back the `since` frontier of a
83/// shard.
84///
85/// In contrast to [crate::read::ReadHandle], which is time-leased, this handle
86/// and its associated capability are not leased.
87/// A SinceHandle does not release its capability when dropped.
88/// This is less ergonomic,
89/// but useful for "critical" since holds which must survive even lease timeouts.
90///
91/// **IMPORTANT**: The above means that if a SinceHandle is registered and then
92/// lost, the shard's since will be permanently "stuck", forever preventing
93/// logical compaction. Users are advised to durably record (preferably in code)
94/// the intended [CriticalReaderId] _before_ registering a SinceHandle (in case
95/// the process crashes at the wrong time).
96///
97/// All async methods on SinceHandle retry for as long as they are able, but the
98/// returned [std::future::Future]s implement "cancel on drop" semantics. This
99/// means that callers can add a timeout using [tokio::time::timeout] or
100/// [tokio::time::timeout_at].
101#[derive(Debug)]
102pub struct SinceHandle<K: Codec, V: Codec, T, D, O> {
103 pub(crate) machine: Machine<K, V, T, D>,
104 pub(crate) gc: GarbageCollector<K, V, T, D>,
105 pub(crate) reader_id: CriticalReaderId,
106
107 since: Antichain<T>,
108 opaque: O,
109 last_downgrade_since: EpochMillis,
110}
111
112impl<K, V, T, D, O> SinceHandle<K, V, T, D, O>
113where
114 K: Debug + Codec,
115 V: Debug + Codec,
116 T: Timestamp + Lattice + Codec64 + Sync,
117 D: Semigroup + Codec64 + Send + Sync,
118 O: Opaque + Codec64,
119{
120 pub(crate) fn new(
121 machine: Machine<K, V, T, D>,
122 gc: GarbageCollector<K, V, T, D>,
123 reader_id: CriticalReaderId,
124 since: Antichain<T>,
125 opaque: O,
126 ) -> Self {
127 SinceHandle {
128 machine,
129 gc,
130 reader_id,
131 since,
132 opaque,
133 last_downgrade_since: EpochMillis::default(),
134 }
135 }
136
137 /// This handle's shard id.
138 pub fn shard_id(&self) -> ShardId {
139 self.machine.shard_id()
140 }
141
142 /// This handle's `since` capability.
143 ///
144 /// This will always be greater or equal to the shard-global `since`.
145 pub fn since(&self) -> &Antichain<T> {
146 &self.since
147 }
148
149 /// This handle's `opaque`.
150 pub fn opaque(&self) -> &O {
151 &self.opaque
152 }
153
154 /// Attempts to forward the since capability of this handle to `new_since` iff
155 /// the opaque value of this handle's [CriticalReaderId] is `expected`, and
156 /// [Self::maybe_compare_and_downgrade_since] chooses to perform the downgrade.
157 ///
158 /// Users are expected to call this function frequently, but should not expect
159 /// `since` to be downgraded with each call -- this function is free to no-op
160 /// requests to perform rate-limiting for downstream services. A `None` is returned
161 /// for no-op requests, and `Some` is returned when downgrading since.
162 ///
163 /// When returning `Some(since)`, `since` will be set to the most recent value
164 /// known for this critical reader ID, and is guaranteed to be `!less_than(new_since)`.
165 ///
166 /// Because SinceHandles are expected to live beyond process lifetimes, it's
167 /// possible for the same [CriticalReaderId] to be used concurrently from
168 /// multiple processes (either intentionally or something like a zombie
169 /// process). To discover this, [Self::maybe_compare_and_downgrade_since] has
170 /// "compare and set" semantics over an opaque value. If the `expected` opaque
171 /// value does not match state, an `Err` is returned and the caller must decide
172 /// how to handle it (likely a retry or a `halt!`).
173 ///
174 /// If desired, users may use the opaque value to fence out concurrent access
175 /// of other [SinceHandle]s for a given [CriticalReaderId]. e.g.:
176 ///
177 /// ```rust,no_run
178 /// use timely::progress::Antichain;
179 /// use mz_persist_client::critical::SinceHandle;
180 /// use mz_persist_types::Codec64;
181 ///
182 /// # async fn example() {
183 /// let fencing_token: u64 = unimplemented!();
184 /// let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();
185 ///
186 /// let new_since: Antichain<u64> = unimplemented!();
187 /// let res = since
188 /// .maybe_compare_and_downgrade_since(
189 /// &since.opaque().clone(),
190 /// (&fencing_token, &new_since),
191 /// )
192 /// .await;
193 ///
194 /// match res {
195 /// Some(Ok(_)) => {
196 /// // we downgraded since!
197 /// }
198 /// Some(Err(actual_fencing_token)) => {
199 /// // compare `fencing_token` and `actual_fencing_token`, etc
200 /// }
201 /// None => {
202 /// // no problem, we'll try again later
203 /// }
204 /// }
205 /// # }
206 /// ```
207 ///
208 /// If fencing is not required and it's acceptable to have concurrent [SinceHandle] for
209 /// a given [CriticalReaderId], the opaque value can be given a default value and ignored:
210 ///
211 /// ```rust,no_run
212 /// use timely::progress::Antichain;
213 /// use mz_persist_client::critical::SinceHandle;
214 /// use mz_persist_types::Codec64;
215 ///
216 /// # async fn example() {
217 /// let mut since: SinceHandle<String, String, u64, i64, u64> = unimplemented!();
218 /// let new_since: Antichain<u64> = unimplemented!();
219 /// let res = since
220 /// .maybe_compare_and_downgrade_since(
221 /// &since.opaque().clone(),
222 /// (&since.opaque().clone(), &new_since),
223 /// )
224 /// .await;
225 ///
226 /// match res {
227 /// Some(Ok(_)) => {
228 /// // woohoo!
229 /// }
230 /// Some(Err(_actual_opaque)) => {
231 /// panic!("the opaque value should never change from the default");
232 /// }
233 /// None => {
234 /// // no problem, we'll try again later
235 /// }
236 /// };
237 /// # }
238 /// ```
239 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
240 pub async fn maybe_compare_and_downgrade_since(
241 &mut self,
242 expected: &O,
243 new: (&O, &Antichain<T>),
244 ) -> Option<Result<Antichain<T>, O>> {
245 let elapsed_since_last_downgrade = Duration::from_millis(
246 (self.machine.applier.cfg.now)().saturating_sub(self.last_downgrade_since),
247 );
248 if elapsed_since_last_downgrade >= self.machine.applier.cfg.critical_downgrade_interval {
249 Some(self.compare_and_downgrade_since(expected, new).await)
250 } else {
251 None
252 }
253 }
254
255 /// Forwards the since capability of this handle to `new_since` iff the opaque value of this
256 /// handle's [CriticalReaderId] is `expected`, and `new_since` is beyond the
257 /// current `since`.
258 ///
259 /// Users are expected to call this function only when a guaranteed downgrade is necessary. All
260 /// other downgrades should preferably go through [Self::maybe_compare_and_downgrade_since]
261 /// which will automatically rate limit the operations.
262 ///
263 /// When returning `Ok(since)`, `since` will be set to the most recent value known for this
264 /// critical reader ID, and is guaranteed to be `!less_than(new_since)`.
265 ///
266 /// Because SinceHandles are expected to live beyond process lifetimes, it's possible for the
267 /// same [CriticalReaderId] to be used concurrently from multiple processes (either
268 /// intentionally or something like a zombie process). To discover this,
269 /// [Self::compare_and_downgrade_since] has "compare and set" semantics over an opaque value.
270 /// If the `expected` opaque value does not match state, an `Err` is returned and the caller
271 /// must decide how to handle it (likely a retry or a `halt!`).
272 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
273 pub async fn compare_and_downgrade_since(
274 &mut self,
275 expected: &O,
276 new: (&O, &Antichain<T>),
277 ) -> Result<Antichain<T>, O> {
278 let (res, maintenance) = self
279 .machine
280 .compare_and_downgrade_since(&self.reader_id, expected, new)
281 .await;
282 self.last_downgrade_since = (self.machine.applier.cfg.now)();
283 maintenance.start_performing(&self.machine, &self.gc);
284 match res {
285 Ok(Since(since)) => {
286 self.since.clone_from(&since);
287 self.opaque.clone_from(new.0);
288 Ok(since)
289 }
290 Err((actual_opaque, since)) => {
291 self.since = since.0;
292 self.opaque.clone_from(&actual_opaque);
293 Err(actual_opaque)
294 }
295 }
296 }
297
298 /// Returns aggregate statistics about the contents of the shard TVC at the
299 /// given frontier.
300 ///
301 /// This command returns the contents of this shard as of `as_of` once they
302 /// are known. This may "block" (in an async-friendly way) if `as_of` is
303 /// greater or equal to the current `upper` of the shard. If `None` is given
304 /// for `as_of`, then the latest stats known by this process are used.
305 ///
306 /// The `Since` error indicates that the requested `as_of` cannot be served
307 /// (the caller has out of date information) and includes the smallest
308 /// `as_of` that would have been accepted.
309 pub fn snapshot_stats(
310 &self,
311 as_of: Option<Antichain<T>>,
312 ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
313 let machine = self.machine.clone();
314 async move {
315 let batches = match as_of {
316 Some(as_of) => machine.snapshot(&as_of).await?,
317 None => machine.applier.all_batches(),
318 };
319 let num_updates = batches.iter().map(|b| b.len).sum();
320 Ok(SnapshotStats {
321 shard_id: machine.shard_id(),
322 num_updates,
323 })
324 }
325 }
326
327 // Expiry temporarily removed.
328 // If you'd like to stop this handle from holding back the since of the shard,
329 // downgrade it to [].
330 // TODO(bkirwi): revert this when since behaviour on expiry has settled,
331 // or all readers are associated with a critical handle.
332}
333
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_dyncfg::ConfigUpdates;
    use serde::{Deserialize, Serialize};
    use serde_json::json;

    use crate::tests::new_test_client;
    use crate::{Diagnostics, PersistClient, ShardId};

    use super::*;

    /// A [CriticalReaderId] must (de)serialize as its human-readable string
    /// form, both on its own and as a field of another type.
    #[mz_ore::test]
    fn reader_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            reader_id: CriticalReaderId,
        }

        let id =
            CriticalReaderId::from_str("c00000000-1234-5678-0000-000000000000").expect("valid id");

        // Round trip through a JSON value.
        let encoded = serde_json::to_value(id.clone()).expect("serializable");
        let decoded: CriticalReaderId = serde_json::from_value(encoded).expect("deserializable");
        assert_eq!(id, decoded);

        // Deserialize directly from the serialized string.
        let parsed: CriticalReaderId =
            serde_json::from_str("\"c00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable");
        assert_eq!(id, parsed);

        // Round trip the id as a field of a container type.
        let json = json!({ "reader_id": id });
        let rendered = json.to_string();
        assert_eq!(
            "{\"reader_id\":\"c00000000-1234-5678-0000-000000000000\"}",
            &rendered
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.reader_id, id);
    }

    /// A rate-limited downgrade issued right after an unconditional one must
    /// be a no-op.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn rate_limit(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();

        let mut handle = client
            .open_critical_since::<(), (), u64, i64, i64>(
                shard_id,
                CriticalReaderId::new(),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        let initial_opaque = i64::initial();
        assert_eq!(handle.opaque(), &initial_opaque);

        let downgraded = handle
            .compare_and_downgrade_since(&initial_opaque, (&5, &Antichain::from_elem(0)))
            .await;
        downgraded.unwrap();

        // Must not fire: a `compare_and_downgrade_since` call just succeeded.
        let noop = handle
            .maybe_compare_and_downgrade_since(&5, (&5, &Antichain::from_elem(0)))
            .await;
        assert_eq!(noop, None);
    }

    /// The handle must keep its view of the opaque token up to date.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn handle_opaque_token(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();

        let mut first = client
            .open_critical_since::<(), (), u64, i64, i64>(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        // A fresh handle starts out with the default token.
        assert_eq!(first.opaque(), &i64::MIN);

        let downgraded = first
            .compare_and_downgrade_since(&i64::MIN, (&5, &Antichain::from_elem(0)))
            .await;
        downgraded.unwrap();

        // The handle's own view of the token reflects the write.
        assert_eq!(first.opaque(), &5);

        let second = client
            .open_critical_since::<(), (), u64, i64, i64>(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        // A second handle for the same reader id sees the token as 5 too.
        assert_eq!(second.opaque(), &5);
    }
}
450}