mz_persist/
location.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Abstractions over files, cloud storage, etc used in persistence.
11
12use std::fmt;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use bytes::Bytes;
20use futures_util::Stream;
21use mz_ore::bytes::SegmentedBytes;
22use mz_ore::cast::u64_to_usize;
23use mz_postgres_client::error::PostgresError;
24use mz_proto::RustType;
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27use tracing::{Instrument, Span};
28
29use crate::error::Error;
30
/// The "sequence number" of a persist state change.
///
/// Persist is a state machine, with all mutating requests modeled as input
/// state changes sequenced into a log. This reflects that ordering.
///
/// This ordering also includes requests that were sequenced and applied to the
/// persist state machine, but that application was deterministically made into
/// a no-op because it was contextually invalid (a write or seal at a sealed
/// timestamp, an allow_compactions at an unsealed timestamp, etc).
///
/// Read-only requests are assigned the SeqNo of a write, indicating that all
/// mutating requests up to and including that one are reflected in the read
/// state.
///
/// The textual form is the letter `v` followed by the decimal value (e.g.
/// `v5`); the `Display` and `FromStr` impls in this module round-trip it.
#[derive(
    Arbitrary, Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize,
)]
pub struct SeqNo(pub u64);
48
49impl std::fmt::Display for SeqNo {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        write!(f, "v{}", self.0)
52    }
53}
54
55impl timely::PartialOrder for SeqNo {
56    fn less_equal(&self, other: &Self) -> bool {
57        self <= other
58    }
59}
60
61impl std::str::FromStr for SeqNo {
62    type Err = String;
63
64    fn from_str(encoded: &str) -> Result<Self, Self::Err> {
65        let encoded = match encoded.strip_prefix('v') {
66            Some(x) => x,
67            None => return Err(format!("invalid SeqNo {}: incorrect prefix", encoded)),
68        };
69        let seqno =
70            u64::from_str(encoded).map_err(|err| format!("invalid SeqNo {}: {}", encoded, err))?;
71        Ok(SeqNo(seqno))
72    }
73}
74
75impl SeqNo {
76    /// Returns the next SeqNo in the sequence.
77    pub fn next(self) -> SeqNo {
78        SeqNo(self.0 + 1)
79    }
80
81    /// A minimum value suitable as a default.
82    pub fn minimum() -> Self {
83        SeqNo(0)
84    }
85}
86
87impl RustType<u64> for SeqNo {
88    fn into_proto(&self) -> u64 {
89        self.0
90    }
91
92    fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
93        Ok(SeqNo(proto))
94    }
95}
96
/// An error coming from an underlying durability system (e.g. s3) indicating
/// that the operation _definitely did NOT succeed_ (e.g. permission denied).
///
/// Contrast with [Indeterminate], where the outcome is unknown.
#[derive(Debug)]
pub struct Determinate {
    // The underlying cause, kept as an opaque `anyhow::Error`.
    inner: anyhow::Error,
}
103
104impl std::fmt::Display for Determinate {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        write!(f, "determinate: ")?;
107        self.inner.fmt(f)
108    }
109}
110
111impl std::error::Error for Determinate {
112    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
113        self.inner.source()
114    }
115}
116
117impl Determinate {
118    /// Return a new Determinate wrapping the given error.
119    ///
120    /// Exposed for testing via [crate::unreliable].
121    pub fn new(inner: anyhow::Error) -> Self {
122        Determinate { inner }
123    }
124}
125
/// An error coming from an underlying durability system (e.g. s3) indicating
/// that the operation _might have succeeded_ (e.g. timeout).
///
/// Contrast with [Determinate], where the operation is known to have failed.
#[derive(Debug)]
pub struct Indeterminate {
    // The underlying cause, kept as an opaque `anyhow::Error`. Visible to the
    // rest of the crate.
    pub(crate) inner: anyhow::Error,
}
132
133impl Indeterminate {
134    /// Return a new Indeterminate wrapping the given error.
135    ///
136    /// Exposed for testing.
137    pub fn new(inner: anyhow::Error) -> Self {
138        Indeterminate { inner }
139    }
140}
141
142impl std::fmt::Display for Indeterminate {
143    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144        write!(f, "indeterminate: ")?;
145        self.inner.fmt(f)
146    }
147}
148
149impl std::error::Error for Indeterminate {
150    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
151        self.inner.source()
152    }
153}
154
155/// An impl of PartialEq purely for convenience in tests and debug assertions.
156#[cfg(any(test, debug_assertions))]
157impl PartialEq for Indeterminate {
158    fn eq(&self, other: &Self) -> bool {
159        self.to_string() == other.to_string()
160    }
161}
162
/// An error coming from an underlying durability system (e.g. s3) or from
/// invalid data received from one.
///
/// The two variants distinguish whether the failed operation is known not to
/// have taken effect ([Determinate]) or may have taken effect
/// ([Indeterminate]).
#[derive(Debug)]
pub enum ExternalError {
    /// A determinate error from an external system.
    Determinate(Determinate),
    /// An indeterminate error from an external system.
    Indeterminate(Indeterminate),
}
172
173impl ExternalError {
174    /// Returns a new error representing a timeout.
175    ///
176    /// TODO: When we overhaul errors, this presumably should instead be a type
177    /// that can be matched on.
178    #[track_caller]
179    pub fn new_timeout(deadline: Instant) -> Self {
180        ExternalError::Indeterminate(Indeterminate {
181            inner: anyhow!("timeout at {:?}", deadline),
182        })
183    }
184
185    /// Returns whether this error represents a timeout.
186    ///
187    /// TODO: When we overhaul errors, this presumably should instead be a type
188    /// that can be matched on.
189    pub fn is_timeout(&self) -> bool {
190        // Gross...
191        self.to_string().contains("timeout")
192    }
193}
194
195impl std::fmt::Display for ExternalError {
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        match self {
198            ExternalError::Determinate(x) => std::fmt::Display::fmt(x, f),
199            ExternalError::Indeterminate(x) => std::fmt::Display::fmt(x, f),
200        }
201    }
202}
203
204impl std::error::Error for ExternalError {
205    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
206        match self {
207            ExternalError::Determinate(e) => e.source(),
208            ExternalError::Indeterminate(e) => e.source(),
209        }
210    }
211}
212
213/// An impl of PartialEq purely for convenience in tests and debug assertions.
214#[cfg(any(test, debug_assertions))]
215impl PartialEq for ExternalError {
216    fn eq(&self, other: &Self) -> bool {
217        self.to_string() == other.to_string()
218    }
219}
220
221impl From<PostgresError> for ExternalError {
222    fn from(x: PostgresError) -> Self {
223        match x {
224            PostgresError::Determinate(e) => ExternalError::Determinate(Determinate::new(e)),
225            PostgresError::Indeterminate(e) => ExternalError::Indeterminate(Indeterminate::new(e)),
226        }
227    }
228}
229
230impl From<Indeterminate> for ExternalError {
231    fn from(x: Indeterminate) -> Self {
232        ExternalError::Indeterminate(x)
233    }
234}
235
236impl From<Determinate> for ExternalError {
237    fn from(x: Determinate) -> Self {
238        ExternalError::Determinate(x)
239    }
240}
241
242impl From<anyhow::Error> for ExternalError {
243    fn from(inner: anyhow::Error) -> Self {
244        ExternalError::Indeterminate(Indeterminate { inner })
245    }
246}
247
248impl From<Error> for ExternalError {
249    fn from(x: Error) -> Self {
250        ExternalError::Indeterminate(Indeterminate {
251            inner: anyhow::Error::new(x),
252        })
253    }
254}
255
256impl From<std::io::Error> for ExternalError {
257    fn from(x: std::io::Error) -> Self {
258        ExternalError::Indeterminate(Indeterminate {
259            inner: anyhow::Error::new(x),
260        })
261    }
262}
263
264impl From<deadpool_postgres::tokio_postgres::Error> for ExternalError {
265    fn from(e: deadpool_postgres::tokio_postgres::Error) -> Self {
266        let code = match e.as_db_error().map(|x| x.code()) {
267            Some(x) => x,
268            None => {
269                return ExternalError::Indeterminate(Indeterminate {
270                    inner: anyhow::Error::new(e),
271                });
272            }
273        };
274        match code {
275            // Feel free to add more things to this allowlist as we encounter
276            // them as long as you're certain they're determinate.
277            &deadpool_postgres::tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE => {
278                ExternalError::Determinate(Determinate {
279                    inner: anyhow::Error::new(e),
280                })
281            }
282            _ => ExternalError::Indeterminate(Indeterminate {
283                inner: anyhow::Error::new(e),
284            }),
285        }
286    }
287}
288
289impl From<azure_core::Error> for ExternalError {
290    fn from(value: azure_core::Error) -> Self {
291        ExternalError::Indeterminate(Indeterminate {
292            inner: anyhow!(value),
293        })
294    }
295}
296
297impl From<deadpool_postgres::PoolError> for ExternalError {
298    fn from(x: deadpool_postgres::PoolError) -> Self {
299        match x {
300            // We have logic for turning a postgres Error into an ExternalError,
301            // so use it.
302            deadpool_postgres::PoolError::Backend(x) => ExternalError::from(x),
303            x => ExternalError::Indeterminate(Indeterminate {
304                inner: anyhow::Error::new(x),
305            }),
306        }
307    }
308}
309
310impl From<tokio::task::JoinError> for ExternalError {
311    fn from(x: tokio::task::JoinError) -> Self {
312        ExternalError::Indeterminate(Indeterminate {
313            inner: anyhow::Error::new(x),
314        })
315    }
316}
317
/// An abstraction for a single arbitrarily-sized binary blob and an associated
/// version number (sequence number).
///
/// This is the unit read from and written to a [Consensus] implementation.
#[derive(Debug, Clone, PartialEq)]
pub struct VersionedData {
    /// The sequence number of the data.
    pub seqno: SeqNo,
    /// The data itself.
    pub data: Bytes,
}
327
328/// Helper constant to scan all states in [Consensus::scan].
329/// The maximum possible SeqNo is i64::MAX.
330// TODO(benesch): find a way to express this without `as`.
331#[allow(clippy::as_conversions)]
332pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
333
/// A key usable for liveness checks via [Consensus::head].
///
/// Has the same string value as [BLOB_GET_LIVENESS_KEY].
pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";
336
/// Return type to indicate whether [Consensus::compare_and_set] succeeded or failed.
///
/// A mismatch is reported through this type rather than as an
/// [ExternalError].
#[derive(Debug, PartialEq)]
pub enum CaSResult {
    /// The compare-and-set succeeded and committed new state.
    Committed,
    /// The compare-and-set failed due to expectation mismatch.
    ExpectationMismatch,
}
345
/// Wraps all calls to a backing store in a new tokio task. This adds extra overhead,
/// but insulates the system from callers who fail to drive futures promptly to completion,
/// which can cause timeouts or resource exhaustion in a store.
///
/// The backing store is held in an [Arc] so that each spawned task can own a
/// handle to it.
#[derive(Debug)]
pub struct Tasked<A>(pub Arc<A>);
351
352impl<A> Tasked<A> {
353    fn clone_backing(&self) -> Arc<A> {
354        Arc::clone(&self.0)
355    }
356}
357
/// A boxed stream, similar to what `async_trait` desugars async functions to, but hardcoded
/// to our standard result type.
///
/// Each item is a `Result<T, ExternalError>`; the stream is `Send` and may
/// borrow for `'a`.
pub type ResultStream<'a, T> = Pin<Box<dyn Stream<Item = Result<T, ExternalError>> + Send + 'a>>;
361
/// An abstraction for [VersionedData] held in a location in persistent storage
/// where the data are conditionally updated by version.
///
/// Users are expected to use this API with consistently increasing sequence numbers
/// to allow multiple processes across multiple machines to agree to a total order
/// of the evolution of the data. To make roundtripping through various forms of durable
/// storage easier, sequence numbers used with [Consensus] need to be restricted to the
/// range [0, i64::MAX].
#[async_trait]
pub trait Consensus: std::fmt::Debug + Send + Sync {
    /// Returns all the keys ever created in the consensus store, as a stream.
    fn list_keys(&self) -> ResultStream<String>;

    /// Returns a recent version of `data`, and the corresponding sequence number, if
    /// one exists at this location. Returns `None` if nothing has been
    /// written at `key`.
    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError>;

    /// Update the [VersionedData] stored at this location to `new`, iff the
    /// current sequence number is exactly `expected` and `new`'s sequence
    /// number > the current sequence number.
    ///
    /// Returns [CaSResult::Committed] if the update was applied and
    /// [CaSResult::ExpectationMismatch] if `expected` did not match.
    ///
    /// It is invalid to call this function with a `new` and `expected` such
    /// that `new`'s sequence number is <= `expected`. It is invalid to call
    /// this function with a sequence number outside of the range `[0, i64::MAX]`.
    ///
    /// This data is initialized to None, and the first call to compare_and_set
    /// needs to happen with None as the expected value to set the state.
    async fn compare_and_set(
        &self,
        key: &str,
        expected: Option<SeqNo>,
        new: VersionedData,
    ) -> Result<CaSResult, ExternalError>;

    /// Return up to `limit` versions of data stored for this `key` at sequence
    /// numbers >= `from`, in ascending order of sequence number. Pass
    /// [SCAN_ALL] as `limit` to scan every version.
    ///
    /// Returns an empty vec if `from` is greater than the current sequence
    /// number or if there is no data at this key.
    async fn scan(
        &self,
        key: &str,
        from: SeqNo,
        limit: usize,
    ) -> Result<Vec<VersionedData>, ExternalError>;

    /// Deletes all historical versions of the data stored at `key` that are <
    /// `seqno`, iff `seqno` <= the current sequence number.
    ///
    /// Returns the number of versions deleted on success. Returns an error if
    /// `seqno` is greater than the current sequence number, or if there is no
    /// data at this key.
    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError>;
}
416
417#[async_trait]
418impl<A: Consensus + 'static> Consensus for Tasked<A> {
419    fn list_keys(&self) -> ResultStream<String> {
420        // Similarly to Blob::list_keys_and_metadata, this is difficult to make into a task.
421        // (If we use an unbounded channel between the task and the caller, we can buffer forever;
422        // if we use a bounded channel, we lose the isolation benefits of Tasked.)
423        // However, this should only be called in administrative contexts
424        // and not in the main state-machine impl.
425        self.0.list_keys()
426    }
427
428    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
429        let backing = self.clone_backing();
430        let key = key.to_owned();
431        mz_ore::task::spawn(
432            || "persist::task::head",
433            async move { backing.head(&key).await }.instrument(Span::current()),
434        )
435        .await?
436    }
437
438    async fn compare_and_set(
439        &self,
440        key: &str,
441        expected: Option<SeqNo>,
442        new: VersionedData,
443    ) -> Result<CaSResult, ExternalError> {
444        let backing = self.clone_backing();
445        let key = key.to_owned();
446        mz_ore::task::spawn(
447            || "persist::task::cas",
448            async move { backing.compare_and_set(&key, expected, new).await }
449                .instrument(Span::current()),
450        )
451        .await?
452    }
453
454    async fn scan(
455        &self,
456        key: &str,
457        from: SeqNo,
458        limit: usize,
459    ) -> Result<Vec<VersionedData>, ExternalError> {
460        let backing = self.clone_backing();
461        let key = key.to_owned();
462        mz_ore::task::spawn(
463            || "persist::task::scan",
464            async move { backing.scan(&key, from, limit).await }.instrument(Span::current()),
465        )
466        .await?
467    }
468
469    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
470        let backing = self.clone_backing();
471        let key = key.to_owned();
472        mz_ore::task::spawn(
473            || "persist::task::truncate",
474            async move { backing.truncate(&key, seqno).await }.instrument(Span::current()),
475        )
476        .await?
477    }
478}
479
/// Metadata about a particular blob stored by persist.
///
/// Passed to the callback given to [Blob::list_keys_and_metadata]; the key is
/// borrowed for `'a`.
#[derive(Debug)]
pub struct BlobMetadata<'a> {
    /// The key for the blob.
    pub key: &'a str,
    /// Size of the blob, in bytes.
    pub size_in_bytes: u64,
}
488
/// A key usable for liveness checks via [Blob::get].
///
/// Has the same string value as [CONSENSUS_HEAD_LIVENESS_KEY].
pub const BLOB_GET_LIVENESS_KEY: &str = "LIVENESS";
491
/// An abstraction over read-write access to a `bytes key`->`bytes value` store.
///
/// Implementations are required to be _linearizable_.
///
/// TODO: Consider whether this can be relaxed. Since our usage is write-once
/// modify-never, it certainly seems like we could by adding retries around
/// `get` to wait for a non-linearizable `set` to show up. However, the tricky
/// bit comes once we stop handing out seqno capabilities to readers and have to
/// start reasoning about "this set hasn't shown up yet" vs "the blob has already
/// been deleted". Another tricky problem is the same but for a deletion when
/// the first attempt timed out.
#[async_trait]
pub trait Blob: std::fmt::Debug + Send + Sync {
    /// Returns the value corresponding to the key, or `None` if the key is
    /// not present.
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError>;

    /// List all of the keys in the map with metadata about the entry,
    /// invoking `f` once per entry.
    ///
    /// Can be optionally restricted to only list keys starting with a
    /// given prefix; pass an empty `key_prefix` to list everything.
    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError>;

    /// Inserts a key-value pair into the map.
    ///
    /// Writes must be atomic and either succeed or leave the previous value
    /// intact.
    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError>;

    /// Remove a key from the map.
    ///
    /// Returns Some and the size of the deleted blob if it exists. Succeeds and
    /// returns None if it does not exist.
    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError>;

    /// Restores a previously-deleted key to the map, if possible.
    ///
    /// Returns successfully if the key exists after this call: perhaps because it already existed
    /// or was restored. (In particular, this makes restore idempotent.)
    /// Fails if we were unable to restore any value for that key:
    /// perhaps the key was never written, or was permanently deleted.
    ///
    /// It is acceptable for [Blob::restore] to be unable
    /// to restore keys, in which case this method should succeed iff the key exists.
    async fn restore(&self, key: &str) -> Result<(), ExternalError>;
}
541
542#[async_trait]
543impl<A: Blob + 'static> Blob for Tasked<A> {
544    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
545        let backing = self.clone_backing();
546        let key = key.to_owned();
547        mz_ore::task::spawn(
548            || "persist::task::get",
549            async move { backing.get(&key).await }.instrument(Span::current()),
550        )
551        .await?
552    }
553
554    /// List all of the keys in the map with metadata about the entry.
555    ///
556    /// Can be optionally restricted to only list keys starting with a
557    /// given prefix.
558    async fn list_keys_and_metadata(
559        &self,
560        key_prefix: &str,
561        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
562    ) -> Result<(), ExternalError> {
563        // TODO: No good way that I can see to make this one a task because of
564        // the closure and Blob needing to be object-safe.
565        self.0.list_keys_and_metadata(key_prefix, f).await
566    }
567
568    /// Inserts a key-value pair into the map.
569    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
570        let backing = self.clone_backing();
571        let key = key.to_owned();
572        mz_ore::task::spawn(
573            || "persist::task::set",
574            async move { backing.set(&key, value).await }.instrument(Span::current()),
575        )
576        .await?
577    }
578
579    /// Remove a key from the map.
580    ///
581    /// Returns Some and the size of the deleted blob if if exists. Succeeds and
582    /// returns None if it does not exist.
583    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
584        let backing = self.clone_backing();
585        let key = key.to_owned();
586        mz_ore::task::spawn(
587            || "persist::task::delete",
588            async move { backing.delete(&key).await }.instrument(Span::current()),
589        )
590        .await?
591    }
592
593    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
594        let backing = self.clone_backing();
595        let key = key.to_owned();
596        mz_ore::task::spawn(
597            || "persist::task::restore",
598            async move { backing.restore(&key).await }.instrument(Span::current()),
599        )
600        .await?
601    }
602}
603
604/// Test helpers for the crate.
605#[cfg(test)]
606pub mod tests {
607    use std::future::Future;
608
609    use anyhow::anyhow;
610    use futures_util::TryStreamExt;
611    use mz_ore::assert_err;
612    use uuid::Uuid;
613
614    use crate::location::Blob;
615
616    use super::*;
617
618    fn keys(baseline: &[String], new: &[&str]) -> Vec<String> {
619        let mut ret = baseline.to_vec();
620        ret.extend(new.iter().map(|x| x.to_string()));
621        ret.sort();
622        ret
623    }
624
625    async fn get_keys(b: &impl Blob) -> Result<Vec<String>, ExternalError> {
626        let mut keys = vec![];
627        b.list_keys_and_metadata("", &mut |entry| keys.push(entry.key.to_string()))
628            .await?;
629        Ok(keys)
630    }
631
632    async fn get_keys_with_prefix(
633        b: &impl Blob,
634        prefix: &str,
635    ) -> Result<Vec<String>, ExternalError> {
636        let mut keys = vec![];
637        b.list_keys_and_metadata(prefix, &mut |entry| keys.push(entry.key.to_string()))
638            .await?;
639        Ok(keys)
640    }
641
    /// Common test impl for different blob implementations.
    ///
    /// `new_fn` opens a blob handle for the given path. Handles opened with
    /// the same path are expected to observe each other's writes; the test
    /// exercises get/set/delete/restore and key listing through several such
    /// handles.
    ///
    /// Returns `Err` if the backend reports an unexpected error; a violated
    /// expectation panics via `assert_eq!`.
    pub async fn blob_impl_test<
        B: Blob,
        F: Future<Output = Result<B, ExternalError>>,
        NewFn: Fn(&'static str) -> F,
    >(
        new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        // Two distinct payloads, so overwrites are observable.
        let values = ["v0".as_bytes().to_vec(), "v1".as_bytes().to_vec()];

        let blob0 = new_fn("path0").await?;

        // We can create a second blob writing to a different place.
        let _ = new_fn("path1").await?;

        // We can open two blobs to the same place, even.
        let blob1 = new_fn("path0").await?;

        let k0 = "foo/bar/k0";

        // A key that was never written reads back as None.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);

        // Listing keys on an empty blob yields nothing.
        let empty_keys = get_keys(&blob0).await?;
        assert_eq!(empty_keys, Vec::<String>::new());
        let empty_keys = get_keys(&blob1).await?;
        assert_eq!(empty_keys, Vec::<String>::new());

        // Set a key and get it back.
        blob0.set(k0, values[0].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Set another key and get it back.
        blob0.set("k0a", values[0].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Blob contains the keys we just inserted.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));

        // Can overwrite a key.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        // Can overwrite another key.
        blob0.set("k0a", values[1].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Can delete a key; the reported size is the 2-byte length of "v1".
        assert_eq!(blob0.delete(k0).await, Ok(Some(2)));
        // Can no longer get a deleted key.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);
        // Double deleting a key succeeds but indicates that it did no work.
        assert_eq!(blob0.delete(k0).await, Ok(None));
        // Deleting a key that does not exist succeeds.
        assert_eq!(blob0.delete("nope").await, Ok(None));
        // Deleting a key with an empty value indicates it did work but deleted
        // no bytes.
        blob0.set("empty", Bytes::new()).await?;
        assert_eq!(blob0.delete("empty").await, Ok(Some(0)));

        // Attempt to restore a key. Not all backends will be able to restore, but
        // we can confirm that our data is visible iff restore reported success.
        blob0.set("undelete", Bytes::from("data")).await?;
        // Restoring should always succeed when the key exists.
        blob0.restore("undelete").await?;
        assert_eq!(blob0.delete("undelete").await?, Some("data".len()));
        // A determinate failure means "could not restore"; any other error is
        // unexpected and fails the test.
        let expected = match blob0.restore("undelete").await {
            Ok(()) => Some(Bytes::from("data").into()),
            Err(ExternalError::Determinate(_)) => None,
            Err(other) => return Err(other),
        };
        assert_eq!(blob0.get("undelete").await?, expected);
        // Clean up so the key does not show up in the listings below.
        blob0.delete("undelete").await?;

        // Empty blob contains no keys.
        blob0.delete("k0a").await?;
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        // Can reset a deleted key to some other value.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Insert multiple keys back to back and validate that we can list
        // them all out.
        let mut expected_keys = empty_keys;
        for i in 1..=5 {
            let key = format!("k{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_keys.push(key);
        }

        // Blob contains the keys we just inserted, plus k0 from above.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));

        // Insert multiple keys with a different prefix and validate that we can
        // list out keys by their prefix
        let mut expected_prefix_keys = vec![];
        for i in 1..=3 {
            let key = format!("k-prefix-{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_prefix_keys.push(key);
        }
        let mut blob_keys = get_keys_with_prefix(&blob0, "k-prefix").await?;
        blob_keys.sort();
        assert_eq!(blob_keys, expected_prefix_keys);
        // The shorter prefix "k" matches both key families, but not k0
        // ("foo/bar/k0"), which does not start with "k".
        let mut blob_keys = get_keys_with_prefix(&blob0, "k").await?;
        blob_keys.sort();
        expected_keys.extend(expected_prefix_keys);
        expected_keys.sort();
        assert_eq!(blob_keys, expected_keys);

        // We can open a new blob to the same path and use it.
        let blob3 = new_fn("path0").await?;
        assert_eq!(
            blob3.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        Ok(())
    }
813
814    /// Common test impl for different consensus implementations.
815    pub async fn consensus_impl_test<
816        C: Consensus,
817        F: Future<Output = Result<C, ExternalError>>,
818        NewFn: FnMut() -> F,
819    >(
820        mut new_fn: NewFn,
821    ) -> Result<(), ExternalError> {
822        let consensus = new_fn().await?;
823
824        // Use a random key so independent runs of this test don't interfere
825        // with each other.
826        let key = Uuid::new_v4().to_string();
827
828        // Starting value of consensus data is None.
829        assert_eq!(consensus.head(&key).await, Ok(None));
830
831        // Can scan a key that has no data.
832        assert_eq!(consensus.scan(&key, SeqNo(0), SCAN_ALL).await, Ok(vec![]));
833
834        // Cannot truncate data from a key that doesn't have any data
835        assert_err!(consensus.truncate(&key, SeqNo(0)).await);
836
837        let state = VersionedData {
838            seqno: SeqNo(5),
839            data: Bytes::from("abc"),
840        };
841
842        // Incorrectly setting the data with a non-None expected should fail.
843        assert_eq!(
844            consensus
845                .compare_and_set(&key, Some(SeqNo(0)), state.clone())
846                .await,
847            Ok(CaSResult::ExpectationMismatch),
848        );
849
850        // Correctly updating the state with the correct expected value should succeed.
851        assert_eq!(
852            consensus.compare_and_set(&key, None, state.clone()).await,
853            Ok(CaSResult::Committed),
854        );
855
856        // The new key is visible in state.
857        let keys: Vec<_> = consensus.list_keys().try_collect().await?;
858        assert_eq!(keys, vec![key.to_owned()]);
859
860        // We can observe the a recent value on successful update.
861        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));
862
863        // Can scan a key that has data with a lower bound sequence number < head.
864        assert_eq!(
865            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
866            Ok(vec![state.clone()])
867        );
868
869        // Can scan a key that has data with a lower bound sequence number == head.
870        assert_eq!(
871            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
872            Ok(vec![state.clone()])
873        );
874
875        // Can scan a key that has data with a lower bound sequence number >
876        // head.
877        assert_eq!(consensus.scan(&key, SeqNo(6), SCAN_ALL).await, Ok(vec![]));
878
879        // Can truncate data with an upper bound <= head, even if there is no data in the
880        // range [0, upper).
881        assert_eq!(consensus.truncate(&key, SeqNo(0)).await, Ok(0));
882        assert_eq!(consensus.truncate(&key, SeqNo(5)).await, Ok(0));
883
884        // Cannot truncate data with an upper bound > head.
885        assert_err!(consensus.truncate(&key, SeqNo(6)).await);
886
887        let new_state = VersionedData {
888            seqno: SeqNo(10),
889            data: Bytes::from("def"),
890        };
891
892        // Trying to update without the correct expected seqno fails, (even if expected > current)
893        assert_eq!(
894            consensus
895                .compare_and_set(&key, Some(SeqNo(7)), new_state.clone())
896                .await,
897            Ok(CaSResult::ExpectationMismatch),
898        );
899
900        // Trying to update without the correct expected seqno fails, (even if expected < current)
901        assert_eq!(
902            consensus
903                .compare_and_set(&key, Some(SeqNo(3)), new_state.clone())
904                .await,
905            Ok(CaSResult::ExpectationMismatch),
906        );
907
908        let invalid_constant_seqno = VersionedData {
909            seqno: SeqNo(5),
910            data: Bytes::from("invalid"),
911        };
912
913        // Trying to set the data to a sequence number == current fails even if
914        // expected is correct.
915        assert_eq!(
916            consensus
917                .compare_and_set(&key, Some(state.seqno), invalid_constant_seqno)
918                .await,
919            Err(ExternalError::from(anyhow!(
920                "new seqno must be strictly greater than expected. Got new: SeqNo(5) expected: SeqNo(5)"
921            )))
922        );
923
924        let invalid_regressing_seqno = VersionedData {
925            seqno: SeqNo(3),
926            data: Bytes::from("invalid"),
927        };
928
929        // Trying to set the data to a sequence number < current fails even if
930        // expected is correct.
931        assert_eq!(
932            consensus
933                .compare_and_set(&key, Some(state.seqno), invalid_regressing_seqno)
934                .await,
935            Err(ExternalError::from(anyhow!(
936                "new seqno must be strictly greater than expected. Got new: SeqNo(3) expected: SeqNo(5)"
937            )))
938        );
939
940        // Can correctly update to a new state if we provide the right expected seqno
941        assert_eq!(
942            consensus
943                .compare_and_set(&key, Some(state.seqno), new_state.clone())
944                .await,
945            Ok(CaSResult::Committed),
946        );
947
948        // We can observe the a recent value on successful update.
949        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));
950
951        // We can observe both states in the correct order with scan if pass
952        // in a suitable lower bound.
953        assert_eq!(
954            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
955            Ok(vec![state.clone(), new_state.clone()])
956        );
957
958        // We can observe only the most recent state if the lower bound is higher
959        // than the previous insertion's sequence number.
960        assert_eq!(
961            consensus.scan(&key, SeqNo(6), SCAN_ALL).await,
962            Ok(vec![new_state.clone()])
963        );
964
965        // We can still observe the most recent insert as long as the provided
966        // lower bound == most recent 's sequence number.
967        assert_eq!(
968            consensus.scan(&key, SeqNo(10), SCAN_ALL).await,
969            Ok(vec![new_state.clone()])
970        );
971
972        // We can scan if the provided lower bound > head's sequence number.
973        assert_eq!(consensus.scan(&key, SeqNo(11), SCAN_ALL).await, Ok(vec![]));
974
975        // We can scan with limits that don't cover all states
976        assert_eq!(
977            consensus.scan(&key, SeqNo::minimum(), 1).await,
978            Ok(vec![state.clone()])
979        );
980        assert_eq!(
981            consensus.scan(&key, SeqNo(5), 1).await,
982            Ok(vec![state.clone()])
983        );
984
985        // We can scan with limits to cover exactly the number of states
986        assert_eq!(
987            consensus.scan(&key, SeqNo::minimum(), 2).await,
988            Ok(vec![state.clone(), new_state.clone()])
989        );
990
991        // We can scan with a limit larger than the number of states
992        assert_eq!(
993            consensus.scan(&key, SeqNo(4), 100).await,
994            Ok(vec![state.clone(), new_state.clone()])
995        );
996
997        // Can remove the previous write with the appropriate truncation.
998        assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(1));
999
1000        // Verify that the old write is indeed deleted.
1001        assert_eq!(
1002            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
1003            Ok(vec![new_state.clone()])
1004        );
1005
1006        // Truncate is idempotent and can be repeated. The return value
1007        // indicates we didn't do any work though.
1008        assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(0));
1009
1010        // Make sure entries under different keys don't clash.
1011        let other_key = Uuid::new_v4().to_string();
1012
1013        assert_eq!(consensus.head(&other_key).await, Ok(None));
1014
1015        let state = VersionedData {
1016            seqno: SeqNo(1),
1017            data: Bytes::from("einszweidrei"),
1018        };
1019
1020        assert_eq!(
1021            consensus
1022                .compare_and_set(&other_key, None, state.clone())
1023                .await,
1024            Ok(CaSResult::Committed),
1025        );
1026
1027        assert_eq!(consensus.head(&other_key).await, Ok(Some(state.clone())));
1028
1029        // State for the first key is still as expected.
1030        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));
1031
1032        // Trying to update from a stale version of current doesn't work.
1033        let invalid_jump_forward = VersionedData {
1034            seqno: SeqNo(11),
1035            data: Bytes::from("invalid"),
1036        };
1037        assert_eq!(
1038            consensus
1039                .compare_and_set(&key, Some(state.seqno), invalid_jump_forward)
1040                .await,
1041            Ok(CaSResult::ExpectationMismatch),
1042        );
1043
1044        // Writing a large (~10 KiB) amount of data works fine.
1045        let large_state = VersionedData {
1046            seqno: SeqNo(11),
1047            data: std::iter::repeat(b'a').take(10240).collect(),
1048        };
1049        assert_eq!(
1050            consensus
1051                .compare_and_set(&key, Some(new_state.seqno), large_state)
1052                .await,
1053            Ok(CaSResult::Committed),
1054        );
1055
1056        // Truncate can delete more than one version at a time.
1057        let v12 = VersionedData {
1058            seqno: SeqNo(12),
1059            data: Bytes::new(),
1060        };
1061        assert_eq!(
1062            consensus.compare_and_set(&key, Some(SeqNo(11)), v12).await,
1063            Ok(CaSResult::Committed),
1064        );
1065        assert_eq!(consensus.truncate(&key, SeqNo(12)).await, Ok(2));
1066
1067        // Sequence numbers used within Consensus have to be within [0, i64::MAX].
1068
1069        assert_eq!(
1070            consensus
1071                .compare_and_set(
1072                    &Uuid::new_v4().to_string(),
1073                    None,
1074                    VersionedData {
1075                        seqno: SeqNo(0),
1076                        data: Bytes::new(),
1077                    }
1078                )
1079                .await,
1080            Ok(CaSResult::Committed),
1081        );
1082        assert_eq!(
1083            consensus
1084                .compare_and_set(
1085                    &Uuid::new_v4().to_string(),
1086                    None,
1087                    VersionedData {
1088                        seqno: SeqNo(i64::MAX.try_into().expect("i64::MAX fits in u64")),
1089                        data: Bytes::new(),
1090                    }
1091                )
1092                .await,
1093            Ok(CaSResult::Committed),
1094        );
1095        assert_err!(
1096            consensus
1097                .compare_and_set(
1098                    &Uuid::new_v4().to_string(),
1099                    None,
1100                    VersionedData {
1101                        seqno: SeqNo(1 << 63),
1102                        data: Bytes::new(),
1103                    }
1104                )
1105                .await
1106        );
1107        assert_err!(
1108            consensus
1109                .compare_and_set(
1110                    &Uuid::new_v4().to_string(),
1111                    None,
1112                    VersionedData {
1113                        seqno: SeqNo(u64::MAX),
1114                        data: Bytes::new(),
1115                    }
1116                )
1117                .await
1118        );
1119
1120        Ok(())
1121    }
1122
1123    #[mz_ore::test]
1124    fn timeout_error() {
1125        assert!(ExternalError::new_timeout(Instant::now()).is_timeout());
1126        assert!(!ExternalError::from(anyhow!("foo")).is_timeout());
1127    }
1128}