#![warn(missing_docs, missing_debug_implementations)]
#![allow(ungated_async_fn_track_caller)]
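//! An abstraction presenting as a durable time-varying collection (aka shard).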

use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::{BuildInfo, build_info};
use mz_dyncfg::ConfigSet;
use mz_ore::instrument;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::{IntoRustIfSome, ProtoType};
use semver::Version;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BATCH_DELETE_ENABLED, Batch, BatchBuilder, ProtoBatch};
use crate::cache::{PersistClientCache, StateCache};
use crate::cfg::PersistConfig;
use crate::critical::{CriticalReaderId, SinceHandle};
use crate::error::InvalidUsage;
use crate::fetch::{BatchFetcher, BatchFetcherConfig};
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::parse_id;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::{Machine, retry_external};
use crate::internal::state_versions::StateVersions;
use crate::metrics::Metrics;
use crate::read::{
    Cursor, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle, Since,
};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::{WriteHandle, WriterId};

pub mod async_runtime;
pub mod batch;
pub mod cache;
pub mod cfg;
pub mod cli {
    pub mod admin;
    pub mod args;
    pub mod bench;
    pub mod inspect;
}
pub mod critical;
pub mod error;
pub mod fetch;
pub mod internals_bench;
pub mod iter;
pub mod metrics {
    pub use crate::internal::metrics::{
        Metrics, SinkMetrics, SinkWorkerMetrics, UpdateDelta, encode_ts_metric,
    };
}
pub mod operators {
    use mz_dyncfg::Config;

    pub mod shard_source;

    pub(crate) const STORAGE_SOURCE_DECODE_FUEL: Config<usize> = Config::new(
        "storage_source_decode_fuel",
        100_000,
        "\
        The maximum amount of work to do in the persist_source mfp_and_decode \
        operator before yielding.",
    );
}
pub mod read;
pub mod rpc;
pub mod schema;
pub mod stats;
pub mod usage;
pub mod write;

mod internal {
    pub mod apply;
    pub mod cache;
    pub mod compact;
    pub mod encoding;
    pub mod gc;
    pub mod machine;
    pub mod maintenance;
    pub mod merge;
    pub mod metrics;
    pub mod paths;
    pub mod restore;
    pub mod service;
    pub mod state;
    pub mod state_diff;
    pub mod state_versions;
    pub mod trace;
    pub mod watch;

    #[cfg(test)]
    pub mod datadriven;
}

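/// Persist build information.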
pub const BUILD_INFO: BuildInfo = build_info!();

pub use mz_persist_types::{PersistLocation, ShardId};

pub use crate::internal::encoding::Schemas;

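/// Additional diagnostic information attached to persist handles, used e.g.
/// for logging and metrics.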
#[derive(Clone, Debug)]
pub struct Diagnostics {
    /// A human-meaningful name for the shard (e.g. the name of the object it
    /// backs).
    pub shard_name: String,
    /// A human-readable description of the purpose of the handle.
    pub handle_purpose: String,
}

impl Diagnostics {
    /// Create a `Diagnostics` from just a purpose string, leaving the shard
    /// name unknown.
    pub fn from_purpose(handle_purpose: &str) -> Self {
        Self {
            shard_name: "unknown".to_string(),
            handle_purpose: handle_purpose.to_string(),
        }
    }

    /// Create a `Diagnostics` for use in tests.
    pub fn for_tests() -> Self {
        Self {
            shard_name: "test-shard-name".to_string(),
            handle_purpose: "test-purpose".to_string(),
        }
    }
}

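/// A handle for interacting with the set of persist shards made durable at a
/// single [PersistLocation].
///
/// A rough usage sketch (`ignore`d as a doctest because it elides setup and
/// error handling; `persist_client_cache`, `location`, and `shard_id` are
/// assumed to be in scope):
///
/// ```ignore
/// let client: PersistClient = persist_client_cache.open(location).await?;
/// let (mut write, mut read) = client
///     .open::<String, String, u64, i64>(
///         shard_id,
///         Arc::new(StringSchema),
///         Arc::new(StringSchema),
///         Diagnostics::from_purpose("example"),
///         false,
///     )
///     .await?;
/// ```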
#[derive(Debug, Clone)]
pub struct PersistClient {
    cfg: PersistConfig,
    blob: Arc<dyn Blob>,
    consensus: Arc<dyn Consensus>,
    metrics: Arc<Metrics>,
    isolated_runtime: Arc<IsolatedRuntime>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
}

impl PersistClient {
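    /// Returns a new client for interfacing with persist shards made durable
    /// to the given `blob` and `consensus`.
    ///
    /// This is exposed mostly for testing; most users want
    /// [crate::cache::PersistClientCache::open].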
    pub fn new(
        cfg: PersistConfig,
        blob: Arc<dyn Blob>,
        consensus: Arc<dyn Consensus>,
        metrics: Arc<Metrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
    ) -> Result<Self, ExternalError> {
        Ok(PersistClient {
            cfg,
            blob,
            consensus,
            metrics,
            isolated_runtime,
            shared_states,
            pubsub_sender,
        })
    }

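    /// Returns a new in-mem [PersistClient] for tests and examples.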
    pub async fn new_for_tests() -> Self {
        let cache = PersistClientCache::new_no_metrics();
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("in-mem location is valid")
    }

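    /// Returns the [ConfigSet] of dynamic configs in use by this client.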
    pub fn dyncfgs(&self) -> &ConfigSet {
        &self.cfg.configs
    }

    async fn make_machine<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Machine<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let machine = Machine::<K, V, T, D>::new(
            self.cfg.clone(),
            shard_id,
            Arc::clone(&self.metrics),
            Arc::new(state_versions),
            Arc::clone(&self.shared_states),
            Arc::clone(&self.pubsub_sender),
            Arc::clone(&self.isolated_runtime),
            diagnostics.clone(),
        )
        .await?;
        Ok(machine)
    }

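    /// Provides capabilities for the durable TVC identified by `shard_id` at
    /// its current since and upper frontiers.
    ///
    /// A convenience for calling [Self::open_writer] and
    /// [Self::open_leased_reader] on the same shard.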
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
        use_critical_since: bool,
    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        Ok((
            self.open_writer(
                shard_id,
                Arc::clone(&key_schema),
                Arc::clone(&val_schema),
                diagnostics.clone(),
            )
            .await?,
            self.open_leased_reader(
                shard_id,
                key_schema,
                val_schema,
                diagnostics,
                use_critical_since,
            )
            .await?,
        ))
    }

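    /// Provides a leased [ReadHandle] for the durable TVC identified by
    /// `shard_id`. If `use_critical_since` is true, the reader is registered
    /// at the shard's critical since (if any) rather than the overall since.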
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_leased_reader<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
        use_critical_since: bool,
    ) -> Result<ReadHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let reader_id = LeasedReaderId::new();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &reader_id,
                &diagnostics.handle_purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                use_critical_since,
            )
            .await;
        maintenance.start_performing(&machine, &gc);
        let schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            reader_id,
            schemas,
            reader_state,
        )
        .await;

        Ok(reader)
    }

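    /// Creates a [BatchFetcher] that can be used to fetch batch parts for
    /// `shard_id`.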
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn create_batch_fetcher<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        is_transient: bool,
        diagnostics: Diagnostics,
    ) -> Result<BatchFetcher<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let read_schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let schema_cache = machine.applier.schema_cache();
        let fetcher = BatchFetcher {
            cfg: BatchFetcherConfig::new(&self.cfg),
            blob: Arc::clone(&self.blob),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&machine.applier.shard_metrics),
            shard_id,
            read_schemas,
            schema_cache,
            is_transient,
            _phantom: PhantomData,
        };

        Ok(fetcher)
    }

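    /// A convenience [CriticalReaderId] for Materialize controllers.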
    pub const CONTROLLER_CRITICAL_SINCE: CriticalReaderId =
        CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

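    /// Provides a [SinceHandle] for the durable TVC identified by `shard_id`,
    /// registering (or re-acquiring) the given critical reader.
    ///
    /// Unlike a leased reader, a critical reader's since capability does not
    /// expire on its own; it must be explicitly downgraded.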
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_critical_since<K, V, T, D, O>(
        &self,
        shard_id: ShardId,
        reader_id: CriticalReaderId,
        diagnostics: Diagnostics,
    ) -> Result<SinceHandle<K, V, T, D, O>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
        O: Opaque + Codec64,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (state, maintenance) = machine
            .register_critical_reader::<O>(&reader_id, &diagnostics.handle_purpose)
            .await;
        maintenance.start_performing(&machine, &gc);
        let handle = SinceHandle::new(
            machine,
            gc,
            reader_id,
            state.since,
            Codec64::decode(state.opaque.0),
        );

        Ok(handle)
    }

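    /// Provides a [WriteHandle] for the durable TVC identified by `shard_id`
    /// at its current upper frontier.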
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_writer<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
    ) -> Result<WriteHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let schema_id = machine.find_schema(&*key_schema, &*val_schema);

        let writer_id = WriterId::new();
        let schemas = Schemas {
            id: schema_id,
            key: key_schema,
            val: val_schema,
        };
        let writer = WriteHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            writer_id,
            &diagnostics.handle_purpose,
            schemas,
        );
        Ok(writer)
    }

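    /// Returns a [BatchBuilder] that can be used to write a batch of updates
    /// to blob storage, to be appended to the shard later (e.g. via
    /// [WriteHandle::append_batch]).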
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn batch_builder<K, V, T, D>(
        &self,
        shard_id: ShardId,
        write_schemas: Schemas<K, V>,
        lower: Antichain<T>,
        max_runs: Option<usize>,
    ) -> BatchBuilder<K, V, T, D>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let mut compact_cfg = CompactConfig::new(&self.cfg, shard_id);
        compact_cfg.batch.max_runs = max_runs;
        WriteHandle::builder_inner(
            &self.cfg,
            compact_cfg,
            Arc::clone(&self.metrics),
            self.metrics.shards.shard(&shard_id, "peek_stash"),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            shard_id,
            write_schemas,
            lower,
        )
    }

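    /// Turns the given [ProtoBatch] back into a [Batch] which can be appended
    /// to its shard. Panics if the batch was not written to `shard_id`.
    ///
    /// A sketch of the intended round trip (assuming a `Batch` obtained from a
    /// writer or builder for the same shard, and an `into_transmittable_batch`
    /// conversion on that batch):
    ///
    /// ```ignore
    /// let proto = batch.into_transmittable_batch();
    /// // ... send `proto` across a process or network boundary ...
    /// let batch = client.batch_from_transmittable_batch::<K, V, T, D>(&shard_id, proto);
    /// ```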
    pub fn batch_from_transmittable_batch<K, V, T, D>(
        &self,
        shard_id: &ShardId,
        batch: ProtoBatch,
    ) -> Batch<K, V, T, D>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let batch_shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(&batch_shard_id, shard_id);

        let shard_metrics = self.metrics.shards.shard(shard_id, "peek_stash");

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics,
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            schemas: (batch.key_schema, batch.val_schema),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };

        assert_eq!(&ret.shard_id(), shard_id);
        ret
    }

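    /// Returns a [Cursor] that consolidates the contents of the given batches
    /// as of the given `as_of` frontier.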
    #[allow(clippy::unused_async)]
    pub async fn read_batches_consolidated<K, V, T, D>(
        &mut self,
        shard_id: ShardId,
        as_of: Antichain<T>,
        read_schemas: Schemas<K, V>,
        batches: Vec<Batch<K, V, T, D>>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, Vec<Batch<K, V, T, D>>>, Since<T>>
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");

        let hollow_batches = batches.iter().map(|b| b.batch.clone()).collect_vec();

        ReadHandle::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            shard_metrics,
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            shard_id,
            as_of,
            read_schemas,
            &hollow_batches,
            batches,
            should_fetch_part,
            memory_budget_bytes,
        )
    }

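    /// Returns the schema with the given id registered with the shard, if
    /// known.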
    pub async fn get_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        schema_id: SchemaId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.get_schema(schema_id))
    }

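    /// Returns the latest schema registered with the given shard, if any.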
    pub async fn latest_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(SchemaId, K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.latest_schema())
    }

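    /// Registers the given schema with the shard, returning the assigned
    /// [SchemaId], or `None` if it could not be registered.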
    pub async fn register_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        diagnostics: Diagnostics,
    ) -> Result<Option<SchemaId>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (schema_id, maintenance) = machine.register_schema(key_schema, val_schema).await;
        maintenance.start_performing(&machine, &gc);

        Ok(schema_id)
    }

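    /// Evolves the schema of the shard to the given one, if the currently
    /// registered schema is `expected`. See [CaESchema] for the possible
    /// outcomes.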
    pub async fn compare_and_evolve_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        diagnostics: Diagnostics,
    ) -> Result<CaESchema<K, V>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
        let (res, maintenance) = machine
            .compare_and_evolve_schema(expected, key_schema, val_schema)
            .await;
        maintenance.start_performing(&machine, &gc);
        Ok(res)
    }

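    /// Returns whether the given shard is in a finalized state.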
    pub async fn is_finalized<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<bool, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.is_finalized())
    }

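    /// Finalizes the given shard, making it permanently unusable for reads and
    /// writes. The shard's since and upper must already have been advanced to
    /// the empty antichain.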
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn finalize_shard<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<(), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;

        let maintenance = machine.become_tombstone().await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let () = maintenance.perform(&machine, &gc).await;

        Ok(())
    }

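    /// Upgrades the persisted version of the shard's state to this client's
    /// build version, returning an error if the shard has already been written
    /// by an incompatible version.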
    pub async fn upgrade_version<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<(), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;

        match machine.upgrade_version().await {
            Ok(maintenance) => {
                let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
                let () = maintenance.perform(&machine, &gc).await;
                Ok(())
            }
            Err(version) => Err(InvalidUsage::IncompatibleVersion { version }),
        }
    }

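    /// Returns a serializable dump of the current state of the given shard,
    /// for debugging and introspection.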
    pub async fn inspect_shard<T: Timestamp + Lattice + Codec64>(
        &self,
        shard_id: &ShardId,
    ) -> Result<impl serde::Serialize, anyhow::Error> {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let versions = state_versions.fetch_all_live_diffs(shard_id).await;
        if versions.is_empty() {
            return Err(anyhow::anyhow!("{} does not exist", shard_id));
        }
        let state = state_versions
            .fetch_current_state::<T>(shard_id, versions)
            .await;
        let state = state.check_ts_codec(shard_id)?;
        Ok(state)
    }

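    /// Test helper: opens a (write, read) handle pair for `shard_id` using
    /// default schemas.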
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_open<K, V, T, D>(
        &self,
        shard_id: ShardId,
    ) -> (WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>)
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
        K::Schema: Default,
        V::Schema: Default,
    {
        self.open(
            shard_id,
            Arc::new(K::Schema::default()),
            Arc::new(V::Schema::default()),
            Diagnostics::for_tests(),
            true,
        )
        .await
        .expect("codec mismatch")
    }

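    /// Returns the [Metrics] instance used by this client.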
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.metrics
    }
}

#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;
    use std::time::Duration;

    use differential_dataflow::consolidation::consolidate_updates;
    use differential_dataflow::lattice::Lattice;
    use futures_task::noop_waker;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::assert_ok;
    use mz_persist::indexed::encoding::BlobTraceBatchPart;
    use mz_persist::workload::DataGenerator;
    use mz_persist_types::codec_impls::{StringSchema, VecU8Schema};
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;
    use timely::order::PartialOrder;
    use timely::progress::Antichain;

    use crate::batch::BLOB_TARGET_SIZE;
    use crate::cache::PersistClientCache;
    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
    use crate::error::{CodecConcreteType, CodecMismatch, UpperMismatch};
    use crate::internal::paths::BlobKey;
    use crate::read::ListenEvent;

    use super::*;

    pub fn new_test_client_cache(dyncfgs: &ConfigUpdates) -> PersistClientCache {
        let mut cache = PersistClientCache::new_no_metrics();
        // A tiny blob target size so that even small test data is split into
        // multiple batch parts.
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 10);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 1);
        dyncfgs.apply(cache.cfg());

        // Enable background compaction so tests exercise it.
        cache.cfg.compaction_enabled = true;
        cache
    }

    pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient {
        let cache = new_test_client_cache(dyncfgs);
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
    }

    pub fn all_ok<'a, K, V, T, D, I>(iter: I, as_of: T) -> Vec<((K, V), T, D)>
    where
        K: Ord + Clone + 'a,
        V: Ord + Clone + 'a,
        T: Timestamp + Lattice + Clone + 'a,
        D: Monoid + Clone + 'a,
        I: IntoIterator<Item = &'a ((K, V), T, D)>,
    {
        let as_of = Antichain::from_elem(as_of);
        let mut ret = iter
            .into_iter()
            .map(|((k, v), t, d)| {
                let mut t = t.clone();
                t.advance_by(as_of.borrow());
                ((k.clone(), v.clone()), t, d.clone())
            })
            .collect();
        consolidate_updates(&mut ret);
        ret
    }

    pub async fn expect_fetch_part<K, V, T, D>(
        blob: &dyn Blob,
        key: &BlobKey,
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
    ) -> (BlobTraceBatchPart<T>, Vec<((K, V), T, D)>)
    where
        K: Codec + Clone,
        V: Codec + Clone,
        T: Timestamp + Codec64,
        D: Codec64,
    {
        let value = blob
            .get(key)
            .await
            .expect("failed to fetch part")
            .expect("missing part");
        let mut part =
            BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
        let structured = part
            .updates
            .into_part::<K, V>(&*read_schemas.key, &*read_schemas.val);
        let updates = structured
            .decode_iter::<K, V, T, D>(&*read_schemas.key, &*read_schemas.val)
            .expect("structured data")
            .collect();
        (part, updates)
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn sanity_check(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        assert_eq!(
            read.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );

        let mut listen = read.clone("").await.expect_listen(1).await;

        write
            .expect_append(&data[2..], write.upper().clone(), vec![4])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(4));

        assert_eq!(
            listen.read_until(&4).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(4))
        );

        read.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(read.since(), &Antichain::from_elem(2));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn open_reader_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut write1 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");
        let mut read1 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut read2 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut write2 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        write2.expect_compare_and_append(&data[..1], 0, 2).await;
        assert_eq!(
            read2.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );
        write1.expect_compare_and_append(&data[1..], 2, 4).await;
        assert_eq!(read1.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn invalid_usage(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id0 = "s00000000-0000-0000-0000-000000000000"
            .parse::<ShardId>()
            .expect("invalid shard id");
        let mut client = new_test_client(&dyncfgs).await;

        let (mut write0, mut read0) = client
            .expect_open::<String, String, u64, i64>(shard_id0)
            .await;

        write0.expect_compare_and_append(&data, 0, 4).await;

        // Opening a shard with mismatched codecs must fail.
        {
            fn codecs(
                k: &str,
                v: &str,
                t: &str,
                d: &str,
            ) -> (String, String, String, String, Option<CodecConcreteType>) {
                (k.to_owned(), v.to_owned(), t.to_owned(), d.to_owned(), None)
            }

            client.shared_states = Arc::new(StateCache::new_no_metrics());
            assert_eq!(
                client
                    .open::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, Vec<u8>, u64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(VecU8Schema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "Vec<u8>", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, i64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "i64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, u64, u64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "u64", "u64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );

            assert_eq!(
                client
                    .open_leased_reader::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open_writer::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
        }

        // Fetching a batch part via a handle for a different shard must fail.
        {
            let snap = read0
                .snapshot(Antichain::from_elem(3))
                .await
                .expect("cannot serve requested as_of");

            let shard_id1 = "s11111111-1111-1111-1111-111111111111"
                .parse::<ShardId>()
                .expect("invalid shard id");
            let mut fetcher1 = client
                .create_batch_fetcher::<String, String, u64, i64>(
                    shard_id1,
                    Default::default(),
                    Default::default(),
                    false,
                    Diagnostics::for_tests(),
                )
                .await
                .unwrap();
            for part in snap {
                let (part, _lease) = part.into_exchangeable_part();
                let res = fetcher1.fetch_leased_part(part).await;
                assert_eq!(
                    res.unwrap_err(),
                    InvalidUsage::BatchNotFromThisShard {
                        batch_shard: shard_id0,
                        handle_shard: shard_id1,
                    }
                );
            }
        }

        // Appends and batches with invalid bounds must fail.
        {
            let ts3 = &data[2];
            assert_eq!(ts3.1, 3);
            let ts3 = vec![ts3.clone()];

            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateNotBeyondLower {
                    ts: 3,
                    lower: Antichain::from_elem(4),
                },
            );
            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateBeyondUpper {
                    ts: 3,
                    expected_upper: Antichain::from_elem(3),
                },
            );
            assert_eq!(
                write0
                    .append(&data[..0], Antichain::from_elem(3), Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2),
                },
            );

            assert_eq!(
                write0
                    .builder(Antichain::from_elem(3))
                    .finish(Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2)
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(4),
                    append_upper: Antichain::from_elem(5),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(2),
                    append_upper: Antichain::from_elem(3),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert!(matches!(
                write0
                    .append_batch(batch, Antichain::from_elem(3), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidEmptyTimeInterval { .. }
            ));
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn multiple_shards(dyncfgs: ConfigUpdates) {
        let data1 = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let data2 = [(("1".to_owned(), ()), 1, 1), (("2".to_owned(), ()), 2, 1)];

        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read1) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Different types for the second shard, so mixed-up reads or writes
        // would fail to type-check.
        let (mut write2, mut read2) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        write1
            .expect_compare_and_append(&data1[..], u64::minimum(), 3)
            .await;

        write2
            .expect_compare_and_append(&data2[..], u64::minimum(), 3)
            .await;

        assert_eq!(
            read1.expect_snapshot_and_fetch(2).await,
            all_ok(&data1[..], 2)
        );

        assert_eq!(
            read2.expect_snapshot_and_fetch(2).await,
            all_ok(&data2[..], 2)
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn fetch_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write1, _read1) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let (mut write2, _read2) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write1
            .expect_append(&data[..], write1.upper().clone(), vec![3])
            .await;

        // The second writer learns about the new upper from a fetch.
        assert_eq!(write2.fetch_recent_upper().await, &Antichain::from_elem(3));
        // The fetched upper is also cached locally.
        assert_eq!(write2.upper(), &Antichain::from_elem(3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn append_with_invalid_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write, _read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write
            .expect_append(&data[..], write.upper().clone(), vec![3])
            .await;

        let data = [
            (("5".to_owned(), "fünf".to_owned()), 5, 1),
            (("6".to_owned(), "sechs".to_owned()), 6, 1),
        ];
        let res = write
            .append(
                data.iter(),
                Antichain::from_elem(5),
                Antichain::from_elem(7),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        // A failed append must not advance the writer's cached upper.
        assert_eq!(write.upper(), &Antichain::from_elem(3));
    }

    // Compile-time check (never run) that the client and its handles are
    // Send + Sync.
    #[allow(unused)]
    async fn sync_send(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        fn is_send_sync<T: Send + Sync>(_x: T) -> bool {
            true
        }

        let client = new_test_client(&dyncfgs).await;

        let (write, read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        assert!(is_send_sync(client));
        assert!(is_send_sync(write));
        assert!(is_send_sync(read));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        assert_eq!(write1.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(write2.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        write1
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        assert_eq!(
            read.expect_snapshot_and_fetch(2).await,
            all_ok(&data[..2], 2)
        );

        let res = write2
            .compare_and_append(
                &data[..2],
                Antichain::from_elem(u64::minimum()),
                Antichain::from_elem(3),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(u64::minimum()),
                current: Antichain::from_elem(3)
            }))
        );

        assert_eq!(write2.upper(), &Antichain::from_elem(3));

        write2.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(write2.upper(), &Antichain::from_elem(4));

        assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn overlapping_append(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging_default("info");

        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        let mut listen = read.clone("").await.expect_listen(0).await;

        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        write2
            .expect_append(&data[..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1
            .expect_append(&data[2..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));

        assert_eq!(
            listen.read_until(&6).await,
            (all_ok(&data[..], 1), Antichain::from_elem(6))
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn contiguous_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        // Appends must be contiguous: this one leaves a gap at [3, 5), so it
        // fails with an upper mismatch.
        let result = write
            .append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        write.expect_append(&data[2..5], vec![3], vec![6]).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn noncontiguous_append_per_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        // Manually advance this writer's cached upper to match the shard.
        write2.upper = Antichain::from_elem(3);
        write2
            .expect_append(&data[2..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1.upper = Antichain::from_elem(5);
        write1
            .expect_append(&data[4..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn contiguous_compare_and_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        write.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        // A gap at [3, 5) means this compare_and_append fails with an upper
        // mismatch.
        let result = write
            .compare_and_append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        write.expect_compare_and_append(&data[2..5], 3, 6).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn noncontiguous_compare_and_append_per_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        write1.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        write2.expect_compare_and_append(&data[2..4], 3, 5).await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1.expect_compare_and_append(&data[4..5], 5, 6).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_ore::test]
    fn fmt_ids() {
        assert_eq!(
            format!("{}", LeasedReaderId([0u8; 16])),
            "r00000000-0000-0000-0000-000000000000"
        );
        assert_eq!(
            format!("{:?}", LeasedReaderId([0u8; 16])),
            "LeasedReaderId(00000000-0000-0000-0000-000000000000)"
        );
    }

    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn concurrency(dyncfgs: ConfigUpdates) {
        let data = DataGenerator::small();

        const NUM_WRITERS: usize = 2;
        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut handles = Vec::<mz_ore::task::JoinHandle<()>>::new();
        for idx in 0..NUM_WRITERS {
            let (data, client) = (data.clone(), client.clone());

            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(1);

            let client1 = client.clone();
            let handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
                let (write, _) = client1.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
                let mut current_upper = 0;
                for batch in data.batches() {
                    let new_upper = match batch.get(batch.len() - 1) {
                        Some((_, max_ts, _)) => u64::decode(max_ts) + 1,
                        None => continue,
                    };
                    // Skip batches that would not advance this writer's upper.
                    if PartialOrder::less_equal(&Antichain::from_elem(new_upper), write.upper()) {
                        continue;
                    }

                    let current_upper_chain = Antichain::from_elem(current_upper);
                    current_upper = new_upper;
                    let new_upper_chain = Antichain::from_elem(new_upper);
                    let mut builder = write.builder(current_upper_chain);

                    for ((k, v), t, d) in batch.iter() {
                        builder
                            .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
                            .await
                            .expect("invalid usage");
                    }

                    let batch = builder
                        .finish(new_upper_chain)
                        .await
                        .expect("invalid usage");

                    match batch_tx.send(batch).await {
                        Ok(_) => (),
                        Err(e) => panic!("send error: {}", e),
                    }
                }
            });
            handles.push(handle);

            let handle = mz_ore::task::spawn(|| format!("appender-{}", idx), async move {
                let (mut write, _) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;

                while let Some(batch) = batch_rx.recv().await {
                    let lower = batch.lower().clone();
                    let upper = batch.upper().clone();
                    write
                        .append_batch(batch, lower, upper)
                        .await
                        .expect("invalid usage")
                        .expect("unexpected upper");
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            let () = handle.await;
        }

        let expected = data.records().collect::<Vec<_>>();
        let max_ts = expected.last().map(|(_, t, _)| *t).unwrap_or_default();
        let (_, mut read) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
        assert_eq!(
            read.expect_snapshot_and_fetch(max_ts).await,
            all_ok(expected.iter(), max_ts)
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_blocking_reads(dyncfgs: ConfigUpdates) {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let mut listen = read.clone("").await.expect_listen(1).await;
        let mut listen_next = Box::pin(listen.fetch_next());
        // Polling a listen before data is available must return Pending rather
        // than block the task.
        for _ in 0..100 {
            assert!(
                Pin::new(&mut listen_next).poll(&mut cx).is_pending(),
                "listen::next unexpectedly ready"
            );
        }

        write
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;

        assert_eq!(
            listen_next.await,
            vec![
                ListenEvent::Updates(vec![(("2".to_owned(), "two".to_owned()), 2, 1)]),
                ListenEvent::Progress(Antichain::from_elem(3)),
            ]
        );

        let mut snap = Box::pin(read.expect_snapshot_and_fetch(3));
        // Similarly, a snapshot as of a not-yet-available time must return
        // Pending rather than block.
        for _ in 0..100 {
            assert!(
                Pin::new(&mut snap).poll(&mut cx).is_pending(),
                "snapshot unexpectedly ready"
            );
        }

        write.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(snap.await, all_ok(&data[..], 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn heartbeat_task_shutdown(dyncfgs: ConfigUpdates) {
        // Verify that the reader's background heartbeat task shuts down
        // cleanly after the handle is expired.
        let mut cache = new_test_client_cache(&dyncfgs);
        cache
            .cfg
            .set_config(&READER_LEASE_DURATION, Duration::from_millis(1));
        cache.cfg.writer_lease_duration = Duration::from_millis(1);
        let (_write, mut read) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<(), (), u64, i64>(ShardId::new())
            .await;
        let read_unexpired_state = read
            .unexpired_state
            .take()
            .expect("handle should have unexpired state");
        read.expire().await;
        read_unexpired_state.heartbeat_task.await
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        // Advance since and upper to the empty antichain, a prerequisite for
        // finalization.
        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write.advance_upper(&Antichain::new()).await;

        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");

        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_shard(dyncfgs: ConfigUpdates) {
        const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        // Unlike finalize_empty_shard, write some data first.
        let () = write
            .compare_and_append(DATA, Antichain::from_elem(0), Antichain::from_elem(1))
            .await
            .expect("usage should be valid")
            .expect("upper should match");

        // Advance since and upper to the empty antichain, a prerequisite for
        // finalization.
        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write.advance_upper(&Antichain::new()).await;

        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");

        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(4096))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn shard_id_protobuf_roundtrip(expect in any::<ShardId>()) {
            let actual = protobuf_roundtrip::<_, String>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}