1use std::fmt;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use bytes::Bytes;
20use futures_util::Stream;
21use mz_ore::bytes::SegmentedBytes;
22use mz_ore::cast::u64_to_usize;
23use mz_postgres_client::error::PostgresError;
24use mz_proto::RustType;
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27use tracing::{Instrument, Span};
28
29use crate::error::Error;
30
/// A sequence number: a thin newtype around a `u64`.
///
/// Ordering, equality and hashing are all derived from the wrapped integer.
/// The `Display` impl in this file renders it as `v<number>` and the `FromStr`
/// impl parses that same form back.
#[derive(
    Arbitrary, Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize,
)]
pub struct SeqNo(pub u64);
48
49impl std::fmt::Display for SeqNo {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 write!(f, "v{}", self.0)
52 }
53}
54
55impl timely::PartialOrder for SeqNo {
56 fn less_equal(&self, other: &Self) -> bool {
57 self <= other
58 }
59}
60
61impl std::str::FromStr for SeqNo {
62 type Err = String;
63
64 fn from_str(encoded: &str) -> Result<Self, Self::Err> {
65 let encoded = match encoded.strip_prefix('v') {
66 Some(x) => x,
67 None => return Err(format!("invalid SeqNo {}: incorrect prefix", encoded)),
68 };
69 let seqno =
70 u64::from_str(encoded).map_err(|err| format!("invalid SeqNo {}: {}", encoded, err))?;
71 Ok(SeqNo(seqno))
72 }
73}
74
75impl SeqNo {
76 pub fn next(self) -> SeqNo {
78 SeqNo(self.0 + 1)
79 }
80
81 pub fn minimum() -> Self {
83 SeqNo(0)
84 }
85}
86
87impl RustType<u64> for SeqNo {
88 fn into_proto(&self) -> u64 {
89 self.0
90 }
91
92 fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
93 Ok(SeqNo(proto))
94 }
95}
96
/// An error wrapping an [anyhow::Error] whose `Display` output is prefixed
/// with `determinate: `.
///
/// NOTE(review): the name suggests this marks failures whose outcome is known
/// (the operation definitely did not take effect) — confirm against callers.
#[derive(Debug)]
pub struct Determinate {
    // The underlying cause; private, set via `Determinate::new`.
    inner: anyhow::Error,
}
103
104impl std::fmt::Display for Determinate {
105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106 write!(f, "determinate: ")?;
107 self.inner.fmt(f)
108 }
109}
110
111impl std::error::Error for Determinate {
112 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
113 self.inner.source()
114 }
115}
116
117impl Determinate {
118 pub fn new(inner: anyhow::Error) -> Self {
122 Determinate { inner }
123 }
124}
125
/// An error wrapping an [anyhow::Error] whose `Display` output is prefixed
/// with `indeterminate: `.
///
/// NOTE(review): the name suggests this marks failures whose outcome is
/// unknown (the operation may or may not have taken effect) — confirm against
/// callers.
#[derive(Debug)]
pub struct Indeterminate {
    // The underlying cause; visible within the crate.
    pub(crate) inner: anyhow::Error,
}
132
133impl Indeterminate {
134 pub fn new(inner: anyhow::Error) -> Self {
138 Indeterminate { inner }
139 }
140}
141
142impl std::fmt::Display for Indeterminate {
143 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144 write!(f, "indeterminate: ")?;
145 self.inner.fmt(f)
146 }
147}
148
149impl std::error::Error for Indeterminate {
150 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
151 self.inner.source()
152 }
153}
154
155#[cfg(any(test, debug_assertions))]
157impl PartialEq for Indeterminate {
158 fn eq(&self, other: &Self) -> bool {
159 self.to_string() == other.to_string()
160 }
161}
162
/// An error that is either a [Determinate] or an [Indeterminate] failure.
///
/// The `From` impls in this file decide which variant each underlying error
/// type maps to.
#[derive(Debug)]
pub enum ExternalError {
    /// Wraps a [Determinate] error.
    Determinate(Determinate),
    /// Wraps an [Indeterminate] error.
    Indeterminate(Indeterminate),
}
172
173impl ExternalError {
174 #[track_caller]
179 pub fn new_timeout(deadline: Instant) -> Self {
180 ExternalError::Indeterminate(Indeterminate {
181 inner: anyhow!("timeout at {:?}", deadline),
182 })
183 }
184
185 pub fn is_timeout(&self) -> bool {
190 self.to_string().contains("timeout")
192 }
193}
194
195impl std::fmt::Display for ExternalError {
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 match self {
198 ExternalError::Determinate(x) => std::fmt::Display::fmt(x, f),
199 ExternalError::Indeterminate(x) => std::fmt::Display::fmt(x, f),
200 }
201 }
202}
203
204impl std::error::Error for ExternalError {
205 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
206 match self {
207 ExternalError::Determinate(e) => e.source(),
208 ExternalError::Indeterminate(e) => e.source(),
209 }
210 }
211}
212
213#[cfg(any(test, debug_assertions))]
215impl PartialEq for ExternalError {
216 fn eq(&self, other: &Self) -> bool {
217 self.to_string() == other.to_string()
218 }
219}
220
221impl From<PostgresError> for ExternalError {
222 fn from(x: PostgresError) -> Self {
223 match x {
224 PostgresError::Determinate(e) => ExternalError::Determinate(Determinate::new(e)),
225 PostgresError::Indeterminate(e) => ExternalError::Indeterminate(Indeterminate::new(e)),
226 }
227 }
228}
229
230impl From<Indeterminate> for ExternalError {
231 fn from(x: Indeterminate) -> Self {
232 ExternalError::Indeterminate(x)
233 }
234}
235
236impl From<Determinate> for ExternalError {
237 fn from(x: Determinate) -> Self {
238 ExternalError::Determinate(x)
239 }
240}
241
242impl From<anyhow::Error> for ExternalError {
243 fn from(inner: anyhow::Error) -> Self {
244 ExternalError::Indeterminate(Indeterminate { inner })
245 }
246}
247
248impl From<Error> for ExternalError {
249 fn from(x: Error) -> Self {
250 ExternalError::Indeterminate(Indeterminate {
251 inner: anyhow::Error::new(x),
252 })
253 }
254}
255
256impl From<std::io::Error> for ExternalError {
257 fn from(x: std::io::Error) -> Self {
258 ExternalError::Indeterminate(Indeterminate {
259 inner: anyhow::Error::new(x),
260 })
261 }
262}
263
264impl From<deadpool_postgres::tokio_postgres::Error> for ExternalError {
265 fn from(e: deadpool_postgres::tokio_postgres::Error) -> Self {
266 let code = match e.as_db_error().map(|x| x.code()) {
267 Some(x) => x,
268 None => {
269 return ExternalError::Indeterminate(Indeterminate {
270 inner: anyhow::Error::new(e),
271 });
272 }
273 };
274 match code {
275 &deadpool_postgres::tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE => {
278 ExternalError::Determinate(Determinate {
279 inner: anyhow::Error::new(e),
280 })
281 }
282 _ => ExternalError::Indeterminate(Indeterminate {
283 inner: anyhow::Error::new(e),
284 }),
285 }
286 }
287}
288
289impl From<azure_core::Error> for ExternalError {
290 fn from(value: azure_core::Error) -> Self {
291 ExternalError::Indeterminate(Indeterminate {
292 inner: anyhow!(value),
293 })
294 }
295}
296
297impl From<deadpool_postgres::PoolError> for ExternalError {
298 fn from(x: deadpool_postgres::PoolError) -> Self {
299 match x {
300 deadpool_postgres::PoolError::Backend(x) => ExternalError::from(x),
303 x => ExternalError::Indeterminate(Indeterminate {
304 inner: anyhow::Error::new(x),
305 }),
306 }
307 }
308}
309
310impl From<tokio::task::JoinError> for ExternalError {
311 fn from(x: tokio::task::JoinError) -> Self {
312 ExternalError::Indeterminate(Indeterminate {
313 inner: anyhow::Error::new(x),
314 })
315 }
316}
317
/// A blob of bytes tagged with the [SeqNo] it was written at.
///
/// This is the value type read and written by the [Consensus] trait.
#[derive(Debug, Clone, PartialEq)]
pub struct VersionedData {
    /// The sequence number of this entry.
    pub seqno: SeqNo,
    /// The payload bytes; not interpreted anywhere in this file.
    pub data: Bytes,
}
327
/// A `limit` value for [Consensus::scan] meaning "no practical limit":
/// `i64::MAX` converted to `usize`.
///
/// NOTE(review): presumably capped at `i64::MAX` rather than `usize::MAX` so
/// the value fits a signed 64-bit limit in backing stores — confirm.
// The `as` cast is lossless here: `i64::MAX` is non-negative and fits in `u64`.
#[allow(clippy::as_conversions)]
pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
333
/// A fixed key name, `LIVENESS`.
///
/// NOTE(review): presumably the key passed to [Consensus::head] when probing
/// whether a consensus backend is reachable — confirm against callers.
pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";
336
/// The non-error outcome of a [Consensus::compare_and_set] call.
#[derive(Debug, PartialEq)]
pub enum CaSResult {
    /// The new value was written.
    Committed,
    /// The caller's expected sequence number did not match the current state,
    /// so nothing was written.
    ExpectationMismatch,
}
345
/// A wrapper around a shared `A`.
///
/// The [Consensus] and [Blob] impls for this type in this file run most
/// operations on the wrapped value inside a task spawned via
/// `mz_ore::task::spawn`, and await that task's result.
#[derive(Debug)]
pub struct Tasked<A>(pub Arc<A>);
351
352impl<A> Tasked<A> {
353 fn clone_backing(&self) -> Arc<A> {
354 Arc::clone(&self.0)
355 }
356}
357
/// A pinned, boxed, `Send` stream of `Result<T, ExternalError>` items that may
/// borrow for the lifetime `'a`.
pub type ResultStream<'a, T> = Pin<Box<dyn Stream<Item = Result<T, ExternalError>> + Send + 'a>>;
361
/// A keyed store of [VersionedData] entries updated by compare-and-set.
///
/// The behavior described on each method is what `tests::consensus_impl_test`
/// in this file checks; the trait itself only fixes the signatures.
#[async_trait]
pub trait Consensus: std::fmt::Debug + Send + Sync {
    /// Returns a stream of the keys present in the store.
    fn list_keys(&self) -> ResultStream<String>;

    /// Returns the most recently committed entry for `key`, or `None` if
    /// nothing has been written to it.
    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError>;

    /// Writes `new` for `key` if the current head matches `expected`
    /// (`None` meaning "no data yet").
    ///
    /// Returns [CaSResult::Committed] on success and
    /// [CaSResult::ExpectationMismatch] when the expectation does not hold.
    /// The conformance test additionally expects an error when `new.seqno` is
    /// not strictly greater than `expected`, or when it exceeds `i64::MAX`.
    async fn compare_and_set(
        &self,
        key: &str,
        expected: Option<SeqNo>,
        new: VersionedData,
    ) -> Result<CaSResult, ExternalError>;

    /// Returns up to `limit` entries for `key` with a sequence number at or
    /// above `from`, oldest first. Pass [SCAN_ALL] as `limit` to fetch
    /// everything.
    async fn scan(
        &self,
        key: &str,
        from: SeqNo,
        limit: usize,
    ) -> Result<Vec<VersionedData>, ExternalError>;

    /// Removes the entries for `key` with a sequence number strictly below
    /// `seqno` and returns how many were removed.
    ///
    /// The conformance test expects an error when `seqno` is above the
    /// current head or when `key` has no data.
    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError>;
}
416
417#[async_trait]
418impl<A: Consensus + 'static> Consensus for Tasked<A> {
419 fn list_keys(&self) -> ResultStream<String> {
420 self.0.list_keys()
426 }
427
428 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
429 let backing = self.clone_backing();
430 let key = key.to_owned();
431 mz_ore::task::spawn(
432 || "persist::task::head",
433 async move { backing.head(&key).await }.instrument(Span::current()),
434 )
435 .await?
436 }
437
438 async fn compare_and_set(
439 &self,
440 key: &str,
441 expected: Option<SeqNo>,
442 new: VersionedData,
443 ) -> Result<CaSResult, ExternalError> {
444 let backing = self.clone_backing();
445 let key = key.to_owned();
446 mz_ore::task::spawn(
447 || "persist::task::cas",
448 async move { backing.compare_and_set(&key, expected, new).await }
449 .instrument(Span::current()),
450 )
451 .await?
452 }
453
454 async fn scan(
455 &self,
456 key: &str,
457 from: SeqNo,
458 limit: usize,
459 ) -> Result<Vec<VersionedData>, ExternalError> {
460 let backing = self.clone_backing();
461 let key = key.to_owned();
462 mz_ore::task::spawn(
463 || "persist::task::scan",
464 async move { backing.scan(&key, from, limit).await }.instrument(Span::current()),
465 )
466 .await?
467 }
468
469 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
470 let backing = self.clone_backing();
471 let key = key.to_owned();
472 mz_ore::task::spawn(
473 || "persist::task::truncate",
474 async move { backing.truncate(&key, seqno).await }.instrument(Span::current()),
475 )
476 .await?
477 }
478}
479
/// Information about one stored blob, as passed to the callback of
/// [Blob::list_keys_and_metadata].
#[derive(Debug)]
pub struct BlobMetadata<'a> {
    /// The blob's key, borrowed for `'a`.
    pub key: &'a str,
    /// The blob's size in bytes.
    pub size_in_bytes: u64,
}
488
/// A fixed key name, `LIVENESS`.
///
/// NOTE(review): presumably the key passed to [Blob::get] when probing
/// whether a blob backend is reachable — confirm against callers.
pub const BLOB_GET_LIVENESS_KEY: &str = "LIVENESS";
491
/// A key-value store of byte blobs.
///
/// The behavior described on each method is what `tests::blob_impl_test` in
/// this file checks; the trait itself only fixes the signatures.
#[async_trait]
pub trait Blob: std::fmt::Debug + Send + Sync {
    /// Returns the value stored at `key`, or `None` if there is none.
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError>;

    /// Calls `f` with the metadata of each stored blob whose key starts with
    /// `key_prefix`. An empty prefix lists every blob.
    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError>;

    /// Stores `value` at `key`, replacing any existing value.
    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError>;

    /// Removes the value at `key`.
    ///
    /// Returns `Some(size_in_bytes)` of the removed value, or `None` if the
    /// key held no value.
    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError>;

    /// Restores `key`.
    ///
    /// The conformance test expects this to succeed for a key that currently
    /// has a value. For a deleted key it accepts either success (the value is
    /// readable again) or a `Determinate` error (the value stays gone).
    async fn restore(&self, key: &str) -> Result<(), ExternalError>;
}
541
542#[async_trait]
543impl<A: Blob + 'static> Blob for Tasked<A> {
544 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
545 let backing = self.clone_backing();
546 let key = key.to_owned();
547 mz_ore::task::spawn(
548 || "persist::task::get",
549 async move { backing.get(&key).await }.instrument(Span::current()),
550 )
551 .await?
552 }
553
554 async fn list_keys_and_metadata(
559 &self,
560 key_prefix: &str,
561 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
562 ) -> Result<(), ExternalError> {
563 self.0.list_keys_and_metadata(key_prefix, f).await
566 }
567
568 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
570 let backing = self.clone_backing();
571 let key = key.to_owned();
572 mz_ore::task::spawn(
573 || "persist::task::set",
574 async move { backing.set(&key, value).await }.instrument(Span::current()),
575 )
576 .await?
577 }
578
579 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
584 let backing = self.clone_backing();
585 let key = key.to_owned();
586 mz_ore::task::spawn(
587 || "persist::task::delete",
588 async move { backing.delete(&key).await }.instrument(Span::current()),
589 )
590 .await?
591 }
592
593 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
594 let backing = self.clone_backing();
595 let key = key.to_owned();
596 mz_ore::task::spawn(
597 || "persist::task::restore",
598 async move { backing.restore(&key).await }.instrument(Span::current()),
599 )
600 .await?
601 }
602}
603
604#[cfg(test)]
606pub mod tests {
607 use std::future::Future;
608
609 use anyhow::anyhow;
610 use futures_util::TryStreamExt;
611 use mz_ore::assert_err;
612 use uuid::Uuid;
613
614 use crate::location::Blob;
615
616 use super::*;
617
618 fn keys(baseline: &[String], new: &[&str]) -> Vec<String> {
619 let mut ret = baseline.to_vec();
620 ret.extend(new.iter().map(|x| x.to_string()));
621 ret.sort();
622 ret
623 }
624
625 async fn get_keys(b: &impl Blob) -> Result<Vec<String>, ExternalError> {
626 let mut keys = vec![];
627 b.list_keys_and_metadata("", &mut |entry| keys.push(entry.key.to_string()))
628 .await?;
629 Ok(keys)
630 }
631
632 async fn get_keys_with_prefix(
633 b: &impl Blob,
634 prefix: &str,
635 ) -> Result<Vec<String>, ExternalError> {
636 let mut keys = vec![];
637 b.list_keys_and_metadata(prefix, &mut |entry| keys.push(entry.key.to_string()))
638 .await?;
639 Ok(keys)
640 }
641
    /// Conformance test for [Blob] implementations.
    ///
    /// `new_fn` builds a blob handle for a given path. The test opens two
    /// handles on `path0` (and a throwaway one on `path1`) and checks that
    /// get/set/delete/restore/list behave consistently through both handles.
    /// Returns the first unexpected [ExternalError]; assertion failures panic.
    pub async fn blob_impl_test<
        B: Blob,
        F: Future<Output = Result<B, ExternalError>>,
        NewFn: Fn(&'static str) -> F,
    >(
        new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let values = ["v0".as_bytes().to_vec(), "v1".as_bytes().to_vec()];

        let blob0 = new_fn("path0").await?;

        // Opening a handle on a different path must succeed; it is not used further.
        let _ = new_fn("path1").await?;

        // A second handle on the same path as blob0.
        let blob1 = new_fn("path0").await?;

        let k0 = "foo/bar/k0";

        // A key that was never set reads back as None through both handles.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);

        // Nothing has been written yet, so both handles list no keys.
        let empty_keys = get_keys(&blob0).await?;
        assert_eq!(empty_keys, Vec::<String>::new());
        let empty_keys = get_keys(&blob1).await?;
        assert_eq!(empty_keys, Vec::<String>::new());

        // A value set through one handle is visible through both.
        blob0.set(k0, values[0].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Same check for a second key without any `/` in it.
        blob0.set("k0a", values[0].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[0].clone())
        );

        // Both keys are listed through both handles. Listings are sorted
        // before comparing, so listing order is not part of the contract here.
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&empty_keys, &[k0, "k0a"]));

        // Setting an existing key overwrites its value.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        blob0.set("k0a", values[1].clone().into()).await?;
        assert_eq!(
            blob0.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob1.get("k0a").await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Delete reports the size of the removed value (the stored value is 2 bytes).
        assert_eq!(blob0.delete(k0).await, Ok(Some(2)));
        // The deleted key is gone through both handles.
        assert_eq!(blob0.get(k0).await?, None);
        assert_eq!(blob1.get(k0).await?, None);
        // Deleting again, or deleting a key that never existed, reports None.
        assert_eq!(blob0.delete(k0).await, Ok(None));
        assert_eq!(blob0.delete("nope").await, Ok(None));
        // An existing but empty value reports Some(0), which is distinct from None.
        blob0.set("empty", Bytes::new()).await?;
        assert_eq!(blob0.delete("empty").await, Ok(Some(0)));

        // Restoring a key that currently has a value must succeed.
        blob0.set("undelete", Bytes::from("data")).await?;
        blob0.restore("undelete").await?;
        assert_eq!(blob0.delete("undelete").await?, Some("data".len()));
        // Restoring a deleted key may either succeed (the value is readable
        // again) or fail determinately (the value stays gone). Any other
        // error fails the test.
        let expected = match blob0.restore("undelete").await {
            Ok(()) => Some(Bytes::from("data").into()),
            Err(ExternalError::Determinate(_)) => None,
            Err(other) => return Err(other),
        };
        assert_eq!(blob0.get("undelete").await?, expected);
        // Clean up regardless of which restore outcome occurred.
        blob0.delete("undelete").await?;

        // With the remaining key removed, both handles list no keys again.
        blob0.delete("k0a").await?;
        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, empty_keys);
        // A previously deleted key can be set again and read through both handles.
        blob0.set(k0, values[1].clone().into()).await?;
        assert_eq!(
            blob1.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );
        assert_eq!(
            blob0.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        // Insert several more keys and check they are all listed.
        let mut expected_keys = empty_keys;
        for i in 1..=5 {
            let key = format!("k{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_keys.push(key);
        }

        let mut blob_keys = get_keys(&blob0).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));
        let mut blob_keys = get_keys(&blob1).await?;
        blob_keys.sort();
        assert_eq!(blob_keys, keys(&expected_keys, &[k0]));

        // Listing with a prefix returns only the keys starting with it.
        let mut expected_prefix_keys = vec![];
        for i in 1..=3 {
            let key = format!("k-prefix-{}", i);
            blob0.set(&key, values[0].clone().into()).await?;
            expected_prefix_keys.push(key);
        }
        let mut blob_keys = get_keys_with_prefix(&blob0, "k-prefix").await?;
        blob_keys.sort();
        assert_eq!(blob_keys, expected_prefix_keys);
        // The shorter prefix `k` matches k1..k5 and the k-prefix-* keys, but
        // not `foo/bar/k0`.
        let mut blob_keys = get_keys_with_prefix(&blob0, "k").await?;
        blob_keys.sort();
        expected_keys.extend(expected_prefix_keys);
        expected_keys.sort();
        assert_eq!(blob_keys, expected_keys);

        // A handle opened after the writes sees the existing data.
        let blob3 = new_fn("path0").await?;
        assert_eq!(
            blob3.get(k0).await?.map(|s| s.into_contiguous()),
            Some(values[1].clone())
        );

        Ok(())
    }
813
    /// Conformance test for [Consensus] implementations.
    ///
    /// `new_fn` builds a consensus handle. The test uses freshly generated
    /// UUID keys and walks through head/scan/compare_and_set/truncate,
    /// checking both successful results and expected failures. Returns the
    /// first unexpected [ExternalError]; assertion failures panic.
    pub async fn consensus_impl_test<
        C: Consensus,
        F: Future<Output = Result<C, ExternalError>>,
        NewFn: FnMut() -> F,
    >(
        mut new_fn: NewFn,
    ) -> Result<(), ExternalError> {
        let consensus = new_fn().await?;

        // A fresh random key keeps this run independent of any existing data.
        let key = Uuid::new_v4().to_string();

        // A key with no data has no head.
        assert_eq!(consensus.head(&key).await, Ok(None));

        // Scanning a key with no data returns nothing.
        assert_eq!(consensus.scan(&key, SeqNo(0), SCAN_ALL).await, Ok(vec![]));

        // Truncating a key with no data is an error.
        assert_err!(consensus.truncate(&key, SeqNo(0)).await);

        let state = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("abc"),
        };

        // Expecting an existing seqno on a key with no data is a mismatch.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(0)), state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // Expecting no data on a key with no data commits the first entry.
        assert_eq!(
            consensus.compare_and_set(&key, None, state.clone()).await,
            Ok(CaSResult::Committed),
        );

        // The key is now listed. This assumes the store contains no other keys.
        let keys: Vec<_> = consensus.list_keys().try_collect().await?;
        assert_eq!(keys, vec![key.to_owned()]);

        // The committed entry is the head.
        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));

        // Scanning from below the entry's seqno returns it.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![state.clone()])
        );

        // Scanning from exactly the entry's seqno returns it (`from` is inclusive).
        assert_eq!(
            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
            Ok(vec![state.clone()])
        );

        // Scanning from above the head returns nothing.
        assert_eq!(consensus.scan(&key, SeqNo(6), SCAN_ALL).await, Ok(vec![]));

        // Truncating at or below the only entry removes nothing.
        assert_eq!(consensus.truncate(&key, SeqNo(0)).await, Ok(0));
        assert_eq!(consensus.truncate(&key, SeqNo(5)).await, Ok(0));

        // Truncating above the head is an error.
        assert_err!(consensus.truncate(&key, SeqNo(6)).await);

        let new_state = VersionedData {
            seqno: SeqNo(10),
            data: Bytes::from("def"),
        };

        // An expectation above the current head (5) is a mismatch.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(7)), new_state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // An expectation below the current head (5) is also a mismatch.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(SeqNo(3)), new_state.clone())
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        let invalid_constant_seqno = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("invalid"),
        };

        // A new seqno equal to the expected one is rejected with an error.
        // ExternalError equality compares rendered messages, so the
        // implementation must produce an indeterminate error with exactly
        // this text.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_constant_seqno)
                .await,
            Err(ExternalError::from(anyhow!(
                "new seqno must be strictly greater than expected. Got new: SeqNo(5) expected: SeqNo(5)"
            )))
        );

        let invalid_regressing_seqno = VersionedData {
            seqno: SeqNo(3),
            data: Bytes::from("invalid"),
        };

        // A new seqno below the expected one is rejected the same way.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_regressing_seqno)
                .await,
            Err(ExternalError::from(anyhow!(
                "new seqno must be strictly greater than expected. Got new: SeqNo(3) expected: SeqNo(5)"
            )))
        );

        // The correct expectation with a larger seqno commits; seqnos may skip values.
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), new_state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        // The head is now the newly committed entry.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));

        // Scanning from the first entry returns both, oldest first.
        assert_eq!(
            consensus.scan(&key, SeqNo(5), SCAN_ALL).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // Scanning from between the two entries returns only the later one.
        assert_eq!(
            consensus.scan(&key, SeqNo(6), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // Scanning from exactly the head's seqno returns the head.
        assert_eq!(
            consensus.scan(&key, SeqNo(10), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // Scanning from above the head returns nothing.
        assert_eq!(consensus.scan(&key, SeqNo(11), SCAN_ALL).await, Ok(vec![]));

        // A limit of 1 returns only the oldest matching entry.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 1).await,
            Ok(vec![state.clone()])
        );
        assert_eq!(
            consensus.scan(&key, SeqNo(5), 1).await,
            Ok(vec![state.clone()])
        );

        // A limit equal to the number of entries returns all of them.
        assert_eq!(
            consensus.scan(&key, SeqNo::minimum(), 2).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // A limit larger than the number of entries returns all of them.
        assert_eq!(
            consensus.scan(&key, SeqNo(4), 100).await,
            Ok(vec![state.clone(), new_state.clone()])
        );

        // Truncating between the two entries removes only the older one.
        assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(1));

        // Only the newer entry remains.
        assert_eq!(
            consensus.scan(&key, SeqNo(0), SCAN_ALL).await,
            Ok(vec![new_state.clone()])
        );

        // Repeating the same truncate removes nothing further.
        assert_eq!(consensus.truncate(&key, SeqNo(6)).await, Ok(0));

        // A second key, to check that keys are independent of each other.
        let other_key = Uuid::new_v4().to_string();

        assert_eq!(consensus.head(&other_key).await, Ok(None));

        let state = VersionedData {
            seqno: SeqNo(1),
            data: Bytes::from("einszweidrei"),
        };

        assert_eq!(
            consensus
                .compare_and_set(&other_key, None, state.clone())
                .await,
            Ok(CaSResult::Committed),
        );

        assert_eq!(consensus.head(&other_key).await, Ok(Some(state.clone())));

        // Writing the other key did not change the first key's head.
        assert_eq!(consensus.head(&key).await, Ok(Some(new_state.clone())));

        // `state` now refers to the other key's entry (seqno 1), which is not
        // the first key's head (seqno 10), so this expectation fails.
        let invalid_jump_forward = VersionedData {
            seqno: SeqNo(11),
            data: Bytes::from("invalid"),
        };
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(state.seqno), invalid_jump_forward)
                .await,
            Ok(CaSResult::ExpectationMismatch),
        );

        // A 10 KiB payload commits.
        let large_state = VersionedData {
            seqno: SeqNo(11),
            data: std::iter::repeat(b'a').take(10240).collect(),
        };
        assert_eq!(
            consensus
                .compare_and_set(&key, Some(new_state.seqno), large_state)
                .await,
            Ok(CaSResult::Committed),
        );

        // An empty payload commits; truncating up to it then removes the two
        // older entries (seqnos 10 and 11).
        let v12 = VersionedData {
            seqno: SeqNo(12),
            data: Bytes::new(),
        };
        assert_eq!(
            consensus.compare_and_set(&key, Some(SeqNo(11)), v12).await,
            Ok(CaSResult::Committed),
        );
        assert_eq!(consensus.truncate(&key, SeqNo(12)).await, Ok(2));

        // Seqno bounds, each on a fresh key: 0 and i64::MAX are accepted,
        // while 2^63 and u64::MAX are rejected with an error.
        assert_eq!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(0),
                        data: Bytes::new(),
                    }
                )
                .await,
            Ok(CaSResult::Committed),
        );
        assert_eq!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(i64::MAX.try_into().expect("i64::MAX fits in u64")),
                        data: Bytes::new(),
                    }
                )
                .await,
            Ok(CaSResult::Committed),
        );
        assert_err!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(1 << 63),
                        data: Bytes::new(),
                    }
                )
                .await
        );
        assert_err!(
            consensus
                .compare_and_set(
                    &Uuid::new_v4().to_string(),
                    None,
                    VersionedData {
                        seqno: SeqNo(u64::MAX),
                        data: Bytes::new(),
                    }
                )
                .await
        );

        Ok(())
    }
1122
1123 #[mz_ore::test]
1124 fn timeout_error() {
1125 assert!(ExternalError::new_timeout(Instant::now()).is_timeout());
1126 assert!(!ExternalError::from(anyhow!("foo")).is_timeout());
1127 }
1128}