use std::borrow::Borrow;
use std::fmt::Debug;
use std::sync::Arc;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use mz_dyncfg::Config;
use mz_ore::instrument;
use mz_ore::task::RuntimeExt;
use mz_persist::location::Blob;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{IntoRustIfSome, ProtoType};
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::runtime::Handle;
use tracing::{Instrument, debug_span, info, warn};
use uuid::Uuid;

use crate::batch::{
    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
    BatchParts, ProtoBatch, validate_truncate_batch,
};
use crate::error::{InvalidUsage, UpperMismatch};
use crate::fetch::{EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat};
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{Schemas, check_data_version};
use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine};
use crate::internal::metrics::Metrics;
use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
use crate::read::ReadHandle;
use crate::schema::PartMigration;
use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};

pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
    "persist_write_combine_inline_writes",
    true,
    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
);

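/// An opaque identifier for a writer of a persist durable TVC (aka shard).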
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct WriterId(pub(crate) [u8; 16]);

impl std::fmt::Display for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "w{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for WriterId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id('w', "WriterId", s).map(WriterId)
    }
}

impl From<WriterId> for String {
    fn from(writer_id: WriterId) -> Self {
        writer_id.to_string()
    }
}

impl TryFrom<String> for WriterId {
    type Error = String;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}

impl WriterId {
    pub(crate) fn new() -> Self {
        WriterId(*Uuid::new_v4().as_bytes())
    }
}

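/// A "capability" granting the ability to apply updates to some shard at times
/// greater or equal to `self.upper()`.
///
/// All async methods on WriteHandle retry for as long as they are able, but
/// the returned [std::future::Future]s implement "cancel on drop" semantics.
/// This means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].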
#[derive(Debug)]
pub struct WriteHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) compact: Option<Compactor<K, V, T, D>>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
    pub(crate) writer_id: WriterId,
    pub(crate) debug_state: HandleDebugState,
    pub(crate) write_schemas: Schemas<K, V>,

    pub(crate) upper: Antichain<T>,
    expire_fn: Option<ExpireFn>,
}

impl<K, V, T, D> WriteHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    pub(crate) fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        writer_id: WriterId,
        purpose: &str,
        write_schemas: Schemas<K, V>,
    ) -> Self {
        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let compact = cfg.compaction_enabled.then(|| {
            Compactor::new(
                cfg.clone(),
                Arc::clone(&metrics),
                write_schemas.clone(),
                gc.clone(),
            )
        });
        let debug_state = HandleDebugState {
            hostname: cfg.hostname.to_owned(),
            purpose: purpose.to_owned(),
        };
        let upper = machine.applier.clone_upper();
        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
        WriteHandle {
            cfg,
            metrics,
            machine,
            gc,
            compact,
            blob,
            isolated_runtime,
            writer_id,
            debug_state,
            write_schemas,
            upper,
            expire_fn: Some(expire_fn),
        }
    }

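    /// Creates a [WriteHandle] for the same shard from an existing
    /// [ReadHandle].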
    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
        Self::new(
            read.cfg.clone(),
            Arc::clone(&read.metrics),
            read.machine.clone(),
            read.gc.clone(),
            Arc::clone(&read.blob),
            WriterId::new(),
            purpose,
            read.read_schemas.clone(),
        )
    }

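    /// This handle's shard id.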
    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

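    /// The registered schema id of this handle's key and value schemas, if
    /// any.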
    pub fn schema_id(&self) -> Option<SchemaId> {
        self.write_schemas.id
    }

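    /// A cached version of the shard-global `upper` frontier.
    ///
    /// This is the most recent upper discovered by this handle. It is
    /// potentially more stale than [Self::shared_upper] but is lock-free and
    /// allocation-free. This will always be less or equal to the shard-global
    /// `upper`.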
    pub fn upper(&self) -> &Antichain<T> {
        &self.upper
    }

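    /// A less-stale cached version of the shard-global `upper` frontier.
    ///
    /// Unlike [Self::upper], this reads the most recent upper known to the
    /// process-wide shard state, at the cost of a lock and a clone. This will
    /// always be less or equal to the shard-global `upper`.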
    pub fn shared_upper(&self) -> Antichain<T> {
        self.machine.applier.clone_upper()
    }

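    /// Fetches and returns a recent shard-global `upper`. Importantly, this
    /// operation is linearized with write operations.
    ///
    /// This requires fetching the latest state from consensus and is therefore
    /// a potentially expensive operation.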
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
        self.machine
            .applier
            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
            .await;
        &self.upper
    }

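    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `upper`.
    ///
    /// In contrast to [Self::compare_and_append], multiple [WriteHandle]s may
    /// be used concurrently to write to the same shard, but in this case, the
    /// data being written must be identical (in the sense of "definite"-ness).
    ///
    /// All times in `updates` must be greater or equal to `lower` and not
    /// greater or equal to `upper`. A `upper` of the empty antichain "[]"
    /// indicates that the shard is permanently closed for writes.
    ///
    /// The innermost `Result` is `Ok` if the updates were successfully
    /// written. If not, an [UpperMismatch] containing the current writer upper
    /// is returned.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See <http://sled.rs/errors.html> for details.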
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
        self.append_batch(batch, lower, upper).await
    }

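    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper` iff the current shard-global upper is `expected_upper`.
    ///
    /// In contrast to [Self::append], this linearizes mutations from all
    /// writers. It's intended for use as an atomic primitive for timestamp
    /// bindings, SQL tables, etc.
    ///
    /// All times in `updates` must be greater or equal to `expected_upper` and
    /// not greater or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "[]" indicates that the shard is permanently closed for
    /// writes.
    ///
    /// The innermost `Result` is `Ok` if the updates were successfully
    /// written. If not, an [UpperMismatch] containing the current shard upper
    /// is returned and the internally created batch is deleted.
    ///
    /// A minimal usage sketch (not compiled as a doctest; it assumes an
    /// already-open `WriteHandle<String, String, u64, i64>` named `write`):
    ///
    /// ```ignore
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 0u64, 1i64)];
    /// let res = write
    ///     .compare_and_append(
    ///         updates.iter(),
    ///         Antichain::from_elem(0),
    ///         Antichain::from_elem(1),
    ///     )
    ///     .await
    ///     .expect("usage was valid");
    /// match res {
    ///     // The shard's upper is now [1].
    ///     Ok(()) => {}
    ///     // Someone else wrote first; retry from `_mismatch.current`.
    ///     Err(_mismatch) => {}
    /// }
    /// ```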
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let mut batch = self
            .batch(updates, expected_upper.clone(), new_upper.clone())
            .await?;
        match self
            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper)
            .await
        {
            ok @ Ok(Ok(())) => ok,
            err => {
                // compare_and_append_batch cannot delete the batch on failure
                // because its caller owns the batch and might want to retry
                // with a different expected_upper. Here we own the batch, so
                // delete it on any failure.
                batch.delete().await;
                err
            }
        }
    }

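    /// Appends the batch of updates to the shard and downgrades this handle's
    /// upper to `upper`.
    ///
    /// In contrast to [Self::compare_and_append_batch], multiple
    /// [WriteHandle]s may be used concurrently to write to the same shard, but
    /// in this case, the data being written must be identical (in the sense of
    /// "definite"-ness).
    ///
    /// A `upper` of the empty antichain "[]" indicates that the shard is
    /// permanently closed for writes.
    ///
    /// The batch is consumed on success and deleted on failure; if the shard's
    /// upper already covers `upper`, success is reported without writing again
    /// (the data is assumed definite).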
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append_batch(
        &mut self,
        mut batch: Batch<K, V, T, D>,
        mut lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        loop {
            let res = self
                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone())
                .await?;
            match res {
                Ok(()) => {
                    self.upper = upper;
                    return Ok(Ok(()));
                }
                Err(mismatch) => {
                    if PartialOrder::less_than(&mismatch.current, &lower) {
                        // We tried to do a non-contiguous append: the shard's
                        // upper is still below our lower. That can't succeed,
                        // so surface the mismatch to the caller.
                        self.upper.clone_from(&mismatch.current);

                        batch.delete().await;

                        return Ok(Err(mismatch));
                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
                        // Someone else already wrote the beginning of our
                        // range. Retry with a tighter lower; the batch is
                        // truncated to the new description on append.
                        lower = mismatch.current;
                    } else {
                        // The shard's upper is already at or past our upper,
                        // so (by this method's "definite data" contract) the
                        // contents of this batch have been appended and we can
                        // report success.
                        self.upper = mismatch.current;

                        batch.delete().await;

                        return Ok(Ok(()));
                    }
                }
            }
        }
    }

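    /// Appends the batch of updates to the shard and downgrades this handle's
    /// upper to `new_upper` iff the current shard-global upper is
    /// `expected_upper`.
    ///
    /// In contrast to [Self::append_batch], this linearizes mutations from all
    /// writers. It's intended for use as an atomic primitive for timestamp
    /// bindings, SQL tables, etc.
    ///
    /// A `new_upper` of the empty antichain "[]" indicates that the shard is
    /// permanently closed for writes.
    ///
    /// The batches are only marked consumed on success; on an upper mismatch
    /// the caller still owns them and may retry with a different
    /// `expected_upper` or delete them.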
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        for batch in batches.iter() {
            if self.machine.shard_id() != batch.shard_id() {
                return Err(InvalidUsage::BatchNotFromThisShard {
                    batch_shard: batch.shard_id(),
                    handle_shard: self.machine.shard_id(),
                });
            }
            check_data_version(&self.cfg.build_version, &batch.version);
            if self.cfg.build_version > batch.version {
                info!(
                    shard_id = ?self.machine.shard_id(),
                    batch_version = ?batch.version,
                    writer_version = ?self.cfg.build_version,
                    "Appending batch from the past. This is fine but should be rare. \
                    TODO: Error on very old versions once the leaked blob detector exists."
                )
            }
        }

        let lower = expected_upper.clone();
        let upper = new_upper;
        let since = Antichain::from_elem(T::minimum());
        let desc = Description::new(lower, upper, since);

        let mut received_inline_backpressure = false;
        // If the state machine pushes back on inline writes, we retry the
        // append below with the inline parts re-encoded through this builder
        // (and possibly spilled out to blob storage) first.
        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
        let maintenance = loop {
            let any_batch_rewrite = batches
                .iter()
                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
                (vec![], 0, vec![], vec![]);
            let mut key_storage = None;
            let mut val_storage = None;
            for batch in batches.iter() {
                let () = validate_truncate_batch(&batch.batch, &desc, any_batch_rewrite)?;
                for (run_meta, run) in batch.batch.runs() {
                    let start_index = parts.len();
                    for part in run {
                        if let (
                            RunPart::Single(
                                batch_part @ BatchPart::Inline {
                                    updates,
                                    ts_rewrite,
                                    schema_id: _,
                                    deprecated_schema_id: _,
                                },
                            ),
                            Some((schema_cache, builder)),
                        ) = (part, &mut inline_batch_builder)
                        {
                            // Decode the inline part and feed its updates back
                            // through the batch builder so they get re-encoded
                            // (and, if large enough, spilled to blob storage).
                            let schema_migration = PartMigration::new(
                                batch_part,
                                self.write_schemas.clone(),
                                schema_cache,
                            )
                            .await
                            .expect("schemas for inline user part");

                            let encoded_part = EncodedPart::from_inline(
                                &*self.metrics,
                                self.metrics.read.compaction.clone(),
                                desc.clone(),
                                updates,
                                ts_rewrite.as_ref(),
                            );
                            let mut fetched_part = FetchedPart::new(
                                Arc::clone(&self.metrics),
                                encoded_part,
                                schema_migration,
                                FetchBatchFilter::Compaction {
                                    since: desc.since().clone(),
                                },
                                false,
                                PartDecodeFormat::Arrow,
                                None,
                            );

                            while let Some(((k, v), t, d)) =
                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
                            {
                                builder
                                    .add(
                                        &k.expect("decoded just-encoded key data"),
                                        &v.expect("decoded just-encoded val data"),
                                        &t,
                                        &d,
                                    )
                                    .await
                                    .expect("re-encoding just-decoded data");
                            }
                        } else {
                            parts.push(part.clone())
                        }
                    }

                    let end_index = parts.len();

                    // The entire run was moved into the inline batch builder;
                    // don't record an empty run here.
                    if start_index == end_index {
                        continue;
                    }

                    // Mark the boundary between the previous runs and this
                    // one.
                    if start_index != 0 {
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                }
                num_updates += batch.batch.len;
            }

            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
                let mut finished = builder
                    .finish(desc.upper().clone())
                    .await
                    .expect("invalid usage");
                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
                finished
                    .flush_to_blob(
                        &cfg,
                        &self.metrics.inline.backpressure,
                        &self.isolated_runtime,
                        &self.write_schemas,
                    )
                    .await;
                Some(finished)
            } else {
                None
            };

            if let Some(batch) = &flushed_inline_batch {
                for (run_meta, run) in batch.batch.runs() {
                    assert!(run.len() > 0);
                    let start_index = parts.len();
                    if start_index != 0 {
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                    parts.extend(run.iter().cloned())
                }
            }

            let combined_batch =
                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);
            let heartbeat_timestamp = (self.cfg.now)();
            let res = self
                .machine
                .compare_and_append(
                    &combined_batch,
                    &self.writer_id,
                    &self.debug_state,
                    heartbeat_timestamp,
                )
                .await;

            match res {
                CompareAndAppendRes::Success(_seqno, maintenance) => {
                    self.upper.clone_from(desc.upper());
                    for batch in batches.iter_mut() {
                        batch.mark_consumed();
                    }
                    if let Some(batch) = &mut flushed_inline_batch {
                        batch.mark_consumed();
                    }
                    break maintenance;
                }
                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    return Err(invalid_usage);
                }
                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    // The append failed, so the caller's batches were not
                    // consumed. Update our cached upper and pass the mismatch
                    // on to the caller, who may retry.
                    self.upper.clone_from(&current_upper);
                    return Ok(Err(UpperMismatch {
                        current: current_upper,
                        expected: expected_upper,
                    }));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    // We should only ever get inline backpressure once per
                    // call, since we remove all inline writes before retrying.
                    assert_eq!(received_inline_backpressure, false);
                    received_inline_backpressure = true;
                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
                        // Retry the append, re-encoding all inline writes
                        // through a single batch builder.
                        inline_batch_builder = Some((
                            self.machine.applier.schema_cache(),
                            self.builder(desc.lower().clone()),
                        ));
                        continue;
                    }

                    // Otherwise, flush each batch's inline parts out to blob
                    // storage individually and retry.
                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
                    let flush_batches = batches
                        .iter_mut()
                        .map(|batch| async {
                            batch
                                .flush_to_blob(
                                    &cfg,
                                    &self.metrics.inline.backpressure,
                                    &self.isolated_runtime,
                                    &self.write_schemas,
                                )
                                .await
                        })
                        .collect::<FuturesUnordered<_>>();
                    let () = flush_batches.collect::<()>().await;

                    for batch in batches.iter() {
                        assert_eq!(batch.batch.inline_bytes(), 0);
                    }

                    continue;
                }
            }
        };

        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());

        Ok(Ok(()))
    }

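    /// Turns the given [ProtoBatch] back into a [Batch] which can be used to
    /// append it to this shard.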
    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
        let shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(shard_id, self.machine.shard_id());

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };
        assert_eq!(ret.shard_id(), self.machine.shard_id());
        ret
    }

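    /// Returns a [BatchBuilder] that can be used to write a batch of updates
    /// to blob storage which can then be appended to this shard using
    /// [Self::compare_and_append_batch] or [Self::append_batch].
    ///
    /// It is correct to create an empty batch, which allows for downgrading
    /// `upper` to communicate progress.
    ///
    /// The builder uses a bounded amount of memory, even when the number of
    /// updates is very large.
    ///
    /// A minimal sketch of the builder workflow (not compiled as a doctest; it
    /// assumes an already-open `WriteHandle<String, String, u64, i64>` named
    /// `write`):
    ///
    /// ```ignore
    /// let mut builder = write.builder(Antichain::from_elem(0u64));
    /// builder
    ///     .add(&"k".to_owned(), &"v".to_owned(), &0, &1)
    ///     .await
    ///     .expect("valid usage");
    /// let mut batch = builder
    ///     .finish(Antichain::from_elem(1))
    ///     .await
    ///     .expect("valid usage");
    /// write
    ///     .compare_and_append_batch(
    ///         &mut [&mut batch],
    ///         Antichain::from_elem(0),
    ///         Antichain::from_elem(1),
    ///     )
    ///     .await
    ///     .expect("valid usage")
    ///     .expect("no upper mismatch");
    /// ```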
    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
        let cfg = CompactConfig::new(&self.cfg, self.shard_id());
        let parts = if let Some(max_runs) = cfg.batch.max_runs {
            // When a maximum number of runs is configured, use a batch writer
            // that can merge runs as the batch is written.
            BatchParts::new_compacting::<K, V, D>(
                cfg,
                Description::new(
                    lower.clone(),
                    Antichain::new(),
                    Antichain::from_elem(T::minimum()),
                ),
                max_runs,
                Arc::clone(&self.metrics),
                Arc::clone(&self.machine.applier.shard_metrics),
                self.shard_id(),
                Arc::clone(&self.blob),
                Arc::clone(&self.isolated_runtime),
                &self.metrics.user,
                self.write_schemas.clone(),
            )
        } else {
            // Otherwise, write the batch as a single unordered run.
            BatchParts::new_ordered(
                cfg.batch,
                RunOrder::Unordered,
                Arc::clone(&self.metrics),
                Arc::clone(&self.machine.applier.shard_metrics),
                self.shard_id(),
                Arc::clone(&self.blob),
                Arc::clone(&self.isolated_runtime),
                &self.metrics.user,
            )
        };
        let builder = BatchBuilderInternal::new(
            BatchBuilderConfig::new(&self.cfg, self.shard_id()),
            parts,
            Arc::clone(&self.metrics),
            self.write_schemas.clone(),
            Arc::clone(&self.blob),
            self.machine.shard_id().clone(),
            self.cfg.build_version.clone(),
        );
        BatchBuilder::new(
            builder,
            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
        )
    }

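    /// Uploads the given `updates` as one [Batch] to the blob store and
    /// returns a handle to the batch.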
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn batch<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
    {
        let iter = updates.into_iter();

        let mut builder = self.builder(lower.clone());

        for update in iter {
            let ((k, v), t, d) = update.borrow();
            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
            match builder.add(k, v, t, d).await {
                Ok(Added::Record | Added::RecordAndParts) => (),
                Err(invalid_usage) => return Err(invalid_usage),
            }
        }

        builder.finish(upper.clone()).await
    }

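    /// Blocks until the given `frontier` is less than the upper of the shard.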
    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
        let mut watch = self.machine.applier.watch();
        let batch = self
            .machine
            .next_listen_batch(frontier, &mut watch, None, None)
            .await;
        if PartialOrder::less_than(&self.upper, batch.desc.upper()) {
            self.upper.clone_from(batch.desc.upper());
        }
        assert!(PartialOrder::less_than(frontier, &self.upper));
    }

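    /// Politely expires this writer, releasing any associated state.
    ///
    /// There is a best-effort impl in Drop for writers that are not explicitly
    /// expired, but explicit expiry is preferred: the Drop impl depends on a
    /// tokio [Handle] being available at drop time and gives no control over
    /// when the expiration happens.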
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        expire_fn.0().await;
    }

    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        writer_id: WriterId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_writer(&writer_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }

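    /// Test helper for an [Self::append] call that is expected to succeed.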
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
    where
        L: Into<Antichain<T>>,
        U: Into<Antichain<T>>,
        D: Send + Sync,
    {
        self.append(updates.iter(), lower.into(), new_upper.into())
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
    }

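    /// Test helper for a [Self::compare_and_append] call that is expected to
    /// succeed.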
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append(
        &mut self,
        updates: &[((K, V), T, D)],
        expected_upper: T,
        new_upper: T,
    ) where
        D: Send + Sync,
    {
        self.compare_and_append(
            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

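    /// Test helper for a [Self::compare_and_append_batch] call that is
    /// expected to succeed.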
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: T,
        new_upper: T,
    ) {
        self.compare_and_append_batch(
            batches,
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

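    /// Test helper for a [Self::batch] call that is expected to succeed.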
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_batch(
        &mut self,
        updates: &[((K, V), T, D)],
        lower: T,
        upper: T,
    ) -> Batch<K, V, T, D> {
        self.batch(
            updates.iter(),
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
        )
        .await
        .expect("invalid usage")
    }
}

impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
    fn drop(&mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        let handle = match Handle::try_current() {
            Ok(x) => x,
            Err(_) => {
                warn!(
                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
                    self.writer_id
                );
                return;
            }
        };
        // Spawn a best-effort task to expire this writer. If it doesn't run
        // to completion, the lease timeout eventually cleans up after us.
        //
        // Intentionally create the span outside the task to set the parent.
        let expire_span = debug_span!("drop::expire");
        handle.spawn_named(
            || format!("WriteHandle::expire ({})", self.writer_id),
            expire_fn.0().instrument(expire_span),
        );
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::mpsc;

    use differential_dataflow::consolidation::consolidate_updates;
    use futures_util::FutureExt;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::collections::CollectionExt;
    use mz_ore::task;
    use serde_json::json;

    use crate::cache::PersistClientCache;
    use crate::tests::{all_ok, new_test_client};
    use crate::{PersistLocation, ShardId};

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn empty_batches(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, _) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let blob = Arc::clone(&write.blob);

        // Write an initial batch.
        let mut upper = 3;
        write.expect_append(&data[..2], vec![0], vec![upper]).await;

        // Write a bunch of empty batches. This shouldn't write any new blobs,
        // so the blob count should stay the same.
        let mut count_before = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_before += 1;
        })
        .await
        .expect("list_keys failed");
        for _ in 0..5 {
            let new_upper = upper + 1;
            write.expect_compare_and_append(&[], upper, new_upper).await;
            upper = new_upper;
        }
        let mut count_after = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_after += 1;
        })
        .await
        .expect("list_keys failed");
        assert_eq!(count_after, count_before);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
        let data0 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];
        let data1 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
        let mut batch1 = write.expect_batch(&data1, 0, 4).await;

        write
            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
            .await;

        let batch = write
            .machine
            .snapshot(&Antichain::from_elem(3))
            .await
            .expect("just wrote this")
            .into_element();

        // The two batches are appended together, but they stay separate runs
        // within the combined state batch.
        assert!(batch.runs().count() >= 2);

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 2),
            (("2".to_owned(), "two".to_owned()), 2, 2),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_ore::test]
    fn writer_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            writer_id: WriterId,
        }

        // Roundtrip through json.
        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        // Deserialize a serialized string directly.
        assert_eq!(
            id,
            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        // Roundtrip the id through a container type.
        let json = json!({ "writer_id": id });
        assert_eq!(
            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.writer_id, id);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Roundtrip a batch through its transmittable representation and make
        // sure the rehydrated batch can still be appended.
        let batch = write.expect_batch(&data, 0, 4).await;
        let hollow_batch = batch.into_transmittable_batch();
        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);

        write
            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
            .await;

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
        let five = Antichain::from_elem(5);

        // The upper is [0], which is not yet past [5].
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        // The upper is [5], which is still not past [5].
        write
            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        // The upper is [7], which is past [5].
        write
            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
        assert_eq!(write.upper(), &Antichain::from_elem(7));

        // Waiting for a frontier that is already in the past resolves
        // immediately.
        assert_eq!(
            write
                .wait_for_upper_past(&Antichain::from_elem(2))
                .now_or_never(),
            Some(())
        );
        assert_eq!(write.upper(), &Antichain::from_elem(7));
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn fetch_recent_upper_linearized() {
        type Timestamp = u64;
        let max_upper = 1000;

        let shard_id = ShardId::new();
        let mut clients = PersistClientCache::new_no_metrics();
        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_writer, _) = upper_writer_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        // Clear the state cache so that the reader client cannot share the
        // writer's cached copy of state and has to go out to consensus.
        clients.clear_state_cache();
        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_reader, _) = upper_reader_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        let (tx, rx) = mpsc::channel();

        let task = task::spawn(|| "upper-reader", async move {
            let mut upper = Timestamp::MIN;

            while upper < max_upper {
                while let Ok(new_upper) = rx.try_recv() {
                    upper = new_upper;
                }

                let recent_upper = upper_reader
                    .fetch_recent_upper()
                    .await
                    .as_option()
                    .cloned()
                    .expect("u64 is totally ordered and the shard is not finalized");
                assert!(
                    recent_upper >= upper,
                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
                );
            }
        });

        for upper in Timestamp::MIN..max_upper {
            let next_upper = upper + 1;
            upper_writer
                .expect_compare_and_append(&[], upper, next_upper)
                .await;
            tx.send(next_upper).expect("send failed");
        }

        task.await.expect("await failed");
    }
}