1use std::borrow::Borrow;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use differential_dataflow::difference::Monoid;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::trace::Description;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use mz_dyncfg::Config;
22use mz_ore::task::RuntimeExt;
23use mz_ore::{instrument, soft_panic_or_log};
24use mz_persist::location::Blob;
25use mz_persist_types::schema::SchemaId;
26use mz_persist_types::{Codec, Codec64};
27use mz_proto::{IntoRustIfSome, ProtoType};
28use proptest_derive::Arbitrary;
29use semver::Version;
30use serde::{Deserialize, Serialize};
31use timely::PartialOrder;
32use timely::order::TotalOrder;
33use timely::progress::{Antichain, Timestamp};
34use tokio::runtime::Handle;
35use tracing::{Instrument, debug_span, error, info, warn};
36use uuid::Uuid;
37
38use crate::batch::{
39 Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
40 BatchParts, ProtoBatch, validate_truncate_batch,
41};
42use crate::error::{InvalidUsage, UpperMismatch};
43use crate::fetch::{
44 EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ,
45};
46use crate::internal::compact::{CompactConfig, Compactor};
47use crate::internal::encoding::{Schemas, assert_code_can_read_data};
48use crate::internal::machine::{
49 CompareAndAppendRes, ExpireFn, Machine, next_listen_batch_retry_params,
50};
51use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
52use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
53use crate::read::ReadHandle;
54use crate::schema::PartMigration;
55use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};
56
/// Dyncfg: when enabled and an append gets inline-write backpressure, the
/// inline parts are decoded and re-encoded into a single batch that is
/// flushed to blob storage before retrying (see `compare_and_append_batch`).
pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
    "persist_write_combine_inline_writes",
    true,
    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
);
62
/// Dyncfg: validate part bounds against the batch description at append time
/// (see `WriteHandle::validate_part_bounds_on_write`, which also honors the
/// read-side flag).
pub(crate) const VALIDATE_PART_BOUNDS_ON_WRITE: Config<bool> = Config::new(
    "persist_validate_part_bounds_on_write",
    false,
    "Validate the part lower <= the batch lower and the part upper <= batch upper,\
    for the batch being appended.",
);
69
/// An opaque identifier for a writer of a persist shard.
///
/// Wraps 16 raw UUID bytes; the human-readable form is `w<uuid>` (see the
/// `Display`/`FromStr` impls below), which is also the serde representation.
#[derive(
    Arbitrary,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Serialize,
    Deserialize
)]
#[serde(try_from = "String", into = "String")]
pub struct WriterId(pub(crate) [u8; 16]);
84
85impl std::fmt::Display for WriterId {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 write!(f, "w{}", Uuid::from_bytes(self.0))
88 }
89}
90
91impl std::fmt::Debug for WriterId {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 write!(f, "WriterId({})", Uuid::from_bytes(self.0))
94 }
95}
96
97impl std::str::FromStr for WriterId {
98 type Err = String;
99
100 fn from_str(s: &str) -> Result<Self, Self::Err> {
101 parse_id("w", "WriterId", s).map(WriterId)
102 }
103}
104
/// Serde serialization: round-trips through `Display`, producing the
/// `w<uuid>` form that `TryFrom<String>` accepts.
impl From<WriterId> for String {
    fn from(writer_id: WriterId) -> Self {
        writer_id.to_string()
    }
}
110
111impl TryFrom<String> for WriterId {
112 type Error = String;
113
114 fn try_from(s: String) -> Result<Self, Self::Error> {
115 s.parse()
116 }
117}
118
119impl WriterId {
120 pub(crate) fn new() -> Self {
121 WriterId(*Uuid::new_v4().as_bytes())
122 }
123}
124
/// A "capability" for writing to a persist shard: bundles everything needed
/// to build batches of updates and append them to the shard.
#[derive(Debug)]
pub struct WriteHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    /// Drives this shard's durable/consensus state.
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    /// Present only when compaction is enabled in `cfg` (see `Self::new`).
    pub(crate) compact: Option<Compactor<K, V, T, D>>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
    /// Identifies this writer in shard state (appends, expiration).
    pub(crate) writer_id: WriterId,
    pub(crate) debug_state: HandleDebugState,
    /// The key/value schemas new batches are written with.
    pub(crate) write_schemas: Schemas<K, V>,

    /// Locally cached shard upper; advanced by this handle's successful
    /// appends and upper fetches. May lag the true shard upper.
    pub(crate) upper: Antichain<T>,
    /// Deregisters this writer; taken exactly once by `expire` or `Drop`.
    expire_fn: Option<ExpireFn>,
}
156
157impl<K, V, T, D> WriteHandle<K, V, T, D>
158where
159 K: Debug + Codec,
160 V: Debug + Codec,
161 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
162 D: Monoid + Ord + Codec64 + Send + Sync,
163{
164 pub(crate) fn new(
165 cfg: PersistConfig,
166 metrics: Arc<Metrics>,
167 machine: Machine<K, V, T, D>,
168 gc: GarbageCollector<K, V, T, D>,
169 blob: Arc<dyn Blob>,
170 writer_id: WriterId,
171 purpose: &str,
172 write_schemas: Schemas<K, V>,
173 ) -> Self {
174 let isolated_runtime = Arc::clone(&machine.isolated_runtime);
175 let compact = cfg
176 .compaction_enabled
177 .then(|| Compactor::new(cfg.clone(), Arc::clone(&metrics), gc.clone()));
178 let debug_state = HandleDebugState {
179 hostname: cfg.hostname.to_owned(),
180 purpose: purpose.to_owned(),
181 };
182 let upper = machine.applier.clone_upper();
183 let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
184 WriteHandle {
185 cfg,
186 metrics,
187 machine,
188 gc,
189 compact,
190 blob,
191 isolated_runtime,
192 writer_id,
193 debug_state,
194 write_schemas,
195 upper,
196 expire_fn: Some(expire_fn),
197 }
198 }
199
    /// Creates a new [`WriteHandle`] for the same shard as `read`, with a
    /// freshly minted [`WriterId`] and the reader's schemas used as the
    /// write schemas.
    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
        Self::new(
            read.cfg.clone(),
            Arc::clone(&read.metrics),
            read.machine.clone(),
            read.gc.clone(),
            Arc::clone(&read.blob),
            WriterId::new(),
            purpose,
            read.read_schemas.clone(),
        )
    }
214
    /// Whether part bounds should be validated on append: enabled if either
    /// the write-side or the read-side dyncfg flag is set.
    pub fn validate_part_bounds_on_write(&self) -> bool {
        VALIDATE_PART_BOUNDS_ON_WRITE.get(&self.cfg) || VALIDATE_PART_BOUNDS_ON_READ.get(&self.cfg)
    }
222
    /// The shard this handle writes to.
    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }
227
    /// The registered id of this handle's write schemas, if known (see
    /// `Self::try_register_schema`).
    pub fn schema_id(&self) -> Option<SchemaId> {
        self.write_schemas.id
    }
232
233 pub async fn try_register_schema(&mut self) -> Option<SchemaId> {
239 let Schemas { id, key, val } = &self.write_schemas;
240
241 if let Some(id) = id {
242 return Some(*id);
243 }
244
245 let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
246 maintenance.start_performing(&self.machine, &self.gc);
247
248 self.write_schemas.id = schema_id;
249 schema_id
250 }
251
    /// This handle's locally cached upper. The true shard upper may be ahead
    /// of this; see [`Self::shared_upper`] and [`Self::fetch_recent_upper`].
    pub fn upper(&self) -> &Antichain<T> {
        &self.upper
    }
261
    /// The upper from this process's locally cached shard state (no
    /// consensus round-trip; for that see [`Self::fetch_recent_upper`]).
    pub fn shared_upper(&self) -> Antichain<T> {
        self.machine.applier.clone_upper()
    }
270
    /// Fetches the most recent upper from consensus, updates the local
    /// cache, and returns it.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
        self.machine
            .applier
            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
            .await;
        &self.upper
    }
286
287 pub async fn advance_upper(&mut self, target: &Antichain<T>) {
295 let mut lower = self.shared_upper().clone();
298
299 while !PartialOrder::less_equal(target, &lower) {
300 let since = Antichain::from_elem(T::minimum());
301 let desc = Description::new(lower.clone(), target.clone(), since);
302 let batch = HollowBatch::empty(desc);
303
304 let heartbeat_timestamp = (self.cfg.now)();
305 let res = self
306 .machine
307 .compare_and_append(
308 &batch,
309 &self.writer_id,
310 &self.debug_state,
311 heartbeat_timestamp,
312 )
313 .await;
314
315 use CompareAndAppendRes::*;
316 let new_upper = match res {
317 Success(_seq_no, maintenance) => {
318 maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
319 batch.desc.upper().clone()
320 }
321 UpperMismatch(_seq_no, actual_upper) => actual_upper,
322 InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
323 InlineBackpressure => unreachable!("batch was empty"),
324 };
325
326 self.upper.clone_from(&new_upper);
327 lower = new_upper;
328 }
329 }
330
    /// Builds a batch from `updates` (with times in `[lower, upper)`) and
    /// appends it via [`Self::append_batch`]; see that method for the
    /// retry/mismatch semantics.
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
        self.append_batch(batch, lower, upper).await
    }
379
    /// Builds a batch from `updates` and atomically appends it, moving the
    /// upper from `expected_upper` to `new_upper` iff the shard upper is
    /// currently exactly `expected_upper`.
    ///
    /// On any failure the just-written batch's blobs are deleted before the
    /// result is returned.
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let mut batch = self
            .batch(updates, expected_upper.clone(), new_upper.clone())
            .await?;
        match self
            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
            .await
        {
            ok @ Ok(Ok(())) => ok,
            err => {
                // The batch was not consumed by the shard; clean up its
                // blobs rather than leaking them.
                batch.delete().await;
                err
            }
        }
    }
442
    /// Appends `batch` to the shard, retrying through upper mismatches as
    /// long as the shard upper stays within `[lower, upper)`.
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append_batch(
        &mut self,
        mut batch: Batch<K, V, T, D>,
        mut lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        loop {
            let res = self
                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone(), true)
                .await?;
            match res {
                Ok(()) => {
                    self.upper = upper;
                    return Ok(Ok(()));
                }
                Err(mismatch) => {
                    // The shard upper is behind our lower: surface the
                    // mismatch to the caller.
                    if PartialOrder::less_than(&mismatch.current, &lower) {
                        self.upper.clone_from(&mismatch.current);

                        batch.delete().await;

                        return Ok(Err(mismatch));
                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
                        // Another writer advanced the upper past our lower
                        // but not our upper: retry from their frontier.
                        lower = mismatch.current;
                    } else {
                        // NOTE(review): the upper is already at/past our
                        // target, and this is treated as success (the batch
                        // is discarded) — presumably because the shard has
                        // moved past these times already; confirm.
                        self.upper = mismatch.current;

                        batch.delete().await;

                        return Ok(Ok(()));
                    }
                }
            }
        }
    }
518
519 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
553 pub async fn compare_and_append_batch(
554 &mut self,
555 batches: &mut [&mut Batch<K, V, T, D>],
556 expected_upper: Antichain<T>,
557 new_upper: Antichain<T>,
558 validate_part_bounds_on_write: bool,
559 ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
560 where
561 D: Send + Sync,
562 {
563 let schema_id = self.try_register_schema().await;
567
568 for batch in batches.iter() {
569 if self.machine.shard_id() != batch.shard_id() {
570 return Err(InvalidUsage::BatchNotFromThisShard {
571 batch_shard: batch.shard_id(),
572 handle_shard: self.machine.shard_id(),
573 });
574 }
575 assert_code_can_read_data(&self.cfg.build_version, &batch.version);
576 if self.cfg.build_version > batch.version {
577 info!(
578 shard_id =? self.machine.shard_id(),
579 batch_version =? batch.version,
580 writer_version =? self.cfg.build_version,
581 "Appending batch from the past. This is fine but should be rare. \
582 TODO: Error on very old versions once the leaked blob detector exists."
583 )
584 }
585 fn assert_schema<A: Codec>(writer_schema: &A::Schema, batch_schema: &bytes::Bytes) {
586 if batch_schema.is_empty() {
587 return;
589 }
590 let batch_schema: A::Schema = A::decode_schema(batch_schema);
591 if *writer_schema != batch_schema {
592 error!(
593 ?writer_schema,
594 ?batch_schema,
595 "writer and batch schemas should be identical"
596 );
597 soft_panic_or_log!("writer and batch schemas should be identical");
598 }
599 }
600 assert_schema::<K>(&*self.write_schemas.key, &batch.schemas.0);
601 assert_schema::<V>(&*self.write_schemas.val, &batch.schemas.1);
602 }
603
604 let lower = expected_upper.clone();
605 let upper = new_upper;
606 let since = Antichain::from_elem(T::minimum());
607 let desc = Description::new(lower, upper, since);
608
609 let mut received_inline_backpressure = false;
610 let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
617 let maintenance = loop {
618 let any_batch_rewrite = batches
619 .iter()
620 .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
621 let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
622 (vec![], 0, vec![], vec![]);
623 let mut key_storage = None;
624 let mut val_storage = None;
625 for batch in batches.iter() {
626 let () = validate_truncate_batch(
627 &batch.batch,
628 &desc,
629 any_batch_rewrite,
630 validate_part_bounds_on_write,
631 )?;
632 for (run_meta, run) in batch.batch.runs() {
633 let start_index = parts.len();
634 for part in run {
635 if let (
636 RunPart::Single(
637 batch_part @ BatchPart::Inline {
638 updates,
639 ts_rewrite,
640 schema_id: _,
641 deprecated_schema_id: _,
642 },
643 ),
644 Some((schema_cache, builder)),
645 ) = (part, &mut inline_batch_builder)
646 {
647 let schema_migration = PartMigration::new(
648 batch_part,
649 self.write_schemas.clone(),
650 schema_cache,
651 )
652 .await
653 .expect("schemas for inline user part");
654
655 let encoded_part = EncodedPart::from_inline(
656 &crate::fetch::FetchConfig::from_persist_config(&self.cfg),
657 &*self.metrics,
658 self.metrics.read.compaction.clone(),
659 desc.clone(),
660 updates,
661 ts_rewrite.as_ref(),
662 );
663 let mut fetched_part = FetchedPart::new(
664 Arc::clone(&self.metrics),
665 encoded_part,
666 schema_migration,
667 FetchBatchFilter::Compaction {
668 since: desc.since().clone(),
669 },
670 false,
671 PartDecodeFormat::Arrow,
672 None,
673 );
674
675 while let Some(((k, v), t, d)) =
676 fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
677 {
678 builder
679 .add(&k, &v, &t, &d)
680 .await
681 .expect("re-encoding just-decoded data");
682 }
683 } else {
684 parts.push(part.clone())
685 }
686 }
687
688 let end_index = parts.len();
689
690 if start_index == end_index {
691 continue;
692 }
693
694 if start_index != 0 {
696 run_splits.push(start_index);
697 }
698 run_metas.push(run_meta.clone());
699 }
700 num_updates += batch.batch.len;
701 }
702
703 let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
704 let mut finished = builder
705 .finish(desc.upper().clone())
706 .await
707 .expect("invalid usage");
708 let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
709 finished
710 .flush_to_blob(
711 &cfg,
712 &self.metrics.inline.backpressure,
713 &self.isolated_runtime,
714 &self.write_schemas,
715 )
716 .await;
717 Some(finished)
718 } else {
719 None
720 };
721
722 if let Some(batch) = &flushed_inline_batch {
723 for (run_meta, run) in batch.batch.runs() {
724 assert!(run.len() > 0);
725 let start_index = parts.len();
726 if start_index != 0 {
727 run_splits.push(start_index);
728 }
729 run_metas.push(run_meta.clone());
730 parts.extend(run.iter().cloned())
731 }
732 }
733
734 let mut combined_batch =
735 HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);
736
737 match schema_id {
741 Some(schema_id) => {
742 ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
743 }
744 None => {
745 assert!(
746 self.fetch_recent_upper().await.is_empty(),
747 "fetching a schema id should only fail when the shard is tombstoned"
748 )
749 }
750 }
751
752 let heartbeat_timestamp = (self.cfg.now)();
753 let res = self
754 .machine
755 .compare_and_append(
756 &combined_batch,
757 &self.writer_id,
758 &self.debug_state,
759 heartbeat_timestamp,
760 )
761 .await;
762
763 match res {
764 CompareAndAppendRes::Success(_seqno, maintenance) => {
765 self.upper.clone_from(desc.upper());
766 for batch in batches.iter_mut() {
767 batch.mark_consumed();
768 }
769 if let Some(batch) = &mut flushed_inline_batch {
770 batch.mark_consumed();
771 }
772 break maintenance;
773 }
774 CompareAndAppendRes::InvalidUsage(invalid_usage) => {
775 if let Some(batch) = flushed_inline_batch.take() {
776 batch.delete().await;
777 }
778 return Err(invalid_usage);
779 }
780 CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
781 if let Some(batch) = flushed_inline_batch.take() {
782 batch.delete().await;
783 }
784 self.upper.clone_from(¤t_upper);
787 return Ok(Err(UpperMismatch {
788 current: current_upper,
789 expected: expected_upper,
790 }));
791 }
792 CompareAndAppendRes::InlineBackpressure => {
793 assert_eq!(received_inline_backpressure, false);
796 received_inline_backpressure = true;
797 if COMBINE_INLINE_WRITES.get(&self.cfg) {
798 inline_batch_builder = Some((
799 self.machine.applier.schema_cache(),
800 self.builder(desc.lower().clone()),
801 ));
802 continue;
803 }
804
805 let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
806 let flush_batches = batches
809 .iter_mut()
810 .map(|batch| async {
811 batch
812 .flush_to_blob(
813 &cfg,
814 &self.metrics.inline.backpressure,
815 &self.isolated_runtime,
816 &self.write_schemas,
817 )
818 .await
819 })
820 .collect::<FuturesUnordered<_>>();
821 let () = flush_batches.collect::<()>().await;
822
823 for batch in batches.iter() {
824 assert_eq!(batch.batch.inline_bytes(), 0);
825 }
826
827 continue;
828 }
829 }
830 };
831
832 maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
833
834 Ok(Ok(()))
835 }
836
    /// Rehydrates a [`Batch`] handle from its [`ProtoBatch`] wire
    /// representation (e.g. one transmitted from another process).
    ///
    /// Panics if the proto is malformed or belongs to a different shard.
    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
        let shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(shard_id, self.machine.shard_id());

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            schemas: (batch.key_schema, batch.val_schema),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };
        assert_eq!(ret.shard_id(), self.machine.shard_id());
        ret
    }
862
    /// Returns a [`BatchBuilder`] for writing a batch to this shard with
    /// updates at times at or beyond `lower`, using this handle's config,
    /// metrics, and write schemas.
    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
        Self::builder_inner(
            &self.cfg,
            CompactConfig::new(&self.cfg, self.shard_id()),
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            self.shard_id(),
            self.write_schemas.clone(),
            lower,
        )
    }
889
    /// Shared implementation of [`Self::builder`], parameterized over its
    /// dependencies rather than reading them from a handle.
    pub(crate) fn builder_inner(
        persist_cfg: &PersistConfig,
        compact_cfg: CompactConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        user_batch_metrics: &BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        schemas: Schemas<K, V>,
        lower: Antichain<T>,
    ) -> BatchBuilder<K, V, T, D> {
        // With a max-runs budget configured, parts are written through the
        // compacting path; otherwise as a single unordered run. The batch
        // description here uses an empty upper (not yet finished) and a
        // minimum since.
        let parts = if let Some(max_runs) = compact_cfg.batch.max_runs {
            BatchParts::new_compacting::<K, V, D>(
                compact_cfg,
                Description::new(
                    lower.clone(),
                    Antichain::new(),
                    Antichain::from_elem(T::minimum()),
                ),
                max_runs,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
                schemas.clone(),
            )
        } else {
            BatchParts::new_ordered::<D>(
                compact_cfg.batch,
                RunOrder::Unordered,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
            )
        };
        let builder = BatchBuilderInternal::new(
            BatchBuilderConfig::new(persist_cfg, shard_id),
            parts,
            metrics,
            schemas,
            blob,
            shard_id,
            persist_cfg.build_version.clone(),
        );
        BatchBuilder::new(
            builder,
            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
        )
    }
947
    /// Uploads the given `updates` as a [`Batch`] with the given bounds and
    /// returns a handle to it. Invalid bounds/updates are reported as
    /// `InvalidUsage`.
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn batch<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
    {
        let iter = updates.into_iter();

        let mut builder = self.builder(lower.clone());

        for update in iter {
            let ((k, v), t, d) = update.borrow();
            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
            // The exhaustive match (rather than `?`) ensures a compile error
            // if a new `Added` variant needs handling here.
            match builder.add(k, v, t, d).await {
                Ok(Added::Record | Added::RecordAndParts) => (),
                Err(invalid_usage) => return Err(invalid_usage),
            }
        }

        builder.finish(upper.clone()).await
    }
980
    /// Blocks until the shard upper is strictly past `frontier`, then
    /// advances (never regresses) the locally cached upper.
    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
        let mut watch = self.machine.applier.watch();
        self.machine
            .wait_for_upper_past(
                frontier,
                &mut watch,
                None,
                &self.metrics.retries.next_listen_batch, next_listen_batch_retry_params(&self.cfg),
            )
            .await;
        let upper = self.machine.applier.clone_upper();
        // Only move the cached upper forward.
        if PartialOrder::less_than(&self.upper, &upper) {
            self.upper.clone_from(&upper);
        }
        assert!(PartialOrder::less_than(frontier, &self.upper));
    }
999
1000 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
1009 pub async fn expire(mut self) {
1010 let Some(expire_fn) = self.expire_fn.take() else {
1011 return;
1012 };
1013 expire_fn.0().await;
1014 }
1015
    /// Builds the deferred expiration closure: expires `writer_id` on the
    /// machine and kicks off any resulting maintenance. Owned clones are
    /// moved in so the closure can outlive the handle (see the `Drop` impl).
    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        writer_id: WriterId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_writer(&writer_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }
1028
    /// Test helper: [`Self::append`] that panics on any error.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
    where
        L: Into<Antichain<T>>,
        U: Into<Antichain<T>>,
        D: Send + Sync,
    {
        self.append(updates.iter(), lower.into(), new_upper.into())
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
    }
1043
    /// Test helper: [`Self::compare_and_append`] with single-element
    /// frontiers that panics on any error.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append(
        &mut self,
        updates: &[((K, V), T, D)],
        expected_upper: T,
        new_upper: T,
    ) where
        D: Send + Sync,
    {
        self.compare_and_append(
            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }
1065
    /// Test helper: [`Self::compare_and_append_batch`] with single-element
    /// frontiers that panics on any error.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: T,
        new_upper: T,
    ) {
        self.compare_and_append_batch(
            batches,
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
            true,
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }
1086
    /// Test helper: [`Self::batch`] with single-element frontiers that
    /// panics on invalid usage.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_batch(
        &mut self,
        updates: &[((K, V), T, D)],
        lower: T,
        upper: T,
    ) -> Batch<K, V, T, D> {
        self.batch(
            updates.iter(),
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
        )
        .await
        .expect("invalid usage")
    }
1104}
1105
1106impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
1107 fn drop(&mut self) {
1108 let Some(expire_fn) = self.expire_fn.take() else {
1109 return;
1110 };
1111 let handle = match Handle::try_current() {
1112 Ok(x) => x,
1113 Err(_) => {
1114 warn!(
1115 "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
1116 self.writer_id
1117 );
1118 return;
1119 }
1120 };
1121 let expire_span = debug_span!("drop::expire");
1127 handle.spawn_named(
1128 || format!("WriteHandle::expire ({})", self.writer_id),
1129 expire_fn.0().instrument(expire_span),
1130 );
1131 }
1132}
1133
/// Stamps `schema_id` onto every run and part of `batch` that does not
/// already carry one, asserting that any existing ids match.
fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
where
    T: Timestamp + Lattice + Codec64,
{
    // Fill in a missing id, or assert agreement with an existing one.
    let ensure = |id: &mut Option<SchemaId>| match id {
        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
        None => *id = Some(schema_id),
    };

    for run_meta in &mut batch.run_meta {
        ensure(&mut run_meta.schema);
    }
    for part in &mut batch.parts {
        match part {
            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
            RunPart::Many(_hollow_run_ref) => {
                // NOTE(review): indirectly stored runs are left untouched;
                // presumably their parts are stamped elsewhere or covered by
                // the run metadata above — confirm.
            }
        }
    }
}
1162
1163#[cfg(test)]
1164mod tests {
1165 use std::str::FromStr;
1166 use std::sync::mpsc;
1167
1168 use differential_dataflow::consolidation::consolidate_updates;
1169 use futures_util::FutureExt;
1170 use mz_dyncfg::ConfigUpdates;
1171 use mz_ore::collections::CollectionExt;
1172 use mz_ore::task;
1173 use serde_json::json;
1174
1175 use crate::cache::PersistClientCache;
1176 use crate::tests::{all_ok, new_test_client};
1177 use crate::{PersistLocation, ShardId};
1178
1179 use super::*;
1180
    /// Appending empty batches should not write any new blobs.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] async fn empty_batches(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, _) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let blob = Arc::clone(&write.blob);

        let mut upper = 3;
        write.expect_append(&data[..2], vec![0], vec![upper]).await;

        // Snapshot the blob count, append several empty batches, and verify
        // the count is unchanged.
        let mut count_before = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_before += 1;
        })
        .await
        .expect("list_keys failed");
        for _ in 0..5 {
            let new_upper = upper + 1;
            write.expect_compare_and_append(&[], upper, new_upper).await;
            upper = new_upper;
        }
        let mut count_after = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_after += 1;
        })
        .await
        .expect("list_keys failed");
        assert_eq!(count_after, count_before);
    }
1220
    /// Appending multiple batches in one compare_and_append_batch call
    /// truncates them to the requested bounds and preserves all their runs.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
        let data0 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];
        let data1 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
        let mut batch1 = write.expect_batch(&data1, 0, 4).await;

        write
            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
            .await;

        let batch = write
            .machine
            .unleased_snapshot(&Antichain::from_elem(3))
            .await
            .expect("just wrote this")
            .into_element();

        // Each input batch contributes at least one run to the combined batch.
        assert!(batch.runs().count() >= 2);

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 2),
            (("2".to_owned(), "two".to_owned()), 2, 2),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }
1265
    /// `WriterId` serde round-trips through its human-readable `w<uuid>`
    /// string form, standalone and nested in a struct.
    #[mz_ore::test]
    fn writer_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            writer_id: WriterId,
        }

        // Round-trip through serde_json::Value.
        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        // Deserialize directly from the string form.
        assert_eq!(
            id,
            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        // Nested in a struct, both directions.
        let json = json!({ "writer_id": id });
        assert_eq!(
            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.writer_id, id);
    }
1297
    /// A batch survives the transmittable (proto) round-trip: serialize,
    /// rehydrate in the same process, and append the rehydrated copy.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let batch = write.expect_batch(&data, 0, 4).await;
        let hollow_batch = batch.into_transmittable_batch();
        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);

        write
            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
            .await;

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }
1333
    /// `wait_for_upper_past` blocks while the upper is at or below the
    /// frontier, resolves once it is strictly past, and never regresses the
    /// cached upper.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
        let five = Antichain::from_elem(5);

        // Upper is still [0]: must not resolve.
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        // Upper advances to [5]: still not strictly past 5.
        write
            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        // Upper advances to [7]: now strictly past 5.
        write
            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
        assert_eq!(write.upper(), &Antichain::from_elem(7));

        // Waiting on an already-past frontier resolves immediately and does
        // not regress the cached upper.
        assert_eq!(
            write
                .wait_for_upper_past(&Antichain::from_elem(2))
                .now_or_never(),
            Some(())
        );
        assert_eq!(write.upper(), &Antichain::from_elem(7));
    }
1367
    /// `fetch_recent_upper` is linearized with appends: a reader (with a
    /// separate state cache) never observes an upper behind one the writer
    /// has already confirmed.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] async fn fetch_recent_upper_linearized() {
        type Timestamp = u64;
        let max_upper = 1000;

        let shard_id = ShardId::new();
        let mut clients = PersistClientCache::new_no_metrics();
        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_writer, _) = upper_writer_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        // Clear the state cache so the reader opens fresh state and must go
        // through consensus rather than sharing the writer's cache.
        clients.clear_state_cache();
        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_reader, _) = upper_reader_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        let (tx, rx) = mpsc::channel();

        let task = task::spawn(|| "upper-reader", async move {
            let mut upper = Timestamp::MIN;

            while upper < max_upper {
                // Track the latest upper confirmed by the writer.
                while let Ok(new_upper) = rx.try_recv() {
                    upper = new_upper;
                }

                let recent_upper = upper_reader
                    .fetch_recent_upper()
                    .await
                    .as_option()
                    .cloned()
                    .expect("u64 is totally ordered and the shard is not finalized");
                assert!(
                    recent_upper >= upper,
                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
                );
            }
        });

        for upper in Timestamp::MIN..max_upper {
            let next_upper = upper + 1;
            upper_writer
                .expect_compare_and_append(&[], upper, next_upper)
                .await;
            // Only publish the upper after the append is confirmed.
            tx.send(next_upper).expect("send failed");
        }

        task.await;
    }
1420}