#![warn(missing_docs, missing_debug_implementations)]
#![allow(ungated_async_fn_track_caller)]

use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_build_info::{BuildInfo, build_info};
use mz_dyncfg::ConfigSet;
use mz_ore::{instrument, soft_assert_or_log};
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use timely::progress::Timestamp;

use crate::async_runtime::IsolatedRuntime;
use crate::cache::{PersistClientCache, StateCache};
use crate::cfg::PersistConfig;
use crate::critical::{CriticalReaderId, SinceHandle};
use crate::error::InvalidUsage;
use crate::fetch::{BatchFetcher, BatchFetcherConfig};
use crate::internal::compact::Compactor;
use crate::internal::encoding::{Schemas, parse_id};
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::{Machine, retry_external};
use crate::internal::state_versions::StateVersions;
use crate::metrics::Metrics;
use crate::read::{LeasedReaderId, READER_LEASE_DURATION, ReadHandle};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::{WriteHandle, WriterId};

pub mod async_runtime;
pub mod batch;
pub mod cache;
pub mod cfg;
pub mod cli {
    pub mod admin;
    pub mod args;
    pub mod bench;
    pub mod inspect;
}
pub mod critical;
pub mod error;
pub mod fetch;
pub mod internals_bench;
pub mod iter;
pub mod metrics {
    pub use crate::internal::metrics::{
        Metrics, SinkMetrics, SinkWorkerMetrics, UpdateDelta, encode_ts_metric,
    };
}
pub mod operators {
    use mz_dyncfg::Config;

    pub mod shard_source;

    pub(crate) const STORAGE_SOURCE_DECODE_FUEL: Config<usize> = Config::new(
        "storage_source_decode_fuel",
        100_000,
        "\
        The maximum amount of work to do in the persist_source mfp_and_decode \
        operator before yielding.",
    );
}
pub mod read;
pub mod rpc;
pub mod schema;
pub mod stats;
pub mod usage;
pub mod write;

mod internal {
    pub mod apply;
    pub mod cache;
    pub mod compact;
    pub mod encoding;
    pub mod gc;
    pub mod machine;
    pub mod maintenance;
    pub mod merge;
    pub mod metrics;
    pub mod paths;
    pub mod restore;
    pub mod service;
    pub mod state;
    pub mod state_diff;
    pub mod state_versions;
    pub mod trace;
    pub mod watch;

    #[cfg(test)]
    pub mod datadriven;
}

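/// Build information for this crate.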
pub const BUILD_INFO: BuildInfo = build_info!();

pub use mz_persist_types::{PersistLocation, ShardId};

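/// Diagnostic context attached to a persist handle.
///
/// These strings are best-effort debugging aids (context for logs and error
/// messages); they do not affect correctness.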
#[derive(Clone, Debug)]
pub struct Diagnostics {
    /// A user-readable name for the shard.
    pub shard_name: String,
    /// A description of the purpose of the handle, e.g. which caller opened it.
    pub handle_purpose: String,
}
impl Diagnostics {
    /// Returns a `Diagnostics` with the given `handle_purpose` and an
    /// unknown shard name.
    pub fn from_purpose(handle_purpose: &str) -> Self {
        Self {
            shard_name: "unknown".to_string(),
            handle_purpose: handle_purpose.to_string(),
        }
    }

    /// Returns a `Diagnostics` for use in tests.
    pub fn for_tests() -> Self {
        Self {
            shard_name: "test-shard-name".to_string(),
            handle_purpose: "test-purpose".to_string(),
        }
    }
}

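/// A handle for interacting with the set of persist shards made durable at a
/// single [PersistLocation].
///
/// A client is normally obtained from a [crate::cache::PersistClientCache],
/// which shares internal state between handles to the same location. A
/// minimal sketch (the in-mem location here is for illustration; production
/// code would point at real blob and consensus stores):
///
/// ```ignore
/// let cache = PersistClientCache::new_no_metrics();
/// let client = cache
///     .open(PersistLocation::new_in_mem())
///     .await
///     .expect("in-mem location is valid");
/// ```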
#[derive(Debug, Clone)]
pub struct PersistClient {
    cfg: PersistConfig,
    blob: Arc<dyn Blob>,
    consensus: Arc<dyn Consensus>,
    metrics: Arc<Metrics>,
    isolated_runtime: Arc<IsolatedRuntime>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
}

impl PersistClient {
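    /// Returns a new client for interfacing with persist shards made durable
    /// to the given `blob` and `consensus` implementations.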
    pub fn new(
        cfg: PersistConfig,
        blob: Arc<dyn Blob>,
        consensus: Arc<dyn Consensus>,
        metrics: Arc<Metrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
    ) -> Result<Self, ExternalError> {
        Ok(PersistClient {
            cfg,
            blob,
            consensus,
            metrics,
            isolated_runtime,
            shared_states,
            pubsub_sender,
        })
    }

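    /// Returns a new client backed by in-mem implementations, for use in
    /// unit tests.
    ///
    /// A usage sketch (the codec types are illustrative):
    ///
    /// ```ignore
    /// let client = PersistClient::new_for_tests().await;
    /// let (write, read) = client
    ///     .expect_open::<String, String, u64, i64>(ShardId::new())
    ///     .await;
    /// ```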
    pub async fn new_for_tests() -> Self {
        let cache = PersistClientCache::new_no_metrics();
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("in-mem location is valid")
    }

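    /// Returns the [ConfigSet] of dynamic configs used by this client.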
    pub fn dyncfgs(&self) -> &ConfigSet {
        &self.cfg.configs
    }

    async fn make_machine<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Machine<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let machine = Machine::<K, V, T, D>::new(
            self.cfg.clone(),
            shard_id,
            Arc::clone(&self.metrics),
            Arc::new(state_versions),
            Arc::clone(&self.shared_states),
            Arc::clone(&self.pubsub_sender),
            Arc::clone(&self.isolated_runtime),
            diagnostics.clone(),
        )
        .await?;
        Ok(machine)
    }

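    /// Opens both a [WriteHandle] and a [ReadHandle] for the shard named by
    /// `shard_id`.
    ///
    /// The `K`, `V`, `T`, `D` type parameters must match those used by all
    /// other handles to this shard, or this returns an
    /// [InvalidUsage::CodecMismatch] error. When `use_critical_since` is
    /// true, the reader starts from the shard's critical since (when one
    /// exists) instead of its leased since.
    ///
    /// A usage sketch (codec choices are illustrative):
    ///
    /// ```ignore
    /// let (mut write, mut read) = client
    ///     .open::<String, String, u64, i64>(
    ///         ShardId::new(),
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         Diagnostics::from_purpose("example"),
    ///         false,
    ///     )
    ///     .await
    ///     .expect("codec mismatch");
    /// ```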
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
        use_critical_since: bool,
    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Ord + Codec64 + Send + Sync,
    {
        Ok((
            self.open_writer(
                shard_id,
                Arc::clone(&key_schema),
                Arc::clone(&val_schema),
                diagnostics.clone(),
            )
            .await?,
            self.open_leased_reader(
                shard_id,
                key_schema,
                val_schema,
                diagnostics,
                use_critical_since,
            )
            .await?,
        ))
    }

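    /// Opens a [ReadHandle] for the shard, registering a new leased reader.
    ///
    /// The returned handle holds a lease (see [READER_LEASE_DURATION]) that
    /// background tasks keep heartbeating until the handle is expired or
    /// dropped.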
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_leased_reader<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
        use_critical_since: bool,
    ) -> Result<ReadHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let reader_id = LeasedReaderId::new();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &reader_id,
                &diagnostics.handle_purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                use_critical_since,
            )
            .await;
        maintenance.start_performing(&machine, &gc);
        let schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            reader_id,
            schemas,
            reader_state.since,
            heartbeat_ts,
        )
        .await;

        Ok(reader)
    }

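    /// Returns a [BatchFetcher] for fetching batch parts of the shard.
    ///
    /// Unlike readers, a fetcher registers nothing with the shard and holds
    /// no lease.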
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn create_batch_fetcher<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        is_transient: bool,
        diagnostics: Diagnostics,
    ) -> Result<BatchFetcher<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let read_schemas = Schemas {
            id: None,
            key: key_schema,
            val: val_schema,
        };
        let schema_cache = machine.applier.schema_cache();
        let fetcher = BatchFetcher {
            cfg: BatchFetcherConfig::new(&self.cfg),
            blob: Arc::clone(&self.blob),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&machine.applier.shard_metrics),
            shard_id,
            read_schemas,
            schema_cache,
            is_transient,
            _phantom: PhantomData,
        };

        Ok(fetcher)
    }

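    /// A well-known [CriticalReaderId] for a critical since handle held by a
    /// controller-like process that manages the shard's since. The bytes are
    /// an arbitrary fixed value.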
    pub const CONTROLLER_CRITICAL_SINCE: CriticalReaderId =
        CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

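    /// Opens a [SinceHandle] for the shard, registering (or re-acquiring)
    /// the critical reader named by `reader_id`.
    ///
    /// Unlike a leased reader, a critical reader holds back the shard's
    /// since until it is explicitly downgraded. A usage sketch, following
    /// the pattern in the tests below (the `u64` opaque type is
    /// illustrative):
    ///
    /// ```ignore
    /// let mut since_handle: SinceHandle<(), (), u64, i64, u64> = client
    ///     .open_critical_since(
    ///         shard_id,
    ///         PersistClient::CONTROLLER_CRITICAL_SINCE,
    ///         Diagnostics::for_tests(),
    ///     )
    ///     .await
    ///     .expect("invalid persist usage");
    /// let epoch = since_handle.opaque().clone();
    /// let _ = since_handle
    ///     .compare_and_downgrade_since(&epoch, (&epoch, &Antichain::from_elem(42)))
    ///     .await;
    /// ```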
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_critical_since<K, V, T, D, O>(
        &self,
        shard_id: ShardId,
        reader_id: CriticalReaderId,
        diagnostics: Diagnostics,
    ) -> Result<SinceHandle<K, V, T, D, O>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
        O: Opaque + Codec64,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (state, maintenance) = machine
            .register_critical_reader::<O>(&reader_id, &diagnostics.handle_purpose)
            .await;
        maintenance.start_performing(&machine, &gc);
        let handle = SinceHandle::new(
            machine,
            gc,
            reader_id,
            state.since,
            Codec64::decode(state.opaque.0),
        );

        Ok(handle)
    }

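    /// Opens a [WriteHandle] for the shard, registering the key and value
    /// schemas if they are not already registered.
    ///
    /// A usage sketch (codec choices are illustrative):
    ///
    /// ```ignore
    /// let mut write = client
    ///     .open_writer::<String, String, u64, i64>(
    ///         shard_id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         Diagnostics::from_purpose("example"),
    ///     )
    ///     .await
    ///     .expect("codec mismatch");
    /// ```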
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn open_writer<K, V, T, D>(
        &self,
        shard_id: ShardId,
        key_schema: Arc<K::Schema>,
        val_schema: Arc<V::Schema>,
        diagnostics: Diagnostics,
    ) -> Result<WriteHandle<K, V, T, D>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Ord + Codec64 + Send + Sync,
    {
        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
        maintenance.start_performing(&machine, &gc);
        soft_assert_or_log!(
            schema_id.is_some(),
            "unable to register schemas {:?} {:?}",
            key_schema,
            val_schema,
        );

        let writer_id = WriterId::new();
        let schemas = Schemas {
            id: schema_id,
            key: key_schema,
            val: val_schema,
        };
        let writer = WriteHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            writer_id,
            &diagnostics.handle_purpose,
            schemas,
        );
        Ok(writer)
    }

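    /// Returns the key and value schemas registered under `schema_id` for
    /// the shard, if any.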
    pub async fn get_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        schema_id: SchemaId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.get_schema(schema_id))
    }

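    /// Returns the most recently registered schemas for the shard, if any.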
    pub async fn latest_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<Option<(SchemaId, K::Schema, V::Schema)>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.latest_schema())
    }

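    /// Registers an evolved pair of schemas for the shard if `expected` is
    /// still the latest registered [SchemaId], returning a [CaESchema] that
    /// describes the outcome.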
    pub async fn compare_and_evolve_schema<K, V, T, D>(
        &self,
        shard_id: ShardId,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        diagnostics: Diagnostics,
    ) -> Result<CaESchema<K, V>, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
        let (res, maintenance) = machine
            .compare_and_evolve_schema(expected, key_schema, val_schema)
            .await;
        maintenance.start_performing(&machine, &gc);
        Ok(res)
    }

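    /// Returns whether the shard has been finalized, i.e. whether it has
    /// become an unreadable, unwriteable tombstone.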
    pub async fn is_finalized<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<bool, InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;
        Ok(machine.is_finalized())
    }

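    /// Finalizes the shard, turning its state into an unreadable,
    /// unwriteable tombstone.
    ///
    /// As exercised by the `finalize_shard` tests below, callers are
    /// expected to first advance the shard's since and upper to the empty
    /// antichain.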
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn finalize_shard<K, V, T, D>(
        &self,
        shard_id: ShardId,
        diagnostics: Diagnostics,
    ) -> Result<(), InvalidUsage<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64 + Send + Sync,
    {
        let machine = self
            .make_machine::<K, V, T, D>(shard_id, diagnostics)
            .await?;

        let maintenance = machine.become_tombstone().await?;
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

        let () = maintenance.perform(&machine, &gc).await;

        Ok(())
    }

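    /// Returns the current state of the shard in a serializable form, for
    /// introspection tooling; errors if the shard does not exist.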
    pub async fn inspect_shard<T: Timestamp + Lattice + Codec64>(
        &self,
        shard_id: &ShardId,
    ) -> Result<impl serde::Serialize, anyhow::Error> {
        let state_versions = StateVersions::new(
            self.cfg.clone(),
            Arc::clone(&self.consensus),
            Arc::clone(&self.blob),
            Arc::clone(&self.metrics),
        );
        let versions = state_versions.fetch_all_live_diffs(shard_id).await;
        if versions.0.is_empty() {
            return Err(anyhow::anyhow!("{} does not exist", shard_id));
        }
        let state = state_versions
            .fetch_current_state::<T>(shard_id, versions.0)
            .await;
        let state = state.check_ts_codec(shard_id)?;
        Ok(state)
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_open<K, V, T, D>(
        &self,
        shard_id: ShardId,
    ) -> (WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>)
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Ord + Codec64 + Send + Sync,
        K::Schema: Default,
        V::Schema: Default,
    {
        self.open(
            shard_id,
            Arc::new(K::Schema::default()),
            Arc::new(V::Schema::default()),
            Diagnostics::for_tests(),
            true,
        )
        .await
        .expect("codec mismatch")
    }

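    /// Returns a handle to the [Metrics] instance used by this client.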
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.metrics
    }
}

#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::mem;
    use std::pin::Pin;
    use std::task::Context;
    use std::time::Duration;

    use differential_dataflow::consolidation::consolidate_updates;
    use differential_dataflow::lattice::Lattice;
    use futures_task::noop_waker;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::assert_ok;
    use mz_persist::indexed::encoding::BlobTraceBatchPart;
    use mz_persist::workload::DataGenerator;
    use mz_persist_types::codec_impls::{StringSchema, VecU8Schema};
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;
    use timely::order::PartialOrder;
    use timely::progress::Antichain;

    use crate::batch::BLOB_TARGET_SIZE;
    use crate::cache::PersistClientCache;
    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
    use crate::error::{CodecConcreteType, CodecMismatch, UpperMismatch};
    use crate::internal::paths::BlobKey;
    use crate::read::ListenEvent;

    use super::*;

    pub fn new_test_client_cache(dyncfgs: &ConfigUpdates) -> PersistClientCache {
        let mut cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 10);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 1);
        dyncfgs.apply(cache.cfg());

        cache.cfg.compaction_enabled = true;
        cache
    }

    pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient {
        let cache = new_test_client_cache(dyncfgs);
        cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
    }

    pub fn all_ok<'a, K, V, T, D, I>(
        iter: I,
        as_of: T,
    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)>
    where
        K: Ord + Clone + 'a,
        V: Ord + Clone + 'a,
        T: Timestamp + Lattice + Clone + 'a,
        D: Semigroup + Clone + 'a,
        I: IntoIterator<Item = &'a ((K, V), T, D)>,
    {
        let as_of = Antichain::from_elem(as_of);
        let mut ret = iter
            .into_iter()
            .map(|((k, v), t, d)| {
                let mut t = t.clone();
                t.advance_by(as_of.borrow());
                ((Ok(k.clone()), Ok(v.clone())), t, d.clone())
            })
            .collect();
        consolidate_updates(&mut ret);
        ret
    }

    pub async fn expect_fetch_part<K, V, T, D>(
        blob: &dyn Blob,
        key: &BlobKey,
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
    ) -> (
        BlobTraceBatchPart<T>,
        Vec<((Result<K, String>, Result<V, String>), T, D)>,
    )
    where
        K: Codec,
        V: Codec,
        T: Timestamp + Codec64,
        D: Codec64,
    {
        let value = blob
            .get(key)
            .await
            .expect("failed to fetch part")
            .expect("missing part");
        let mut part =
            BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
        let _ = part
            .updates
            .get_or_make_codec::<K, V>(&read_schemas.key, &read_schemas.val);
        let mut updates = Vec::new();
        for ((k, v), t, d) in part.updates.records().expect("codec data").iter() {
            updates.push((
                (
                    K::decode(k, &read_schemas.key),
                    V::decode(v, &read_schemas.val),
                ),
                T::decode(t),
                D::decode(d),
            ));
        }
        (part, updates)
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn sanity_check(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        assert_eq!(
            read.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );

        let mut listen = read.clone("").await.expect_listen(1).await;

        write
            .expect_append(&data[2..], write.upper().clone(), vec![4])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(4));

        assert_eq!(
            listen.read_until(&4).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(4))
        );

        read.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(read.since(), &Antichain::from_elem(2));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn open_reader_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut write1 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");
        let mut read1 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut read2 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut write2 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        write2.expect_compare_and_append(&data[..1], 0, 2).await;
        assert_eq!(
            read2.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );
        write1.expect_compare_and_append(&data[1..], 2, 4).await;
        assert_eq!(read1.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn invalid_usage(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id0 = "s00000000-0000-0000-0000-000000000000"
            .parse::<ShardId>()
            .expect("invalid shard id");
        let mut client = new_test_client(&dyncfgs).await;

        let (mut write0, mut read0) = client
            .expect_open::<String, String, u64, i64>(shard_id0)
            .await;

        write0.expect_compare_and_append(&data, 0, 4).await;

        {
            fn codecs(
                k: &str,
                v: &str,
                t: &str,
                d: &str,
            ) -> (String, String, String, String, Option<CodecConcreteType>) {
                (k.to_owned(), v.to_owned(), t.to_owned(), d.to_owned(), None)
            }

            client.shared_states = Arc::new(StateCache::new_no_metrics());
            assert_eq!(
                client
                    .open::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, Vec<u8>, u64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(VecU8Schema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "Vec<u8>", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, i64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "i64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, u64, u64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "u64", "u64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );

            assert_eq!(
                client
                    .open_leased_reader::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open_writer::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
        }

        {
            let snap = read0
                .snapshot(Antichain::from_elem(3))
                .await
                .expect("cannot serve requested as_of");

            let shard_id1 = "s11111111-1111-1111-1111-111111111111"
                .parse::<ShardId>()
                .expect("invalid shard id");
            let mut fetcher1 = client
                .create_batch_fetcher::<String, String, u64, i64>(
                    shard_id1,
                    Default::default(),
                    Default::default(),
                    false,
                    Diagnostics::for_tests(),
                )
                .await
                .unwrap();
            for batch in snap {
                let res = fetcher1.fetch_leased_part(&batch).await;
                assert_eq!(
                    res.unwrap_err(),
                    InvalidUsage::BatchNotFromThisShard {
                        batch_shard: shard_id0,
                        handle_shard: shard_id1,
                    }
                );
            }
        }

        {
            let ts3 = &data[2];
            assert_eq!(ts3.1, 3);
            let ts3 = vec![ts3.clone()];

            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateNotBeyondLower {
                    ts: 3,
                    lower: Antichain::from_elem(4),
                },
            );
            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateBeyondUpper {
                    ts: 3,
                    expected_upper: Antichain::from_elem(3),
                },
            );
            assert_eq!(
                write0
                    .append(&data[..0], Antichain::from_elem(3), Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2),
                },
            );

            assert_eq!(
                write0
                    .builder(Antichain::from_elem(3))
                    .finish(Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2)
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(4),
                    append_upper: Antichain::from_elem(5),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(2),
                    append_upper: Antichain::from_elem(3),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert!(matches!(
                write0
                    .append_batch(batch, Antichain::from_elem(3), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidEmptyTimeInterval { .. }
            ));
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn multiple_shards(dyncfgs: ConfigUpdates) {
        let data1 = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let data2 = [(("1".to_owned(), ()), 1, 1), (("2".to_owned(), ()), 2, 1)];

        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read1) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let (mut write2, mut read2) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        write1
            .expect_compare_and_append(&data1[..], u64::minimum(), 3)
            .await;

        write2
            .expect_compare_and_append(&data2[..], u64::minimum(), 3)
            .await;

        assert_eq!(
            read1.expect_snapshot_and_fetch(2).await,
            all_ok(&data1[..], 2)
        );

        assert_eq!(
            read2.expect_snapshot_and_fetch(2).await,
            all_ok(&data2[..], 2)
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn fetch_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write1, _read1) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let (mut write2, _read2) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write1
            .expect_append(&data[..], write1.upper().clone(), vec![3])
            .await;

        assert_eq!(write2.fetch_recent_upper().await, &Antichain::from_elem(3));

        assert_eq!(write2.upper(), &Antichain::from_elem(3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn append_with_invalid_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write, _read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write
            .expect_append(&data[..], write.upper().clone(), vec![3])
            .await;

        let data = [
            (("5".to_owned(), "fünf".to_owned()), 5, 1),
            (("6".to_owned(), "sechs".to_owned()), 6, 1),
        ];
        let res = write
            .append(
                data.iter(),
                Antichain::from_elem(5),
                Antichain::from_elem(7),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        assert_eq!(write.upper(), &Antichain::from_elem(3));
    }

    #[allow(unused)]
    async fn sync_send(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        fn is_send_sync<T: Send + Sync>(_x: T) -> bool {
            true
        }

        let client = new_test_client(&dyncfgs).await;

        let (write, read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        assert!(is_send_sync(client));
        assert!(is_send_sync(write));
        assert!(is_send_sync(read));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        assert_eq!(write1.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(write2.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        write1
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        assert_eq!(
            read.expect_snapshot_and_fetch(2).await,
            all_ok(&data[..2], 2)
        );

        let res = write2
            .compare_and_append(
                &data[..2],
                Antichain::from_elem(u64::minimum()),
                Antichain::from_elem(3),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(u64::minimum()),
                current: Antichain::from_elem(3)
            }))
        );

        assert_eq!(write2.upper(), &Antichain::from_elem(3));

        write2.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(write2.upper(), &Antichain::from_elem(4));

        assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn overlapping_append(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging_default("info");

        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        let mut listen = read.clone("").await.expect_listen(0).await;

        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        write2
            .expect_append(&data[..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1
            .expect_append(&data[2..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));

        assert_eq!(
            listen.read_until(&6).await,
            (all_ok(&data[..], 1), Antichain::from_elem(6))
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn contiguous_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        let result = write
            .append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        write.expect_append(&data[2..5], vec![3], vec![6]).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn noncontiguous_append_per_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        write2.upper = Antichain::from_elem(3);
        write2
            .expect_append(&data[2..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1.upper = Antichain::from_elem(5);
        write1
            .expect_append(&data[4..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn contiguous_compare_and_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        write.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        let result = write
            .compare_and_append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        write.expect_compare_and_append(&data[2..5], 3, 6).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn noncontiguous_compare_and_append_per_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        write1.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        write2.expect_compare_and_append(&data[2..4], 3, 5).await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        write1.expect_compare_and_append(&data[4..5], 5, 6).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }

    #[mz_ore::test]
    fn fmt_ids() {
        assert_eq!(
            format!("{}", LeasedReaderId([0u8; 16])),
            "r00000000-0000-0000-0000-000000000000"
        );
        assert_eq!(
            format!("{:?}", LeasedReaderId([0u8; 16])),
            "LeasedReaderId(00000000-0000-0000-0000-000000000000)"
        );
    }

    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn concurrency(dyncfgs: ConfigUpdates) {
        let data = DataGenerator::small();

        const NUM_WRITERS: usize = 2;
        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut handles = Vec::<mz_ore::task::JoinHandle<()>>::new();
        for idx in 0..NUM_WRITERS {
            let (data, client) = (data.clone(), client.clone());

            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(1);

            let client1 = client.clone();
            let handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
                let (write, _) = client1.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
                let mut current_upper = 0;
                for batch in data.batches() {
                    let new_upper = match batch.get(batch.len() - 1) {
                        Some((_, max_ts, _)) => u64::decode(max_ts) + 1,
                        None => continue,
                    };
                    if PartialOrder::less_equal(&Antichain::from_elem(new_upper), write.upper()) {
                        continue;
                    }

                    let current_upper_chain = Antichain::from_elem(current_upper);
                    current_upper = new_upper;
                    let new_upper_chain = Antichain::from_elem(new_upper);
                    let mut builder = write.builder(current_upper_chain);

                    for ((k, v), t, d) in batch.iter() {
                        builder
                            .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
                            .await
                            .expect("invalid usage");
                    }

                    let batch = builder
                        .finish(new_upper_chain)
                        .await
                        .expect("invalid usage");

                    match batch_tx.send(batch).await {
                        Ok(_) => (),
                        Err(e) => panic!("send error: {}", e),
                    }
                }
            });
            handles.push(handle);

            let handle = mz_ore::task::spawn(|| format!("appender-{}", idx), async move {
                let (mut write, _) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;

                while let Some(batch) = batch_rx.recv().await {
                    let lower = batch.lower().clone();
                    let upper = batch.upper().clone();
                    write
                        .append_batch(batch, lower, upper)
                        .await
                        .expect("invalid usage")
                        .expect("unexpected upper");
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            let () = handle.await.expect("task failed");
        }

        let expected = data.records().collect::<Vec<_>>();
        let max_ts = expected.last().map(|(_, t, _)| *t).unwrap_or_default();
        let (_, mut read) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
        assert_eq!(
            read.expect_snapshot_and_fetch(max_ts).await,
            all_ok(expected.iter(), max_ts)
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_blocking_reads(dyncfgs: ConfigUpdates) {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let mut listen = read.clone("").await.expect_listen(1).await;
        let mut listen_next = Box::pin(listen.fetch_next());
        for _ in 0..100 {
            assert!(
                Pin::new(&mut listen_next).poll(&mut cx).is_pending(),
                "listen::next unexpectedly ready"
            );
        }

        write
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;

        assert_eq!(
            listen_next.await,
            vec![
                ListenEvent::Updates(vec![((Ok("2".to_owned()), Ok("two".to_owned())), 2, 1)]),
                ListenEvent::Progress(Antichain::from_elem(3)),
            ]
        );

        let mut snap = Box::pin(read.expect_snapshot_and_fetch(3));
        for _ in 0..100 {
            assert!(
                Pin::new(&mut snap).poll(&mut cx).is_pending(),
                "snapshot unexpectedly ready"
            );
        }

        write.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(snap.await, all_ok(&data[..], 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn heartbeat_task_shutdown(dyncfgs: ConfigUpdates) {
        let mut cache = new_test_client_cache(&dyncfgs);
        cache
            .cfg
            .set_config(&READER_LEASE_DURATION, Duration::from_millis(1));
        cache.cfg.writer_lease_duration = Duration::from_millis(1);
        let (_write, mut read) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<(), (), u64, i64>(ShardId::new())
            .await;
        let mut read_unexpired_state = read
            .unexpired_state
            .take()
            .expect("handle should have unexpired state");
        read.expire().await;
        for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) {
            let () = read_heartbeat_task
                .await
                .expect("task should shutdown cleanly");
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
        const EMPTY: &[(((), ()), u64, i64)] = &[];
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write
            .compare_and_append(EMPTY, Antichain::from_elem(0), Antichain::new())
            .await
            .expect("usage should be valid")
            .expect("upper should match");

        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");

        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn finalize_shard(dyncfgs: ConfigUpdates) {
        const EMPTY: &[(((), ()), u64, i64)] = &[];
        const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        let () = write
            .compare_and_append(DATA, Antichain::from_elem(0), Antichain::from_elem(1))
            .await
            .expect("usage should be valid")
            .expect("upper should match");

        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write
            .compare_and_append(EMPTY, Antichain::from_elem(1), Antichain::new())
            .await
            .expect("usage should be valid")
            .expect("upper should match");

        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");

        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(4096))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn shard_id_protobuf_roundtrip(expect in any::<ShardId>()) {
            let actual = protobuf_roundtrip::<_, String>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}