1use std::fmt;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use azure_core::StatusCode;
20use bytes::Bytes;
21use futures_util::Stream;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::u64_to_usize;
24use mz_postgres_client::error::PostgresError;
25use mz_proto::RustType;
26use proptest_derive::Arbitrary;
27use serde::{Deserialize, Serialize};
28use tracing::{Instrument, Span};
29
30use crate::error::Error;
31
/// A sequence number: a newtype around `u64`.
///
/// Comparison, equality and hashing are derived from the wrapped integer.
/// The `Display` and `FromStr` impls in this file use the textual form
/// `v<integer>` (for example `v5`).
#[derive(
    Arbitrary,
    Clone,
    Copy,
    Debug,
    PartialOrd,
    Ord,
    PartialEq,
    Eq,
    Hash,
    Serialize,
    Deserialize
)]
pub struct SeqNo(pub u64);
59
60impl std::fmt::Display for SeqNo {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(f, "v{}", self.0)
63 }
64}
65
66impl timely::PartialOrder for SeqNo {
67 fn less_equal(&self, other: &Self) -> bool {
68 self <= other
69 }
70}
71
72impl std::str::FromStr for SeqNo {
73 type Err = String;
74
75 fn from_str(encoded: &str) -> Result<Self, Self::Err> {
76 let encoded = match encoded.strip_prefix('v') {
77 Some(x) => x,
78 None => return Err(format!("invalid SeqNo {}: incorrect prefix", encoded)),
79 };
80 let seqno =
81 u64::from_str(encoded).map_err(|err| format!("invalid SeqNo {}: {}", encoded, err))?;
82 Ok(SeqNo(seqno))
83 }
84}
85
86impl SeqNo {
87 pub fn previous(self) -> Option<SeqNo> {
89 Some(SeqNo(self.0.checked_sub(1)?))
90 }
91
92 pub fn next(self) -> SeqNo {
94 SeqNo(self.0 + 1)
95 }
96
97 pub fn minimum() -> Self {
99 SeqNo(0)
100 }
101
102 pub fn maximum() -> Self {
104 SeqNo(u64::MAX)
105 }
106}
107
108impl RustType<u64> for SeqNo {
109 fn into_proto(&self) -> u64 {
110 self.0
111 }
112
113 fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
114 Ok(SeqNo(proto))
115 }
116}
117
/// An error wrapping an [`anyhow::Error`], rendered with a `determinate: ` prefix.
///
/// NOTE(review): the name suggests the failed operation is known not to have
/// taken effect (in contrast to [`Indeterminate`]) — confirm against callers.
#[derive(Debug)]
pub struct Determinate {
    // The underlying cause; the `Display` and `Error` impls forward to it.
    inner: anyhow::Error,
}
124
125impl std::fmt::Display for Determinate {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 write!(f, "determinate: ")?;
128 self.inner.fmt(f)
129 }
130}
131
132impl std::error::Error for Determinate {
133 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
134 self.inner.source()
135 }
136}
137
138impl From<anyhow::Error> for Determinate {
139 fn from(inner: anyhow::Error) -> Self {
140 Self::new(inner)
141 }
142}
143
144impl Determinate {
145 pub fn new(inner: anyhow::Error) -> Self {
149 Determinate { inner }
150 }
151}
152
/// An error wrapping an [`anyhow::Error`], rendered with an `indeterminate: ` prefix.
///
/// NOTE(review): the name suggests it is unknown whether the failed operation
/// took effect (in contrast to [`Determinate`]) — confirm against callers.
#[derive(Debug)]
pub struct Indeterminate {
    // The underlying cause; visible crate-wide, unlike `Determinate::inner`.
    pub(crate) inner: anyhow::Error,
}
159
160impl Indeterminate {
161 pub fn new(inner: anyhow::Error) -> Self {
165 Indeterminate { inner }
166 }
167}
168
169impl std::fmt::Display for Indeterminate {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171 write!(f, "indeterminate: ")?;
172 self.inner.fmt(f)
173 }
174}
175
176impl std::error::Error for Indeterminate {
177 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
178 self.inner.source()
179 }
180}
181
182#[cfg(any(test, debug_assertions))]
184impl PartialEq for Indeterminate {
185 fn eq(&self, other: &Self) -> bool {
186 self.to_string() == other.to_string()
187 }
188}
189
/// An error tagged as either [`Determinate`] or [`Indeterminate`].
///
/// The `From` impls in this file decide the tag for each source error type.
#[derive(Debug)]
pub enum ExternalError {
    /// Wraps a [`Determinate`] error.
    Determinate(Determinate),
    /// Wraps an [`Indeterminate`] error.
    Indeterminate(Indeterminate),
}
199
200impl ExternalError {
201 #[track_caller]
206 pub fn new_timeout(deadline: Instant) -> Self {
207 ExternalError::Indeterminate(Indeterminate {
208 inner: anyhow!("timeout at {:?}", deadline),
209 })
210 }
211
212 pub fn is_timeout(&self) -> bool {
217 self.to_string().contains("timeout")
219 }
220}
221
222impl std::fmt::Display for ExternalError {
223 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
224 match self {
225 ExternalError::Determinate(x) => std::fmt::Display::fmt(x, f),
226 ExternalError::Indeterminate(x) => std::fmt::Display::fmt(x, f),
227 }
228 }
229}
230
231impl std::error::Error for ExternalError {
232 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
233 match self {
234 ExternalError::Determinate(e) => e.source(),
235 ExternalError::Indeterminate(e) => e.source(),
236 }
237 }
238}
239
240#[cfg(any(test, debug_assertions))]
242impl PartialEq for ExternalError {
243 fn eq(&self, other: &Self) -> bool {
244 self.to_string() == other.to_string()
245 }
246}
247
248impl From<PostgresError> for ExternalError {
249 fn from(x: PostgresError) -> Self {
250 match x {
251 PostgresError::Determinate(e) => ExternalError::Determinate(Determinate::new(e)),
252 PostgresError::Indeterminate(e) => ExternalError::Indeterminate(Indeterminate::new(e)),
253 }
254 }
255}
256
257impl From<Indeterminate> for ExternalError {
258 fn from(x: Indeterminate) -> Self {
259 ExternalError::Indeterminate(x)
260 }
261}
262
263impl From<Determinate> for ExternalError {
264 fn from(x: Determinate) -> Self {
265 ExternalError::Determinate(x)
266 }
267}
268
269impl From<anyhow::Error> for ExternalError {
270 fn from(inner: anyhow::Error) -> Self {
271 ExternalError::Indeterminate(Indeterminate { inner })
272 }
273}
274
275impl From<Error> for ExternalError {
276 fn from(x: Error) -> Self {
277 ExternalError::Indeterminate(Indeterminate {
278 inner: anyhow::Error::new(x),
279 })
280 }
281}
282
283impl From<std::io::Error> for ExternalError {
284 fn from(x: std::io::Error) -> Self {
285 ExternalError::Indeterminate(Indeterminate {
286 inner: anyhow::Error::new(x),
287 })
288 }
289}
290
291impl From<deadpool_postgres::tokio_postgres::Error> for ExternalError {
292 fn from(e: deadpool_postgres::tokio_postgres::Error) -> Self {
293 let code = match e.as_db_error().map(|x| x.code()) {
294 Some(x) => x,
295 None => {
296 return ExternalError::Indeterminate(Indeterminate {
297 inner: anyhow::Error::new(e),
298 });
299 }
300 };
301 match code {
302 &deadpool_postgres::tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE => {
305 ExternalError::Determinate(Determinate {
306 inner: anyhow::Error::new(e),
307 })
308 }
309 _ => ExternalError::Indeterminate(Indeterminate {
310 inner: anyhow::Error::new(e),
311 }),
312 }
313 }
314}
315
316impl From<azure_core::Error> for ExternalError {
317 fn from(value: azure_core::Error) -> Self {
318 let definitely_determinate = if let Some(http) = value.as_http_error() {
319 match http.status() {
320 StatusCode::TooManyRequests => true,
323 _ => false,
324 }
325 } else {
326 false
327 };
328 if definitely_determinate {
329 ExternalError::Determinate(Determinate {
330 inner: anyhow!(value),
331 })
332 } else {
333 ExternalError::Indeterminate(Indeterminate {
334 inner: anyhow!(value),
335 })
336 }
337 }
338}
339
340impl From<deadpool_postgres::PoolError> for ExternalError {
341 fn from(x: deadpool_postgres::PoolError) -> Self {
342 match x {
343 deadpool_postgres::PoolError::Backend(x) => ExternalError::from(x),
346 x => ExternalError::Indeterminate(Indeterminate {
347 inner: anyhow::Error::new(x),
348 }),
349 }
350 }
351}
352
353impl From<tokio::task::JoinError> for ExternalError {
354 fn from(x: tokio::task::JoinError) -> Self {
355 ExternalError::Indeterminate(Indeterminate {
356 inner: anyhow::Error::new(x),
357 })
358 }
359}
360
/// A payload of bytes paired with a [`SeqNo`]; the unit read and written
/// through the [`Consensus`] trait.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct VersionedData {
    /// The sequence number of this entry.
    pub seqno: SeqNo,
    /// The payload bytes.
    pub data: Bytes,
}
370
/// A scan `limit` equal to `i64::MAX` converted to `usize`; the tests in this
/// file pass it to [`Consensus::scan`] when they want every entry.
///
/// NOTE(review): `i64::MAX` (rather than `usize::MAX`) is presumably chosen so
/// the value also fits a signed 64-bit integer such as a SQL `LIMIT` — confirm.
// The `as` cast is lossless: `i64::MAX` is non-negative and fits in a `u64`.
#[allow(clippy::as_conversions)]
pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
376
/// The key name `LIVENESS`.
///
/// NOTE(review): the name suggests this is the key passed to
/// [`Consensus::head`] by liveness checks — confirm against callers.
pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";
379
/// The outcome of a [`Consensus::compare_and_set`] call that did not fail
/// with an [`ExternalError`].
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum CaSResult {
    /// The new value was written.
    Committed,
    /// The new value was rejected. In the conformance test in this file this
    /// happens when its `SeqNo` does not directly follow the current head.
    ExpectationMismatch,
}
388
/// A wrapper around a shared `A`.
///
/// The [`Consensus`] and [`Blob`] impls for this type in this file forward
/// most calls to the wrapped value from inside a task started with
/// `mz_ore::task::spawn`.
#[derive(Debug)]
pub struct Tasked<A>(pub Arc<A>);
394
395impl<A> Tasked<A> {
396 fn clone_backing(&self) -> Arc<A> {
397 Arc::clone(&self.0)
398 }
399}
400
/// A pinned, boxed, `Send` stream whose items are `Result<T, ExternalError>`
/// and which may borrow data for the lifetime `'a`.
pub type ResultStream<'a, T> = Pin<Box<dyn Stream<Item = Result<T, ExternalError>> + Send + 'a>>;
404
/// A store that keeps, per string key, a sequence of [`VersionedData`]
/// entries identified by [`SeqNo`].
///
/// The per-method notes below describe what the conformance test in this file
/// (`tests::consensus_impl_test`) expects of an implementation.
/// NOTE(review): guarantees beyond what that test exercises are not visible
/// here — confirm against the implementations.
#[async_trait]
pub trait Consensus: std::fmt::Debug + Send + Sync {
    /// Returns a stream of the keys held by this store. The conformance test
    /// expects a key to be listed once a value has been committed to it.
    fn list_keys(&self) -> ResultStream<'_, String>;

    /// Returns the entry with the greatest `SeqNo` stored at `key`, or `None`
    /// if nothing has been committed to that key.
    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError>;

    /// Attempts to commit `new` as the next entry at `key`.
    ///
    /// The conformance test expects `Committed` only when `new.seqno` is
    /// `SeqNo(0)` for a key with no data, or exactly one past the current
    /// head otherwise; any other `SeqNo` yields `ExpectationMismatch`.
    async fn compare_and_set(
        &self,
        key: &str,
        new: VersionedData,
    ) -> Result<CaSResult, ExternalError>;

    /// Returns up to `limit` entries stored at `key` whose `SeqNo` is at
    /// least `from`, in ascending `SeqNo` order. A key with no data yields an
    /// empty vector.
    async fn scan(
        &self,
        key: &str,
        from: SeqNo,
        limit: usize,
    ) -> Result<Vec<VersionedData>, ExternalError>;

    /// Removes the entries at `key` whose `SeqNo` is below `seqno`.
    ///
    /// The conformance test expects an error when `key` has no data or when
    /// `seqno` is greater than the current head, and expects a repeated
    /// truncation at the same `seqno` to succeed.
    /// NOTE(review): the `Option<usize>` is presumably the number of entries
    /// removed, when the implementation can report it — confirm.
    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError>;
}
452
453#[async_trait]
454impl<A: Consensus + 'static> Consensus for Tasked<A> {
455 fn list_keys(&self) -> ResultStream<'_, String> {
456 self.0.list_keys()
462 }
463
464 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
465 let backing = self.clone_backing();
466 let key = key.to_owned();
467 mz_ore::task::spawn(
468 || "persist::task::head",
469 async move { backing.head(&key).await }.instrument(Span::current()),
470 )
471 .await
472 }
473
474 async fn compare_and_set(
475 &self,
476 key: &str,
477 new: VersionedData,
478 ) -> Result<CaSResult, ExternalError> {
479 let backing = self.clone_backing();
480 let key = key.to_owned();
481 mz_ore::task::spawn(
482 || "persist::task::cas",
483 async move { backing.compare_and_set(&key, new).await }.instrument(Span::current()),
484 )
485 .await
486 }
487
488 async fn scan(
489 &self,
490 key: &str,
491 from: SeqNo,
492 limit: usize,
493 ) -> Result<Vec<VersionedData>, ExternalError> {
494 let backing = self.clone_backing();
495 let key = key.to_owned();
496 mz_ore::task::spawn(
497 || "persist::task::scan",
498 async move { backing.scan(&key, from, limit).await }.instrument(Span::current()),
499 )
500 .await
501 }
502
503 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
504 let backing = self.clone_backing();
505 let key = key.to_owned();
506 mz_ore::task::spawn(
507 || "persist::task::truncate",
508 async move { backing.truncate(&key, seqno).await }.instrument(Span::current()),
509 )
510 .await
511 }
512}
513
/// Metadata about one blob, as handed to the callback of
/// [`Blob::list_keys_and_metadata`].
#[derive(Debug)]
pub struct BlobMetadata<'a> {
    /// The blob's key, borrowed for `'a`.
    pub key: &'a str,
    /// The blob's size in bytes.
    pub size_in_bytes: u64,
}
522
/// The key name `LIVENESS`.
///
/// NOTE(review): the name suggests this is the key passed to [`Blob::get`] by
/// liveness checks — confirm against callers.
pub const BLOB_GET_LIVENESS_KEY: &str = "LIVENESS";
525
/// A store mapping string keys to byte values.
///
/// The per-method notes below describe what the conformance test in this file
/// (`tests::blob_impl_test`) expects of an implementation.
/// NOTE(review): guarantees beyond what that test exercises are not visible
/// here — confirm against the implementations.
#[async_trait]
pub trait Blob: std::fmt::Debug + Send + Sync {
    /// Returns the value stored at `key`, or `None` if the key is absent.
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError>;

    /// Calls `f` with the metadata of stored blobs. The conformance test
    /// expects exactly the keys that start with `key_prefix`, with an empty
    /// prefix matching every key.
    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError>;

    /// Stores `value` at `key`, replacing any value already there.
    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError>;

    /// Removes `key`. The conformance test expects `Some(size in bytes of the
    /// removed value)` when the key was present and `None` when it was absent.
    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError>;

    /// Attempts to bring back a deleted `key`.
    ///
    /// The conformance test expects this to succeed on a key that is still
    /// present. On a deleted key it accepts either `Ok` with the old value
    /// readable again, or a `Determinate` error with the key still absent.
    async fn restore(&self, key: &str) -> Result<(), ExternalError>;
}
575
576#[async_trait]
577impl<A: Blob + 'static> Blob for Tasked<A> {
578 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
579 let backing = self.clone_backing();
580 let key = key.to_owned();
581 mz_ore::task::spawn(
582 || "persist::task::get",
583 async move { backing.get(&key).await }.instrument(Span::current()),
584 )
585 .await
586 }
587
588 async fn list_keys_and_metadata(
593 &self,
594 key_prefix: &str,
595 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
596 ) -> Result<(), ExternalError> {
597 self.0.list_keys_and_metadata(key_prefix, f).await
600 }
601
602 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
604 let backing = self.clone_backing();
605 let key = key.to_owned();
606 mz_ore::task::spawn(
607 || "persist::task::set",
608 async move { backing.set(&key, value).await }.instrument(Span::current()),
609 )
610 .await
611 }
612
613 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
618 let backing = self.clone_backing();
619 let key = key.to_owned();
620 mz_ore::task::spawn(
621 || "persist::task::delete",
622 async move { backing.delete(&key).await }.instrument(Span::current()),
623 )
624 .await
625 }
626
627 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
628 let backing = self.clone_backing();
629 let key = key.to_owned();
630 mz_ore::task::spawn(
631 || "persist::task::restore",
632 async move { backing.restore(&key).await }.instrument(Span::current()),
633 )
634 .await
635 }
636}
637
/// Conformance tests for [`Blob`] and [`Consensus`] implementations, plus a
/// unit test for the timeout helpers. The two `*_impl_test` functions are
/// `pub` and take a constructor for the implementation under test.
#[cfg(test)]
pub mod tests {
    use std::future::Future;

    use anyhow::anyhow;
    use futures_util::TryStreamExt;
    use mz_ore::{assert_err, assert_ok};
    use uuid::Uuid;

    use crate::location::Blob;

    use super::*;

    /// Returns `baseline` extended with `new`, sorted.
    fn keys(baseline: &[String], new: &[&str]) -> Vec<String> {
        let mut ret = baseline.to_vec();
        ret.extend(new.iter().map(|x| x.to_string()));
        ret.sort();
        ret
    }

    /// Collects every key that `b` lists for the empty prefix.
    async fn get_keys(b: &impl Blob) -> Result<Vec<String>, ExternalError> {
        let mut keys = vec![];
        b.list_keys_and_metadata("", &mut |entry| keys.push(entry.key.to_string()))
            .await?;
        Ok(keys)
    }

    /// Collects every key that `b` lists for `prefix`.
    async fn get_keys_with_prefix(
        b: &impl Blob,
        prefix: &str,
    ) -> Result<Vec<String>, ExternalError> {
        let mut keys = vec![];
        b.list_keys_and_metadata(prefix, &mut |entry| keys.push(entry.key.to_string()))
            .await?;
        Ok(keys)
    }

    /// Conformance test for [`Blob`] implementations.
    ///
    /// `new_fn` opens an implementation for a path name. The assertions below
    /// expect handles opened on the same path to observe each other's writes.
    pub async fn blob_impl_test<
        B: Blob,
        F: Future<Output = Result<B, ExternalError>>,
        NewFn: Fn(&'static str) -> F,
    >(
        new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let values = ["v0".as_bytes().to_vec(), "v1".as_bytes().to_vec()];

        let blob0 = new_fn("path0").await?;

        // Opening a different path must succeed; the handle is not used further.
        let _ = new_fn("path1").await?;

        // A second handle on the same path as `blob0`.
        let blob1 = new_fn("path0").await?;

        let k0 = "foo/bar/k0";

        // A key that was never set reads as absent through both handles.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);

        // Both handles start out listing no keys.
        let empty_keys = get_keys(&blob0).await?;
        assert_eq!(empty_keys, Vec::<String>::new());
        let empty_keys = get_keys(&blob1).await?;
        assert_eq!(empty_keys, Vec::<String>::new());

        // A write through one handle is readable through both.
        blob0.set(k0, values[0].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // The same holds for a second key that contains no `/`.
        blob0.set("k0a", values[0].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Both handles list exactly the two keys written so far.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));

        // Setting an existing key overwrites its value.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        blob0.set("k0a", values[1].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Deleting a present key reports the size of the removed value (2 bytes here).
        assert_eq!(blob0.delete(k0).await, Ok(Some(2)));
        // The deletion is visible through both handles.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);
        // Deleting an already-deleted key reports `None`.
        assert_eq!(blob0.delete(k0).await, Ok(None));
        // So does deleting a key that never existed.
        assert_eq!(blob0.delete("nope").await, Ok(None));
        // An empty value is distinguishable from an absent key: its deletion
        // reports `Some(0)`.
        blob0.set("empty", Bytes::new()).await?;
        assert_eq!(blob0.delete("empty").await, Ok(Some(0)));

        // Restoring a key that is still present succeeds.
        blob0.set("undelete", Bytes::from("data")).await?;
        blob0.restore("undelete").await?;
        assert_eq!(blob0.delete("undelete").await?, Some("data".len()));
        // Restoring a deleted key may either bring the data back or fail with
        // a determinate error; any other error fails the test.
        let expected = match blob0.restore("undelete").await {
            Ok(()) => Some(Bytes::from("data").into()),
            Err(ExternalError::Determinate(_)) => None,
            Err(other) => return Err(other),
        };
        assert_eq!(blob0.get("undelete").await?, expected);
        // Clean up so the key listings below are unaffected by the outcome.
        blob0.delete("undelete").await?;

        // With the last remaining key deleted, both handles list nothing again.
        blob0.delete("k0a").await?;
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        // A deleted key can be set again.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Insert several more keys.
        let mut expected_keys = empty_keys;
        for i in 1..=5 {
            let key = format!("k{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_keys.push(key);
        }

        // Both handles list all of them in addition to `k0`.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));

        // Insert keys sharing a longer prefix and check prefix-filtered listing.
        let mut expected_prefix_keys = vec![];
        for i in 1..=3 {
            let key = format!("k-prefix-{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_prefix_keys.push(key);
        }
        let mut blob_keys = get_keys_with_prefix(&blob0, "k-prefix").await?;
        blob_keys.sort();
        assert_eq!(blob_keys, expected_prefix_keys);
        // The shorter prefix `k` matches `k1`..`k5` and the `k-prefix-*` keys,
        // but not `k0`, whose full key is `foo/bar/k0`.
        let mut blob_keys = get_keys_with_prefix(&blob0, "k").await?;
        blob_keys.sort();
        expected_keys.extend(expected_prefix_keys);
        expected_keys.sort();
        assert_eq!(blob_keys, expected_keys);

        // A handle opened after the writes still sees the stored data.
        let blob3 = new_fn("path0").await?;
        assert_eq!(
            blob3.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        Ok(())
    }

    /// Conformance test for [`Consensus`] implementations.
    ///
    /// `new_fn` constructs the implementation under test. The `list_keys`
    /// assertion below expects the store to hold no keys other than the one
    /// this test writes at that point.
    pub async fn consensus_impl_test<
        C: Consensus,
        F: Future<Output = Result<C, ExternalError>>,
        NewFn: FnMut() -> F,
    >(
        mut new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let consensus = new_fn().await?;

        // Use a freshly generated random key.
        let key = Uuid::new_v4().to_string();

        // A key with no data has no head.
        assert_eq!(consensus.head(&key).await, Ok(None));

        // Scanning a key with no data yields nothing.
        assert_eq!(consensus.scan(&key, SeqNo(0), SCAN_ALL).await, Ok(vec![]));

        // Truncating a key with no data is an error.
        assert_err!(consensus.truncate(&key, SeqNo(0)).await);

        let state_at = |v| VersionedData {
            seqno: SeqNo(v),
            data: Bytes::from("abc"),
        };

        // The first write to a key is rejected if it does not start at SeqNo(0).
        assert_eq!(
            consensus.compare_and_set(&key, state_at(1)).await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // The first write at SeqNo(0) is accepted.
        assert_eq!(
            consensus.compare_and_set(&key, state_at(0)).await,
            Ok(CaSResult::Committed),
        );

        // The key is now listed, and it is the only key in the store.
        let keys: Vec<_> = consensus.list_keys().try_collect().await?;
        assert_eq!(keys, vec![key.to_owned()]);

        // The head is the entry just written.
        assert_eq!(consensus.head(&key).await, Ok(Some(state_at(0))));

        // A scan from SeqNo(0) returns that entry.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![state_at(0)])
        );

        // Repeating the same scan returns the same result.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![state_at(0)])
        );

        // A scan starting past the head returns nothing.
        assert_eq!(consensus.scan(&key, SeqNo(1), SCAN_ALL).await, Ok(vec![]));

        // Truncating at the head's sequence number succeeds.
        assert_ok!(consensus.truncate(&key, SeqNo(0)).await);

        // Truncating past the head is an error.
        assert_err!(consensus.truncate(&key, SeqNo(1)).await);

        let new_state_at = |v| VersionedData {
            seqno: SeqNo(v),
            data: Bytes::from("def"),
        };

        // A write that skips ahead of head + 1 is rejected.
        assert_eq!(
            consensus.compare_and_set(&key, new_state_at(3)).await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // A write at the head's own sequence number is rejected.
        assert_eq!(
            consensus.compare_and_set(&key, new_state_at(0)).await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // A write at head + 1 is accepted.
        assert_eq!(
            consensus.compare_and_set(&key, new_state_at(1)).await,
            Ok(CaSResult::Committed),
        );

        // The head is now the new entry.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state_at(1))));

        // A scan from SeqNo(0) returns both entries in ascending order.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![state_at(0), new_state_at(1)])
        );

        // A scan from the head's sequence number returns only the head.
        assert_eq!(
            consensus.scan(&key, SeqNo(1), SCAN_ALL).await,
            Ok(vec![new_state_at(1)])
        );

        // A scan starting past the head returns nothing.
        assert_eq!(consensus.scan(&key, SeqNo(2), SCAN_ALL).await, Ok(vec![]));

        // A limit of 1 returns only the earliest matching entry.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 1).await,
            Ok(vec![state_at(0)])
        );

        // A limit equal to the number of entries returns all of them.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 2).await,
            Ok(vec![state_at(0), new_state_at(1)])
        );

        // A limit larger than the number of entries returns all of them.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), 100).await,
            Ok(vec![state_at(0), new_state_at(1)])
        );

        // Truncating at the head's sequence number succeeds...
        assert_ok!(consensus.truncate(&key, SeqNo(1)).await);

        // ...and removes the entries below it.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![new_state_at(1)])
        );

        // Repeating the same truncation also succeeds.
        assert_ok!(consensus.truncate(&key, SeqNo(1)).await);

        // A second, independent key.
        let other_key = Uuid::new_v4().to_string();

        assert_eq!(consensus.head(&other_key).await, Ok(None));

        let state = VersionedData {
            seqno: SeqNo(0),
            data: Bytes::from("einszweidrei"),
        };

        assert_eq!(
            consensus.compare_and_set(&other_key, state.clone()).await,
            Ok(CaSResult::Committed),
        );

        assert_eq!(consensus.head(&other_key).await, Ok(Some(state.clone())));

        // Writing to the other key left the first key's head unchanged.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state_at(1))));

        // A write that jumps far ahead of the head is rejected.
        let invalid_jump_forward = VersionedData {
            seqno: SeqNo(11),
            data: Bytes::from("invalid"),
        };
        assert_eq!(
            consensus.compare_and_set(&key, invalid_jump_forward).await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // A 10240-byte payload can be committed.
        let large_state = VersionedData {
            seqno: SeqNo(2),
            data: std::iter::repeat(b'a').take(10240).collect(),
        };
        assert_eq!(
            consensus.compare_and_set(&key, large_state).await,
            Ok(CaSResult::Committed),
        );

        // An empty payload can be committed, and the store truncated up to it.
        let v3 = VersionedData {
            seqno: SeqNo(3),
            data: Bytes::new(),
        };
        assert_eq!(
            consensus.compare_and_set(&key, v3).await,
            Ok(CaSResult::Committed),
        );
        assert_ok!(consensus.truncate(&key, SeqNo(3)).await);

        Ok(())
    }

    /// `is_timeout` is true for errors built by `new_timeout` and false for an
    /// error whose message does not contain `timeout`.
    #[mz_ore::test]
    fn timeout_error() {
        assert!(ExternalError::new_timeout(Instant::now()).is_timeout());
        assert!(!ExternalError::from(anyhow!("foo")).is_timeout());
    }
}