//! An abstraction presenting as a durable time-varying collection (aka shard).

#![warn(missing_docs, missing_debug_implementations)]
#![allow(ungated_async_fn_track_caller)]

use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::{BuildInfo, build_info};
use mz_dyncfg::ConfigSet;
use mz_ore::instrument;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::{IntoRustIfSome, ProtoType};
use semver::Version;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BATCH_DELETE_ENABLED, Batch, BatchBuilder, ProtoBatch};
use crate::cache::{PersistClientCache, StateCache};
use crate::cfg::PersistConfig;
use crate::critical::{CriticalReaderId, SinceHandle};
use crate::error::InvalidUsage;
use crate::fetch::{BatchFetcher, BatchFetcherConfig};
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::parse_id;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::{Machine, retry_external};
use crate::internal::state_versions::StateVersions;
use crate::metrics::Metrics;
use crate::read::{
    Cursor, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle, Since,
};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::{WriteHandle, WriterId};
pub mod async_runtime;
pub mod batch;
pub mod cache;
pub mod cfg;
/// Command-line tools for administering and inspecting persist state.
pub mod cli {
    pub mod admin;
    pub mod args;
    pub mod bench;
    pub mod inspect;
}
pub mod critical;
pub mod error;
pub mod fetch;
pub mod internals_bench;
pub mod iter;
/// Public re-exports of persist metrics.
pub mod metrics {
    pub use crate::internal::metrics::{
        Metrics, SinkMetrics, SinkWorkerMetrics, UpdateDelta, encode_ts_metric,
    };
}
/// Dataflow operators that read and write persist shards.
pub mod operators {
    use mz_dyncfg::Config;

    pub mod shard_source;

    pub(crate) const STORAGE_SOURCE_DECODE_FUEL: Config<usize> = Config::new(
        "storage_source_decode_fuel",
        100_000,
        "\
        The maximum amount of work to do in the persist_source mfp_and_decode \
        operator before yielding.",
    );
}
pub mod read;
pub mod rpc;
pub mod schema;
pub mod stats;
pub mod usage;
pub mod write;

mod internal {
    pub mod apply;
    pub mod cache;
    pub mod compact;
    pub mod encoding;
    pub mod gc;
    pub mod machine;
    pub mod maintenance;
    pub mod merge;
    pub mod metrics;
    pub mod paths;
    pub mod restore;
    pub mod service;
    pub mod state;
    pub mod state_diff;
    pub mod state_versions;
    pub mod trace;
    pub mod watch;

    #[cfg(test)]
    pub mod datadriven;
}

/// Build information for this crate.
pub const BUILD_INFO: BuildInfo = build_info!();

pub use mz_persist_types::{PersistLocation, ShardId};

pub use crate::internal::encoding::Schemas;

/// Additional diagnostic information for a shard handle, used in log lines
/// and metrics labels.
#[derive(Clone, Debug)]
pub struct Diagnostics {
    /// A user-friendly name for the shard.
    pub shard_name: String,
    /// A brief description of the handle's purpose.
    pub handle_purpose: String,
}

impl Diagnostics {
    /// Returns a [Diagnostics] with the given purpose and an unknown shard name.
    pub fn from_purpose(handle_purpose: &str) -> Self {
        Self {
            shard_name: "unknown".to_string(),
            handle_purpose: handle_purpose.to_string(),
        }
    }

    /// Returns a [Diagnostics] for use in tests.
    pub fn for_tests() -> Self {
        Self {
            shard_name: "test-shard-name".to_string(),
            handle_purpose: "test-purpose".to_string(),
        }
    }
}

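/// A handle to the persist state made durable at a single [PersistLocation],
/// from which handles to individual shards can be opened.
///
/// A sketch of typical usage, assuming an in-process client from
/// [PersistClient::new_for_tests] (a client backed by any `Blob`/`Consensus`
/// works the same way):
///
/// ```ignore
/// use std::sync::Arc;
/// use mz_persist_types::codec_impls::StringSchema;
///
/// let client = PersistClient::new_for_tests().await;
/// let (mut write, mut read) = client
///     .open::<String, String, u64, i64>(
///         ShardId::new(),
///         Arc::new(StringSchema),
///         Arc::new(StringSchema),
///         Diagnostics::for_tests(),
///         true,
///     )
///     .await
///     .expect("codec mismatch");
/// ```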
#[derive(Debug, Clone)]
pub struct PersistClient {
    cfg: PersistConfig,
    blob: Arc<dyn Blob>,
    consensus: Arc<dyn Consensus>,
    metrics: Arc<Metrics>,
    isolated_runtime: Arc<IsolatedRuntime>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
}

impl PersistClient {
    /// Returns a new client for interfacing with persist shards made durable
    /// to the given `blob` and `consensus`.
    ///
    /// Most users should instead get a client from a
    /// [crate::cache::PersistClientCache].
    pub fn new(
        cfg: PersistConfig,
        blob: Arc<dyn Blob>,
        consensus: Arc<dyn Consensus>,
        metrics: Arc<Metrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
    ) -> Result<Self, ExternalError> {
        Ok(PersistClient {
            cfg,
            blob,
            consensus,
            metrics,
            isolated_runtime,
            shared_states,
            pubsub_sender,
        })
    }

    /// Returns a new client backed by in-mem blob and consensus, for tests.
    pub async fn new_for_tests() -> Self {
        let cache = PersistClientCache::new_no_metrics();
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("in-mem location is valid")
    }

    /// Returns the set of dynamic configs in use by this client.
    pub fn dyncfgs(&self) -> &ConfigSet {
        &self.cfg.configs
    }

    async fn make_machine<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Machine<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let machine = Machine::<K, V, T, D>::new(
            self.cfg.clone(),
            shard_id,
            Arc::clone(&self.metrics),
            Arc::new(state_versions),
            Arc::clone(&self.shared_states),
            Arc::clone(&self.pubsub_sender),
            Arc::clone(&self.isolated_runtime),
            diagnostics.clone(),
        )
        .await?;
        Ok(machine)
    }

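    /// Provides capabilities for the durable TVC identified by `shard_id`: a
    /// convenience for [Self::open_writer] followed by
    /// [Self::open_leased_reader].
    ///
    /// A minimal sketch of appending and reading back data, with hypothetical
    /// `data` in scope (the `expect_*` helpers exist only in tests; production
    /// code matches on the results instead):
    ///
    /// ```ignore
    /// let (mut write, mut read) = client
    ///     .open::<String, String, u64, i64>(
    ///         shard_id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         Diagnostics::for_tests(),
    ///         true,
    ///     )
    ///     .await
    ///     .expect("codec mismatch");
    /// write
    ///     .compare_and_append(&data, Antichain::from_elem(0), Antichain::from_elem(3))
    ///     .await
    ///     .expect("usage was valid")
    ///     .expect("upper matched");
    /// let contents = read
    ///     .snapshot_and_fetch(Antichain::from_elem(2))
    ///     .await
    ///     .expect("valid as_of");
    /// ```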
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
        use_critical_since: bool,
    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        Ok((
            self.open_writer(
                shard_id,
                Arc::clone(&key_schema),
                Arc::clone(&val_schema),
                diagnostics.clone(),
            )
            .await?,
            self.open_leased_reader(
                shard_id,
                key_schema,
                val_schema,
                diagnostics,
                use_critical_since,
            )
            .await?,
        ))
    }

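    /// Registers a new leased reader with the shard and provides a
    /// [ReadHandle] for reading its contents at times at or in advance of the
    /// handle's `since`. The lease is kept alive by heartbeats for as long as
    /// the handle is in use; see [READER_LEASE_DURATION].
    ///
    /// If `use_critical_since` is true, the returned handle's since is based
    /// on the shard's critical since rather than its overall since.
    ///
    /// A sketch of reading a snapshot, with hypothetical `shard_id` and
    /// schemas in scope:
    ///
    /// ```ignore
    /// let mut read = client
    ///     .open_leased_reader::<String, String, u64, i64>(
    ///         shard_id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         Diagnostics::from_purpose("example snapshot"),
    ///         false,
    ///     )
    ///     .await
    ///     .expect("codec mismatch");
    /// let as_of = read.since().clone();
    /// let contents = read.snapshot_and_fetch(as_of).await.expect("valid as_of");
    /// ```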
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_leased_reader<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
        use_critical_since: bool,
    ) -> Result<ReadHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let reader_id = LeasedReaderId::new();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &reader_id,
                &diagnostics.handle_purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                use_critical_since,
            )
            .await;
        maintenance.start_performing(&machine, &gc);
        let schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            reader_id,
            schemas,
            reader_state.since,
            heartbeat_ts,
        )
        .await;

        Ok(reader)
    }

    /// Returns a [BatchFetcher], which allows fetching the contents of batch
    /// parts for this shard, e.g. parts that were exchanged to another worker
    /// by a [ReadHandle] elsewhere.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn create_batch_fetcher<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        is_transient: bool,
        diagnostics: Diagnostics,
    ) -> Result<BatchFetcher<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let read_schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let schema_cache = machine.applier.schema_cache();
        let fetcher = BatchFetcher {
            cfg: BatchFetcherConfig::new(&self.cfg),
            blob: Arc::clone(&self.blob),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&machine.applier.shard_metrics),
            shard_id,
            read_schemas,
            schema_cache,
            is_transient,
            _phantom: PhantomData,
        };

        Ok(fetcher)
    }

    /// The [CriticalReaderId] reserved, by convention, for the singleton
    /// controller-owned critical [SinceHandle] of each shard.
    pub const CONTROLLER_CRITICAL_SINCE: CriticalReaderId =
        CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

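    /// Provides a capability for the durable TVC identified by `shard_id` at
    /// its since frontier that survives process restarts: unlike a leased
    /// reader, a critical reader is not leased and holds its capability until
    /// it is explicitly downgraded or removed.
    ///
    /// A sketch of fencing with the opaque token and downgrading the since
    /// (the `u64` opaque type here is an assumption for illustration):
    ///
    /// ```ignore
    /// let mut since_handle: SinceHandle<(), (), u64, i64, u64> = client
    ///     .open_critical_since(
    ///         shard_id,
    ///         PersistClient::CONTROLLER_CRITICAL_SINCE,
    ///         Diagnostics::from_purpose("example controller"),
    ///     )
    ///     .await
    ///     .expect("codec mismatch");
    /// let epoch = since_handle.opaque().clone();
    /// let res = since_handle
    ///     .compare_and_downgrade_since(&epoch, (&epoch, &Antichain::from_elem(42)))
    ///     .await;
    /// assert!(res.is_ok(), "another process fenced out this handle");
    /// ```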
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_critical_since<K, V, T, D, O>(
        &self,
        shard_id: ShardId,
        reader_id: CriticalReaderId,
        diagnostics: Diagnostics,
    ) -> Result<SinceHandle<K, V, T, D, O>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
        O: Opaque + Codec64,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (state, maintenance) = machine
            .register_critical_reader::<O>(&reader_id, &diagnostics.handle_purpose)
            .await;
        maintenance.start_performing(&machine, &gc);
        let handle = SinceHandle::new(
            machine,
            gc,
            reader_id,
            state.since,
            Codec64::decode(state.opaque.0),
        );

        Ok(handle)
    }

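    /// Registers a new writer with the shard and provides a [WriteHandle] for
    /// appending updates to it. The handle's schema id is resolved against the
    /// shard's registered schemas, if any.
    ///
    /// A sketch of a compare-and-append through the resulting handle, with
    /// hypothetical `data` in scope:
    ///
    /// ```ignore
    /// let mut write = client
    ///     .open_writer::<String, String, u64, i64>(
    ///         shard_id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         Diagnostics::from_purpose("example writer"),
    ///     )
    ///     .await
    ///     .expect("codec mismatch");
    /// write
    ///     .compare_and_append(&data, Antichain::from_elem(0), Antichain::from_elem(1))
    ///     .await
    ///     .expect("usage was valid")
    ///     .expect("upper matched");
    /// ```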
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_writer<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
    ) -> Result<WriteHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        // Resolve the schema id, if these schemas were previously registered
        // with the shard.
        let schema_id = machine.find_schema(&*key_schema, &*val_schema);

        let writer_id = WriterId::new();
        let schemas = Schemas {
            id: schema_id,
            key: key_schema,
            val: val_schema,
        };
        let writer = WriteHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            writer_id,
            &diagnostics.handle_purpose,
            schemas,
        );
        Ok(writer)
    }

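    /// Returns a [BatchBuilder] that can be used to write a batch of updates,
    /// all of which must be beyond `lower`, without first opening a full
    /// [WriteHandle].
    ///
    /// A sketch of building a batch this way (the schemas, key, and val here
    /// are hypothetical):
    ///
    /// ```ignore
    /// let mut builder = client
    ///     .batch_builder::<String, String, u64, i64>(
    ///         shard_id,
    ///         write_schemas,
    ///         Antichain::from_elem(0),
    ///         None,
    ///     )
    ///     .await;
    /// builder.add(&key, &val, &1u64, &1i64).await.expect("invalid usage");
    /// let batch = builder
    ///     .finish(Antichain::from_elem(2))
    ///     .await
    ///     .expect("invalid usage");
    /// ```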
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn batch_builder<K, V, T, D>(
        &self,
        shard_id: ShardId,
        write_schemas: Schemas<K, V>,
        lower: Antichain<T>,
        max_runs: Option<usize>,
    ) -> BatchBuilder<K, V, T, D>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let mut compact_cfg = CompactConfig::new(&self.cfg, shard_id);
        compact_cfg.batch.max_runs = max_runs;
        WriteHandle::builder_inner(
            &self.cfg,
            compact_cfg,
            Arc::clone(&self.metrics),
            self.metrics.shards.shard(&shard_id, "peek_stash"),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            shard_id,
            write_schemas,
            lower,
        )
    }

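    /// Turns a [ProtoBatch] back into a [Batch] that can be appended to this
    /// shard, e.g. after the batch was transmitted from the process that wrote
    /// it. Panics if the batch belongs to a different shard.
    ///
    /// A sketch of the round trip, assuming `Batch::into_transmittable_batch`
    /// as the serializing half:
    ///
    /// ```ignore
    /// let proto: ProtoBatch = batch.into_transmittable_batch();
    /// // ... send `proto` to another process ...
    /// let batch = client
    ///     .batch_from_transmittable_batch::<String, String, u64, i64>(&shard_id, proto);
    /// ```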
    pub fn batch_from_transmittable_batch<K, V, T, D>(
        &self,
        shard_id: &ShardId,
        batch: ProtoBatch,
    ) -> Batch<K, V, T, D>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let batch_shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(&batch_shard_id, shard_id);

        let shard_metrics = self.metrics.shards.shard(shard_id, "peek_stash");

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics,
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };

        assert_eq!(&ret.shard_id(), shard_id);
        ret
    }

    /// Returns a [Cursor] for reading the given batches, consolidated on the
    /// fly, as of the given frontier. This reads only the given batches and
    /// does not register a reader with the shard.
    #[allow(clippy::unused_async)]
    pub async fn read_batches_consolidated<K, V, T, D>(
        &mut self,
        shard_id: ShardId,
        as_of: Antichain<T>,
        read_schemas: Schemas<K, V>,
        batches: Vec<Batch<K, V, T, D>>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, Vec<Batch<K, V, T, D>>>, Since<T>>
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");

        let hollow_batches = batches.iter().map(|b| b.batch.clone()).collect_vec();

        ReadHandle::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            shard_metrics,
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            shard_id,
            as_of,
            read_schemas,
            &hollow_batches,
            batches,
            should_fetch_part,
            memory_budget_bytes,
        )
    }

    /// Returns the schemas registered with this shard under the given
    /// [SchemaId], if any.
    pub async fn get_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        schema_id: SchemaId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.get_schema(schema_id))
    }

    /// Returns the most recently registered schemas for this shard, if any.
    pub async fn latest_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(SchemaId, K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.latest_schema())
    }

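    /// Registers the given key and val schemas with this shard, returning the
    /// [SchemaId] under which they are now (or were already) registered, or
    /// `None` if registration did not succeed.
    ///
    /// A sketch of registering and then looking up a schema:
    ///
    /// ```ignore
    /// let schema_id = client
    ///     .register_schema::<String, String, u64, i64>(
    ///         shard_id,
    ///         &StringSchema,
    ///         &StringSchema,
    ///         Diagnostics::from_purpose("example schema registration"),
    ///     )
    ///     .await
    ///     .expect("valid usage");
    /// let latest = client
    ///     .latest_schema::<String, String, u64, i64>(
    ///         shard_id,
    ///         Diagnostics::from_purpose("example schema lookup"),
    ///     )
    ///     .await
    ///     .expect("valid usage");
    /// assert_eq!(latest.map(|(id, _, _)| id), schema_id);
    /// ```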
    pub async fn register_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        diagnostics: Diagnostics,
    ) -> Result<Option<SchemaId>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (schema_id, maintenance) = machine.register_schema(key_schema, val_schema).await;
        maintenance.start_performing(&machine, &gc);

        Ok(schema_id)
    }

    /// Attempts to evolve this shard's schema from the schema under `expected`
    /// to the given key and val schemas, returning a [CaESchema] describing
    /// the outcome of the compare-and-evolve.
    pub async fn compare_and_evolve_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        diagnostics: Diagnostics,
    ) -> Result<CaESchema<K, V>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
        let (res, maintenance) = machine
            .compare_and_evolve_schema(expected, key_schema, val_schema)
            .await;
        maintenance.start_performing(&machine, &gc);
        Ok(res)
    }

    /// Returns whether the given shard is in a finalized state, i.e. it has
    /// been made an unreadable and unwriteable tombstone.
    pub async fn is_finalized<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<bool, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.is_finalized())
    }

    /// Finalizes the given shard, making it an unreadable and unwriteable
    /// tombstone. Requires that the shard's since and upper have both been
    /// advanced to the empty antichain; errors otherwise.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn finalize_shard<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<(), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;

        let maintenance = machine.become_tombstone().await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let () = maintenance.perform(&machine, &gc).await;

        Ok(())
    }

    /// Upgrades the shard's recorded applier version to this build's version,
    /// returning an [InvalidUsage::IncompatibleVersion] error if the shard has
    /// already been written by an incompatible version.
    pub async fn upgrade_version<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<(), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;

        match machine.upgrade_version().await {
            Ok(maintenance) => {
                let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
                let () = maintenance.perform(&machine, &gc).await;
                Ok(())
            }
            Err(version) => Err(InvalidUsage::IncompatibleVersion { version }),
        }
    }

    /// Returns the current state of the given shard as an `impl Serialize`,
    /// for debugging and introspection. Errors if the shard does not exist.
    pub async fn inspect_shard<T: Timestamp + Lattice + Codec64>(
        &self,
        shard_id: &ShardId,
    ) -> Result<impl serde::Serialize, anyhow::Error> {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let versions = state_versions.fetch_all_live_diffs(shard_id).await;
        if versions.0.is_empty() {
            return Err(anyhow::anyhow!("{} does not exist", shard_id));
        }
        let state = state_versions
            .fetch_current_state::<T>(shard_id, versions.0)
            .await;
        let state = state.check_ts_codec(shard_id)?;
        Ok(state)
    }

    /// Test helper: opens the shard with default schemas and test diagnostics,
    /// panicking on a codec mismatch.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_open<K, V, T, D>(
        &self,
        shard_id: ShardId,
    ) -> (WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>)
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
        K::Schema: Default,
        V::Schema: Default,
    {
        self.open(
            shard_id,
            Arc::new(K::Schema::default()),
            Arc::new(V::Schema::default()),
            Diagnostics::for_tests(),
            true,
        )
        .await
        .expect("codec mismatch")
    }

    /// Returns the [Metrics] instance of this client.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.metrics
    }
}

#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::mem;
    use std::pin::Pin;
    use std::task::Context;
    use std::time::Duration;

    use differential_dataflow::consolidation::consolidate_updates;
    use differential_dataflow::lattice::Lattice;
    use futures_task::noop_waker;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::assert_ok;
    use mz_persist::indexed::encoding::BlobTraceBatchPart;
    use mz_persist::workload::DataGenerator;
    use mz_persist_types::codec_impls::{StringSchema, VecU8Schema};
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;
    use timely::order::PartialOrder;
    use timely::progress::Antichain;

    use crate::batch::BLOB_TARGET_SIZE;
    use crate::cache::PersistClientCache;
    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
    use crate::error::{CodecConcreteType, CodecMismatch, UpperMismatch};
    use crate::internal::paths::BlobKey;
    use crate::read::ListenEvent;

    use super::*;

    pub fn new_test_client_cache(dyncfgs: &ConfigUpdates) -> PersistClientCache {
        // Configure an aggressively small blob target size and batch builder
        // limit so tests exercise multi-part batches.
        let mut cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 10);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 1);
        dyncfgs.apply(cache.cfg());

        // Enable compaction in tests so we get coverage of it.
        cache.cfg.compaction_enabled = true;
        cache
    }

    pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient {
        let cache = new_test_client_cache(dyncfgs);
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
    }

    pub fn all_ok<'a, K, V, T, D, I>(
        iter: I,
        as_of: T,
    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)>
    where
        K: Ord + Clone + 'a,
        V: Ord + Clone + 'a,
        T: Timestamp + Lattice + Clone + 'a,
        D: Monoid + Clone + 'a,
        I: IntoIterator<Item = &'a ((K, V), T, D)>,
    {
        let as_of = Antichain::from_elem(as_of);
        let mut ret = iter
            .into_iter()
            .map(|((k, v), t, d)| {
                let mut t = t.clone();
                t.advance_by(as_of.borrow());
                ((Ok(k.clone()), Ok(v.clone())), t, d.clone())
            })
            .collect();
        consolidate_updates(&mut ret);
        ret
    }

    pub async fn expect_fetch_part<K, V, T, D>(
        blob: &dyn Blob,
        key: &BlobKey,
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
    ) -> (
        BlobTraceBatchPart<T>,
        Vec<((Result<K, String>, Result<V, String>), T, D)>,
    )
    where
        K: Codec,
        V: Codec,
        T: Timestamp + Codec64,
        D: Codec64,
    {
        let value = blob
            .get(key)
            .await
            .expect("failed to fetch part")
            .expect("missing part");
        let mut part =
            BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
        let _ = part
            .updates
            .get_or_make_codec::<K, V>(&read_schemas.key, &read_schemas.val);
        let mut updates = Vec::new();
        for ((k, v), t, d) in part.updates.records().expect("codec data").iter() {
            updates.push((
                (
                    K::decode(k, &read_schemas.key),
                    V::decode(v, &read_schemas.val),
                ),
                T::decode(t),
                D::decode(d),
            ));
        }
        (part, updates)
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn sanity_check(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        // Write a [0,3) batch.
        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        // Grab a snapshot as_of 1.
        assert_eq!(
            read.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );

        let mut listen = read.clone("").await.expect_listen(1).await;

        // Write a [3,4) batch.
        write
            .expect_append(&data[2..], write.upper().clone(), vec![4])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(4));

        // The listener, which started as_of 1, should hear everything after 1.
        assert_eq!(
            listen.read_until(&4).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(4))
        );

        // Downgrading the since takes effect on the handle immediately.
        read.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(read.since(), &Antichain::from_elem(2));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn open_reader_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut write1 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");
        let mut read1 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut read2 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut write2 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        write2.expect_compare_and_append(&data[..1], 0, 2).await;
        assert_eq!(
            read2.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );
        write1.expect_compare_and_append(&data[1..], 2, 4).await;
        assert_eq!(read1.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn invalid_usage(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id0 = "s00000000-0000-0000-0000-000000000000"
            .parse::<ShardId>()
            .expect("invalid shard id");
        let mut client = new_test_client(&dyncfgs).await;

        let (mut write0, mut read0) = client
            .expect_open::<String, String, u64, i64>(shard_id0)
            .await;

        write0.expect_compare_and_append(&data, 0, 4).await;

        // Invalid usage of codecs: the shard was created with one set of
        // codecs and is now opened with a mismatched set.
        {
            fn codecs(
                k: &str,
                v: &str,
                t: &str,
                d: &str,
            ) -> (String, String, String, String, Option<CodecConcreteType>) {
                (k.to_owned(), v.to_owned(), t.to_owned(), d.to_owned(), None)
            }

            client.shared_states = Arc::new(StateCache::new_no_metrics());
            assert_eq!(
                client
                    .open::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, Vec<u8>, u64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(VecU8Schema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "Vec<u8>", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, i64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "i64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, u64, u64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "u64", "u64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );

            // The same mismatch checks apply to the narrower reader and
            // writer handles.
            assert_eq!(
                client
                    .open_leased_reader::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open_writer::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
        }

        // Invalid usage of a batch fetcher: fetching a part that belongs to a
        // different shard.
        {
            let snap = read0
                .snapshot(Antichain::from_elem(3))
                .await
                .expect("cannot serve requested as_of");

            let shard_id1 = "s11111111-1111-1111-1111-111111111111"
                .parse::<ShardId>()
                .expect("invalid shard id");
            let mut fetcher1 = client
                .create_batch_fetcher::<String, String, u64, i64>(
                    shard_id1,
                    Default::default(),
                    Default::default(),
                    false,
                    Diagnostics::for_tests(),
                )
                .await
                .unwrap();
            for part in snap {
                let (part, _lease) = part.into_exchangeable_part();
                let res = fetcher1.fetch_leased_part(part).await;
                assert_eq!(
                    res.unwrap_err(),
                    InvalidUsage::BatchNotFromThisShard {
                        batch_shard: shard_id0,
                        handle_shard: shard_id1,
                    }
                );
            }
        }

        // Invalid bounds for appends and batches.
        {
            let ts3 = &data[2];
            assert_eq!(ts3.1, 3);
            let ts3 = vec![ts3.clone()];

            // Updates must be beyond the lower and not beyond the upper.
            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateNotBeyondLower {
                    ts: 3,
                    lower: Antichain::from_elem(4),
                },
            );
            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateBeyondUpper {
                    ts: 3,
                    expected_upper: Antichain::from_elem(3),
                },
            );
            // The lower may not be beyond the upper.
            assert_eq!(
                write0
                    .append(&data[..0], Antichain::from_elem(3), Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2),
                },
            );

            // The same bounds checks apply to batches.
            assert_eq!(
                write0
                    .builder(Antichain::from_elem(3))
                    .finish(Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2)
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(4),
                    append_upper: Antichain::from_elem(5),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(2),
                    append_upper: Antichain::from_elem(3),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            // An append over an empty time interval is rejected outright.
            assert!(matches!(
                write0
                    .append_batch(batch, Antichain::from_elem(3), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidEmptyTimeInterval { .. }
            ));
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn multiple_shards(dyncfgs: ConfigUpdates) {
        let data1 = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let data2 = [(("1".to_owned(), ()), 1, 1), (("2".to_owned(), ()), 2, 1)];

        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read1) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Different val type, so we can tell if we accidentally mix up the two
        // shards anywhere in the client internals.
        let (mut write2, mut read2) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        write1
            .expect_compare_and_append(&data1[..], u64::minimum(), 3)
            .await;

        write2
            .expect_compare_and_append(&data2[..], u64::minimum(), 3)
            .await;

        assert_eq!(
            read1.expect_snapshot_and_fetch(2).await,
            all_ok(&data1[..], 2)
        );

        assert_eq!(
            read2.expect_snapshot_and_fetch(2).await,
            all_ok(&data2[..], 2)
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn fetch_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write1, _read1) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let (mut write2, _read2) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write1
            .expect_append(&data[..], write1.upper().clone(), vec![3])
            .await;

        // The second writer only sees the advanced upper once it fetches.
        assert_eq!(write2.fetch_recent_upper().await, &Antichain::from_elem(3));

        // Fetching also updates the handle's cached copy of the upper.
        assert_eq!(write2.upper(), &Antichain::from_elem(3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn append_with_invalid_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write, _read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write
            .expect_append(&data[..], write.upper().clone(), vec![3])
            .await;

        let data = [
            (("5".to_owned(), "fünf".to_owned()), 5, 1),
            (("6".to_owned(), "sechs".to_owned()), 6, 1),
        ];
        let res = write
            .append(
                data.iter(),
                Antichain::from_elem(5),
                Antichain::from_elem(7),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        // A failed append does not advance the handle's upper.
        assert_eq!(write.upper(), &Antichain::from_elem(3));
    }

    // Compile-time check that the client and handles are Send + Sync. This fn
    // is intentionally never run.
    #[allow(unused)]
    async fn sync_send(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        fn is_send_sync<T: Send + Sync>(_x: T) -> bool {
            true
        }

        let client = new_test_client(&dyncfgs).await;

        let (write, read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        assert!(is_send_sync(client));
        assert!(is_send_sync(write));
        assert!(is_send_sync(read));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        assert_eq!(write1.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(write2.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        // Write a [0,3) batch with the first writer.
        write1
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        assert_eq!(
            read.expect_snapshot_and_fetch(2).await,
            all_ok(&data[..2], 2)
        );

        // Try to write with a stale expected upper.
        let res = write2
            .compare_and_append(
                &data[..2],
                Antichain::from_elem(u64::minimum()),
                Antichain::from_elem(3),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(u64::minimum()),
                current: Antichain::from_elem(3)
            }))
        );

        // The mismatch teaches the handle the authoritative upper.
        assert_eq!(write2.upper(), &Antichain::from_elem(3));

        // Retrying with the correct expected upper succeeds.
        write2.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(write2.upper(), &Antichain::from_elem(4));

        assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn overlapping_append(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging_default("info");

        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Grab a listener before we do any writing.
        let mut listen = read.clone("").await.expect_listen(0).await;

        // Write a [0,3) batch with the first writer.
        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        // Write an overlapping [0,5) batch with the second writer.
        write2
            .expect_append(&data[..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        // Write a [3,6) batch with the first writer.
        write1
            .expect_append(&data[2..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));

        assert_eq!(
            listen.read_until(&6).await,
            (all_ok(&data[..], 1), Antichain::from_elem(6))
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn contiguous_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Write a [0,3) batch.
        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        // Appending a non-contiguous batch should fail.
        let result = write
            .append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        // Fixing the lower to make the appends contiguous should let the
        // append succeed.
        write.expect_append(&data[2..5], vec![3], vec![6]).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn noncontiguous_append_per_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Write a [0,3) batch with writer 1.
        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        // Manually fast-forward this handle's cached upper; it only advances
        // automatically through the handle's own appends.
        write2.upper = Antichain::from_elem(3);
        // Write a [3,5) batch with writer 2.
        write2
            .expect_append(&data[2..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1.upper = Antichain::from_elem(5);
        // Write a [5,6) batch back on writer 1.
        write1
            .expect_append(&data[4..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn contiguous_compare_and_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Write a [0,3) batch.
        write.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        // A non-contiguous compare_and_append should fail.
        let result = write
            .compare_and_append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        // Writing the contiguous [3,6) batch should succeed.
        write.expect_compare_and_append(&data[2..5], 3, 6).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn noncontiguous_compare_and_append_per_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Write a [0,3) batch with writer 1.
        write1.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        // Write a [3,5) batch with writer 2, which has not written before.
        write2.expect_compare_and_append(&data[2..4], 3, 5).await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        // Write a [5,6) batch back on writer 1.
        write1.expect_compare_and_append(&data[4..5], 5, 6).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_ore::test]
    fn fmt_ids() {
        assert_eq!(
            format!("{}", LeasedReaderId([0u8; 16])),
            "r00000000-0000-0000-0000-000000000000"
        );
        assert_eq!(
            format!("{:?}", LeasedReaderId([0u8; 16])),
            "LeasedReaderId(00000000-0000-0000-0000-000000000000)"
        );
    }

    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn concurrency(dyncfgs: ConfigUpdates) {
        let data = DataGenerator::small();

        const NUM_WRITERS: usize = 2;
        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut handles = Vec::<mz_ore::task::JoinHandle<()>>::new();
        for idx in 0..NUM_WRITERS {
            let (data, client) = (data.clone(), client.clone());

            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(1);

            let client1 = client.clone();
            let handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
                let (write, _) = client1.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
                let mut current_upper = 0;
                for batch in data.batches() {
                    let new_upper = match batch.get(batch.len() - 1) {
                        Some((_, max_ts, _)) => u64::decode(max_ts) + 1,
                        None => continue,
                    };
                    // If the shard was already written past this batch when we
                    // opened the handle, skip it.
                    if PartialOrder::less_equal(&Antichain::from_elem(new_upper), write.upper()) {
                        continue;
                    }

                    let current_upper_chain = Antichain::from_elem(current_upper);
                    current_upper = new_upper;
                    let new_upper_chain = Antichain::from_elem(new_upper);
                    let mut builder = write.builder(current_upper_chain);

                    for ((k, v), t, d) in batch.iter() {
                        builder
                            .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
                            .await
                            .expect("invalid usage");
                    }

                    let batch = builder
                        .finish(new_upper_chain)
                        .await
                        .expect("invalid usage");

                    match batch_tx.send(batch).await {
                        Ok(_) => (),
                        Err(e) => panic!("send error: {}", e),
                    }
                }
            });
            handles.push(handle);

            let handle = mz_ore::task::spawn(|| format!("appender-{}", idx), async move {
                let (mut write, _) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;

                while let Some(batch) = batch_rx.recv().await {
                    let lower = batch.lower().clone();
                    let upper = batch.upper().clone();
                    write
                        .append_batch(batch, lower, upper)
                        .await
                        .expect("invalid usage")
                        .expect("unexpected upper");
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            let () = handle.await.expect("task failed");
        }

        let expected = data.records().collect::<Vec<_>>();
        let max_ts = expected.last().map(|(_, t, _)| *t).unwrap_or_default();
        let (_, mut read) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
        assert_eq!(
            read.expect_snapshot_and_fetch(max_ts).await,
            all_ok(expected.iter(), max_ts)
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_blocking_reads(dyncfgs: ConfigUpdates) {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Grab a listener as_of 1; the shard has no data yet, so its next
        // event must remain pending no matter how many times we poll it.
        let mut listen = read.clone("").await.expect_listen(1).await;
        let mut listen_next = Box::pin(listen.fetch_next());
        for _ in 0..100 {
            assert!(
                Pin::new(&mut listen_next).poll(&mut cx).is_pending(),
                "listen::next unexpectedly ready"
            );
        }

        // Write a [0,3) batch.
        write
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;

        // The listen can now resolve. It only emits the update at 2 because
        // its as_of was 1 (exclusive).
        assert_eq!(
            listen_next.await,
            vec![
                ListenEvent::Updates(vec![((Ok("2".to_owned()), Ok("two".to_owned())), 2, 1)]),
                ListenEvent::Progress(Antichain::from_elem(3)),
            ]
        );

        // Grab a snapshot as_of 3, which is not yet closed out (the upper is
        // still 3), so it too must remain pending.
        let mut snap = Box::pin(read.expect_snapshot_and_fetch(3));
        for _ in 0..100 {
            assert!(
                Pin::new(&mut snap).poll(&mut cx).is_pending(),
                "snapshot unexpectedly ready"
            );
        }

        // Writing a [3,4) batch closes out ts 3 and lets the snapshot resolve.
        write.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(snap.await, all_ok(&data[..], 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn heartbeat_task_shutdown(dyncfgs: ConfigUpdates) {
        // Verify that the reader's heartbeat tasks shut down cleanly on
        // expiry, even with an aggressively short lease duration.
        let mut cache = new_test_client_cache(&dyncfgs);
        cache
            .cfg
            .set_config(&READER_LEASE_DURATION, Duration::from_millis(1));
        cache.cfg.writer_lease_duration = Duration::from_millis(1);
        let (_write, mut read) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<(), (), u64, i64>(ShardId::new())
            .await;
        let mut read_unexpired_state = read
            .unexpired_state
            .take()
            .expect("handle should have unexpired state");
        read.expire().await;
        for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) {
            let () = read_heartbeat_task
                .await
                .expect("task should shutdown cleanly");
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        // Advance the since and upper to the empty antichain, the
        // precondition for finalization.
        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write.advance_upper(&Antichain::new()).await;

        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");

        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_shard(dyncfgs: ConfigUpdates) {
        const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        // Unlike finalize_empty_shard, write some data first.
        let () = write
            .compare_and_append(DATA, Antichain::from_elem(0), Antichain::from_elem(1))
            .await
            .expect("usage should be valid")
            .expect("upper should match");

        // Advance the since and upper to the empty antichain, the
        // precondition for finalization.
        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write.advance_upper(&Antichain::new()).await;

        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");

        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(4096))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn shard_id_protobuf_roundtrip(expect in any::<ShardId>()) {
            let actual = protobuf_roundtrip::<_, String>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}