Skip to main content

mz_persist/
location.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Abstractions over files, cloud storage, etc used in persistence.
11
12use std::fmt;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use azure_core::StatusCode;
20use bytes::Bytes;
21use futures_util::Stream;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::u64_to_usize;
24use mz_postgres_client::error::PostgresError;
25use mz_proto::RustType;
26use proptest_derive::Arbitrary;
27use serde::{Deserialize, Serialize};
28use tracing::{Instrument, Span};
29
30use crate::error::Error;
31
32/// The "sequence number" of a persist state change.
33///
34/// Persist is a state machine, with all mutating requests modeled as input
35/// state changes sequenced into a log. This reflects that ordering.
36///
37/// This ordering also includes requests that were sequenced and applied to the
38/// persist state machine, but that application was deterministically made into
39/// a no-op because it was contextually invalid (a write or seal at a sealed
40/// timestamp, an allow_compactions at an unsealed timestamp, etc).
41///
42/// Read-only requests are assigned the SeqNo of a write, indicating that all
43/// mutating requests up to and including that one are reflected in the read
44/// state.
45#[derive(
46    Arbitrary,
47    Clone,
48    Copy,
49    Debug,
50    PartialOrd,
51    Ord,
52    PartialEq,
53    Eq,
54    Hash,
55    Serialize,
56    Deserialize
57)]
58pub struct SeqNo(pub u64);
59
60impl std::fmt::Display for SeqNo {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(f, "v{}", self.0)
63    }
64}
65
66impl timely::PartialOrder for SeqNo {
67    fn less_equal(&self, other: &Self) -> bool {
68        self <= other
69    }
70}
71
72impl std::str::FromStr for SeqNo {
73    type Err = String;
74
75    fn from_str(encoded: &str) -> Result<Self, Self::Err> {
76        let encoded = match encoded.strip_prefix('v') {
77            Some(x) => x,
78            None => return Err(format!("invalid SeqNo {}: incorrect prefix", encoded)),
79        };
80        let seqno =
81            u64::from_str(encoded).map_err(|err| format!("invalid SeqNo {}: {}", encoded, err))?;
82        Ok(SeqNo(seqno))
83    }
84}
85
86impl SeqNo {
87    /// Returns the previous SeqNo in the sequence, if there is one.
88    pub fn previous(self) -> Option<SeqNo> {
89        Some(SeqNo(self.0.checked_sub(1)?))
90    }
91
92    /// Returns the next SeqNo in the sequence.
93    pub fn next(self) -> SeqNo {
94        SeqNo(self.0 + 1)
95    }
96
97    /// A minimum value suitable as a default.
98    pub fn minimum() -> Self {
99        SeqNo(0)
100    }
101
102    /// A maximum value.
103    pub fn maximum() -> Self {
104        SeqNo(u64::MAX)
105    }
106}
107
108impl RustType<u64> for SeqNo {
109    fn into_proto(&self) -> u64 {
110        self.0
111    }
112
113    fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
114        Ok(SeqNo(proto))
115    }
116}
117
118/// An error coming from an underlying durability system (e.g. s3) indicating
119/// that the operation _definitely did NOT succeed_ (e.g. permission denied).
120#[derive(Debug)]
121pub struct Determinate {
122    inner: anyhow::Error,
123}
124
125impl std::fmt::Display for Determinate {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        write!(f, "determinate: ")?;
128        self.inner.fmt(f)
129    }
130}
131
132impl std::error::Error for Determinate {
133    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
134        self.inner.source()
135    }
136}
137
138impl From<anyhow::Error> for Determinate {
139    fn from(inner: anyhow::Error) -> Self {
140        Self::new(inner)
141    }
142}
143
144impl Determinate {
145    /// Return a new Determinate wrapping the given error.
146    ///
147    /// Exposed for testing via [crate::unreliable].
148    pub fn new(inner: anyhow::Error) -> Self {
149        Determinate { inner }
150    }
151}
152
153/// An error coming from an underlying durability system (e.g. s3) indicating
154/// that the operation _might have succeeded_ (e.g. timeout).
155#[derive(Debug)]
156pub struct Indeterminate {
157    pub(crate) inner: anyhow::Error,
158}
159
160impl Indeterminate {
161    /// Return a new Indeterminate wrapping the given error.
162    ///
163    /// Exposed for testing.
164    pub fn new(inner: anyhow::Error) -> Self {
165        Indeterminate { inner }
166    }
167}
168
169impl std::fmt::Display for Indeterminate {
170    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171        write!(f, "indeterminate: ")?;
172        self.inner.fmt(f)
173    }
174}
175
176impl std::error::Error for Indeterminate {
177    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
178        self.inner.source()
179    }
180}
181
182/// An impl of PartialEq purely for convenience in tests and debug assertions.
183#[cfg(any(test, debug_assertions))]
184impl PartialEq for Indeterminate {
185    fn eq(&self, other: &Self) -> bool {
186        self.to_string() == other.to_string()
187    }
188}
189
190/// An error coming from an underlying durability system (e.g. s3) or from
191/// invalid data received from one.
192#[derive(Debug)]
193pub enum ExternalError {
194    /// A determinate error from an external system.
195    Determinate(Determinate),
196    /// An indeterminate error from an external system.
197    Indeterminate(Indeterminate),
198}
199
200impl ExternalError {
201    /// Returns a new error representing a timeout.
202    ///
203    /// TODO: When we overhaul errors, this presumably should instead be a type
204    /// that can be matched on.
205    #[track_caller]
206    pub fn new_timeout(deadline: Instant) -> Self {
207        ExternalError::Indeterminate(Indeterminate {
208            inner: anyhow!("timeout at {:?}", deadline),
209        })
210    }
211
212    /// Returns whether this error represents a timeout.
213    ///
214    /// TODO: When we overhaul errors, this presumably should instead be a type
215    /// that can be matched on.
216    pub fn is_timeout(&self) -> bool {
217        // Gross...
218        self.to_string().contains("timeout")
219    }
220}
221
222impl std::fmt::Display for ExternalError {
223    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
224        match self {
225            ExternalError::Determinate(x) => std::fmt::Display::fmt(x, f),
226            ExternalError::Indeterminate(x) => std::fmt::Display::fmt(x, f),
227        }
228    }
229}
230
231impl std::error::Error for ExternalError {
232    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
233        match self {
234            ExternalError::Determinate(e) => e.source(),
235            ExternalError::Indeterminate(e) => e.source(),
236        }
237    }
238}
239
240/// An impl of PartialEq purely for convenience in tests and debug assertions.
241#[cfg(any(test, debug_assertions))]
242impl PartialEq for ExternalError {
243    fn eq(&self, other: &Self) -> bool {
244        self.to_string() == other.to_string()
245    }
246}
247
248impl From<PostgresError> for ExternalError {
249    fn from(x: PostgresError) -> Self {
250        match x {
251            PostgresError::Determinate(e) => ExternalError::Determinate(Determinate::new(e)),
252            PostgresError::Indeterminate(e) => ExternalError::Indeterminate(Indeterminate::new(e)),
253        }
254    }
255}
256
257impl From<Indeterminate> for ExternalError {
258    fn from(x: Indeterminate) -> Self {
259        ExternalError::Indeterminate(x)
260    }
261}
262
263impl From<Determinate> for ExternalError {
264    fn from(x: Determinate) -> Self {
265        ExternalError::Determinate(x)
266    }
267}
268
269impl From<anyhow::Error> for ExternalError {
270    fn from(inner: anyhow::Error) -> Self {
271        ExternalError::Indeterminate(Indeterminate { inner })
272    }
273}
274
275impl From<Error> for ExternalError {
276    fn from(x: Error) -> Self {
277        ExternalError::Indeterminate(Indeterminate {
278            inner: anyhow::Error::new(x),
279        })
280    }
281}
282
283impl From<std::io::Error> for ExternalError {
284    fn from(x: std::io::Error) -> Self {
285        ExternalError::Indeterminate(Indeterminate {
286            inner: anyhow::Error::new(x),
287        })
288    }
289}
290
291impl From<deadpool_postgres::tokio_postgres::Error> for ExternalError {
292    fn from(e: deadpool_postgres::tokio_postgres::Error) -> Self {
293        let code = match e.as_db_error().map(|x| x.code()) {
294            Some(x) => x,
295            None => {
296                return ExternalError::Indeterminate(Indeterminate {
297                    inner: anyhow::Error::new(e),
298                });
299            }
300        };
301        match code {
302            // Feel free to add more things to this allowlist as we encounter
303            // them as long as you're certain they're determinate.
304            &deadpool_postgres::tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE => {
305                ExternalError::Determinate(Determinate {
306                    inner: anyhow::Error::new(e),
307                })
308            }
309            _ => ExternalError::Indeterminate(Indeterminate {
310                inner: anyhow::Error::new(e),
311            }),
312        }
313    }
314}
315
316impl From<azure_core::Error> for ExternalError {
317    fn from(value: azure_core::Error) -> Self {
318        let definitely_determinate = if let Some(http) = value.as_http_error() {
319            match http.status() {
320                // There are many other status codes that _ought_ to be determinate, according to
321                // the HTTP spec, but this includes only codes that we've observed in practice for now.
322                StatusCode::TooManyRequests => true,
323                _ => false,
324            }
325        } else {
326            false
327        };
328        if definitely_determinate {
329            ExternalError::Determinate(Determinate {
330                inner: anyhow!(value),
331            })
332        } else {
333            ExternalError::Indeterminate(Indeterminate {
334                inner: anyhow!(value),
335            })
336        }
337    }
338}
339
340impl From<deadpool_postgres::PoolError> for ExternalError {
341    fn from(x: deadpool_postgres::PoolError) -> Self {
342        match x {
343            // We have logic for turning a postgres Error into an ExternalError,
344            // so use it.
345            deadpool_postgres::PoolError::Backend(x) => ExternalError::from(x),
346            x => ExternalError::Indeterminate(Indeterminate {
347                inner: anyhow::Error::new(x),
348            }),
349        }
350    }
351}
352
353impl From<tokio::task::JoinError> for ExternalError {
354    fn from(x: tokio::task::JoinError) -> Self {
355        ExternalError::Indeterminate(Indeterminate {
356            inner: anyhow::Error::new(x),
357        })
358    }
359}
360
361/// An abstraction for a single arbitrarily-sized binary blob and an associated
362/// version number (sequence number).
363#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
364pub struct VersionedData {
365    /// The sequence number of the data.
366    pub seqno: SeqNo,
367    /// The data itself.
368    pub data: Bytes,
369}
370
371/// Helper constant to scan all states in [Consensus::scan].
372/// The maximum possible SeqNo is i64::MAX.
373// TODO(benesch): find a way to express this without `as`.
374#[allow(clippy::as_conversions)]
375pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
376
377/// A key usable for liveness checks via [Consensus::head].
378pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";
379
380/// Return type to indicate whether [Consensus::compare_and_set] succeeded or failed.
381#[derive(Debug, PartialEq, Serialize, Deserialize)]
382pub enum CaSResult {
383    /// The compare-and-set succeeded and committed new state.
384    Committed,
385    /// The compare-and-set failed due to expectation mismatch.
386    ExpectationMismatch,
387}
388
/// Runs every call to a backing store on its own freshly spawned tokio task.
///
/// The extra task adds overhead. In exchange, it insulates the store from
/// callers that fail to drive futures promptly to completion, which could
/// otherwise cause timeouts or resource exhaustion in the store.
#[derive(Debug)]
pub struct Tasked<A>(pub Arc<A>);

impl<A> Tasked<A> {
    /// Returns another strong handle to the backing store, suitable for moving
    /// into a spawned task.
    fn clone_backing(&self) -> Arc<A> {
        let Tasked(backing) = self;
        Arc::clone(backing)
    }
}
400
401/// A boxed stream, similar to what `async_trait` desugars async functions to, but hardcoded
402/// to our standard result type.
403pub type ResultStream<'a, T> = Pin<Box<dyn Stream<Item = Result<T, ExternalError>> + Send + 'a>>;
404
405/// An abstraction for [VersionedData] held in a location in persistent storage
406/// where the data are conditionally updated by version.
407///
408/// Users are expected to use this API with consistently increasing sequence numbers
409/// to allow multiple processes across multiple machines to agree to a total order
410/// of the evolution of the data. To make roundtripping through various forms of durable
411/// storage easier, sequence numbers used with [Consensus] need to be restricted to the
412/// range [0, i64::MAX].
413#[async_trait]
414pub trait Consensus: std::fmt::Debug + Send + Sync {
415    /// Returns all the keys ever created in the consensus store.
416    fn list_keys(&self) -> ResultStream<'_, String>;
417
418    /// Returns a recent version of `data`, and the corresponding sequence number, if
419    /// one exists at this location.
420    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError>;
421
422    /// Add the [VersionedData] to the log for the given key. If the sequence number is 0, the log
423    /// must be empty; otherwise, it must be one greater than the previous sequence number.
424    /// It is invalid to call
425    /// this function with a sequence number outside of the range `[0, i64::MAX]`.
426    async fn compare_and_set(
427        &self,
428        key: &str,
429        new: VersionedData,
430    ) -> Result<CaSResult, ExternalError>;
431
432    /// Return `limit` versions of data stored for this `key` at sequence numbers >= `from`,
433    /// in ascending order of sequence number.
434    ///
435    /// Returns an empty vec if `from` is greater than the current sequence
436    /// number or if there is no data at this key.
437    async fn scan(
438        &self,
439        key: &str,
440        from: SeqNo,
441        limit: usize,
442    ) -> Result<Vec<VersionedData>, ExternalError>;
443
444    /// Deletes all historical versions of the data stored at `key` that are <
445    /// `seqno`, iff `seqno` <= the current sequence number.
446    ///
447    /// Returns the number of versions deleted or `None` on success. Returns an error if
448    /// `seqno` is greater than the current sequence number, or if there is no
449    /// data at this key.
450    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError>;
451}
452
453#[async_trait]
454impl<A: Consensus + 'static> Consensus for Tasked<A> {
455    fn list_keys(&self) -> ResultStream<'_, String> {
456        // Similarly to Blob::list_keys_and_metadata, this is difficult to make into a task.
457        // (If we use an unbounded channel between the task and the caller, we can buffer forever;
458        // if we use a bounded channel, we lose the isolation benefits of Tasked.)
459        // However, this should only be called in administrative contexts
460        // and not in the main state-machine impl.
461        self.0.list_keys()
462    }
463
464    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
465        let backing = self.clone_backing();
466        let key = key.to_owned();
467        mz_ore::task::spawn(
468            || "persist::task::head",
469            async move { backing.head(&key).await }.instrument(Span::current()),
470        )
471        .await
472    }
473
474    async fn compare_and_set(
475        &self,
476        key: &str,
477        new: VersionedData,
478    ) -> Result<CaSResult, ExternalError> {
479        let backing = self.clone_backing();
480        let key = key.to_owned();
481        mz_ore::task::spawn(
482            || "persist::task::cas",
483            async move { backing.compare_and_set(&key, new).await }.instrument(Span::current()),
484        )
485        .await
486    }
487
488    async fn scan(
489        &self,
490        key: &str,
491        from: SeqNo,
492        limit: usize,
493    ) -> Result<Vec<VersionedData>, ExternalError> {
494        let backing = self.clone_backing();
495        let key = key.to_owned();
496        mz_ore::task::spawn(
497            || "persist::task::scan",
498            async move { backing.scan(&key, from, limit).await }.instrument(Span::current()),
499        )
500        .await
501    }
502
503    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
504        let backing = self.clone_backing();
505        let key = key.to_owned();
506        mz_ore::task::spawn(
507            || "persist::task::truncate",
508            async move { backing.truncate(&key, seqno).await }.instrument(Span::current()),
509        )
510        .await
511    }
512}
513
/// Metadata about a particular blob stored by persist, as passed to the
/// callback of [Blob::list_keys_and_metadata].
#[derive(Debug)]
pub struct BlobMetadata<'a> {
    /// The key for the blob.
    pub key: &'a str,
    /// Size of the blob, in bytes.
    pub size_in_bytes: u64,
}
522
/// The key to pass to [Blob::get] for liveness checks.
pub const BLOB_GET_LIVENESS_KEY: &str = "LIVENESS";
525
526/// An abstraction over read-write access to a `bytes key`->`bytes value` store.
527///
528/// Implementations are required to be _linearizable_.
529///
530/// TODO: Consider whether this can be relaxed. Since our usage is write-once
531/// modify-never, it certainly seems like we could by adding retries around
532/// `get` to wait for a non-linearizable `set` to show up. However, the tricky
533/// bit comes once we stop handing out seqno capabilities to readers and have to
534/// start reasoning about "this set hasn't show up yet" vs "the blob has already
535/// been deleted". Another tricky problem is the same but for a deletion when
536/// the first attempt timed out.
537#[async_trait]
538pub trait Blob: std::fmt::Debug + Send + Sync {
539    /// Returns a reference to the value corresponding to the key.
540    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError>;
541
542    /// List all of the keys in the map with metadata about the entry.
543    ///
544    /// Can be optionally restricted to only list keys starting with a
545    /// given prefix.
546    async fn list_keys_and_metadata(
547        &self,
548        key_prefix: &str,
549        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
550    ) -> Result<(), ExternalError>;
551
552    /// Inserts a key-value pair into the map.
553    ///
554    /// Writes must be atomic and either succeed or leave the previous value
555    /// intact.
556    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError>;
557
558    /// Remove a key from the map.
559    ///
560    /// Returns Some and the size of the deleted blob if if exists. Succeeds and
561    /// returns None if it does not exist.
562    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError>;
563
564    /// Restores a previously-deleted key to the map, if possible.
565    ///
566    /// Returns successfully if the key exists after this call: perhaps because it already existed
567    /// or was restored. (In particular, this makes restore idempotent.)
568    /// Fails if we were unable to restore any value for that key:
569    /// perhaps the key was never written, or was permanently deleted.
570    ///
571    /// It is acceptable for [Blob::restore] to be unable
572    /// to restore keys, in which case this method should succeed iff the key exists.
573    async fn restore(&self, key: &str) -> Result<(), ExternalError>;
574}
575
576#[async_trait]
577impl<A: Blob + 'static> Blob for Tasked<A> {
578    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
579        let backing = self.clone_backing();
580        let key = key.to_owned();
581        mz_ore::task::spawn(
582            || "persist::task::get",
583            async move { backing.get(&key).await }.instrument(Span::current()),
584        )
585        .await
586    }
587
588    /// List all of the keys in the map with metadata about the entry.
589    ///
590    /// Can be optionally restricted to only list keys starting with a
591    /// given prefix.
592    async fn list_keys_and_metadata(
593        &self,
594        key_prefix: &str,
595        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
596    ) -> Result<(), ExternalError> {
597        // TODO: No good way that I can see to make this one a task because of
598        // the closure and Blob needing to be object-safe.
599        self.0.list_keys_and_metadata(key_prefix, f).await
600    }
601
602    /// Inserts a key-value pair into the map.
603    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
604        let backing = self.clone_backing();
605        let key = key.to_owned();
606        mz_ore::task::spawn(
607            || "persist::task::set",
608            async move { backing.set(&key, value).await }.instrument(Span::current()),
609        )
610        .await
611    }
612
613    /// Remove a key from the map.
614    ///
615    /// Returns Some and the size of the deleted blob if if exists. Succeeds and
616    /// returns None if it does not exist.
617    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
618        let backing = self.clone_backing();
619        let key = key.to_owned();
620        mz_ore::task::spawn(
621            || "persist::task::delete",
622            async move { backing.delete(&key).await }.instrument(Span::current()),
623        )
624        .await
625    }
626
627    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
628        let backing = self.clone_backing();
629        let key = key.to_owned();
630        mz_ore::task::spawn(
631            || "persist::task::restore",
632            async move { backing.restore(&key).await }.instrument(Span::current()),
633        )
634        .await
635    }
636}
637
638/// Test helpers for the crate.
639#[cfg(test)]
640pub mod tests {
641    use std::future::Future;
642
643    use anyhow::anyhow;
644    use futures_util::TryStreamExt;
645    use mz_ore::{assert_err, assert_ok};
646    use uuid::Uuid;
647
648    use crate::location::Blob;
649
650    use super::*;
651
652    fn keys(baseline: &[String], new: &[&str]) -> Vec<String> {
653        let mut ret = baseline.to_vec();
654        ret.extend(new.iter().map(|x| x.to_string()));
655        ret.sort();
656        ret
657    }
658
659    async fn get_keys(b: &impl Blob) -> Result<Vec<String>, ExternalError> {
660        let mut keys = vec![];
661        b.list_keys_and_metadata("", &mut |entry| keys.push(entry.key.to_string()))
662            .await?;
663        Ok(keys)
664    }
665
666    async fn get_keys_with_prefix(
667        b: &impl Blob,
668        prefix: &str,
669    ) -> Result<Vec<String>, ExternalError> {
670        let mut keys = vec![];
671        b.list_keys_and_metadata(prefix, &mut |entry| keys.push(entry.key.to_string()))
672            .await?;
673        Ok(keys)
674    }
675
676    /// Common test impl for different blob implementations.
677    pub async fn blob_impl_test<
678        B: Blob,
679        F: Future<Output = Result<B, ExternalError>>,
680        NewFn: Fn(&'static str) -> F,
681    >(
682        new_fn: NewFn,
683    ) -> Result<(), ExternalError> {
684        let values = ["v0".as_bytes().to_vec(), "v1".as_bytes().to_vec()];
685
686        let blob0 = new_fn("path0").await?;
687
688        // We can create a second blob writing to a different place.
689        let _ = new_fn("path1").await?;
690
691        // We can open two blobs to the same place, even.
692        let blob1 = new_fn("path0").await?;
693
694        let k0 = "foo/bar/k0";
695
696        // Empty key is empty.
697        assert_eq!(blob0.get(k0).await?, None);
698        assert_eq!(blob1.get(k0).await?, None);
699
700        // Empty list keys is empty.
701        let empty_keys = get_keys(&blob0).await?;
702        assert_eq!(empty_keys, Vec::<String>::new());
703        let empty_keys = get_keys(&blob1).await?;
704        assert_eq!(empty_keys, Vec::<String>::new());
705
706        // Set a key and get it back.
707        blob0.set(k0, values[0].clone().into()).await?;
708        assert_eq!(
709            blob0.get(k0).await?.map(|s| s.into_contiguous()),
710            Some(values[0].clone())
711        );
712        assert_eq!(
713            blob1.get(k0).await?.map(|s| s.into_contiguous()),
714            Some(values[0].clone())
715        );
716
717        // Set another key and get it back.
718        blob0.set("k0a", values[0].clone().into()).await?;
719        assert_eq!(
720            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
721            Some(values[0].clone())
722        );
723        assert_eq!(
724            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
725            Some(values[0].clone())
726        );
727
728        // Blob contains the key we just inserted.
729        let mut blob_keys = get_keys(&blob0).await?;
730        blob_keys.sort();
731        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
732        let mut blob_keys = get_keys(&blob1).await?;
733        blob_keys.sort();
734        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
735
736        // Can overwrite a key.
737        blob0.set(k0, values[1].clone().into()).await?;
738        assert_eq!(
739            blob0.get(k0).await?.map(|s| s.into_contiguous()),
740            Some(values[1].clone())
741        );
742        assert_eq!(
743            blob1.get(k0).await?.map(|s| s.into_contiguous()),
744            Some(values[1].clone())
745        );
746        // Can overwrite another key.
747        blob0.set("k0a", values[1].clone().into()).await?;
748        assert_eq!(
749            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
750            Some(values[1].clone())
751        );
752        assert_eq!(
753            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
754            Some(values[1].clone())
755        );
756
757        // Can delete a key.
758        assert_eq!(blob0.delete(k0).await, Ok(Some(2)));
759        // Can no longer get a deleted key.
760        assert_eq!(blob0.get(k0).await?, None);
761        assert_eq!(blob1.get(k0).await?, None);
762        // Double deleting a key succeeds but indicates that it did no work.
763        assert_eq!(blob0.delete(k0).await, Ok(None));
764        // Deleting a key that does not exist succeeds.
765        assert_eq!(blob0.delete("nope").await, Ok(None));
766        // Deleting a key with an empty value indicates it did work but deleted
767        // no bytes.
768        blob0.set("empty", Bytes::new()).await?;
769        assert_eq!(blob0.delete("empty").await, Ok(Some(0)));
770
771        // Attempt to restore a key. Not all backends will be able to restore, but
772        // we can confirm that our data is visible iff restore reported success.
773        blob0.set("undelete", Bytes::from("data")).await?;
774        // Restoring should always succeed when the key exists.
775        blob0.restore("undelete").await?;
776        assert_eq!(blob0.delete("undelete").await?, Some("data".len()));
777        let expected = match blob0.restore("undelete").await {
778            Ok(()) => Some(Bytes::from("data").into()),
779            Err(ExternalError::Determinate(_)) => None,
780            Err(other) => return Err(other),
781        };
782        assert_eq!(blob0.get("undelete").await?, expected);
783        blob0.delete("undelete").await?;
784
785        // Empty blob contains no keys.
786        blob0.delete("k0a").await?;
787        let mut blob_keys = get_keys(&blob0).await?;
788        blob_keys.sort();
789        assert_eq!(blob_keys, empty_keys);
790        let mut blob_keys = get_keys(&blob1).await?;
791        blob_keys.sort();
792        assert_eq!(blob_keys, empty_keys);
793        // Can reset a deleted key to some other value.
794        blob0.set(k0, values[1].clone().into()).await?;
795        assert_eq!(
796            blob1.get(k0).await?.map(|s| s.into_contiguous()),
797            Some(values[1].clone())
798        );
799        assert_eq!(
800            blob0.get(k0).await?.map(|s| s.into_contiguous()),
801            Some(values[1].clone())
802        );
803
804        // Insert multiple keys back to back and validate that we can list
805        // them all out.
806        let mut expected_keys = empty_keys;
807        for i in 1..=5 {
808            let key = format!("k{}", i);
809            blob0.set(&key, values[0].clone().into()).await?;
810            expected_keys.push(key);
811        }
812
813        // Blob contains the key we just inserted.
814        let mut blob_keys = get_keys(&blob0).await?;
815        blob_keys.sort();
816        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
817        let mut blob_keys = get_keys(&blob1).await?;
818        blob_keys.sort();
819        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
820
821        // Insert multiple keys with a different prefix and validate that we can
822        // list out keys by their prefix
823        let mut expected_prefix_keys = vec![];
824        for i in 1..=3 {
825            let key = format!("k-prefix-{}", i);
826            blob0.set(&key, values[0].clone().into()).await?;
827            expected_prefix_keys.push(key);
828        }
829        let mut blob_keys = get_keys_with_prefix(&blob0, "k-prefix").await?;
830        blob_keys.sort();
831        assert_eq!(blob_keys, expected_prefix_keys);
832        let mut blob_keys = get_keys_with_prefix(&blob0, "k").await?;
833        blob_keys.sort();
834        expected_keys.extend(expected_prefix_keys);
835        expected_keys.sort();
836        assert_eq!(blob_keys, expected_keys);
837
838        // We can open a new blob to the same path and use it.
839        let blob3 = new_fn("path0").await?;
840        assert_eq!(
841            blob3.get(k0).await?.map(|s| s.into_contiguous()),
842            Some(values[1].clone())
843        );
844
845        Ok(())
846    }
847
848    /// Common test impl for different consensus implementations.
849    pub async fn consensus_impl_test<
850        C: Consensus,
851        F: Future<Output = Result<C, ExternalError>>,
852        NewFn: FnMut() -> F,
853    >(
854        mut new_fn: NewFn,
855    ) -> Result<(), ExternalError> {
856        let consensus = new_fn().await?;
857
858        // Use a random key so independent runs of this test don't interfere
859        // with each other.
860        let key = Uuid::new_v4().to_string();
861
862        // Starting value of consensus data is None.
863        assert_eq!(consensus.head(&key).await, Ok(None));
864
865        // Can scan a key that has no data.
866        assert_eq!(consensus.scan(&key, SeqNo(0), SCAN_ALL).await, Ok(vec![]));
867
868        // Cannot truncate data from a key that doesn't have any data
869        assert_err!(consensus.truncate(&key, SeqNo(0)).await);
870
871        let state_at = |v| VersionedData {
872            seqno: SeqNo(v),
873            data: Bytes::from("abc"),
874        };
875
876        // Incorrectly setting the data with a non-initial seqno should fail.
877        assert_eq!(
878            consensus.compare_and_set(&key, state_at(1)).await,
879            Ok(CaSResult::ExpectationMismatch),
880        );
881
882        // Correctly updating the state with the correct expected value should succeed.
883        assert_eq!(
884            consensus.compare_and_set(&key, state_at(0)).await,
885            Ok(CaSResult::Committed),
886        );
887
888        // The new key is visible in state.
889        let keys: Vec<_> = consensus.list_keys().try_collect().await?;
890        assert_eq!(keys, vec![key.to_owned()]);
891
892        // We can observe the a recent value on successful update.
893        assert_eq!(consensus.head(&key).await, Ok(Some(state_at(0))));
894
895        // Can scan a key that has data with a lower bound sequence number < head.
896        assert_eq!(
897            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
898            Ok(vec![state_at(0)])
899        );
900
901        // Can scan a key that has data with a lower bound sequence number == head.
902        assert_eq!(
903            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
904            Ok(vec![state_at(0)])
905        );
906
907        // Can scan a key that has data with a lower bound sequence number >
908        // head.
909        assert_eq!(consensus.scan(&key, SeqNo(1), SCAN_ALL).await, Ok(vec![]));
910
911        // Can truncate data with an upper bound <= head, even if there is no data in the
912        // range [0, upper).
913        assert_ok!(consensus.truncate(&key, SeqNo(0)).await);
914
915        // Cannot truncate data with an upper bound > head.
916        assert_err!(consensus.truncate(&key, SeqNo(1)).await);
917
918        let new_state_at = |v| VersionedData {
919            seqno: SeqNo(v),
920            data: Bytes::from("def"),
921        };
922
923        // Trying to update without the correct expected seqno fails, (even if expected > current)
924        assert_eq!(
925            consensus.compare_and_set(&key, new_state_at(3)).await,
926            Ok(CaSResult::ExpectationMismatch),
927        );
928
929        // Trying to update without the correct expected seqno fails, (even if expected < current)
930        assert_eq!(
931            consensus.compare_and_set(&key, new_state_at(0)).await,
932            Ok(CaSResult::ExpectationMismatch),
933        );
934
935        // Can correctly update to a new state if we provide the right expected seqno
936        assert_eq!(
937            consensus.compare_and_set(&key, new_state_at(1)).await,
938            Ok(CaSResult::Committed),
939        );
940
941        // We can observe the a recent value on successful update.
942        assert_eq!(consensus.head(&key).await, Ok(Some(new_state_at(1))));
943
944        // We can observe both states in the correct order with scan if pass
945        // in a suitable lower bound.
946        assert_eq!(
947            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
948            Ok(vec![state_at(0), new_state_at(1)])
949        );
950
951        // We can observe only the most recent state if the lower bound is higher
952        // than the previous insertion's sequence number.
953        assert_eq!(
954            consensus.scan(&key, SeqNo(1), SCAN_ALL).await,
955            Ok(vec![new_state_at(1)])
956        );
957
958        // We can scan if the provided lower bound > head's sequence number.
959        assert_eq!(consensus.scan(&key, SeqNo(2), SCAN_ALL).await, Ok(vec![]));
960
961        // We can scan with limits that don't cover all states
962        assert_eq!(
963            consensus.scan(&key, SeqNo::minimum(), 1).await,
964            Ok(vec![state_at(0)])
965        );
966
967        // We can scan with limits to cover exactly the number of states
968        assert_eq!(
969            consensus.scan(&key, SeqNo::minimum(), 2).await,
970            Ok(vec![state_at(0), new_state_at(1)])
971        );
972
973        // We can scan with a limit larger than the number of states
974        assert_eq!(
975            consensus.scan(&key, SeqNo(0), 100).await,
976            Ok(vec![state_at(0), new_state_at(1)])
977        );
978
979        // Can remove the previous write with the appropriate truncation.
980        assert_ok!(consensus.truncate(&key, SeqNo(1)).await);
981
982        // Verify that the old write is indeed deleted.
983        assert_eq!(
984            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
985            Ok(vec![new_state_at(1)])
986        );
987
988        // Truncate is idempotent and can be repeated. The return value
989        // indicates we didn't do any work though.
990        assert_ok!(consensus.truncate(&key, SeqNo(1)).await);
991
992        // Make sure entries under different keys don't clash.
993        let other_key = Uuid::new_v4().to_string();
994
995        assert_eq!(consensus.head(&other_key).await, Ok(None));
996
997        let state = VersionedData {
998            seqno: SeqNo(0),
999            data: Bytes::from("einszweidrei"),
1000        };
1001
1002        assert_eq!(
1003            consensus.compare_and_set(&other_key, state.clone()).await,
1004            Ok(CaSResult::Committed),
1005        );
1006
1007        assert_eq!(consensus.head(&other_key).await, Ok(Some(state.clone())));
1008
1009        // State for the first key is still as expected.
1010        assert_eq!(consensus.head(&key).await, Ok(Some(new_state_at(1))));
1011
1012        // Trying to update from a stale version of current doesn't work.
1013        let invalid_jump_forward = VersionedData {
1014            seqno: SeqNo(11),
1015            data: Bytes::from("invalid"),
1016        };
1017        assert_eq!(
1018            consensus.compare_and_set(&key, invalid_jump_forward).await,
1019            Ok(CaSResult::ExpectationMismatch),
1020        );
1021
1022        // Writing a large (~10 KiB) amount of data works fine.
1023        let large_state = VersionedData {
1024            seqno: SeqNo(2),
1025            data: std::iter::repeat(b'a').take(10240).collect(),
1026        };
1027        assert_eq!(
1028            consensus.compare_and_set(&key, large_state).await,
1029            Ok(CaSResult::Committed),
1030        );
1031
1032        // Truncate can delete more than one version at a time.
1033        let v3 = VersionedData {
1034            seqno: SeqNo(3),
1035            data: Bytes::new(),
1036        };
1037        assert_eq!(
1038            consensus.compare_and_set(&key, v3).await,
1039            Ok(CaSResult::Committed),
1040        );
1041        assert_ok!(consensus.truncate(&key, SeqNo(3)).await);
1042
1043        Ok(())
1044    }
1045
1046    #[mz_ore::test]
1047    fn timeout_error() {
1048        assert!(ExternalError::new_timeout(Instant::now()).is_timeout());
1049        assert!(!ExternalError::from(anyhow!("foo")).is_timeout());
1050    }
1051}