Skip to main content

mz_persist/
location.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Abstractions over files, cloud storage, etc used in persistence.
11
12use std::fmt;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use azure_core::StatusCode;
20use bytes::Bytes;
21use futures_util::Stream;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::u64_to_usize;
24use mz_postgres_client::error::PostgresError;
25use mz_proto::RustType;
26use proptest_derive::Arbitrary;
27use serde::{Deserialize, Serialize};
28use tracing::{Instrument, Span};
29
30use crate::error::Error;
31
32/// The "sequence number" of a persist state change.
33///
34/// Persist is a state machine, with all mutating requests modeled as input
35/// state changes sequenced into a log. This reflects that ordering.
36///
37/// This ordering also includes requests that were sequenced and applied to the
38/// persist state machine, but that application was deterministically made into
39/// a no-op because it was contextually invalid (a write or seal at a sealed
40/// timestamp, an allow_compactions at an unsealed timestamp, etc).
41///
42/// Read-only requests are assigned the SeqNo of a write, indicating that all
43/// mutating requests up to and including that one are reflected in the read
44/// state.
45#[derive(
46    Arbitrary,
47    Clone,
48    Copy,
49    Debug,
50    PartialOrd,
51    Ord,
52    PartialEq,
53    Eq,
54    Hash,
55    Serialize,
56    Deserialize
57)]
58pub struct SeqNo(pub u64);
59
60impl std::fmt::Display for SeqNo {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(f, "v{}", self.0)
63    }
64}
65
66impl timely::PartialOrder for SeqNo {
67    fn less_equal(&self, other: &Self) -> bool {
68        self <= other
69    }
70}
71
72impl std::str::FromStr for SeqNo {
73    type Err = String;
74
75    fn from_str(encoded: &str) -> Result<Self, Self::Err> {
76        let encoded = match encoded.strip_prefix('v') {
77            Some(x) => x,
78            None => return Err(format!("invalid SeqNo {}: incorrect prefix", encoded)),
79        };
80        let seqno =
81            u64::from_str(encoded).map_err(|err| format!("invalid SeqNo {}: {}", encoded, err))?;
82        Ok(SeqNo(seqno))
83    }
84}
85
86impl SeqNo {
87    /// Returns the next SeqNo in the sequence.
88    pub fn next(self) -> SeqNo {
89        SeqNo(self.0 + 1)
90    }
91
92    /// A minimum value suitable as a default.
93    pub fn minimum() -> Self {
94        SeqNo(0)
95    }
96
97    /// A maximum value.
98    pub fn maximum() -> Self {
99        SeqNo(u64::MAX)
100    }
101}
102
103impl RustType<u64> for SeqNo {
104    fn into_proto(&self) -> u64 {
105        self.0
106    }
107
108    fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
109        Ok(SeqNo(proto))
110    }
111}
112
113/// An error coming from an underlying durability system (e.g. s3) indicating
114/// that the operation _definitely did NOT succeed_ (e.g. permission denied).
115#[derive(Debug)]
116pub struct Determinate {
117    inner: anyhow::Error,
118}
119
120impl std::fmt::Display for Determinate {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        write!(f, "determinate: ")?;
123        self.inner.fmt(f)
124    }
125}
126
127impl std::error::Error for Determinate {
128    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
129        self.inner.source()
130    }
131}
132
133impl From<anyhow::Error> for Determinate {
134    fn from(inner: anyhow::Error) -> Self {
135        Self::new(inner)
136    }
137}
138
139impl Determinate {
140    /// Return a new Determinate wrapping the given error.
141    ///
142    /// Exposed for testing via [crate::unreliable].
143    pub fn new(inner: anyhow::Error) -> Self {
144        Determinate { inner }
145    }
146}
147
148/// An error coming from an underlying durability system (e.g. s3) indicating
149/// that the operation _might have succeeded_ (e.g. timeout).
150#[derive(Debug)]
151pub struct Indeterminate {
152    pub(crate) inner: anyhow::Error,
153}
154
155impl Indeterminate {
156    /// Return a new Indeterminate wrapping the given error.
157    ///
158    /// Exposed for testing.
159    pub fn new(inner: anyhow::Error) -> Self {
160        Indeterminate { inner }
161    }
162}
163
164impl std::fmt::Display for Indeterminate {
165    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166        write!(f, "indeterminate: ")?;
167        self.inner.fmt(f)
168    }
169}
170
171impl std::error::Error for Indeterminate {
172    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
173        self.inner.source()
174    }
175}
176
177/// An impl of PartialEq purely for convenience in tests and debug assertions.
178#[cfg(any(test, debug_assertions))]
179impl PartialEq for Indeterminate {
180    fn eq(&self, other: &Self) -> bool {
181        self.to_string() == other.to_string()
182    }
183}
184
185/// An error coming from an underlying durability system (e.g. s3) or from
186/// invalid data received from one.
187#[derive(Debug)]
188pub enum ExternalError {
189    /// A determinate error from an external system.
190    Determinate(Determinate),
191    /// An indeterminate error from an external system.
192    Indeterminate(Indeterminate),
193}
194
195impl ExternalError {
196    /// Returns a new error representing a timeout.
197    ///
198    /// TODO: When we overhaul errors, this presumably should instead be a type
199    /// that can be matched on.
200    #[track_caller]
201    pub fn new_timeout(deadline: Instant) -> Self {
202        ExternalError::Indeterminate(Indeterminate {
203            inner: anyhow!("timeout at {:?}", deadline),
204        })
205    }
206
207    /// Returns whether this error represents a timeout.
208    ///
209    /// TODO: When we overhaul errors, this presumably should instead be a type
210    /// that can be matched on.
211    pub fn is_timeout(&self) -> bool {
212        // Gross...
213        self.to_string().contains("timeout")
214    }
215}
216
217impl std::fmt::Display for ExternalError {
218    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219        match self {
220            ExternalError::Determinate(x) => std::fmt::Display::fmt(x, f),
221            ExternalError::Indeterminate(x) => std::fmt::Display::fmt(x, f),
222        }
223    }
224}
225
226impl std::error::Error for ExternalError {
227    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
228        match self {
229            ExternalError::Determinate(e) => e.source(),
230            ExternalError::Indeterminate(e) => e.source(),
231        }
232    }
233}
234
235/// An impl of PartialEq purely for convenience in tests and debug assertions.
236#[cfg(any(test, debug_assertions))]
237impl PartialEq for ExternalError {
238    fn eq(&self, other: &Self) -> bool {
239        self.to_string() == other.to_string()
240    }
241}
242
243impl From<PostgresError> for ExternalError {
244    fn from(x: PostgresError) -> Self {
245        match x {
246            PostgresError::Determinate(e) => ExternalError::Determinate(Determinate::new(e)),
247            PostgresError::Indeterminate(e) => ExternalError::Indeterminate(Indeterminate::new(e)),
248        }
249    }
250}
251
252impl From<Indeterminate> for ExternalError {
253    fn from(x: Indeterminate) -> Self {
254        ExternalError::Indeterminate(x)
255    }
256}
257
258impl From<Determinate> for ExternalError {
259    fn from(x: Determinate) -> Self {
260        ExternalError::Determinate(x)
261    }
262}
263
264impl From<anyhow::Error> for ExternalError {
265    fn from(inner: anyhow::Error) -> Self {
266        ExternalError::Indeterminate(Indeterminate { inner })
267    }
268}
269
270impl From<Error> for ExternalError {
271    fn from(x: Error) -> Self {
272        ExternalError::Indeterminate(Indeterminate {
273            inner: anyhow::Error::new(x),
274        })
275    }
276}
277
278impl From<std::io::Error> for ExternalError {
279    fn from(x: std::io::Error) -> Self {
280        ExternalError::Indeterminate(Indeterminate {
281            inner: anyhow::Error::new(x),
282        })
283    }
284}
285
286impl From<deadpool_postgres::tokio_postgres::Error> for ExternalError {
287    fn from(e: deadpool_postgres::tokio_postgres::Error) -> Self {
288        let code = match e.as_db_error().map(|x| x.code()) {
289            Some(x) => x,
290            None => {
291                return ExternalError::Indeterminate(Indeterminate {
292                    inner: anyhow::Error::new(e),
293                });
294            }
295        };
296        match code {
297            // Feel free to add more things to this allowlist as we encounter
298            // them as long as you're certain they're determinate.
299            &deadpool_postgres::tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE => {
300                ExternalError::Determinate(Determinate {
301                    inner: anyhow::Error::new(e),
302                })
303            }
304            _ => ExternalError::Indeterminate(Indeterminate {
305                inner: anyhow::Error::new(e),
306            }),
307        }
308    }
309}
310
311impl From<azure_core::Error> for ExternalError {
312    fn from(value: azure_core::Error) -> Self {
313        let definitely_determinate = if let Some(http) = value.as_http_error() {
314            match http.status() {
315                // There are many other status codes that _ought_ to be determinate, according to
316                // the HTTP spec, but this includes only codes that we've observed in practice for now.
317                StatusCode::TooManyRequests => true,
318                _ => false,
319            }
320        } else {
321            false
322        };
323        if definitely_determinate {
324            ExternalError::Determinate(Determinate {
325                inner: anyhow!(value),
326            })
327        } else {
328            ExternalError::Indeterminate(Indeterminate {
329                inner: anyhow!(value),
330            })
331        }
332    }
333}
334
335impl From<deadpool_postgres::PoolError> for ExternalError {
336    fn from(x: deadpool_postgres::PoolError) -> Self {
337        match x {
338            // We have logic for turning a postgres Error into an ExternalError,
339            // so use it.
340            deadpool_postgres::PoolError::Backend(x) => ExternalError::from(x),
341            x => ExternalError::Indeterminate(Indeterminate {
342                inner: anyhow::Error::new(x),
343            }),
344        }
345    }
346}
347
348impl From<tokio::task::JoinError> for ExternalError {
349    fn from(x: tokio::task::JoinError) -> Self {
350        ExternalError::Indeterminate(Indeterminate {
351            inner: anyhow::Error::new(x),
352        })
353    }
354}
355
356/// An abstraction for a single arbitrarily-sized binary blob and an associated
357/// version number (sequence number).
358#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
359pub struct VersionedData {
360    /// The sequence number of the data.
361    pub seqno: SeqNo,
362    /// The data itself.
363    pub data: Bytes,
364}
365
366/// Helper constant to scan all states in [Consensus::scan].
367/// The maximum possible SeqNo is i64::MAX.
368// TODO(benesch): find a way to express this without `as`.
369#[allow(clippy::as_conversions)]
370pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
371
/// Key to pass to [Consensus::head] when all that's wanted is a liveness check.
pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";
374
375/// Return type to indicate whether [Consensus::compare_and_set] succeeded or failed.
376#[derive(Debug, PartialEq, Serialize, Deserialize)]
377pub enum CaSResult {
378    /// The compare-and-set succeeded and committed new state.
379    Committed,
380    /// The compare-and-set failed due to expectation mismatch.
381    ExpectationMismatch,
382}
383
/// Runs calls to a backing store on freshly spawned tokio tasks.
///
/// The extra task adds overhead, but insulates the store from callers that
/// fail to drive futures promptly to completion, which could otherwise cause
/// timeouts or resource exhaustion in the store. The listing methods
/// (`Consensus::list_keys`, `Blob::list_keys_and_metadata`) are the exception:
/// they are passed straight through to the backing store.
#[derive(Debug)]
pub struct Tasked<A>(pub Arc<A>);

impl<A> Tasked<A> {
    /// Returns another handle to the shared backing store, suitable for moving
    /// into a spawned task.
    fn clone_backing(&self) -> Arc<A> {
        let Tasked(backing) = self;
        Arc::clone(backing)
    }
}
395
396/// A boxed stream, similar to what `async_trait` desugars async functions to, but hardcoded
397/// to our standard result type.
398pub type ResultStream<'a, T> = Pin<Box<dyn Stream<Item = Result<T, ExternalError>> + Send + 'a>>;
399
400/// An abstraction for [VersionedData] held in a location in persistent storage
401/// where the data are conditionally updated by version.
402///
403/// Users are expected to use this API with consistently increasing sequence numbers
404/// to allow multiple processes across multiple machines to agree to a total order
405/// of the evolution of the data. To make roundtripping through various forms of durable
406/// storage easier, sequence numbers used with [Consensus] need to be restricted to the
407/// range [0, i64::MAX].
408#[async_trait]
409pub trait Consensus: std::fmt::Debug + Send + Sync {
410    /// Returns all the keys ever created in the consensus store.
411    fn list_keys(&self) -> ResultStream<'_, String>;
412
413    /// Returns a recent version of `data`, and the corresponding sequence number, if
414    /// one exists at this location.
415    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError>;
416
417    /// Update the [VersionedData] stored at this location to `new`, iff the
418    /// current sequence number is exactly `expected` and `new`'s sequence
419    /// number > the current sequence number.
420    ///
421    /// It is invalid to call this function with a `new` and `expected` such
422    /// that `new`'s sequence number is <= `expected`. It is invalid to call
423    /// this function with a sequence number outside of the range `[0, i64::MAX]`.
424    ///
425    /// This data is initialized to None, and the first call to compare_and_set
426    /// needs to happen with None as the expected value to set the state.
427    async fn compare_and_set(
428        &self,
429        key: &str,
430        expected: Option<SeqNo>,
431        new: VersionedData,
432    ) -> Result<CaSResult, ExternalError>;
433
434    /// Return `limit` versions of data stored for this `key` at sequence numbers >= `from`,
435    /// in ascending order of sequence number.
436    ///
437    /// Returns an empty vec if `from` is greater than the current sequence
438    /// number or if there is no data at this key.
439    async fn scan(
440        &self,
441        key: &str,
442        from: SeqNo,
443        limit: usize,
444    ) -> Result<Vec<VersionedData>, ExternalError>;
445
446    /// Deletes all historical versions of the data stored at `key` that are <
447    /// `seqno`, iff `seqno` <= the current sequence number.
448    ///
449    /// Returns the number of versions deleted or `None` on success. Returns an error if
450    /// `seqno` is greater than the current sequence number, or if there is no
451    /// data at this key.
452    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError>;
453}
454
455#[async_trait]
456impl<A: Consensus + 'static> Consensus for Tasked<A> {
457    fn list_keys(&self) -> ResultStream<'_, String> {
458        // Similarly to Blob::list_keys_and_metadata, this is difficult to make into a task.
459        // (If we use an unbounded channel between the task and the caller, we can buffer forever;
460        // if we use a bounded channel, we lose the isolation benefits of Tasked.)
461        // However, this should only be called in administrative contexts
462        // and not in the main state-machine impl.
463        self.0.list_keys()
464    }
465
466    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
467        let backing = self.clone_backing();
468        let key = key.to_owned();
469        mz_ore::task::spawn(
470            || "persist::task::head",
471            async move { backing.head(&key).await }.instrument(Span::current()),
472        )
473        .await
474    }
475
476    async fn compare_and_set(
477        &self,
478        key: &str,
479        expected: Option<SeqNo>,
480        new: VersionedData,
481    ) -> Result<CaSResult, ExternalError> {
482        let backing = self.clone_backing();
483        let key = key.to_owned();
484        mz_ore::task::spawn(
485            || "persist::task::cas",
486            async move { backing.compare_and_set(&key, expected, new).await }
487                .instrument(Span::current()),
488        )
489        .await
490    }
491
492    async fn scan(
493        &self,
494        key: &str,
495        from: SeqNo,
496        limit: usize,
497    ) -> Result<Vec<VersionedData>, ExternalError> {
498        let backing = self.clone_backing();
499        let key = key.to_owned();
500        mz_ore::task::spawn(
501            || "persist::task::scan",
502            async move { backing.scan(&key, from, limit).await }.instrument(Span::current()),
503        )
504        .await
505    }
506
507    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
508        let backing = self.clone_backing();
509        let key = key.to_owned();
510        mz_ore::task::spawn(
511            || "persist::task::truncate",
512            async move { backing.truncate(&key, seqno).await }.instrument(Span::current()),
513        )
514        .await
515    }
516}
517
/// Metadata describing a single blob stored by persist.
#[derive(Debug)]
pub struct BlobMetadata<'a> {
    /// The blob's key.
    pub key: &'a str,
    /// The blob's size, in bytes.
    pub size_in_bytes: u64,
}
526
/// Key to pass to [Blob::get] when all that's wanted is a liveness check.
pub const BLOB_GET_LIVENESS_KEY: &str = "LIVENESS";
529
530/// An abstraction over read-write access to a `bytes key`->`bytes value` store.
531///
532/// Implementations are required to be _linearizable_.
533///
534/// TODO: Consider whether this can be relaxed. Since our usage is write-once
535/// modify-never, it certainly seems like we could by adding retries around
536/// `get` to wait for a non-linearizable `set` to show up. However, the tricky
537/// bit comes once we stop handing out seqno capabilities to readers and have to
538/// start reasoning about "this set hasn't show up yet" vs "the blob has already
539/// been deleted". Another tricky problem is the same but for a deletion when
540/// the first attempt timed out.
541#[async_trait]
542pub trait Blob: std::fmt::Debug + Send + Sync {
543    /// Returns a reference to the value corresponding to the key.
544    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError>;
545
546    /// List all of the keys in the map with metadata about the entry.
547    ///
548    /// Can be optionally restricted to only list keys starting with a
549    /// given prefix.
550    async fn list_keys_and_metadata(
551        &self,
552        key_prefix: &str,
553        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
554    ) -> Result<(), ExternalError>;
555
556    /// Inserts a key-value pair into the map.
557    ///
558    /// Writes must be atomic and either succeed or leave the previous value
559    /// intact.
560    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError>;
561
562    /// Remove a key from the map.
563    ///
564    /// Returns Some and the size of the deleted blob if if exists. Succeeds and
565    /// returns None if it does not exist.
566    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError>;
567
568    /// Restores a previously-deleted key to the map, if possible.
569    ///
570    /// Returns successfully if the key exists after this call: perhaps because it already existed
571    /// or was restored. (In particular, this makes restore idempotent.)
572    /// Fails if we were unable to restore any value for that key:
573    /// perhaps the key was never written, or was permanently deleted.
574    ///
575    /// It is acceptable for [Blob::restore] to be unable
576    /// to restore keys, in which case this method should succeed iff the key exists.
577    async fn restore(&self, key: &str) -> Result<(), ExternalError>;
578}
579
580#[async_trait]
581impl<A: Blob + 'static> Blob for Tasked<A> {
582    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
583        let backing = self.clone_backing();
584        let key = key.to_owned();
585        mz_ore::task::spawn(
586            || "persist::task::get",
587            async move { backing.get(&key).await }.instrument(Span::current()),
588        )
589        .await
590    }
591
592    /// List all of the keys in the map with metadata about the entry.
593    ///
594    /// Can be optionally restricted to only list keys starting with a
595    /// given prefix.
596    async fn list_keys_and_metadata(
597        &self,
598        key_prefix: &str,
599        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
600    ) -> Result<(), ExternalError> {
601        // TODO: No good way that I can see to make this one a task because of
602        // the closure and Blob needing to be object-safe.
603        self.0.list_keys_and_metadata(key_prefix, f).await
604    }
605
606    /// Inserts a key-value pair into the map.
607    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
608        let backing = self.clone_backing();
609        let key = key.to_owned();
610        mz_ore::task::spawn(
611            || "persist::task::set",
612            async move { backing.set(&key, value).await }.instrument(Span::current()),
613        )
614        .await
615    }
616
617    /// Remove a key from the map.
618    ///
619    /// Returns Some and the size of the deleted blob if if exists. Succeeds and
620    /// returns None if it does not exist.
621    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
622        let backing = self.clone_backing();
623        let key = key.to_owned();
624        mz_ore::task::spawn(
625            || "persist::task::delete",
626            async move { backing.delete(&key).await }.instrument(Span::current()),
627        )
628        .await
629    }
630
631    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
632        let backing = self.clone_backing();
633        let key = key.to_owned();
634        mz_ore::task::spawn(
635            || "persist::task::restore",
636            async move { backing.restore(&key).await }.instrument(Span::current()),
637        )
638        .await
639    }
640}
641
642/// Test helpers for the crate.
643#[cfg(test)]
644pub mod tests {
645    use std::future::Future;
646
647    use anyhow::anyhow;
648    use futures_util::TryStreamExt;
649    use mz_ore::{assert_err, assert_ok};
650    use uuid::Uuid;
651
652    use crate::location::Blob;
653
654    use super::*;
655
656    fn keys(baseline: &[String], new: &[&str]) -> Vec<String> {
657        let mut ret = baseline.to_vec();
658        ret.extend(new.iter().map(|x| x.to_string()));
659        ret.sort();
660        ret
661    }
662
663    async fn get_keys(b: &impl Blob) -> Result<Vec<String>, ExternalError> {
664        let mut keys = vec![];
665        b.list_keys_and_metadata("", &mut |entry| keys.push(entry.key.to_string()))
666            .await?;
667        Ok(keys)
668    }
669
670    async fn get_keys_with_prefix(
671        b: &impl Blob,
672        prefix: &str,
673    ) -> Result<Vec<String>, ExternalError> {
674        let mut keys = vec![];
675        b.list_keys_and_metadata(prefix, &mut |entry| keys.push(entry.key.to_string()))
676            .await?;
677        Ok(keys)
678    }
679
    /// Common test impl for different blob implementations.
    ///
    /// `new_fn` is called with a path name to open a blob. Blobs opened with
    /// the same path are expected to share the same underlying data: writes
    /// through one must be visible through the other. The store at `"path0"`
    /// is expected to start out empty.
    ///
    /// Exercises `get`, `set`, `delete`, `restore`, and
    /// `list_keys_and_metadata`. Returns the first error a blob operation
    /// produces; assertion failures panic.
    pub async fn blob_impl_test<
        B: Blob,
        F: Future<Output = Result<B, ExternalError>>,
        NewFn: Fn(&'static str) -> F,
    >(
        new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let values = ["v0".as_bytes().to_vec(), "v1".as_bytes().to_vec()];

        let blob0 = new_fn("path0").await?;

        // We can create a second blob writing to a different place.
        let _ = new_fn("path1").await?;

        // We can open two blobs to the same place, even.
        let blob1 = new_fn("path0").await?;

        let k0 = "foo/bar/k0";

        // Empty key is empty.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);

        // Empty list keys is empty.
        let empty_keys = get_keys(&blob0).await?;
        assert_eq!(empty_keys, Vec::<String>::new());
        let empty_keys = get_keys(&blob1).await?;
        assert_eq!(empty_keys, Vec::<String>::new());

        // Set a key and get it back.
        blob0.set(k0, values[0].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Set another key and get it back.
        blob0.set("k0a", values[0].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Blob contains the keys we just inserted.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));

        // Can overwrite a key.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        // Can overwrite another key.
        blob0.set("k0a", values[1].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Can delete a key. The reported size (2) is the length of the
        // current value, values[1] == "v1".
        assert_eq!(blob0.delete(k0).await, Ok(Some(2)));
        // Can no longer get a deleted key.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);
        // Double deleting a key succeeds but indicates that it did no work.
        assert_eq!(blob0.delete(k0).await, Ok(None));
        // Deleting a key that does not exist succeeds.
        assert_eq!(blob0.delete("nope").await, Ok(None));
        // Deleting a key with an empty value indicates it did work but deleted
        // no bytes.
        blob0.set("empty", Bytes::new()).await?;
        assert_eq!(blob0.delete("empty").await, Ok(Some(0)));

        // Attempt to restore a key. Not all backends will be able to restore, but
        // we can confirm that our data is visible iff restore reported success.
        blob0.set("undelete", Bytes::from("data")).await?;
        // Restoring should always succeed when the key exists.
        blob0.restore("undelete").await?;
        assert_eq!(blob0.delete("undelete").await?, Some("data".len()));
        // A determinate error from restore is taken to mean "this backend could
        // not restore the key", so the key must then be absent. Any other error
        // fails the test.
        let expected = match blob0.restore("undelete").await {
            Ok(()) => Some(Bytes::from("data").into()),
            Err(ExternalError::Determinate(_)) => None,
            Err(other) => return Err(other),
        };
        assert_eq!(blob0.get("undelete").await?, expected);
        blob0.delete("undelete").await?;

        // Empty blob contains no keys.
        blob0.delete("k0a").await?;
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        // Can reset a deleted key to some other value.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Insert multiple keys back to back and validate that we can list
        // them all out.
        let mut expected_keys = empty_keys;
        for i in 1..=5 {
            let key = format!("k{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_keys.push(key);
        }

        // Blob contains the keys we just inserted (plus k0, which was reset above).
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));

        // Insert multiple keys with a different prefix and validate that we can
        // list out keys by their prefix. Note that k0 ("foo/bar/k0") does not
        // start with "k", so it is excluded from the "k" listing below.
        let mut expected_prefix_keys = vec![];
        for i in 1..=3 {
            let key = format!("k-prefix-{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_prefix_keys.push(key);
        }
        let mut blob_keys = get_keys_with_prefix(&blob0, "k-prefix").await?;
        blob_keys.sort();
        assert_eq!(blob_keys, expected_prefix_keys);
        let mut blob_keys = get_keys_with_prefix(&blob0, "k").await?;
        blob_keys.sort();
        expected_keys.extend(expected_prefix_keys);
        expected_keys.sort();
        assert_eq!(blob_keys, expected_keys);

        // We can open a new blob to the same path and use it.
        let blob3 = new_fn("path0").await?;
        assert_eq!(
            blob3.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        Ok(())
    }
851
    /// Common test impl for different consensus implementations.
    ///
    /// Drives a [`Consensus`] produced by `new_fn` through a fixed sequence of
    /// `head`, `scan`, `compare_and_set`, `truncate`, and `list_keys` calls,
    /// asserting the outcome of each step. Later steps depend on the state left
    /// behind by earlier ones, so the order of the steps is significant.
    ///
    /// `new_fn` is invoked once, at the start of the test.
    ///
    /// # Errors
    ///
    /// Returns any error produced by `new_fn`, or by collecting the
    /// `list_keys` stream.
    ///
    /// # Panics
    ///
    /// Panics if any of the asserted behaviors does not hold.
    pub async fn consensus_impl_test<
        C: Consensus,
        F: Future<Output = Result<C, ExternalError>>,
        NewFn: FnMut() -> F,
    >(
        mut new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let consensus = new_fn().await?;

        // Use a random key so independent runs of this test don't interfere
        // with each other.
        let key = Uuid::new_v4().to_string();

        // Starting value of consensus data is None.
        assert_eq!(consensus.head(&key).await, Ok(None));

        // Can scan a key that has no data.
        assert_eq!(consensus.scan(&key, SeqNo(0), SCAN_ALL).await, Ok(vec![]));

        // Cannot truncate data from a key that doesn't have any data.
        assert_err!(consensus.truncate(&key, SeqNo(0)).await);

        let state = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("abc"),
        };

        // Setting the data of an empty key with a non-None expected seqno is
        // rejected: the call succeeds but reports an expectation mismatch.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(0)), state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // Correctly updating the state with the correct expected value should succeed.
        assert_eq!(
            consensus.compare_and_set(&key, None, state.clone()).await,
            Ok(CaSResult::Committed),
        );

        // The new key is visible in state.
        // NOTE(review): this asserts `key` is the *only* key listed, which
        // presumes the backend held no keys when the test started — confirm
        // for backends shared across runs.
        let keys: Vec<_> = consensus.list_keys().try_collect().await?;
        assert_eq!(keys, vec![key.to_owned()]);

        // We can observe the most recent value after a successful update.
        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));

        // Can scan a key that has data with a lower bound sequence number < head.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![state.clone()])
        );

        // Can scan a key that has data with a lower bound sequence number == head.
        assert_eq!(
            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
            Ok(vec![state.clone()])
        );

        // Can scan a key that has data with a lower bound sequence number >
        // head; nothing is returned.
        assert_eq!(consensus.scan(&key, SeqNo(6), SCAN_ALL).await, Ok(vec![]));

        // Can truncate data with an upper bound <= head, even if there is no data in the
        // range [0, upper).
        assert_ok!(consensus.truncate(&key, SeqNo(0)).await);
        assert_ok!(consensus.truncate(&key, SeqNo(5)).await);

        // Cannot truncate data with an upper bound > head.
        assert_err!(consensus.truncate(&key, SeqNo(6)).await);

        let new_state = VersionedData {
            seqno: SeqNo(10),
            data: Bytes::from("def"),
        };

        // Trying to update without the correct expected seqno fails (even if expected > current).
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(7)), new_state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // Trying to update without the correct expected seqno fails (even if expected < current).
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(3)), new_state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        let invalid_constant_seqno = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("invalid"),
        };

        // Trying to set the data to a sequence number == current fails with an
        // error, even if expected is correct.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_constant_seqno)
                .await,
            Err(ExternalError::from(anyhow!(
                "new seqno must be strictly greater than expected. Got new: SeqNo(5) expected: SeqNo(5)"
            )))
        );

        let invalid_regressing_seqno = VersionedData {
            seqno: SeqNo(3),
            data: Bytes::from("invalid"),
        };

        // Trying to set the data to a sequence number < current fails with an
        // error, even if expected is correct.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_regressing_seqno)
                .await,
            Err(ExternalError::from(anyhow!(
                "new seqno must be strictly greater than expected. Got new: SeqNo(3) expected: SeqNo(5)"
            )))
        );

        // Can correctly update to a new state if we provide the right expected seqno.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), new_state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        // We can observe the most recent value after a successful update.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));

        // We can observe both states, in the correct order, with scan if we
        // pass in a suitable lower bound.
        assert_eq!(
            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // We can observe only the most recent state if the lower bound is higher
        // than the previous insertion's sequence number.
        assert_eq!(
            consensus.scan(&key, SeqNo(6), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // We can still observe the most recent insert as long as the provided
        // lower bound == the most recent insert's sequence number.
        assert_eq!(
            consensus.scan(&key, SeqNo(10), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // Scanning with a lower bound > head's sequence number succeeds and
        // returns nothing.
        assert_eq!(consensus.scan(&key, SeqNo(11), SCAN_ALL).await, Ok(vec![]));

        // We can scan with limits that don't cover all states.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 1).await,
            Ok(vec![state.clone()])
        );
        assert_eq!(
            consensus.scan(&key, SeqNo(5), 1).await,
            Ok(vec![state.clone()])
        );

        // We can scan with limits to cover exactly the number of states.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 2).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // We can scan with a limit larger than the number of states.
        assert_eq!(
            consensus.scan(&key, SeqNo(4), 100).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // Can remove the previous write with the appropriate truncation.
        assert_ok!(consensus.truncate(&key, SeqNo(6)).await);

        // Verify that the old write is indeed deleted.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // Truncate is idempotent and can be repeated. Only success is asserted
        // here; the value returned by the repeated call is not inspected.
        assert_ok!(consensus.truncate(&key, SeqNo(6)).await);

        // Make sure entries under different keys don't clash.
        let other_key = Uuid::new_v4().to_string();

        assert_eq!(consensus.head(&other_key).await, Ok(None));

        // Note: this shadows the earlier `state`; from here on, `state` is the
        // value written under `other_key` (seqno 1).
        let state = VersionedData {
            seqno: SeqNo(1),
            data: Bytes::from("einszweidrei"),
        };

        assert_eq!(
            consensus
                .compare_and_set(&other_key, None, state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        assert_eq!(consensus.head(&other_key).await, Ok(Some(state.clone())));

        // State for the first key is still as expected.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));

        // Trying to update from a stale version of current doesn't work: the
        // expected seqno used here is `other_key`'s (1), not `key`'s current
        // seqno (10), so the write is rejected as a mismatch.
        let invalid_jump_forward = VersionedData {
            seqno: SeqNo(11),
            data: Bytes::from("invalid"),
        };
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_jump_forward)
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // Writing a large (10 KiB) amount of data works fine.
        let large_state = VersionedData {
            seqno: SeqNo(11),
            data: std::iter::repeat(b'a').take(10240).collect(),
        };
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(new_state.seqno), large_state)
                .await,
            Ok(CaSResult::Committed),
        );

        // Truncate can delete more than one version at a time: truncating at
        // 12 removes both the seqno 10 and seqno 11 versions.
        let v12 = VersionedData {
            seqno: SeqNo(12),
            data: Bytes::new(),
        };
        assert_eq!(
            consensus.compare_and_set(&key, Some(SeqNo(11)), v12).await,
            Ok(CaSResult::Committed),
        );
        assert_ok!(consensus.truncate(&key, SeqNo(12)).await);

        // Sequence numbers used within Consensus have to be within [0, i64::MAX].
        // Each check below uses a fresh random key, so none depends on the
        // state written above.

        assert_eq!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(0),
                        data: Bytes::new(),
                    }
                )
                .await,
            Ok(CaSResult::Committed),
        );
        assert_eq!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(i64::MAX.try_into().expect("i64::MAX fits in u64")),
                        data: Bytes::new(),
                    }
                )
                .await,
            Ok(CaSResult::Committed),
        );
        // 1 << 63 == i64::MAX + 1, the smallest seqno outside the allowed range.
        assert_err!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(1 << 63),
                        data: Bytes::new(),
                    }
                )
                .await
        );
        assert_err!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(u64::MAX),
                        data: Bytes::new(),
                    }
                )
                .await
        );

        Ok(())
    }
1160
1161    #[mz_ore::test]
1162    fn timeout_error() {
1163        assert!(ExternalError::new_timeout(Instant::now()).is_timeout());
1164        assert!(!ExternalError::from(anyhow!("foo")).is_timeout());
1165    }
1166}