//! An abstraction presenting as a durable time-varying collection (aka shard).

#![warn(missing_docs, missing_debug_implementations)]
#![allow(ungated_async_fn_track_caller)]

use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::{BuildInfo, build_info};
use mz_dyncfg::ConfigSet;
use mz_ore::instrument;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::{IntoRustIfSome, ProtoType};
use semver::Version;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BATCH_DELETE_ENABLED, Batch, BatchBuilder, ProtoBatch};
use crate::cache::{PersistClientCache, StateCache};
use crate::cfg::PersistConfig;
use crate::critical::{CriticalReaderId, SinceHandle};
use crate::error::InvalidUsage;
use crate::fetch::{BatchFetcher, BatchFetcherConfig};
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::parse_id;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::{Machine, retry_external};
use crate::internal::state_versions::StateVersions;
use crate::metrics::Metrics;
use crate::read::{
    Cursor, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle, Since,
};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::{WriteHandle, WriterId};

pub mod async_runtime;
pub mod batch;
pub mod cache;
pub mod cfg;
/// Command-line tooling for administering and inspecting persist state.
pub mod cli {
    pub mod admin;
    pub mod args;
    pub mod bench;
    pub mod inspect;
}
pub mod critical;
pub mod error;
pub mod fetch;
pub mod internals_bench;
pub mod iter;
/// Re-exports of the public subset of persist's internal metrics.
pub mod metrics {
    pub use crate::internal::metrics::{
        Metrics, SinkMetrics, SinkWorkerMetrics, UpdateDelta, encode_ts_metric,
    };
}
/// Dataflow operators that read from and write to persist shards.
pub mod operators {
    use mz_dyncfg::Config;

    pub mod shard_source;

    pub(crate) const STORAGE_SOURCE_DECODE_FUEL: Config<usize> = Config::new(
        "storage_source_decode_fuel",
        100_000,
        "\
        The maximum amount of work to do in the persist_source mfp_and_decode \
        operator before yielding.",
    );
}
pub mod read;
pub mod rpc;
pub mod schema;
pub mod stats;
pub mod usage;
pub mod write;

mod internal {
    pub mod apply;
    pub mod cache;
    pub mod compact;
    pub mod encoding;
    pub mod gc;
    pub mod machine;
    pub mod maintenance;
    pub mod merge;
    pub mod metrics;
    pub mod paths;
    pub mod restore;
    pub mod service;
    pub mod state;
    pub mod state_diff;
    pub mod state_versions;
    pub mod trace;
    pub mod watch;

    #[cfg(test)]
    pub mod datadriven;
}

/// Persist build information.
pub const BUILD_INFO: BuildInfo = build_info!();

pub use mz_persist_types::{PersistLocation, ShardId};

pub use crate::internal::encoding::Schemas;

/// Diagnostics for a persist handle, for use in logs, error messages, and
/// metrics rather than in program logic.
#[derive(Clone, Debug)]
pub struct Diagnostics {
    /// A human-readable name for the shard this handle operates on.
    pub shard_name: String,
    /// A human-readable description of the purpose of this handle.
    pub handle_purpose: String,
}

impl Diagnostics {
    /// Returns a `Diagnostics` with an unknown shard name and the given
    /// handle purpose.
    pub fn from_purpose(handle_purpose: &str) -> Self {
        Self {
            shard_name: "unknown".to_string(),
            handle_purpose: handle_purpose.to_string(),
        }
    }

    /// Returns a `Diagnostics` for use in tests.
    pub fn for_tests() -> Self {
        Self {
            shard_name: "test-shard-name".to_string(),
            handle_purpose: "test-purpose".to_string(),
        }
    }
}
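
// A minimal construction sketch (for illustration; the tests below use
// `Diagnostics::for_tests()` instead):
//
//     let diagnostics = Diagnostics {
//         shard_name: "my_shard".to_string(),
//         handle_purpose: "example handle".to_string(),
//     };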

/// A handle for interacting with the persisted state of a collection.
///
/// Construct one via [PersistClientCache::open] (or [PersistClient::new]) and
/// use it to open readers and writers for individual shards.
#[derive(Debug, Clone)]
pub struct PersistClient {
    cfg: PersistConfig,
    blob: Arc<dyn Blob>,
    consensus: Arc<dyn Consensus>,
    metrics: Arc<Metrics>,
    isolated_runtime: Arc<IsolatedRuntime>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
}

impl PersistClient {
    /// Returns a new client for interfacing with persist shards made durable
    /// to the given [Blob] and [Consensus].
    ///
    /// Most users should instead go through [PersistClientCache::open] so
    /// that state is shared between handles in the same process.
    pub fn new(
        cfg: PersistConfig,
        blob: Arc<dyn Blob>,
        consensus: Arc<dyn Consensus>,
        metrics: Arc<Metrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
    ) -> Result<Self, ExternalError> {
        Ok(PersistClient {
            cfg,
            blob,
            consensus,
            metrics,
            isolated_runtime,
            shared_states,
            pubsub_sender,
        })
    }

    /// Returns a new in-mem [PersistClient] for tests and examples.
    pub async fn new_for_tests() -> Self {
        let cache = PersistClientCache::new_no_metrics();
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("in-mem location is valid")
    }

    /// Returns the [ConfigSet] of dynamic configs in use by this client.
    pub fn dyncfgs(&self) -> &ConfigSet {
        &self.cfg.configs
    }

    async fn make_machine<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Machine<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let machine = Machine::<K, V, T, D>::new(
            self.cfg.clone(),
            shard_id,
            Arc::clone(&self.metrics),
            Arc::new(state_versions),
            Arc::clone(&self.shared_states),
            Arc::clone(&self.pubsub_sender),
            Arc::clone(&self.isolated_runtime),
            diagnostics.clone(),
        )
        .await?;
        Ok(machine)
    }

    /// Provides capabilities for the durable TVC identified by `shard_id` at
    /// its current since and upper frontiers.
    ///
    /// A convenience for calling [Self::open_writer] and
    /// [Self::open_leased_reader] on the same shard.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
        use_critical_since: bool,
    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        Ok((
            self.open_writer(
                shard_id,
                Arc::clone(&key_schema),
                Arc::clone(&val_schema),
                diagnostics.clone(),
            )
            .await?,
            self.open_leased_reader(
                shard_id,
                key_schema,
                val_schema,
                diagnostics,
                use_critical_since,
            )
            .await?,
        ))
    }
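
    // A minimal end-to-end sketch (mirrors `tests::sanity_check` below;
    // `StringSchema` is from `mz_persist_types::codec_impls`):
    //
    //     let client = PersistClient::new_for_tests().await;
    //     let (mut write, mut read) = client
    //         .open::<String, String, u64, i64>(
    //             ShardId::new(),
    //             Arc::new(StringSchema),
    //             Arc::new(StringSchema),
    //             Diagnostics::for_tests(),
    //             false,
    //         )
    //         .await
    //         .expect("codec mismatch");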

    /// Provides a [ReadHandle] for the durable TVC identified by `shard_id`,
    /// with a new leased capability on its `since` frontier.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_leased_reader<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
        use_critical_since: bool,
    ) -> Result<ReadHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let reader_id = LeasedReaderId::new();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &reader_id,
                &diagnostics.handle_purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                use_critical_since,
            )
            .await;
        maintenance.start_performing(&machine, &gc);
        let schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            reader_id,
            schemas,
            reader_state.since,
            heartbeat_ts,
        )
        .await;

        Ok(reader)
    }

    /// Provides a [BatchFetcher] for fetching the parts of batches written to
    /// the durable TVC identified by `shard_id`.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn create_batch_fetcher<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        is_transient: bool,
        diagnostics: Diagnostics,
    ) -> Result<BatchFetcher<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let read_schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let schema_cache = machine.applier.schema_cache();
        let fetcher = BatchFetcher {
            cfg: BatchFetcherConfig::new(&self.cfg),
            blob: Arc::clone(&self.blob),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&machine.applier.shard_metrics),
            shard_id,
            read_schemas,
            schema_cache,
            is_transient,
            _phantom: PhantomData,
        };

        Ok(fetcher)
    }

    /// A well-known [CriticalReaderId] for use by the controllers, which hold
    /// exactly one critical since handle per shard.
    pub const CONTROLLER_CRITICAL_SINCE: CriticalReaderId =
        CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

    /// Provides a [SinceHandle] for the durable TVC identified by `shard_id`,
    /// with a capability on its `since` frontier that, in contrast to a
    /// leased reader's, does not expire and instead must be explicitly,
    /// durably downgraded to release it.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_critical_since<K, V, T, D, O>(
        &self,
        shard_id: ShardId,
        reader_id: CriticalReaderId,
        diagnostics: Diagnostics,
    ) -> Result<SinceHandle<K, V, T, D, O>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
        O: Opaque + Codec64,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (state, maintenance) = machine
            .register_critical_reader::<O>(&reader_id, &diagnostics.handle_purpose)
            .await;
        maintenance.start_performing(&machine, &gc);
        let handle = SinceHandle::new(
            machine,
            gc,
            reader_id,
            state.since,
            Codec64::decode(state.opaque.0),
        );

        Ok(handle)
    }
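
    // A sketch of critical handle usage (mirrors `tests::finalize_shard`
    // below; the final `u64` type parameter is the opaque `O`):
    //
    //     let mut since: SinceHandle<(), (), u64, i64, u64> = client
    //         .open_critical_since(shard_id, reader_id, Diagnostics::for_tests())
    //         .await
    //         .expect("invalid persist usage");
    //     let epoch = since.opaque().clone();
    //     let _ = since
    //         .compare_and_downgrade_since(&epoch, (&epoch, &Antichain::from_elem(5)))
    //         .await;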

    /// Provides a [WriteHandle] for the durable TVC identified by `shard_id`,
    /// writing data encoded with the given key and value schemas.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_writer<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
    ) -> Result<WriteHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let schema_id = machine.find_schema(&*key_schema, &*val_schema);

        let writer_id = WriterId::new();
        let schemas = Schemas {
            id: schema_id,
            key: key_schema,
            val: val_schema,
        };
        let writer = WriteHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            writer_id,
            &diagnostics.handle_purpose,
            schemas,
        );
        Ok(writer)
    }
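
    // A sketch of the write path (mirrors `tests::compare_and_append` below):
    //
    //     let mut write = client
    //         .open_writer::<String, String, u64, i64>(
    //             shard_id,
    //             Arc::new(StringSchema),
    //             Arc::new(StringSchema),
    //             Diagnostics::from_purpose("example writer"),
    //         )
    //         .await
    //         .expect("codec mismatch");
    //     write
    //         .compare_and_append(&data, Antichain::from_elem(0), Antichain::from_elem(3))
    //         .await
    //         .expect("usage should be valid")
    //         .expect("upper should match");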

    /// Returns a [BatchBuilder] that can be used to write a batch of updates
    /// directly to blob storage, to later be appended to the shard or
    /// transmitted to another process first.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn batch_builder<K, V, T, D>(
        &self,
        shard_id: ShardId,
        write_schemas: Schemas<K, V>,
        lower: Antichain<T>,
        max_runs: Option<usize>,
    ) -> BatchBuilder<K, V, T, D>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let mut compact_cfg = CompactConfig::new(&self.cfg, shard_id);
        compact_cfg.batch.max_runs = max_runs;
        WriteHandle::builder_inner(
            &self.cfg,
            compact_cfg,
            Arc::clone(&self.metrics),
            self.metrics.shards.shard(&shard_id, "peek_stash"),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            shard_id,
            write_schemas,
            lower,
        )
    }

    /// Turns the given [ProtoBatch] back into a [Batch] which can be appended
    /// to the shard identified by `shard_id`.
    ///
    /// Panics if the batch was not written for `shard_id`.
    pub fn batch_from_transmittable_batch<K, V, T, D>(
        &self,
        shard_id: &ShardId,
        batch: ProtoBatch,
    ) -> Batch<K, V, T, D>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let batch_shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(&batch_shard_id, shard_id);

        let shard_metrics = self.metrics.shards.shard(shard_id, "peek_stash");

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics,
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            schemas: (batch.key_schema, batch.val_schema),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };

        assert_eq!(&ret.shard_id(), shard_id);
        ret
    }
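
    // The intended round trip (a sketch; assumes the batch module's
    // `into_transmittable_batch`, which converts a `Batch` into a
    // `ProtoBatch`):
    //
    //     let proto = batch.into_transmittable_batch();
    //     // ... transmit `proto` to another process ...
    //     let batch = client.batch_from_transmittable_batch::<K, V, T, D>(&shard_id, proto);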

    /// Returns a [Cursor] that consolidates the contents of the given batches
    /// as of the given frontier, fetching only the parts for which
    /// `should_fetch_part` returns true and respecting `memory_budget_bytes`.
    #[allow(clippy::unused_async)]
    pub async fn read_batches_consolidated<K, V, T, D>(
        &mut self,
        shard_id: ShardId,
        as_of: Antichain<T>,
        read_schemas: Schemas<K, V>,
        batches: Vec<Batch<K, V, T, D>>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, Vec<Batch<K, V, T, D>>>, Since<T>>
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");

        let hollow_batches = batches.iter().map(|b| b.batch.clone()).collect_vec();

        ReadHandle::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            shard_metrics,
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            shard_id,
            as_of,
            read_schemas,
            &hollow_batches,
            batches,
            should_fetch_part,
            memory_budget_bytes,
        )
    }

    /// Returns the schemas registered for the given shard under the given
    /// [SchemaId], if known.
    pub async fn get_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        schema_id: SchemaId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.get_schema(schema_id))
    }

    /// Returns the most recently registered schemas for the given shard, if
    /// any.
    pub async fn latest_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(SchemaId, K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.latest_schema())
    }

    /// Registers the given key and value schemas with the given shard,
    /// returning the assigned [SchemaId], or `None` if they could not be
    /// registered.
    pub async fn register_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        diagnostics: Diagnostics,
    ) -> Result<Option<SchemaId>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (schema_id, maintenance) = machine.register_schema(key_schema, val_schema).await;
        maintenance.start_performing(&machine, &gc);

        Ok(schema_id)
    }
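
    // A sketch of up-front schema registration (hypothetical shard and
    // diagnostics values; `StringSchema` is from
    // `mz_persist_types::codec_impls`):
    //
    //     let schema_id = client
    //         .register_schema::<String, String, u64, i64>(
    //             shard_id,
    //             &StringSchema,
    //             &StringSchema,
    //             Diagnostics::from_purpose("schema registration"),
    //         )
    //         .await
    //         .expect("invalid persist usage");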

    /// If `expected` is still the latest registered schema for the given
    /// shard, attempts to evolve it to the given key and value schemas,
    /// returning the result as a [CaESchema].
    pub async fn compare_and_evolve_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        diagnostics: Diagnostics,
    ) -> Result<CaESchema<K, V>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
        let (res, maintenance) = machine
            .compare_and_evolve_schema(expected, key_schema, val_schema)
            .await;
        maintenance.start_performing(&machine, &gc);
        Ok(res)
    }

    /// Returns whether the given shard is finalized, i.e. permanently closed
    /// to new reads and writes.
    pub async fn is_finalized<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<bool, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.is_finalized())
    }

    /// Finalizes the given shard, tombstoning its state and permanently
    /// closing it to new reads and writes.
    ///
    /// Requires that the shard's `since` and `upper` frontiers have both been
    /// advanced to the empty antichain.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn finalize_shard<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<(), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;

        let maintenance = machine.become_tombstone().await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let () = maintenance.perform(&machine, &gc).await;

        Ok(())
    }
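
    // The finalization protocol, as exercised by `tests::finalize_shard`
    // below: advance both the since and the upper to the empty antichain,
    // then finalize.
    //
    //     read.downgrade_since(&Antichain::new()).await;
    //     write.advance_upper(&Antichain::new()).await;
    //     client
    //         .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
    //         .await
    //         .expect("finalization must succeed");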

    /// Upgrades the version recorded in the given shard's state to this
    /// client's build version, returning
    /// [InvalidUsage::IncompatibleVersion] if the shard has already been
    /// written by a newer version.
    pub async fn upgrade_version<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<(), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;

        match machine.upgrade_version().await {
            Ok(maintenance) => {
                let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
                let () = maintenance.perform(&machine, &gc).await;
                Ok(())
            }
            Err(version) => Err(InvalidUsage::IncompatibleVersion { version }),
        }
    }

    /// Returns the current state of the given shard in a serializable form,
    /// for introspection and debugging.
    pub async fn inspect_shard<T: Timestamp + Lattice + Codec64>(
        &self,
        shard_id: &ShardId,
    ) -> Result<impl serde::Serialize, anyhow::Error> {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let versions = state_versions.fetch_all_live_diffs(shard_id).await;
        if versions.0.is_empty() {
            return Err(anyhow::anyhow!("{} does not exist", shard_id));
        }
        let state = state_versions
            .fetch_current_state::<T>(shard_id, versions.0)
            .await;
        let state = state.check_ts_codec(shard_id)?;
        Ok(state)
    }

    /// Test helper for a [Self::open] call that is expected to succeed, using
    /// default schemas and test diagnostics.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_open<K, V, T, D>(
        &self,
        shard_id: ShardId,
    ) -> (WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>)
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
        K::Schema: Default,
        V::Schema: Default,
    {
        self.open(
            shard_id,
            Arc::new(K::Schema::default()),
            Arc::new(V::Schema::default()),
            Diagnostics::for_tests(),
            true,
        )
        .await
        .expect("codec mismatch")
    }

    /// Returns the [Metrics] for this client.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.metrics
    }
}

#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::mem;
    use std::pin::Pin;
    use std::task::Context;
    use std::time::Duration;

    use differential_dataflow::consolidation::consolidate_updates;
    use differential_dataflow::lattice::Lattice;
    use futures_task::noop_waker;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::assert_ok;
    use mz_persist::indexed::encoding::BlobTraceBatchPart;
    use mz_persist::workload::DataGenerator;
    use mz_persist_types::codec_impls::{StringSchema, VecU8Schema};
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;
    use timely::order::PartialOrder;
    use timely::progress::Antichain;

    use crate::batch::BLOB_TARGET_SIZE;
    use crate::cache::PersistClientCache;
    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
    use crate::error::{CodecConcreteType, CodecMismatch, UpperMismatch};
    use crate::internal::paths::BlobKey;
    use crate::read::ListenEvent;

    use super::*;

    pub fn new_test_client_cache(dyncfgs: &ConfigUpdates) -> PersistClientCache {
        let mut cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 10);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 1);
        dyncfgs.apply(cache.cfg());

        cache.cfg.compaction_enabled = true;
        cache
    }

    pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient {
        let cache = new_test_client_cache(dyncfgs);
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
    }

    pub fn all_ok<'a, K, V, T, D, I>(
        iter: I,
        as_of: T,
    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)>
    where
        K: Ord + Clone + 'a,
        V: Ord + Clone + 'a,
        T: Timestamp + Lattice + Clone + 'a,
        D: Monoid + Clone + 'a,
        I: IntoIterator<Item = &'a ((K, V), T, D)>,
    {
        let as_of = Antichain::from_elem(as_of);
        let mut ret = iter
            .into_iter()
            .map(|((k, v), t, d)| {
                let mut t = t.clone();
                t.advance_by(as_of.borrow());
                ((Ok(k.clone()), Ok(v.clone())), t, d.clone())
            })
            .collect();
        consolidate_updates(&mut ret);
        ret
    }

    pub async fn expect_fetch_part<K, V, T, D>(
        blob: &dyn Blob,
        key: &BlobKey,
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
    ) -> (
        BlobTraceBatchPart<T>,
        Vec<((Result<K, String>, Result<V, String>), T, D)>,
    )
    where
        K: Codec,
        V: Codec,
        T: Timestamp + Codec64,
        D: Codec64,
    {
        let value = blob
            .get(key)
            .await
            .expect("failed to fetch part")
            .expect("missing part");
        let mut part =
            BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
        let _ = part
            .updates
            .get_or_make_codec::<K, V>(&read_schemas.key, &read_schemas.val);
        let mut updates = Vec::new();
        for ((k, v), t, d) in part.updates.records().expect("codec data").iter() {
            updates.push((
                (
                    K::decode(k, &read_schemas.key),
                    V::decode(v, &read_schemas.val),
                ),
                T::decode(t),
                D::decode(d),
            ));
        }
        (part, updates)
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn sanity_check(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        assert_eq!(
            read.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );

        let mut listen = read.clone("").await.expect_listen(1).await;

        write
            .expect_append(&data[2..], write.upper().clone(), vec![4])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(4));

        assert_eq!(
            listen.read_until(&4).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(4))
        );

        read.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(read.since(), &Antichain::from_elem(2));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn open_reader_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut write1 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");
        let mut read1 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut read2 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut write2 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        write2.expect_compare_and_append(&data[..1], 0, 2).await;
        assert_eq!(
            read2.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );
        write1.expect_compare_and_append(&data[1..], 2, 4).await;
        assert_eq!(read1.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn invalid_usage(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id0 = "s00000000-0000-0000-0000-000000000000"
            .parse::<ShardId>()
            .expect("invalid shard id");
        let mut client = new_test_client(&dyncfgs).await;

        let (mut write0, mut read0) = client
            .expect_open::<String, String, u64, i64>(shard_id0)
            .await;

        write0.expect_compare_and_append(&data, 0, 4).await;

        {
            fn codecs(
                k: &str,
                v: &str,
                t: &str,
                d: &str,
            ) -> (String, String, String, String, Option<CodecConcreteType>) {
                (k.to_owned(), v.to_owned(), t.to_owned(), d.to_owned(), None)
            }

            client.shared_states = Arc::new(StateCache::new_no_metrics());
            assert_eq!(
                client
                    .open::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, Vec<u8>, u64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(VecU8Schema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "Vec<u8>", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, i64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "i64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, u64, u64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "u64", "u64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );

            assert_eq!(
                client
                    .open_leased_reader::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open_writer::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
        }

        {
            let snap = read0
                .snapshot(Antichain::from_elem(3))
                .await
                .expect("cannot serve requested as_of");

            let shard_id1 = "s11111111-1111-1111-1111-111111111111"
                .parse::<ShardId>()
                .expect("invalid shard id");
            let mut fetcher1 = client
                .create_batch_fetcher::<String, String, u64, i64>(
                    shard_id1,
                    Default::default(),
                    Default::default(),
                    false,
                    Diagnostics::for_tests(),
                )
                .await
                .unwrap();
            for part in snap {
                let (part, _lease) = part.into_exchangeable_part();
                let res = fetcher1.fetch_leased_part(part).await;
                assert_eq!(
                    res.unwrap_err(),
                    InvalidUsage::BatchNotFromThisShard {
                        batch_shard: shard_id0,
                        handle_shard: shard_id1,
                    }
                );
            }
        }

        {
            let ts3 = &data[2];
            assert_eq!(ts3.1, 3);
            let ts3 = vec![ts3.clone()];

            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateNotBeyondLower {
                    ts: 3,
                    lower: Antichain::from_elem(4),
                },
            );
            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateBeyondUpper {
                    ts: 3,
                    expected_upper: Antichain::from_elem(3),
                },
            );
            assert_eq!(
                write0
                    .append(&data[..0], Antichain::from_elem(3), Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2),
                },
            );

            assert_eq!(
                write0
                    .builder(Antichain::from_elem(3))
                    .finish(Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2)
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(4),
                    append_upper: Antichain::from_elem(5),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(2),
                    append_upper: Antichain::from_elem(3),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert!(matches!(
                write0
                    .append_batch(batch, Antichain::from_elem(3), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidEmptyTimeInterval { .. }
            ));
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn multiple_shards(dyncfgs: ConfigUpdates) {
        let data1 = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let data2 = [(("1".to_owned(), ()), 1, 1), (("2".to_owned(), ()), 2, 1)];

        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read1) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let (mut write2, mut read2) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        write1
            .expect_compare_and_append(&data1[..], u64::minimum(), 3)
            .await;

        write2
            .expect_compare_and_append(&data2[..], u64::minimum(), 3)
            .await;

        assert_eq!(
            read1.expect_snapshot_and_fetch(2).await,
            all_ok(&data1[..], 2)
        );

        assert_eq!(
            read2.expect_snapshot_and_fetch(2).await,
            all_ok(&data2[..], 2)
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn fetch_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write1, _read1) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let (mut write2, _read2) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write1
            .expect_append(&data[..], write1.upper().clone(), vec![3])
            .await;

        assert_eq!(write2.fetch_recent_upper().await, &Antichain::from_elem(3));

        assert_eq!(write2.upper(), &Antichain::from_elem(3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn append_with_invalid_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write, _read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write
            .expect_append(&data[..], write.upper().clone(), vec![3])
            .await;

        let data = [
            (("5".to_owned(), "fünf".to_owned()), 5, 1),
            (("6".to_owned(), "sechs".to_owned()), 6, 1),
        ];
        let res = write
            .append(
                data.iter(),
                Antichain::from_elem(5),
                Antichain::from_elem(7),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        assert_eq!(write.upper(), &Antichain::from_elem(3));
    }

    #[allow(unused)]
    async fn sync_send(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        fn is_send_sync<T: Send + Sync>(_x: T) -> bool {
            true
        }

        let client = new_test_client(&dyncfgs).await;

        let (write, read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        assert!(is_send_sync(client));
        assert!(is_send_sync(write));
        assert!(is_send_sync(read));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        assert_eq!(write1.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(write2.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        write1
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        assert_eq!(
            read.expect_snapshot_and_fetch(2).await,
            all_ok(&data[..2], 2)
        );

        let res = write2
            .compare_and_append(
                &data[..2],
                Antichain::from_elem(u64::minimum()),
                Antichain::from_elem(3),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(u64::minimum()),
                current: Antichain::from_elem(3)
            }))
        );

        assert_eq!(write2.upper(), &Antichain::from_elem(3));

        write2.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(write2.upper(), &Antichain::from_elem(4));

        assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn overlapping_append(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging_default("info");

        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        let mut listen = read.clone("").await.expect_listen(0).await;

        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        write2
            .expect_append(&data[..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1
            .expect_append(&data[2..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));

        assert_eq!(
            listen.read_until(&6).await,
            (all_ok(&data[..], 1), Antichain::from_elem(6))
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn contiguous_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        let result = write
            .append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        write.expect_append(&data[2..5], vec![3], vec![6]).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn noncontiguous_append_per_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        write2.upper = Antichain::from_elem(3);
        write2
            .expect_append(&data[2..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1.upper = Antichain::from_elem(5);
        write1
            .expect_append(&data[4..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn contiguous_compare_and_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        write.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        let result = write
            .compare_and_append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        write.expect_compare_and_append(&data[2..5], 3, 6).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn noncontiguous_compare_and_append_per_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        write1.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        write2.expect_compare_and_append(&data[2..4], 3, 5).await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1.expect_compare_and_append(&data[4..5], 5, 6).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_ore::test]
    fn fmt_ids() {
        assert_eq!(
            format!("{}", LeasedReaderId([0u8; 16])),
            "r00000000-0000-0000-0000-000000000000"
        );
        assert_eq!(
            format!("{:?}", LeasedReaderId([0u8; 16])),
            "LeasedReaderId(00000000-0000-0000-0000-000000000000)"
        );
    }

    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn concurrency(dyncfgs: ConfigUpdates) {
        let data = DataGenerator::small();

        const NUM_WRITERS: usize = 2;
        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut handles = Vec::<mz_ore::task::JoinHandle<()>>::new();
        for idx in 0..NUM_WRITERS {
            let (data, client) = (data.clone(), client.clone());

            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(1);

            let client1 = client.clone();
            let handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
                let (write, _) = client1.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
                let mut current_upper = 0;
                for batch in data.batches() {
                    let new_upper = match batch.get(batch.len() - 1) {
                        Some((_, max_ts, _)) => u64::decode(max_ts) + 1,
                        None => continue,
                    };
                    if PartialOrder::less_equal(&Antichain::from_elem(new_upper), write.upper()) {
                        continue;
                    }

                    let current_upper_chain = Antichain::from_elem(current_upper);
                    current_upper = new_upper;
                    let new_upper_chain = Antichain::from_elem(new_upper);
                    let mut builder = write.builder(current_upper_chain);

                    for ((k, v), t, d) in batch.iter() {
                        builder
                            .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
                            .await
                            .expect("invalid usage");
                    }

                    let batch = builder
                        .finish(new_upper_chain)
                        .await
                        .expect("invalid usage");

                    match batch_tx.send(batch).await {
                        Ok(_) => (),
                        Err(e) => panic!("send error: {}", e),
                    }
                }
            });
            handles.push(handle);

            let handle = mz_ore::task::spawn(|| format!("appender-{}", idx), async move {
                let (mut write, _) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;

                while let Some(batch) = batch_rx.recv().await {
                    let lower = batch.lower().clone();
                    let upper = batch.upper().clone();
                    write
                        .append_batch(batch, lower, upper)
                        .await
                        .expect("invalid usage")
                        .expect("unexpected upper");
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            let () = handle.await;
        }

        let expected = data.records().collect::<Vec<_>>();
        let max_ts = expected.last().map(|(_, t, _)| *t).unwrap_or_default();
        let (_, mut read) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
        assert_eq!(
            read.expect_snapshot_and_fetch(max_ts).await,
            all_ok(expected.iter(), max_ts)
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_blocking_reads(dyncfgs: ConfigUpdates) {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let mut listen = read.clone("").await.expect_listen(1).await;
        let mut listen_next = Box::pin(listen.fetch_next());
        for _ in 0..100 {
            assert!(
                Pin::new(&mut listen_next).poll(&mut cx).is_pending(),
                "listen::next unexpectedly ready"
            );
        }

        write
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;

        assert_eq!(
            listen_next.await,
            vec![
                ListenEvent::Updates(vec![((Ok("2".to_owned()), Ok("two".to_owned())), 2, 1)]),
                ListenEvent::Progress(Antichain::from_elem(3)),
            ]
        );

        let mut snap = Box::pin(read.expect_snapshot_and_fetch(3));
        for _ in 0..100 {
            assert!(
                Pin::new(&mut snap).poll(&mut cx).is_pending(),
                "snapshot unexpectedly ready"
            );
        }

        write.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(snap.await, all_ok(&data[..], 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn heartbeat_task_shutdown(dyncfgs: ConfigUpdates) {
        let mut cache = new_test_client_cache(&dyncfgs);
        cache
            .cfg
            .set_config(&READER_LEASE_DURATION, Duration::from_millis(1));
        cache.cfg.writer_lease_duration = Duration::from_millis(1);
        let (_write, mut read) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<(), (), u64, i64>(ShardId::new())
            .await;
        let mut read_unexpired_state = read
            .unexpired_state
            .take()
            .expect("handle should have unexpired state");
        read.expire().await;
        for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) {
            let () = read_heartbeat_task.await;
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write.advance_upper(&Antichain::new()).await;

        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");

        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_shard(dyncfgs: ConfigUpdates) {
        const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        let () = write
            .compare_and_append(DATA, Antichain::from_elem(0), Antichain::from_elem(1))
            .await
            .expect("usage should be valid")
            .expect("upper should match");

        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write.advance_upper(&Antichain::new()).await;

        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");

        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(4096))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn shard_id_protobuf_roundtrip(expect in any::<ShardId>()) {
            let actual = protobuf_roundtrip::<_, String>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}