use std::borrow::Borrow;
use std::fmt::Debug;
use std::sync::Arc;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use mz_dyncfg::Config;
use mz_ore::task::RuntimeExt;
use mz_ore::{instrument, soft_panic_or_log};
use mz_persist::location::Blob;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{IntoRustIfSome, ProtoType};
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::runtime::Handle;
use tracing::{Instrument, debug_span, error, info, warn};
use uuid::Uuid;

use crate::batch::{
    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
    BatchParts, ProtoBatch, validate_truncate_batch,
};
use crate::error::{InvalidUsage, UpperMismatch};
use crate::fetch::{
    EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ,
};
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{Schemas, assert_code_can_read_data};
use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
use crate::read::ReadHandle;
use crate::schema::PartMigration;
use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};

pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
    "persist_write_combine_inline_writes",
    true,
    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
);

pub(crate) const VALIDATE_PART_BOUNDS_ON_WRITE: Config<bool> = Config::new(
    "persist_validate_part_bounds_on_write",
    true,
    "Validate the part lower <= the batch lower and the part upper <= batch upper,\
    for the batch being appended.",
);

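/// An opaque identifier for a writer of a persist shard.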
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct WriterId(pub(crate) [u8; 16]);

impl std::fmt::Display for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "w{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for WriterId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("w", "WriterId", s).map(WriterId)
    }
}

impl From<WriterId> for String {
    fn from(writer_id: WriterId) -> Self {
        writer_id.to_string()
    }
}

impl TryFrom<String> for WriterId {
    type Error = String;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}

impl WriterId {
    pub(crate) fn new() -> Self {
        WriterId(*Uuid::new_v4().as_bytes())
    }
}

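/// A "capability" granting the ability to apply updates to some shard at times
/// greater or equal to `self.upper()`.
///
/// Prefer calling [WriteHandle::expire] over simply dropping the handle: a
/// dropped handle can only expire itself on a best-effort basis and otherwise
/// falls back to the lease timeout.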
#[derive(Debug)]
pub struct WriteHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) compact: Option<Compactor<K, V, T, D>>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
    pub(crate) writer_id: WriterId,
    pub(crate) debug_state: HandleDebugState,
    pub(crate) write_schemas: Schemas<K, V>,

    pub(crate) upper: Antichain<T>,
    expire_fn: Option<ExpireFn>,
}

impl<K, V, T, D> WriteHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    pub(crate) fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        writer_id: WriterId,
        purpose: &str,
        write_schemas: Schemas<K, V>,
    ) -> Self {
        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let compact = cfg
            .compaction_enabled
            .then(|| Compactor::new(cfg.clone(), Arc::clone(&metrics), gc.clone()));
        let debug_state = HandleDebugState {
            hostname: cfg.hostname.to_owned(),
            purpose: purpose.to_owned(),
        };
        let upper = machine.applier.clone_upper();
        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
        WriteHandle {
            cfg,
            metrics,
            machine,
            gc,
            compact,
            blob,
            isolated_runtime,
            writer_id,
            debug_state,
            write_schemas,
            upper,
            expire_fn: Some(expire_fn),
        }
    }

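    /// Creates a new independent [WriteHandle] (with a fresh [WriterId]) for
    /// the same shard as the given [ReadHandle].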
    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
        Self::new(
            read.cfg.clone(),
            Arc::clone(&read.metrics),
            read.machine.clone(),
            read.gc.clone(),
            Arc::clone(&read.blob),
            WriterId::new(),
            purpose,
            read.read_schemas.clone(),
        )
    }

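    /// Whether to validate part bounds against the batch description on
    /// write: enabled if either the write-side or the read-side dyncfg is on.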
    pub fn validate_part_bounds_on_write(&self) -> bool {
        VALIDATE_PART_BOUNDS_ON_WRITE.get(&self.cfg) || VALIDATE_PART_BOUNDS_ON_READ.get(&self.cfg)
    }

    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        self.write_schemas.id
    }

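    /// Returns the [SchemaId] of this writer's schemas, registering them with
    /// the shard first if necessary. Per the assert in
    /// [Self::compare_and_append_batch], this should only return `None` when
    /// the shard is tombstoned.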
    pub async fn try_register_schema(&mut self) -> Option<SchemaId> {
        let Schemas { id, key, val } = &self.write_schemas;

        if let Some(id) = id {
            return Some(*id);
        }

        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
        maintenance.start_performing(&self.machine, &self.gc);

        self.write_schemas.id = schema_id;
        schema_id
    }

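    /// A cached version of the shard-global `upper` frontier, as last
    /// observed by this handle.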
    pub fn upper(&self) -> &Antichain<T> {
        &self.upper
    }

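    /// A less-stale cached version of the shard-global `upper` frontier, as
    /// last observed by any handle in this process.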
    pub fn shared_upper(&self) -> Antichain<T> {
        self.machine.applier.clone_upper()
    }

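    /// Fetches and returns a recent shard-global `upper`, updating this
    /// handle's cache. Unlike [Self::shared_upper], this is linearized with
    /// write operations, at the cost of a round-trip to consensus.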
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
        self.machine
            .applier
            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
            .await;
        &self.upper
    }

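    /// Advances the shard's upper to (at least) `target` by appending empty
    /// batches, retrying from the discovered upper on any mismatch.
    ///
    /// This is a no-op if `target` is not beyond the current upper.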
    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
        let mut lower = self.shared_upper().clone();

        while !PartialOrder::less_equal(target, &lower) {
            let since = Antichain::from_elem(T::minimum());
            let desc = Description::new(lower.clone(), target.clone(), since);
            let batch = HollowBatch::empty(desc);

            let heartbeat_timestamp = (self.cfg.now)();
            let res = self
                .machine
                .compare_and_append(
                    &batch,
                    &self.writer_id,
                    &self.debug_state,
                    heartbeat_timestamp,
                )
                .await;

            use CompareAndAppendRes::*;
            let new_upper = match res {
                Success(_seq_no, maintenance) => {
                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
                    batch.desc.upper().clone()
                }
                UpperMismatch(_seq_no, actual_upper) => actual_upper,
                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
                InlineBackpressure => unreachable!("batch was empty"),
            };

            self.upper.clone_from(&new_upper);
            lower = new_upper;
        }
    }

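    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `upper`.
    ///
    /// All times in `updates` must be greater or equal to `lower` and not
    /// greater or equal to `upper`. Upper mismatches that can still be
    /// reconciled are retried internally (see [Self::append_batch]); only a
    /// shard upper that precedes `lower` surfaces as an [UpperMismatch].
    ///
    /// A minimal usage sketch (not from the original docs; assumes a
    /// `WriteHandle<String, String, u64, i64>` named `write`):
    ///
    /// ```ignore
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 1u64, 1i64)];
    /// write
    ///     .append(updates.iter(), Antichain::from_elem(0), Antichain::from_elem(2))
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// ```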
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
        self.append_batch(batch, lower, upper).await
    }

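    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper` iff the current global upper of this shard is
    /// `expected_upper`.
    ///
    /// All times in `updates` must be greater or equal to `expected_upper` and
    /// not greater or equal to `new_upper`. If the append doesn't land, the
    /// batch uploaded by this call is deleted before returning.
    ///
    /// A hedged sketch of the optimistic-concurrency loop this enables
    /// (hypothetical setup; `write` and `updates` as in [Self::append]):
    ///
    /// ```ignore
    /// match write
    ///     .compare_and_append(updates.iter(), Antichain::from_elem(0), Antichain::from_elem(2))
    ///     .await
    ///     .expect("invalid usage")
    /// {
    ///     Ok(()) => { /* the write landed at the expected upper */ }
    ///     Err(mismatch) => {
    ///         // Another writer got there first; `mismatch.current` is the
    ///         // upper to retry from.
    ///     }
    /// }
    /// ```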
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let mut batch = self
            .batch(updates, expected_upper.clone(), new_upper.clone())
            .await?;
        match self
            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
            .await
        {
            ok @ Ok(Ok(())) => ok,
            err => {
                batch.delete().await;
                err
            }
        }
    }

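    /// Appends `batch` to this shard and downgrades this handle's upper to
    /// `upper`.
    ///
    /// Unlike [Self::compare_and_append_batch], an upper mismatch between
    /// `lower` and `upper` is retried (with the batch truncated to the new
    /// lower), and a mismatch at or past `upper` is treated as an already
    /// applied write: the batch is deleted and `Ok(Ok(()))` is returned. Only
    /// a shard upper before `lower` surfaces as an [UpperMismatch].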
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append_batch(
        &mut self,
        mut batch: Batch<K, V, T, D>,
        mut lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        loop {
            let res = self
                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone(), true)
                .await?;
            match res {
                Ok(()) => {
                    self.upper = upper;
                    return Ok(Ok(()));
                }
                Err(mismatch) => {
                    if PartialOrder::less_than(&mismatch.current, &lower) {
                        self.upper.clone_from(&mismatch.current);

                        batch.delete().await;

                        return Ok(Err(mismatch));
                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
                        lower = mismatch.current;
                    } else {
                        self.upper = mismatch.current;

                        batch.delete().await;

                        return Ok(Ok(()));
                    }
                }
            }
        }
    }

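    /// Appends the batches of updates to the shard and downgrades this
    /// handle's upper to `new_upper` iff the current global upper of this
    /// shard is `expected_upper`.
    ///
    /// On success all `batches` are marked consumed; on failure, ownership
    /// stays with the caller, who may retry or delete them. Inline writes may
    /// be transparently spilled to blob storage (or combined into a single
    /// re-encoded batch) if they trip the inline-write backpressure limit.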
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
        validate_part_bounds_on_write: bool,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        let schema_id = self.try_register_schema().await;

        for batch in batches.iter() {
            if self.machine.shard_id() != batch.shard_id() {
                return Err(InvalidUsage::BatchNotFromThisShard {
                    batch_shard: batch.shard_id(),
                    handle_shard: self.machine.shard_id(),
                });
            }
            assert_code_can_read_data(&self.cfg.build_version, &batch.version);
            if self.cfg.build_version > batch.version {
                info!(
                    shard_id =? self.machine.shard_id(),
                    batch_version =? batch.version,
                    writer_version =? self.cfg.build_version,
                    "Appending batch from the past. This is fine but should be rare. \
                    TODO: Error on very old versions once the leaked blob detector exists."
                )
            }
            fn assert_schema<A: Codec>(writer_schema: &A::Schema, batch_schema: &bytes::Bytes) {
                if batch_schema.is_empty() {
                    return;
                }
                let batch_schema: A::Schema = A::decode_schema(batch_schema);
                if *writer_schema != batch_schema {
                    error!(
                        ?writer_schema,
                        ?batch_schema,
                        "writer and batch schemas should be identical"
                    );
                    soft_panic_or_log!("writer and batch schemas should be identical");
                }
            }
            assert_schema::<K>(&*self.write_schemas.key, &batch.schemas.0);
            assert_schema::<V>(&*self.write_schemas.val, &batch.schemas.1);
        }

        let lower = expected_upper.clone();
        let upper = new_upper;
        let since = Antichain::from_elem(T::minimum());
        let desc = Description::new(lower, upper, since);

        let mut received_inline_backpressure = false;
        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
        let maintenance = loop {
            let any_batch_rewrite = batches
                .iter()
                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
                (vec![], 0, vec![], vec![]);
            let mut key_storage = None;
            let mut val_storage = None;
            for batch in batches.iter() {
                let () = validate_truncate_batch(
                    &batch.batch,
                    &desc,
                    any_batch_rewrite,
                    validate_part_bounds_on_write,
                )?;
                for (run_meta, run) in batch.batch.runs() {
                    let start_index = parts.len();
                    for part in run {
                        if let (
                            RunPart::Single(
                                batch_part @ BatchPart::Inline {
                                    updates,
                                    ts_rewrite,
                                    schema_id: _,
                                    deprecated_schema_id: _,
                                },
                            ),
                            Some((schema_cache, builder)),
                        ) = (part, &mut inline_batch_builder)
                        {
                            let schema_migration = PartMigration::new(
                                batch_part,
                                self.write_schemas.clone(),
                                schema_cache,
                            )
                            .await
                            .expect("schemas for inline user part");

                            let encoded_part = EncodedPart::from_inline(
                                &crate::fetch::FetchConfig::from_persist_config(&self.cfg),
                                &*self.metrics,
                                self.metrics.read.compaction.clone(),
                                desc.clone(),
                                updates,
                                ts_rewrite.as_ref(),
                            );
                            let mut fetched_part = FetchedPart::new(
                                Arc::clone(&self.metrics),
                                encoded_part,
                                schema_migration,
                                FetchBatchFilter::Compaction {
                                    since: desc.since().clone(),
                                },
                                false,
                                PartDecodeFormat::Arrow,
                                None,
                            );

                            while let Some(((k, v), t, d)) =
                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
                            {
                                builder
                                    .add(
                                        &k.expect("decoded just-encoded key data"),
                                        &v.expect("decoded just-encoded val data"),
                                        &t,
                                        &d,
                                    )
                                    .await
                                    .expect("re-encoding just-decoded data");
                            }
                        } else {
                            parts.push(part.clone())
                        }
                    }

                    let end_index = parts.len();

                    if start_index == end_index {
                        continue;
                    }

                    if start_index != 0 {
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                }
                num_updates += batch.batch.len;
            }

            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
                let mut finished = builder
                    .finish(desc.upper().clone())
                    .await
                    .expect("invalid usage");
                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
                finished
                    .flush_to_blob(
                        &cfg,
                        &self.metrics.inline.backpressure,
                        &self.isolated_runtime,
                        &self.write_schemas,
                    )
                    .await;
                Some(finished)
            } else {
                None
            };

            if let Some(batch) = &flushed_inline_batch {
                for (run_meta, run) in batch.batch.runs() {
                    assert!(run.len() > 0);
                    let start_index = parts.len();
                    if start_index != 0 {
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                    parts.extend(run.iter().cloned())
                }
            }

            let mut combined_batch =
                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);

            match schema_id {
                Some(schema_id) => {
                    ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
                }
                None => {
                    assert!(
                        self.fetch_recent_upper().await.is_empty(),
                        "fetching a schema id should only fail when the shard is tombstoned"
                    )
                }
            }

            let heartbeat_timestamp = (self.cfg.now)();
            let res = self
                .machine
                .compare_and_append(
                    &combined_batch,
                    &self.writer_id,
                    &self.debug_state,
                    heartbeat_timestamp,
                )
                .await;

            match res {
                CompareAndAppendRes::Success(_seqno, maintenance) => {
                    self.upper.clone_from(desc.upper());
                    for batch in batches.iter_mut() {
                        batch.mark_consumed();
                    }
                    if let Some(batch) = &mut flushed_inline_batch {
                        batch.mark_consumed();
                    }
                    break maintenance;
                }
                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    return Err(invalid_usage);
                }
                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    self.upper.clone_from(&current_upper);
                    return Ok(Err(UpperMismatch {
                        current: current_upper,
                        expected: expected_upper,
                    }));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    assert_eq!(received_inline_backpressure, false);
                    received_inline_backpressure = true;
                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
                        inline_batch_builder = Some((
                            self.machine.applier.schema_cache(),
                            self.builder(desc.lower().clone()),
                        ));
                        continue;
                    }

                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
                    let flush_batches = batches
                        .iter_mut()
                        .map(|batch| async {
                            batch
                                .flush_to_blob(
                                    &cfg,
                                    &self.metrics.inline.backpressure,
                                    &self.isolated_runtime,
                                    &self.write_schemas,
                                )
                                .await
                        })
                        .collect::<FuturesUnordered<_>>();
                    let () = flush_batches.collect::<()>().await;

                    for batch in batches.iter() {
                        assert_eq!(batch.batch.inline_bytes(), 0);
                    }

                    continue;
                }
            }
        };

        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());

        Ok(Ok(()))
    }

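    /// Turns the given [ProtoBatch] back into a [Batch] which can be appended
    /// to this shard, asserting that it was written against this shard.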
    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
        let shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(shard_id, self.machine.shard_id());

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            schemas: (batch.key_schema, batch.val_schema),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };
        assert_eq!(ret.shard_id(), self.machine.shard_id());
        ret
    }

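    /// Returns a [BatchBuilder] that can be used to write a batch of updates
    /// to blob storage, which can then be appended to this shard using
    /// [Self::compare_and_append_batch] or [Self::append_batch].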
    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
        Self::builder_inner(
            &self.cfg,
            CompactConfig::new(&self.cfg, self.shard_id()),
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            self.shard_id(),
            self.write_schemas.clone(),
            lower,
        )
    }

    pub(crate) fn builder_inner(
        persist_cfg: &PersistConfig,
        compact_cfg: CompactConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        user_batch_metrics: &BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        schemas: Schemas<K, V>,
        lower: Antichain<T>,
    ) -> BatchBuilder<K, V, T, D> {
        let parts = if let Some(max_runs) = compact_cfg.batch.max_runs {
            BatchParts::new_compacting::<K, V, D>(
                compact_cfg,
                Description::new(
                    lower.clone(),
                    Antichain::new(),
                    Antichain::from_elem(T::minimum()),
                ),
                max_runs,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
                schemas.clone(),
            )
        } else {
            BatchParts::new_ordered::<D>(
                compact_cfg.batch,
                RunOrder::Unordered,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
            )
        };
        let builder = BatchBuilderInternal::new(
            BatchBuilderConfig::new(persist_cfg, shard_id),
            parts,
            metrics,
            schemas,
            blob,
            shard_id,
            persist_cfg.build_version.clone(),
        );
        BatchBuilder::new(
            builder,
            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
        )
    }

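    /// Uploads the given `updates` as one [Batch] to blob storage and returns
    /// a handle to it.
    ///
    /// All times in `updates` must be greater or equal to `lower` and not
    /// greater or equal to `upper`.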
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn batch<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
    {
        let iter = updates.into_iter();

        let mut builder = self.builder(lower.clone());

        for update in iter {
            let ((k, v), t, d) = update.borrow();
            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
            match builder.add(k, v, t, d).await {
                Ok(Added::Record | Added::RecordAndParts) => (),
                Err(invalid_usage) => return Err(invalid_usage),
            }
        }

        builder.finish(upper.clone()).await
    }

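    /// Blocks until the shard's upper is strictly past `frontier`, updating
    /// this handle's cached upper along the way.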
    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
        let mut watch = self.machine.applier.watch();
        let batch = self
            .machine
            .next_listen_batch(frontier, &mut watch, None, None)
            .await;
        if PartialOrder::less_than(&self.upper, batch.desc.upper()) {
            self.upper.clone_from(batch.desc.upper());
        }
        assert!(PartialOrder::less_than(frontier, &self.upper));
    }

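    /// Politely expires this writer, releasing its state.
    ///
    /// This is idempotent: only the first call (or the `Drop` impl) actually
    /// runs the expiration.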
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        expire_fn.0().await;
    }

    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        writer_id: WriterId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_writer(&writer_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
    where
        L: Into<Antichain<T>>,
        U: Into<Antichain<T>>,
        D: Send + Sync,
    {
        self.append(updates.iter(), lower.into(), new_upper.into())
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append(
        &mut self,
        updates: &[((K, V), T, D)],
        expected_upper: T,
        new_upper: T,
    ) where
        D: Send + Sync,
    {
        self.compare_and_append(
            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: T,
        new_upper: T,
    ) {
        self.compare_and_append_batch(
            batches,
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
            true,
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_batch(
        &mut self,
        updates: &[((K, V), T, D)],
        lower: T,
        upper: T,
    ) -> Batch<K, V, T, D> {
        self.batch(
            updates.iter(),
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
        )
        .await
        .expect("invalid usage")
    }
}

impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
    fn drop(&mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        let handle = match Handle::try_current() {
            Ok(x) => x,
            Err(_) => {
                warn!(
                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
                    self.writer_id
                );
                return;
            }
        };
        let expire_span = debug_span!("drop::expire");
        handle.spawn_named(
            || format!("WriteHandle::expire ({})", self.writer_id),
            expire_fn.0().instrument(expire_span),
        );
    }
}

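/// Ensures every run and part of `batch` is tagged with `schema_id`,
/// asserting that any schema id already present matches.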
fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
where
    T: Timestamp + Lattice + Codec64,
{
    let ensure = |id: &mut Option<SchemaId>| match id {
        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
        None => *id = Some(schema_id),
    };

    for run_meta in &mut batch.run_meta {
        ensure(&mut run_meta.schema);
    }
    for part in &mut batch.parts {
        match part {
            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
            RunPart::Many(_hollow_run_ref) => {
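                // Parts referenced by an out-of-line run are not tagged here;
                // doing so would require rewriting the referenced blob.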
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::mpsc;

    use differential_dataflow::consolidation::consolidate_updates;
    use futures_util::FutureExt;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::collections::CollectionExt;
    use mz_ore::task;
    use serde_json::json;

    use crate::cache::PersistClientCache;
    use crate::tests::{all_ok, new_test_client};
    use crate::{PersistLocation, ShardId};

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn empty_batches(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, _) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let blob = Arc::clone(&write.blob);

        let mut upper = 3;
        write.expect_append(&data[..2], vec![0], vec![upper]).await;

        let mut count_before = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_before += 1;
        })
        .await
        .expect("list_keys failed");
        for _ in 0..5 {
            let new_upper = upper + 1;
            write.expect_compare_and_append(&[], upper, new_upper).await;
            upper = new_upper;
        }
        let mut count_after = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_after += 1;
        })
        .await
        .expect("list_keys failed");
        assert_eq!(count_after, count_before);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
        let data0 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];
        let data1 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
        let mut batch1 = write.expect_batch(&data1, 0, 4).await;

        write
            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
            .await;

        let batch = write
            .machine
            .snapshot(&Antichain::from_elem(3))
            .await
            .expect("just wrote this")
            .into_element();

        assert!(batch.runs().count() >= 2);

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 2),
            (("2".to_owned(), "two".to_owned()), 2, 2),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_ore::test]
    fn writer_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            writer_id: WriterId,
        }

        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        assert_eq!(
            id,
            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        let json = json!({ "writer_id": id });
        assert_eq!(
            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.writer_id, id);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let batch = write.expect_batch(&data, 0, 4).await;
        let hollow_batch = batch.into_transmittable_batch();
        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);

        write
            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
            .await;

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
        let five = Antichain::from_elem(5);

        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        write
            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        write
            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
        assert_eq!(write.upper(), &Antichain::from_elem(7));

        assert_eq!(
            write
                .wait_for_upper_past(&Antichain::from_elem(2))
                .now_or_never(),
            Some(())
        );
        assert_eq!(write.upper(), &Antichain::from_elem(7));
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn fetch_recent_upper_linearized() {
        type Timestamp = u64;
        let max_upper = 1000;

        let shard_id = ShardId::new();
        let mut clients = PersistClientCache::new_no_metrics();
        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_writer, _) = upper_writer_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        clients.clear_state_cache();
        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_reader, _) = upper_reader_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        let (tx, rx) = mpsc::channel();

        let task = task::spawn(|| "upper-reader", async move {
            let mut upper = Timestamp::MIN;

            while upper < max_upper {
                while let Ok(new_upper) = rx.try_recv() {
                    upper = new_upper;
                }

                let recent_upper = upper_reader
                    .fetch_recent_upper()
                    .await
                    .as_option()
                    .cloned()
                    .expect("u64 is totally ordered and the shard is not finalized");
                assert!(
                    recent_upper >= upper,
                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
                );
            }
        });

        for upper in Timestamp::MIN..max_upper {
            let next_upper = upper + 1;
            upper_writer
                .expect_compare_and_append(&[], upper, next_upper)
                .await;
            tx.send(next_upper).expect("send failed");
        }

        task.await;
    }
}