1use std::fmt;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use azure_core::StatusCode;
20use bytes::Bytes;
21use futures_util::Stream;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::u64_to_usize;
24use mz_postgres_client::error::PostgresError;
25use mz_proto::RustType;
26use proptest_derive::Arbitrary;
27use serde::{Deserialize, Serialize};
28use tracing::{Instrument, Span};
29
30use crate::error::Error;
31
/// A sequence number: a newtype around a `u64`.
///
/// Ordering and equality are derived from the wrapped integer. The `Display`
/// and `FromStr` impls in this file use the textual form `v<number>`
/// (e.g. `v5`).
///
/// NOTE(review): the `Consensus` conformance test in this file only accepts
/// values in `[0, i64::MAX]`, even though the type can represent any `u64`.
#[derive(
    Arbitrary,
    Clone,
    Copy,
    Debug,
    PartialOrd,
    Ord,
    PartialEq,
    Eq,
    Hash,
    Serialize,
    Deserialize
)]
pub struct SeqNo(pub u64);
59
60impl std::fmt::Display for SeqNo {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(f, "v{}", self.0)
63 }
64}
65
66impl timely::PartialOrder for SeqNo {
67 fn less_equal(&self, other: &Self) -> bool {
68 self <= other
69 }
70}
71
72impl std::str::FromStr for SeqNo {
73 type Err = String;
74
75 fn from_str(encoded: &str) -> Result<Self, Self::Err> {
76 let encoded = match encoded.strip_prefix('v') {
77 Some(x) => x,
78 None => return Err(format!("invalid SeqNo {}: incorrect prefix", encoded)),
79 };
80 let seqno =
81 u64::from_str(encoded).map_err(|err| format!("invalid SeqNo {}: {}", encoded, err))?;
82 Ok(SeqNo(seqno))
83 }
84}
85
86impl SeqNo {
87 pub fn next(self) -> SeqNo {
89 SeqNo(self.0 + 1)
90 }
91
92 pub fn minimum() -> Self {
94 SeqNo(0)
95 }
96
97 pub fn maximum() -> Self {
99 SeqNo(u64::MAX)
100 }
101}
102
103impl RustType<u64> for SeqNo {
104 fn into_proto(&self) -> u64 {
105 self.0
106 }
107
108 fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
109 Ok(SeqNo(proto))
110 }
111}
112
/// An error wrapping an [`anyhow::Error`], rendered with a `determinate: `
/// prefix by its `Display` impl.
///
/// NOTE(review): presumably "determinate" means the failed operation is known
/// not to have taken effect (this file classifies Postgres serialization
/// failures and HTTP 429 responses this way) — confirm with the callers that
/// retry on it.
#[derive(Debug)]
pub struct Determinate {
    // The underlying cause.
    inner: anyhow::Error,
}
119
120impl std::fmt::Display for Determinate {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 write!(f, "determinate: ")?;
123 self.inner.fmt(f)
124 }
125}
126
127impl std::error::Error for Determinate {
128 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
129 self.inner.source()
130 }
131}
132
133impl From<anyhow::Error> for Determinate {
134 fn from(inner: anyhow::Error) -> Self {
135 Self::new(inner)
136 }
137}
138
139impl Determinate {
140 pub fn new(inner: anyhow::Error) -> Self {
144 Determinate { inner }
145 }
146}
147
/// An error wrapping an [`anyhow::Error`], rendered with an `indeterminate: `
/// prefix by its `Display` impl.
///
/// NOTE(review): presumably "indeterminate" means the failed operation may or
/// may not have taken effect (this file classifies timeouts and otherwise
/// unclassified errors this way) — confirm with the callers that retry on it.
#[derive(Debug)]
pub struct Indeterminate {
    // The underlying cause; visible within the crate.
    pub(crate) inner: anyhow::Error,
}
154
155impl Indeterminate {
156 pub fn new(inner: anyhow::Error) -> Self {
160 Indeterminate { inner }
161 }
162}
163
164impl std::fmt::Display for Indeterminate {
165 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166 write!(f, "indeterminate: ")?;
167 self.inner.fmt(f)
168 }
169}
170
171impl std::error::Error for Indeterminate {
172 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
173 self.inner.source()
174 }
175}
176
177#[cfg(any(test, debug_assertions))]
179impl PartialEq for Indeterminate {
180 fn eq(&self, other: &Self) -> bool {
181 self.to_string() == other.to_string()
182 }
183}
184
/// The error type returned by the [`Consensus`] and [`Blob`] traits in this
/// file: either a [`Determinate`] or an [`Indeterminate`] failure.
///
/// The `From` impls in this file decide the classification for each upstream
/// error type; anything not explicitly recognized becomes `Indeterminate`.
#[derive(Debug)]
pub enum ExternalError {
    /// A failure classified as determinate.
    Determinate(Determinate),
    /// A failure classified as indeterminate.
    Indeterminate(Indeterminate),
}
194
195impl ExternalError {
196 #[track_caller]
201 pub fn new_timeout(deadline: Instant) -> Self {
202 ExternalError::Indeterminate(Indeterminate {
203 inner: anyhow!("timeout at {:?}", deadline),
204 })
205 }
206
207 pub fn is_timeout(&self) -> bool {
212 self.to_string().contains("timeout")
214 }
215}
216
217impl std::fmt::Display for ExternalError {
218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 match self {
220 ExternalError::Determinate(x) => std::fmt::Display::fmt(x, f),
221 ExternalError::Indeterminate(x) => std::fmt::Display::fmt(x, f),
222 }
223 }
224}
225
226impl std::error::Error for ExternalError {
227 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
228 match self {
229 ExternalError::Determinate(e) => e.source(),
230 ExternalError::Indeterminate(e) => e.source(),
231 }
232 }
233}
234
235#[cfg(any(test, debug_assertions))]
237impl PartialEq for ExternalError {
238 fn eq(&self, other: &Self) -> bool {
239 self.to_string() == other.to_string()
240 }
241}
242
243impl From<PostgresError> for ExternalError {
244 fn from(x: PostgresError) -> Self {
245 match x {
246 PostgresError::Determinate(e) => ExternalError::Determinate(Determinate::new(e)),
247 PostgresError::Indeterminate(e) => ExternalError::Indeterminate(Indeterminate::new(e)),
248 }
249 }
250}
251
252impl From<Indeterminate> for ExternalError {
253 fn from(x: Indeterminate) -> Self {
254 ExternalError::Indeterminate(x)
255 }
256}
257
258impl From<Determinate> for ExternalError {
259 fn from(x: Determinate) -> Self {
260 ExternalError::Determinate(x)
261 }
262}
263
264impl From<anyhow::Error> for ExternalError {
265 fn from(inner: anyhow::Error) -> Self {
266 ExternalError::Indeterminate(Indeterminate { inner })
267 }
268}
269
270impl From<Error> for ExternalError {
271 fn from(x: Error) -> Self {
272 ExternalError::Indeterminate(Indeterminate {
273 inner: anyhow::Error::new(x),
274 })
275 }
276}
277
278impl From<std::io::Error> for ExternalError {
279 fn from(x: std::io::Error) -> Self {
280 ExternalError::Indeterminate(Indeterminate {
281 inner: anyhow::Error::new(x),
282 })
283 }
284}
285
286impl From<deadpool_postgres::tokio_postgres::Error> for ExternalError {
287 fn from(e: deadpool_postgres::tokio_postgres::Error) -> Self {
288 let code = match e.as_db_error().map(|x| x.code()) {
289 Some(x) => x,
290 None => {
291 return ExternalError::Indeterminate(Indeterminate {
292 inner: anyhow::Error::new(e),
293 });
294 }
295 };
296 match code {
297 &deadpool_postgres::tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE => {
300 ExternalError::Determinate(Determinate {
301 inner: anyhow::Error::new(e),
302 })
303 }
304 _ => ExternalError::Indeterminate(Indeterminate {
305 inner: anyhow::Error::new(e),
306 }),
307 }
308 }
309}
310
311impl From<azure_core::Error> for ExternalError {
312 fn from(value: azure_core::Error) -> Self {
313 let definitely_determinate = if let Some(http) = value.as_http_error() {
314 match http.status() {
315 StatusCode::TooManyRequests => true,
318 _ => false,
319 }
320 } else {
321 false
322 };
323 if definitely_determinate {
324 ExternalError::Determinate(Determinate {
325 inner: anyhow!(value),
326 })
327 } else {
328 ExternalError::Indeterminate(Indeterminate {
329 inner: anyhow!(value),
330 })
331 }
332 }
333}
334
335impl From<deadpool_postgres::PoolError> for ExternalError {
336 fn from(x: deadpool_postgres::PoolError) -> Self {
337 match x {
338 deadpool_postgres::PoolError::Backend(x) => ExternalError::from(x),
341 x => ExternalError::Indeterminate(Indeterminate {
342 inner: anyhow::Error::new(x),
343 }),
344 }
345 }
346}
347
348impl From<tokio::task::JoinError> for ExternalError {
349 fn from(x: tokio::task::JoinError) -> Self {
350 ExternalError::Indeterminate(Indeterminate {
351 inner: anyhow::Error::new(x),
352 })
353 }
354}
355
/// A binary payload paired with the sequence number it was written at.
///
/// This is the unit stored and returned by [`Consensus`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct VersionedData {
    /// The sequence number of this version.
    pub seqno: SeqNo,
    /// The payload bytes.
    pub data: Bytes,
}
365
/// A `limit` large enough to return every version from [`Consensus::scan`]:
/// `i64::MAX` expressed as a `usize`.
// The `as` cast is lossless because `i64::MAX` is non-negative; the lint is
// silenced here because the conversion must be usable in a `const` context.
#[allow(clippy::as_conversions)]
pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
371
/// A fixed key name, `LIVENESS`.
///
/// NOTE(review): presumably passed to [`Consensus::head`] as a liveness probe —
/// no use is visible in this file; confirm against callers.
pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";
374
/// The successful outcomes of [`Consensus::compare_and_set`].
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum CaSResult {
    /// The expectation held and the new data was written.
    Committed,
    /// The expectation did not match the stored state; nothing was written.
    ExpectationMismatch,
}
383
/// A wrapper around a shared store `A`.
///
/// The [`Consensus`] and [`Blob`] impls for this type run most calls to the
/// wrapped store on a task spawned with `mz_ore::task::spawn`; the listing
/// methods are the exception and call the wrapped store directly.
#[derive(Debug)]
pub struct Tasked<A>(pub Arc<A>);
389
390impl<A> Tasked<A> {
391 fn clone_backing(&self) -> Arc<A> {
392 Arc::clone(&self.0)
393 }
394}
395
/// A pinned, boxed, `Send` stream yielding `Result<T, ExternalError>` items,
/// valid for the lifetime `'a`.
pub type ResultStream<'a, T> = Pin<Box<dyn Stream<Item = Result<T, ExternalError>> + Send + 'a>>;
399
/// A store of [`VersionedData`] addressed by string key, where each key holds
/// a sequence of versions that is extended by compare-and-set.
///
/// The per-method notes below describe what the shared conformance test in
/// this file (`tests::consensus_impl_test`) expects of an implementation.
/// That test also expects sequence numbers above `i64::MAX` to be rejected.
#[async_trait]
pub trait Consensus: std::fmt::Debug + Send + Sync {
    /// Streams the keys present in the store.
    fn list_keys(&self) -> ResultStream<'_, String>;

    /// Returns the most recently committed version at `key`, or `Ok(None)` if
    /// nothing has been written there.
    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError>;

    /// Writes `new` at `key` if the current head's sequence number equals
    /// `expected` (`None` meaning "no data yet").
    ///
    /// Returns `Ok(CaSResult::Committed)` on success and
    /// `Ok(CaSResult::ExpectationMismatch)` when `expected` does not match the
    /// stored state. The conformance test expects an `Err` when `new.seqno` is
    /// not strictly greater than `expected`, or when it exceeds `i64::MAX`.
    async fn compare_and_set(
        &self,
        key: &str,
        expected: Option<SeqNo>,
        new: VersionedData,
    ) -> Result<CaSResult, ExternalError>;

    /// Returns up to `limit` versions stored at `key` with sequence number
    /// `>= from`, in ascending sequence-number order.
    ///
    /// Returns an empty vec when the key has no data or `from` is beyond the
    /// head. Pass [`SCAN_ALL`] as `limit` to fetch everything.
    async fn scan(
        &self,
        key: &str,
        from: SeqNo,
        limit: usize,
    ) -> Result<Vec<VersionedData>, ExternalError>;

    /// Removes the versions at `key` with sequence number `< seqno`.
    ///
    /// The conformance test expects an `Err` when the key has no data or when
    /// `seqno` is greater than the head's sequence number, and success when
    /// the same truncation is repeated.
    ///
    /// NOTE(review): the `Option<usize>` is presumably the number of versions
    /// removed, when the implementation can report it — confirm against an
    /// implementation.
    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError>;
}
454
455#[async_trait]
456impl<A: Consensus + 'static> Consensus for Tasked<A> {
457 fn list_keys(&self) -> ResultStream<'_, String> {
458 self.0.list_keys()
464 }
465
466 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
467 let backing = self.clone_backing();
468 let key = key.to_owned();
469 mz_ore::task::spawn(
470 || "persist::task::head",
471 async move { backing.head(&key).await }.instrument(Span::current()),
472 )
473 .await
474 }
475
476 async fn compare_and_set(
477 &self,
478 key: &str,
479 expected: Option<SeqNo>,
480 new: VersionedData,
481 ) -> Result<CaSResult, ExternalError> {
482 let backing = self.clone_backing();
483 let key = key.to_owned();
484 mz_ore::task::spawn(
485 || "persist::task::cas",
486 async move { backing.compare_and_set(&key, expected, new).await }
487 .instrument(Span::current()),
488 )
489 .await
490 }
491
492 async fn scan(
493 &self,
494 key: &str,
495 from: SeqNo,
496 limit: usize,
497 ) -> Result<Vec<VersionedData>, ExternalError> {
498 let backing = self.clone_backing();
499 let key = key.to_owned();
500 mz_ore::task::spawn(
501 || "persist::task::scan",
502 async move { backing.scan(&key, from, limit).await }.instrument(Span::current()),
503 )
504 .await
505 }
506
507 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
508 let backing = self.clone_backing();
509 let key = key.to_owned();
510 mz_ore::task::spawn(
511 || "persist::task::truncate",
512 async move { backing.truncate(&key, seqno).await }.instrument(Span::current()),
513 )
514 .await
515 }
516}
517
/// Information about one stored blob, as passed to the callback of
/// [`Blob::list_keys_and_metadata`].
#[derive(Debug)]
pub struct BlobMetadata<'a> {
    /// The blob's key, borrowed for the duration of the callback.
    pub key: &'a str,
    /// The size of the blob's value, in bytes.
    pub size_in_bytes: u64,
}
526
/// A fixed key name, `LIVENESS`.
///
/// NOTE(review): presumably passed to [`Blob::get`] as a liveness probe — no
/// use is visible in this file; confirm against callers.
pub const BLOB_GET_LIVENESS_KEY: &str = "LIVENESS";
529
/// A store of byte values addressed by string key.
///
/// The per-method notes below describe what the shared conformance test in
/// this file (`tests::blob_impl_test`) expects of an implementation. That
/// test also expects a write made through one handle to be visible through
/// another handle opened on the same location.
#[async_trait]
pub trait Blob: std::fmt::Debug + Send + Sync {
    /// Returns the value stored at `key`, or `Ok(None)` if there is none.
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError>;

    /// Calls `f` once for each stored entry whose key starts with
    /// `key_prefix`; an empty prefix lists every entry.
    ///
    /// The conformance test sorts the keys before comparing, so it does not
    /// depend on any particular callback order.
    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError>;

    /// Stores `value` at `key`, replacing any value already there.
    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError>;

    /// Removes `key`.
    ///
    /// Returns `Ok(Some(n))`, where `n` is the byte length of the removed
    /// value, if the key existed, and `Ok(None)` if it did not.
    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError>;

    /// Attempts to make `key` readable again.
    ///
    /// Succeeds if the key currently exists. For a deleted key the
    /// conformance test accepts either `Ok(())`, after which `get` must return
    /// the old value, or an `ExternalError::Determinate`, after which `get`
    /// must return `None`.
    async fn restore(&self, key: &str) -> Result<(), ExternalError>;
}
579
580#[async_trait]
581impl<A: Blob + 'static> Blob for Tasked<A> {
582 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
583 let backing = self.clone_backing();
584 let key = key.to_owned();
585 mz_ore::task::spawn(
586 || "persist::task::get",
587 async move { backing.get(&key).await }.instrument(Span::current()),
588 )
589 .await
590 }
591
592 async fn list_keys_and_metadata(
597 &self,
598 key_prefix: &str,
599 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
600 ) -> Result<(), ExternalError> {
601 self.0.list_keys_and_metadata(key_prefix, f).await
604 }
605
606 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
608 let backing = self.clone_backing();
609 let key = key.to_owned();
610 mz_ore::task::spawn(
611 || "persist::task::set",
612 async move { backing.set(&key, value).await }.instrument(Span::current()),
613 )
614 .await
615 }
616
617 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
622 let backing = self.clone_backing();
623 let key = key.to_owned();
624 mz_ore::task::spawn(
625 || "persist::task::delete",
626 async move { backing.delete(&key).await }.instrument(Span::current()),
627 )
628 .await
629 }
630
631 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
632 let backing = self.clone_backing();
633 let key = key.to_owned();
634 mz_ore::task::spawn(
635 || "persist::task::restore",
636 async move { backing.restore(&key).await }.instrument(Span::current()),
637 )
638 .await
639 }
640}
641
642#[cfg(test)]
644pub mod tests {
645 use std::future::Future;
646
647 use anyhow::anyhow;
648 use futures_util::TryStreamExt;
649 use mz_ore::{assert_err, assert_ok};
650 use uuid::Uuid;
651
652 use crate::location::Blob;
653
654 use super::*;
655
656 fn keys(baseline: &[String], new: &[&str]) -> Vec<String> {
657 let mut ret = baseline.to_vec();
658 ret.extend(new.iter().map(|x| x.to_string()));
659 ret.sort();
660 ret
661 }
662
663 async fn get_keys(b: &impl Blob) -> Result<Vec<String>, ExternalError> {
664 let mut keys = vec![];
665 b.list_keys_and_metadata("", &mut |entry| keys.push(entry.key.to_string()))
666 .await?;
667 Ok(keys)
668 }
669
670 async fn get_keys_with_prefix(
671 b: &impl Blob,
672 prefix: &str,
673 ) -> Result<Vec<String>, ExternalError> {
674 let mut keys = vec![];
675 b.list_keys_and_metadata(prefix, &mut |entry| keys.push(entry.key.to_string()))
676 .await?;
677 Ok(keys)
678 }
679
    /// Shared conformance test for [`Blob`] implementations.
    ///
    /// `new_fn` is called with a location name and must return a handle to the
    /// store at that location. The test opens several handles on `"path0"` and
    /// checks that they all observe the same contents while exercising `get`,
    /// `set`, `delete`, `restore` and `list_keys_and_metadata`.
    ///
    /// The store at `"path0"` must start out empty. Returns the first
    /// `ExternalError` produced by the store; expectation failures panic.
    pub async fn blob_impl_test<
        B: Blob,
        F: Future<Output = Result<B, ExternalError>>,
        NewFn: Fn(&'static str) -> F,
    >(
        new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let values = ["v0".as_bytes().to_vec(), "v1".as_bytes().to_vec()];

        let blob0 = new_fn("path0").await?;

        // Opening a handle on a different location must succeed; the handle is
        // dropped immediately.
        let _ = new_fn("path1").await?;

        // A second handle on the same location as `blob0`.
        let blob1 = new_fn("path0").await?;

        let k0 = "foo/bar/k0";

        // A key that was never set reads as absent through both handles.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);

        // A fresh store lists no keys.
        let empty_keys = get_keys(&blob0).await?;
        assert_eq!(empty_keys, Vec::<String>::new());
        let empty_keys = get_keys(&blob1).await?;
        assert_eq!(empty_keys, Vec::<String>::new());

        // A value set through one handle is readable through both.
        blob0.set(k0, values[0].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Same again for a key without any `/` separators.
        blob0.set("k0a", values[0].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Both keys show up in the listing of either handle. Listings are
        // sorted before comparing, so listing order is not checked.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));

        // Setting an existing key overwrites its value.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        blob0.set("k0a", values[1].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Deleting reports the size of the removed value ("v1" is 2 bytes)...
        assert_eq!(blob0.delete(k0).await, Ok(Some(2)));
        // ...and the key is then absent through both handles.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);
        // Deleting an already-deleted key succeeds and reports `None`.
        assert_eq!(blob0.delete(k0).await, Ok(None));
        // So does deleting a key that never existed.
        assert_eq!(blob0.delete("nope").await, Ok(None));
        // An empty value is distinct from an absent key: deleting it reports
        // `Some(0)`.
        blob0.set("empty", Bytes::new()).await?;
        assert_eq!(blob0.delete("empty").await, Ok(Some(0)));

        // Restoring a key that currently exists succeeds and leaves it intact.
        blob0.set("undelete", Bytes::from("data")).await?;
        blob0.restore("undelete").await?;
        assert_eq!(blob0.delete("undelete").await?, Some("data".len()));
        // Restoring a deleted key may either bring the value back or fail with
        // a determinate error; `get` must agree with whichever happened.
        let expected = match blob0.restore("undelete").await {
            Ok(()) => Some(Bytes::from("data").into()),
            Err(ExternalError::Determinate(_)) => None,
            Err(other) => return Err(other),
        };
        assert_eq!(blob0.get("undelete").await?, expected);
        blob0.delete("undelete").await?;

        // Once every key is deleted the listing is empty again.
        blob0.delete("k0a").await?;
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        // A deleted key can be set again.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Insert several more keys: k1 through k5.
        let mut expected_keys = empty_keys;
        for i in 1..=5 {
            let key = format!("k{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_keys.push(key);
        }

        // All of them, plus `k0`, are listed through both handles.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));

        // Prefix listing: insert k-prefix-1 through k-prefix-3.
        let mut expected_prefix_keys = vec![];
        for i in 1..=3 {
            let key = format!("k-prefix-{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_prefix_keys.push(key);
        }
        // The narrow prefix matches only the keys just inserted.
        let mut blob_keys = get_keys_with_prefix(&blob0, "k-prefix").await?;
        blob_keys.sort();
        assert_eq!(blob_keys, expected_prefix_keys);
        // The broader prefix "k" also matches k1..k5, but not `k0`, whose full
        // key is "foo/bar/k0".
        let mut blob_keys = get_keys_with_prefix(&blob0, "k").await?;
        blob_keys.sort();
        expected_keys.extend(expected_prefix_keys);
        expected_keys.sort();
        assert_eq!(blob_keys, expected_keys);

        // A handle opened after the writes sees the existing data.
        let blob3 = new_fn("path0").await?;
        assert_eq!(
            blob3.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        Ok(())
    }
851
    /// Shared conformance test for [`Consensus`] implementations.
    ///
    /// `new_fn` is called once to obtain the store under test. The test works
    /// on freshly generated UUID keys and exercises `head`,
    /// `compare_and_set`, `scan`, `truncate` and `list_keys`, including the
    /// rejection of invalid sequence numbers.
    ///
    /// The store must start out empty: the `list_keys` check expects the
    /// test's own key to be the only one present. Returns the first
    /// `ExternalError` propagated with `?`; expectation failures panic.
    pub async fn consensus_impl_test<
        C: Consensus,
        F: Future<Output = Result<C, ExternalError>>,
        NewFn: FnMut() -> F,
    >(
        mut new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let consensus = new_fn().await?;

        // A random key, so this run cannot collide with data from another run.
        let key = Uuid::new_v4().to_string();

        // A key with no data has no head...
        assert_eq!(consensus.head(&key).await, Ok(None));

        // ...scans as empty...
        assert_eq!(consensus.scan(&key, SeqNo(0), SCAN_ALL).await, Ok(vec![]));

        // ...and cannot be truncated.
        assert_err!(consensus.truncate(&key, SeqNo(0)).await);

        let state = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("abc"),
        };

        // Expecting an existing version on an empty key is a mismatch.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(0)), state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // Expecting "no data" on an empty key commits the first version.
        assert_eq!(
            consensus.compare_and_set(&key, None, state.clone()).await,
            Ok(CaSResult::Committed),
        );

        // The key is now listed (and is the only key in the store).
        let keys: Vec<_> = consensus.list_keys().try_collect().await?;
        assert_eq!(keys, vec![key.to_owned()]);

        // The committed version is the head.
        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));

        // Scanning from below the version returns it.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![state.clone()])
        );

        // Scanning from exactly the version returns it (the bound is inclusive).
        assert_eq!(
            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
            Ok(vec![state.clone()])
        );

        // Scanning from above the head returns nothing.
        assert_eq!(consensus.scan(&key, SeqNo(6), SCAN_ALL).await, Ok(vec![]));

        // Truncating at or below the head succeeds.
        assert_ok!(consensus.truncate(&key, SeqNo(0)).await);
        assert_ok!(consensus.truncate(&key, SeqNo(5)).await);

        // Truncating above the head is an error.
        assert_err!(consensus.truncate(&key, SeqNo(6)).await);

        let new_state = VersionedData {
            seqno: SeqNo(10),
            data: Bytes::from("def"),
        };

        // An expectation newer than the head (7 vs. 5) is a mismatch.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(7)), new_state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // An expectation older than the head (3 vs. 5) is a mismatch too.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(3)), new_state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        let invalid_constant_seqno = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("invalid"),
        };

        // A new sequence number equal to the expected one is rejected with an
        // error (compared by message), not reported as a mismatch.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_constant_seqno)
                .await,
            Err(ExternalError::from(anyhow!(
                "new seqno must be strictly greater than expected. Got new: SeqNo(5) expected: SeqNo(5)"
            )))
        );

        let invalid_regressing_seqno = VersionedData {
            seqno: SeqNo(3),
            data: Bytes::from("invalid"),
        };

        // A new sequence number below the expected one is rejected the same way.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_regressing_seqno)
                .await,
            Err(ExternalError::from(anyhow!(
                "new seqno must be strictly greater than expected. Got new: SeqNo(3) expected: SeqNo(5)"
            )))
        );

        // With the correct expectation and a larger sequence number, the
        // update commits.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), new_state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        // The head reflects the new version.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));

        // Scanning from the old version returns both, in ascending order.
        assert_eq!(
            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // Scanning from between the two returns only the newer one.
        assert_eq!(
            consensus.scan(&key, SeqNo(6), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // Scanning from exactly the newer version returns it.
        assert_eq!(
            consensus.scan(&key, SeqNo(10), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // Scanning from past the head returns nothing.
        assert_eq!(consensus.scan(&key, SeqNo(11), SCAN_ALL).await, Ok(vec![]));

        // A limit of 1 returns only the oldest matching version.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 1).await,
            Ok(vec![state.clone()])
        );
        assert_eq!(
            consensus.scan(&key, SeqNo(5), 1).await,
            Ok(vec![state.clone()])
        );

        // A limit equal to the number of versions returns all of them.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 2).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // A limit larger than the number of versions returns all of them.
        assert_eq!(
            consensus.scan(&key, SeqNo(4), 100).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // Truncating between the two versions succeeds...
        assert_ok!(consensus.truncate(&key, SeqNo(6)).await);

        // ...and removes the older version from scans.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // Repeating the same truncation succeeds as well.
        assert_ok!(consensus.truncate(&key, SeqNo(6)).await);

        // A second, independent key.
        let other_key = Uuid::new_v4().to_string();

        assert_eq!(consensus.head(&other_key).await, Ok(None));

        // Note: this shadows the earlier `state` (seqno 5) with seqno 1.
        let state = VersionedData {
            seqno: SeqNo(1),
            data: Bytes::from("einszweidrei"),
        };

        assert_eq!(
            consensus
                .compare_and_set(&other_key, None, state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        assert_eq!(consensus.head(&other_key).await, Ok(Some(state.clone())));

        // Writing to the other key did not disturb the first key.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));

        // An expectation of seqno 1 (the other key's version) does not match
        // the first key's head (seqno 10), so the write is a mismatch.
        let invalid_jump_forward = VersionedData {
            seqno: SeqNo(11),
            data: Bytes::from("invalid"),
        };
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_jump_forward)
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // A 10240-byte payload commits.
        let large_state = VersionedData {
            seqno: SeqNo(11),
            data: std::iter::repeat(b'a').take(10240).collect(),
        };
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(new_state.seqno), large_state)
                .await,
            Ok(CaSResult::Committed),
        );

        // An empty payload commits, and truncating up to it (which spans more
        // than one older version) succeeds.
        let v12 = VersionedData {
            seqno: SeqNo(12),
            data: Bytes::new(),
        };
        assert_eq!(
            consensus.compare_and_set(&key, Some(SeqNo(11)), v12).await,
            Ok(CaSResult::Committed),
        );
        assert_ok!(consensus.truncate(&key, SeqNo(12)).await);

        // Sequence-number bounds, each checked on a fresh key: 0 and i64::MAX
        // are accepted...
        assert_eq!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(0),
                        data: Bytes::new(),
                    }
                )
                .await,
            Ok(CaSResult::Committed),
        );
        assert_eq!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(i64::MAX.try_into().expect("i64::MAX fits in u64")),
                        data: Bytes::new(),
                    }
                )
                .await,
            Ok(CaSResult::Committed),
        );
        // ...while i64::MAX + 1 (`1 << 63`) and u64::MAX are rejected.
        assert_err!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(1 << 63),
                        data: Bytes::new(),
                    }
                )
                .await
        );
        assert_err!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(u64::MAX),
                        data: Bytes::new(),
                    }
                )
                .await
        );

        Ok(())
    }
1160
1161 #[mz_ore::test]
1162 fn timeout_error() {
1163 assert!(ExternalError::new_timeout(Instant::now()).is_timeout());
1164 assert!(!ExternalError::from(anyhow!("foo")).is_timeout());
1165 }
1166}