use std::borrow::Borrow;
use std::fmt::Debug;
use std::sync::Arc;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use mz_dyncfg::Config;
use mz_ore::task::RuntimeExt;
use mz_ore::{instrument, soft_panic_or_log};
use mz_persist::location::Blob;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{IntoRustIfSome, ProtoType};
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::runtime::Handle;
use tracing::{Instrument, debug_span, error, info, warn};
use uuid::Uuid;

use crate::batch::{
    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
    BatchParts, ProtoBatch, validate_truncate_batch,
};
use crate::error::{InvalidUsage, UpperMismatch};
use crate::fetch::{
    EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ,
};
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{Schemas, assert_code_can_read_data};
use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
use crate::read::ReadHandle;
use crate::schema::PartMigration;
use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};

pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
    "persist_write_combine_inline_writes",
    true,
    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
);

pub(crate) const VALIDATE_PART_BOUNDS_ON_WRITE: Config<bool> = Config::new(
    "persist_validate_part_bounds_on_write",
    false,
    "Validate the part lower <= the batch lower and the part upper <= batch upper, \
     for the batch being appended.",
);

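/// An opaque identifier for a writer of a persist durable TVC (aka shard).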
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct WriterId(pub(crate) [u8; 16]);

impl std::fmt::Display for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "w{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for WriterId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for WriterId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("w", "WriterId", s).map(WriterId)
    }
}

impl From<WriterId> for String {
    fn from(writer_id: WriterId) -> Self {
        writer_id.to_string()
    }
}

impl TryFrom<String> for WriterId {
    type Error = String;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}

impl WriterId {
    pub(crate) fn new() -> Self {
        WriterId(*Uuid::new_v4().as_bytes())
    }
}

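/// A "capability" granting the ability to apply updates to some shard at times greater or equal
/// to `self.upper()`.
///
/// A handle that is no longer needed should be explicitly [Self::expire]d; otherwise the `Drop`
/// impl makes only a best-effort attempt to expire it, falling back to the lease timeout.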
#[derive(Debug)]
pub struct WriteHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) compact: Option<Compactor<K, V, T, D>>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
    pub(crate) writer_id: WriterId,
    pub(crate) debug_state: HandleDebugState,
    pub(crate) write_schemas: Schemas<K, V>,

    pub(crate) upper: Antichain<T>,
    expire_fn: Option<ExpireFn>,
}

impl<K, V, T, D> WriteHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    pub(crate) fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        writer_id: WriterId,
        purpose: &str,
        write_schemas: Schemas<K, V>,
    ) -> Self {
        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let compact = cfg
            .compaction_enabled
            .then(|| Compactor::new(cfg.clone(), Arc::clone(&metrics), gc.clone()));
        let debug_state = HandleDebugState {
            hostname: cfg.hostname.to_owned(),
            purpose: purpose.to_owned(),
        };
        let upper = machine.applier.clone_upper();
        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
        WriteHandle {
            cfg,
            metrics,
            machine,
            gc,
            compact,
            blob,
            isolated_runtime,
            writer_id,
            debug_state,
            write_schemas,
            upper,
            expire_fn: Some(expire_fn),
        }
    }

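    /// Creates this handle from an existing [ReadHandle] on the same shard, using a freshly
    /// generated [WriterId] and reusing the read handle's schemas for writing.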
    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
        Self::new(
            read.cfg.clone(),
            Arc::clone(&read.metrics),
            read.machine.clone(),
            read.gc.clone(),
            Arc::clone(&read.blob),
            WriterId::new(),
            purpose,
            read.read_schemas.clone(),
        )
    }

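    /// Whether this handle validates part bounds when appending batches. Write-side validation
    /// is forced on whenever read-side validation is enabled, so that batches written here also
    /// pass the read-side check.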
    pub fn validate_part_bounds_on_write(&self) -> bool {
        VALIDATE_PART_BOUNDS_ON_WRITE.get(&self.cfg) || VALIDATE_PART_BOUNDS_ON_READ.get(&self.cfg)
    }

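    /// This handle's shard id.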
    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

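    /// Returns the [SchemaId] of this handle's key and value schemas, if the schemas have been
    /// registered with the shard.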
    pub fn schema_id(&self) -> Option<SchemaId> {
        self.write_schemas.id
    }

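    /// Registers this handle's key and value schemas with the shard, if they are not already
    /// registered, and returns the resulting [SchemaId]. In practice this returns `None` only
    /// when the shard has been finalized into a tombstone.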
    pub async fn try_register_schema(&mut self) -> Option<SchemaId> {
        let Schemas { id, key, val } = &self.write_schemas;

        if let Some(id) = id {
            return Some(*id);
        }

        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
        maintenance.start_performing(&self.machine, &self.gc);

        self.write_schemas.id = schema_id;
        schema_id
    }

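    /// This handle's cached upper frontier: the most recent upper this handle has observed. It
    /// is always less than or equal to the shard-global upper.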
    pub fn upper(&self) -> &Antichain<T> {
        &self.upper
    }

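    /// A less-stale upper, taken from this process's most recently observed shard state rather
    /// than this handle's cache.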
    pub fn shared_upper(&self) -> Antichain<T> {
        self.machine.applier.clone_upper()
    }

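    /// Fetches a recent shard-global upper, updating this handle's cached upper along the way.
    /// Repeated calls are linearized with writes to the shard.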
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
        self.machine
            .applier
            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
            .await;
        &self.upper
    }

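    /// Advances the shard's upper to at least `target` by appending empty batches, retrying as
    /// needed when racing with other writers, and updates this handle's cached upper.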
    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
        let mut lower = self.shared_upper().clone();

        while !PartialOrder::less_equal(target, &lower) {
            let since = Antichain::from_elem(T::minimum());
            let desc = Description::new(lower.clone(), target.clone(), since);
            let batch = HollowBatch::empty(desc);

            let heartbeat_timestamp = (self.cfg.now)();
            let res = self
                .machine
                .compare_and_append(
                    &batch,
                    &self.writer_id,
                    &self.debug_state,
                    heartbeat_timestamp,
                )
                .await;

            use CompareAndAppendRes::*;
            let new_upper = match res {
                Success(_seq_no, maintenance) => {
                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
                    batch.desc.upper().clone()
                }
                UpperMismatch(_seq_no, actual_upper) => actual_upper,
                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
                InlineBackpressure => unreachable!("batch was empty"),
            };

            self.upper.clone_from(&new_upper);
            lower = new_upper;
        }
    }

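    /// Builds a [Batch] from `updates` and appends it to this shard, downgrading this handle's
    /// upper to `upper` on success.
    ///
    /// All times in `updates` must be greater or equal to `lower` and not greater or equal to
    /// `upper`. The outer `Err` indicates invalid usage; the inner `Err` carries an
    /// [UpperMismatch] when the write could not be applied at the given `lower`, which is an
    /// expected outcome when multiple writers race.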
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
        self.append_batch(batch, lower, upper).await
    }

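    /// Builds a [Batch] from `updates` and atomically appends it, moving the shard's upper from
    /// `expected_upper` to `new_upper`, but only if the upper is exactly `expected_upper` at
    /// append time. On a mismatch the just-written batch is deleted and the inner `Err` carries
    /// the current upper, so the caller can recompute updates and retry.
    ///
    /// A minimal usage sketch (the handle, types, and values are illustrative):
    ///
    /// ```ignore
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 1u64, 1i64)];
    /// match write
    ///     .compare_and_append(updates.iter(), Antichain::from_elem(0), Antichain::from_elem(2))
    ///     .await
    ///     .expect("invalid usage")
    /// {
    ///     Ok(()) => { /* the shard's upper is now 2 */ }
    ///     Err(mismatch) => { /* lost the race; retry from mismatch.current */ }
    /// }
    /// ```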
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        let mut batch = self
            .batch(updates, expected_upper.clone(), new_upper.clone())
            .await?;
        match self
            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
            .await
        {
            ok @ Ok(Ok(())) => ok,
            err => {
                // The batch was built specifically for this call; on any failure, delete the
                // blobs it wrote rather than leaking them.
                batch.delete().await;
                err
            }
        }
    }

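    /// Appends `batch` to this shard, retrying with an advanced `lower` when another writer has
    /// already moved the upper into `[lower, upper)`. The inner `Err` is returned only when the
    /// current upper is strictly less than `lower`; if the upper is already at or past `upper`,
    /// the batch is deleted and the call returns `Ok(Ok(()))`, on the assumption that an
    /// equivalent write has already landed.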
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append_batch(
        &mut self,
        mut batch: Batch<K, V, T, D>,
        mut lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        loop {
            let res = self
                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone(), true)
                .await?;
            match res {
                Ok(()) => {
                    self.upper = upper;
                    return Ok(Ok(()));
                }
                Err(mismatch) => {
                    if PartialOrder::less_than(&mismatch.current, &lower) {
                        // The shard's upper is still behind our lower: appending now would leave
                        // a gap, so surface the mismatch to the caller.
                        self.upper.clone_from(&mismatch.current);

                        batch.delete().await;

                        return Ok(Err(mismatch));
                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
                        // Someone else already wrote [lower, current): advance our lower to the
                        // current upper and retry; the overlap is truncated out of the batch.
                        lower = mismatch.current;
                    } else {
                        // The upper is already at or past `upper`: the time range we wanted to
                        // fill has been written, so there is nothing left for us to do.
                        self.upper = mismatch.current;

                        batch.delete().await;

                        return Ok(Ok(()));
                    }
                }
            }
        }
    }

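    /// Atomically appends the given `batches` to this shard, moving its upper from
    /// `expected_upper` to `new_upper`, but only if the upper is exactly `expected_upper`.
    ///
    /// All batches must have been written for this shard. If the state machine reports inline
    /// write backpressure, the inline parts are either re-encoded into a single flushed batch
    /// (when `persist_write_combine_inline_writes` is set) or flushed to blob storage in place,
    /// and the append is retried.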
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
        validate_part_bounds_on_write: bool,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        // Make sure our schemas are registered so the appended runs and parts can be tagged
        // with a schema id.
        let schema_id = self.try_register_schema().await;

        for batch in batches.iter() {
            if self.machine.shard_id() != batch.shard_id() {
                return Err(InvalidUsage::BatchNotFromThisShard {
                    batch_shard: batch.shard_id(),
                    handle_shard: self.machine.shard_id(),
                });
            }
            assert_code_can_read_data(&self.cfg.build_version, &batch.version);
            if self.cfg.build_version > batch.version {
                info!(
                    shard_id = ?self.machine.shard_id(),
                    batch_version = ?batch.version,
                    writer_version = ?self.cfg.build_version,
                    "Appending batch from the past. This is fine but should be rare. \
                    TODO: Error on very old versions once the leaked blob detector exists."
                )
            }
            fn assert_schema<A: Codec>(writer_schema: &A::Schema, batch_schema: &bytes::Bytes) {
                if batch_schema.is_empty() {
                    // Nothing to check if the batch didn't record a schema.
                    return;
                }
                let batch_schema: A::Schema = A::decode_schema(batch_schema);
                if *writer_schema != batch_schema {
                    error!(
                        ?writer_schema,
                        ?batch_schema,
                        "writer and batch schemas should be identical"
                    );
                    soft_panic_or_log!("writer and batch schemas should be identical");
                }
            }
            assert_schema::<K>(&*self.write_schemas.key, &batch.schemas.0);
            assert_schema::<V>(&*self.write_schemas.val, &batch.schemas.1);
        }

        let lower = expected_upper.clone();
        let upper = new_upper;
        let since = Antichain::from_elem(T::minimum());
        let desc = Description::new(lower, upper, since);

        let mut received_inline_backpressure = false;
        // If the append is rejected for carrying too much inline data, the inline parts may be
        // re-encoded into this single builder and flushed to blob before retrying.
        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
        let maintenance = loop {
            let any_batch_rewrite = batches
                .iter()
                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
                (vec![], 0, vec![], vec![]);
            let mut key_storage = None;
            let mut val_storage = None;
            for batch in batches.iter() {
                let () = validate_truncate_batch(
                    &batch.batch,
                    &desc,
                    any_batch_rewrite,
                    validate_part_bounds_on_write,
                )?;
                for (run_meta, run) in batch.batch.runs() {
                    let start_index = parts.len();
                    for part in run {
                        if let (
                            RunPart::Single(
                                batch_part @ BatchPart::Inline {
                                    updates,
                                    ts_rewrite,
                                    schema_id: _,
                                    deprecated_schema_id: _,
                                },
                            ),
                            Some((schema_cache, builder)),
                        ) = (part, &mut inline_batch_builder)
                        {
                            let schema_migration = PartMigration::new(
                                batch_part,
                                self.write_schemas.clone(),
                                schema_cache,
                            )
                            .await
                            .expect("schemas for inline user part");

                            let encoded_part = EncodedPart::from_inline(
                                &crate::fetch::FetchConfig::from_persist_config(&self.cfg),
                                &*self.metrics,
                                self.metrics.read.compaction.clone(),
                                desc.clone(),
                                updates,
                                ts_rewrite.as_ref(),
                            );
                            let mut fetched_part = FetchedPart::new(
                                Arc::clone(&self.metrics),
                                encoded_part,
                                schema_migration,
                                FetchBatchFilter::Compaction {
                                    since: desc.since().clone(),
                                },
                                false,
                                PartDecodeFormat::Arrow,
                                None,
                            );

                            while let Some(((k, v), t, d)) =
                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
                            {
                                builder
                                    .add(&k, &v, &t, &d)
                                    .await
                                    .expect("re-encoding just-decoded data");
                            }
                        } else {
                            parts.push(part.clone())
                        }
                    }

                    let end_index = parts.len();

                    if start_index == end_index {
                        continue;
                    }

                    // Record where this run starts so the combined batch keeps the original run
                    // boundaries.
                    if start_index != 0 {
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                }
                num_updates += batch.batch.len;
            }

            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
                let mut finished = builder
                    .finish(desc.upper().clone())
                    .await
                    .expect("invalid usage");
                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
                finished
                    .flush_to_blob(
                        &cfg,
                        &self.metrics.inline.backpressure,
                        &self.isolated_runtime,
                        &self.write_schemas,
                    )
                    .await;
                Some(finished)
            } else {
                None
            };

            if let Some(batch) = &flushed_inline_batch {
                for (run_meta, run) in batch.batch.runs() {
                    assert!(run.len() > 0);
                    let start_index = parts.len();
                    if start_index != 0 {
                        run_splits.push(start_index);
                    }
                    run_metas.push(run_meta.clone());
                    parts.extend(run.iter().cloned())
                }
            }

            let mut combined_batch =
                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);

            // Tag every run and part in the combined batch with the registered schema id (or
            // verify that it already matches).
            match schema_id {
                Some(schema_id) => {
                    ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
                }
                None => {
                    assert!(
                        self.fetch_recent_upper().await.is_empty(),
                        "fetching a schema id should only fail when the shard is tombstoned"
                    )
                }
            }

            let heartbeat_timestamp = (self.cfg.now)();
            let res = self
                .machine
                .compare_and_append(
                    &combined_batch,
                    &self.writer_id,
                    &self.debug_state,
                    heartbeat_timestamp,
                )
                .await;

            match res {
                CompareAndAppendRes::Success(_seqno, maintenance) => {
                    self.upper.clone_from(desc.upper());
                    for batch in batches.iter_mut() {
                        batch.mark_consumed();
                    }
                    if let Some(batch) = &mut flushed_inline_batch {
                        batch.mark_consumed();
                    }
                    break maintenance;
                }
                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    return Err(invalid_usage);
                }
                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
                    if let Some(batch) = flushed_inline_batch.take() {
                        batch.delete().await;
                    }
                    // We appended at the wrong upper; refresh our cached upper and report the
                    // mismatch to the caller.
                    self.upper.clone_from(&current_upper);
                    return Ok(Err(UpperMismatch {
                        current: current_upper,
                        expected: expected_upper,
                    }));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    // The append carried too much inline data. This should happen at most once
                    // per call: after handling it, all inline writes are either combined and
                    // flushed or flushed out individually.
                    assert_eq!(received_inline_backpressure, false);
                    received_inline_backpressure = true;
                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
                        inline_batch_builder = Some((
                            self.machine.applier.schema_cache(),
                            self.builder(desc.lower().clone()),
                        ));
                        continue;
                    }

                    // Otherwise, flush each batch's inline parts out to blob storage and retry.
                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
                    let flush_batches = batches
                        .iter_mut()
                        .map(|batch| async {
                            batch
                                .flush_to_blob(
                                    &cfg,
                                    &self.metrics.inline.backpressure,
                                    &self.isolated_runtime,
                                    &self.write_schemas,
                                )
                                .await
                        })
                        .collect::<FuturesUnordered<_>>();
                    let () = flush_batches.collect::<()>().await;

                    for batch in batches.iter() {
                        assert_eq!(batch.batch.inline_bytes(), 0);
                    }

                    continue;
                }
            }
        };

        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());

        Ok(Ok(()))
    }

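    /// Turns the given [ProtoBatch] back into a [Batch] which can be appended to this shard. The
    /// batch must have been created for this same shard.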
    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
        let shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(shard_id, self.machine.shard_id());

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            schemas: (batch.key_schema, batch.val_schema),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };
        assert_eq!(ret.shard_id(), self.machine.shard_id());
        ret
    }

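    /// Returns a [BatchBuilder] that can be used to write a batch of updates to blob storage
    /// before appending it to the shard with [Self::compare_and_append_batch] or
    /// [Self::append_batch].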
    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
        Self::builder_inner(
            &self.cfg,
            CompactConfig::new(&self.cfg, self.shard_id()),
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            self.shard_id(),
            self.write_schemas.clone(),
            lower,
        )
    }

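    /// A [Self::builder] variant that does not require a [WriteHandle]; callers supply the
    /// individual pieces directly.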
    pub(crate) fn builder_inner(
        persist_cfg: &PersistConfig,
        compact_cfg: CompactConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        user_batch_metrics: &BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        schemas: Schemas<K, V>,
        lower: Antichain<T>,
    ) -> BatchBuilder<K, V, T, D> {
        let parts = if let Some(max_runs) = compact_cfg.batch.max_runs {
            BatchParts::new_compacting::<K, V, D>(
                compact_cfg,
                Description::new(
                    lower.clone(),
                    Antichain::new(),
                    Antichain::from_elem(T::minimum()),
                ),
                max_runs,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
                schemas.clone(),
            )
        } else {
            BatchParts::new_ordered::<D>(
                compact_cfg.batch,
                RunOrder::Unordered,
                Arc::clone(&metrics),
                shard_metrics,
                shard_id,
                Arc::clone(&blob),
                isolated_runtime,
                user_batch_metrics,
            )
        };
        let builder = BatchBuilderInternal::new(
            BatchBuilderConfig::new(persist_cfg, shard_id),
            parts,
            metrics,
            schemas,
            blob,
            shard_id,
            persist_cfg.build_version.clone(),
        );
        BatchBuilder::new(
            builder,
            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
        )
    }

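    /// Uploads the given `updates` as blob data and returns a [Batch] with bounds
    /// `[lower, upper)`, which can later be appended to the shard. All times in `updates` must
    /// be greater or equal to `lower` and not greater or equal to `upper`.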
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn batch<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
    {
        let iter = updates.into_iter();

        let mut builder = self.builder(lower.clone());

        for update in iter {
            let ((k, v), t, d) = update.borrow();
            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
            match builder.add(k, v, t, d).await {
                Ok(Added::Record | Added::RecordAndParts) => (),
                Err(invalid_usage) => return Err(invalid_usage),
            }
        }

        builder.finish(upper.clone()).await
    }

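    /// Blocks until the shard's upper is strictly past the given `frontier`, updating this
    /// handle's cached upper along the way.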
    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
        let mut watch = self.machine.applier.watch();
        let batch = self
            .machine
            .next_listen_batch(frontier, &mut watch, None, None)
            .await;
        if PartialOrder::less_than(&self.upper, batch.desc.upper()) {
            self.upper.clone_from(batch.desc.upper());
        }
        assert!(PartialOrder::less_than(frontier, &self.upper));
    }

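    /// Politely expires this writer, promptly releasing any associated state rather than waiting
    /// for it to time out. Handles should be expired when no longer needed; the `Drop` impl only
    /// makes a best-effort attempt.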
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        expire_fn.0().await;
    }

    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        writer_id: WriterId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_writer(&writer_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }

    /// Test helper for a [Self::append] call that is expected to succeed.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
    where
        L: Into<Antichain<T>>,
        U: Into<Antichain<T>>,
        D: Send + Sync,
    {
        self.append(updates.iter(), lower.into(), new_upper.into())
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
    }

    /// Test helper for a [Self::compare_and_append] call that is expected to succeed.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append(
        &mut self,
        updates: &[((K, V), T, D)],
        expected_upper: T,
        new_upper: T,
    ) where
        D: Send + Sync,
    {
        self.compare_and_append(
            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

    /// Test helper for a [Self::compare_and_append_batch] call that is expected to succeed.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_compare_and_append_batch(
        &mut self,
        batches: &mut [&mut Batch<K, V, T, D>],
        expected_upper: T,
        new_upper: T,
    ) {
        self.compare_and_append_batch(
            batches,
            Antichain::from_elem(expected_upper),
            Antichain::from_elem(new_upper),
            true,
        )
        .await
        .expect("invalid usage")
        .expect("unexpected upper")
    }

    /// Test helper for a [Self::batch] call that is expected to succeed.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_batch(
        &mut self,
        updates: &[((K, V), T, D)],
        lower: T,
        upper: T,
    ) -> Batch<K, V, T, D> {
        self.batch(
            updates.iter(),
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
        )
        .await
        .expect("invalid usage")
    }
}

impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
    fn drop(&mut self) {
        let Some(expire_fn) = self.expire_fn.take() else {
            return;
        };
        let handle = match Handle::try_current() {
            Ok(x) => x,
            Err(_) => {
                warn!(
                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
                    self.writer_id
                );
                return;
            }
        };
        // Spawn a best-effort task to expire this writer. If it doesn't run to completion, the
        // lease timeout cleans up after us.
        let expire_span = debug_span!("drop::expire");
        handle.spawn_named(
            || format!("WriteHandle::expire ({})", self.writer_id),
            expire_fn.0().instrument(expire_span),
        );
    }
}

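/// Ensures that every run and part in `batch` carries the given schema id, panicking if one
/// already carries a different id.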
fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
where
    T: Timestamp + Lattice + Codec64,
{
    let ensure = |id: &mut Option<SchemaId>| match id {
        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
        None => *id = Some(schema_id),
    };

    for run_meta in &mut batch.run_meta {
        ensure(&mut run_meta.schema);
    }
    for part in &mut batch.parts {
        match part {
            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
            RunPart::Many(_hollow_run_ref) => {
                // Parts reached through a hollow run reference are left untouched; rewriting
                // them would require fetching the referenced runs from blob.
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::mpsc;

    use differential_dataflow::consolidation::consolidate_updates;
    use futures_util::FutureExt;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::collections::CollectionExt;
    use mz_ore::task;
    use serde_json::json;

    use crate::cache::PersistClientCache;
    use crate::tests::{all_ok, new_test_client};
    use crate::{PersistLocation, ShardId};

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn empty_batches(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, _) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let blob = Arc::clone(&write.blob);

        // Write an initial non-empty batch.
        let mut upper = 3;
        write.expect_append(&data[..2], vec![0], vec![upper]).await;

        // Write a bunch of empty batches: these should not create any new blobs, so the blob
        // key count should stay the same.
        let mut count_before = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_before += 1;
        })
        .await
        .expect("list_keys failed");
        for _ in 0..5 {
            let new_upper = upper + 1;
            write.expect_compare_and_append(&[], upper, new_upper).await;
            upper = new_upper;
        }
        let mut count_after = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_after += 1;
        })
        .await
        .expect("list_keys failed");
        assert_eq!(count_after, count_before);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
        let data0 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];
        let data1 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
        let mut batch1 = write.expect_batch(&data1, 0, 4).await;

        write
            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
            .await;

        let batch = write
            .machine
            .snapshot(&Antichain::from_elem(3))
            .await
            .expect("just wrote this")
            .into_element();

        assert!(batch.runs().count() >= 2);

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 2),
            (("2".to_owned(), "two".to_owned()), 2, 2),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_ore::test]
    fn writer_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            writer_id: WriterId,
        }

        // Roundtrip through json.
        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        // Deserialize a serialized string directly.
        assert_eq!(
            id,
            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        // Roundtrip through a container type.
        let json = json!({ "writer_id": id });
        assert_eq!(
            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.writer_id, id);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Roundtrip a batch through its transmittable (proto) representation and confirm that
        // the rehydrated batch can still be appended and read back.
        let batch = write.expect_batch(&data, 0, 4).await;
        let hollow_batch = batch.into_transmittable_batch();
        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);

        write
            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
            .await;

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
        let five = Antichain::from_elem(5);

        // The upper is not yet past 5.
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        // The upper is exactly 5: still not strictly past it.
        write
            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        // The upper is now past 5.
        write
            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
        assert_eq!(write.upper(), &Antichain::from_elem(7));

        // Waiting for a frontier the upper is already past returns immediately and leaves the
        // cached upper alone.
        assert_eq!(
            write
                .wait_for_upper_past(&Antichain::from_elem(2))
                .now_or_never(),
            Some(())
        );
        assert_eq!(write.upper(), &Antichain::from_elem(7));
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn fetch_recent_upper_linearized() {
        type Timestamp = u64;
        let max_upper = 1000;

        let shard_id = ShardId::new();
        let mut clients = PersistClientCache::new_no_metrics();
        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_writer, _) = upper_writer_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        // Clear the state cache so the reader opens its own copy of state and must genuinely
        // fetch the upper rather than reading the writer's cached state.
        clients.clear_state_cache();
        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_reader, _) = upper_reader_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        let (tx, rx) = mpsc::channel();

        let task = task::spawn(|| "upper-reader", async move {
            let mut upper = Timestamp::MIN;

            while upper < max_upper {
                while let Ok(new_upper) = rx.try_recv() {
                    upper = new_upper;
                }

                let recent_upper = upper_reader
                    .fetch_recent_upper()
                    .await
                    .as_option()
                    .cloned()
                    .expect("u64 is totally ordered and the shard is not finalized");
                assert!(
                    recent_upper >= upper,
                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
                );
            }
        });

        for upper in Timestamp::MIN..max_upper {
            let next_upper = upper + 1;
            upper_writer
                .expect_compare_and_append(&[], upper, next_upper)
                .await;
            tx.send(next_upper).expect("send failed");
        }

        task.await;
    }
}