Skip to main content

mz_persist/
location.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Abstractions over files, cloud storage, etc used in persistence.
11
12use std::fmt;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use azure_core::StatusCode;
20use bytes::Bytes;
21use futures_util::Stream;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::u64_to_usize;
24use mz_postgres_client::error::PostgresError;
25use mz_proto::RustType;
26use proptest_derive::Arbitrary;
27use serde::{Deserialize, Serialize};
28use tracing::{Instrument, Span};
29
30use crate::error::Error;
31
/// The "sequence number" of a persist state change.
///
/// Persist is a state machine, with all mutating requests modeled as input
/// state changes sequenced into a log. This reflects that ordering.
///
/// This ordering also includes requests that were sequenced and applied to the
/// persist state machine, but that application was deterministically made into
/// a no-op because it was contextually invalid (a write or seal at a sealed
/// timestamp, an allow_compactions at an unsealed timestamp, etc).
///
/// Read-only requests are assigned the SeqNo of a write, indicating that all
/// mutating requests up to and including that one are reflected in the read
/// state.
///
/// The textual form (see the `Display` and `FromStr` impls) is the wrapped
/// integer with a `v` prefix, e.g. `v5`.
#[derive(
    Arbitrary, Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize,
)]
pub struct SeqNo(pub u64);
49
50impl std::fmt::Display for SeqNo {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        write!(f, "v{}", self.0)
53    }
54}
55
56impl timely::PartialOrder for SeqNo {
57    fn less_equal(&self, other: &Self) -> bool {
58        self <= other
59    }
60}
61
62impl std::str::FromStr for SeqNo {
63    type Err = String;
64
65    fn from_str(encoded: &str) -> Result<Self, Self::Err> {
66        let encoded = match encoded.strip_prefix('v') {
67            Some(x) => x,
68            None => return Err(format!("invalid SeqNo {}: incorrect prefix", encoded)),
69        };
70        let seqno =
71            u64::from_str(encoded).map_err(|err| format!("invalid SeqNo {}: {}", encoded, err))?;
72        Ok(SeqNo(seqno))
73    }
74}
75
76impl SeqNo {
77    /// Returns the next SeqNo in the sequence.
78    pub fn next(self) -> SeqNo {
79        SeqNo(self.0 + 1)
80    }
81
82    /// A minimum value suitable as a default.
83    pub fn minimum() -> Self {
84        SeqNo(0)
85    }
86
87    /// A maximum value.
88    pub fn maximum() -> Self {
89        SeqNo(u64::MAX)
90    }
91}
92
93impl RustType<u64> for SeqNo {
94    fn into_proto(&self) -> u64 {
95        self.0
96    }
97
98    fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
99        Ok(SeqNo(proto))
100    }
101}
102
/// An error coming from an underlying durability system (e.g. s3) indicating
/// that the operation _definitely did NOT succeed_ (e.g. permission denied).
///
/// This is a thin wrapper that tags an arbitrary [anyhow::Error] with that
/// guarantee; its `Display` output is the wrapped error prefixed with
/// `determinate: `.
#[derive(Debug)]
pub struct Determinate {
    /// The underlying error being classified as determinate.
    inner: anyhow::Error,
}
109
110impl std::fmt::Display for Determinate {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        write!(f, "determinate: ")?;
113        self.inner.fmt(f)
114    }
115}
116
117impl std::error::Error for Determinate {
118    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
119        self.inner.source()
120    }
121}
122
123impl From<anyhow::Error> for Determinate {
124    fn from(inner: anyhow::Error) -> Self {
125        Self::new(inner)
126    }
127}
128
129impl Determinate {
130    /// Return a new Determinate wrapping the given error.
131    ///
132    /// Exposed for testing via [crate::unreliable].
133    pub fn new(inner: anyhow::Error) -> Self {
134        Determinate { inner }
135    }
136}
137
/// An error coming from an underlying durability system (e.g. s3) indicating
/// that the operation _might have succeeded_ (e.g. timeout).
///
/// This is a thin wrapper that tags an arbitrary [anyhow::Error]; its
/// `Display` output is the wrapped error prefixed with `indeterminate: `.
#[derive(Debug)]
pub struct Indeterminate {
    /// The underlying error being classified as indeterminate.
    pub(crate) inner: anyhow::Error,
}
144
145impl Indeterminate {
146    /// Return a new Indeterminate wrapping the given error.
147    ///
148    /// Exposed for testing.
149    pub fn new(inner: anyhow::Error) -> Self {
150        Indeterminate { inner }
151    }
152}
153
154impl std::fmt::Display for Indeterminate {
155    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156        write!(f, "indeterminate: ")?;
157        self.inner.fmt(f)
158    }
159}
160
161impl std::error::Error for Indeterminate {
162    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
163        self.inner.source()
164    }
165}
166
167/// An impl of PartialEq purely for convenience in tests and debug assertions.
168#[cfg(any(test, debug_assertions))]
169impl PartialEq for Indeterminate {
170    fn eq(&self, other: &Self) -> bool {
171        self.to_string() == other.to_string()
172    }
173}
174
/// An error coming from an underlying durability system (e.g. s3) or from
/// invalid data received from one.
///
/// The two variants record whether the failed operation is known not to have
/// taken effect ([Determinate]) or may have taken effect ([Indeterminate]).
#[derive(Debug)]
pub enum ExternalError {
    /// A determinate error from an external system.
    Determinate(Determinate),
    /// An indeterminate error from an external system.
    Indeterminate(Indeterminate),
}
184
185impl ExternalError {
186    /// Returns a new error representing a timeout.
187    ///
188    /// TODO: When we overhaul errors, this presumably should instead be a type
189    /// that can be matched on.
190    #[track_caller]
191    pub fn new_timeout(deadline: Instant) -> Self {
192        ExternalError::Indeterminate(Indeterminate {
193            inner: anyhow!("timeout at {:?}", deadline),
194        })
195    }
196
197    /// Returns whether this error represents a timeout.
198    ///
199    /// TODO: When we overhaul errors, this presumably should instead be a type
200    /// that can be matched on.
201    pub fn is_timeout(&self) -> bool {
202        // Gross...
203        self.to_string().contains("timeout")
204    }
205}
206
207impl std::fmt::Display for ExternalError {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        match self {
210            ExternalError::Determinate(x) => std::fmt::Display::fmt(x, f),
211            ExternalError::Indeterminate(x) => std::fmt::Display::fmt(x, f),
212        }
213    }
214}
215
216impl std::error::Error for ExternalError {
217    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
218        match self {
219            ExternalError::Determinate(e) => e.source(),
220            ExternalError::Indeterminate(e) => e.source(),
221        }
222    }
223}
224
225/// An impl of PartialEq purely for convenience in tests and debug assertions.
226#[cfg(any(test, debug_assertions))]
227impl PartialEq for ExternalError {
228    fn eq(&self, other: &Self) -> bool {
229        self.to_string() == other.to_string()
230    }
231}
232
233impl From<PostgresError> for ExternalError {
234    fn from(x: PostgresError) -> Self {
235        match x {
236            PostgresError::Determinate(e) => ExternalError::Determinate(Determinate::new(e)),
237            PostgresError::Indeterminate(e) => ExternalError::Indeterminate(Indeterminate::new(e)),
238        }
239    }
240}
241
242impl From<Indeterminate> for ExternalError {
243    fn from(x: Indeterminate) -> Self {
244        ExternalError::Indeterminate(x)
245    }
246}
247
248impl From<Determinate> for ExternalError {
249    fn from(x: Determinate) -> Self {
250        ExternalError::Determinate(x)
251    }
252}
253
254impl From<anyhow::Error> for ExternalError {
255    fn from(inner: anyhow::Error) -> Self {
256        ExternalError::Indeterminate(Indeterminate { inner })
257    }
258}
259
260impl From<Error> for ExternalError {
261    fn from(x: Error) -> Self {
262        ExternalError::Indeterminate(Indeterminate {
263            inner: anyhow::Error::new(x),
264        })
265    }
266}
267
268impl From<std::io::Error> for ExternalError {
269    fn from(x: std::io::Error) -> Self {
270        ExternalError::Indeterminate(Indeterminate {
271            inner: anyhow::Error::new(x),
272        })
273    }
274}
275
276impl From<deadpool_postgres::tokio_postgres::Error> for ExternalError {
277    fn from(e: deadpool_postgres::tokio_postgres::Error) -> Self {
278        let code = match e.as_db_error().map(|x| x.code()) {
279            Some(x) => x,
280            None => {
281                return ExternalError::Indeterminate(Indeterminate {
282                    inner: anyhow::Error::new(e),
283                });
284            }
285        };
286        match code {
287            // Feel free to add more things to this allowlist as we encounter
288            // them as long as you're certain they're determinate.
289            &deadpool_postgres::tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE => {
290                ExternalError::Determinate(Determinate {
291                    inner: anyhow::Error::new(e),
292                })
293            }
294            _ => ExternalError::Indeterminate(Indeterminate {
295                inner: anyhow::Error::new(e),
296            }),
297        }
298    }
299}
300
301impl From<azure_core::Error> for ExternalError {
302    fn from(value: azure_core::Error) -> Self {
303        let definitely_determinate = if let Some(http) = value.as_http_error() {
304            match http.status() {
305                // There are many other status codes that _ought_ to be determinate, according to
306                // the HTTP spec, but this includes only codes that we've observed in practice for now.
307                StatusCode::TooManyRequests => true,
308                _ => false,
309            }
310        } else {
311            false
312        };
313        if definitely_determinate {
314            ExternalError::Determinate(Determinate {
315                inner: anyhow!(value),
316            })
317        } else {
318            ExternalError::Indeterminate(Indeterminate {
319                inner: anyhow!(value),
320            })
321        }
322    }
323}
324
325impl From<deadpool_postgres::PoolError> for ExternalError {
326    fn from(x: deadpool_postgres::PoolError) -> Self {
327        match x {
328            // We have logic for turning a postgres Error into an ExternalError,
329            // so use it.
330            deadpool_postgres::PoolError::Backend(x) => ExternalError::from(x),
331            x => ExternalError::Indeterminate(Indeterminate {
332                inner: anyhow::Error::new(x),
333            }),
334        }
335    }
336}
337
338impl From<tokio::task::JoinError> for ExternalError {
339    fn from(x: tokio::task::JoinError) -> Self {
340        ExternalError::Indeterminate(Indeterminate {
341            inner: anyhow::Error::new(x),
342        })
343    }
344}
345
/// An abstraction for a single arbitrarily-sized binary blob and an associated
/// version number (sequence number).
///
/// This is the unit read from and written to a [Consensus] implementation.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct VersionedData {
    /// The sequence number of the data.
    pub seqno: SeqNo,
    /// The data itself.
    pub data: Bytes,
}
355
/// Helper constant to scan all states in [Consensus::scan].
/// The maximum possible SeqNo is i64::MAX.
///
/// Pass this as the `limit` argument to request every stored version.
// TODO(benesch): find a way to express this without `as`.
// `i64::MAX` is non-negative, so the `as u64` cast below cannot change its value.
#[allow(clippy::as_conversions)]
pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
361
/// A key usable for liveness checks via [Consensus::head].
///
/// Note that it has the same value as [BLOB_GET_LIVENESS_KEY].
pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";
364
/// Return type to indicate whether [Consensus::compare_and_set] succeeded or failed.
///
/// An expectation mismatch is reported through this type (as an `Ok` value)
/// rather than as an [ExternalError].
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum CaSResult {
    /// The compare-and-set succeeded and committed new state.
    Committed,
    /// The compare-and-set failed due to expectation mismatch.
    ExpectationMismatch,
}
373
/// Wraps all calls to a backing store in a new tokio task. This adds extra overhead,
/// but insulates the system from callers who fail to drive futures promptly to completion,
/// which can cause timeouts or resource exhaustion in a store.
///
/// The backing store is held in an [Arc] so that each spawned task can own
/// its own handle to it.
#[derive(Debug)]
pub struct Tasked<A>(pub Arc<A>);
379
380impl<A> Tasked<A> {
381    fn clone_backing(&self) -> Arc<A> {
382        Arc::clone(&self.0)
383    }
384}
385
/// A boxed stream, similar to what `async_trait` desugars async functions to, but hardcoded
/// to our standard result type.
///
/// Each item is a `Result<T, ExternalError>`; the stream is pinned, boxed,
/// `Send`, and may borrow for `'a`.
pub type ResultStream<'a, T> = Pin<Box<dyn Stream<Item = Result<T, ExternalError>> + Send + 'a>>;
389
/// An abstraction for [VersionedData] held in a location in persistent storage
/// where the data are conditionally updated by version.
///
/// Users are expected to use this API with consistently increasing sequence numbers
/// to allow multiple processes across multiple machines to agree to a total order
/// of the evolution of the data. To make roundtripping through various forms of durable
/// storage easier, sequence numbers used with [Consensus] need to be restricted to the
/// range [0, i64::MAX].
#[async_trait]
pub trait Consensus: std::fmt::Debug + Send + Sync {
    /// Returns all the keys ever created in the consensus store.
    fn list_keys(&self) -> ResultStream<'_, String>;

    /// Returns a recent version of `data`, and the corresponding sequence number, if
    /// one exists at this location.
    ///
    /// Returns `Ok(None)` if nothing has been written at `key` yet.
    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError>;

    /// Update the [VersionedData] stored at this location to `new`, iff the
    /// current sequence number is exactly `expected` and `new`'s sequence
    /// number > the current sequence number.
    ///
    /// It is invalid to call this function with a `new` and `expected` such
    /// that `new`'s sequence number is <= `expected`. It is invalid to call
    /// this function with a sequence number outside of the range `[0, i64::MAX]`.
    ///
    /// This data is initialized to None, and the first call to compare_and_set
    /// needs to happen with None as the expected value to set the state.
    ///
    /// A mismatch between `expected` and the current sequence number is
    /// reported as `Ok(CaSResult::ExpectationMismatch)`, not as an error.
    async fn compare_and_set(
        &self,
        key: &str,
        expected: Option<SeqNo>,
        new: VersionedData,
    ) -> Result<CaSResult, ExternalError>;

    /// Return up to `limit` versions of data stored for this `key` at sequence
    /// numbers >= `from`, in ascending order of sequence number.
    ///
    /// Pass [SCAN_ALL] as `limit` to request every version.
    ///
    /// Returns an empty vec if `from` is greater than the current sequence
    /// number or if there is no data at this key.
    async fn scan(
        &self,
        key: &str,
        from: SeqNo,
        limit: usize,
    ) -> Result<Vec<VersionedData>, ExternalError>;

    /// Deletes all historical versions of the data stored at `key` that are <
    /// `seqno`, iff `seqno` <= the current sequence number.
    ///
    /// Returns the number of versions deleted or `None` on success. Returns an error if
    /// `seqno` is greater than the current sequence number, or if there is no
    /// data at this key.
    // NOTE(review): presumably `None` means the backend cannot report a
    // count — confirm against the implementations.
    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError>;
}
444
445#[async_trait]
446impl<A: Consensus + 'static> Consensus for Tasked<A> {
447    fn list_keys(&self) -> ResultStream<'_, String> {
448        // Similarly to Blob::list_keys_and_metadata, this is difficult to make into a task.
449        // (If we use an unbounded channel between the task and the caller, we can buffer forever;
450        // if we use a bounded channel, we lose the isolation benefits of Tasked.)
451        // However, this should only be called in administrative contexts
452        // and not in the main state-machine impl.
453        self.0.list_keys()
454    }
455
456    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
457        let backing = self.clone_backing();
458        let key = key.to_owned();
459        mz_ore::task::spawn(
460            || "persist::task::head",
461            async move { backing.head(&key).await }.instrument(Span::current()),
462        )
463        .await
464    }
465
466    async fn compare_and_set(
467        &self,
468        key: &str,
469        expected: Option<SeqNo>,
470        new: VersionedData,
471    ) -> Result<CaSResult, ExternalError> {
472        let backing = self.clone_backing();
473        let key = key.to_owned();
474        mz_ore::task::spawn(
475            || "persist::task::cas",
476            async move { backing.compare_and_set(&key, expected, new).await }
477                .instrument(Span::current()),
478        )
479        .await
480    }
481
482    async fn scan(
483        &self,
484        key: &str,
485        from: SeqNo,
486        limit: usize,
487    ) -> Result<Vec<VersionedData>, ExternalError> {
488        let backing = self.clone_backing();
489        let key = key.to_owned();
490        mz_ore::task::spawn(
491            || "persist::task::scan",
492            async move { backing.scan(&key, from, limit).await }.instrument(Span::current()),
493        )
494        .await
495    }
496
497    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
498        let backing = self.clone_backing();
499        let key = key.to_owned();
500        mz_ore::task::spawn(
501            || "persist::task::truncate",
502            async move { backing.truncate(&key, seqno).await }.instrument(Span::current()),
503        )
504        .await
505    }
506}
507
/// Metadata about a particular blob stored by persist.
///
/// Handed to the callback of [Blob::list_keys_and_metadata]; the key is
/// borrowed for `'a`, so callers that need to keep it must copy it out.
#[derive(Debug)]
pub struct BlobMetadata<'a> {
    /// The key for the blob
    pub key: &'a str,
    /// Size of the blob
    pub size_in_bytes: u64,
}
516
/// A key usable for liveness checks via [Blob::get].
///
/// Note that it has the same value as [CONSENSUS_HEAD_LIVENESS_KEY].
pub const BLOB_GET_LIVENESS_KEY: &str = "LIVENESS";
519
/// An abstraction over read-write access to a `bytes key`->`bytes value` store.
///
/// Implementations are required to be _linearizable_.
///
/// TODO: Consider whether this can be relaxed. Since our usage is write-once
/// modify-never, it certainly seems like we could by adding retries around
/// `get` to wait for a non-linearizable `set` to show up. However, the tricky
/// bit comes once we stop handing out seqno capabilities to readers and have to
/// start reasoning about "this set hasn't shown up yet" vs "the blob has already
/// been deleted". Another tricky problem is the same but for a deletion when
/// the first attempt timed out.
#[async_trait]
pub trait Blob: std::fmt::Debug + Send + Sync {
    /// Returns the value stored at `key`, or `None` if the key is not
    /// present.
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError>;

    /// List all of the keys in the map with metadata about the entry.
    ///
    /// Can be optionally restricted to only list keys starting with a
    /// given prefix. Pass the empty string as `key_prefix` to list every key.
    ///
    /// `f` is invoked with the [BlobMetadata] of each listed entry.
    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError>;

    /// Inserts a key-value pair into the map.
    ///
    /// Writes must be atomic and either succeed or leave the previous value
    /// intact.
    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError>;

    /// Remove a key from the map.
    ///
    /// Returns Some and the size of the deleted blob if it exists. Succeeds and
    /// returns None if it does not exist.
    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError>;

    /// Restores a previously-deleted key to the map, if possible.
    ///
    /// Returns successfully if the key exists after this call: perhaps because it already existed
    /// or was restored. (In particular, this makes restore idempotent.)
    /// Fails if we were unable to restore any value for that key:
    /// perhaps the key was never written, or was permanently deleted.
    ///
    /// It is acceptable for [Blob::restore] to be unable
    /// to restore keys, in which case this method should succeed iff the key exists.
    async fn restore(&self, key: &str) -> Result<(), ExternalError>;
}
569
570#[async_trait]
571impl<A: Blob + 'static> Blob for Tasked<A> {
572    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
573        let backing = self.clone_backing();
574        let key = key.to_owned();
575        mz_ore::task::spawn(
576            || "persist::task::get",
577            async move { backing.get(&key).await }.instrument(Span::current()),
578        )
579        .await
580    }
581
582    /// List all of the keys in the map with metadata about the entry.
583    ///
584    /// Can be optionally restricted to only list keys starting with a
585    /// given prefix.
586    async fn list_keys_and_metadata(
587        &self,
588        key_prefix: &str,
589        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
590    ) -> Result<(), ExternalError> {
591        // TODO: No good way that I can see to make this one a task because of
592        // the closure and Blob needing to be object-safe.
593        self.0.list_keys_and_metadata(key_prefix, f).await
594    }
595
596    /// Inserts a key-value pair into the map.
597    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
598        let backing = self.clone_backing();
599        let key = key.to_owned();
600        mz_ore::task::spawn(
601            || "persist::task::set",
602            async move { backing.set(&key, value).await }.instrument(Span::current()),
603        )
604        .await
605    }
606
607    /// Remove a key from the map.
608    ///
609    /// Returns Some and the size of the deleted blob if if exists. Succeeds and
610    /// returns None if it does not exist.
611    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
612        let backing = self.clone_backing();
613        let key = key.to_owned();
614        mz_ore::task::spawn(
615            || "persist::task::delete",
616            async move { backing.delete(&key).await }.instrument(Span::current()),
617        )
618        .await
619    }
620
621    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
622        let backing = self.clone_backing();
623        let key = key.to_owned();
624        mz_ore::task::spawn(
625            || "persist::task::restore",
626            async move { backing.restore(&key).await }.instrument(Span::current()),
627        )
628        .await
629    }
630}
631
632/// Test helpers for the crate.
633#[cfg(test)]
634pub mod tests {
635    use std::future::Future;
636
637    use anyhow::anyhow;
638    use futures_util::TryStreamExt;
639    use mz_ore::{assert_err, assert_ok};
640    use uuid::Uuid;
641
642    use crate::location::Blob;
643
644    use super::*;
645
646    fn keys(baseline: &[String], new: &[&str]) -> Vec<String> {
647        let mut ret = baseline.to_vec();
648        ret.extend(new.iter().map(|x| x.to_string()));
649        ret.sort();
650        ret
651    }
652
653    async fn get_keys(b: &impl Blob) -> Result<Vec<String>, ExternalError> {
654        let mut keys = vec![];
655        b.list_keys_and_metadata("", &mut |entry| keys.push(entry.key.to_string()))
656            .await?;
657        Ok(keys)
658    }
659
660    async fn get_keys_with_prefix(
661        b: &impl Blob,
662        prefix: &str,
663    ) -> Result<Vec<String>, ExternalError> {
664        let mut keys = vec![];
665        b.list_keys_and_metadata(prefix, &mut |entry| keys.push(entry.key.to_string()))
666            .await?;
667        Ok(keys)
668    }
669
    /// Common test impl for different blob implementations.
    ///
    /// `new_fn` opens a blob rooted at the given path; the test calls it
    /// several times, including repeatedly with the same path, and expects
    /// handles opened on the same path to observe each other's writes.
    ///
    /// The test assumes the location behind `"path0"` starts out empty.
    pub async fn blob_impl_test<
        B: Blob,
        F: Future<Output = Result<B, ExternalError>>,
        NewFn: Fn(&'static str) -> F,
    >(
        new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let values = ["v0".as_bytes().to_vec(), "v1".as_bytes().to_vec()];

        let blob0 = new_fn("path0").await?;

        // We can create a second blob writing to a different place.
        let _ = new_fn("path1").await?;

        // We can open two blobs to the same place, even.
        let blob1 = new_fn("path0").await?;

        let k0 = "foo/bar/k0";

        // Empty key is empty.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);

        // Empty list keys is empty.
        let empty_keys = get_keys(&blob0).await?;
        assert_eq!(empty_keys, Vec::<String>::new());
        let empty_keys = get_keys(&blob1).await?;
        assert_eq!(empty_keys, Vec::<String>::new());

        // Set a key and get it back.
        blob0.set(k0, values[0].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Set another key and get it back.
        blob0.set("k0a", values[0].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Blob contains the keys we just inserted.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));

        // Can overwrite a key.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        // Can overwrite another key.
        blob0.set("k0a", values[1].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Can delete a key. The reported size is the length of the value
        // that was stored ("v1" is 2 bytes).
        assert_eq!(blob0.delete(k0).await, Ok(Some(2)));
        // Can no longer get a deleted key.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);
        // Double deleting a key succeeds but indicates that it did no work.
        assert_eq!(blob0.delete(k0).await, Ok(None));
        // Deleting a key that does not exist succeeds.
        assert_eq!(blob0.delete("nope").await, Ok(None));
        // Deleting a key with an empty value indicates it did work but deleted
        // no bytes.
        blob0.set("empty", Bytes::new()).await?;
        assert_eq!(blob0.delete("empty").await, Ok(Some(0)));

        // Attempt to restore a key. Not all backends will be able to restore, but
        // we can confirm that our data is visible iff restore reported success.
        blob0.set("undelete", Bytes::from("data")).await?;
        // Restoring should always succeed when the key exists.
        blob0.restore("undelete").await?;
        assert_eq!(blob0.delete("undelete").await?, Some("data".len()));
        // A determinate failure means the backend could not restore the key,
        // so it must stay absent; any other error fails the test.
        let expected = match blob0.restore("undelete").await {
            Ok(()) => Some(Bytes::from("data").into()),
            Err(ExternalError::Determinate(_)) => None,
            Err(other) => return Err(other),
        };
        assert_eq!(blob0.get("undelete").await?, expected);
        // Clean up so the key does not show up in the listings below.
        blob0.delete("undelete").await?;

        // Empty blob contains no keys.
        blob0.delete("k0a").await?;
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        // Can reset a deleted key to some other value.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Insert multiple keys back to back and validate that we can list
        // them all out.
        let mut expected_keys = empty_keys;
        for i in 1..=5 {
            let key = format!("k{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_keys.push(key);
        }

        // Blob contains all the keys we just inserted, plus `k0` from above.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));

        // Insert multiple keys with a different prefix and validate that we can
        // list out keys by their prefix
        let mut expected_prefix_keys = vec![];
        for i in 1..=3 {
            let key = format!("k-prefix-{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_prefix_keys.push(key);
        }
        let mut blob_keys = get_keys_with_prefix(&blob0, "k-prefix").await?;
        blob_keys.sort();
        assert_eq!(blob_keys, expected_prefix_keys);
        // The shorter prefix "k" matches both the `k1..k5` keys and the
        // `k-prefix-*` keys, but not `foo/bar/k0`.
        let mut blob_keys = get_keys_with_prefix(&blob0, "k").await?;
        blob_keys.sort();
        expected_keys.extend(expected_prefix_keys);
        expected_keys.sort();
        assert_eq!(blob_keys, expected_keys);

        // We can open a new blob to the same path and use it.
        let blob3 = new_fn("path0").await?;
        assert_eq!(
            blob3.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        Ok(())
    }
841
    /// Common test impl for different consensus implementations.
    ///
    /// `new_fn` is called once to build the `Consensus` under test. The test
    /// then walks through `head`, `scan`, `truncate`, `compare_and_set` and
    /// `list_keys`, asserting the result of every call. The steps build on one
    /// another (later assertions depend on the writes and truncations made
    /// earlier), so they must run in the order written.
    ///
    /// # Errors
    ///
    /// Returns the `ExternalError` produced by `new_fn` or by collecting
    /// `list_keys`. All other results are checked with assertions.
    ///
    /// # Panics
    ///
    /// Panics if any assertion about the consensus behavior fails.
    pub async fn consensus_impl_test<
        C: Consensus,
        F: Future<Output = Result<C, ExternalError>>,
        NewFn: FnMut() -> F,
    >(
        mut new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let consensus = new_fn().await?;

        // Use a random key so independent runs of this test don't interfere
        // with each other.
        let key = Uuid::new_v4().to_string();

        // Starting value of consensus data is None.
        assert_eq!(consensus.head(&key).await, Ok(None));

        // Can scan a key that has no data.
        assert_eq!(consensus.scan(&key, SeqNo(0), SCAN_ALL).await, Ok(vec![]));

        // Cannot truncate data from a key that doesn't have any data.
        assert_err!(consensus.truncate(&key, SeqNo(0)).await);

        let state = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("abc"),
        };

        // Incorrectly setting the data with a non-None expected should fail.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(0)), state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // Correctly updating the state with the correct expected value should succeed.
        assert_eq!(
            consensus.compare_and_set(&key, None, state.clone()).await,
            Ok(CaSResult::Committed),
        );

        // The new key is visible in state.
        // NOTE(review): this expects `key` to be the only key listed, so
        // `new_fn` presumably has to return an otherwise empty store — confirm.
        let keys: Vec<_> = consensus.list_keys().try_collect().await?;
        assert_eq!(keys, vec![key.to_owned()]);

        // After a successful update, head returns the value that was just
        // written.
        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));

        // Can scan a key that has data with a lower bound sequence number < head.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![state.clone()])
        );

        // Can scan a key that has data with a lower bound sequence number == head.
        assert_eq!(
            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
            Ok(vec![state.clone()])
        );

        // Can scan a key that has data with a lower bound sequence number >
        // head; the result is empty.
        assert_eq!(consensus.scan(&key, SeqNo(6), SCAN_ALL).await, Ok(vec![]));

        // Can truncate data with an upper bound <= head, even if there is no data in the
        // range [0, upper).
        assert_ok!(consensus.truncate(&key, SeqNo(0)).await);
        assert_ok!(consensus.truncate(&key, SeqNo(5)).await);

        // Cannot truncate data with an upper bound > head.
        assert_err!(consensus.truncate(&key, SeqNo(6)).await);

        let new_state = VersionedData {
            seqno: SeqNo(10),
            data: Bytes::from("def"),
        };

        // Trying to update without the correct expected seqno fails (even if expected > current).
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(7)), new_state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // Trying to update without the correct expected seqno fails (even if expected < current).
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(3)), new_state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        let invalid_constant_seqno = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("invalid"),
        };

        // Trying to set the data to a sequence number == current fails even if
        // expected is correct. Unlike an expectation mismatch, this surfaces
        // as an `Err` with the exact message asserted below.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_constant_seqno)
                .await,
            Err(ExternalError::from(anyhow!(
                "new seqno must be strictly greater than expected. Got new: SeqNo(5) expected: SeqNo(5)"
            )))
        );

        let invalid_regressing_seqno = VersionedData {
            seqno: SeqNo(3),
            data: Bytes::from("invalid"),
        };

        // Trying to set the data to a sequence number < current fails even if
        // expected is correct.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_regressing_seqno)
                .await,
            Err(ExternalError::from(anyhow!(
                "new seqno must be strictly greater than expected. Got new: SeqNo(3) expected: SeqNo(5)"
            )))
        );

        // Can correctly update to a new state if we provide the right expected seqno.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), new_state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        // After a successful update, head returns the value that was just
        // written.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));

        // We can observe both states in the correct order with scan if we pass
        // in a suitable lower bound.
        assert_eq!(
            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // We can observe only the most recent state if the lower bound is higher
        // than the previous insertion's sequence number.
        assert_eq!(
            consensus.scan(&key, SeqNo(6), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // We can still observe the most recent insert as long as the provided
        // lower bound == the most recent insert's sequence number.
        assert_eq!(
            consensus.scan(&key, SeqNo(10), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // We can scan if the provided lower bound > head's sequence number;
        // the result is empty.
        assert_eq!(consensus.scan(&key, SeqNo(11), SCAN_ALL).await, Ok(vec![]));

        // We can scan with limits that don't cover all states; the earliest
        // state at or above the lower bound is the one returned.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 1).await,
            Ok(vec![state.clone()])
        );
        assert_eq!(
            consensus.scan(&key, SeqNo(5), 1).await,
            Ok(vec![state.clone()])
        );

        // We can scan with limits to cover exactly the number of states.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 2).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // We can scan with a limit larger than the number of states.
        assert_eq!(
            consensus.scan(&key, SeqNo(4), 100).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // Can remove the previous write with the appropriate truncation.
        assert_ok!(consensus.truncate(&key, SeqNo(6)).await);

        // Verify that the old write is indeed deleted.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // Truncate is idempotent and can be repeated. The return value
        // presumably indicates that no work was done, but only success is
        // asserted here.
        assert_ok!(consensus.truncate(&key, SeqNo(6)).await);

        // Make sure entries under different keys don't clash.
        let other_key = Uuid::new_v4().to_string();

        assert_eq!(consensus.head(&other_key).await, Ok(None));

        // Note: this shadows the earlier `state` (seqno 5) for the rest of the
        // function.
        let state = VersionedData {
            seqno: SeqNo(1),
            data: Bytes::from("einszweidrei"),
        };

        assert_eq!(
            consensus
                .compare_and_set(&other_key, None, state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        assert_eq!(consensus.head(&other_key).await, Ok(Some(state.clone())));

        // State for the first key is still as expected.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));

        // Trying to update from a stale version of current doesn't work.
        // `state.seqno` is the shadowed value (1) written under `other_key`,
        // whereas the head of `key` is at seqno 10.
        let invalid_jump_forward = VersionedData {
            seqno: SeqNo(11),
            data: Bytes::from("invalid"),
        };
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_jump_forward)
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // Writing a large (~10 KiB) amount of data works fine.
        let large_state = VersionedData {
            seqno: SeqNo(11),
            data: std::iter::repeat(b'a').take(10240).collect(),
        };
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(new_state.seqno), large_state)
                .await,
            Ok(CaSResult::Committed),
        );

        // Truncate can delete more than one version at a time. After the
        // write below, seqnos 10 and 11 both sit below the truncation bound.
        let v12 = VersionedData {
            seqno: SeqNo(12),
            data: Bytes::new(),
        };
        assert_eq!(
            consensus.compare_and_set(&key, Some(SeqNo(11)), v12).await,
            Ok(CaSResult::Committed),
        );
        assert_ok!(consensus.truncate(&key, SeqNo(12)).await);

        // Sequence numbers used within Consensus have to be within [0, i64::MAX].
        // Each case below writes to a fresh random key so that `None` is the
        // correct expected seqno.

        // Lower end of the range: 0 is accepted.
        assert_eq!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(0),
                        data: Bytes::new(),
                    }
                )
                .await,
            Ok(CaSResult::Committed),
        );
        // Upper end of the range: i64::MAX is accepted.
        assert_eq!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(i64::MAX.try_into().expect("i64::MAX fits in u64")),
                        data: Bytes::new(),
                    }
                )
                .await,
            Ok(CaSResult::Committed),
        );
        // 1 << 63 is i64::MAX + 1, the first value outside the range.
        assert_err!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(1 << 63),
                        data: Bytes::new(),
                    }
                )
                .await
        );
        // The largest u64 is rejected as well.
        assert_err!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(u64::MAX),
                        data: Bytes::new(),
                    }
                )
                .await
        );

        Ok(())
    }
1150
1151    #[mz_ore::test]
1152    fn timeout_error() {
1153        assert!(ExternalError::new_timeout(Instant::now()).is_timeout());
1154        assert!(!ExternalError::from(anyhow!("foo")).is_timeout());
1155    }
1156}