use std::borrow::Borrow;
use std::fmt::Debug;
use std::sync::Arc;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use mz_dyncfg::Config;
use mz_ore::instrument;
use mz_ore::task::RuntimeExt;
use mz_persist::location::Blob;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{IntoRustIfSome, ProtoType};
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::runtime::Handle;
use tracing::{Instrument, debug_span, info, warn};
use uuid::Uuid;

use crate::batch::{
    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
    BatchParts, ProtoBatch, validate_truncate_batch,
};
use crate::error::{InvalidUsage, UpperMismatch};
use crate::fetch::{
    EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ,
};
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{Schemas, assert_code_can_read_data};
use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
use crate::read::ReadHandle;
use crate::schema::PartMigration;
use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};

pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
    "persist_write_combine_inline_writes",
    true,
    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
);

pub(crate) const VALIDATE_PART_BOUNDS_ON_WRITE: Config<bool> = Config::new(
    "persist_validate_part_bounds_on_write",
    true,
    "Validate the part lower <= the batch lower and the part upper <= batch upper, \
    for the batch being appended.",
);

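/// An opaque identifier for a writer of a persist durable TVC (aka shard).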
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct WriterId(pub(crate) [u8; 16]);

impl std::fmt::Display for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "w{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for WriterId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("w", "WriterId", s).map(WriterId)
    }
}

impl From<WriterId> for String {
    fn from(writer_id: WriterId) -> Self {
        writer_id.to_string()
    }
}

impl TryFrom<String> for WriterId {
    type Error = String;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}

impl WriterId {
    pub(crate) fn new() -> Self {
        WriterId(*Uuid::new_v4().as_bytes())
    }
}

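/// A "capability" granting the ability to apply updates to some shard at
/// times greater or equal to `self.upper()`.
///
/// All async methods on WriteHandle retry for as long as they are able, but
/// the returned [std::future::Future]s implement "cancel on drop" semantics.
/// This means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].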
#[derive(Debug)]
pub struct WriteHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) compact: Option<Compactor<K, V, T, D>>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
    pub(crate) writer_id: WriterId,
    pub(crate) debug_state: HandleDebugState,
    pub(crate) write_schemas: Schemas<K, V>,

    pub(crate) upper: Antichain<T>,
    expire_fn: Option<ExpireFn>,
}

impl<K, V, T, D> WriteHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    pub(crate) fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        writer_id: WriterId,
        purpose: &str,
        write_schemas: Schemas<K, V>,
    ) -> Self {
        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let compact = cfg
            .compaction_enabled
            .then(|| Compactor::new(cfg.clone(), Arc::clone(&metrics), gc.clone()));
        let debug_state = HandleDebugState {
            hostname: cfg.hostname.to_owned(),
            purpose: purpose.to_owned(),
        };
        let upper = machine.applier.clone_upper();
        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
        WriteHandle {
            cfg,
            metrics,
            machine,
            gc,
            compact,
            blob,
            isolated_runtime,
            writer_id,
            debug_state,
            write_schemas,
            upper,
            expire_fn: Some(expire_fn),
        }
    }

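    /// Creates a [WriteHandle] for the same shard as an existing
    /// [ReadHandle], with a fresh [WriterId] and using the read handle's
    /// schemas for writes.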
    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
        Self::new(
            read.cfg.clone(),
            Arc::clone(&read.metrics),
            read.machine.clone(),
            read.gc.clone(),
            Arc::clone(&read.blob),
            WriterId::new(),
            purpose,
            read.read_schemas.clone(),
        )
    }

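    /// Whether batch part bounds should be validated against the bounds of
    /// the batch being appended. This requires both the write-side and the
    /// read-side validation dyncfgs to be enabled.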
    pub fn validate_part_bounds_on_write(&self) -> bool {
        VALIDATE_PART_BOUNDS_ON_WRITE.get(&self.cfg) && VALIDATE_PART_BOUNDS_ON_READ.get(&self.cfg)
    }

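    /// This handle's shard id.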
    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

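    /// The schema id of this handle's key and value schemas, if they have
    /// already been registered with this shard.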
    pub fn schema_id(&self) -> Option<SchemaId> {
        self.write_schemas.id
    }

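    /// Returns the schema id for this handle's key and value schemas,
    /// registering them with this shard if that has not already happened.
    ///
    /// Panics if the schemas cannot be registered.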
    pub async fn ensure_schema_registered(&mut self) -> SchemaId {
        let Schemas { id, key, val } = &self.write_schemas;

        if let Some(id) = id {
            return *id;
        }

        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
        maintenance.start_performing(&self.machine, &self.gc);

        let Some(schema_id) = schema_id else {
            panic!("unable to register schemas: {key:?} {val:?}");
        };

        self.write_schemas.id = Some(schema_id);
        schema_id
    }

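    /// This handle's cached upper frontier: the most recent shard upper this
    /// handle has observed.
    ///
    /// This is cheap but potentially stale; see [Self::shared_upper] and
    /// [Self::fetch_recent_upper] for fresher (and progressively more
    /// expensive) alternatives.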
    pub fn upper(&self) -> &Antichain<T> {
        &self.upper
    }

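    /// A less-stale upper for the shard, read from this process's shared
    /// copy of the shard state.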
    pub fn shared_upper(&self) -> Antichain<T> {
        self.machine.applier.clone_upper()
    }

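    /// Fetches and returns a recent shard-global upper, updating this
    /// handle's cached upper. Importantly, this is linearized with write
    /// operations: it reflects writes that completed before this call
    /// started (see the `fetch_recent_upper_linearized` test below).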
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
        self.machine
            .applier
            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
            .await;
        &self.upper
    }

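    /// Advances the shard's upper to at least `target` by appending empty
    /// batches, updating this handle's cached upper along the way.
    ///
    /// This runs in a compare-and-append loop: if another writer
    /// concurrently advances the upper, the loop retries from the actual
    /// upper until `target` is reached.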
    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
        let mut lower = self.shared_upper();

        while !PartialOrder::less_equal(target, &lower) {
            let since = Antichain::from_elem(T::minimum());
            let desc = Description::new(lower.clone(), target.clone(), since);
            let batch = HollowBatch::empty(desc);

            let heartbeat_timestamp = (self.cfg.now)();
            let res = self
                .machine
                .compare_and_append(
                    &batch,
                    &self.writer_id,
                    &self.debug_state,
                    heartbeat_timestamp,
                )
                .await;

            use CompareAndAppendRes::*;
            let new_upper = match res {
                Success(_seq_no, maintenance) => {
                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
                    batch.desc.upper().clone()
                }
                UpperMismatch(_seq_no, actual_upper) => actual_upper,
                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
                InlineBackpressure => unreachable!("batch was empty"),
            };

            self.upper.clone_from(&new_upper);
            lower = new_upper;
        }
    }

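    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `upper`.
    ///
    /// All times in `updates` must be greater or equal to `lower` and not
    /// greater or equal to `upper`. A `upper` of the empty antichain
    /// "finishes" this shard, promising that no more data is ever incoming.
    ///
    /// This is not an atomic, fenced operation: if the shard's upper has
    /// already advanced past `lower` but not `upper`, the append is retried
    /// from the actual upper; if it has advanced to or past `upper`, the
    /// write is reported as a success on the assumption that an identical
    /// copy of this (definite) data was already written by someone else. Use
    /// [Self::compare_and_append] when the write must land at an exact
    /// expected upper.
    ///
    /// A minimal usage sketch (not compiled as a doctest), assuming `write`
    /// is a `WriteHandle<String, String, u64, i64>` on a shard whose upper is
    /// still `Antichain::from_elem(0)`:
    ///
    /// ```ignore
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 0u64, 1i64)];
    /// write
    ///     .append(updates.iter(), Antichain::from_elem(0), Antichain::from_elem(1))
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// ```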
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
        self.append_batch(batch, lower, upper).await
    }

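    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper` iff the current global upper of this shard is
    /// `expected_upper`.
    ///
    /// All times in `updates` must be greater or equal to `expected_upper`
    /// and not greater or equal to `new_upper`. If the global upper turns
    /// out not to be `expected_upper`, an `Ok(Err(UpperMismatch))` carrying
    /// the actual upper is returned and the batch written by this call is
    /// deleted from blob storage.
    ///
    /// A minimal usage sketch (not compiled as a doctest), assuming `write`
    /// is a `WriteHandle<String, String, u64, i64>`:
    ///
    /// ```ignore
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 0u64, 1i64)];
    /// match write
    ///     .compare_and_append(
    ///         updates.iter(),
    ///         Antichain::from_elem(0),
    ///         Antichain::from_elem(1),
    ///     )
    ///     .await
    ///     .expect("invalid usage")
    /// {
    ///     Ok(()) => { /* the write landed at the expected upper */ }
    ///     Err(mismatch) => {
    ///         // Someone else wrote first; `mismatch.current` is the actual upper.
    ///     }
    /// }
    /// ```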
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let mut batch = self
            .batch(updates, expected_upper.clone(), new_upper.clone())
            .await?;
        match self
            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
            .await
        {
            ok @ Ok(Ok(())) => ok,
            err => {
                // The append failed, so the batch will never be registered
                // with the shard; delete its blobs rather than leaking them.
                batch.delete().await;
                err
            }
        }
    }

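    /// Appends the batch of updates to the shard and downgrades this
    /// handle's upper to `upper`.
    ///
    /// In contrast to [Self::compare_and_append_batch], multiple
    /// [WriteHandle]s may be used concurrently to write to the same shard,
    /// but in this case, the data being written must be identical (in the
    /// sense of "definite"). On an upper mismatch, this retries from the
    /// shard's actual upper when it falls between `lower` and `upper`,
    /// reports success when it is already at or past `upper` (the definite
    /// data must have been written by someone else), and otherwise returns
    /// the mismatch.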
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append_batch(
        &mut self,
        mut batch: Batch<K, V, T, D>,
        mut lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        loop {
            let res = self
                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone(), true)
                .await?;
            match res {
                Ok(()) => {
                    self.upper = upper;
                    return Ok(Ok(()));
                }
                Err(mismatch) => {
                    if PartialOrder::less_than(&mismatch.current, &lower) {
                        // The shard's upper is still below the requested
                        // lower, so this was a genuine mismatch.
                        self.upper.clone_from(&mismatch.current);

                        batch.delete().await;

                        return Ok(Err(mismatch));
                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
                        // Retry with the shard's actual upper as the new
                        // lower; updates below it are logically truncated
                        // away.
                        lower = mismatch.current;
                    } else {
                        // The upper is already at or past `upper`. Someone
                        // else must have written this (definite) data, so
                        // report success.
                        self.upper = mismatch.current;

                        batch.delete().await;

                        return Ok(Ok(()));
                    }
                }
            }
        }
    }

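    /// Appends the given batches of updates to the shard and downgrades this
    /// handle's upper to `new_upper` iff the current global upper of this
    /// shard is `expected_upper`.
    ///
    /// The batches may be empty, which allows for downgrading the upper to
    /// communicate progress. On success the batches are marked as consumed
    /// and must not be appended again; on failure the caller's batches are
    /// left intact (and may be retried), though any batch re-encoded
    /// internally by this call is cleaned up.
    ///
    /// If the appended updates trip the inline-write backpressure limit,
    /// they are either re-encoded into a single combined batch (when
    /// `persist_write_combine_inline_writes` is enabled) or flushed out to
    /// blob storage, and the append is retried.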
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
        validate_part_bounds_on_write: bool,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        let schema_id = self.ensure_schema_registered().await;

        // Every batch must come from this shard and be readable by this
        // version of the code.
        for batch in batches.iter() {
            if self.machine.shard_id() != batch.shard_id() {
                return Err(InvalidUsage::BatchNotFromThisShard {
                    batch_shard: batch.shard_id(),
                    handle_shard: self.machine.shard_id(),
                });
            }
            assert_code_can_read_data(&self.cfg.build_version, &batch.version);
            if self.cfg.build_version > batch.version {
                info!(
                    shard_id =? self.machine.shard_id(),
                    batch_version =? batch.version,
                    writer_version =? self.cfg.build_version,
                    "Appending batch from the past. This is fine but should be rare. \
                    TODO: Error on very old versions once the leaked blob detector exists."
                )
            }
        }

        let lower = expected_upper.clone();
        let upper = new_upper;
        let since = Antichain::from_elem(T::minimum());
        let desc = Description::new(lower, upper, since);

        let mut received_inline_backpressure = false;
        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
        let maintenance = loop {
            let any_batch_rewrite = batches
                .iter()
                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
                (vec![], 0, vec![], vec![]);
            let mut key_storage = None;
            let mut val_storage = None;
            for batch in batches.iter() {
                let () = validate_truncate_batch(
                    &batch.batch,
                    &desc,
                    any_batch_rewrite,
                    validate_part_bounds_on_write,
                )?;
                for (run_meta, run) in batch.batch.runs() {
                    let start_index = parts.len();
                    for part in run {
                        if let (
                            RunPart::Single(
                                batch_part @ BatchPart::Inline {
                                    updates,
                                    ts_rewrite,
                                    schema_id: _,
                                    deprecated_schema_id: _,
                                },
                            ),
                            Some((schema_cache, builder)),
                        ) = (part, &mut inline_batch_builder)
                        {
                            // Decode this inline part and re-encode its
                            // updates through the combined batch builder
                            // instead of carrying the part through as-is.
                            let schema_migration = PartMigration::new(
                                batch_part,
                                self.write_schemas.clone(),
                                schema_cache,
                            )
                            .await
                            .expect("schemas for inline user part");

                            let encoded_part = EncodedPart::from_inline(
                                &crate::fetch::FetchConfig::from_persist_config(&self.cfg),
                                &*self.metrics,
                                self.metrics.read.compaction.clone(),
                                desc.clone(),
                                updates,
                                ts_rewrite.as_ref(),
                            );
                            let mut fetched_part = FetchedPart::new(
                                Arc::clone(&self.metrics),
                                encoded_part,
                                schema_migration,
                                FetchBatchFilter::Compaction {
                                    since: desc.since().clone(),
                                },
                                false,
                                PartDecodeFormat::Arrow,
                                None,
                            );

                            while let Some(((k, v), t, d)) =
                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
                            {
                                builder
                                    .add(
                                        &k.expect("decoded just-encoded key data"),
                                        &v.expect("decoded just-encoded val data"),
                                        &t,
                                        &d,
                                    )
                                    .await
                                    .expect("re-encoding just-decoded data");
                            }
                        } else {
                            parts.push(part.clone())
                        }
                    }

                    let end_index = parts.len();

                    // The entire run may have been re-encoded into the
                    // combined builder above, in which case there's no run
                    // to track here.
                    if start_index == end_index {
                        continue;
                    }

                    if start_index != 0 {
                        // A nonzero start index marks the boundary with the
                        // previous run.
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                }
                num_updates += batch.batch.len;
            }

            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
                let mut finished = builder
                    .finish(desc.upper().clone())
                    .await
                    .expect("invalid usage");
                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
                finished
                    .flush_to_blob(
                        &cfg,
                        &self.metrics.inline.backpressure,
                        &self.isolated_runtime,
                        &self.write_schemas,
                    )
                    .await;
                Some(finished)
            } else {
                None
            };

            if let Some(batch) = &flushed_inline_batch {
                for (run_meta, run) in batch.batch.runs() {
                    assert!(!run.is_empty());
                    let start_index = parts.len();
                    if start_index != 0 {
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                    parts.extend(run.iter().cloned())
                }
            }

            let mut combined_batch =
                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);
            ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
            let heartbeat_timestamp = (self.cfg.now)();
            let res = self
                .machine
                .compare_and_append(
                    &combined_batch,
                    &self.writer_id,
                    &self.debug_state,
                    heartbeat_timestamp,
                )
                .await;

            match res {
                CompareAndAppendRes::Success(_seqno, maintenance) => {
                    self.upper.clone_from(desc.upper());
                    for batch in batches.iter_mut() {
                        batch.mark_consumed();
                    }
                    if let Some(batch) = &mut flushed_inline_batch {
                        batch.mark_consumed();
                    }
                    break maintenance;
                }
                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    return Err(invalid_usage);
                }
                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    // The caller's batches are not consumed on a mismatch,
                    // so they may be retried at the correct expected upper.
                    self.upper.clone_from(&current_upper);
                    return Ok(Err(UpperMismatch {
                        current: current_upper,
                        expected: expected_upper,
                    }));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    // We should never get inline backpressure more than
                    // once: after the first time, all inline writes are
                    // either combined into a new batch or flushed to blob.
                    assert!(!received_inline_backpressure);
                    received_inline_backpressure = true;
                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
                        inline_batch_builder = Some((
                            self.machine.applier.schema_cache(),
                            self.builder(desc.lower().clone()),
                        ));
                        continue;
                    }

                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
                    // Flush every batch's inline parts out to blob storage,
                    // concurrently, then retry the append.
                    let flush_batches = batches
                        .iter_mut()
                        .map(|batch| async {
                            batch
                                .flush_to_blob(
                                    &cfg,
                                    &self.metrics.inline.backpressure,
                                    &self.isolated_runtime,
                                    &self.write_schemas,
                                )
                                .await
                        })
                        .collect::<FuturesUnordered<_>>();
                    let () = flush_batches.collect::<()>().await;

                    for batch in batches.iter() {
                        assert_eq!(batch.batch.inline_bytes(), 0);
                    }

                    continue;
                }
            }
        };

        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());

        Ok(Ok(()))
    }

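    /// Turns the given [ProtoBatch] back into a [Batch] which can be used to
    /// append it to this shard.
    ///
    /// The given batch must have been created by a writer on this shard (see
    /// [Batch::into_transmittable_batch] for the serialization direction);
    /// this is asserted.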
    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
        let shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(shard_id, self.machine.shard_id());

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };
        assert_eq!(ret.shard_id(), self.machine.shard_id());
        ret
    }

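    /// Returns a [BatchBuilder] that can be used to write a batch of updates
    /// to blob storage which can then be appended to this shard using
    /// [Self::compare_and_append_batch] or [Self::append_batch].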
    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
        Self::builder_inner(
            &self.cfg,
            CompactConfig::new(&self.cfg, self.shard_id()),
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            self.shard_id(),
            self.write_schemas.clone(),
            lower,
        )
    }

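    /// Shared construction logic for [Self::builder]: builds a
    /// [BatchBuilder] from explicitly-passed dependencies, choosing a
    /// compacting [BatchParts] when `compact_cfg.batch.max_runs` is set and
    /// an unordered one otherwise.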
    pub(crate) fn builder_inner(
        persist_cfg: &PersistConfig,
        compact_cfg: CompactConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        user_batch_metrics: &BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        schemas: Schemas<K, V>,
        lower: Antichain<T>,
    ) -> BatchBuilder<K, V, T, D> {
        let parts = if let Some(max_runs) = compact_cfg.batch.max_runs {
            BatchParts::new_compacting::<K, V, D>(
                compact_cfg,
                Description::new(
                    lower.clone(),
                    Antichain::new(),
                    Antichain::from_elem(T::minimum()),
                ),
                max_runs,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
                schemas.clone(),
            )
        } else {
            BatchParts::new_ordered::<D>(
                compact_cfg.batch,
                RunOrder::Unordered,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
            )
        };
        let builder = BatchBuilderInternal::new(
            BatchBuilderConfig::new(persist_cfg, shard_id),
            parts,
            metrics,
            schemas,
            blob,
            shard_id,
            persist_cfg.build_version.clone(),
        );
        BatchBuilder::new(
            builder,
            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
        )
    }

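    /// Uploads the given `updates` as one [Batch] to blob storage and
    /// returns a handle to it.
    ///
    /// All times in `updates` must be greater or equal to `lower` and not
    /// greater or equal to `upper`.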
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn batch<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
    {
        let iter = updates.into_iter();

        let mut builder = self.builder(lower.clone());

        for update in iter {
            let ((k, v), t, d) = update.borrow();
            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
            match builder.add(k, v, t, d).await {
                Ok(Added::Record | Added::RecordAndParts) => (),
                Err(invalid_usage) => return Err(invalid_usage),
            }
        }

        builder.finish(upper.clone()).await
    }

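    /// Blocks until the shard's upper is strictly past `frontier`, updating
    /// this handle's cached upper along the way.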
    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
        let mut watch = self.machine.applier.watch();
        let batch = self
            .machine
            .next_listen_batch(frontier, &mut watch, None, None)
            .await;
        if PartialOrder::less_than(&self.upper, batch.desc.upper()) {
            self.upper.clone_from(batch.desc.upper());
        }
        assert!(PartialOrder::less_than(frontier, &self.upper));
    }

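    /// Politely expires this writer, releasing any associated state.
    ///
    /// If a handle is dropped without being expired, expiration is attempted
    /// on a best-effort basis in the background, falling back to the lease
    /// timeout when no tokio runtime is available; see the [Drop] impl.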
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        expire_fn.0().await;
    }

    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        writer_id: WriterId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_writer(&writer_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }

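    /// Test helper for an [Self::append] call that is expected to succeed.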
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
    where
        L: Into<Antichain<T>>,
        U: Into<Antichain<T>>,
        D: Send + Sync,
    {
        self.append(updates.iter(), lower.into(), new_upper.into())
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
    }

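    /// Test helper for a [Self::compare_and_append] call that is expected to
    /// succeed.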
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append(
        &mut self,
        updates: &[((K, V), T, D)],
        expected_upper: T,
        new_upper: T,
    ) where
        D: Send + Sync,
    {
        self.compare_and_append(
            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

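    /// Test helper for a [Self::compare_and_append_batch] call that is
    /// expected to succeed.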
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: T,
        new_upper: T,
    ) {
        self.compare_and_append_batch(
            batches,
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
            true,
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

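    /// Test helper for a [Self::batch] call that is expected to succeed.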
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_batch(
        &mut self,
        updates: &[((K, V), T, D)],
        lower: T,
        upper: T,
    ) -> Batch<K, V, T, D> {
        self.batch(
            updates.iter(),
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
        )
        .await
        .expect("invalid usage")
    }
}

impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
    fn drop(&mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        let handle = match Handle::try_current() {
            Ok(x) => x,
            Err(_) => {
                warn!(
                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
                    self.writer_id
                );
                return;
            }
        };
        // Spawn a best-effort task to expire this write handle. It's fine if
        // it doesn't run to completion; we'd just have to wait out the lease
        // timeout instead.
        let expire_span = debug_span!("drop::expire");
        handle.spawn_named(
            || format!("WriteHandle::expire ({})", self.writer_id),
            expire_fn.0().instrument(expire_span),
        );
    }
}

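/// Ensures that every run and part in `batch` is tagged with `schema_id`,
/// asserting that any schema id already present matches.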
fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
where
    T: Timestamp + Lattice + Codec64,
{
    let ensure = |id: &mut Option<SchemaId>| match id {
        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
        None => *id = Some(schema_id),
    };

    for run_meta in &mut batch.run_meta {
        ensure(&mut run_meta.schema);
    }
    for part in &mut batch.parts {
        match part {
            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
            RunPart::Many(_hollow_run_ref) => {
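                // Parts reachable only through a hollow-run reference live in
                // separate blobs and are left untouched here; only run and
                // part metadata embedded directly in this batch is updated.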
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::mpsc;

    use differential_dataflow::consolidation::consolidate_updates;
    use futures_util::FutureExt;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::collections::CollectionExt;
    use mz_ore::task;
    use serde_json::json;

    use crate::cache::PersistClientCache;
    use crate::tests::{all_ok, new_test_client};
    use crate::{PersistLocation, ShardId};

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn empty_batches(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, _) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let blob = Arc::clone(&write.blob);

        // Write an initial batch.
        let mut upper = 3;
        write.expect_append(&data[..2], vec![0], vec![upper]).await;

        // Count the blobs in storage before appending any empty batches.
        let mut count_before = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_before += 1;
        })
        .await
        .expect("list_keys failed");
        for _ in 0..5 {
            let new_upper = upper + 1;
            write.expect_compare_and_append(&[], upper, new_upper).await;
            upper = new_upper;
        }
        // Appending empty batches should not have written any new blobs.
        let mut count_after = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_after += 1;
        })
        .await
        .expect("list_keys failed");
        assert_eq!(count_after, count_before);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
        let data0 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];
        let data1 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
        let mut batch1 = write.expect_batch(&data1, 0, 4).await;

        write
            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
            .await;

        let batch = write
            .machine
            .snapshot(&Antichain::from_elem(3))
            .await
            .expect("just wrote this")
            .into_element();

        // Both appended batches should be present, as separate runs.
        assert!(batch.runs().count() >= 2);

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 2),
            (("2".to_owned(), "two".to_owned()), 2, 2),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_ore::test]
    fn writer_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            writer_id: WriterId,
        }

        // Roundtrip through json.
        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        // Deserialize a serialized string directly.
        assert_eq!(
            id,
            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        // Roundtrip of the id through a containing type.
        let json = json!({ "writer_id": id });
        assert_eq!(
            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.writer_id, id);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Roundtrip a batch through its transmittable proto form, as if
        // handing it off to another process, then append the rehydrated copy.
        let batch = write.expect_batch(&data, 0, 4).await;
        let hollow_batch = batch.into_transmittable_batch();
        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);

        write
            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
            .await;

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
        let five = Antichain::from_elem(5);

        // The upper is not past 5 yet.
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        // An upper of exactly 5 is still not past 5.
        write
            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        // Now the upper is past 5, so the wait resolves.
        write
            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
        assert_eq!(write.upper(), &Antichain::from_elem(7));

        // A frontier that is already in the past resolves immediately and
        // leaves the cached upper alone.
        assert_eq!(
            write
                .wait_for_upper_past(&Antichain::from_elem(2))
                .now_or_never(),
            Some(())
        );
        assert_eq!(write.upper(), &Antichain::from_elem(7));
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn fetch_recent_upper_linearized() {
        type Timestamp = u64;
        let max_upper = 1000;

        let shard_id = ShardId::new();
        let mut clients = PersistClientCache::new_no_metrics();
        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_writer, _) = upper_writer_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        // Clear the state cache so the reader client opens its own copy of
        // state instead of sharing the writer's.
        clients.clear_state_cache();
        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_reader, _) = upper_reader_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        let (tx, rx) = mpsc::channel();

        let task = task::spawn(|| "upper-reader", async move {
            let mut upper = Timestamp::MIN;

            while upper < max_upper {
                while let Ok(new_upper) = rx.try_recv() {
                    upper = new_upper;
                }

                let recent_upper = upper_reader
                    .fetch_recent_upper()
                    .await
                    .as_option()
                    .cloned()
                    .expect("u64 is totally ordered and the shard is not finalized");
                assert!(
                    recent_upper >= upper,
                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
                );
            }
        });

        for upper in Timestamp::MIN..max_upper {
            let next_upper = upper + 1;
            upper_writer
                .expect_compare_and_append(&[], upper, next_upper)
                .await;
            tx.send(next_upper).expect("send failed");
        }

        task.await.expect("await failed");
    }
}