mz_persist/
location.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Abstractions over files, cloud storage, etc used in persistence.
11
12use std::fmt;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use azure_core::StatusCode;
20use bytes::Bytes;
21use futures_util::Stream;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::u64_to_usize;
24use mz_postgres_client::error::PostgresError;
25use mz_proto::RustType;
26use proptest_derive::Arbitrary;
27use serde::{Deserialize, Serialize};
28use tracing::{Instrument, Span};
29
30use crate::error::Error;
31
/// The "sequence number" of a persist state change.
///
/// Persist is a state machine, with all mutating requests modeled as input
/// state changes sequenced into a log. This reflects that ordering.
///
/// This ordering also includes requests that were sequenced and applied to the
/// persist state machine, but that application was deterministically made into
/// a no-op because it was contextually invalid (a write or seal at a sealed
/// timestamp, an allow_compactions at an unsealed timestamp, etc).
///
/// Read-only requests are assigned the SeqNo of a write, indicating that all
/// mutating requests up to and including that one are reflected in the read
/// state.
///
/// The `Display` impl renders a SeqNo as `v` followed by the decimal value
/// (for example `v5`), and the `FromStr` impl parses that same form back.
#[derive(
    Arbitrary, Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize,
)]
pub struct SeqNo(pub u64);
49
50impl std::fmt::Display for SeqNo {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        write!(f, "v{}", self.0)
53    }
54}
55
56impl timely::PartialOrder for SeqNo {
57    fn less_equal(&self, other: &Self) -> bool {
58        self <= other
59    }
60}
61
62impl std::str::FromStr for SeqNo {
63    type Err = String;
64
65    fn from_str(encoded: &str) -> Result<Self, Self::Err> {
66        let encoded = match encoded.strip_prefix('v') {
67            Some(x) => x,
68            None => return Err(format!("invalid SeqNo {}: incorrect prefix", encoded)),
69        };
70        let seqno =
71            u64::from_str(encoded).map_err(|err| format!("invalid SeqNo {}: {}", encoded, err))?;
72        Ok(SeqNo(seqno))
73    }
74}
75
76impl SeqNo {
77    /// Returns the next SeqNo in the sequence.
78    pub fn next(self) -> SeqNo {
79        SeqNo(self.0 + 1)
80    }
81
82    /// A minimum value suitable as a default.
83    pub fn minimum() -> Self {
84        SeqNo(0)
85    }
86}
87
88impl RustType<u64> for SeqNo {
89    fn into_proto(&self) -> u64 {
90        self.0
91    }
92
93    fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
94        Ok(SeqNo(proto))
95    }
96}
97
/// An error coming from an underlying durability system (e.g. s3) indicating
/// that the operation _definitely did NOT succeed_ (e.g. permission denied).
///
/// Its `Display` output is prefixed with `determinate: `.
#[derive(Debug)]
pub struct Determinate {
    /// The wrapped underlying error.
    inner: anyhow::Error,
}
104
105impl std::fmt::Display for Determinate {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        write!(f, "determinate: ")?;
108        self.inner.fmt(f)
109    }
110}
111
112impl std::error::Error for Determinate {
113    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
114        self.inner.source()
115    }
116}
117
118impl Determinate {
119    /// Return a new Determinate wrapping the given error.
120    ///
121    /// Exposed for testing via [crate::unreliable].
122    pub fn new(inner: anyhow::Error) -> Self {
123        Determinate { inner }
124    }
125}
126
/// An error coming from an underlying durability system (e.g. s3) indicating
/// that the operation _might have succeeded_ (e.g. timeout).
///
/// Its `Display` output is prefixed with `indeterminate: `.
#[derive(Debug)]
pub struct Indeterminate {
    /// The wrapped underlying error.
    pub(crate) inner: anyhow::Error,
}
133
134impl Indeterminate {
135    /// Return a new Indeterminate wrapping the given error.
136    ///
137    /// Exposed for testing.
138    pub fn new(inner: anyhow::Error) -> Self {
139        Indeterminate { inner }
140    }
141}
142
143impl std::fmt::Display for Indeterminate {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        write!(f, "indeterminate: ")?;
146        self.inner.fmt(f)
147    }
148}
149
150impl std::error::Error for Indeterminate {
151    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
152        self.inner.source()
153    }
154}
155
156/// An impl of PartialEq purely for convenience in tests and debug assertions.
157#[cfg(any(test, debug_assertions))]
158impl PartialEq for Indeterminate {
159    fn eq(&self, other: &Self) -> bool {
160        self.to_string() == other.to_string()
161    }
162}
163
/// An error coming from an underlying durability system (e.g. s3) or from
/// invalid data received from one.
///
/// The variant records whether the failed operation is known not to have
/// taken effect ([Determinate]) or might have succeeded ([Indeterminate]).
#[derive(Debug)]
pub enum ExternalError {
    /// A determinate error from an external system.
    Determinate(Determinate),
    /// An indeterminate error from an external system.
    Indeterminate(Indeterminate),
}
173
174impl ExternalError {
175    /// Returns a new error representing a timeout.
176    ///
177    /// TODO: When we overhaul errors, this presumably should instead be a type
178    /// that can be matched on.
179    #[track_caller]
180    pub fn new_timeout(deadline: Instant) -> Self {
181        ExternalError::Indeterminate(Indeterminate {
182            inner: anyhow!("timeout at {:?}", deadline),
183        })
184    }
185
186    /// Returns whether this error represents a timeout.
187    ///
188    /// TODO: When we overhaul errors, this presumably should instead be a type
189    /// that can be matched on.
190    pub fn is_timeout(&self) -> bool {
191        // Gross...
192        self.to_string().contains("timeout")
193    }
194}
195
196impl std::fmt::Display for ExternalError {
197    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198        match self {
199            ExternalError::Determinate(x) => std::fmt::Display::fmt(x, f),
200            ExternalError::Indeterminate(x) => std::fmt::Display::fmt(x, f),
201        }
202    }
203}
204
205impl std::error::Error for ExternalError {
206    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
207        match self {
208            ExternalError::Determinate(e) => e.source(),
209            ExternalError::Indeterminate(e) => e.source(),
210        }
211    }
212}
213
214/// An impl of PartialEq purely for convenience in tests and debug assertions.
215#[cfg(any(test, debug_assertions))]
216impl PartialEq for ExternalError {
217    fn eq(&self, other: &Self) -> bool {
218        self.to_string() == other.to_string()
219    }
220}
221
222impl From<PostgresError> for ExternalError {
223    fn from(x: PostgresError) -> Self {
224        match x {
225            PostgresError::Determinate(e) => ExternalError::Determinate(Determinate::new(e)),
226            PostgresError::Indeterminate(e) => ExternalError::Indeterminate(Indeterminate::new(e)),
227        }
228    }
229}
230
231impl From<Indeterminate> for ExternalError {
232    fn from(x: Indeterminate) -> Self {
233        ExternalError::Indeterminate(x)
234    }
235}
236
237impl From<Determinate> for ExternalError {
238    fn from(x: Determinate) -> Self {
239        ExternalError::Determinate(x)
240    }
241}
242
243impl From<anyhow::Error> for ExternalError {
244    fn from(inner: anyhow::Error) -> Self {
245        ExternalError::Indeterminate(Indeterminate { inner })
246    }
247}
248
249impl From<Error> for ExternalError {
250    fn from(x: Error) -> Self {
251        ExternalError::Indeterminate(Indeterminate {
252            inner: anyhow::Error::new(x),
253        })
254    }
255}
256
257impl From<std::io::Error> for ExternalError {
258    fn from(x: std::io::Error) -> Self {
259        ExternalError::Indeterminate(Indeterminate {
260            inner: anyhow::Error::new(x),
261        })
262    }
263}
264
265impl From<deadpool_postgres::tokio_postgres::Error> for ExternalError {
266    fn from(e: deadpool_postgres::tokio_postgres::Error) -> Self {
267        let code = match e.as_db_error().map(|x| x.code()) {
268            Some(x) => x,
269            None => {
270                return ExternalError::Indeterminate(Indeterminate {
271                    inner: anyhow::Error::new(e),
272                });
273            }
274        };
275        match code {
276            // Feel free to add more things to this allowlist as we encounter
277            // them as long as you're certain they're determinate.
278            &deadpool_postgres::tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE => {
279                ExternalError::Determinate(Determinate {
280                    inner: anyhow::Error::new(e),
281                })
282            }
283            _ => ExternalError::Indeterminate(Indeterminate {
284                inner: anyhow::Error::new(e),
285            }),
286        }
287    }
288}
289
290impl From<azure_core::Error> for ExternalError {
291    fn from(value: azure_core::Error) -> Self {
292        let definitely_determinate = if let Some(http) = value.as_http_error() {
293            match http.status() {
294                // There are many other status codes that _ought_ to be determinate, according to
295                // the HTTP spec, but this includes only codes that we've observed in practice for now.
296                StatusCode::TooManyRequests => true,
297                _ => false,
298            }
299        } else {
300            false
301        };
302        if definitely_determinate {
303            ExternalError::Determinate(Determinate {
304                inner: anyhow!(value),
305            })
306        } else {
307            ExternalError::Indeterminate(Indeterminate {
308                inner: anyhow!(value),
309            })
310        }
311    }
312}
313
314impl From<deadpool_postgres::PoolError> for ExternalError {
315    fn from(x: deadpool_postgres::PoolError) -> Self {
316        match x {
317            // We have logic for turning a postgres Error into an ExternalError,
318            // so use it.
319            deadpool_postgres::PoolError::Backend(x) => ExternalError::from(x),
320            x => ExternalError::Indeterminate(Indeterminate {
321                inner: anyhow::Error::new(x),
322            }),
323        }
324    }
325}
326
327impl From<tokio::task::JoinError> for ExternalError {
328    fn from(x: tokio::task::JoinError) -> Self {
329        ExternalError::Indeterminate(Indeterminate {
330            inner: anyhow::Error::new(x),
331        })
332    }
333}
334
/// An abstraction for a single arbitrarily-sized binary blob and an associated
/// version number (sequence number).
///
/// This is the unit that [Consensus] reads (`head`, `scan`) and writes
/// (`compare_and_set`).
#[derive(Debug, Clone, PartialEq)]
pub struct VersionedData {
    /// The sequence number of the data.
    pub seqno: SeqNo,
    /// The data itself.
    pub data: Bytes,
}
344
/// Helper constant to scan all states in [Consensus::scan].
/// The maximum possible SeqNo is i64::MAX.
///
/// Pass this as the `limit` argument when no cap on the number of returned
/// versions is wanted.
// `i64::MAX` is non-negative, so the `as u64` cast below cannot change its
// value.
// TODO(benesch): find a way to express this without `as`.
#[allow(clippy::as_conversions)]
pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
350
/// A key usable for liveness checks via [Consensus::head].
///
/// Has the same string value as [BLOB_GET_LIVENESS_KEY], which plays the
/// equivalent role for [Blob::get].
pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";
353
/// Return type to indicate whether [Consensus::compare_and_set] succeeded or failed.
///
/// Both variants are returned inside `Ok`: an expectation mismatch is a
/// normal outcome of the call, distinct from an [ExternalError].
#[derive(Debug, PartialEq)]
pub enum CaSResult {
    /// The compare-and-set succeeded and committed new state.
    Committed,
    /// The compare-and-set failed due to expectation mismatch.
    ExpectationMismatch,
}
362
/// Wraps all calls to a backing store in a new tokio task. This adds extra overhead,
/// but insulates the system from callers who fail to drive futures promptly to completion,
/// which can cause timeouts or resource exhaustion in a store.
///
/// The backing store is held in an [Arc] so that each spawned task can own
/// its own handle to it. The listing methods (`Consensus::list_keys` and
/// `Blob::list_keys_and_metadata`) are the exception: they call the backing
/// store directly instead of spawning a task.
#[derive(Debug)]
pub struct Tasked<A>(pub Arc<A>);
368
369impl<A> Tasked<A> {
370    fn clone_backing(&self) -> Arc<A> {
371        Arc::clone(&self.0)
372    }
373}
374
/// A boxed stream, similar to what `async_trait` desugars async functions to, but hardcoded
/// to our standard result type.
///
/// Each item is a `Result<T, ExternalError>`; the stream is pinned, boxed,
/// `Send`, and may borrow for the lifetime `'a`.
pub type ResultStream<'a, T> = Pin<Box<dyn Stream<Item = Result<T, ExternalError>> + Send + 'a>>;
378
/// An abstraction for [VersionedData] held in a location in persistent storage
/// where the data are conditionally updated by version.
///
/// Users are expected to use this API with consistently increasing sequence numbers
/// to allow multiple processes across multiple machines to agree to a total order
/// of the evolution of the data. To make roundtripping through various forms of durable
/// storage easier, sequence numbers used with [Consensus] need to be restricted to the
/// range [0, i64::MAX].
#[async_trait]
pub trait Consensus: std::fmt::Debug + Send + Sync {
    /// Returns all the keys ever created in the consensus store.
    ///
    /// The keys are produced as a stream; each item is either a key or the
    /// error hit while listing.
    fn list_keys(&self) -> ResultStream<'_, String>;

    /// Returns a recent version of `data`, and the corresponding sequence number, if
    /// one exists at this location.
    ///
    /// Returns `Ok(None)` if nothing has been written at `key` yet.
    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError>;

    /// Update the [VersionedData] stored at this location to `new`, iff the
    /// current sequence number is exactly `expected` and `new`'s sequence
    /// number > the current sequence number.
    ///
    /// It is invalid to call this function with a `new` and `expected` such
    /// that `new`'s sequence number is <= `expected`. It is invalid to call
    /// this function with a sequence number outside of the range `[0, i64::MAX]`.
    ///
    /// This data is initialized to None, and the first call to compare_and_set
    /// needs to happen with None as the expected value to set the state.
    ///
    /// Returns [CaSResult::Committed] if the update was applied and
    /// [CaSResult::ExpectationMismatch] if `expected` did not match.
    async fn compare_and_set(
        &self,
        key: &str,
        expected: Option<SeqNo>,
        new: VersionedData,
    ) -> Result<CaSResult, ExternalError>;

    /// Return up to `limit` versions of data stored for this `key` at sequence
    /// numbers >= `from`, in ascending order of sequence number.
    ///
    /// Pass [SCAN_ALL] as `limit` to scan every version.
    ///
    /// Returns an empty vec if `from` is greater than the current sequence
    /// number or if there is no data at this key.
    async fn scan(
        &self,
        key: &str,
        from: SeqNo,
        limit: usize,
    ) -> Result<Vec<VersionedData>, ExternalError>;

    /// Deletes all historical versions of the data stored at `key` that are <
    /// `seqno`, iff `seqno` <= the current sequence number.
    ///
    /// Returns the number of versions deleted on success. Returns an error if
    /// `seqno` is greater than the current sequence number, or if there is no
    /// data at this key.
    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError>;
}
433
434#[async_trait]
435impl<A: Consensus + 'static> Consensus for Tasked<A> {
436    fn list_keys(&self) -> ResultStream<'_, String> {
437        // Similarly to Blob::list_keys_and_metadata, this is difficult to make into a task.
438        // (If we use an unbounded channel between the task and the caller, we can buffer forever;
439        // if we use a bounded channel, we lose the isolation benefits of Tasked.)
440        // However, this should only be called in administrative contexts
441        // and not in the main state-machine impl.
442        self.0.list_keys()
443    }
444
445    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
446        let backing = self.clone_backing();
447        let key = key.to_owned();
448        mz_ore::task::spawn(
449            || "persist::task::head",
450            async move { backing.head(&key).await }.instrument(Span::current()),
451        )
452        .await?
453    }
454
455    async fn compare_and_set(
456        &self,
457        key: &str,
458        expected: Option<SeqNo>,
459        new: VersionedData,
460    ) -> Result<CaSResult, ExternalError> {
461        let backing = self.clone_backing();
462        let key = key.to_owned();
463        mz_ore::task::spawn(
464            || "persist::task::cas",
465            async move { backing.compare_and_set(&key, expected, new).await }
466                .instrument(Span::current()),
467        )
468        .await?
469    }
470
471    async fn scan(
472        &self,
473        key: &str,
474        from: SeqNo,
475        limit: usize,
476    ) -> Result<Vec<VersionedData>, ExternalError> {
477        let backing = self.clone_backing();
478        let key = key.to_owned();
479        mz_ore::task::spawn(
480            || "persist::task::scan",
481            async move { backing.scan(&key, from, limit).await }.instrument(Span::current()),
482        )
483        .await?
484    }
485
486    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
487        let backing = self.clone_backing();
488        let key = key.to_owned();
489        mz_ore::task::spawn(
490            || "persist::task::truncate",
491            async move { backing.truncate(&key, seqno).await }.instrument(Span::current()),
492        )
493        .await?
494    }
495}
496
/// Metadata about a particular blob stored by persist.
///
/// Passed to the callback given to [Blob::list_keys_and_metadata]. The key
/// is borrowed, so copy it if it needs to be kept.
#[derive(Debug)]
pub struct BlobMetadata<'a> {
    /// The key for the blob
    pub key: &'a str,
    /// Size of the blob
    pub size_in_bytes: u64,
}
505
/// A key usable for liveness checks via [Blob::get].
///
/// Has the same string value as [CONSENSUS_HEAD_LIVENESS_KEY], which plays
/// the equivalent role for [Consensus::head].
pub const BLOB_GET_LIVENESS_KEY: &str = "LIVENESS";
508
/// An abstraction over read-write access to a `bytes key`->`bytes value` store.
///
/// Implementations are required to be _linearizable_.
///
/// TODO: Consider whether this can be relaxed. Since our usage is write-once
/// modify-never, it certainly seems like we could by adding retries around
/// `get` to wait for a non-linearizable `set` to show up. However, the tricky
/// bit comes once we stop handing out seqno capabilities to readers and have to
/// start reasoning about "this set hasn't shown up yet" vs "the blob has already
/// been deleted". Another tricky problem is the same but for a deletion when
/// the first attempt timed out.
#[async_trait]
pub trait Blob: std::fmt::Debug + Send + Sync {
    /// Returns the value stored at `key`, or `None` if the key is not present.
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError>;

    /// List all of the keys in the map with metadata about the entry.
    ///
    /// Can be optionally restricted to only list keys starting with a
    /// given prefix. Pass an empty `key_prefix` to list every key.
    ///
    /// `f` is invoked once per listed entry.
    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError>;

    /// Inserts a key-value pair into the map.
    ///
    /// Writes must be atomic and either succeed or leave the previous value
    /// intact.
    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError>;

    /// Remove a key from the map.
    ///
    /// Returns Some and the size of the deleted blob if it exists. Succeeds and
    /// returns None if it does not exist.
    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError>;

    /// Restores a previously-deleted key to the map, if possible.
    ///
    /// Returns successfully if the key exists after this call: perhaps because it already existed
    /// or was restored. (In particular, this makes restore idempotent.)
    /// Fails if we were unable to restore any value for that key:
    /// perhaps the key was never written, or was permanently deleted.
    ///
    /// It is acceptable for [Blob::restore] to be unable
    /// to restore keys, in which case this method should succeed iff the key exists.
    async fn restore(&self, key: &str) -> Result<(), ExternalError>;
}
558
559#[async_trait]
560impl<A: Blob + 'static> Blob for Tasked<A> {
561    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
562        let backing = self.clone_backing();
563        let key = key.to_owned();
564        mz_ore::task::spawn(
565            || "persist::task::get",
566            async move { backing.get(&key).await }.instrument(Span::current()),
567        )
568        .await?
569    }
570
571    /// List all of the keys in the map with metadata about the entry.
572    ///
573    /// Can be optionally restricted to only list keys starting with a
574    /// given prefix.
575    async fn list_keys_and_metadata(
576        &self,
577        key_prefix: &str,
578        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
579    ) -> Result<(), ExternalError> {
580        // TODO: No good way that I can see to make this one a task because of
581        // the closure and Blob needing to be object-safe.
582        self.0.list_keys_and_metadata(key_prefix, f).await
583    }
584
585    /// Inserts a key-value pair into the map.
586    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
587        let backing = self.clone_backing();
588        let key = key.to_owned();
589        mz_ore::task::spawn(
590            || "persist::task::set",
591            async move { backing.set(&key, value).await }.instrument(Span::current()),
592        )
593        .await?
594    }
595
596    /// Remove a key from the map.
597    ///
598    /// Returns Some and the size of the deleted blob if if exists. Succeeds and
599    /// returns None if it does not exist.
600    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
601        let backing = self.clone_backing();
602        let key = key.to_owned();
603        mz_ore::task::spawn(
604            || "persist::task::delete",
605            async move { backing.delete(&key).await }.instrument(Span::current()),
606        )
607        .await?
608    }
609
610    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
611        let backing = self.clone_backing();
612        let key = key.to_owned();
613        mz_ore::task::spawn(
614            || "persist::task::restore",
615            async move { backing.restore(&key).await }.instrument(Span::current()),
616        )
617        .await?
618    }
619}
620
621/// Test helpers for the crate.
622#[cfg(test)]
623pub mod tests {
624    use std::future::Future;
625
626    use anyhow::anyhow;
627    use futures_util::TryStreamExt;
628    use mz_ore::assert_err;
629    use uuid::Uuid;
630
631    use crate::location::Blob;
632
633    use super::*;
634
635    fn keys(baseline: &[String], new: &[&str]) -> Vec<String> {
636        let mut ret = baseline.to_vec();
637        ret.extend(new.iter().map(|x| x.to_string()));
638        ret.sort();
639        ret
640    }
641
642    async fn get_keys(b: &impl Blob) -> Result<Vec<String>, ExternalError> {
643        let mut keys = vec![];
644        b.list_keys_and_metadata("", &mut |entry| keys.push(entry.key.to_string()))
645            .await?;
646        Ok(keys)
647    }
648
649    async fn get_keys_with_prefix(
650        b: &impl Blob,
651        prefix: &str,
652    ) -> Result<Vec<String>, ExternalError> {
653        let mut keys = vec![];
654        b.list_keys_and_metadata(prefix, &mut |entry| keys.push(entry.key.to_string()))
655            .await?;
656        Ok(keys)
657    }
658
    /// Common test impl for different blob implementations.
    ///
    /// `new_fn` opens a blob at the given path. It is called several times,
    /// including repeatedly with the same path, and handles opened at the
    /// same path are expected to observe each other's writes.
    ///
    /// The test assumes the location at `"path0"` starts out empty. It covers
    /// get/set/overwrite/delete, restore, and listing with and without a
    /// prefix. Assertion failures panic; errors from the blob itself are
    /// returned via `?`.
    pub async fn blob_impl_test<
        B: Blob,
        F: Future<Output = Result<B, ExternalError>>,
        NewFn: Fn(&'static str) -> F,
    >(
        new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let values = ["v0".as_bytes().to_vec(), "v1".as_bytes().to_vec()];

        let blob0 = new_fn("path0").await?;

        // We can create a second blob writing to a different place.
        let _ = new_fn("path1").await?;

        // We can open two blobs to the same place, even.
        let blob1 = new_fn("path0").await?;

        let k0 = "foo/bar/k0";

        // Empty key is empty.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);

        // Empty list keys is empty.
        let empty_keys = get_keys(&blob0).await?;
        assert_eq!(empty_keys, Vec::<String>::new());
        let empty_keys = get_keys(&blob1).await?;
        assert_eq!(empty_keys, Vec::<String>::new());

        // Set a key and get it back.
        blob0.set(k0, values[0].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Set another key and get it back.
        blob0.set("k0a", values[0].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Blob contains the key we just inserted.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));

        // Can overwrite a key.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        // Can overwrite another key.
        blob0.set("k0a", values[1].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Can delete a key.
        // (The reported size is 2 because the stored value is the two bytes "v1".)
        assert_eq!(blob0.delete(k0).await, Ok(Some(2)));
        // Can no longer get a deleted key.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);
        // Double deleting a key succeeds but indicates that it did no work.
        assert_eq!(blob0.delete(k0).await, Ok(None));
        // Deleting a key that does not exist succeeds.
        assert_eq!(blob0.delete("nope").await, Ok(None));
        // Deleting a key with an empty value indicates it did work but deleted
        // no bytes.
        blob0.set("empty", Bytes::new()).await?;
        assert_eq!(blob0.delete("empty").await, Ok(Some(0)));

        // Attempt to restore a key. Not all backends will be able to restore, but
        // we can confirm that our data is visible iff restore reported success.
        blob0.set("undelete", Bytes::from("data")).await?;
        // Restoring should always succeed when the key exists.
        blob0.restore("undelete").await?;
        assert_eq!(blob0.delete("undelete").await?, Some("data".len()));
        // A determinate error means the backend could not restore the key, so
        // the key must stay absent; any other error fails the test.
        let expected = match blob0.restore("undelete").await {
            Ok(()) => Some(Bytes::from("data").into()),
            Err(ExternalError::Determinate(_)) => None,
            Err(other) => return Err(other),
        };
        assert_eq!(blob0.get("undelete").await?, expected);
        blob0.delete("undelete").await?;

        // Empty blob contains no keys.
        blob0.delete("k0a").await?;
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        // Can reset a deleted key to some other value.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Insert multiple keys back to back and validate that we can list
        // them all out.
        let mut expected_keys = empty_keys;
        for i in 1..=5 {
            let key = format!("k{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_keys.push(key);
        }

        // Blob contains the key we just inserted.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));

        // Insert multiple keys with a different prefix and validate that we can
        // list out keys by their prefix
        let mut expected_prefix_keys = vec![];
        for i in 1..=3 {
            let key = format!("k-prefix-{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_prefix_keys.push(key);
        }
        let mut blob_keys = get_keys_with_prefix(&blob0, "k-prefix").await?;
        blob_keys.sort();
        assert_eq!(blob_keys, expected_prefix_keys);
        // The "k" prefix matches k1..k5 and the k-prefix-* keys, but not
        // "foo/bar/k0".
        let mut blob_keys = get_keys_with_prefix(&blob0, "k").await?;
        blob_keys.sort();
        expected_keys.extend(expected_prefix_keys);
        expected_keys.sort();
        assert_eq!(blob_keys, expected_keys);

        // We can open a new blob to the same path and use it.
        let blob3 = new_fn("path0").await?;
        assert_eq!(
            blob3.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        Ok(())
    }
830
831    /// Common test impl for different consensus implementations.
832    pub async fn consensus_impl_test<
833        C: Consensus,
834        F: Future<Output = Result<C, ExternalError>>,
835        NewFn: FnMut() -> F,
836    >(
837        mut new_fn: NewFn,
838    ) -> Result<(), ExternalError> {
839        let consensus = new_fn().await?;
840
841        // Use a random key so independent runs of this test don't interfere
842        // with each other.
843        let key = Uuid::new_v4().to_string();
844
845        // Starting value of consensus data is None.
846        assert_eq!(consensus.head(&key).await, Ok(None));
847
848        // Can scan a key that has no data.
849        assert_eq!(consensus.scan(&key, SeqNo(0), SCAN_ALL).await, Ok(vec![]));
850
851        // Cannot truncate data from a key that doesn't have any data
852        assert_err!(consensus.truncate(&key, SeqNo(0)).await);
853
854        let state = VersionedData {
855            seqno: SeqNo(5),
856            data: Bytes::from("abc"),
857        };
858
859        // Incorrectly setting the data with a non-None expected should fail.
860        assert_eq!(
861            consensus
862                .compare_and_set(&key, Some(SeqNo(0)), state.clone())
863                .await,
864            Ok(CaSResult::ExpectationMismatch),
865        );
866
867        // Correctly updating the state with the correct expected value should succeed.
868        assert_eq!(
869            consensus.compare_and_set(&key, None, state.clone()).await,
870            Ok(CaSResult::Committed),
871        );
872
873        // The new key is visible in state.
874        let keys: Vec<_> = consensus.list_keys().try_collect().await?;
875        assert_eq!(keys, vec![key.to_owned()]);
876
877        // We can observe the a recent value on successful update.
878        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));
879
880        // Can scan a key that has data with a lower bound sequence number < head.
881        assert_eq!(
882            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
883            Ok(vec![state.clone()])
884        );
885
886        // Can scan a key that has data with a lower bound sequence number == head.
887        assert_eq!(
888            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
889            Ok(vec![state.clone()])
890        );
891
892        // Can scan a key that has data with a lower bound sequence number >
893        // head.
894        assert_eq!(consensus.scan(&key, SeqNo(6), SCAN_ALL).await, Ok(vec![]));
895
896        // Can truncate data with an upper bound <= head, even if there is no data in the
897        // range [0, upper).
898        assert_eq!(consensus.truncate(&key, SeqNo(0)).await, Ok(0));
899        assert_eq!(consensus.truncate(&key, SeqNo(5)).await, Ok(0));
900
901        // Cannot truncate data with an upper bound > head.
902        assert_err!(consensus.truncate(&key, SeqNo(6)).await);
903
904        let new_state = VersionedData {
905            seqno: SeqNo(10),
906            data: Bytes::from("def"),
907        };
908
909        // Trying to update without the correct expected seqno fails, (even if expected > current)
910        assert_eq!(
911            consensus
912                .compare_and_set(&key, Some(SeqNo(7)), new_state.clone())
913                .await,
914            Ok(CaSResult::ExpectationMismatch),
915        );
916
917        // Trying to update without the correct expected seqno fails, (even if expected < current)
918        assert_eq!(
919            consensus
920                .compare_and_set(&key, Some(SeqNo(3)), new_state.clone())
921                .await,
922            Ok(CaSResult::ExpectationMismatch),
923        );
924
925        let invalid_constant_seqno = VersionedData {
926            seqno: SeqNo(5),
927            data: Bytes::from("invalid"),
928        };
929
930        // Trying to set the data to a sequence number == current fails even if
931        // expected is correct.
932        assert_eq!(
933            consensus
934                .compare_and_set(&key, Some(state.seqno), invalid_constant_seqno)
935                .await,
936            Err(ExternalError::from(anyhow!(
937                "new seqno must be strictly greater than expected. Got new: SeqNo(5) expected: SeqNo(5)"
938            )))
939        );
940
941        let invalid_regressing_seqno = VersionedData {
942            seqno: SeqNo(3),
943            data: Bytes::from("invalid"),
944        };
945
946        // Trying to set the data to a sequence number < current fails even if
947        // expected is correct.
948        assert_eq!(
949            consensus
950                .compare_and_set(&key, Some(state.seqno), invalid_regressing_seqno)
951                .await,
952            Err(ExternalError::from(anyhow!(
953                "new seqno must be strictly greater than expected. Got new: SeqNo(3) expected: SeqNo(5)"
954            )))
955        );
956
957        // Can correctly update to a new state if we provide the right expected seqno
958        assert_eq!(
959            consensus
960                .compare_and_set(&key, Some(state.seqno), new_state.clone())
961                .await,
962            Ok(CaSResult::Committed),
963        );
964
965        // We can observe the a recent value on successful update.
966        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));
967
968        // We can observe both states in the correct order with scan if pass
969        // in a suitable lower bound.
970        assert_eq!(
971            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
972            Ok(vec![state.clone(), new_state.clone()])
973        );
974
975        // We can observe only the most recent state if the lower bound is higher
976        // than the previous insertion's sequence number.
977        assert_eq!(
978            consensus.scan(&key, SeqNo(6), SCAN_ALL).await,
979            Ok(vec![new_state.clone()])
980        );
981
982        // We can still observe the most recent insert as long as the provided
983        // lower bound == most recent 's sequence number.
984        assert_eq!(
985            consensus.scan(&key, SeqNo(10), SCAN_ALL).await,
986            Ok(vec![new_state.clone()])
987        );
988
989        // We can scan if the provided lower bound > head's sequence number.
990        assert_eq!(consensus.scan(&key, SeqNo(11), SCAN_ALL).await, Ok(vec![]));
991
992        // We can scan with limits that don't cover all states
993        assert_eq!(
994            consensus.scan(&key, SeqNo::minimum(), 1).await,
995            Ok(vec![state.clone()])
996        );
997        assert_eq!(
998            consensus.scan(&key, SeqNo(5), 1).await,
999            Ok(vec![state.clone()])
1000        );
1001
1002        // We can scan with limits to cover exactly the number of states
1003        assert_eq!(
1004            consensus.scan(&key, SeqNo::minimum(), 2).await,
1005            Ok(vec![state.clone(), new_state.clone()])
1006        );
1007
1008        // We can scan with a limit larger than the number of states
1009        assert_eq!(
1010            consensus.scan(&key, SeqNo(4), 100).await,
1011            Ok(vec![state.clone(), new_state.clone()])
1012        );
1013
1014        // Can remove the previous write with the appropriate truncation.
1015        assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(1));
1016
1017        // Verify that the old write is indeed deleted.
1018        assert_eq!(
1019            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
1020            Ok(vec![new_state.clone()])
1021        );
1022
1023        // Truncate is idempotent and can be repeated. The return value
1024        // indicates we didn't do any work though.
1025        assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(0));
1026
1027        // Make sure entries under different keys don't clash.
1028        let other_key = Uuid::new_v4().to_string();
1029
1030        assert_eq!(consensus.head(&other_key).await, Ok(None));
1031
1032        let state = VersionedData {
1033            seqno: SeqNo(1),
1034            data: Bytes::from("einszweidrei"),
1035        };
1036
1037        assert_eq!(
1038            consensus
1039                .compare_and_set(&other_key, None, state.clone())
1040                .await,
1041            Ok(CaSResult::Committed),
1042        );
1043
1044        assert_eq!(consensus.head(&other_key).await, Ok(Some(state.clone())));
1045
1046        // State for the first key is still as expected.
1047        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));
1048
1049        // Trying to update from a stale version of current doesn't work.
1050        let invalid_jump_forward = VersionedData {
1051            seqno: SeqNo(11),
1052            data: Bytes::from("invalid"),
1053        };
1054        assert_eq!(
1055            consensus
1056                .compare_and_set(&key, Some(state.seqno), invalid_jump_forward)
1057                .await,
1058            Ok(CaSResult::ExpectationMismatch),
1059        );
1060
1061        // Writing a large (~10 KiB) amount of data works fine.
1062        let large_state = VersionedData {
1063            seqno: SeqNo(11),
1064            data: std::iter::repeat(b'a').take(10240).collect(),
1065        };
1066        assert_eq!(
1067            consensus
1068                .compare_and_set(&key, Some(new_state.seqno), large_state)
1069                .await,
1070            Ok(CaSResult::Committed),
1071        );
1072
1073        // Truncate can delete more than one version at a time.
1074        let v12 = VersionedData {
1075            seqno: SeqNo(12),
1076            data: Bytes::new(),
1077        };
1078        assert_eq!(
1079            consensus.compare_and_set(&key, Some(SeqNo(11)), v12).await,
1080            Ok(CaSResult::Committed),
1081        );
1082        assert_eq!(consensus.truncate(&key, SeqNo(12)).await, Ok(2));
1083
1084        // Sequence numbers used within Consensus have to be within [0, i64::MAX].
1085
1086        assert_eq!(
1087            consensus
1088                .compare_and_set(
1089                    &Uuid::new_v4().to_string(),
1090                    None,
1091                    VersionedData {
1092                        seqno: SeqNo(0),
1093                        data: Bytes::new(),
1094                    }
1095                )
1096                .await,
1097            Ok(CaSResult::Committed),
1098        );
1099        assert_eq!(
1100            consensus
1101                .compare_and_set(
1102                    &Uuid::new_v4().to_string(),
1103                    None,
1104                    VersionedData {
1105                        seqno: SeqNo(i64::MAX.try_into().expect("i64::MAX fits in u64")),
1106                        data: Bytes::new(),
1107                    }
1108                )
1109                .await,
1110            Ok(CaSResult::Committed),
1111        );
1112        assert_err!(
1113            consensus
1114                .compare_and_set(
1115                    &Uuid::new_v4().to_string(),
1116                    None,
1117                    VersionedData {
1118                        seqno: SeqNo(1 << 63),
1119                        data: Bytes::new(),
1120                    }
1121                )
1122                .await
1123        );
1124        assert_err!(
1125            consensus
1126                .compare_and_set(
1127                    &Uuid::new_v4().to_string(),
1128                    None,
1129                    VersionedData {
1130                        seqno: SeqNo(u64::MAX),
1131                        data: Bytes::new(),
1132                    }
1133                )
1134                .await
1135        );
1136
1137        Ok(())
1138    }
1139
1140    #[mz_ore::test]
1141    fn timeout_error() {
1142        assert!(ExternalError::new_timeout(Instant::now()).is_timeout());
1143        assert!(!ExternalError::from(anyhow!("foo")).is_timeout());
1144    }
1145}