1use std::fmt;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use azure_core::StatusCode;
20use bytes::Bytes;
21use futures_util::Stream;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::u64_to_usize;
24use mz_postgres_client::error::PostgresError;
25use mz_proto::RustType;
26use proptest_derive::Arbitrary;
27use serde::{Deserialize, Serialize};
28use tracing::{Instrument, Span};
29
30use crate::error::Error;
31
/// A sequence number: a newtype around a `u64`.
///
/// The `Display` impl in this file renders it as `v` followed by the decimal
/// value, and the `FromStr` impl parses that same form back.
#[derive(
    Arbitrary, Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize,
)]
pub struct SeqNo(pub u64);
49
50impl std::fmt::Display for SeqNo {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 write!(f, "v{}", self.0)
53 }
54}
55
56impl timely::PartialOrder for SeqNo {
57 fn less_equal(&self, other: &Self) -> bool {
58 self <= other
59 }
60}
61
62impl std::str::FromStr for SeqNo {
63 type Err = String;
64
65 fn from_str(encoded: &str) -> Result<Self, Self::Err> {
66 let encoded = match encoded.strip_prefix('v') {
67 Some(x) => x,
68 None => return Err(format!("invalid SeqNo {}: incorrect prefix", encoded)),
69 };
70 let seqno =
71 u64::from_str(encoded).map_err(|err| format!("invalid SeqNo {}: {}", encoded, err))?;
72 Ok(SeqNo(seqno))
73 }
74}
75
76impl SeqNo {
77 pub fn next(self) -> SeqNo {
79 SeqNo(self.0 + 1)
80 }
81
82 pub fn minimum() -> Self {
84 SeqNo(0)
85 }
86}
87
88impl RustType<u64> for SeqNo {
89 fn into_proto(&self) -> u64 {
90 self.0
91 }
92
93 fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
94 Ok(SeqNo(proto))
95 }
96}
97
/// An error wrapper marking the wrapped [`anyhow::Error`] as "determinate".
///
/// Its `Display` output is the wrapped error's output prefixed with
/// `determinate: `.
#[derive(Debug)]
pub struct Determinate {
    // The underlying error being classified.
    inner: anyhow::Error,
}
104
105impl std::fmt::Display for Determinate {
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 write!(f, "determinate: ")?;
108 self.inner.fmt(f)
109 }
110}
111
/// Makes [`Determinate`] usable as a standard error type.
impl std::error::Error for Determinate {
    /// Returns the `source()` of the wrapped error. Note that this is the
    /// wrapped error's own source, not the wrapped error itself.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        self.inner.source()
    }
}
117
118impl Determinate {
119 pub fn new(inner: anyhow::Error) -> Self {
123 Determinate { inner }
124 }
125}
126
/// An error wrapper marking the wrapped [`anyhow::Error`] as "indeterminate".
///
/// Its `Display` output is the wrapped error's output prefixed with
/// `indeterminate: `.
#[derive(Debug)]
pub struct Indeterminate {
    // The underlying error being classified; visible within the crate.
    pub(crate) inner: anyhow::Error,
}
133
134impl Indeterminate {
135 pub fn new(inner: anyhow::Error) -> Self {
139 Indeterminate { inner }
140 }
141}
142
143impl std::fmt::Display for Indeterminate {
144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145 write!(f, "indeterminate: ")?;
146 self.inner.fmt(f)
147 }
148}
149
/// Makes [`Indeterminate`] usable as a standard error type.
impl std::error::Error for Indeterminate {
    /// Returns the `source()` of the wrapped error. Note that this is the
    /// wrapped error's own source, not the wrapped error itself.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        self.inner.source()
    }
}
155
156#[cfg(any(test, debug_assertions))]
158impl PartialEq for Indeterminate {
159 fn eq(&self, other: &Self) -> bool {
160 self.to_string() == other.to_string()
161 }
162}
163
/// An error that is classified as either determinate or indeterminate.
#[derive(Debug)]
pub enum ExternalError {
    /// The error wrapped in a [`Determinate`].
    Determinate(Determinate),
    /// The error wrapped in an [`Indeterminate`].
    Indeterminate(Indeterminate),
}
173
174impl ExternalError {
175 #[track_caller]
180 pub fn new_timeout(deadline: Instant) -> Self {
181 ExternalError::Indeterminate(Indeterminate {
182 inner: anyhow!("timeout at {:?}", deadline),
183 })
184 }
185
186 pub fn is_timeout(&self) -> bool {
191 self.to_string().contains("timeout")
193 }
194}
195
196impl std::fmt::Display for ExternalError {
197 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198 match self {
199 ExternalError::Determinate(x) => std::fmt::Display::fmt(x, f),
200 ExternalError::Indeterminate(x) => std::fmt::Display::fmt(x, f),
201 }
202 }
203}
204
205impl std::error::Error for ExternalError {
206 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
207 match self {
208 ExternalError::Determinate(e) => e.source(),
209 ExternalError::Indeterminate(e) => e.source(),
210 }
211 }
212}
213
214#[cfg(any(test, debug_assertions))]
216impl PartialEq for ExternalError {
217 fn eq(&self, other: &Self) -> bool {
218 self.to_string() == other.to_string()
219 }
220}
221
222impl From<PostgresError> for ExternalError {
223 fn from(x: PostgresError) -> Self {
224 match x {
225 PostgresError::Determinate(e) => ExternalError::Determinate(Determinate::new(e)),
226 PostgresError::Indeterminate(e) => ExternalError::Indeterminate(Indeterminate::new(e)),
227 }
228 }
229}
230
231impl From<Indeterminate> for ExternalError {
232 fn from(x: Indeterminate) -> Self {
233 ExternalError::Indeterminate(x)
234 }
235}
236
237impl From<Determinate> for ExternalError {
238 fn from(x: Determinate) -> Self {
239 ExternalError::Determinate(x)
240 }
241}
242
243impl From<anyhow::Error> for ExternalError {
244 fn from(inner: anyhow::Error) -> Self {
245 ExternalError::Indeterminate(Indeterminate { inner })
246 }
247}
248
249impl From<Error> for ExternalError {
250 fn from(x: Error) -> Self {
251 ExternalError::Indeterminate(Indeterminate {
252 inner: anyhow::Error::new(x),
253 })
254 }
255}
256
257impl From<std::io::Error> for ExternalError {
258 fn from(x: std::io::Error) -> Self {
259 ExternalError::Indeterminate(Indeterminate {
260 inner: anyhow::Error::new(x),
261 })
262 }
263}
264
265impl From<deadpool_postgres::tokio_postgres::Error> for ExternalError {
266 fn from(e: deadpool_postgres::tokio_postgres::Error) -> Self {
267 let code = match e.as_db_error().map(|x| x.code()) {
268 Some(x) => x,
269 None => {
270 return ExternalError::Indeterminate(Indeterminate {
271 inner: anyhow::Error::new(e),
272 });
273 }
274 };
275 match code {
276 &deadpool_postgres::tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE => {
279 ExternalError::Determinate(Determinate {
280 inner: anyhow::Error::new(e),
281 })
282 }
283 _ => ExternalError::Indeterminate(Indeterminate {
284 inner: anyhow::Error::new(e),
285 }),
286 }
287 }
288}
289
290impl From<azure_core::Error> for ExternalError {
291 fn from(value: azure_core::Error) -> Self {
292 let definitely_determinate = if let Some(http) = value.as_http_error() {
293 match http.status() {
294 StatusCode::TooManyRequests => true,
297 _ => false,
298 }
299 } else {
300 false
301 };
302 if definitely_determinate {
303 ExternalError::Determinate(Determinate {
304 inner: anyhow!(value),
305 })
306 } else {
307 ExternalError::Indeterminate(Indeterminate {
308 inner: anyhow!(value),
309 })
310 }
311 }
312}
313
314impl From<deadpool_postgres::PoolError> for ExternalError {
315 fn from(x: deadpool_postgres::PoolError) -> Self {
316 match x {
317 deadpool_postgres::PoolError::Backend(x) => ExternalError::from(x),
320 x => ExternalError::Indeterminate(Indeterminate {
321 inner: anyhow::Error::new(x),
322 }),
323 }
324 }
325}
326
327impl From<tokio::task::JoinError> for ExternalError {
328 fn from(x: tokio::task::JoinError) -> Self {
329 ExternalError::Indeterminate(Indeterminate {
330 inner: anyhow::Error::new(x),
331 })
332 }
333}
334
/// A blob of bytes tagged with the [`SeqNo`] it was written at.
#[derive(Debug, Clone, PartialEq)]
pub struct VersionedData {
    /// The sequence number of this version.
    pub seqno: SeqNo,
    /// The payload bytes.
    pub data: Bytes,
}
344
/// A scan limit equal to `i64::MAX` converted to `usize`.
///
/// The tests in this file pass it as the `limit` of [`Consensus::scan`] when
/// they want every entry back.
// The `as` cast is lossless: `i64::MAX` is non-negative and fits in a `u64`.
#[allow(clippy::as_conversions)]
pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
350
/// The string key `LIVENESS`.
// NOTE(review): presumably a reserved key passed to `Consensus::head` for
// liveness checks; it is not used within this file, so confirm against callers.
pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";
353
/// The non-error outcome of a [`Consensus::compare_and_set`] call.
#[derive(Debug, PartialEq)]
pub enum CaSResult {
    /// The new value was written.
    Committed,
    /// The `expected` sequence number did not match, so nothing was written.
    ExpectationMismatch,
}
362
/// A wrapper around a shared `A`.
///
/// The [`Consensus`] and [`Blob`] impls for this type in this file run most
/// operations of the wrapped implementation on a task spawned via
/// `mz_ore::task::spawn`.
#[derive(Debug)]
pub struct Tasked<A>(pub Arc<A>);
368
369impl<A> Tasked<A> {
370 fn clone_backing(&self) -> Arc<A> {
371 Arc::clone(&self.0)
372 }
373}
374
/// A pinned, boxed, `Send` stream whose items are `Result<T, ExternalError>`
/// and which may borrow for `'a`.
pub type ResultStream<'a, T> = Pin<Box<dyn Stream<Item = Result<T, ExternalError>> + Send + 'a>>;
378
/// An abstraction over a store of [`VersionedData`] entries addressed by a
/// string key.
///
/// The behaviors an implementation is expected to have are exercised by the
/// shared `consensus_impl_test` in this file's test module.
#[async_trait]
pub trait Consensus: std::fmt::Debug + Send + Sync {
    /// Returns a stream of keys; each item may instead be an
    /// [`ExternalError`].
    fn list_keys(&self) -> ResultStream<'_, String>;

    /// Returns the current head entry for `key`, or `None` when there is
    /// none (the shared test expects `None` for a never-written key).
    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError>;

    /// Attempts to write `new` under `key`, conditional on `expected`.
    ///
    /// Returns [`CaSResult::Committed`] when the write happened and
    /// [`CaSResult::ExpectationMismatch`] when `expected` did not match.
    /// The shared test additionally expects an error when `new.seqno` is not
    /// strictly greater than `expected`.
    async fn compare_and_set(
        &self,
        key: &str,
        expected: Option<SeqNo>,
        new: VersionedData,
    ) -> Result<CaSResult, ExternalError>;

    /// Returns entries for `key` starting at `from`, at most `limit` of them.
    ///
    /// The shared test expects entries with a sequence number of at least
    /// `from`, in ascending order.
    async fn scan(
        &self,
        key: &str,
        from: SeqNo,
        limit: usize,
    ) -> Result<Vec<VersionedData>, ExternalError>;

    /// Removes entries for `key` relative to `seqno` and returns a count.
    ///
    /// The shared test expects the count of removed entries with a sequence
    /// number below `seqno`, and an error when `seqno` is beyond the current
    /// head or the key was never written.
    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError>;
}
433
434#[async_trait]
435impl<A: Consensus + 'static> Consensus for Tasked<A> {
436 fn list_keys(&self) -> ResultStream<'_, String> {
437 self.0.list_keys()
443 }
444
445 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
446 let backing = self.clone_backing();
447 let key = key.to_owned();
448 mz_ore::task::spawn(
449 || "persist::task::head",
450 async move { backing.head(&key).await }.instrument(Span::current()),
451 )
452 .await?
453 }
454
455 async fn compare_and_set(
456 &self,
457 key: &str,
458 expected: Option<SeqNo>,
459 new: VersionedData,
460 ) -> Result<CaSResult, ExternalError> {
461 let backing = self.clone_backing();
462 let key = key.to_owned();
463 mz_ore::task::spawn(
464 || "persist::task::cas",
465 async move { backing.compare_and_set(&key, expected, new).await }
466 .instrument(Span::current()),
467 )
468 .await?
469 }
470
471 async fn scan(
472 &self,
473 key: &str,
474 from: SeqNo,
475 limit: usize,
476 ) -> Result<Vec<VersionedData>, ExternalError> {
477 let backing = self.clone_backing();
478 let key = key.to_owned();
479 mz_ore::task::spawn(
480 || "persist::task::scan",
481 async move { backing.scan(&key, from, limit).await }.instrument(Span::current()),
482 )
483 .await?
484 }
485
486 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
487 let backing = self.clone_backing();
488 let key = key.to_owned();
489 mz_ore::task::spawn(
490 || "persist::task::truncate",
491 async move { backing.truncate(&key, seqno).await }.instrument(Span::current()),
492 )
493 .await?
494 }
495}
496
/// Metadata about one blob entry, handed to the callback of
/// [`Blob::list_keys_and_metadata`].
#[derive(Debug)]
pub struct BlobMetadata<'a> {
    /// The entry's key, borrowed for `'a`.
    pub key: &'a str,
    /// The entry's size in bytes.
    pub size_in_bytes: u64,
}
505
/// The string key `LIVENESS`.
// NOTE(review): presumably a reserved key passed to `Blob::get` for liveness
// checks; it is not used within this file, so confirm against callers.
pub const BLOB_GET_LIVENESS_KEY: &str = "LIVENESS";
508
/// An abstraction over a key-value store of byte blobs.
///
/// The behaviors an implementation is expected to have are exercised by the
/// shared `blob_impl_test` in this file's test module.
#[async_trait]
pub trait Blob: std::fmt::Debug + Send + Sync {
    /// Returns the value stored at `key`, or `None` when there is none.
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError>;

    /// Invokes `f` with the metadata of entries matching `key_prefix`.
    ///
    /// The shared test passes an empty prefix to list every key and expects
    /// only keys that start with a non-empty prefix otherwise.
    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError>;

    /// Stores `value` at `key`. The shared test expects an existing value to
    /// be overwritten.
    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError>;

    /// Removes `key`.
    ///
    /// The shared test expects `Some(n)`, with `n` the size in bytes of the
    /// removed value, when the key existed, and `None` when it did not.
    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError>;

    /// Restores `key`.
    ///
    /// The shared test expects success while the key exists. After a delete
    /// it accepts either success (the data is readable again) or a
    /// determinate error (the data stays absent).
    async fn restore(&self, key: &str) -> Result<(), ExternalError>;
}
558
559#[async_trait]
560impl<A: Blob + 'static> Blob for Tasked<A> {
561 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
562 let backing = self.clone_backing();
563 let key = key.to_owned();
564 mz_ore::task::spawn(
565 || "persist::task::get",
566 async move { backing.get(&key).await }.instrument(Span::current()),
567 )
568 .await?
569 }
570
571 async fn list_keys_and_metadata(
576 &self,
577 key_prefix: &str,
578 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
579 ) -> Result<(), ExternalError> {
580 self.0.list_keys_and_metadata(key_prefix, f).await
583 }
584
585 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
587 let backing = self.clone_backing();
588 let key = key.to_owned();
589 mz_ore::task::spawn(
590 || "persist::task::set",
591 async move { backing.set(&key, value).await }.instrument(Span::current()),
592 )
593 .await?
594 }
595
596 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
601 let backing = self.clone_backing();
602 let key = key.to_owned();
603 mz_ore::task::spawn(
604 || "persist::task::delete",
605 async move { backing.delete(&key).await }.instrument(Span::current()),
606 )
607 .await?
608 }
609
610 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
611 let backing = self.clone_backing();
612 let key = key.to_owned();
613 mz_ore::task::spawn(
614 || "persist::task::restore",
615 async move { backing.restore(&key).await }.instrument(Span::current()),
616 )
617 .await?
618 }
619}
620
621#[cfg(test)]
623pub mod tests {
624 use std::future::Future;
625
626 use anyhow::anyhow;
627 use futures_util::TryStreamExt;
628 use mz_ore::assert_err;
629 use uuid::Uuid;
630
631 use crate::location::Blob;
632
633 use super::*;
634
635 fn keys(baseline: &[String], new: &[&str]) -> Vec<String> {
636 let mut ret = baseline.to_vec();
637 ret.extend(new.iter().map(|x| x.to_string()));
638 ret.sort();
639 ret
640 }
641
642 async fn get_keys(b: &impl Blob) -> Result<Vec<String>, ExternalError> {
643 let mut keys = vec![];
644 b.list_keys_and_metadata("", &mut |entry| keys.push(entry.key.to_string()))
645 .await?;
646 Ok(keys)
647 }
648
649 async fn get_keys_with_prefix(
650 b: &impl Blob,
651 prefix: &str,
652 ) -> Result<Vec<String>, ExternalError> {
653 let mut keys = vec![];
654 b.list_keys_and_metadata(prefix, &mut |entry| keys.push(entry.key.to_string()))
655 .await?;
656 Ok(keys)
657 }
658
    /// Shared conformance test for [`Blob`] implementations.
    ///
    /// `new_fn` is called with a path string and must resolve to a blob
    /// handle. The test opens several handles to `"path0"` and expects all
    /// of them to observe the same data. The test also assumes the blob
    /// starts out empty.
    pub async fn blob_impl_test<
        B: Blob,
        F: Future<Output = Result<B, ExternalError>>,
        NewFn: Fn(&'static str) -> F,
    >(
        new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let values = ["v0".as_bytes().to_vec(), "v1".as_bytes().to_vec()];

        let blob0 = new_fn("path0").await?;

        // A handle for a different path is opened and immediately discarded;
        // only the success of opening it is checked.
        let _ = new_fn("path1").await?;

        // A second handle to the same path as `blob0`.
        let blob1 = new_fn("path0").await?;

        let k0 = "foo/bar/k0";

        // A key that was never set reads as `None` through both handles.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);

        // A fresh blob lists no keys through either handle.
        let empty_keys = get_keys(&blob0).await?;
        assert_eq!(empty_keys, Vec::<String>::new());
        let empty_keys = get_keys(&blob1).await?;
        assert_eq!(empty_keys, Vec::<String>::new());

        // A value set through one handle is readable through both.
        blob0.set(k0, values[0].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // The same holds for a second key without any `/` in it.
        blob0.set("k0a", values[0].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Both keys are listed through both handles. Listings are sorted
        // before comparing, so no listing order is assumed.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));

        // Setting an existing key overwrites its value.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        blob0.set("k0a", values[1].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Deleting a key reports the size of the removed value (2 bytes).
        assert_eq!(blob0.delete(k0).await, Ok(Some(2)));
        // The deleted key is gone through both handles.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);
        // Deleting again succeeds but reports that nothing was removed.
        assert_eq!(blob0.delete(k0).await, Ok(None));
        // Deleting a key that never existed also succeeds with `None`.
        assert_eq!(blob0.delete("nope").await, Ok(None));
        // Deleting an empty value is distinguishable from deleting nothing:
        // it reports `Some(0)`.
        blob0.set("empty", Bytes::new()).await?;
        assert_eq!(blob0.delete("empty").await, Ok(Some(0)));

        // Restore must succeed while the key still exists.
        blob0.set("undelete", Bytes::from("data")).await?;
        blob0.restore("undelete").await?;
        assert_eq!(blob0.delete("undelete").await?, Some("data".len()));
        // After a delete, restore may succeed or fail determinately. The
        // data must be readable exactly when restore reported success.
        let expected = match blob0.restore("undelete").await {
            Ok(()) => Some(Bytes::from("data").into()),
            Err(ExternalError::Determinate(_)) => None,
            Err(other) => return Err(other),
        };
        assert_eq!(blob0.get("undelete").await?, expected);
        // Clean up regardless of which way the restore went.
        blob0.delete("undelete").await?;

        // With the last key deleted, the blob lists no keys again.
        blob0.delete("k0a").await?;
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        // A deleted key can be set again.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Write several keys back to back and check they are all listed.
        let mut expected_keys = empty_keys;
        for i in 1..=5 {
            let key = format!("k{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_keys.push(key);
        }

        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));

        // Write keys under a longer prefix and check prefix listing.
        let mut expected_prefix_keys = vec![];
        for i in 1..=3 {
            let key = format!("k-prefix-{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_prefix_keys.push(key);
        }
        // The longer prefix matches only the three new keys.
        let mut blob_keys = get_keys_with_prefix(&blob0, "k-prefix").await?;
        blob_keys.sort();
        assert_eq!(blob_keys, expected_prefix_keys);
        // The prefix "k" matches k1..k5 and the "k-prefix-*" keys, but not
        // `k0`, which starts with "foo/".
        let mut blob_keys = get_keys_with_prefix(&blob0, "k").await?;
        blob_keys.sort();
        expected_keys.extend(expected_prefix_keys);
        expected_keys.sort();
        assert_eq!(blob_keys, expected_keys);

        // A handle opened later to the same path sees the existing data.
        let blob3 = new_fn("path0").await?;
        assert_eq!(
            blob3.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        Ok(())
    }
830
    /// Shared conformance test for [`Consensus`] implementations.
    ///
    /// `new_fn` must resolve to a consensus handle. The test uses random
    /// UUID keys, but its `list_keys` assertion expects the store to hold no
    /// other keys at that point.
    pub async fn consensus_impl_test<
        C: Consensus,
        F: Future<Output = Result<C, ExternalError>>,
        NewFn: FnMut() -> F,
    >(
        mut new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let consensus = new_fn().await?;

        // A random key that has not been written before.
        let key = Uuid::new_v4().to_string();

        // A never-written key has no head.
        assert_eq!(consensus.head(&key).await, Ok(None));

        // Scanning a never-written key yields nothing.
        assert_eq!(consensus.scan(&key, SeqNo(0), SCAN_ALL).await, Ok(vec![]));

        // Truncating a never-written key is an error.
        assert_err!(consensus.truncate(&key, SeqNo(0)).await);

        let state = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("abc"),
        };

        // Expecting an existing seqno on an empty key is a mismatch.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(0)), state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // Expecting `None` on an empty key commits the first entry.
        assert_eq!(
            consensus.compare_and_set(&key, None, state.clone()).await,
            Ok(CaSResult::Committed),
        );

        // The key now shows up as the only listed key.
        let keys: Vec<_> = consensus.list_keys().try_collect().await?;
        assert_eq!(keys, vec![key.to_owned()]);

        // The committed entry is the head.
        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));

        // A scan from below the entry's seqno returns it.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![state.clone()])
        );

        // The `from` bound is inclusive.
        assert_eq!(
            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
            Ok(vec![state.clone()])
        );

        // A scan from above the only entry returns nothing.
        assert_eq!(consensus.scan(&key, SeqNo(6), SCAN_ALL).await, Ok(vec![]));

        // Truncating at or below the only entry's seqno removes nothing.
        assert_eq!(consensus.truncate(&key, SeqNo(0)).await, Ok(0));
        assert_eq!(consensus.truncate(&key, SeqNo(5)).await, Ok(0));

        // Truncating beyond the head is an error.
        assert_err!(consensus.truncate(&key, SeqNo(6)).await);

        let new_state = VersionedData {
            seqno: SeqNo(10),
            data: Bytes::from("def"),
        };

        // An expected seqno above the head (5) is a mismatch.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(7)), new_state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // An expected seqno below the head is a mismatch too.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(3)), new_state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        let invalid_constant_seqno = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("invalid"),
        };

        // A new seqno equal to the expected one is rejected with an error.
        // Errors compare by their rendered message in test builds.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_constant_seqno)
                .await,
            Err(ExternalError::from(anyhow!(
                "new seqno must be strictly greater than expected. Got new: SeqNo(5) expected: SeqNo(5)"
            )))
        );

        let invalid_regressing_seqno = VersionedData {
            seqno: SeqNo(3),
            data: Bytes::from("invalid"),
        };

        // A new seqno below the expected one is rejected the same way.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_regressing_seqno)
                .await,
            Err(ExternalError::from(anyhow!(
                "new seqno must be strictly greater than expected. Got new: SeqNo(3) expected: SeqNo(5)"
            )))
        );

        // A correct expectation with a larger new seqno commits. Seqnos may
        // skip ahead (5 to 10).
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), new_state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        // The head is now the new entry.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));

        // A scan from the old seqno returns both entries in ascending order.
        assert_eq!(
            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // A scan from between the two entries returns only the newer one.
        assert_eq!(
            consensus.scan(&key, SeqNo(6), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // A scan from exactly the newer seqno returns it.
        assert_eq!(
            consensus.scan(&key, SeqNo(10), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // A scan from beyond the head returns nothing.
        assert_eq!(consensus.scan(&key, SeqNo(11), SCAN_ALL).await, Ok(vec![]));

        // A limit of 1 returns only the earliest matching entry.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 1).await,
            Ok(vec![state.clone()])
        );
        assert_eq!(
            consensus.scan(&key, SeqNo(5), 1).await,
            Ok(vec![state.clone()])
        );

        // A limit equal to the number of entries returns all of them.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 2).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // A limit larger than the number of entries returns all of them.
        assert_eq!(
            consensus.scan(&key, SeqNo(4), 100).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // Truncating between the two entries removes exactly the older one.
        assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(1));

        // Only the newer entry is left.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // Repeating the same truncate removes nothing further.
        assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(0));

        // A second, independent key.
        let other_key = Uuid::new_v4().to_string();

        assert_eq!(consensus.head(&other_key).await, Ok(None));

        // Shadows the earlier `state`; from here on `state.seqno` is 1.
        let state = VersionedData {
            seqno: SeqNo(1),
            data: Bytes::from("einszweidrei"),
        };

        assert_eq!(
            consensus
                .compare_and_set(&other_key, None, state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        assert_eq!(consensus.head(&other_key).await, Ok(Some(state.clone())));

        // Writing the other key did not disturb the first key.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));

        // The expected seqno (1, from the other key's state) does not match
        // the first key's head (10), so this is a mismatch.
        let invalid_jump_forward = VersionedData {
            seqno: SeqNo(11),
            data: Bytes::from("invalid"),
        };
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_jump_forward)
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // A 10 KiB payload can be committed.
        let large_state = VersionedData {
            seqno: SeqNo(11),
            data: std::iter::repeat(b'a').take(10240).collect(),
        };
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(new_state.seqno), large_state)
                .await,
            Ok(CaSResult::Committed),
        );

        // An empty payload can be committed, and truncating up to it removes
        // the two earlier entries (seqnos 10 and 11).
        let v12 = VersionedData {
            seqno: SeqNo(12),
            data: Bytes::new(),
        };
        assert_eq!(
            consensus.compare_and_set(&key, Some(SeqNo(11)), v12).await,
            Ok(CaSResult::Committed),
        );
        assert_eq!(consensus.truncate(&key, SeqNo(12)).await, Ok(2));

        // Seqno range, each on a fresh key: 0 and `i64::MAX` are accepted.
        assert_eq!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(0),
                        data: Bytes::new(),
                    }
                )
                .await,
            Ok(CaSResult::Committed),
        );
        assert_eq!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(i64::MAX.try_into().expect("i64::MAX fits in u64")),
                        data: Bytes::new(),
                    }
                )
                .await,
            Ok(CaSResult::Committed),
        );
        // Seqnos above `i64::MAX` are rejected with an error.
        assert_err!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(1 << 63),
                        data: Bytes::new(),
                    }
                )
                .await
        );
        assert_err!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(u64::MAX),
                        data: Bytes::new(),
                    }
                )
                .await
        );

        Ok(())
    }
1139
1140 #[mz_ore::test]
1141 fn timeout_error() {
1142 assert!(ExternalError::new_timeout(Instant::now()).is_timeout());
1143 assert!(!ExternalError::from(anyhow!("foo")).is_timeout());
1144 }
1145}