#![warn(missing_docs, missing_debug_implementations)]

//! An abstraction presenting as a durable time-varying collection (aka shard).

#![allow(ungated_async_fn_track_caller)]

use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::{BuildInfo, build_info};
use mz_dyncfg::ConfigSet;
use mz_ore::{instrument, soft_assert_or_log};
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::{IntoRustIfSome, ProtoType};
use semver::Version;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BATCH_DELETE_ENABLED, Batch, BatchBuilder, ProtoBatch};
use crate::cache::{PersistClientCache, StateCache};
use crate::cfg::PersistConfig;
use crate::critical::{CriticalReaderId, SinceHandle};
use crate::error::InvalidUsage;
use crate::fetch::{BatchFetcher, BatchFetcherConfig};
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::parse_id;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::{Machine, retry_external};
use crate::internal::state_versions::StateVersions;
use crate::metrics::Metrics;
use crate::read::{
    Cursor, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle, Since,
};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::{WriteHandle, WriterId};

pub mod async_runtime;
pub mod batch;
pub mod cache;
pub mod cfg;
pub mod cli {
    //! Command-line tools for persist administration and debugging.
    pub mod admin;
    pub mod args;
    pub mod bench;
    pub mod inspect;
}
pub mod critical;
pub mod error;
pub mod fetch;
pub mod internals_bench;
pub mod iter;
pub mod metrics {
    //! Utilities related to persist metrics.
    pub use crate::internal::metrics::{
        Metrics, SinkMetrics, SinkWorkerMetrics, UpdateDelta, encode_ts_metric,
    };
}
pub mod operators {
    //! Dataflow operators for reading and writing persist shards.
    use mz_dyncfg::Config;

    pub mod shard_source;

    pub(crate) const STORAGE_SOURCE_DECODE_FUEL: Config<usize> = Config::new(
        "storage_source_decode_fuel",
        100_000,
        "\
        The maximum amount of work to do in the persist_source mfp_and_decode \
        operator before yielding.",
    );
}
pub mod read;
pub mod rpc;
pub mod schema;
pub mod stats;
pub mod usage;
pub mod write;

mod internal {
    pub mod apply;
    pub mod cache;
    pub mod compact;
    pub mod encoding;
    pub mod gc;
    pub mod machine;
    pub mod maintenance;
    pub mod merge;
    pub mod metrics;
    pub mod paths;
    pub mod restore;
    pub mod service;
    pub mod state;
    pub mod state_diff;
    pub mod state_versions;
    pub mod trace;
    pub mod watch;

    #[cfg(test)]
    pub mod datadriven;
}

/// Persist build information for this crate.
pub const BUILD_INFO: BuildInfo = build_info!();

pub use mz_persist_types::{PersistLocation, ShardId};

pub use crate::internal::encoding::Schemas;

/// Diagnostic information about a persist handle, used only for logging,
/// tracing, and metrics.
#[derive(Clone, Debug)]
pub struct Diagnostics {
    /// A user-friendly name for the shard.
    pub shard_name: String,
    /// A brief description of the handle's purpose.
    pub handle_purpose: String,
}

impl Diagnostics {
    /// Returns a [Diagnostics] with a placeholder shard name, for callers
    /// that know only the handle's purpose.
    pub fn from_purpose(handle_purpose: &str) -> Self {
        Self {
            shard_name: "unknown".to_string(),
            handle_purpose: handle_purpose.to_string(),
        }
    }

    /// Returns a [Diagnostics] for use in tests.
    pub fn for_tests() -> Self {
        Self {
            shard_name: "test-shard-name".to_string(),
            handle_purpose: "test-purpose".to_string(),
        }
    }
}

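/// A handle for interacting with the set of persist shards made durable at a
/// single [PersistLocation].
///
/// A minimal usage sketch. The in-mem client and the `String`/`u64`/`i64`
/// codec choices below are illustrative assumptions borrowed from this file's
/// tests, not requirements; production code instead obtains a client via
/// [crate::cache::PersistClientCache].
///
/// ```ignore
/// use mz_persist_types::codec_impls::StringSchema;
///
/// // Open (or create) a shard and grab write/read handles for it.
/// let client = PersistClient::new_for_tests().await;
/// let (mut write, mut read) = client
///     .open::<String, String, u64, i64>(
///         ShardId::new(),
///         Arc::new(StringSchema),
///         Arc::new(StringSchema),
///         Diagnostics::for_tests(),
///         true,
///     )
///     .await
///     .expect("codec mismatch");
/// ```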
#[derive(Debug, Clone)]
pub struct PersistClient {
    cfg: PersistConfig,
    blob: Arc<dyn Blob>,
    consensus: Arc<dyn Consensus>,
    metrics: Arc<Metrics>,
    isolated_runtime: Arc<IsolatedRuntime>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
}

impl PersistClient {
    /// Returns a new client for interfacing with persist shards made durable
    /// to the given `blob` and `consensus`.
    ///
    /// Users typically obtain a client via
    /// [crate::cache::PersistClientCache::open] instead.
    pub fn new(
        cfg: PersistConfig,
        blob: Arc<dyn Blob>,
        consensus: Arc<dyn Consensus>,
        metrics: Arc<Metrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
    ) -> Result<Self, ExternalError> {
        Ok(PersistClient {
            cfg,
            blob,
            consensus,
            metrics,
            isolated_runtime,
            shared_states,
            pubsub_sender,
        })
    }

    /// Returns a new in-mem [PersistClient] for tests and examples.
    pub async fn new_for_tests() -> Self {
        let cache = PersistClientCache::new_no_metrics();
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("in-mem location is valid")
    }

    /// Returns the [ConfigSet] of dynamic configurations in use by this
    /// client.
    pub fn dyncfgs(&self) -> &ConfigSet {
        &self.cfg.configs
    }

    async fn make_machine<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Machine<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let machine = Machine::<K, V, T, D>::new(
            self.cfg.clone(),
            shard_id,
            Arc::clone(&self.metrics),
            Arc::new(state_versions),
            Arc::clone(&self.shared_states),
            Arc::clone(&self.pubsub_sender),
            Arc::clone(&self.isolated_runtime),
            diagnostics.clone(),
        )
        .await?;
        Ok(machine)
    }

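    /// Opens both a [WriteHandle] and a [ReadHandle] for the durable
    /// collection (shard) identified by `shard_id`.
    ///
    /// Returns [InvalidUsage::CodecMismatch] if the given `K`, `V`, `T`, `D`
    /// codecs do not match the ones durably recorded for the shard. The
    /// following sketch (reusing the illustrative types of the tests below,
    /// for a brand-new shard) shows the handles in use:
    ///
    /// ```ignore
    /// let (mut write, mut read) = client
    ///     .open::<String, String, u64, i64>(
    ///         shard_id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         Diagnostics::from_purpose("example"),
    ///         true,
    ///     )
    ///     .await?;
    /// // A freshly created shard starts with both frontiers at the minimum.
    /// assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
    /// assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));
    /// ```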
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
        use_critical_since: bool,
    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Semigroup + Ord + Codec64 + Send + Sync,
    {
        Ok((
            self.open_writer(
                shard_id,
                Arc::clone(&key_schema),
                Arc::clone(&val_schema),
                diagnostics.clone(),
            )
            .await?,
            self.open_leased_reader(
                shard_id,
                key_schema,
                val_schema,
                diagnostics,
                use_critical_since,
            )
            .await?,
        ))
    }

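    /// Opens a [ReadHandle] for the shard: a lease on its since frontier,
    /// allowing the holder to read at times at or beyond that frontier.
    ///
    /// If `use_critical_since` is true, the reader is registered at the
    /// shard's critical since (when one exists) rather than its overall
    /// since. A minimal usage sketch, with illustrative types and timestamps
    /// borrowed from the tests below:
    ///
    /// ```ignore
    /// let mut read = client
    ///     .open_leased_reader::<String, String, u64, i64>(
    ///         shard_id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         Diagnostics::from_purpose("example"),
    ///         true,
    ///     )
    ///     .await?;
    /// // Fetch the contents as of time 3, then allow persist to compact
    /// // away the distinctions between times before 2.
    /// let parts = read.snapshot(Antichain::from_elem(3)).await?;
    /// read.downgrade_since(&Antichain::from_elem(2)).await;
    /// ```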
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_leased_reader<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
        use_critical_since: bool,
    ) -> Result<ReadHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let reader_id = LeasedReaderId::new();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &reader_id,
                &diagnostics.handle_purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                use_critical_since,
            )
            .await;
        maintenance.start_performing(&machine, &gc);
        let schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            reader_id,
            schemas,
            reader_state.since,
            heartbeat_ts,
        )
        .await;

        Ok(reader)
    }

    /// Returns a [BatchFetcher], which fetches batch parts for the shard,
    /// e.g. on behalf of a [ReadHandle] running elsewhere.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn create_batch_fetcher<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        is_transient: bool,
        diagnostics: Diagnostics,
    ) -> Result<BatchFetcher<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let read_schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let schema_cache = machine.applier.schema_cache();
        let fetcher = BatchFetcher {
            cfg: BatchFetcherConfig::new(&self.cfg),
            blob: Arc::clone(&self.blob),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&machine.applier.shard_metrics),
            shard_id,
            read_schemas,
            schema_cache,
            is_transient,
            _phantom: PhantomData,
        };

        Ok(fetcher)
    }

    /// The [CriticalReaderId] conventionally reserved for a shard's
    /// controller.
    pub const CONTROLLER_CRITICAL_SINCE: CriticalReaderId =
        CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

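    /// Opens a [SinceHandle] for `reader_id`: a capability on the shard's
    /// since frontier that, unlike a leased [ReadHandle], is not expired by
    /// inactivity and must be explicitly downgraded or expired.
    ///
    /// The opaque `O` value is durably stored alongside the since and enables
    /// compare-and-set style downgrades, as in this sketch (the types and the
    /// target time are illustrative, condensed from the finalize tests
    /// below):
    ///
    /// ```ignore
    /// let mut since_handle: SinceHandle<(), (), u64, i64, u64> = client
    ///     .open_critical_since(shard_id, PersistClient::CONTROLLER_CRITICAL_SINCE, diagnostics)
    ///     .await?;
    /// let epoch = since_handle.opaque().clone();
    /// since_handle
    ///     .compare_and_downgrade_since(&epoch, (&epoch, &Antichain::from_elem(42)))
    ///     .await
    ///     .expect("epoch should match");
    /// ```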
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_critical_since<K, V, T, D, O>(
        &self,
        shard_id: ShardId,
        reader_id: CriticalReaderId,
        diagnostics: Diagnostics,
    ) -> Result<SinceHandle<K, V, T, D, O>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
        O: Opaque + Codec64,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (state, maintenance) = machine
            .register_critical_reader::<O>(&reader_id, &diagnostics.handle_purpose)
            .await;
        maintenance.start_performing(&machine, &gc);
        let handle = SinceHandle::new(
            machine,
            gc,
            reader_id,
            state.since,
            Codec64::decode(state.opaque.0),
        );

        Ok(handle)
    }

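    /// Opens a [WriteHandle] for the shard: the capability to append updates
    /// and advance the shard's upper frontier.
    ///
    /// Returns [InvalidUsage::CodecMismatch] on codec mismatch. A minimal
    /// usage sketch, with illustrative types and times borrowed from the
    /// tests below:
    ///
    /// ```ignore
    /// let mut write = client
    ///     .open_writer::<String, String, u64, i64>(
    ///         shard_id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         Diagnostics::from_purpose("example"),
    ///     )
    ///     .await?;
    /// // Append a [0,2) batch, expecting the shard's upper to still be 0.
    /// write
    ///     .compare_and_append(
    ///         &[(("k".to_owned(), "v".to_owned()), 1u64, 1i64)],
    ///         Antichain::from_elem(0),
    ///         Antichain::from_elem(2),
    ///     )
    ///     .await
    ///     .expect("usage should be valid")
    ///     .expect("upper should match");
    /// ```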
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_writer<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
    ) -> Result<WriteHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Semigroup + Ord + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
        maintenance.start_performing(&machine, &gc);
        soft_assert_or_log!(
            schema_id.is_some(),
            "unable to register schemas {:?} {:?}",
            key_schema,
            val_schema,
        );

        let writer_id = WriterId::new();
        let schemas = Schemas {
            id: schema_id,
            key: key_schema,
            val: val_schema,
        };
        let writer = WriteHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            writer_id,
            &diagnostics.handle_purpose,
            schemas,
        );
        Ok(writer)
    }

    /// Returns a [BatchBuilder] for writing a batch of updates to `shard_id`
    /// with the given `lower`, without requiring a full [WriteHandle].
    ///
    /// If set, `max_runs` caps the number of runs in the finished batch.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn batch_builder<K, V, T, D>(
        &self,
        shard_id: ShardId,
        write_schemas: Schemas<K, V>,
        lower: Antichain<T>,
        max_runs: Option<usize>,
    ) -> BatchBuilder<K, V, T, D>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Semigroup + Ord + Codec64 + Send + Sync,
    {
        let mut compact_cfg = CompactConfig::new(&self.cfg, shard_id);
        compact_cfg.batch.max_runs = max_runs;
        WriteHandle::builder_inner(
            &self.cfg,
            compact_cfg,
            Arc::clone(&self.metrics),
            self.metrics.shards.shard(&shard_id, "peek_stash"),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            shard_id,
            write_schemas,
            lower,
        )
    }

    /// Turns the given transmittable [ProtoBatch] back into a [Batch] usable
    /// with this client.
    ///
    /// Panics if the batch was not written to the given `shard_id`.
    pub fn batch_from_transmittable_batch<K, V, T, D>(
        &self,
        shard_id: &ShardId,
        batch: ProtoBatch,
    ) -> Batch<K, V, T, D>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Ord + Codec64 + Send + Sync,
    {
        let batch_shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(&batch_shard_id, shard_id);

        let shard_metrics = self.metrics.shards.shard(shard_id, "peek_stash");

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics,
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };

        assert_eq!(&ret.shard_id(), shard_id);
        ret
    }

    /// Returns a [Cursor] over the consolidated contents of the given
    /// batches as of `as_of`, without registering a leased reader.
    ///
    /// The `should_fetch_part` callback can skip fetching parts based on
    /// their stats, and `memory_budget_bytes` bounds the memory used for
    /// consolidation.
    #[allow(clippy::unused_async)]
    pub async fn read_batches_consolidated<K, V, T, D>(
        &mut self,
        shard_id: ShardId,
        as_of: Antichain<T>,
        read_schemas: Schemas<K, V>,
        batches: Vec<Batch<K, V, T, D>>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, Vec<Batch<K, V, T, D>>>, Since<T>>
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Semigroup + Ord + Codec64 + Send + Sync,
    {
        let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");

        let hollow_batches = batches.iter().map(|b| b.batch.clone()).collect_vec();

        ReadHandle::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            shard_metrics,
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            shard_id,
            as_of,
            read_schemas,
            &hollow_batches,
            batches,
            should_fetch_part,
            memory_budget_bytes,
        )
    }

    /// Returns the `K` and `V` schemas registered under `schema_id` for the
    /// shard, if any.
    pub async fn get_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        schema_id: SchemaId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.get_schema(schema_id))
    }

    /// Returns the most recently registered schemas for the shard, if any.
    pub async fn latest_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(SchemaId, K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.latest_schema())
    }

    /// Registers an evolved pair of schemas for the shard, if `expected` is
    /// still the most recently registered schema. See [CaESchema] for the
    /// possible outcomes.
    pub async fn compare_and_evolve_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        diagnostics: Diagnostics,
    ) -> Result<CaESchema<K, V>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
        let (res, maintenance) = machine
            .compare_and_evolve_schema(expected, key_schema, val_schema)
            .await;
        maintenance.start_performing(&machine, &gc);
        Ok(res)
    }

    /// Returns whether the shard has been finalized into a tombstone.
    pub async fn is_finalized<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<bool, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.is_finalized())
    }

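    /// Finalizes the shard, rendering it unusable and reducing its durable
    /// state to a tombstone.
    ///
    /// The shard's since and upper must already have been advanced to the
    /// empty antichain, as in this sketch (condensed from the
    /// `finalize_empty_shard` test below; `EMPTY` is that test's empty
    /// update slice):
    ///
    /// ```ignore
    /// read.downgrade_since(&Antichain::new()).await;
    /// write
    ///     .compare_and_append(EMPTY, write.upper().clone(), Antichain::new())
    ///     .await
    ///     .expect("usage should be valid")
    ///     .expect("upper should match");
    /// client
    ///     .finalize_shard::<(), (), u64, i64>(shard_id, diagnostics)
    ///     .await
    ///     .expect("finalization must succeed");
    /// ```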
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn finalize_shard<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<(), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;

        let maintenance = machine.become_tombstone().await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let () = maintenance.perform(&machine, &gc).await;

        Ok(())
    }

    /// Returns the current state of the shard as a serializable value, for
    /// introspection and debugging.
    ///
    /// Errors if the shard does not exist.
    pub async fn inspect_shard<T: Timestamp + Lattice + Codec64>(
        &self,
        shard_id: &ShardId,
    ) -> Result<impl serde::Serialize, anyhow::Error> {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let versions = state_versions.fetch_all_live_diffs(shard_id).await;
        if versions.0.is_empty() {
            return Err(anyhow::anyhow!("{} does not exist", shard_id));
        }
        let state = state_versions
            .fetch_current_state::<T>(shard_id, versions.0)
            .await;
        let state = state.check_ts_codec(shard_id)?;
        Ok(state)
    }

    /// Test helper: [Self::open] with default schemas, test diagnostics, and
    /// `use_critical_since` enabled.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_open<K, V, T, D>(
        &self,
        shard_id: ShardId,
    ) -> (WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>)
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Semigroup + Ord + Codec64 + Send + Sync,
        K::Schema: Default,
        V::Schema: Default,
    {
        self.open(
            shard_id,
            Arc::new(K::Schema::default()),
            Arc::new(V::Schema::default()),
            Diagnostics::for_tests(),
            true,
        )
        .await
        .expect("codec mismatch")
    }

    /// Returns the [Metrics] instance in use by this client.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.metrics
    }
}

#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::mem;
    use std::pin::Pin;
    use std::task::Context;
    use std::time::Duration;

    use differential_dataflow::consolidation::consolidate_updates;
    use differential_dataflow::lattice::Lattice;
    use futures_task::noop_waker;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::assert_ok;
    use mz_persist::indexed::encoding::BlobTraceBatchPart;
    use mz_persist::workload::DataGenerator;
    use mz_persist_types::codec_impls::{StringSchema, VecU8Schema};
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;
    use timely::order::PartialOrder;
    use timely::progress::Antichain;

    use crate::batch::BLOB_TARGET_SIZE;
    use crate::cache::PersistClientCache;
    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
    use crate::error::{CodecConcreteType, CodecMismatch, UpperMismatch};
    use crate::internal::paths::BlobKey;
    use crate::read::ListenEvent;

    use super::*;

    pub fn new_test_client_cache(dyncfgs: &ConfigUpdates) -> PersistClientCache {
        // Configure an aggressively small blob target size and batch builder
        // concurrency, so tests get coverage of the multi-part code paths.
        let mut cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 10);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 1);
        dyncfgs.apply(cache.cfg());

        // Enable compaction in tests to ensure it gets coverage.
        cache.cfg.compaction_enabled = true;
        cache
    }

    pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient {
        let cache = new_test_client_cache(dyncfgs);
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
    }

    pub fn all_ok<'a, K, V, T, D, I>(
        iter: I,
        as_of: T,
    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)>
    where
        K: Ord + Clone + 'a,
        V: Ord + Clone + 'a,
        T: Timestamp + Lattice + Clone + 'a,
        D: Semigroup + Clone + 'a,
        I: IntoIterator<Item = &'a ((K, V), T, D)>,
    {
        let as_of = Antichain::from_elem(as_of);
        let mut ret = iter
            .into_iter()
            .map(|((k, v), t, d)| {
                let mut t = t.clone();
                t.advance_by(as_of.borrow());
                ((Ok(k.clone()), Ok(v.clone())), t, d.clone())
            })
            .collect();
        consolidate_updates(&mut ret);
        ret
    }

    pub async fn expect_fetch_part<K, V, T, D>(
        blob: &dyn Blob,
        key: &BlobKey,
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
    ) -> (
        BlobTraceBatchPart<T>,
        Vec<((Result<K, String>, Result<V, String>), T, D)>,
    )
    where
        K: Codec,
        V: Codec,
        T: Timestamp + Codec64,
        D: Codec64,
    {
        let value = blob
            .get(key)
            .await
            .expect("failed to fetch part")
            .expect("missing part");
        let mut part =
            BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
        let _ = part
            .updates
            .get_or_make_codec::<K, V>(&read_schemas.key, &read_schemas.val);
        let mut updates = Vec::new();
        for ((k, v), t, d) in part.updates.records().expect("codec data").iter() {
            updates.push((
                (
                    K::decode(k, &read_schemas.key),
                    V::decode(v, &read_schemas.val),
                ),
                T::decode(t),
                D::decode(d),
            ));
        }
        (part, updates)
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn sanity_check(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        // Write a [0,3) batch.
        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        // Grab a snapshot as of 1.
        assert_eq!(
            read.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );

        let mut listen = read.clone("").await.expect_listen(1).await;

        // Write a [3,4) batch.
        write
            .expect_append(&data[2..], write.upper().clone(), vec![4])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(4));

        // The listen should see the tail of the first write plus the new one.
        assert_eq!(
            listen.read_until(&4).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(4))
        );

        // Downgrading the since is tracked locally on the handle.
        read.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(read.since(), &Antichain::from_elem(2));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn open_reader_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut write1 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");
        let mut read1 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut read2 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut write2 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        write2.expect_compare_and_append(&data[..1], 0, 2).await;
        assert_eq!(
            read2.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );
        write1.expect_compare_and_append(&data[1..], 2, 4).await;
        assert_eq!(read1.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn invalid_usage(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id0 = "s00000000-0000-0000-0000-000000000000"
            .parse::<ShardId>()
            .expect("invalid shard id");
        let mut client = new_test_client(&dyncfgs).await;

        let (mut write0, mut read0) = client
            .expect_open::<String, String, u64, i64>(shard_id0)
            .await;

        write0.expect_compare_and_append(&data, 0, 4).await;

        // Opening a shard with codecs that don't match the ones durably
        // recorded for it fails with a codec mismatch.
        {
            fn codecs(
                k: &str,
                v: &str,
                t: &str,
                d: &str,
            ) -> (String, String, String, String, Option<CodecConcreteType>) {
                (k.to_owned(), v.to_owned(), t.to_owned(), d.to_owned(), None)
            }

            // Drop the cached state so that the opens below check against the
            // durably recorded codecs rather than the in-process cache.
            client.shared_states = Arc::new(StateCache::new_no_metrics());
            assert_eq!(
                client
                    .open::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, Vec<u8>, u64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(VecU8Schema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "Vec<u8>", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, i64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "i64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, u64, u64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "u64", "u64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );

            // The same applies to open_leased_reader and open_writer.
            assert_eq!(
                client
                    .open_leased_reader::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open_writer::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
        }

        // Fetching a batch part from a different shard fails.
        {
            let snap = read0
                .snapshot(Antichain::from_elem(3))
                .await
                .expect("cannot serve requested as_of");

            let shard_id1 = "s11111111-1111-1111-1111-111111111111"
                .parse::<ShardId>()
                .expect("invalid shard id");
            let mut fetcher1 = client
                .create_batch_fetcher::<String, String, u64, i64>(
                    shard_id1,
                    Default::default(),
                    Default::default(),
                    false,
                    Diagnostics::for_tests(),
                )
                .await
                .unwrap();
            for part in snap {
                let (part, _lease) = part.into_exchangeable_part();
                let res = fetcher1.fetch_leased_part(part).await;
                assert_eq!(
                    res.unwrap_err(),
                    InvalidUsage::BatchNotFromThisShard {
                        batch_shard: shard_id0,
                        handle_shard: shard_id1,
                    }
                );
            }
        }

        // Appends and batches with invalid bounds fail.
        {
            let ts3 = &data[2];
            assert_eq!(ts3.1, 3);
            let ts3 = vec![ts3.clone()];

            // Updates must be at or beyond the append's lower and not beyond
            // its upper.
            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateNotBeyondLower {
                    ts: 3,
                    lower: Antichain::from_elem(4),
                },
            );
            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateBeyondUpper {
                    ts: 3,
                    expected_upper: Antichain::from_elem(3),
                },
            );
            // A lower beyond the upper is an invalid bound.
            assert_eq!(
                write0
                    .append(&data[..0], Antichain::from_elem(3), Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2),
                },
            );

            // The same checks apply to batches built via the builder.
            assert_eq!(
                write0
                    .builder(Antichain::from_elem(3))
                    .finish(Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2)
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(4),
                    append_upper: Antichain::from_elem(5),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(2),
                    append_upper: Antichain::from_elem(3),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            // A nonempty batch appended over an empty time interval is
            // invalid.
            assert!(matches!(
                write0
                    .append_batch(batch, Antichain::from_elem(3), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidEmptyTimeInterval { .. }
            ));
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn multiple_shards(dyncfgs: ConfigUpdates) {
        let data1 = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let data2 = [(("1".to_owned(), ()), 1, 1), (("2".to_owned(), ()), 2, 1)];

        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read1) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Open a second shard with different types, to prove that data for
        // different shards is kept separate.
        let (mut write2, mut read2) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        write1
            .expect_compare_and_append(&data1[..], u64::minimum(), 3)
            .await;

        write2
            .expect_compare_and_append(&data2[..], u64::minimum(), 3)
            .await;

        assert_eq!(
            read1.expect_snapshot_and_fetch(2).await,
            all_ok(&data1[..], 2)
        );

        assert_eq!(
            read2.expect_snapshot_and_fetch(2).await,
            all_ok(&data2[..], 2)
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn fetch_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write1, _read1) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let (mut write2, _read2) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write1
            .expect_append(&data[..], write1.upper().clone(), vec![3])
            .await;

        // The second writer sees the advanced upper after a fetch...
        assert_eq!(write2.fetch_recent_upper().await, &Antichain::from_elem(3));
        // ...and the fetched upper is also cached on the handle.
        assert_eq!(write2.upper(), &Antichain::from_elem(3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn append_with_invalid_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write, _read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write
            .expect_append(&data[..], write.upper().clone(), vec![3])
            .await;

        let data = [
            (("5".to_owned(), "fünf".to_owned()), 5, 1),
            (("6".to_owned(), "sechs".to_owned()), 6, 1),
        ];
        let res = write
            .append(
                data.iter(),
                Antichain::from_elem(5),
                Antichain::from_elem(7),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        // A failed append does not advance the writer's local upper.
        assert_eq!(write.upper(), &Antichain::from_elem(3));
    }

    // Compile-time check that the client and handles are Send + Sync, so
    // they can be shared between tokio tasks. This is never run.
    #[allow(unused)]
    async fn sync_send(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        fn is_send_sync<T: Send + Sync>(_x: T) -> bool {
            true
        }

        let client = new_test_client(&dyncfgs).await;

        let (write, read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        assert!(is_send_sync(client));
        assert!(is_send_sync(write));
        assert!(is_send_sync(read));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        assert_eq!(write1.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(write2.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        // Write a [0,3) batch with writer 1.
        write1
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        assert_eq!(
            read.expect_snapshot_and_fetch(2).await,
            all_ok(&data[..2], 2)
        );

        // A write with a stale expected upper fails.
        let res = write2
            .compare_and_append(
                &data[..2],
                Antichain::from_elem(u64::minimum()),
                Antichain::from_elem(3),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(u64::minimum()),
                current: Antichain::from_elem(3)
            }))
        );

        // The failed attempt still taught the second writer the current upper.
        assert_eq!(write2.upper(), &Antichain::from_elem(3));

        // A retry at the corrected upper succeeds.
        write2.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(write2.upper(), &Antichain::from_elem(4));

        assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn overlapping_append(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging_default("info");

        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Grab a listener before we do any writing.
        let mut listen = read.clone("").await.expect_listen(0).await;

        // Write a [0,3) batch with writer 1.
        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        // Write an overlapping [0,5) batch with writer 2.
        write2
            .expect_append(&data[..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        // Write an overlapping [3,6) batch with writer 1.
        write1
            .expect_append(&data[2..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));

        assert_eq!(
            listen.read_until(&6).await,
            (all_ok(&data[..], 1), Antichain::from_elem(6))
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn contiguous_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Write a [0,3) batch.
        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        // Appending a [5,6) batch fails because the current upper is 3:
        // appends must be contiguous.
        let result = write
            .append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        // Filling in the missing [3,6) range makes the appends contiguous.
        write.expect_append(&data[2..5], vec![3], vec![6]).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn noncontiguous_append_per_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Write a [0,3) batch with writer 1.
        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        // Manually sync writer 2's upper, then append [3,5) with it: each
        // individual writer's appends need not be contiguous as long as the
        // appends to the shard overall are.
        write2.upper = Antichain::from_elem(3);
        write2
            .expect_append(&data[2..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1.upper = Antichain::from_elem(5);
        write1
            .expect_append(&data[4..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn contiguous_compare_and_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        write.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        let result = write
            .compare_and_append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        write.expect_compare_and_append(&data[2..5], 3, 6).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn noncontiguous_compare_and_append_per_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        // With compare_and_append, the expected upper is passed explicitly,
        // so no manual syncing of the writers' cached uppers is needed.
        write1.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        write2.expect_compare_and_append(&data[2..4], 3, 5).await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1.expect_compare_and_append(&data[4..5], 5, 6).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_ore::test]
    fn fmt_ids() {
        assert_eq!(
            format!("{}", LeasedReaderId([0u8; 16])),
            "r00000000-0000-0000-0000-000000000000"
        );
        assert_eq!(
            format!("{:?}", LeasedReaderId([0u8; 16])),
            "LeasedReaderId(00000000-0000-0000-0000-000000000000)"
        );
    }

1792 #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
1793 #[cfg_attr(miri, ignore)] async fn concurrency(dyncfgs: ConfigUpdates) {
1795 let data = DataGenerator::small();
1796
1797 const NUM_WRITERS: usize = 2;
1798 let id = ShardId::new();
1799 let client = new_test_client(&dyncfgs).await;
1800 let mut handles = Vec::<mz_ore::task::JoinHandle<()>>::new();
1801 for idx in 0..NUM_WRITERS {
1802 let (data, client) = (data.clone(), client.clone());
1803
1804 let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(1);
1805
1806 let client1 = client.clone();
1807 let handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
1808 let (write, _) = client1.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1809 let mut current_upper = 0;
1810 for batch in data.batches() {
1811 let new_upper = match batch.get(batch.len() - 1) {
1812 Some((_, max_ts, _)) => u64::decode(max_ts) + 1,
1813 None => continue,
1814 };
1815 if PartialOrder::less_equal(&Antichain::from_elem(new_upper), write.upper()) {
1830 continue;
1831 }
1832
1833 let current_upper_chain = Antichain::from_elem(current_upper);
1834 current_upper = new_upper;
1835 let new_upper_chain = Antichain::from_elem(new_upper);
1836 let mut builder = write.builder(current_upper_chain);
1837
1838 for ((k, v), t, d) in batch.iter() {
1839 builder
1840 .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
1841 .await
1842 .expect("invalid usage");
1843 }
1844
1845 let batch = builder
1846 .finish(new_upper_chain)
1847 .await
1848 .expect("invalid usage");
1849
1850 match batch_tx.send(batch).await {
1851 Ok(_) => (),
1852 Err(e) => panic!("send error: {}", e),
1853 }
1854 }
1855 });
1856 handles.push(handle);
1857
1858 let handle = mz_ore::task::spawn(|| format!("appender-{}", idx), async move {
1859 let (mut write, _) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1860
1861 while let Some(batch) = batch_rx.recv().await {
1862 let lower = batch.lower().clone();
1863 let upper = batch.upper().clone();
1864 write
1865 .append_batch(batch, lower, upper)
1866 .await
1867 .expect("invalid usage")
1868 .expect("unexpected upper");
1869 }
1870 });
1871 handles.push(handle);
1872 }
1873
1874 for handle in handles {
1875 let () = handle.await.expect("task failed");
1876 }
1877
1878 let expected = data.records().collect::<Vec<_>>();
1879 let max_ts = expected.last().map(|(_, t, _)| *t).unwrap_or_default();
1880 let (_, mut read) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1881 assert_eq!(
1882 read.expect_snapshot_and_fetch(max_ts).await,
1883 all_ok(expected.iter(), max_ts)
1884 );
1885 }
1886
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_blocking_reads(dyncfgs: ConfigUpdates) {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Grab a listener as of 1, which is not yet closed out.
        let mut listen = read.clone("").await.expect_listen(1).await;
        let mut listen_next = Box::pin(listen.fetch_next());
        // Intentionally don't await the listen future; instead poll it
        // manually to prove that it's pending.
        for _ in 0..100 {
            assert!(
                Pin::new(&mut listen_next).poll(&mut cx).is_pending(),
                "listen::next unexpectedly ready"
            );
        }

        // Write a [0,3) batch.
        write
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;

        // The pending listen future should now resolve.
        assert_eq!(
            listen_next.await,
            vec![
                ListenEvent::Updates(vec![((Ok("2".to_owned()), Ok("two".to_owned())), 2, 1)]),
                ListenEvent::Progress(Antichain::from_elem(3)),
            ]
        );

        // Grab a snapshot as of 3, which is not yet closed out, and prove
        // that it's pending.
        let mut snap = Box::pin(read.expect_snapshot_and_fetch(3));
        for _ in 0..100 {
            assert!(
                Pin::new(&mut snap).poll(&mut cx).is_pending(),
                "snapshot unexpectedly ready"
            );
        }

        // Now write the [3,4) batch, closing out 3, and the snapshot should
        // resolve.
        write.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(snap.await, all_ok(&data[..], 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn heartbeat_task_shutdown(dyncfgs: ConfigUpdates) {
        // Verify that the reader heartbeat tasks shut down cleanly after the
        // handle is expired.
        let mut cache = new_test_client_cache(&dyncfgs);
        cache
            .cfg
            .set_config(&READER_LEASE_DURATION, Duration::from_millis(1));
        cache.cfg.writer_lease_duration = Duration::from_millis(1);
        let (_write, mut read) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<(), (), u64, i64>(ShardId::new())
            .await;
        let mut read_unexpired_state = read
            .unexpired_state
            .take()
            .expect("handle should have unexpired state");
        read.expire().await;
        for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) {
            let () = read_heartbeat_task
                .await
                .expect("task should shutdown cleanly");
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
        const EMPTY: &[(((), ()), u64, i64)] = &[];
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        // Advance the since and upper to the empty antichain, as required
        // for finalization.
        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write
            .compare_and_append(EMPTY, Antichain::from_elem(0), Antichain::new())
            .await
            .expect("usage should be valid")
            .expect("upper should match");

        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");

        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_shard(dyncfgs: ConfigUpdates) {
        const EMPTY: &[(((), ()), u64, i64)] = &[];
        const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        // Write some data into the shard before finalizing it.
        let () = write
            .compare_and_append(DATA, Antichain::from_elem(0), Antichain::from_elem(1))
            .await
            .expect("usage should be valid")
            .expect("upper should match");

        // Advance the since and upper to the empty antichain, as required
        // for finalization.
        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write
            .compare_and_append(EMPTY, Antichain::from_elem(1), Antichain::new())
            .await
            .expect("usage should be valid")
            .expect("upper should match");

        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");

        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(4096))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn shard_id_protobuf_roundtrip(expect in any::<ShardId>()) {
            let actual = protobuf_roundtrip::<_, String>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}