use std::borrow::Borrow;
use std::fmt::Debug;
use std::sync::Arc;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use mz_dyncfg::Config;
use mz_ore::task::RuntimeExt;
use mz_ore::{instrument, soft_panic_or_log};
use mz_persist::location::Blob;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{IntoRustIfSome, ProtoType};
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::runtime::Handle;
use tracing::{Instrument, debug_span, error, info, warn};
use uuid::Uuid;

use crate::batch::{
    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
    BatchParts, ProtoBatch, validate_truncate_batch,
};
use crate::error::{InvalidUsage, UpperMismatch};
use crate::fetch::{
    EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ,
};
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{Schemas, assert_code_can_read_data};
use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
use crate::read::ReadHandle;
use crate::schema::PartMigration;
use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};

pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
    "persist_write_combine_inline_writes",
    true,
    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
);

pub(crate) const VALIDATE_PART_BOUNDS_ON_WRITE: Config<bool> = Config::new(
    "persist_validate_part_bounds_on_write",
    false,
    "Validate the part lower <= the batch lower and the part upper <= batch upper, \
    for the batch being appended.",
);

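/// An opaque identifier for a writer of a persist shard. Displayed and parsed
/// as a `w`-prefixed UUID.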
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct WriterId(pub(crate) [u8; 16]);

impl std::fmt::Display for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "w{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for WriterId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("w", "WriterId", s).map(WriterId)
    }
}

impl From<WriterId> for String {
    fn from(writer_id: WriterId) -> Self {
        writer_id.to_string()
    }
}

impl TryFrom<String> for WriterId {
    type Error = String;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}

impl WriterId {
    pub(crate) fn new() -> Self {
        WriterId(*Uuid::new_v4().as_bytes())
    }
}

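/// A "write-only" handle to a persist shard.
///
/// Writes go through compare-and-append: updates are built into batches and
/// appended at a frontier, and the handle caches the most recently observed
/// shard `upper`.
///
/// A minimal usage sketch; the `write` handle and the update values here are
/// illustrative placeholders, not part of this module:
///
/// ```ignore
/// // Append `updates` if the shard's upper is still 0, advancing it to 10.
/// let updates = vec![(("k".to_owned(), "v".to_owned()), 5u64, 1i64)];
/// write
///     .compare_and_append(
///         updates.iter(),
///         Antichain::from_elem(0),
///         Antichain::from_elem(10),
///     )
///     .await
///     .expect("invalid usage")
///     .expect("unexpected upper");
/// ```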
#[derive(Debug)]
pub struct WriteHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) compact: Option<Compactor<K, V, T, D>>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
    pub(crate) writer_id: WriterId,
    pub(crate) debug_state: HandleDebugState,
    pub(crate) write_schemas: Schemas<K, V>,

    pub(crate) upper: Antichain<T>,
    expire_fn: Option<ExpireFn>,
}

impl<K, V, T, D> WriteHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    pub(crate) fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        writer_id: WriterId,
        purpose: &str,
        write_schemas: Schemas<K, V>,
    ) -> Self {
        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let compact = cfg
            .compaction_enabled
            .then(|| Compactor::new(cfg.clone(), Arc::clone(&metrics), gc.clone()));
        let debug_state = HandleDebugState {
            hostname: cfg.hostname.to_owned(),
            purpose: purpose.to_owned(),
        };
        let upper = machine.applier.clone_upper();
        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
        WriteHandle {
            cfg,
            metrics,
            machine,
            gc,
            compact,
            blob,
            isolated_runtime,
            writer_id,
            debug_state,
            write_schemas,
            upper,
            expire_fn: Some(expire_fn),
        }
    }

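    /// Creates a new [`WriteHandle`] from an existing [`ReadHandle`] on the
    /// same shard, reusing its client state and schemas with a fresh
    /// [`WriterId`].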
    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
        Self::new(
            read.cfg.clone(),
            Arc::clone(&read.metrics),
            read.machine.clone(),
            read.gc.clone(),
            Arc::clone(&read.blob),
            WriterId::new(),
            purpose,
            read.read_schemas.clone(),
        )
    }

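    /// Reports whether part bounds should be validated for appended batches,
    /// based on the current dyncfg values.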
    pub fn validate_part_bounds_on_write(&self) -> bool {
        VALIDATE_PART_BOUNDS_ON_WRITE.get(&self.cfg) || VALIDATE_PART_BOUNDS_ON_READ.get(&self.cfg)
    }

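    /// This handle's shard id.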
    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

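    /// The registered schema id for this handle's key and value schemas, if
    /// one has been registered.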
    pub fn schema_id(&self) -> Option<SchemaId> {
        self.write_schemas.id
    }

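    /// Registers this handle's key and value schemas with the shard, if not
    /// already registered, and returns the resulting schema id. Returns `None`
    /// only if registration fails (e.g. the shard is a tombstone).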
    pub async fn try_register_schema(&mut self) -> Option<SchemaId> {
        let Schemas { id, key, val } = &self.write_schemas;

        if let Some(id) = id {
            return Some(*id);
        }

        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
        maintenance.start_performing(&self.machine, &self.gc);

        self.write_schemas.id = schema_id;
        schema_id
    }

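    /// This handle's cached view of the shard's `upper` frontier.
    ///
    /// This is cheap but possibly stale; see [`Self::fetch_recent_upper`] for
    /// a fresher value.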
    pub fn upper(&self) -> &Antichain<T> {
        &self.upper
    }

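    /// The shard's `upper` as currently cached by this process's shared shard
    /// state, which may be more recent than this handle's own cached `upper`.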
    pub fn shared_upper(&self) -> Antichain<T> {
        self.machine.applier.clone_upper()
    }

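    /// Fetches and returns a recent shard `upper`, updating this handle's
    /// cached upper in the process.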
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
        self.machine
            .applier
            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
            .await;
        &self.upper
    }

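    /// Advances the shard's `upper` to at least `target` by appending empty
    /// batches, retrying on upper mismatches until the shard's upper is at or
    /// beyond the target.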
    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
        let mut lower = self.shared_upper().clone();

        while !PartialOrder::less_equal(target, &lower) {
            let since = Antichain::from_elem(T::minimum());
            let desc = Description::new(lower.clone(), target.clone(), since);
            let batch = HollowBatch::empty(desc);

            let heartbeat_timestamp = (self.cfg.now)();
            let res = self
                .machine
                .compare_and_append(
                    &batch,
                    &self.writer_id,
                    &self.debug_state,
                    heartbeat_timestamp,
                )
                .await;

            use CompareAndAppendRes::*;
            let new_upper = match res {
                Success(_seq_no, maintenance) => {
                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
                    batch.desc.upper().clone()
                }
                UpperMismatch(_seq_no, actual_upper) => actual_upper,
                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
                InlineBackpressure => unreachable!("batch was empty"),
            };

            self.upper.clone_from(&new_upper);
            lower = new_upper;
        }
    }

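    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `upper`, retrying (via [`Self::append_batch`]) when the shard's upper
    /// has moved past `lower` but not yet past `upper`.
    ///
    /// All update times must be beyond `lower` and not beyond `upper`, or an
    /// [`InvalidUsage`] error is returned.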
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
        self.append_batch(batch, lower, upper).await
    }

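    /// Atomically applies `updates` to this shard if the shard's upper is
    /// exactly `expected_upper`, advancing it to `new_upper`.
    ///
    /// On any failure (invalid usage or upper mismatch) the batch written for
    /// the updates is deleted before the error is returned.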
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let mut batch = self
            .batch(updates, expected_upper.clone(), new_upper.clone())
            .await?;
        match self
            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
            .await
        {
            ok @ Ok(Ok(())) => ok,
            err => {
                batch.delete().await;
                err
            }
        }
    }

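    /// Appends a previously written [`Batch`] to this shard, retrying on upper
    /// mismatches whose current upper lands between `lower` and `upper`.
    ///
    /// If the shard's upper is already at or past `upper`, the batch is
    /// deleted and `Ok(Ok(()))` is returned; if it is before `lower`, the
    /// batch is deleted and the mismatch is returned.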
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append_batch(
        &mut self,
        mut batch: Batch<K, V, T, D>,
        mut lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        loop {
            let res = self
                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone(), true)
                .await?;
            match res {
                Ok(()) => {
                    self.upper = upper;
                    return Ok(Ok(()));
                }
                Err(mismatch) => {
                    if PartialOrder::less_than(&mismatch.current, &lower) {
                        self.upper.clone_from(&mismatch.current);

                        batch.delete().await;

                        return Ok(Err(mismatch));
                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
                        lower = mismatch.current;
                    } else {
                        self.upper = mismatch.current;

                        batch.delete().await;

                        return Ok(Ok(()));
                    }
                }
            }
        }
    }

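    /// Atomically appends the given `batches` to this shard if the shard's
    /// upper is exactly `expected_upper`, advancing it to `new_upper`.
    ///
    /// On inline backpressure, inline parts are either re-encoded into a
    /// single combined batch or flushed to blob storage before the append is
    /// retried. On success, the input batches are marked consumed.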
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
        validate_part_bounds_on_write: bool,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        let schema_id = self.try_register_schema().await;

        for batch in batches.iter() {
            if self.machine.shard_id() != batch.shard_id() {
                return Err(InvalidUsage::BatchNotFromThisShard {
                    batch_shard: batch.shard_id(),
                    handle_shard: self.machine.shard_id(),
                });
            }
            assert_code_can_read_data(&self.cfg.build_version, &batch.version);
            if self.cfg.build_version > batch.version {
                info!(
                    shard_id =? self.machine.shard_id(),
                    batch_version =? batch.version,
                    writer_version =? self.cfg.build_version,
                    "Appending batch from the past. This is fine but should be rare. \
                    TODO: Error on very old versions once the leaked blob detector exists."
                )
            }
            fn assert_schema<A: Codec>(writer_schema: &A::Schema, batch_schema: &bytes::Bytes) {
                if batch_schema.is_empty() {
                    return;
                }
                let batch_schema: A::Schema = A::decode_schema(batch_schema);
                if *writer_schema != batch_schema {
                    error!(
                        ?writer_schema,
                        ?batch_schema,
                        "writer and batch schemas should be identical"
                    );
                    soft_panic_or_log!("writer and batch schemas should be identical");
                }
            }
            assert_schema::<K>(&*self.write_schemas.key, &batch.schemas.0);
            assert_schema::<V>(&*self.write_schemas.val, &batch.schemas.1);
        }

        let lower = expected_upper.clone();
        let upper = new_upper;
        let since = Antichain::from_elem(T::minimum());
        let desc = Description::new(lower, upper, since);

        let mut received_inline_backpressure = false;
        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
        let maintenance = loop {
            let any_batch_rewrite = batches
                .iter()
                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
                (vec![], 0, vec![], vec![]);
            let mut key_storage = None;
            let mut val_storage = None;
            for batch in batches.iter() {
                let () = validate_truncate_batch(
                    &batch.batch,
                    &desc,
                    any_batch_rewrite,
                    validate_part_bounds_on_write,
                )?;
                for (run_meta, run) in batch.batch.runs() {
                    let start_index = parts.len();
                    for part in run {
                        if let (
                            RunPart::Single(
                                batch_part @ BatchPart::Inline {
                                    updates,
                                    ts_rewrite,
                                    schema_id: _,
                                    deprecated_schema_id: _,
                                },
                            ),
                            Some((schema_cache, builder)),
                        ) = (part, &mut inline_batch_builder)
                        {
                            let schema_migration = PartMigration::new(
                                batch_part,
                                self.write_schemas.clone(),
                                schema_cache,
                            )
                            .await
                            .expect("schemas for inline user part");

                            let encoded_part = EncodedPart::from_inline(
                                &crate::fetch::FetchConfig::from_persist_config(&self.cfg),
                                &*self.metrics,
                                self.metrics.read.compaction.clone(),
                                desc.clone(),
                                updates,
                                ts_rewrite.as_ref(),
                            );
                            let mut fetched_part = FetchedPart::new(
                                Arc::clone(&self.metrics),
                                encoded_part,
                                schema_migration,
                                FetchBatchFilter::Compaction {
                                    since: desc.since().clone(),
                                },
                                false,
                                PartDecodeFormat::Arrow,
                                None,
                            );

                            while let Some(((k, v), t, d)) =
                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
                            {
                                builder
                                    .add(&k, &v, &t, &d)
                                    .await
                                    .expect("re-encoding just-decoded data");
                            }
                        } else {
                            parts.push(part.clone())
                        }
                    }

                    let end_index = parts.len();

                    if start_index == end_index {
                        continue;
                    }

                    if start_index != 0 {
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                }
                num_updates += batch.batch.len;
            }

            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
                let mut finished = builder
                    .finish(desc.upper().clone())
                    .await
                    .expect("invalid usage");
                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
                finished
                    .flush_to_blob(
                        &cfg,
                        &self.metrics.inline.backpressure,
                        &self.isolated_runtime,
                        &self.write_schemas,
                    )
                    .await;
                Some(finished)
            } else {
                None
            };

            if let Some(batch) = &flushed_inline_batch {
                for (run_meta, run) in batch.batch.runs() {
                    assert!(run.len() > 0);
                    let start_index = parts.len();
                    if start_index != 0 {
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                    parts.extend(run.iter().cloned())
                }
            }

            let mut combined_batch =
                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);

            match schema_id {
                Some(schema_id) => {
                    ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
                }
                None => {
                    assert!(
                        self.fetch_recent_upper().await.is_empty(),
                        "fetching a schema id should only fail when the shard is tombstoned"
                    )
                }
            }

            let heartbeat_timestamp = (self.cfg.now)();
            let res = self
                .machine
                .compare_and_append(
                    &combined_batch,
                    &self.writer_id,
                    &self.debug_state,
                    heartbeat_timestamp,
                )
                .await;

            match res {
                CompareAndAppendRes::Success(_seqno, maintenance) => {
                    self.upper.clone_from(desc.upper());
                    for batch in batches.iter_mut() {
                        batch.mark_consumed();
                    }
                    if let Some(batch) = &mut flushed_inline_batch {
                        batch.mark_consumed();
                    }
                    break maintenance;
                }
                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    return Err(invalid_usage);
                }
                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    self.upper.clone_from(&current_upper);
                    return Ok(Err(UpperMismatch {
                        current: current_upper,
                        expected: expected_upper,
                    }));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    assert_eq!(received_inline_backpressure, false);
                    received_inline_backpressure = true;
                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
                        inline_batch_builder = Some((
                            self.machine.applier.schema_cache(),
                            self.builder(desc.lower().clone()),
                        ));
                        continue;
                    }

                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
                    let flush_batches = batches
                        .iter_mut()
                        .map(|batch| async {
                            batch
                                .flush_to_blob(
                                    &cfg,
                                    &self.metrics.inline.backpressure,
                                    &self.isolated_runtime,
                                    &self.write_schemas,
                                )
                                .await
                        })
                        .collect::<FuturesUnordered<_>>();
                    let () = flush_batches.collect::<()>().await;

                    for batch in batches.iter() {
                        assert_eq!(batch.batch.inline_bytes(), 0);
                    }

                    continue;
                }
            }
        };

        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());

        Ok(Ok(()))
    }

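    /// Turns a [`ProtoBatch`] back into a [`Batch`] that can be appended to
    /// this shard. Panics if the batch was not written to this handle's shard.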
    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
        let shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(shard_id, self.machine.shard_id());

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            schemas: (batch.key_schema, batch.val_schema),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };
        assert_eq!(ret.shard_id(), self.machine.shard_id());
        ret
    }

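    /// Returns a [`BatchBuilder`] for writing a batch of updates with times at
    /// or beyond `lower`, which can then be appended to this shard.
    ///
    /// A sketch of the intended flow (the names and frontiers here are
    /// illustrative):
    ///
    /// ```ignore
    /// let mut builder = write.builder(Antichain::from_elem(0));
    /// builder.add(&key, &val, &5u64, &1i64).await.expect("invalid usage");
    /// let mut batch = builder.finish(Antichain::from_elem(10)).await.expect("invalid usage");
    /// write
    ///     .compare_and_append_batch(
    ///         &mut [&mut batch],
    ///         Antichain::from_elem(0),
    ///         Antichain::from_elem(10),
    ///         true,
    ///     )
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// ```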
    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
        Self::builder_inner(
            &self.cfg,
            CompactConfig::new(&self.cfg, self.shard_id()),
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            self.shard_id(),
            self.write_schemas.clone(),
            lower,
        )
    }

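    /// Constructs a [`BatchBuilder`] from its constituent parts; used by
    /// [`Self::builder`].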
    pub(crate) fn builder_inner(
        persist_cfg: &PersistConfig,
        compact_cfg: CompactConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        user_batch_metrics: &BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        schemas: Schemas<K, V>,
        lower: Antichain<T>,
    ) -> BatchBuilder<K, V, T, D> {
        let parts = if let Some(max_runs) = compact_cfg.batch.max_runs {
            BatchParts::new_compacting::<K, V, D>(
                compact_cfg,
                Description::new(
                    lower.clone(),
                    Antichain::new(),
                    Antichain::from_elem(T::minimum()),
                ),
                max_runs,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
                schemas.clone(),
            )
        } else {
            BatchParts::new_ordered::<D>(
                compact_cfg.batch,
                RunOrder::Unordered,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
            )
        };
        let builder = BatchBuilderInternal::new(
            BatchBuilderConfig::new(persist_cfg, shard_id),
            parts,
            metrics,
            schemas,
            blob,
            shard_id,
            persist_cfg.build_version.clone(),
        );
        BatchBuilder::new(
            builder,
            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
        )
    }

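    /// Uploads the given `updates` as a single [`Batch`] with the given
    /// `lower` and `upper` bounds, which can then be appended to this shard.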
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn batch<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
    {
        let iter = updates.into_iter();

        let mut builder = self.builder(lower.clone());

        for update in iter {
            let ((k, v), t, d) = update.borrow();
            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
            match builder.add(k, v, t, d).await {
                Ok(Added::Record | Added::RecordAndParts) => (),
                Err(invalid_usage) => return Err(invalid_usage),
            }
        }

        builder.finish(upper.clone()).await
    }

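    /// Blocks until the shard's upper is strictly past `frontier`, updating
    /// this handle's cached upper along the way.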
    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
        let mut watch = self.machine.applier.watch();
        let batch = self
            .machine
            .next_listen_batch(frontier, &mut watch, None, None)
            .await;
        if PartialOrder::less_than(&self.upper, batch.desc.upper()) {
            self.upper.clone_from(batch.desc.upper());
        }
        assert!(PartialOrder::less_than(frontier, &self.upper));
    }

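    /// Politely expires this writer, releasing any associated state.
    ///
    /// If this is not called, the writer is instead expired when its lease
    /// times out (see the `Drop` impl below).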
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        expire_fn.0().await;
    }

    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        writer_id: WriterId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_writer(&writer_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
    where
        L: Into<Antichain<T>>,
        U: Into<Antichain<T>>,
        D: Send + Sync,
    {
        self.append(updates.iter(), lower.into(), new_upper.into())
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append(
        &mut self,
        updates: &[((K, V), T, D)],
        expected_upper: T,
        new_upper: T,
    ) where
        D: Send + Sync,
    {
        self.compare_and_append(
            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: T,
        new_upper: T,
    ) {
        self.compare_and_append_batch(
            batches,
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
            true,
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_batch(
        &mut self,
        updates: &[((K, V), T, D)],
        lower: T,
        upper: T,
    ) -> Batch<K, V, T, D> {
        self.batch(
            updates.iter(),
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
        )
        .await
        .expect("invalid usage")
    }
}

impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
    fn drop(&mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        let handle = match Handle::try_current() {
            Ok(x) => x,
            Err(_) => {
                warn!(
                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
                    self.writer_id
                );
                return;
            }
        };
        let expire_span = debug_span!("drop::expire");
        handle.spawn_named(
            || format!("WriteHandle::expire ({})", self.writer_id),
            expire_fn.0().instrument(expire_span),
        );
    }
}

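/// Stamps `schema_id` onto the run metadata and parts of `batch` where a
/// schema id can be set, asserting that any id already present matches.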
fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
where
    T: Timestamp + Lattice + Codec64,
{
    let ensure = |id: &mut Option<SchemaId>| match id {
        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
        None => *id = Some(schema_id),
    };

    for run_meta in &mut batch.run_meta {
        ensure(&mut run_meta.schema);
    }
    for part in &mut batch.parts {
        match part {
            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
            RunPart::Many(_hollow_run_ref) => {}
        }
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::mpsc;

    use differential_dataflow::consolidation::consolidate_updates;
    use futures_util::FutureExt;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::collections::CollectionExt;
    use mz_ore::task;
    use serde_json::json;

    use crate::cache::PersistClientCache;
    use crate::tests::{all_ok, new_test_client};
    use crate::{PersistLocation, ShardId};

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn empty_batches(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, _) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let blob = Arc::clone(&write.blob);

        let mut upper = 3;
        write.expect_append(&data[..2], vec![0], vec![upper]).await;

        let mut count_before = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_before += 1;
        })
        .await
        .expect("list_keys failed");
        for _ in 0..5 {
            let new_upper = upper + 1;
            write.expect_compare_and_append(&[], upper, new_upper).await;
            upper = new_upper;
        }
        let mut count_after = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_after += 1;
        })
        .await
        .expect("list_keys failed");
        assert_eq!(count_after, count_before);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
        let data0 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];
        let data1 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
        let mut batch1 = write.expect_batch(&data1, 0, 4).await;

        write
            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
            .await;

        let batch = write
            .machine
            .snapshot(&Antichain::from_elem(3))
            .await
            .expect("just wrote this")
            .into_element();

        assert!(batch.runs().count() >= 2);

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 2),
            (("2".to_owned(), "two".to_owned()), 2, 2),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_ore::test]
    fn writer_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            writer_id: WriterId,
        }

        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        assert_eq!(
            id,
            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        let json = json!({ "writer_id": id });
        assert_eq!(
            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.writer_id, id);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let batch = write.expect_batch(&data, 0, 4).await;
        let hollow_batch = batch.into_transmittable_batch();
        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);

        write
            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
            .await;

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
        let five = Antichain::from_elem(5);

        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        write
            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        write
            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
        assert_eq!(write.upper(), &Antichain::from_elem(7));

        assert_eq!(
            write
                .wait_for_upper_past(&Antichain::from_elem(2))
                .now_or_never(),
            Some(())
        );
        assert_eq!(write.upper(), &Antichain::from_elem(7));
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn fetch_recent_upper_linearized() {
        type Timestamp = u64;
        let max_upper = 1000;

        let shard_id = ShardId::new();
        let mut clients = PersistClientCache::new_no_metrics();
        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_writer, _) = upper_writer_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        clients.clear_state_cache();
        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_reader, _) = upper_reader_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        let (tx, rx) = mpsc::channel();

        let task = task::spawn(|| "upper-reader", async move {
            let mut upper = Timestamp::MIN;

            while upper < max_upper {
                while let Ok(new_upper) = rx.try_recv() {
                    upper = new_upper;
                }

                let recent_upper = upper_reader
                    .fetch_recent_upper()
                    .await
                    .as_option()
                    .cloned()
                    .expect("u64 is totally ordered and the shard is not finalized");
                assert!(
                    recent_upper >= upper,
                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
                );
            }
        });

        for upper in Timestamp::MIN..max_upper {
            let next_upper = upper + 1;
            upper_writer
                .expect_compare_and_append(&[], upper, next_upper)
                .await;
            tx.send(next_upper).expect("send failed");
        }

        task.await;
    }
}