1use std::fmt;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use azure_core::StatusCode;
20use bytes::Bytes;
21use futures_util::Stream;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::u64_to_usize;
24use mz_postgres_client::error::PostgresError;
25use mz_proto::RustType;
26use proptest_derive::Arbitrary;
27use serde::{Deserialize, Serialize};
28use tracing::{Instrument, Span};
29
30use crate::error::Error;
31
32#[derive(
46 Arbitrary, Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize,
47)]
48pub struct SeqNo(pub u64);
49
50impl std::fmt::Display for SeqNo {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 write!(f, "v{}", self.0)
53 }
54}
55
56impl timely::PartialOrder for SeqNo {
57 fn less_equal(&self, other: &Self) -> bool {
58 self <= other
59 }
60}
61
62impl std::str::FromStr for SeqNo {
63 type Err = String;
64
65 fn from_str(encoded: &str) -> Result<Self, Self::Err> {
66 let encoded = match encoded.strip_prefix('v') {
67 Some(x) => x,
68 None => return Err(format!("invalid SeqNo {}: incorrect prefix", encoded)),
69 };
70 let seqno =
71 u64::from_str(encoded).map_err(|err| format!("invalid SeqNo {}: {}", encoded, err))?;
72 Ok(SeqNo(seqno))
73 }
74}
75
76impl SeqNo {
77 pub fn next(self) -> SeqNo {
79 SeqNo(self.0 + 1)
80 }
81
82 pub fn minimum() -> Self {
84 SeqNo(0)
85 }
86
87 pub fn maximum() -> Self {
89 SeqNo(u64::MAX)
90 }
91}
92
93impl RustType<u64> for SeqNo {
94 fn into_proto(&self) -> u64 {
95 self.0
96 }
97
98 fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
99 Ok(SeqNo(proto))
100 }
101}
102
103#[derive(Debug)]
106pub struct Determinate {
107 inner: anyhow::Error,
108}
109
110impl std::fmt::Display for Determinate {
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 write!(f, "determinate: ")?;
113 self.inner.fmt(f)
114 }
115}
116
117impl std::error::Error for Determinate {
118 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
119 self.inner.source()
120 }
121}
122
123impl From<anyhow::Error> for Determinate {
124 fn from(inner: anyhow::Error) -> Self {
125 Self::new(inner)
126 }
127}
128
129impl Determinate {
130 pub fn new(inner: anyhow::Error) -> Self {
134 Determinate { inner }
135 }
136}
137
138#[derive(Debug)]
141pub struct Indeterminate {
142 pub(crate) inner: anyhow::Error,
143}
144
145impl Indeterminate {
146 pub fn new(inner: anyhow::Error) -> Self {
150 Indeterminate { inner }
151 }
152}
153
154impl std::fmt::Display for Indeterminate {
155 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156 write!(f, "indeterminate: ")?;
157 self.inner.fmt(f)
158 }
159}
160
161impl std::error::Error for Indeterminate {
162 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
163 self.inner.source()
164 }
165}
166
167#[cfg(any(test, debug_assertions))]
169impl PartialEq for Indeterminate {
170 fn eq(&self, other: &Self) -> bool {
171 self.to_string() == other.to_string()
172 }
173}
174
175#[derive(Debug)]
178pub enum ExternalError {
179 Determinate(Determinate),
181 Indeterminate(Indeterminate),
183}
184
185impl ExternalError {
186 #[track_caller]
191 pub fn new_timeout(deadline: Instant) -> Self {
192 ExternalError::Indeterminate(Indeterminate {
193 inner: anyhow!("timeout at {:?}", deadline),
194 })
195 }
196
197 pub fn is_timeout(&self) -> bool {
202 self.to_string().contains("timeout")
204 }
205}
206
207impl std::fmt::Display for ExternalError {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 match self {
210 ExternalError::Determinate(x) => std::fmt::Display::fmt(x, f),
211 ExternalError::Indeterminate(x) => std::fmt::Display::fmt(x, f),
212 }
213 }
214}
215
216impl std::error::Error for ExternalError {
217 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
218 match self {
219 ExternalError::Determinate(e) => e.source(),
220 ExternalError::Indeterminate(e) => e.source(),
221 }
222 }
223}
224
225#[cfg(any(test, debug_assertions))]
227impl PartialEq for ExternalError {
228 fn eq(&self, other: &Self) -> bool {
229 self.to_string() == other.to_string()
230 }
231}
232
233impl From<PostgresError> for ExternalError {
234 fn from(x: PostgresError) -> Self {
235 match x {
236 PostgresError::Determinate(e) => ExternalError::Determinate(Determinate::new(e)),
237 PostgresError::Indeterminate(e) => ExternalError::Indeterminate(Indeterminate::new(e)),
238 }
239 }
240}
241
242impl From<Indeterminate> for ExternalError {
243 fn from(x: Indeterminate) -> Self {
244 ExternalError::Indeterminate(x)
245 }
246}
247
248impl From<Determinate> for ExternalError {
249 fn from(x: Determinate) -> Self {
250 ExternalError::Determinate(x)
251 }
252}
253
254impl From<anyhow::Error> for ExternalError {
255 fn from(inner: anyhow::Error) -> Self {
256 ExternalError::Indeterminate(Indeterminate { inner })
257 }
258}
259
260impl From<Error> for ExternalError {
261 fn from(x: Error) -> Self {
262 ExternalError::Indeterminate(Indeterminate {
263 inner: anyhow::Error::new(x),
264 })
265 }
266}
267
268impl From<std::io::Error> for ExternalError {
269 fn from(x: std::io::Error) -> Self {
270 ExternalError::Indeterminate(Indeterminate {
271 inner: anyhow::Error::new(x),
272 })
273 }
274}
275
276impl From<deadpool_postgres::tokio_postgres::Error> for ExternalError {
277 fn from(e: deadpool_postgres::tokio_postgres::Error) -> Self {
278 let code = match e.as_db_error().map(|x| x.code()) {
279 Some(x) => x,
280 None => {
281 return ExternalError::Indeterminate(Indeterminate {
282 inner: anyhow::Error::new(e),
283 });
284 }
285 };
286 match code {
287 &deadpool_postgres::tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE => {
290 ExternalError::Determinate(Determinate {
291 inner: anyhow::Error::new(e),
292 })
293 }
294 _ => ExternalError::Indeterminate(Indeterminate {
295 inner: anyhow::Error::new(e),
296 }),
297 }
298 }
299}
300
301impl From<azure_core::Error> for ExternalError {
302 fn from(value: azure_core::Error) -> Self {
303 let definitely_determinate = if let Some(http) = value.as_http_error() {
304 match http.status() {
305 StatusCode::TooManyRequests => true,
308 _ => false,
309 }
310 } else {
311 false
312 };
313 if definitely_determinate {
314 ExternalError::Determinate(Determinate {
315 inner: anyhow!(value),
316 })
317 } else {
318 ExternalError::Indeterminate(Indeterminate {
319 inner: anyhow!(value),
320 })
321 }
322 }
323}
324
325impl From<deadpool_postgres::PoolError> for ExternalError {
326 fn from(x: deadpool_postgres::PoolError) -> Self {
327 match x {
328 deadpool_postgres::PoolError::Backend(x) => ExternalError::from(x),
331 x => ExternalError::Indeterminate(Indeterminate {
332 inner: anyhow::Error::new(x),
333 }),
334 }
335 }
336}
337
338impl From<tokio::task::JoinError> for ExternalError {
339 fn from(x: tokio::task::JoinError) -> Self {
340 ExternalError::Indeterminate(Indeterminate {
341 inner: anyhow::Error::new(x),
342 })
343 }
344}
345
346#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
349pub struct VersionedData {
350 pub seqno: SeqNo,
352 pub data: Bytes,
354}
355
356#[allow(clippy::as_conversions)]
360pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
361
362pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";
364
365#[derive(Debug, PartialEq, Serialize, Deserialize)]
367pub enum CaSResult {
368 Committed,
370 ExpectationMismatch,
372}
373
374#[derive(Debug)]
378pub struct Tasked<A>(pub Arc<A>);
379
380impl<A> Tasked<A> {
381 fn clone_backing(&self) -> Arc<A> {
382 Arc::clone(&self.0)
383 }
384}
385
386pub type ResultStream<'a, T> = Pin<Box<dyn Stream<Item = Result<T, ExternalError>> + Send + 'a>>;
389
390#[async_trait]
399pub trait Consensus: std::fmt::Debug + Send + Sync {
400 fn list_keys(&self) -> ResultStream<'_, String>;
402
403 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError>;
406
407 async fn compare_and_set(
418 &self,
419 key: &str,
420 expected: Option<SeqNo>,
421 new: VersionedData,
422 ) -> Result<CaSResult, ExternalError>;
423
424 async fn scan(
430 &self,
431 key: &str,
432 from: SeqNo,
433 limit: usize,
434 ) -> Result<Vec<VersionedData>, ExternalError>;
435
436 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError>;
443}
444
445#[async_trait]
446impl<A: Consensus + 'static> Consensus for Tasked<A> {
447 fn list_keys(&self) -> ResultStream<'_, String> {
448 self.0.list_keys()
454 }
455
456 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
457 let backing = self.clone_backing();
458 let key = key.to_owned();
459 mz_ore::task::spawn(
460 || "persist::task::head",
461 async move { backing.head(&key).await }.instrument(Span::current()),
462 )
463 .await
464 }
465
466 async fn compare_and_set(
467 &self,
468 key: &str,
469 expected: Option<SeqNo>,
470 new: VersionedData,
471 ) -> Result<CaSResult, ExternalError> {
472 let backing = self.clone_backing();
473 let key = key.to_owned();
474 mz_ore::task::spawn(
475 || "persist::task::cas",
476 async move { backing.compare_and_set(&key, expected, new).await }
477 .instrument(Span::current()),
478 )
479 .await
480 }
481
482 async fn scan(
483 &self,
484 key: &str,
485 from: SeqNo,
486 limit: usize,
487 ) -> Result<Vec<VersionedData>, ExternalError> {
488 let backing = self.clone_backing();
489 let key = key.to_owned();
490 mz_ore::task::spawn(
491 || "persist::task::scan",
492 async move { backing.scan(&key, from, limit).await }.instrument(Span::current()),
493 )
494 .await
495 }
496
497 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
498 let backing = self.clone_backing();
499 let key = key.to_owned();
500 mz_ore::task::spawn(
501 || "persist::task::truncate",
502 async move { backing.truncate(&key, seqno).await }.instrument(Span::current()),
503 )
504 .await
505 }
506}
507
508#[derive(Debug)]
510pub struct BlobMetadata<'a> {
511 pub key: &'a str,
513 pub size_in_bytes: u64,
515}
516
517pub const BLOB_GET_LIVENESS_KEY: &str = "LIVENESS";
519
520#[async_trait]
532pub trait Blob: std::fmt::Debug + Send + Sync {
533 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError>;
535
536 async fn list_keys_and_metadata(
541 &self,
542 key_prefix: &str,
543 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
544 ) -> Result<(), ExternalError>;
545
546 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError>;
551
552 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError>;
557
558 async fn restore(&self, key: &str) -> Result<(), ExternalError>;
568}
569
570#[async_trait]
571impl<A: Blob + 'static> Blob for Tasked<A> {
572 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
573 let backing = self.clone_backing();
574 let key = key.to_owned();
575 mz_ore::task::spawn(
576 || "persist::task::get",
577 async move { backing.get(&key).await }.instrument(Span::current()),
578 )
579 .await
580 }
581
582 async fn list_keys_and_metadata(
587 &self,
588 key_prefix: &str,
589 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
590 ) -> Result<(), ExternalError> {
591 self.0.list_keys_and_metadata(key_prefix, f).await
594 }
595
596 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
598 let backing = self.clone_backing();
599 let key = key.to_owned();
600 mz_ore::task::spawn(
601 || "persist::task::set",
602 async move { backing.set(&key, value).await }.instrument(Span::current()),
603 )
604 .await
605 }
606
607 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
612 let backing = self.clone_backing();
613 let key = key.to_owned();
614 mz_ore::task::spawn(
615 || "persist::task::delete",
616 async move { backing.delete(&key).await }.instrument(Span::current()),
617 )
618 .await
619 }
620
621 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
622 let backing = self.clone_backing();
623 let key = key.to_owned();
624 mz_ore::task::spawn(
625 || "persist::task::restore",
626 async move { backing.restore(&key).await }.instrument(Span::current()),
627 )
628 .await
629 }
630}
631
632#[cfg(test)]
634pub mod tests {
635 use std::future::Future;
636
637 use anyhow::anyhow;
638 use futures_util::TryStreamExt;
639 use mz_ore::{assert_err, assert_ok};
640 use uuid::Uuid;
641
642 use crate::location::Blob;
643
644 use super::*;
645
646 fn keys(baseline: &[String], new: &[&str]) -> Vec<String> {
647 let mut ret = baseline.to_vec();
648 ret.extend(new.iter().map(|x| x.to_string()));
649 ret.sort();
650 ret
651 }
652
653 async fn get_keys(b: &impl Blob) -> Result<Vec<String>, ExternalError> {
654 let mut keys = vec![];
655 b.list_keys_and_metadata("", &mut |entry| keys.push(entry.key.to_string()))
656 .await?;
657 Ok(keys)
658 }
659
660 async fn get_keys_with_prefix(
661 b: &impl Blob,
662 prefix: &str,
663 ) -> Result<Vec<String>, ExternalError> {
664 let mut keys = vec![];
665 b.list_keys_and_metadata(prefix, &mut |entry| keys.push(entry.key.to_string()))
666 .await?;
667 Ok(keys)
668 }
669
670 pub async fn blob_impl_test<
672 B: Blob,
673 F: Future<Output = Result<B, ExternalError>>,
674 NewFn: Fn(&'static str) -> F,
675 >(
676 new_fn: NewFn,
677 ) -> Result<(), ExternalError> {
678 let values = ["v0".as_bytes().to_vec(), "v1".as_bytes().to_vec()];
679
680 let blob0 = new_fn("path0").await?;
681
682 let _ = new_fn("path1").await?;
684
685 let blob1 = new_fn("path0").await?;
687
688 let k0 = "foo/bar/k0";
689
690 assert_eq!(blob0.get(k0).await?, None);
692 assert_eq!(blob1.get(k0).await?, None);
693
694 let empty_keys = get_keys(&blob0).await?;
696 assert_eq!(empty_keys, Vec::<String>::new());
697 let empty_keys = get_keys(&blob1).await?;
698 assert_eq!(empty_keys, Vec::<String>::new());
699
700 blob0.set(k0, values[0].clone().into()).await?;
702 assert_eq!(
703 blob0.get(k0).await?.map(|s| s.into_contiguous()),
704 Some(values[0].clone())
705 );
706 assert_eq!(
707 blob1.get(k0).await?.map(|s| s.into_contiguous()),
708 Some(values[0].clone())
709 );
710
711 blob0.set("k0a", values[0].clone().into()).await?;
713 assert_eq!(
714 blob0.get("k0a").await?.map(|s| s.into_contiguous()),
715 Some(values[0].clone())
716 );
717 assert_eq!(
718 blob1.get("k0a").await?.map(|s| s.into_contiguous()),
719 Some(values[0].clone())
720 );
721
722 let mut blob_keys = get_keys(&blob0).await?;
724 blob_keys.sort();
725 assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
726 let mut blob_keys = get_keys(&blob1).await?;
727 blob_keys.sort();
728 assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
729
730 blob0.set(k0, values[1].clone().into()).await?;
732 assert_eq!(
733 blob0.get(k0).await?.map(|s| s.into_contiguous()),
734 Some(values[1].clone())
735 );
736 assert_eq!(
737 blob1.get(k0).await?.map(|s| s.into_contiguous()),
738 Some(values[1].clone())
739 );
740 blob0.set("k0a", values[1].clone().into()).await?;
742 assert_eq!(
743 blob0.get("k0a").await?.map(|s| s.into_contiguous()),
744 Some(values[1].clone())
745 );
746 assert_eq!(
747 blob1.get("k0a").await?.map(|s| s.into_contiguous()),
748 Some(values[1].clone())
749 );
750
751 assert_eq!(blob0.delete(k0).await, Ok(Some(2)));
753 assert_eq!(blob0.get(k0).await?, None);
755 assert_eq!(blob1.get(k0).await?, None);
756 assert_eq!(blob0.delete(k0).await, Ok(None));
758 assert_eq!(blob0.delete("nope").await, Ok(None));
760 blob0.set("empty", Bytes::new()).await?;
763 assert_eq!(blob0.delete("empty").await, Ok(Some(0)));
764
765 blob0.set("undelete", Bytes::from("data")).await?;
768 blob0.restore("undelete").await?;
770 assert_eq!(blob0.delete("undelete").await?, Some("data".len()));
771 let expected = match blob0.restore("undelete").await {
772 Ok(()) => Some(Bytes::from("data").into()),
773 Err(ExternalError::Determinate(_)) => None,
774 Err(other) => return Err(other),
775 };
776 assert_eq!(blob0.get("undelete").await?, expected);
777 blob0.delete("undelete").await?;
778
779 blob0.delete("k0a").await?;
781 let mut blob_keys = get_keys(&blob0).await?;
782 blob_keys.sort();
783 assert_eq!(blob_keys, empty_keys);
784 let mut blob_keys = get_keys(&blob1).await?;
785 blob_keys.sort();
786 assert_eq!(blob_keys, empty_keys);
787 blob0.set(k0, values[1].clone().into()).await?;
789 assert_eq!(
790 blob1.get(k0).await?.map(|s| s.into_contiguous()),
791 Some(values[1].clone())
792 );
793 assert_eq!(
794 blob0.get(k0).await?.map(|s| s.into_contiguous()),
795 Some(values[1].clone())
796 );
797
798 let mut expected_keys = empty_keys;
801 for i in 1..=5 {
802 let key = format!("k{}", i);
803 blob0.set(&key, values[0].clone().into()).await?;
804 expected_keys.push(key);
805 }
806
807 let mut blob_keys = get_keys(&blob0).await?;
809 blob_keys.sort();
810 assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
811 let mut blob_keys = get_keys(&blob1).await?;
812 blob_keys.sort();
813 assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
814
815 let mut expected_prefix_keys = vec![];
818 for i in 1..=3 {
819 let key = format!("k-prefix-{}", i);
820 blob0.set(&key, values[0].clone().into()).await?;
821 expected_prefix_keys.push(key);
822 }
823 let mut blob_keys = get_keys_with_prefix(&blob0, "k-prefix").await?;
824 blob_keys.sort();
825 assert_eq!(blob_keys, expected_prefix_keys);
826 let mut blob_keys = get_keys_with_prefix(&blob0, "k").await?;
827 blob_keys.sort();
828 expected_keys.extend(expected_prefix_keys);
829 expected_keys.sort();
830 assert_eq!(blob_keys, expected_keys);
831
832 let blob3 = new_fn("path0").await?;
834 assert_eq!(
835 blob3.get(k0).await?.map(|s| s.into_contiguous()),
836 Some(values[1].clone())
837 );
838
839 Ok(())
840 }
841
842 pub async fn consensus_impl_test<
844 C: Consensus,
845 F: Future<Output = Result<C, ExternalError>>,
846 NewFn: FnMut() -> F,
847 >(
848 mut new_fn: NewFn,
849 ) -> Result<(), ExternalError> {
850 let consensus = new_fn().await?;
851
852 let key = Uuid::new_v4().to_string();
855
856 assert_eq!(consensus.head(&key).await, Ok(None));
858
859 assert_eq!(consensus.scan(&key, SeqNo(0), SCAN_ALL).await, Ok(vec![]));
861
862 assert_err!(consensus.truncate(&key, SeqNo(0)).await);
864
865 let state = VersionedData {
866 seqno: SeqNo(5),
867 data: Bytes::from("abc"),
868 };
869
870 assert_eq!(
872 consensus
873 .compare_and_set(&key, Some(SeqNo(0)), state.clone())
874 .await,
875 Ok(CaSResult::ExpectationMismatch),
876 );
877
878 assert_eq!(
880 consensus.compare_and_set(&key, None, state.clone()).await,
881 Ok(CaSResult::Committed),
882 );
883
884 let keys: Vec<_> = consensus.list_keys().try_collect().await?;
886 assert_eq!(keys, vec![key.to_owned()]);
887
888 assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));
890
891 assert_eq!(
893 consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
894 Ok(vec![state.clone()])
895 );
896
897 assert_eq!(
899 consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
900 Ok(vec![state.clone()])
901 );
902
903 assert_eq!(consensus.scan(&key, SeqNo(6), SCAN_ALL).await, Ok(vec![]));
906
907 assert_ok!(consensus.truncate(&key, SeqNo(0)).await);
910 assert_ok!(consensus.truncate(&key, SeqNo(5)).await);
911
912 assert_err!(consensus.truncate(&key, SeqNo(6)).await);
914
915 let new_state = VersionedData {
916 seqno: SeqNo(10),
917 data: Bytes::from("def"),
918 };
919
920 assert_eq!(
922 consensus
923 .compare_and_set(&key, Some(SeqNo(7)), new_state.clone())
924 .await,
925 Ok(CaSResult::ExpectationMismatch),
926 );
927
928 assert_eq!(
930 consensus
931 .compare_and_set(&key, Some(SeqNo(3)), new_state.clone())
932 .await,
933 Ok(CaSResult::ExpectationMismatch),
934 );
935
936 let invalid_constant_seqno = VersionedData {
937 seqno: SeqNo(5),
938 data: Bytes::from("invalid"),
939 };
940
941 assert_eq!(
944 consensus
945 .compare_and_set(&key, Some(state.seqno), invalid_constant_seqno)
946 .await,
947 Err(ExternalError::from(anyhow!(
948 "new seqno must be strictly greater than expected. Got new: SeqNo(5) expected: SeqNo(5)"
949 )))
950 );
951
952 let invalid_regressing_seqno = VersionedData {
953 seqno: SeqNo(3),
954 data: Bytes::from("invalid"),
955 };
956
957 assert_eq!(
960 consensus
961 .compare_and_set(&key, Some(state.seqno), invalid_regressing_seqno)
962 .await,
963 Err(ExternalError::from(anyhow!(
964 "new seqno must be strictly greater than expected. Got new: SeqNo(3) expected: SeqNo(5)"
965 )))
966 );
967
968 assert_eq!(
970 consensus
971 .compare_and_set(&key, Some(state.seqno), new_state.clone())
972 .await,
973 Ok(CaSResult::Committed),
974 );
975
976 assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));
978
979 assert_eq!(
982 consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
983 Ok(vec![state.clone(), new_state.clone()])
984 );
985
986 assert_eq!(
989 consensus.scan(&key, SeqNo(6), SCAN_ALL).await,
990 Ok(vec![new_state.clone()])
991 );
992
993 assert_eq!(
996 consensus.scan(&key, SeqNo(10), SCAN_ALL).await,
997 Ok(vec![new_state.clone()])
998 );
999
1000 assert_eq!(consensus.scan(&key, SeqNo(11), SCAN_ALL).await, Ok(vec![]));
1002
1003 assert_eq!(
1005 consensus.scan(&key, SeqNo::minimum(), 1).await,
1006 Ok(vec![state.clone()])
1007 );
1008 assert_eq!(
1009 consensus.scan(&key, SeqNo(5), 1).await,
1010 Ok(vec![state.clone()])
1011 );
1012
1013 assert_eq!(
1015 consensus.scan(&key, SeqNo::minimum(), 2).await,
1016 Ok(vec![state.clone(), new_state.clone()])
1017 );
1018
1019 assert_eq!(
1021 consensus.scan(&key, SeqNo(4), 100).await,
1022 Ok(vec![state.clone(), new_state.clone()])
1023 );
1024
1025 assert_ok!(consensus.truncate(&key, SeqNo(6)).await);
1027
1028 assert_eq!(
1030 consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
1031 Ok(vec![new_state.clone()])
1032 );
1033
1034 assert_ok!(consensus.truncate(&key, SeqNo(6)).await);
1037
1038 let other_key = Uuid::new_v4().to_string();
1040
1041 assert_eq!(consensus.head(&other_key).await, Ok(None));
1042
1043 let state = VersionedData {
1044 seqno: SeqNo(1),
1045 data: Bytes::from("einszweidrei"),
1046 };
1047
1048 assert_eq!(
1049 consensus
1050 .compare_and_set(&other_key, None, state.clone())
1051 .await,
1052 Ok(CaSResult::Committed),
1053 );
1054
1055 assert_eq!(consensus.head(&other_key).await, Ok(Some(state.clone())));
1056
1057 assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));
1059
1060 let invalid_jump_forward = VersionedData {
1062 seqno: SeqNo(11),
1063 data: Bytes::from("invalid"),
1064 };
1065 assert_eq!(
1066 consensus
1067 .compare_and_set(&key, Some(state.seqno), invalid_jump_forward)
1068 .await,
1069 Ok(CaSResult::ExpectationMismatch),
1070 );
1071
1072 let large_state = VersionedData {
1074 seqno: SeqNo(11),
1075 data: std::iter::repeat(b'a').take(10240).collect(),
1076 };
1077 assert_eq!(
1078 consensus
1079 .compare_and_set(&key, Some(new_state.seqno), large_state)
1080 .await,
1081 Ok(CaSResult::Committed),
1082 );
1083
1084 let v12 = VersionedData {
1086 seqno: SeqNo(12),
1087 data: Bytes::new(),
1088 };
1089 assert_eq!(
1090 consensus.compare_and_set(&key, Some(SeqNo(11)), v12).await,
1091 Ok(CaSResult::Committed),
1092 );
1093 assert_ok!(consensus.truncate(&key, SeqNo(12)).await);
1094
1095 assert_eq!(
1098 consensus
1099 .compare_and_set(
1100 &Uuid::new_v4().to_string(),
1101 None,
1102 VersionedData {
1103 seqno: SeqNo(0),
1104 data: Bytes::new(),
1105 }
1106 )
1107 .await,
1108 Ok(CaSResult::Committed),
1109 );
1110 assert_eq!(
1111 consensus
1112 .compare_and_set(
1113 &Uuid::new_v4().to_string(),
1114 None,
1115 VersionedData {
1116 seqno: SeqNo(i64::MAX.try_into().expect("i64::MAX fits in u64")),
1117 data: Bytes::new(),
1118 }
1119 )
1120 .await,
1121 Ok(CaSResult::Committed),
1122 );
1123 assert_err!(
1124 consensus
1125 .compare_and_set(
1126 &Uuid::new_v4().to_string(),
1127 None,
1128 VersionedData {
1129 seqno: SeqNo(1 << 63),
1130 data: Bytes::new(),
1131 }
1132 )
1133 .await
1134 );
1135 assert_err!(
1136 consensus
1137 .compare_and_set(
1138 &Uuid::new_v4().to_string(),
1139 None,
1140 VersionedData {
1141 seqno: SeqNo(u64::MAX),
1142 data: Bytes::new(),
1143 }
1144 )
1145 .await
1146 );
1147
1148 Ok(())
1149 }
1150
1151 #[mz_ore::test]
1152 fn timeout_error() {
1153 assert!(ExternalError::new_timeout(Instant::now()).is_timeout());
1154 assert!(!ExternalError::from(anyhow!("foo")).is_timeout());
1155 }
1156}