1#![warn(missing_docs, missing_debug_implementations)]
13#![allow(ungated_async_fn_track_caller)]
17
18use std::fmt::Debug;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use differential_dataflow::difference::Monoid;
23use differential_dataflow::lattice::Lattice;
24use itertools::Itertools;
25use mz_build_info::{BuildInfo, build_info};
26use mz_dyncfg::ConfigSet;
27use mz_ore::instrument;
28use mz_persist::location::{Blob, Consensus, ExternalError};
29use mz_persist_types::schema::SchemaId;
30use mz_persist_types::{Codec, Codec64};
31use mz_proto::{IntoRustIfSome, ProtoType};
32use semver::Version;
33use timely::order::TotalOrder;
34use timely::progress::{Antichain, Timestamp};
35
36use crate::async_runtime::IsolatedRuntime;
37use crate::batch::{BATCH_DELETE_ENABLED, Batch, BatchBuilder, ProtoBatch};
38use crate::cache::{PersistClientCache, StateCache};
39use crate::cfg::PersistConfig;
40use crate::critical::{CriticalReaderId, Opaque, SinceHandle};
41use crate::error::InvalidUsage;
42use crate::fetch::{BatchFetcher, BatchFetcherConfig};
43use crate::internal::compact::{CompactConfig, Compactor};
44use crate::internal::encoding::parse_id;
45use crate::internal::gc::GarbageCollector;
46use crate::internal::machine::{Machine, retry_external};
47use crate::internal::state_versions::StateVersions;
48use crate::metrics::Metrics;
49use crate::read::{
50 Cursor, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle, Since,
51};
52use crate::rpc::PubSubSender;
53use crate::schema::CaESchema;
54use crate::write::{WriteHandle, WriterId};
55
/// An isolated async runtime used by this crate.
pub mod async_runtime;
/// Batches of updates: building, writing, and transmitting them.
pub mod batch;
/// Caches of clients and shard state.
pub mod cache;
/// Configuration for persist.
pub mod cfg;
/// Command-line debugging and administration tools.
pub mod cli {
    /// Admin commands.
    pub mod admin;
    /// Shared CLI argument parsing.
    pub mod args;
    /// Benchmarking commands.
    pub mod bench;
    /// State inspection commands.
    pub mod inspect;
}
/// Critical (non-leased) since capabilities.
pub mod critical;
/// Error and invalid-usage types.
pub mod error;
/// Fetching of batch parts.
pub mod fetch;
/// Benchmarks of crate internals.
pub mod internals_bench;
/// Consolidating iteration over updates.
pub mod iter;
/// Re-exports of the metrics types used by this crate.
pub mod metrics {
    pub use crate::internal::metrics::{
        Metrics, SinkMetrics, SinkWorkerMetrics, UpdateDelta, encode_ts_metric,
    };
}
/// Dataflow operators reading from persist shards.
pub mod operators {
    use mz_dyncfg::Config;

    /// An operator that reads the contents of a persist shard.
    pub mod shard_source;

    /// The maximum amount of work to do in the persist_source mfp_and_decode
    /// operator before yielding.
    pub(crate) const STORAGE_SOURCE_DECODE_FUEL: Config<usize> = Config::new(
        "storage_source_decode_fuel",
        100_000,
        "\
        The maximum amount of work to do in the persist_source mfp_and_decode \
        operator before yielding.",
    );
}
/// Leased read capabilities and reading of shard data.
pub mod read;
/// RPC/pubsub plumbing between persist clients.
pub mod rpc;
/// Schema registration and evolution.
pub mod schema;
/// Statistics about stored data.
pub mod stats;
/// Storage usage introspection.
pub mod usage;
/// Write capabilities and writing of shard data.
pub mod write;

/// Private implementation details; not part of the public API.
mod internal {
    pub mod apply;
    pub mod cache;
    pub mod compact;
    pub mod encoding;
    pub mod gc;
    pub mod machine;
    pub mod maintenance;
    pub mod merge;
    pub mod metrics;
    pub mod paths;
    pub mod restore;
    pub mod service;
    pub mod state;
    pub mod state_diff;
    pub mod state_versions;
    pub mod trace;
    pub mod watch;

    #[cfg(test)]
    pub mod datadriven;
}
124
/// Build information (version etc.) for this crate.
pub const BUILD_INFO: BuildInfo = build_info!();

// Re-export the location/shard identifier types so callers don't need to
// depend on mz_persist_types directly for them.
pub use mz_persist_types::{PersistLocation, ShardId};

// Re-export the key/val schemas bundle used by the handles in this crate.
pub use crate::internal::encoding::Schemas;
/// Identifying information attached to a persist handle, used for logging,
/// metrics, and debugging. Not load-bearing for correctness.
#[derive(Clone, Debug)]
pub struct Diagnostics {
    /// A human-readable name for the shard (diagnostics only; not required to
    /// be unique or stable).
    pub shard_name: String,
    /// A human-readable description of the purpose of the handle being opened.
    pub handle_purpose: String,
}
142
143impl Diagnostics {
144 pub fn from_purpose(handle_purpose: &str) -> Self {
146 Self {
147 shard_name: "unknown".to_string(),
148 handle_purpose: handle_purpose.to_string(),
149 }
150 }
151
152 pub fn for_tests() -> Self {
154 Self {
155 shard_name: "test-shard-name".to_string(),
156 handle_purpose: "test-purpose".to_string(),
157 }
158 }
159}
160
/// A handle for interacting with the set of persist shards made durable at a
/// single location (a blob store plus a consensus implementation).
#[derive(Debug, Clone)]
pub struct PersistClient {
    /// Configuration, including dynamic config knobs.
    cfg: PersistConfig,
    /// Durable payload (blob) storage.
    blob: Arc<dyn Blob>,
    /// Consensus implementation backing shard state.
    consensus: Arc<dyn Consensus>,
    /// Metrics shared by all handles created from this client.
    metrics: Arc<Metrics>,
    /// Runtime used for isolating compute-heavy work.
    isolated_runtime: Arc<IsolatedRuntime>,
    /// Cache of shard state shared between handles in this process.
    shared_states: Arc<StateCache>,
    /// Sender for state-change pubsub notifications.
    pubsub_sender: Arc<dyn PubSubSender>,
}
191
192impl PersistClient {
    /// Returns a new client for interfacing with persist shards made durable
    /// at the given blob and consensus implementations.
    ///
    /// As written this never returns `Err`; the `Result` return type leaves
    /// room for input validation without breaking callers.
    pub fn new(
        cfg: PersistConfig,
        blob: Arc<dyn Blob>,
        consensus: Arc<dyn Consensus>,
        metrics: Arc<Metrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
    ) -> Result<Self, ExternalError> {
        Ok(PersistClient {
            cfg,
            blob,
            consensus,
            metrics,
            isolated_runtime,
            shared_states,
            pubsub_sender,
        })
    }
219
    /// Returns a client backed by an in-memory location, for use in unit
    /// tests.
    pub async fn new_for_tests() -> Self {
        let cache = PersistClientCache::new_no_metrics();
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("in-mem location is valid")
    }
228
    /// Returns the set of dynamic configuration values this client was
    /// created with.
    pub fn dyncfgs(&self) -> &ConfigSet {
        &self.cfg.configs
    }
233
    /// Constructs a [Machine] for `shard_id`, wiring together this client's
    /// consensus, blob, metrics, shared state cache, pubsub sender, and
    /// isolated runtime.
    async fn make_machine<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Machine<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let machine = Machine::<K, V, T, D>::new(
            self.cfg.clone(),
            shard_id,
            Arc::clone(&self.metrics),
            Arc::new(state_versions),
            Arc::clone(&self.shared_states),
            Arc::clone(&self.pubsub_sender),
            Arc::clone(&self.isolated_runtime),
            diagnostics.clone(),
        )
        .await?;
        Ok(machine)
    }
264
265 #[instrument(level = "debug", fields(shard = %shard_id))]
283 pub async fn open<K, V, T, D>(
284 &self,
285 shard_id: ShardId,
286 key_schema: Arc<K::Schema>,
287 val_schema: Arc<V::Schema>,
288 diagnostics: Diagnostics,
289 use_critical_since: bool,
290 ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), InvalidUsage<T>>
291 where
292 K: Debug + Codec,
293 V: Debug + Codec,
294 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
295 D: Monoid + Ord + Codec64 + Send + Sync,
296 {
297 Ok((
298 self.open_writer(
299 shard_id,
300 Arc::clone(&key_schema),
301 Arc::clone(&val_schema),
302 diagnostics.clone(),
303 )
304 .await?,
305 self.open_leased_reader(
306 shard_id,
307 key_schema,
308 val_schema,
309 diagnostics,
310 use_critical_since,
311 )
312 .await?,
313 ))
314 }
315
    /// Returns a [ReadHandle] for `shard_id`, registering a new leased
    /// reader.
    ///
    /// The lease duration is taken from [READER_LEASE_DURATION] in this
    /// client's config and the initial heartbeat timestamp from `cfg.now`.
    /// `use_critical_since` is forwarded to registration (semantics live in
    /// `register_leased_reader`).
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_leased_reader<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
        use_critical_since: bool,
    ) -> Result<ReadHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let reader_id = LeasedReaderId::new();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &reader_id,
                &diagnostics.handle_purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                use_critical_since,
            )
            .await;
        // Kick off (don't block on) any maintenance that registration made
        // necessary.
        maintenance.start_performing(&machine, &gc);
        // Readers carry no registered schema id; the schemas are used for
        // decoding only.
        let schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            reader_id,
            schemas,
            reader_state,
        )
        .await;

        Ok(reader)
    }
373
    /// Creates a [BatchFetcher] for `shard_id` with the given read schemas.
    ///
    /// `is_transient` is passed through to the fetcher unchanged.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn create_batch_fetcher<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        is_transient: bool,
        diagnostics: Diagnostics,
    ) -> Result<BatchFetcher<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        // No registered schema id; schemas are used for decoding only.
        let read_schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let schema_cache = machine.applier.schema_cache();
        let fetcher = BatchFetcher {
            cfg: BatchFetcherConfig::new(&self.cfg),
            blob: Arc::clone(&self.blob),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&machine.applier.shard_metrics),
            shard_id,
            read_schemas,
            schema_cache,
            is_transient,
            _phantom: PhantomData,
        };

        Ok(fetcher)
    }
411
    /// A fixed, well-known [CriticalReaderId] (rather than a randomly
    /// generated one); by its name, reserved for the controller's critical
    /// since capability.
    pub const CONTROLLER_CRITICAL_SINCE: CriticalReaderId =
        CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);
435
    /// Returns a [SinceHandle] for `shard_id`, registering `reader_id` as a
    /// critical reader.
    ///
    /// `default_opaque` is passed to registration as the opaque token's
    /// initial value (idempotency semantics live in
    /// `register_critical_reader`).
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_critical_since<K, V, T, D>(
        &self,
        shard_id: ShardId,
        reader_id: CriticalReaderId,
        default_opaque: Opaque,
        diagnostics: Diagnostics,
    ) -> Result<SinceHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (state, maintenance) = machine
            .register_critical_reader(&reader_id, default_opaque, &diagnostics.handle_purpose)
            .await;
        // Kick off (don't block on) any maintenance registration made
        // necessary.
        maintenance.start_performing(&machine, &gc);
        let handle = SinceHandle::new(machine, gc, reader_id, state.since, state.opaque);

        Ok(handle)
    }
481
    /// Returns a [WriteHandle] for `shard_id` with the given schemas.
    ///
    /// The handle's schema id is looked up from the shard's registered
    /// schemas via `find_schema` and may be absent.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_writer<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
    ) -> Result<WriteHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        // May be None if these schemas have not been registered with the
        // shard.
        let schema_id = machine.find_schema(&*key_schema, &*val_schema);

        let writer_id = WriterId::new();
        let schemas = Schemas {
            id: schema_id,
            key: key_schema,
            val: val_schema,
        };
        let writer = WriteHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            writer_id,
            &diagnostics.handle_purpose,
            schemas,
        );
        Ok(writer)
    }
527
    /// Returns a [BatchBuilder] for writing a batch of updates to `shard_id`
    /// with the given lower bound.
    ///
    /// `max_runs`, when set, overrides the compaction config's batch
    /// `max_runs`. Uses the "peek_stash" shard-metrics label.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn batch_builder<K, V, T, D>(
        &self,
        shard_id: ShardId,
        write_schemas: Schemas<K, V>,
        lower: Antichain<T>,
        max_runs: Option<usize>,
    ) -> BatchBuilder<K, V, T, D>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let mut compact_cfg = CompactConfig::new(&self.cfg, shard_id);
        compact_cfg.batch.max_runs = max_runs;
        WriteHandle::builder_inner(
            &self.cfg,
            compact_cfg,
            Arc::clone(&self.metrics),
            self.metrics.shards.shard(&shard_id, "peek_stash"),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            shard_id,
            write_schemas,
            lower,
        )
    }
566
567 pub fn batch_from_transmittable_batch<K, V, T, D>(
576 &self,
577 shard_id: &ShardId,
578 batch: ProtoBatch,
579 ) -> Batch<K, V, T, D>
580 where
581 K: Debug + Codec,
582 V: Debug + Codec,
583 T: Timestamp + Lattice + Codec64 + Sync,
584 D: Monoid + Ord + Codec64 + Send + Sync,
585 {
586 let batch_shard_id: ShardId = batch
587 .shard_id
588 .into_rust()
589 .expect("valid transmittable batch");
590 assert_eq!(&batch_shard_id, shard_id);
591
592 let shard_metrics = self.metrics.shards.shard(shard_id, "peek_stash");
593
594 let ret = Batch {
595 batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
596 metrics: Arc::clone(&self.metrics),
597 shard_metrics,
598 version: Version::parse(&batch.version).expect("valid transmittable batch"),
599 schemas: (batch.key_schema, batch.val_schema),
600 batch: batch
601 .batch
602 .into_rust_if_some("ProtoBatch::batch")
603 .expect("valid transmittable batch"),
604 blob: Arc::clone(&self.blob),
605 _phantom: std::marker::PhantomData,
606 };
607
608 assert_eq!(&ret.shard_id(), shard_id);
609 ret
610 }
611
    /// Returns a [Cursor] over the consolidated contents of the given
    /// batches as of `as_of`.
    ///
    /// `should_fetch_part` decides, from a part's (possibly absent) stats,
    /// whether that part is fetched; `memory_budget_bytes` is passed through
    /// to the underlying read path.
    ///
    /// NOTE(review): takes `&mut self` though nothing here mutates the
    /// client, and the fn is `async` without awaiting (see the allow) —
    /// presumably for API symmetry/future-proofing; confirm before relaxing.
    #[allow(clippy::unused_async)]
    pub async fn read_batches_consolidated<K, V, T, D>(
        &mut self,
        shard_id: ShardId,
        as_of: Antichain<T>,
        read_schemas: Schemas<K, V>,
        batches: Vec<Batch<K, V, T, D>>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, Vec<Batch<K, V, T, D>>>, Since<T>>
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");

        // The hollow (metadata-only) descriptions of the batches; the owned
        // batches themselves are handed to the cursor as lease state.
        let hollow_batches = batches.iter().map(|b| b.batch.clone()).collect_vec();

        ReadHandle::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            shard_metrics,
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            shard_id,
            as_of,
            read_schemas,
            &hollow_batches,
            batches,
            should_fetch_part,
            memory_budget_bytes,
        )
    }
661
    /// Returns the schemas registered under `schema_id` for `shard_id`, if
    /// any.
    pub async fn get_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        schema_id: SchemaId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.get_schema(schema_id))
    }
680
    /// Returns the latest registered schemas for `shard_id` and their id, if
    /// any have been registered.
    pub async fn latest_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(SchemaId, K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.latest_schema())
    }
698
    /// Registers the given schemas with `shard_id`, returning the assigned
    /// [SchemaId] (semantics of a `None` result live in
    /// `Machine::register_schema`).
    ///
    /// Any maintenance work triggered by registration is kicked off before
    /// returning.
    pub async fn register_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        diagnostics: Diagnostics,
    ) -> Result<Option<SchemaId>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (schema_id, maintenance) = machine.register_schema(key_schema, val_schema).await;
        maintenance.start_performing(&machine, &gc);

        Ok(schema_id)
    }
733
    /// Attempts to evolve the schema of `shard_id` from the schema identified
    /// by `expected` to the given schemas, returning the [CaESchema] outcome
    /// (success/mismatch semantics live in
    /// `Machine::compare_and_evolve_schema`).
    ///
    /// Any maintenance work triggered by the attempt is kicked off before
    /// returning.
    pub async fn compare_and_evolve_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        diagnostics: Diagnostics,
    ) -> Result<CaESchema<K, V>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
        let (res, maintenance) = machine
            .compare_and_evolve_schema(expected, key_schema, val_schema)
            .await;
        maintenance.start_performing(&machine, &gc);
        Ok(res)
    }
768
    /// Reports whether `shard_id` has been finalized (see
    /// [Self::finalize_shard]).
    pub async fn is_finalized<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<bool, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.is_finalized())
    }
788
    /// Finalizes `shard_id` by turning its state into a tombstone, then
    /// performs the resulting maintenance (e.g. cleanup) to completion before
    /// returning.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn finalize_shard<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<(), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;

        let maintenance = machine.become_tombstone().await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        // Unlike the `start_performing` paths, the maintenance here is
        // awaited to completion.
        let () = maintenance.perform(&machine, &gc).await;

        Ok(())
    }
822
    /// Attempts to upgrade the durable state of `shard_id` to this client's
    /// version, performing any resulting maintenance to completion.
    ///
    /// # Errors
    ///
    /// Returns [InvalidUsage::IncompatibleVersion] if the machine reports the
    /// upgrade is not possible from/to the given version.
    pub async fn upgrade_version<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<(), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;

        match machine.upgrade_version().await {
            Ok(maintenance) => {
                let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
                let () = maintenance.perform(&machine, &gc).await;
                Ok(())
            }
            Err(version) => Err(InvalidUsage::IncompatibleVersion { version }),
        }
    }
849
    /// Fetches the current state of `shard_id` and returns it in a
    /// serializable form, for debugging/inspection.
    ///
    /// # Errors
    ///
    /// Errors if the shard has no live diffs (i.e. does not exist) or if its
    /// timestamp codec does not match `T`.
    pub async fn inspect_shard<T: Timestamp + Lattice + Codec64>(
        &self,
        shard_id: &ShardId,
    ) -> Result<impl serde::Serialize, anyhow::Error> {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let versions = state_versions.fetch_all_live_diffs(shard_id).await;
        if versions.is_empty() {
            return Err(anyhow::anyhow!("{} does not exist", shard_id));
        }
        let state = state_versions
            .fetch_current_state::<T>(shard_id, versions)
            .await;
        let state = state.check_ts_codec(shard_id)?;
        Ok(state)
    }
878
    /// Test helper: [Self::open] with default schemas, test diagnostics, and
    /// `use_critical_since` set to true, panicking on codec mismatch.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_open<K, V, T, D>(
        &self,
        shard_id: ShardId,
    ) -> (WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>)
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
        K::Schema: Default,
        V::Schema: Default,
    {
        self.open(
            shard_id,
            Arc::new(K::Schema::default()),
            Arc::new(V::Schema::default()),
            Diagnostics::for_tests(),
            true,
        )
        .await
        .expect("codec mismatch")
    }
904
    /// Returns the [Metrics] shared by all handles created from this client.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.metrics
    }
911}
912
913#[cfg(test)]
914mod tests {
915 use std::future::Future;
916 use std::pin::Pin;
917 use std::task::Context;
918 use std::time::Duration;
919
920 use differential_dataflow::consolidation::consolidate_updates;
921 use differential_dataflow::lattice::Lattice;
922 use futures_task::noop_waker;
923 use mz_dyncfg::ConfigUpdates;
924 use mz_ore::assert_ok;
925 use mz_persist::indexed::encoding::BlobTraceBatchPart;
926 use mz_persist::workload::DataGenerator;
927 use mz_persist_types::codec_impls::{StringSchema, VecU8Schema};
928 use mz_proto::protobuf_roundtrip;
929 use proptest::prelude::*;
930 use timely::order::PartialOrder;
931 use timely::progress::Antichain;
932
933 use crate::batch::BLOB_TARGET_SIZE;
934 use crate::cache::PersistClientCache;
935 use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
936 use crate::critical::Opaque;
937 use crate::error::{CodecConcreteType, CodecMismatch, UpperMismatch};
938 use crate::internal::paths::BlobKey;
939 use crate::read::ListenEvent;
940
941 use super::*;
942
    /// Returns a [PersistClientCache] tuned for unit tests: no metrics, a
    /// tiny blob target size and at most one outstanding batch part (so tests
    /// exercise the multi-part code paths), compaction enabled, and the given
    /// dyncfg overrides applied.
    pub fn new_test_client_cache(dyncfgs: &ConfigUpdates) -> PersistClientCache {
        let mut cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 10);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 1);
        dyncfgs.apply(cache.cfg());

        cache.cfg.compaction_enabled = true;
        cache
    }
957
    /// Returns a [PersistClient] backed by an in-memory location, configured
    /// via [new_test_client_cache].
    pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient {
        let cache = new_test_client_cache(dyncfgs);
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
    }
965
966 pub fn all_ok<'a, K, V, T, D, I>(iter: I, as_of: T) -> Vec<((K, V), T, D)>
967 where
968 K: Ord + Clone + 'a,
969 V: Ord + Clone + 'a,
970 T: Timestamp + Lattice + Clone + 'a,
971 D: Monoid + Clone + 'a,
972 I: IntoIterator<Item = &'a ((K, V), T, D)>,
973 {
974 let as_of = Antichain::from_elem(as_of);
975 let mut ret = iter
976 .into_iter()
977 .map(|((k, v), t, d)| {
978 let mut t = t.clone();
979 t.advance_by(as_of.borrow());
980 ((k.clone(), v.clone()), t, d.clone())
981 })
982 .collect();
983 consolidate_updates(&mut ret);
984 ret
985 }
986
    /// Fetches the blob at `key`, decodes it as a batch part, and returns
    /// both the raw part and its decoded updates, panicking on any failure.
    pub async fn expect_fetch_part<K, V, T, D>(
        blob: &dyn Blob,
        key: &BlobKey,
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
    ) -> (BlobTraceBatchPart<T>, Vec<((K, V), T, D)>)
    where
        K: Codec + Clone,
        V: Codec + Clone,
        T: Timestamp + Codec64,
        D: Codec64,
    {
        let value = blob
            .get(key)
            .await
            .expect("failed to fetch part")
            .expect("missing part");
        let mut part =
            BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
        // Convert the part's updates into the structured representation and
        // decode them with the given schemas.
        let structured = part
            .updates
            .into_part::<K, V>(&*read_schemas.key, &*read_schemas.val);
        let updates = structured
            .decode_iter::<K, V, T, D>(&*read_schemas.key, &*read_schemas.val)
            .expect("structured data")
            .collect();
        (part, updates)
    }
1015
    // Smoke test: append, snapshot, listen, and since downgrade on one shard.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn sanity_check(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        // A fresh shard starts with both frontiers at the minimum timestamp.
        assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        assert_eq!(
            read.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );

        let mut listen = read.clone("").await.expect_listen(1).await;

        write
            .expect_append(&data[2..], write.upper().clone(), vec![4])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(4));

        // The listen (started as of 1) sees everything after its as-of.
        assert_eq!(
            listen.read_until(&4).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(4))
        );

        read.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(read.since(), &Antichain::from_elem(2));
    }
1062
    // Multiple writers and readers on the same shard observe each other's
    // appends.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn open_reader_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut write1 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");
        let mut read1 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut read2 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut write2 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        // Writes made by either handle are visible to either reader.
        write2.expect_compare_and_append(&data[..1], 0, 2).await;
        assert_eq!(
            read2.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );
        write1.expect_compare_and_append(&data[1..], 2, 4).await;
        assert_eq!(read1.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }
1122
    // Exercises the various InvalidUsage error paths: codec mismatches,
    // cross-shard batch fetches, and invalid append/batch bounds.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn invalid_usage(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id0 = "s00000000-0000-0000-0000-000000000000"
            .parse::<ShardId>()
            .expect("invalid shard id");
        let mut client = new_test_client(&dyncfgs).await;

        let (mut write0, mut read0) = client
            .expect_open::<String, String, u64, i64>(shard_id0)
            .await;

        write0.expect_compare_and_append(&data, 0, 4).await;

        // Codec mismatches: reopening the shard with a different K, V, T, or
        // D codec must fail with CodecMismatch.
        {
            fn codecs(
                k: &str,
                v: &str,
                t: &str,
                d: &str,
            ) -> (String, String, String, String, Option<CodecConcreteType>) {
                (k.to_owned(), v.to_owned(), t.to_owned(), d.to_owned(), None)
            }

            // Reset the state cache so the mismatch is detected from durable
            // state rather than the in-process cache.
            client.shared_states = Arc::new(StateCache::new_no_metrics());
            assert_eq!(
                client
                    .open::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, Vec<u8>, u64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(VecU8Schema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "Vec<u8>", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, i64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "i64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, u64, u64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "u64", "u64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );

            // The same mismatch is caught by the reader- and writer-only
            // entry points too.
            assert_eq!(
                client
                    .open_leased_reader::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open_writer::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
        }

        // Fetching a part from a fetcher opened against a different shard
        // must fail with BatchNotFromThisShard.
        {
            let snap = read0
                .snapshot(Antichain::from_elem(3))
                .await
                .expect("cannot serve requested as_of");

            let shard_id1 = "s11111111-1111-1111-1111-111111111111"
                .parse::<ShardId>()
                .expect("invalid shard id");
            let mut fetcher1 = client
                .create_batch_fetcher::<String, String, u64, i64>(
                    shard_id1,
                    Default::default(),
                    Default::default(),
                    false,
                    Diagnostics::for_tests(),
                )
                .await
                .unwrap();
            for part in snap {
                let (part, _lease) = part.into_exchangeable_part();
                let res = fetcher1.fetch_leased_part(part).await;
                assert_eq!(
                    res.unwrap_err(),
                    InvalidUsage::BatchNotFromThisShard {
                        batch_shard: shard_id0,
                        handle_shard: shard_id1,
                    }
                );
            }
        }

        // Invalid bounds on append and on batches.
        {
            let ts3 = &data[2];
            assert_eq!(ts3.1, 3);
            let ts3 = vec![ts3.clone()];

            // An update at ts 3 is not beyond a lower of 4...
            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateNotBeyondLower {
                    ts: 3,
                    lower: Antichain::from_elem(4),
                },
            );
            // ...and not before an upper of 3.
            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateBeyondUpper {
                    ts: 3,
                    expected_upper: Antichain::from_elem(3),
                },
            );
            // lower > upper is invalid even for an empty batch of updates.
            assert_eq!(
                write0
                    .append(&data[..0], Antichain::from_elem(3), Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2),
                },
            );

            // The same bound check applies to the builder path.
            assert_eq!(
                write0
                    .builder(Antichain::from_elem(3))
                    .finish(Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2)
                },
            );
            // Appending a batch with bounds that don't match the append
            // bounds fails, in either direction.
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(4),
                    append_upper: Antichain::from_elem(5),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(2),
                    append_upper: Antichain::from_elem(3),
                },
            );
            // An empty [3, 3) time interval for a non-empty batch is invalid.
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert!(matches!(
                write0
                    .append_batch(batch, Antichain::from_elem(3), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidEmptyTimeInterval { .. }
            ));
        }
    }
1388
    // Two shards (with different types) on one client are independent.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn multiple_shards(dyncfgs: ConfigUpdates) {
        let data1 = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let data2 = [(("1".to_owned(), ()), 1, 1), (("2".to_owned(), ()), 2, 1)];

        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read1) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // A second shard, with a different value type to boot.
        let (mut write2, mut read2) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        write1
            .expect_compare_and_append(&data1[..], u64::minimum(), 3)
            .await;

        write2
            .expect_compare_and_append(&data2[..], u64::minimum(), 3)
            .await;

        assert_eq!(
            read1.expect_snapshot_and_fetch(2).await,
            all_ok(&data1[..], 2)
        );

        assert_eq!(
            read2.expect_snapshot_and_fetch(2).await,
            all_ok(&data2[..], 2)
        );
    }
1429
1430 #[mz_persist_proc::test(tokio::test)]
1431 #[cfg_attr(miri, ignore)] async fn fetch_upper(dyncfgs: ConfigUpdates) {
1433 let data = [
1434 (("1".to_owned(), "one".to_owned()), 1, 1),
1435 (("2".to_owned(), "two".to_owned()), 2, 1),
1436 ];
1437
1438 let client = new_test_client(&dyncfgs).await;
1439
1440 let shard_id = ShardId::new();
1441
1442 let (mut write1, _read1) = client
1443 .expect_open::<String, String, u64, i64>(shard_id)
1444 .await;
1445
1446 let (mut write2, _read2) = client
1447 .expect_open::<String, String, u64, i64>(shard_id)
1448 .await;
1449
1450 write1
1451 .expect_append(&data[..], write1.upper().clone(), vec![3])
1452 .await;
1453
1454 assert_eq!(write2.fetch_recent_upper().await, &Antichain::from_elem(3));
1456
1457 assert_eq!(write2.upper(), &Antichain::from_elem(3));
1460 }
1461
1462 #[mz_persist_proc::test(tokio::test)]
1463 #[cfg_attr(miri, ignore)] async fn append_with_invalid_upper(dyncfgs: ConfigUpdates) {
1465 let data = [
1466 (("1".to_owned(), "one".to_owned()), 1, 1),
1467 (("2".to_owned(), "two".to_owned()), 2, 1),
1468 ];
1469
1470 let client = new_test_client(&dyncfgs).await;
1471
1472 let shard_id = ShardId::new();
1473
1474 let (mut write, _read) = client
1475 .expect_open::<String, String, u64, i64>(shard_id)
1476 .await;
1477
1478 write
1479 .expect_append(&data[..], write.upper().clone(), vec![3])
1480 .await;
1481
1482 let data = [
1483 (("5".to_owned(), "fünf".to_owned()), 5, 1),
1484 (("6".to_owned(), "sechs".to_owned()), 6, 1),
1485 ];
1486 let res = write
1487 .append(
1488 data.iter(),
1489 Antichain::from_elem(5),
1490 Antichain::from_elem(7),
1491 )
1492 .await;
1493 assert_eq!(
1494 res,
1495 Ok(Err(UpperMismatch {
1496 expected: Antichain::from_elem(5),
1497 current: Antichain::from_elem(3)
1498 }))
1499 );
1500
1501 assert_eq!(write.upper(), &Antichain::from_elem(3));
1503 }
1504
    // Compile-time check that the persist client and its read/write handles
    // are `Send + Sync` (so they can be moved across and shared between
    // tasks/threads). This fn is intentionally never executed — note the
    // missing test attribute and `#[allow(unused)]` — merely type-checking
    // the `is_send_sync` calls proves the bounds hold.
    #[allow(unused)]
    async fn sync_send(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        // Only compiles if T: Send + Sync.
        fn is_send_sync<T: Send + Sync>(_x: T) -> bool {
            true
        }

        let client = new_test_client(&dyncfgs).await;

        let (write, read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        assert!(is_send_sync(client));
        assert!(is_send_sync(write));
        assert!(is_send_sync(read));
    }
1525
1526 #[mz_persist_proc::test(tokio::test)]
1527 #[cfg_attr(miri, ignore)] async fn compare_and_append(dyncfgs: ConfigUpdates) {
1529 let data = vec![
1530 (("1".to_owned(), "one".to_owned()), 1, 1),
1531 (("2".to_owned(), "two".to_owned()), 2, 1),
1532 (("3".to_owned(), "three".to_owned()), 3, 1),
1533 ];
1534
1535 let id = ShardId::new();
1536 let client = new_test_client(&dyncfgs).await;
1537 let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1538
1539 let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1540
1541 assert_eq!(write1.upper(), &Antichain::from_elem(u64::minimum()));
1542 assert_eq!(write2.upper(), &Antichain::from_elem(u64::minimum()));
1543 assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));
1544
1545 write1
1547 .expect_compare_and_append(&data[..2], u64::minimum(), 3)
1548 .await;
1549 assert_eq!(write1.upper(), &Antichain::from_elem(3));
1550
1551 assert_eq!(
1552 read.expect_snapshot_and_fetch(2).await,
1553 all_ok(&data[..2], 2)
1554 );
1555
1556 let res = write2
1558 .compare_and_append(
1559 &data[..2],
1560 Antichain::from_elem(u64::minimum()),
1561 Antichain::from_elem(3),
1562 )
1563 .await;
1564 assert_eq!(
1565 res,
1566 Ok(Err(UpperMismatch {
1567 expected: Antichain::from_elem(u64::minimum()),
1568 current: Antichain::from_elem(3)
1569 }))
1570 );
1571
1572 assert_eq!(write2.upper(), &Antichain::from_elem(3));
1574
1575 write2.expect_compare_and_append(&data[2..], 3, 4).await;
1577
1578 assert_eq!(write2.upper(), &Antichain::from_elem(4));
1579
1580 assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
1581 }
1582
    // Two writers append batches whose time ranges overlap ([min,3), [min,5),
    // [3,6)); readers and listeners must still observe each update exactly
    // once. NOTE(review): this relies on `expect_append` tolerating a stale
    // expected upper and on persist deduplicating the already-appended
    // portion — confirm against `expect_append`'s contract.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn overlapping_append(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging_default("info");

        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Subscribe before any writes so the listen replays everything.
        let mut listen = read.clone("").await.expect_listen(0).await;

        // Writer 1 lands [min, 3).
        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        // Writer 2's batch [min, 5) overlaps writer 1's; only the new part
        // beyond 3 should take effect.
        write2
            .expect_append(&data[..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        // Writer 1's batch [3, 6) overlaps writer 2's in [3, 5).
        write1
            .expect_append(&data[2..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        // Despite the overlaps, every update appears exactly once.
        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));

        assert_eq!(
            listen.read_until(&6).await,
            (all_ok(&data[..], 1), Antichain::from_elem(6))
        );
    }
1631
1632 #[mz_persist_proc::test(tokio::test)]
1635 #[cfg_attr(miri, ignore)] async fn contiguous_append(dyncfgs: ConfigUpdates) {
1637 let data = vec![
1638 (("1".to_owned(), "one".to_owned()), 1, 1),
1639 (("2".to_owned(), "two".to_owned()), 2, 1),
1640 (("3".to_owned(), "three".to_owned()), 3, 1),
1641 (("4".to_owned(), "vier".to_owned()), 4, 1),
1642 (("5".to_owned(), "cinque".to_owned()), 5, 1),
1643 ];
1644
1645 let id = ShardId::new();
1646 let client = new_test_client(&dyncfgs).await;
1647
1648 let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1649
1650 write
1652 .expect_append(&data[..2], write.upper().clone(), vec![3])
1653 .await;
1654 assert_eq!(write.upper(), &Antichain::from_elem(3));
1655
1656 let result = write
1659 .append(
1660 &data[4..5],
1661 Antichain::from_elem(5),
1662 Antichain::from_elem(6),
1663 )
1664 .await;
1665 assert_eq!(
1666 result,
1667 Ok(Err(UpperMismatch {
1668 expected: Antichain::from_elem(5),
1669 current: Antichain::from_elem(3)
1670 }))
1671 );
1672
1673 write.expect_append(&data[2..5], vec![3], vec![6]).await;
1675 assert_eq!(write.upper(), &Antichain::from_elem(6));
1676
1677 assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1678 }
1679
    // A single writer's appends need not be contiguous with each other as
    // long as the shard's frontier stays contiguous overall: writer 1 lands
    // [min,3) and [5,6) while writer 2 fills [3,5). To make the interleaving
    // work with plain `expect_append`, the test reaches into each handle and
    // overwrites its cached `upper` field, simulating a writer that learned
    // the shard's frontier out-of-band.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn noncontiguous_append_per_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        // Fast-forward writer 2's cached upper to the shard's actual frontier
        // (it still thinks the upper is the minimum).
        write2.upper = Antichain::from_elem(3);
        write2
            .expect_append(&data[2..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        // Same for writer 1, which last saw an upper of 3.
        write1.upper = Antichain::from_elem(5);
        write1
            .expect_append(&data[4..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        // The combined writes cover [min, 6) with no gaps or duplicates.
        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }
1722
1723 #[mz_persist_proc::test(tokio::test)]
1726 #[cfg_attr(miri, ignore)] async fn contiguous_compare_and_append(dyncfgs: ConfigUpdates) {
1728 let data = vec![
1729 (("1".to_owned(), "one".to_owned()), 1, 1),
1730 (("2".to_owned(), "two".to_owned()), 2, 1),
1731 (("3".to_owned(), "three".to_owned()), 3, 1),
1732 (("4".to_owned(), "vier".to_owned()), 4, 1),
1733 (("5".to_owned(), "cinque".to_owned()), 5, 1),
1734 ];
1735
1736 let id = ShardId::new();
1737 let client = new_test_client(&dyncfgs).await;
1738
1739 let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1740
1741 write.expect_compare_and_append(&data[..2], 0, 3).await;
1743 assert_eq!(write.upper(), &Antichain::from_elem(3));
1744
1745 let result = write
1748 .compare_and_append(
1749 &data[4..5],
1750 Antichain::from_elem(5),
1751 Antichain::from_elem(6),
1752 )
1753 .await;
1754 assert_eq!(
1755 result,
1756 Ok(Err(UpperMismatch {
1757 expected: Antichain::from_elem(5),
1758 current: Antichain::from_elem(3)
1759 }))
1760 );
1761
1762 write.expect_compare_and_append(&data[2..5], 3, 6).await;
1765 assert_eq!(write.upper(), &Antichain::from_elem(6));
1766
1767 assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1768 }
1769
1770 #[mz_persist_proc::test(tokio::test)]
1773 #[cfg_attr(miri, ignore)] async fn noncontiguous_compare_and_append_per_writer(dyncfgs: ConfigUpdates) {
1775 let data = vec![
1776 (("1".to_owned(), "one".to_owned()), 1, 1),
1777 (("2".to_owned(), "two".to_owned()), 2, 1),
1778 (("3".to_owned(), "three".to_owned()), 3, 1),
1779 (("4".to_owned(), "vier".to_owned()), 4, 1),
1780 (("5".to_owned(), "cinque".to_owned()), 5, 1),
1781 ];
1782
1783 let id = ShardId::new();
1784 let client = new_test_client(&dyncfgs).await;
1785
1786 let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1787
1788 let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1789
1790 write1.expect_compare_and_append(&data[..2], 0, 3).await;
1792 assert_eq!(write1.upper(), &Antichain::from_elem(3));
1793
1794 write2.expect_compare_and_append(&data[2..4], 3, 5).await;
1796 assert_eq!(write2.upper(), &Antichain::from_elem(5));
1797
1798 write1.expect_compare_and_append(&data[4..5], 5, 6).await;
1800 assert_eq!(write1.upper(), &Antichain::from_elem(6));
1801
1802 assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1803 }
1804
1805 #[mz_ore::test]
1806 fn fmt_ids() {
1807 assert_eq!(
1808 format!("{}", LeasedReaderId([0u8; 16])),
1809 "r00000000-0000-0000-0000-000000000000"
1810 );
1811 assert_eq!(
1812 format!("{:?}", LeasedReaderId([0u8; 16])),
1813 "LeasedReaderId(00000000-0000-0000-0000-000000000000)"
1814 );
1815 }
1816
    // Stress concurrent batch-building and appending: NUM_WRITERS pairs of
    // tasks race on one shard. Each "writer" task builds batches from
    // generated data and sends them over a bounded channel; its paired
    // "appender" task appends them. At the end, a fresh reader must see
    // exactly the generated records.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn concurrency(dyncfgs: ConfigUpdates) {
        let data = DataGenerator::small();

        const NUM_WRITERS: usize = 2;
        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut handles = Vec::<mz_ore::task::JoinHandle<()>>::new();
        for idx in 0..NUM_WRITERS {
            // Each pair gets its own clone of the generator and client.
            let (data, client) = (data.clone(), client.clone());

            // Capacity-1 channel: the writer can build at most one batch
            // ahead of the appender.
            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(1);

            let client1 = client.clone();
            let handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
                let (write, _) = client1.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
                let mut current_upper = 0;
                for batch in data.batches() {
                    // Upper for this batch: one past its max timestamp.
                    let new_upper = match batch.get(batch.len() - 1) {
                        Some((_, max_ts, _)) => u64::decode(max_ts) + 1,
                        None => continue,
                    };
                    // Skip batches not strictly beyond the handle's known
                    // upper. NOTE(review): `write.upper()` is this handle's
                    // cached frontier; presumably guards against rebuilding
                    // already-covered times — confirm against handle docs.
                    if PartialOrder::less_equal(&Antichain::from_elem(new_upper), write.upper()) {
                        continue;
                    }

                    let current_upper_chain = Antichain::from_elem(current_upper);
                    current_upper = new_upper;
                    let new_upper_chain = Antichain::from_elem(new_upper);
                    let mut builder = write.builder(current_upper_chain);

                    for ((k, v), t, d) in batch.iter() {
                        builder
                            .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
                            .await
                            .expect("invalid usage");
                    }

                    let batch = builder
                        .finish(new_upper_chain)
                        .await
                        .expect("invalid usage");

                    // Hand the finished batch to the appender task.
                    match batch_tx.send(batch).await {
                        Ok(_) => (),
                        Err(e) => panic!("send error: {}", e),
                    }
                }
            });
            handles.push(handle);

            // Appender: drains the channel and appends each batch at exactly
            // the bounds it was built with.
            let handle = mz_ore::task::spawn(|| format!("appender-{}", idx), async move {
                let (mut write, _) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;

                while let Some(batch) = batch_rx.recv().await {
                    let lower = batch.lower().clone();
                    let upper = batch.upper().clone();
                    write
                        .append_batch(batch, lower, upper)
                        .await
                        .expect("invalid usage")
                        .expect("unexpected upper");
                }
            });
            handles.push(handle);
        }

        // Wait for every writer and appender to finish.
        for handle in handles {
            let () = handle.await;
        }

        // A fresh reader must see exactly the generated records as of the
        // max generated timestamp.
        let expected = data.records().collect::<Vec<_>>();
        let max_ts = expected.last().map(|(_, t, _)| *t).unwrap_or_default();
        let (_, mut read) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
        assert_eq!(
            read.expect_snapshot_and_fetch(max_ts).await,
            all_ok(expected.iter(), max_ts)
        );
    }
1911
    // Regression test: a listen or snapshot whose data is not yet available
    // must return Pending when polled (parking itself for a wakeup) instead
    // of blocking, and must complete once a write makes the data available.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_blocking_reads(dyncfgs: ConfigUpdates) {
        // Poll futures manually with a no-op waker so we can assert on
        // Pending without an executor driving them.
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Listen as_of 1 on an empty shard: fetch_next has nothing to yield
        // yet, so repeated polls must all come back Pending.
        let mut listen = read.clone("").await.expect_listen(1).await;
        let mut listen_next = Box::pin(listen.fetch_next());
        for _ in 0..100 {
            assert!(
                Pin::new(&mut listen_next).poll(&mut cx).is_pending(),
                "listen::next unexpectedly ready"
            );
        }

        // Writing [min, 3) unblocks the listen.
        write
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;

        // Only the t=2 update is emitted (the t=1 update is at the as_of),
        // followed by progress to 3.
        assert_eq!(
            listen_next.await,
            vec![
                ListenEvent::Updates(vec![(("2".to_owned(), "two".to_owned()), 2, 1)]),
                ListenEvent::Progress(Antichain::from_elem(3)),
            ]
        );

        // Same exercise for a snapshot as_of 3: the shard's upper is only 3,
        // so time 3 is not yet readable and the future must stay Pending.
        let mut snap = Box::pin(read.expect_snapshot_and_fetch(3));
        for _ in 0..100 {
            assert!(
                Pin::new(&mut snap).poll(&mut cx).is_pending(),
                "snapshot unexpectedly ready"
            );
        }

        // Advancing the upper past 3 unblocks the snapshot.
        write.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(snap.await, all_ok(&data[..], 3));
    }
1986
    // Expiring a read handle must shut down its background heartbeat task:
    // the test takes ownership of the handle's unexpired state, expires the
    // handle, and then awaits the heartbeat task — which must complete
    // rather than hang.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn heartbeat_task_shutdown(dyncfgs: ConfigUpdates) {
        // Shrink lease durations so heartbeating is frequent and the test
        // exercises the heartbeat path quickly.
        let mut cache = new_test_client_cache(&dyncfgs);
        cache
            .cfg
            .set_config(&READER_LEASE_DURATION, Duration::from_millis(1));
        cache.cfg.writer_lease_duration = Duration::from_millis(1);
        let (_write, mut read) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<(), (), u64, i64>(ShardId::new())
            .await;
        // Take the unexpired state out of the handle so we keep a reference
        // to the heartbeat task across the expire() below.
        let read_unexpired_state = read
            .unexpired_state
            .take()
            .expect("handle should have unexpired state");
        read.expire().await;
        // Must resolve promptly; a hang here means expire() leaked the task.
        read_unexpired_state.heartbeat_task.await
    }
2010
    // A shard that never had data written can be finalized once every
    // capability is released: the read since, the write upper, and the
    // critical since are all moved to the empty antichain first.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        // Fixed reader id so the critical handle is deterministic.
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        // Release the read and write capabilities: the empty antichain says
        // no further reads/writes will happen through these handles.
        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write.advance_upper(&Antichain::new()).await;

        let mut since_handle: SinceHandle<(), (), u64, i64> = persist_client
            .open_critical_since(
                shard_id,
                CRITICAL_SINCE,
                Opaque::encode(&0u64),
                Diagnostics::for_tests(),
            )
            .await
            .expect("invalid persist usage");

        // Also release the critical since capability, using the handle's
        // current opaque token as the compare-and-set expectation.
        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        // With all capabilities released, finalization must succeed.
        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        // Finalization is sticky: the shard still reports finalized.
        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }
2064
    // Like `finalize_empty_shard`, but the shard has real data: a shard with
    // committed writes can also be finalized once the read since, write
    // upper, and critical since are all released to the empty antichain.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_shard(dyncfgs: ConfigUpdates) {
        const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        // Fixed reader id so the critical handle is deterministic.
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        // Commit some data so the shard is non-empty when finalized.
        let () = write
            .compare_and_append(DATA, Antichain::from_elem(0), Antichain::from_elem(1))
            .await
            .expect("usage should be valid")
            .expect("upper should match");

        // Release the read and write capabilities.
        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write.advance_upper(&Antichain::new()).await;

        let mut since_handle: SinceHandle<(), (), u64, i64> = persist_client
            .open_critical_since(
                shard_id,
                CRITICAL_SINCE,
                Opaque::encode(&0u64),
                Diagnostics::for_tests(),
            )
            .await
            .expect("invalid persist usage");

        // Also release the critical since capability.
        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        // With all capabilities released, finalization must succeed even
        // though the shard holds data.
        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        // Finalization is sticky: the shard still reports finalized.
        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }
2127
    proptest! {
        // More cases than the proptest default: the roundtrip is cheap.
        #![proptest_config(ProptestConfig::with_cases(4096))]

        // Property: any ShardId survives a roundtrip through its protobuf
        // representation (a String) unchanged.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn shard_id_protobuf_roundtrip(expect in any::<ShardId>() ) {
            let actual = protobuf_roundtrip::<_, String>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
2139}