use std::borrow::Borrow;
use std::fmt::Debug;
use std::sync::Arc;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use mz_dyncfg::Config;
use mz_ore::instrument;
use mz_ore::task::RuntimeExt;
use mz_persist::location::Blob;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{IntoRustIfSome, ProtoType};
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::runtime::Handle;
use tracing::{Instrument, debug_span, info, warn};
use uuid::Uuid;

use crate::batch::{
    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
    BatchParts, ProtoBatch, validate_truncate_batch,
};
use crate::error::{InvalidUsage, UpperMismatch};
use crate::fetch::{
    EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ,
};
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{Schemas, check_data_version};
use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
use crate::read::ReadHandle;
use crate::schema::PartMigration;
use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};

pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
    "persist_write_combine_inline_writes",
    true,
    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
);

pub(crate) const VALIDATE_PART_BOUNDS_ON_WRITE: Config<bool> = Config::new(
    "persist_validate_part_bounds_on_write",
    true,
    "Validate the part lower <= the batch lower and the part upper <= batch upper,\
    for the batch being appended.",
);

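/// An opaque identifier for a writer of a persist durable TVC (aka shard).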
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct WriterId(pub(crate) [u8; 16]);

impl std::fmt::Display for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "w{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for WriterId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("w", "WriterId", s).map(WriterId)
    }
}

impl From<WriterId> for String {
    fn from(writer_id: WriterId) -> Self {
        writer_id.to_string()
    }
}

impl TryFrom<String> for WriterId {
    type Error = String;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}

impl WriterId {
    pub(crate) fn new() -> Self {
        WriterId(*Uuid::new_v4().as_bytes())
    }
}

#[derive(Debug)]
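/// A "capability" granting the ability to apply updates to some shard at
/// times greater or equal to `self.upper()`.
///
/// All async methods on WriteHandle retry for as long as they are able, but
/// the returned [std::future::Future]s implement "cancel on drop" semantics.
/// This means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].
///
/// A sketch of typical usage (not compiled here; the client setup and schema
/// arguments to [crate::PersistClient::open_writer] are elided):
///
/// ```ignore
/// let mut write: WriteHandle<String, String, u64, i64> = /* open_writer */;
/// // Durably commit updates for times in [0, 5) iff the upper is still 0.
/// write
///     .compare_and_append(
///         &[(("k".to_owned(), "v".to_owned()), 3u64, 1i64)],
///         Antichain::from_elem(0),
///         Antichain::from_elem(5),
///     )
///     .await
///     .expect("usage was valid")
///     .expect("upper matched");
/// ```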
pub struct WriteHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) compact: Option<Compactor<K, V, T, D>>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
    pub(crate) writer_id: WriterId,
    pub(crate) debug_state: HandleDebugState,
    pub(crate) write_schemas: Schemas<K, V>,

    pub(crate) upper: Antichain<T>,
    expire_fn: Option<ExpireFn>,
}

impl<K, V, T, D> WriteHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    pub(crate) fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        writer_id: WriterId,
        purpose: &str,
        write_schemas: Schemas<K, V>,
    ) -> Self {
        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let compact = cfg.compaction_enabled.then(|| {
            Compactor::new(
                cfg.clone(),
                Arc::clone(&metrics),
                write_schemas.clone(),
                gc.clone(),
            )
        });
        let debug_state = HandleDebugState {
            hostname: cfg.hostname.to_owned(),
            purpose: purpose.to_owned(),
        };
        let upper = machine.applier.clone_upper();
        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
        WriteHandle {
            cfg,
            metrics,
            machine,
            gc,
            compact,
            blob,
            isolated_runtime,
            writer_id,
            debug_state,
            write_schemas,
            upper,
            expire_fn: Some(expire_fn),
        }
    }

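    /// Creates a [WriteHandle] for the same shard from an existing
    /// [ReadHandle], using a new [WriterId] and the read handle's schemas.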
    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
        Self::new(
            read.cfg.clone(),
            Arc::clone(&read.metrics),
            read.machine.clone(),
            read.gc.clone(),
            Arc::clone(&read.blob),
            WriterId::new(),
            purpose,
            read.read_schemas.clone(),
        )
    }

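    /// Whether this handle validates that each part's bounds fit within the
    /// bounds of the batch being appended. Requires both the write-side and
    /// read-side dyncfg flags to be enabled.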
    pub fn validate_part_bounds_on_write(&self) -> bool {
        VALIDATE_PART_BOUNDS_ON_WRITE.get(&self.cfg) && VALIDATE_PART_BOUNDS_ON_READ.get(&self.cfg)
    }

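    /// This handle's shard id.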
    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

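    /// The schema id of this handle's write schemas, if they have been
    /// registered with the shard.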
    pub fn schema_id(&self) -> Option<SchemaId> {
        self.write_schemas.id
    }

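    /// Returns the [SchemaId] for this handle's write schemas, registering
    /// them with the shard on first use. Panics if the schemas cannot be
    /// registered.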
    pub async fn ensure_schema_registered(&mut self) -> SchemaId {
        let Schemas { id, key, val } = &self.write_schemas;

        if let Some(id) = id {
            return *id;
        }

        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
        maintenance.start_performing(&self.machine, &self.gc);

        let Some(schema_id) = schema_id else {
            panic!("unable to register schemas: {key:?} {val:?}");
        };

        self.write_schemas.id = Some(schema_id);
        schema_id
    }

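    /// A cached version of the shard-global `upper` frontier.
    ///
    /// This is the most recent upper discovered by this handle. It is
    /// potentially more stale than [Self::shared_upper] but is lock-free and
    /// allocation-free.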
    pub fn upper(&self) -> &Antichain<T> {
        &self.upper
    }

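    /// A less-stale copy of the shard-global `upper` frontier, taken from
    /// this process's shared view of shard state.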
    pub fn shared_upper(&self) -> Antichain<T> {
        self.machine.applier.clone_upper()
    }

    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
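    /// Fetches and returns a recent shard-global `upper`. Importantly, this
    /// operation is linearized with write operations.
    ///
    /// This requires fetching the latest state from consensus and is
    /// therefore a potentially expensive operation.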
    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
        self.machine
            .applier
            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
            .await;
        &self.upper
    }

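    /// Advances the shard's upper to at least `target` by appending empty
    /// batches, racing any other writers doing the same.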
    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
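        // Repeatedly compare-and-append an empty batch from our best guess at
        // the current upper up to `target`; on a mismatch, retry from the
        // actual upper, which may already be past `target`.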
        let mut lower = self.shared_upper().clone();

        while !PartialOrder::less_equal(target, &lower) {
            let since = Antichain::from_elem(T::minimum());
            let desc = Description::new(lower.clone(), target.clone(), since);
            let batch = HollowBatch::empty(desc);

            let heartbeat_timestamp = (self.cfg.now)();
            let res = self
                .machine
                .compare_and_append(
                    &batch,
                    &self.writer_id,
                    &self.debug_state,
                    heartbeat_timestamp,
                )
                .await;

            use CompareAndAppendRes::*;
            let new_upper = match res {
                Success(_seq_no, maintenance) => {
                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
                    batch.desc.upper().clone()
                }
                UpperMismatch(_seq_no, actual_upper) => actual_upper,
                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
                InlineBackpressure => unreachable!("batch was empty"),
            };

            self.upper.clone_from(&new_upper);
            lower = new_upper;
        }
    }

    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
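    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `upper`.
    ///
    /// All times in `updates` must be greater or equal to `lower` and not
    /// greater or equal to `upper`. A `upper` of the empty antichain
    /// "finishes" this shard, promising that no more data is ever incoming.
    ///
    /// `updates` may be empty, which allows for downgrading `upper` to
    /// communicate progress. In contrast to [Self::compare_and_append],
    /// multiple [WriteHandle]s may be used concurrently to write to the same
    /// shard, but the data is then only guaranteed to be durably written once
    /// the upper of all writers has advanced past the timestamp of the write.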
    pub async fn append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
        self.append_batch(batch, lower, upper).await
    }

    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
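    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper` iff the current global upper of this shard is
    /// `expected_upper`.
    ///
    /// All times in `updates` must be greater or equal to `expected_upper`
    /// and not greater or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "finishes" this shard, promising that no more data is ever
    /// incoming.
    ///
    /// In contrast to [Self::append], this linearizes mutations from all
    /// writers. On an upper mismatch, the caller should fetch a recent upper,
    /// regroup the updates, and retry.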
    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let mut batch = self
            .batch(updates, expected_upper.clone(), new_upper.clone())
            .await?;
        match self
            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
            .await
        {
            ok @ Ok(Ok(())) => ok,
            err => {
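                // This method owns the batch (the caller handed us raw
                // updates), so clean it up on any failure. In contrast,
                // compare_and_append_batch can't delete batches it doesn't
                // own, since its caller may want to retry them.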
                batch.delete().await;
                err
            }
        }
    }

    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
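    /// Appends the batch of updates to the shard and downgrades this handle's
    /// upper to `upper`, retrying if another writer has concurrently advanced
    /// the upper partway into `[lower, upper)`.
    ///
    /// If the shard's upper is already at or past `upper`, the write is
    /// treated as a success (e.g. a previous invocation that timed out may
    /// have already applied it). If the upper is strictly behind `lower`, the
    /// mismatch is returned to the caller and this handle's cached upper is
    /// updated.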
    pub async fn append_batch(
        &mut self,
        mut batch: Batch<K, V, T, D>,
        mut lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        loop {
            let res = self
                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone(), true)
                .await?;
            match res {
                Ok(()) => {
                    self.upper = upper;
                    return Ok(Ok(()));
                }
                Err(mismatch) => {
                    if PartialOrder::less_than(&mismatch.current, &lower) {
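                        // The shard's upper is strictly behind our lower:
                        // appending this batch would leave a gap in the
                        // shard's history, so surface the mismatch.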
                        self.upper.clone_from(&mismatch.current);

                        batch.delete().await;

                        return Ok(Err(mismatch));
                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
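                        // Another writer already appended a prefix of our
                        // range. Retry from the current upper; the batch is
                        // truncated to the remaining range on the next
                        // attempt.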
                        lower = mismatch.current;
                    } else {
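                        // The upper is already at or past our requested
                        // upper, e.g. because an earlier attempt timed out
                        // after its write actually landed. Treat the append
                        // as a success and drop the unused batch.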
                        self.upper = mismatch.current;

                        batch.delete().await;

                        return Ok(Ok(()));
                    }
                }
            }
        }
    }

    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
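    /// Appends the batches of updates to the shard and downgrades this
    /// handle's upper to `new_upper` iff the current global upper of this
    /// shard is `expected_upper`.
    ///
    /// In contrast to [Self::append_batch], this linearizes mutations from
    /// all writers. On success, the input batches are marked consumed; on
    /// failure, the caller retains ownership of them and may retry with a
    /// different `expected_upper` or clean them up with [Batch::delete].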
    pub async fn compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
        validate_part_bounds_on_write: bool,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        let schema_id = self.ensure_schema_registered().await;

        for batch in batches.iter() {
            if self.machine.shard_id() != batch.shard_id() {
                return Err(InvalidUsage::BatchNotFromThisShard {
                    batch_shard: batch.shard_id(),
                    handle_shard: self.machine.shard_id(),
                });
            }
            check_data_version(&self.cfg.build_version, &batch.version);
            if self.cfg.build_version > batch.version {
                info!(
                    shard_id = ?self.machine.shard_id(),
                    batch_version = ?batch.version,
                    writer_version = ?self.cfg.build_version,
                    "Appending batch from the past. This is fine but should be rare. \
                    TODO: Error on very old versions once the leaked blob detector exists."
                )
            }
        }

        let lower = expected_upper.clone();
        let upper = new_upper;
        let since = Antichain::from_elem(T::minimum());
        let desc = Description::new(lower, upper, since);

        let mut received_inline_backpressure = false;
        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
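        // Retry loop: if state rejects the write because it holds too much
        // inline data, either re-encode all inline parts into a single
        // flushed-out batch (if enabled) or flush each batch's inline parts
        // to blob, and then try the compare-and-append again.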
        let maintenance = loop {
            let any_batch_rewrite = batches
                .iter()
                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
                (vec![], 0, vec![], vec![]);
            let mut key_storage = None;
            let mut val_storage = None;
            for batch in batches.iter() {
                let () = validate_truncate_batch(
                    &batch.batch,
                    &desc,
                    any_batch_rewrite,
                    validate_part_bounds_on_write,
                )?;
                for (run_meta, run) in batch.batch.runs() {
                    let start_index = parts.len();
                    for part in run {
                        if let (
                            RunPart::Single(
                                batch_part @ BatchPart::Inline {
                                    updates,
                                    ts_rewrite,
                                    schema_id: _,
                                    deprecated_schema_id: _,
                                },
                            ),
                            Some((schema_cache, builder)),
                        ) = (part, &mut inline_batch_builder)
                        {
                            let schema_migration = PartMigration::new(
                                batch_part,
                                self.write_schemas.clone(),
                                schema_cache,
                            )
                            .await
                            .expect("schemas for inline user part");

                            let encoded_part = EncodedPart::from_inline(
                                &crate::fetch::FetchConfig::from_persist_config(&self.cfg),
                                &*self.metrics,
                                self.metrics.read.compaction.clone(),
                                desc.clone(),
                                updates,
                                ts_rewrite.as_ref(),
                            );
                            let mut fetched_part = FetchedPart::new(
                                Arc::clone(&self.metrics),
                                encoded_part,
                                schema_migration,
                                FetchBatchFilter::Compaction {
                                    since: desc.since().clone(),
                                },
                                false,
                                PartDecodeFormat::Arrow,
                                None,
                            );

                            while let Some(((k, v), t, d)) =
                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
                            {
                                builder
                                    .add(
                                        &k.expect("decoded just-encoded key data"),
                                        &v.expect("decoded just-encoded val data"),
                                        &t,
                                        &d,
                                    )
                                    .await
                                    .expect("re-encoding just-decoded data");
                            }
                        } else {
                            parts.push(part.clone())
                        }
                    }

                    let end_index = parts.len();

                    if start_index == end_index {
                        continue;
                    }

                    if start_index != 0 {
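                        // Runs are encoded as the indices at which each new
                        // run starts; the leading 0 of the first run is
                        // implicit.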
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                }
                num_updates += batch.batch.len;
            }

            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
                let mut finished = builder
                    .finish(desc.upper().clone())
                    .await
                    .expect("invalid usage");
                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
                finished
                    .flush_to_blob(
                        &cfg,
                        &self.metrics.inline.backpressure,
                        &self.isolated_runtime,
                        &self.write_schemas,
                    )
                    .await;
                Some(finished)
            } else {
                None
            };

            if let Some(batch) = &flushed_inline_batch {
                for (run_meta, run) in batch.batch.runs() {
                    assert!(run.len() > 0);
                    let start_index = parts.len();
                    if start_index != 0 {
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                    parts.extend(run.iter().cloned())
                }
            }

            let mut combined_batch =
                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);
            ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
            let heartbeat_timestamp = (self.cfg.now)();
            let res = self
                .machine
                .compare_and_append(
                    &combined_batch,
                    &self.writer_id,
                    &self.debug_state,
                    heartbeat_timestamp,
                )
                .await;

            match res {
                CompareAndAppendRes::Success(_seqno, maintenance) => {
                    self.upper.clone_from(desc.upper());
                    for batch in batches.iter_mut() {
                        batch.mark_consumed();
                    }
                    if let Some(batch) = &mut flushed_inline_batch {
                        batch.mark_consumed();
                    }
                    break maintenance;
                }
                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    return Err(invalid_usage);
                }
                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    self.upper.clone_from(&current_upper);
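                    // Pass the current upper back so the caller can decide
                    // whether to retry with a new expected_upper.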
                    return Ok(Err(UpperMismatch {
                        current: current_upper,
                        expected: expected_upper,
                    }));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    assert_eq!(received_inline_backpressure, false);
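                    // This should only ever happen on the first attempt: both
                    // fallbacks below remove all inline data before retrying.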
                    received_inline_backpressure = true;
                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
                        inline_batch_builder = Some((
                            self.machine.applier.schema_cache(),
                            self.builder(desc.lower().clone()),
                        ));
                        continue;
                    }

                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
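                    // Flush each batch's inline parts out to blob, in
                    // parallel.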
                    let flush_batches = batches
                        .iter_mut()
                        .map(|batch| async {
                            batch
                                .flush_to_blob(
                                    &cfg,
                                    &self.metrics.inline.backpressure,
                                    &self.isolated_runtime,
                                    &self.write_schemas,
                                )
                                .await
                        })
                        .collect::<FuturesUnordered<_>>();
                    let () = flush_batches.collect::<()>().await;

                    for batch in batches.iter() {
                        assert_eq!(batch.batch.inline_bytes(), 0);
                    }

                    continue;
                }
            }
        };

        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());

        Ok(Ok(()))
    }

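    /// Turns the given [ProtoBatch] back into a [Batch] which can be used to
    /// append it to this shard. Panics if the batch was not written for this
    /// shard.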
    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
        let shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(shard_id, self.machine.shard_id());

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };
        assert_eq!(ret.shard_id(), self.machine.shard_id());
        ret
    }

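    /// Returns a [BatchBuilder] that can be used to write a batch of updates
    /// to blob storage which can then be appended to this shard using
    /// [Self::compare_and_append_batch] or [Self::append_batch].
    ///
    /// The builder uses a bounded amount of memory, even when the number of
    /// updates is very large, by spilling finished parts to blob as it goes.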
    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
        Self::builder_inner(
            &self.cfg,
            CompactConfig::new(&self.cfg, self.shard_id()),
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            self.shard_id(),
            self.write_schemas.clone(),
            lower,
        )
    }

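    /// The implementation of [Self::builder], factored out so that a
    /// [BatchBuilder] can be constructed from its parts without a full
    /// [WriteHandle].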
    pub(crate) fn builder_inner(
        persist_cfg: &PersistConfig,
        compact_cfg: CompactConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        user_batch_metrics: &BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        schemas: Schemas<K, V>,
        lower: Antichain<T>,
    ) -> BatchBuilder<K, V, T, D> {
        let parts = if let Some(max_runs) = compact_cfg.batch.max_runs {
            BatchParts::new_compacting::<K, V, D>(
                compact_cfg,
                Description::new(
                    lower.clone(),
                    Antichain::new(),
                    Antichain::from_elem(T::minimum()),
                ),
                max_runs,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
                schemas.clone(),
            )
        } else {
            BatchParts::new_ordered::<D>(
                compact_cfg.batch,
                RunOrder::Unordered,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
            )
        };
        let builder = BatchBuilderInternal::new(
            BatchBuilderConfig::new(persist_cfg, shard_id),
            parts,
            metrics,
            schemas,
            blob,
            shard_id,
            persist_cfg.build_version.clone(),
        );
        BatchBuilder::new(
            builder,
            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
        )
    }

    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
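    /// Uploads the given `updates` as one [Batch] to the blob store and
    /// returns a handle to the batch.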
    pub async fn batch<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
    {
        let iter = updates.into_iter();

        let mut builder = self.builder(lower.clone());

        for update in iter {
            let ((k, v), t, d) = update.borrow();
            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
            match builder.add(k, v, t, d).await {
                Ok(Added::Record | Added::RecordAndParts) => (),
                Err(invalid_usage) => return Err(invalid_usage),
            }
        }

        builder.finish(upper.clone()).await
    }

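    /// Blocks until the shard's upper is strictly past `frontier`, updating
    /// this handle's cached upper along the way.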
    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
        let mut watch = self.machine.applier.watch();
        let batch = self
            .machine
            .next_listen_batch(frontier, &mut watch, None, None)
            .await;
        if PartialOrder::less_than(&self.upper, batch.desc.upper()) {
            self.upper.clone_from(batch.desc.upper());
        }
        assert!(PartialOrder::less_than(frontier, &self.upper));
    }

    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
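    /// Politely expires this writer, releasing any associated state.
    ///
    /// There is also a best-effort impl in Drop for writers that are dropped
    /// without being explicitly expired, but it requires a running tokio
    /// runtime at drop time, so explicit expiry is preferred.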
    pub async fn expire(mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        expire_fn.0().await;
    }

    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        writer_id: WriterId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_writer(&writer_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }

    #[cfg(test)]
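    /// Test helper for an [Self::append] call that is expected to succeed.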
    #[track_caller]
    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
    where
        L: Into<Antichain<T>>,
        U: Into<Antichain<T>>,
        D: Send + Sync,
    {
        self.append(updates.iter(), lower.into(), new_upper.into())
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
    }

    #[cfg(test)]
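    /// Test helper for a [Self::compare_and_append] call that is expected to
    /// succeed.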
    #[track_caller]
    pub async fn expect_compare_and_append(
        &mut self,
        updates: &[((K, V), T, D)],
        expected_upper: T,
        new_upper: T,
    ) where
        D: Send + Sync,
    {
        self.compare_and_append(
            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

    #[cfg(test)]
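    /// Test helper for a [Self::compare_and_append_batch] call that is
    /// expected to succeed.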
    #[track_caller]
    pub async fn expect_compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: T,
        new_upper: T,
    ) {
        self.compare_and_append_batch(
            batches,
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
            true,
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

    #[cfg(test)]
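    /// Test helper for a [Self::batch] call that is expected to succeed.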
    #[track_caller]
    pub async fn expect_batch(
        &mut self,
        updates: &[((K, V), T, D)],
        lower: T,
        upper: T,
    ) -> Batch<K, V, T, D> {
        self.batch(
            updates.iter(),
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
        )
        .await
        .expect("invalid usage")
    }
}

impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
    fn drop(&mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        let handle = match Handle::try_current() {
            Ok(x) => x,
            Err(_) => {
                warn!(
                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
                    self.writer_id
                );
                return;
            }
        };
        let expire_span = debug_span!("drop::expire");
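        // Spawn the expiration onto the runtime rather than blocking in drop.
        // This is best-effort: if it doesn't run, the writer is eventually
        // expired via the lease timeout instead.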
        handle.spawn_named(
            || format!("WriteHandle::expire ({})", self.writer_id),
            expire_fn.0().instrument(expire_span),
        );
    }
}

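/// Stamps `schema_id` onto every run and part of the batch, asserting that
/// any schema id already present matches it.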
fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
where
    T: Timestamp + Lattice + Codec64,
{
    let ensure = |id: &mut Option<SchemaId>| match id {
        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
        None => *id = Some(schema_id),
    };

    for run_meta in &mut batch.run_meta {
        ensure(&mut run_meta.schema);
    }
    for part in &mut batch.parts {
        match part {
            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
            RunPart::Many(_hollow_run_ref) => {
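                // Runs that were spilled to blob are left untouched; their
                // parts keep whatever schema ids they were written with.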
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::mpsc;

    use differential_dataflow::consolidation::consolidate_updates;
    use futures_util::FutureExt;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::collections::CollectionExt;
    use mz_ore::task;
    use serde_json::json;

    use crate::cache::PersistClientCache;
    use crate::tests::{all_ok, new_test_client};
    use crate::{PersistLocation, ShardId};

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn empty_batches(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, _) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let blob = Arc::clone(&write.blob);

        let mut upper = 3;
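        // Write an initial batch so the shard is not empty.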
        write.expect_append(&data[..2], vec![0], vec![upper]).await;

        let mut count_before = 0;
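        // Count the blob keys written so far; the empty appends below should
        // not add any more.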
        blob.list_keys_and_metadata("", &mut |_| {
            count_before += 1;
        })
        .await
        .expect("list_keys failed");
        for _ in 0..5 {
            let new_upper = upper + 1;
            write.expect_compare_and_append(&[], upper, new_upper).await;
            upper = new_upper;
        }
        let mut count_after = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_after += 1;
        })
        .await
        .expect("list_keys failed");
        assert_eq!(count_after, count_before);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
        let data0 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];
        let data1 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
        let mut batch1 = write.expect_batch(&data1, 0, 4).await;

        write
            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
            .await;

        let batch = write
            .machine
            .snapshot(&Antichain::from_elem(3))
            .await
            .expect("just wrote this")
            .into_element();

        assert!(batch.runs().count() >= 2);

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 2),
            (("2".to_owned(), "two".to_owned()), 2, 2),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_ore::test]
    fn writer_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            writer_id: WriterId,
        }

        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        assert_eq!(
            id,
            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        let json = json!({ "writer_id": id });
        assert_eq!(
            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.writer_id, id);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let batch = write.expect_batch(&data, 0, 4).await;
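        // Simulate transferring the batch to another process by
        // round-tripping it through its transmittable proto form.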
        let hollow_batch = batch.into_transmittable_batch();
        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);

        write
            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
            .await;

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
        let five = Antichain::from_elem(5);

        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

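        // Advance the upper to exactly 5; an upper of 5 is still not *past*
        // the frontier 5, so the wait must not resolve yet.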
        write
            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        write
            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
        assert_eq!(write.upper(), &Antichain::from_elem(7));

        assert_eq!(
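            // Waiting on a frontier that the upper is already past returns
            // immediately.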
            write
                .wait_for_upper_past(&Antichain::from_elem(2))
                .now_or_never(),
            Some(())
        );
        assert_eq!(write.upper(), &Antichain::from_elem(7));
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn fetch_recent_upper_linearized() {
        type Timestamp = u64;
        let max_upper = 1000;

        let shard_id = ShardId::new();
        let mut clients = PersistClientCache::new_no_metrics();
        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_writer, _) = upper_writer_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        clients.clear_state_cache();
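        // With the state cache cleared, the reader client below gets its own
        // copy of state and must actually fetch a recent upper rather than
        // reusing the writer's.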
        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_reader, _) = upper_reader_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        let (tx, rx) = mpsc::channel();

        let task = task::spawn(|| "upper-reader", async move {
            let mut upper = Timestamp::MIN;

            while upper < max_upper {
                while let Ok(new_upper) = rx.try_recv() {
                    upper = new_upper;
                }

                let recent_upper = upper_reader
                    .fetch_recent_upper()
                    .await
                    .as_option()
                    .cloned()
                    .expect("u64 is totally ordered and the shard is not finalized");
                assert!(
                    recent_upper >= upper,
                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
                );
            }
        });

        for upper in Timestamp::MIN..max_upper {
            let next_upper = upper + 1;
            upper_writer
                .expect_compare_and_append(&[], upper, next_upper)
                .await;
            tx.send(next_upper).expect("send failed");
        }

        task.await.expect("await failed");
    }
}