use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::Instant;

use arrow::array::{Array, Int64Array};
use bytes::Bytes;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::stream::StreamExt;
use futures_util::{FutureExt, stream};
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::Blob;
use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
use mz_persist_types::part::{Part, PartBuilder};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::stats::{
    PartStats, TRUNCATE_LEN, TruncateBound, trim_to_budget, truncate_bytes,
};
use mz_persist_types::{Codec, Codec64};
use mz_proto::RustType;
use mz_timely_util::order::Reverse;
use proptest_derive::Arbitrary;
use semver::Version;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug_span, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
use crate::error::InvalidUsage;
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{
    LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, Schemas,
};
use crate::internal::machine::retry_external;
use crate::internal::merge::{MergeTree, Pending};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
use crate::internal::state::{
    BatchPart, ENABLE_INCREMENTAL_COMPACTION, HollowBatch, HollowBatchPart, HollowRun,
    HollowRunRef, ProtoInlineBatchPart, RunId, RunMeta, RunOrder, RunPart,
};
use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
use crate::{PersistConfig, ShardId};

include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));

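/// A handle to a batch of updates that has been written to blob storage but
/// which has not yet been appended to a shard.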
#[derive(Debug)]
pub struct Batch<K, V, T, D> {
    pub(crate) batch_delete_enabled: bool,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,

    pub(crate) version: Version,

    pub(crate) schemas: (Bytes, Bytes),

    pub(crate) batch: HollowBatch<T>,

    pub(crate) blob: Arc<dyn Blob>,

    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}

impl<K, V, T, D> Drop for Batch<K, V, T, D> {
    fn drop(&mut self) {
        if self.batch.part_count() > 0 {
            warn!(
                "un-consumed Batch, with {} parts and dangling blob keys: {:?}",
                self.batch.part_count(),
                self.batch
                    .parts
                    .iter()
                    .map(|x| x.printable_name())
                    .collect::<Vec<_>>(),
            );
        }
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        batch_delete_enabled: bool,
        metrics: Arc<Metrics>,
        blob: Arc<dyn Blob>,
        shard_metrics: Arc<ShardMetrics>,
        version: Version,
        schemas: (Bytes, Bytes),
        batch: HollowBatch<T>,
    ) -> Self {
        Self {
            batch_delete_enabled,
            metrics,
            shard_metrics,
            version,
            schemas,
            batch,
            blob,
            _phantom: PhantomData,
        }
    }

    pub fn shard_id(&self) -> ShardId {
        self.shard_metrics.shard_id
    }

    pub fn upper(&self) -> &Antichain<T> {
        self.batch.desc.upper()
    }

    pub fn lower(&self) -> &Antichain<T> {
        self.batch.desc.lower()
    }

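    /// Marks the blobs that this batch handle points to as consumed (e.g.
    /// because they were appended to the shard), so that dropping the handle
    /// no longer warns about dangling blob keys.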
    pub(crate) fn mark_consumed(&mut self) {
        self.batch.parts.clear();
    }

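    /// Deletes the blobs that make up this batch from blob storage; if batch
    /// deletion is disabled by config, the handle is merely marked consumed.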
    #[instrument(level = "debug", fields(shard = %self.shard_id()))]
    pub async fn delete(mut self) {
        if !self.batch_delete_enabled {
            self.mark_consumed();
            return;
        }
        let mut deletes = PartDeletes::default();
        for part in self.batch.parts.drain(..) {
            deletes.add(&part);
        }
        let () = deletes
            .delete(
                &*self.blob,
                self.shard_id(),
                usize::MAX,
                &*self.metrics,
                &*self.metrics.retries.external.batch_delete,
            )
            .await;
    }

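    /// Returns the [`SchemaId`]s of the parts written to this batch.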
    pub fn schemas(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.batch.parts.iter().flat_map(|b| b.schema_id())
    }

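    /// Turns this [`Batch`] into its [`HollowBatch`] representation, marking
    /// the handle as consumed so its blobs aren't reported as dangling on
    /// drop.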
    pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
        let ret = self.batch.clone();
        self.mark_consumed();
        ret
    }

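    /// Turns this [`Batch`] into a [`ProtoBatch`], which can be transmitted
    /// across process boundaries (it references the written blobs rather
    /// than containing the data) and later turned back into a batch handle.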
    pub fn into_transmittable_batch(mut self) -> ProtoBatch {
        let ret = ProtoBatch {
            shard_id: self.shard_metrics.shard_id.into_proto(),
            version: self.version.to_string(),
            batch: Some(self.batch.into_proto()),
            key_schema: self.schemas.0.clone(),
            val_schema: self.schemas.1.clone(),
        };
        self.mark_consumed();
        ret
    }

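    /// Rewrites any inline parts of this batch out to blob storage as hollow
    /// parts, leaving parts that are already hollow (and spilled runs)
    /// untouched.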
    pub(crate) async fn flush_to_blob(
        &mut self,
        cfg: &BatchBuilderConfig,
        batch_metrics: &BatchWriteMetrics,
        isolated_runtime: &Arc<IsolatedRuntime>,
        write_schemas: &Schemas<K, V>,
    ) {
        let mut parts = Vec::new();
        for (run_meta, run_parts) in self.batch.runs() {
            for part in run_parts {
                let (updates, ts_rewrite, schema_id) = match part {
                    RunPart::Single(BatchPart::Inline {
                        updates,
                        ts_rewrite,
                        schema_id,
                        deprecated_schema_id: _,
                    }) => (updates, ts_rewrite, schema_id),
                    other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
                        parts.push(other.clone());
                        continue;
                    }
                };
                let updates = updates
                    .decode::<T>(&self.metrics.columnar)
                    .expect("valid inline part");
                let diffs_sum = diffs_sum::<D>(updates.updates.diffs());
                let mut write_schemas = write_schemas.clone();
                write_schemas.id = *schema_id;

                let write_span =
                    debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id)
                        .or_current();
                let handle = mz_ore::task::spawn(
                    || "batch::flush_to_blob",
                    BatchParts::write_hollow_part(
                        cfg.clone(),
                        Arc::clone(&self.blob),
                        Arc::clone(&self.metrics),
                        Arc::clone(&self.shard_metrics),
                        batch_metrics.clone(),
                        Arc::clone(isolated_runtime),
                        updates,
                        run_meta.order.unwrap_or(RunOrder::Unordered),
                        ts_rewrite.clone(),
                        D::encode(&diffs_sum),
                        write_schemas,
                    )
                    .instrument(write_span),
                );
                let part = handle.await;
                parts.push(RunPart::Single(part));
            }
        }
        self.batch.parts = parts;
    }

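    /// The sum of the encoded sizes of all parts in the batch.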
    pub fn encoded_size_bytes(&self) -> usize {
        self.batch.encoded_size_bytes()
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + TotalOrder,
    D: Monoid + Codec64,
{
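    /// Efficiently rewrites the timestamps in this not-yet-committed batch as
    /// a metadata-only operation: updates are logically advanced to the given
    /// `frontier` and the batch's upper is replaced with `new_upper`.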
    pub fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), InvalidUsage<T>> {
        self.batch
            .rewrite_ts(frontier, new_upper)
            .map_err(InvalidUsage::InvalidRewrite)
    }
}

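/// Indicates what work was done in a call to [`BatchBuilder::add`]: either
/// the record was only staged in the in-progress part (`Record`), or staging
/// it also caused a filled part to be flushed to blob storage
/// (`RecordAndParts`).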
#[derive(Debug)]
pub enum Added {
    Record,
    RecordAndParts,
}

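/// A snapshot of the dynamic configuration that shapes a single run of a
/// batch builder.
///
/// A minimal construction sketch (hand-written for illustration; in this
/// crate's tests a `PersistConfig::new_for_tests()` value would play the
/// role of `persist_cfg`):
///
/// ```ignore
/// let batch_cfg = BatchBuilderConfig::new(&persist_cfg, ShardId::new());
/// assert!(batch_cfg.blob_target_size >= 1);
/// ```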
#[derive(Debug, Clone)]
pub struct BatchBuilderConfig {
    writer_key: WriterKey,
    pub(crate) blob_target_size: usize,
    pub(crate) batch_delete_enabled: bool,
    pub(crate) batch_builder_max_outstanding_parts: usize,
    pub(crate) inline_writes_single_max_bytes: usize,
    pub(crate) stats_collection_enabled: bool,
    pub(crate) stats_budget: usize,
    pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
    pub(crate) encoding_config: EncodingConfig,
    pub(crate) preferred_order: RunOrder,
    pub(crate) structured_key_lower_len: usize,
    pub(crate) run_length_limit: usize,
    pub(crate) enable_incremental_compaction: bool,
    pub(crate) max_runs: Option<usize>,
}

pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
    "persist_batch_delete_enabled",
    true,
    "Whether to actually delete blobs when batch delete is called (Materialize).",
);

pub(crate) const ENCODING_ENABLE_DICTIONARY: Config<bool> = Config::new(
    "persist_encoding_enable_dictionary",
    true,
    "A feature flag to enable dictionary encoding for Parquet data (Materialize).",
);

pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new(
    "persist_encoding_compression_format",
    "none",
    "A feature flag to enable compression of Parquet data (Materialize).",
);

pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config<usize> = Config::new(
    "persist_batch_structured_key_lower_len",
    256,
    "The maximum size in proto bytes of any structured key-lower metadata to preserve. \
    (If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
);

pub(crate) const MAX_RUN_LEN: Config<usize> = Config::new(
    "persist_batch_max_run_len",
    usize::MAX,
    "The maximum length a run can have before it will be spilled as a hollow run \
    into the blob store.",
);

pub(crate) const MAX_RUNS: Config<usize> = Config::new(
    "persist_batch_max_runs",
    1,
    "The maximum number of runs a batch builder should generate for user batches. \
    (Compaction outputs always generate a single run.) \
    The minimum value is 2; below this, compaction is disabled.",
);

pub(crate) const BLOB_TARGET_SIZE: Config<usize> = Config::new(
    "persist_blob_target_size",
    128 * MiB,
    "A target maximum size of persist blob payloads in bytes (Materialize).",
);

pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_single_max_bytes",
    4096,
    "The (exclusive) maximum size of a write that persist will inline in metadata.",
);

pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_total_max_bytes",
    1 * MiB,
    "\
    The (exclusive) maximum total size of inline writes in metadata before \
    persist will backpressure them by flushing out to s3.",
);

impl BatchBuilderConfig {
    pub fn new(value: &PersistConfig, _shard_id: ShardId) -> Self {
        let writer_key = WriterKey::for_version(&value.build_version);

        let preferred_order = RunOrder::Structured;

        BatchBuilderConfig {
            writer_key,
            blob_target_size: BLOB_TARGET_SIZE.get(value).clamp(1, usize::MAX),
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
            batch_builder_max_outstanding_parts: BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(value),
            inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
            stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
            stats_budget: STATS_BUDGET_BYTES.get(value),
            stats_untrimmable_columns: Arc::new(untrimmable_columns(value)),
            encoding_config: EncodingConfig {
                use_dictionary: ENCODING_ENABLE_DICTIONARY.get(value),
                compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
            },
            preferred_order,
            structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
            run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
            max_runs: match MAX_RUNS.get(value) {
                limit @ 2.. => Some(limit),
                _ => None,
            },
            enable_incremental_compaction: ENABLE_INCREMENTAL_COMPACTION.get(value),
        }
    }
}

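/// Columns that persist will always retain stats for, even when doing so
/// would overflow the stats budget; names are matched case-insensitively by
/// exact value, prefix, or suffix.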
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
pub(crate) struct UntrimmableColumns {
    pub equals: Vec<Cow<'static, str>>,
    pub prefixes: Vec<Cow<'static, str>>,
    pub suffixes: Vec<Cow<'static, str>>,
}

impl UntrimmableColumns {
    pub(crate) fn should_retain(&self, name: &str) -> bool {
        let name_lower = name.to_lowercase();
        for s in &self.equals {
            if *s == name_lower {
                return true;
            }
        }
        for s in &self.prefixes {
            if name_lower.starts_with(s.as_ref()) {
                return true;
            }
        }
        for s in &self.suffixes {
            if name_lower.ends_with(s.as_ref()) {
                return true;
            }
        }
        false
    }
}

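/// A builder for [`Batch`]es that allows updates to be added one at a time
/// and, once an upper is known, finished into a batch.
///
/// A minimal usage sketch (hand-written here; `write` is assumed to be a
/// write handle for a `(String, String)` shard with `u64` timestamps and
/// `i64` diffs, as in this module's tests):
///
/// ```ignore
/// let mut builder = write.builder(Antichain::from_elem(0));
/// builder.add(&"k".to_owned(), &"v".to_owned(), &0, &1).await?;
/// let batch = builder.finish(Antichain::from_elem(1)).await?;
/// ```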
#[derive(Debug)]
pub struct BatchBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    inline_desc: Description<T>,
    inclusive_upper: Antichain<Reverse<T>>,

    records_builder: PartBuilder<K, K::Schema, V, V::Schema>,
    pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
}

impl<K, V, T, D> BatchBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        builder: BatchBuilderInternal<K, V, T, D>,
        inline_desc: Description<T>,
    ) -> Self {
        let records_builder = PartBuilder::new(
            builder.write_schemas.key.as_ref(),
            builder.write_schemas.val.as_ref(),
        );
        Self {
            inline_desc,
            inclusive_upper: Antichain::new(),
            records_builder,
            builder,
        }
    }

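    /// Completes writes to this batch and returns a handle to it.
    ///
    /// The `registered_upper` may not be less than the batch's lower, and
    /// (unless the since has been advanced past the lower) every added
    /// update must be strictly below it.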
    pub async fn finish(
        mut self,
        registered_upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        if PartialOrder::less_than(&registered_upper, self.inline_desc.lower()) {
            return Err(InvalidUsage::InvalidBounds {
                lower: self.inline_desc.lower().clone(),
                upper: registered_upper,
            });
        }

        if PartialOrder::less_equal(self.inline_desc.since(), self.inline_desc.lower()) {
            for ts in self.inclusive_upper.iter() {
                if registered_upper.less_equal(&ts.0) {
                    return Err(InvalidUsage::UpdateBeyondUpper {
                        ts: ts.0.clone(),
                        expected_upper: registered_upper.clone(),
                    });
                }
            }
        }

        let updates = self.records_builder.finish();
        self.builder
            .flush_part(self.inline_desc.clone(), updates)
            .await;

        self.builder
            .finish(Description::new(
                self.inline_desc.lower().clone(),
                registered_upper,
                self.inline_desc.since().clone(),
            ))
            .await
    }

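    /// Adds the given update to the batch.
    ///
    /// The update's timestamp must be at or beyond the batch's lower. Once
    /// the in-progress part reaches the blob target size, it is handed off
    /// to be written out in the background.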
    pub async fn add(
        &mut self,
        key: &K,
        val: &V,
        ts: &T,
        diff: &D,
    ) -> Result<Added, InvalidUsage<T>> {
        if !self.inline_desc.lower().less_equal(ts) {
            return Err(InvalidUsage::UpdateNotBeyondLower {
                ts: ts.clone(),
                lower: self.inline_desc.lower().clone(),
            });
        }
        self.inclusive_upper.insert(Reverse(ts.clone()));

        let added = {
            self.records_builder
                .push(key, val, ts.clone(), diff.clone());
            if self.records_builder.goodbytes() >= self.builder.parts.cfg.blob_target_size {
                let part = self.records_builder.finish_and_replace(
                    self.builder.write_schemas.key.as_ref(),
                    self.builder.write_schemas.val.as_ref(),
                );
                Some(part)
            } else {
                None
            }
        };

        let added = if let Some(full_batch) = added {
            self.builder
                .flush_part(self.inline_desc.clone(), full_batch)
                .await;
            Added::RecordAndParts
        } else {
            Added::Record
        };
        Ok(added)
    }
}

#[derive(Debug)]
pub(crate) struct BatchBuilderInternal<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    shard_id: ShardId,
    version: Version,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,

    write_schemas: Schemas<K, V>,
    parts: BatchParts<T>,

    _phantom: PhantomData<fn(K, V, T, D)>,
}

impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        _cfg: BatchBuilderConfig,
        parts: BatchParts<T>,
        metrics: Arc<Metrics>,
        write_schemas: Schemas<K, V>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        version: Version,
    ) -> Self {
        Self {
            blob,
            metrics,
            write_schemas,
            parts,
            shard_id,
            version,
            _phantom: PhantomData,
        }
    }

    #[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
    pub async fn finish(
        self,
        registered_desc: Description<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        let write_run_ids = self.parts.cfg.enable_incremental_compaction;
        let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
        let shard_metrics = Arc::clone(&self.parts.shard_metrics);
        let runs = self.parts.finish().await;

        let mut run_parts = vec![];
        let mut run_splits = vec![];
        let mut run_meta = vec![];
        let total_updates = runs
            .iter()
            .map(|(_, _, num_updates)| num_updates)
            .sum::<usize>();
        for (order, parts, num_updates) in runs {
            if parts.is_empty() {
                continue;
            }
            if run_parts.len() != 0 {
                run_splits.push(run_parts.len());
            }
            run_meta.push(RunMeta {
                order: Some(order),
                schema: self.write_schemas.id,
                deprecated_schema: None,
                id: if write_run_ids {
                    Some(RunId::new())
                } else {
                    None
                },
                len: if write_run_ids {
                    Some(num_updates)
                } else {
                    None
                },
                meta: MetadataMap::default(),
            });
            run_parts.extend(parts);
        }
        let desc = registered_desc;

        let batch = Batch::new(
            batch_delete_enabled,
            Arc::clone(&self.metrics),
            self.blob,
            shard_metrics,
            self.version,
            (
                K::encode_schema(&*self.write_schemas.key),
                V::encode_schema(&*self.write_schemas.val),
            ),
            HollowBatch::new(desc, run_parts, total_updates, run_meta, run_splits),
        );

        Ok(batch)
    }

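    /// Hands the given columnar updates off to be written as a single part
    /// of this batch, kicking off the upload in the background; empty parts
    /// are silently dropped.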
    pub async fn flush_part(&mut self, part_desc: Description<T>, columnar: Part) {
        let num_updates = columnar.len();
        if num_updates == 0 {
            return;
        }
        let diffs_sum = diffs_sum::<D>(&columnar.diff);

        let start = Instant::now();
        self.parts
            .write(&self.write_schemas, part_desc, columnar, diffs_sum)
            .await;
        self.metrics
            .compaction
            .batch
            .step_part_writing
            .inc_by(start.elapsed().as_secs_f64());
    }
}

#[derive(Debug, Clone)]
pub(crate) struct RunWithMeta<T> {
    pub parts: Vec<RunPart<T>>,
    pub num_updates: usize,
}

impl<T> RunWithMeta<T> {
    pub fn new(parts: Vec<RunPart<T>>, num_updates: usize) -> Self {
        Self { parts, num_updates }
    }

    pub fn single(part: RunPart<T>, num_updates: usize) -> Self {
        Self {
            parts: vec![part],
            num_updates,
        }
    }
}

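/// The set of runs currently being written by a [`BatchParts`]: either a
/// single logical run kept in a particular order (spilling overly long runs
/// to blob storage as [`HollowRun`]s), or a set of runs that are compacted
/// together as they accumulate.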
#[derive(Debug)]
enum WritingRuns<T> {
    Ordered(RunOrder, MergeTree<Pending<RunWithMeta<T>>>),
    Compacting(MergeTree<(RunOrder, Pending<RunWithMeta<T>>)>),
}

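/// Writes out and tracks the individual parts (and runs) that make up a
/// batch, applying backpressure when too many uploads are outstanding.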
#[derive(Debug)]
pub(crate) struct BatchParts<T> {
    cfg: BatchBuilderConfig,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    shard_id: ShardId,
    blob: Arc<dyn Blob>,
    isolated_runtime: Arc<IsolatedRuntime>,
    next_index: u64,
    writing_runs: WritingRuns<T>,
    batch_metrics: BatchWriteMetrics,
}

impl<T: Timestamp + Codec64> BatchParts<T> {
    pub(crate) fn new_compacting<K, V, D>(
        cfg: CompactConfig,
        desc: Description<T>,
        runs_per_compaction: usize,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
        schemas: Schemas<K, V>,
    ) -> Self
    where
        K: Codec + Debug,
        V: Codec + Debug,
        T: Lattice + Send + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let shard_metrics = Arc::clone(&shard_metrics);
            let isolated_runtime = Arc::clone(&isolated_runtime);
            let runs_per_compaction = runs_per_compaction.clamp(2, 1024);

            let merge_fn = move |parts: Vec<(RunOrder, Pending<RunWithMeta<T>>)>| {
                let blob = Arc::clone(&blob);
                let metrics = Arc::clone(&metrics);
                let shard_metrics = Arc::clone(&shard_metrics);
                let cfg = cfg.clone();
                let isolated_runtime = Arc::clone(&isolated_runtime);
                let write_schemas = schemas.clone();
                let compact_desc = desc.clone();
                let handle = mz_ore::task::spawn(
                    || "batch::compact_runs",
                    async move {
                        let runs: Vec<_> = stream::iter(parts)
                            .then(|(order, parts)| async move {
                                let completed_run = parts.into_result().await;
                                (
                                    RunMeta {
                                        order: Some(order),
                                        schema: schemas.id,
                                        deprecated_schema: None,
                                        id: if cfg.batch.enable_incremental_compaction {
                                            Some(RunId::new())
                                        } else {
                                            None
                                        },
                                        len: if cfg.batch.enable_incremental_compaction {
                                            Some(completed_run.num_updates)
                                        } else {
                                            None
                                        },
                                        meta: MetadataMap::default(),
                                    },
                                    completed_run.parts,
                                )
                            })
                            .collect()
                            .await;

                        let run_refs: Vec<_> = runs
                            .iter()
                            .map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
                            .collect();

                        let output_batch = Compactor::<K, V, T, D>::compact_runs(
                            &cfg,
                            &shard_id,
                            &compact_desc,
                            run_refs,
                            blob,
                            metrics,
                            shard_metrics,
                            isolated_runtime,
                            write_schemas,
                        )
                        .await
                        .expect("successful compaction");

                        assert_eq!(
                            output_batch.run_meta.len(),
                            1,
                            "compaction is guaranteed to emit a single run"
                        );
                        let total_compacted_updates: usize = output_batch.len;

                        RunWithMeta::new(output_batch.parts, total_compacted_updates)
                    }
                    .instrument(debug_span!("batch::compact_runs")),
                );
                (RunOrder::Structured, Pending::new(handle))
            };
            WritingRuns::Compacting(MergeTree::new(runs_per_compaction, merge_fn))
        };
        BatchParts {
            cfg: cfg.batch,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

    pub(crate) fn new_ordered<D: Monoid + Codec64>(
        cfg: BatchBuilderConfig,
        order: RunOrder,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
    ) -> Self {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let writer_key = cfg.writer_key.clone();
            let run_length_limit = (order == RunOrder::Unordered)
                .then_some(usize::MAX)
                .unwrap_or(cfg.run_length_limit);
            let merge_fn = move |parts: Vec<Pending<RunWithMeta<T>>>| {
                let blob = Arc::clone(&blob);
                let writer_key = writer_key.clone();
                let metrics = Arc::clone(&metrics);
                let handle = mz_ore::task::spawn(
                    || "batch::spill_run",
                    async move {
                        let completed_runs: Vec<RunWithMeta<T>> = stream::iter(parts)
                            .then(|p| p.into_result())
                            .collect()
                            .await;

                        let mut all_run_parts = Vec::new();
                        let mut total_updates = 0;

                        for completed_run in completed_runs {
                            all_run_parts.extend(completed_run.parts);
                            total_updates += completed_run.num_updates;
                        }

                        let run_ref = HollowRunRef::set::<D>(
                            shard_id,
                            blob.as_ref(),
                            &writer_key,
                            HollowRun {
                                parts: all_run_parts,
                            },
                            &*metrics,
                        )
                        .await;

                        RunWithMeta::single(RunPart::Many(run_ref), total_updates)
                    }
                    .instrument(debug_span!("batch::spill_run")),
                );
                Pending::new(handle)
            };
            WritingRuns::Ordered(order, MergeTree::new(run_length_limit, merge_fn))
        };
        BatchParts {
            cfg,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

    pub(crate) fn expected_order(&self) -> RunOrder {
        match self.writing_runs {
            WritingRuns::Ordered(order, _) => order,
            WritingRuns::Compacting(_) => RunOrder::Unordered,
        }
    }

    pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
        &mut self,
        write_schemas: &Schemas<K, V>,
        desc: Description<T>,
        updates: Part,
        diffs_sum: D,
    ) {
        let batch_metrics = self.batch_metrics.clone();
        let index = self.next_index;
        self.next_index += 1;
        let num_updates = updates.len();
        let ts_rewrite = None;
        let schema_id = write_schemas.id;

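        // Updates that fit under the inline threshold are embedded directly
        // in the metadata we write to consensus; anything larger is encoded
        // and uploaded to blob storage as a hollow part.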
        let inline_threshold = self.cfg.inline_writes_single_max_bytes;

        let updates = BlobTraceUpdates::from_part(updates);
        let (name, write_future) = if updates.goodbytes() < inline_threshold {
            let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
            (
                "batch::inline_part",
                async move {
                    let start = Instant::now();
                    let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
                        desc: Some(desc.into_proto()),
                        index: index.into_proto(),
                        updates: Some(updates.into_proto()),
                    });
                    batch_metrics
                        .step_inline
                        .inc_by(start.elapsed().as_secs_f64());

                    RunWithMeta::single(
                        RunPart::Single(BatchPart::Inline {
                            updates,
                            ts_rewrite,
                            schema_id,
                            deprecated_schema_id: None,
                        }),
                        num_updates,
                    )
                }
                .instrument(span)
                .boxed(),
            )
        } else {
            let part = BlobTraceBatchPart {
                desc,
                updates,
                index,
            };
            let cfg = self.cfg.clone();
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let shard_metrics = Arc::clone(&self.shard_metrics);
            let isolated_runtime = Arc::clone(&self.isolated_runtime);
            let expected_order = self.expected_order();
            let encoded_diffs_sum = D::encode(&diffs_sum);
            let write_schemas_clone = write_schemas.clone();
            let write_span =
                debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current();
            (
                "batch::write_part",
                async move {
                    let part = BatchParts::write_hollow_part(
                        cfg,
                        blob,
                        metrics,
                        shard_metrics,
                        batch_metrics,
                        isolated_runtime,
                        part,
                        expected_order,
                        ts_rewrite,
                        encoded_diffs_sum,
                        write_schemas_clone,
                    )
                    .await;
                    RunWithMeta::single(RunPart::Single(part), num_updates)
                }
                .instrument(write_span)
                .boxed(),
            )
        };

        match &mut self.writing_runs {
            WritingRuns::Ordered(_order, run) => {
                let part = Pending::new(mz_ore::task::spawn(|| name, write_future));
                run.push(part);

                for part in run
                    .iter_mut()
                    .rev()
                    .skip(self.cfg.batch_builder_max_outstanding_parts)
                    .take_while(|p| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
            WritingRuns::Compacting(batches) => {
                let run = Pending::Writing(mz_ore::task::spawn(|| name, write_future));
                batches.push((RunOrder::Unordered, run));

                let mut part_budget = self.cfg.batch_builder_max_outstanding_parts;
                let mut compaction_budget = 1;
                for (_, part) in batches
                    .iter_mut()
                    .rev()
                    .skip_while(|(order, _)| match order {
                        RunOrder::Unordered if part_budget > 0 => {
                            part_budget -= 1;
                            true
                        }
                        RunOrder::Structured | RunOrder::Codec if compaction_budget > 0 => {
                            compaction_budget -= 1;
                            true
                        }
                        _ => false,
                    })
                    .take_while(|(_, p)| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
        }
    }

    async fn write_hollow_part<K: Codec, V: Codec>(
        cfg: BatchBuilderConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        batch_metrics: BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        mut updates: BlobTraceBatchPart<T>,
        run_order: RunOrder,
        ts_rewrite: Option<Antichain<T>>,
        diffs_sum: [u8; 8],
        write_schemas: Schemas<K, V>,
    ) -> BatchPart<T> {
        let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new());
        let key = partial_key.complete(&shard_metrics.shard_id);
        let goodbytes = updates.updates.goodbytes();
        let metrics_ = Arc::clone(&metrics);
        let schema_id = write_schemas.id;

        let (stats, key_lower, structured_key_lower, (buf, encode_time)) = isolated_runtime
            .spawn_named(|| "batch::encode_part", async move {
                let stats = metrics_.columnar.arrow().measure_part_build(|| {
                    let stats = if cfg.stats_collection_enabled {
                        let ext = updates.updates.get_or_make_structured::<K, V>(
                            write_schemas.key.as_ref(),
                            write_schemas.val.as_ref(),
                        );

                        let key_stats = write_schemas
                            .key
                            .decoder_any(ext.key.as_ref())
                            .expect("decoding just-encoded data")
                            .stats();

                        let part_stats = PartStats { key: key_stats };

                        let trimmed_start = Instant::now();
                        let mut trimmed_bytes = 0;
                        let trimmed_stats = LazyPartStats::encode(&part_stats, |s| {
                            trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| {
                                cfg.stats_untrimmable_columns.should_retain(s)
                            })
                        });
                        let trimmed_duration = trimmed_start.elapsed();
                        Some((trimmed_stats, trimmed_duration, trimmed_bytes))
                    } else {
                        None
                    };

                    updates.updates = updates.updates.as_structured::<K, V>(
                        write_schemas.key.as_ref(),
                        write_schemas.val.as_ref(),
                    );

                    stats
                });

                let key_lower = if let Some(records) = updates.updates.records() {
                    let key_bytes = records.keys();
                    if key_bytes.is_empty() {
                        &[]
                    } else if run_order == RunOrder::Codec {
                        key_bytes.value(0)
                    } else {
                        ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
                    }
                } else {
                    &[]
                };
                let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
                    .expect("lower bound always exists");

                let structured_key_lower = if cfg.structured_key_lower_len > 0 {
                    updates.updates.structured().and_then(|ext| {
                        let min_key = if run_order == RunOrder::Structured {
                            0
                        } else {
                            let ord = ArrayOrd::new(ext.key.as_ref());
                            (0..ext.key.len())
                                .min_by_key(|i| ord.at(*i))
                                .expect("non-empty batch")
                        };
                        let lower = ArrayBound::new(Arc::clone(&ext.key), min_key)
                            .to_proto_lower(cfg.structured_key_lower_len);
                        if lower.is_none() {
                            batch_metrics.key_lower_too_big.inc()
                        }
                        lower.map(|proto| LazyProto::from(&proto))
                    })
                } else {
                    None
                };

                let encode_start = Instant::now();
                let mut buf = Vec::new();
                updates.encode(&mut buf, &metrics_.columnar, &cfg.encoding_config);

                drop(updates);
                (
                    stats,
                    key_lower,
                    structured_key_lower,
                    (Bytes::from(buf), encode_start.elapsed()),
                )
            })
            .instrument(debug_span!("batch::encode_part"))
            .await;
        metrics.codecs.batch.encode_count.inc();
        metrics
            .codecs
            .batch
            .encode_seconds
            .inc_by(encode_time.as_secs_f64());

        let start = Instant::now();
        let payload_len = buf.len();
        let () = retry_external(&metrics.retries.external.batch_set, || async {
            shard_metrics.blob_sets.inc();
            blob.set(&key, Bytes::clone(&buf)).await
        })
        .instrument(trace_span!("batch::set", payload_len))
        .await;
        batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
        batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
        batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
        match run_order {
            RunOrder::Unordered => batch_metrics.unordered.inc(),
            RunOrder::Codec => batch_metrics.codec_order.inc(),
            RunOrder::Structured => batch_metrics.structured_order.inc(),
        }
        let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
            batch_metrics
                .step_stats
                .inc_by(stats_step_timing.as_secs_f64());
            if trimmed_bytes > 0 {
                metrics.pushdown.parts_stats_trimmed_count.inc();
                metrics
                    .pushdown
                    .parts_stats_trimmed_bytes
                    .inc_by(u64::cast_from(trimmed_bytes));
            }
            stats
        });

        let meta = MetadataMap::default();
        BatchPart::Hollow(HollowBatchPart {
            key: partial_key,
            meta,
            encoded_size_bytes: payload_len,
            key_lower,
            structured_key_lower,
            stats,
            ts_rewrite,
            diffs_sum: Some(diffs_sum),
            format: Some(BatchColumnarFormat::Structured),
            schema_id,
            deprecated_schema_id: None,
        })
    }

    #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
    pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec<RunPart<T>>, usize)> {
        match self.writing_runs {
            WritingRuns::Ordered(RunOrder::Unordered, run) => {
                let completed_runs = run.finish();
                let mut output = Vec::with_capacity(completed_runs.len());
                for completed_run in completed_runs {
                    let completed_run = completed_run.into_result().await;
                    for part in completed_run.parts {
                        output.push((RunOrder::Unordered, vec![part], completed_run.num_updates));
                    }
                }
                output
            }
            WritingRuns::Ordered(order, run) => {
                let completed_runs = run.finish();
                let mut all_parts = Vec::new();
                let mut all_update_counts = 0;
                for completed_run in completed_runs {
                    let completed_run = completed_run.into_result().await;
                    all_parts.extend(completed_run.parts);
                    all_update_counts += completed_run.num_updates;
                }
                vec![(order, all_parts, all_update_counts)]
            }
            WritingRuns::Compacting(batches) => {
                let runs = batches.finish();
                let mut output = Vec::new();
                for (order, run) in runs {
                    let completed_run = run.into_result().await;
                    output.push((order, completed_run.parts, completed_run.num_updates));
                }
                output
            }
        }
    }
}

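/// Validates that `batch` may be appended to a shard as the (possibly
/// truncated) description `truncate`: the batch's bounds must cover the
/// requested bounds, and any timestamp rewrites must not leak data outside
/// them.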
pub(crate) fn validate_truncate_batch<T: Timestamp>(
    batch: &HollowBatch<T>,
    truncate: &Description<T>,
    any_batch_rewrite: bool,
    validate_part_bounds_on_write: bool,
) -> Result<(), InvalidUsage<T>> {
    if any_batch_rewrite {
        if truncate.upper() != batch.desc.upper() {
            return Err(InvalidUsage::InvalidRewrite(format!(
                "rewritten batch might have data past {:?} up to {:?}",
                truncate.upper().elements(),
                batch.desc.upper().elements(),
            )));
        }
        for part in batch.parts.iter() {
            let part_lower_bound = part.ts_rewrite().unwrap_or_else(|| batch.desc.lower());
            if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
                return Err(InvalidUsage::InvalidRewrite(format!(
                    "rewritten batch might have data below {:?} at {:?}",
                    truncate.lower().elements(),
                    part_lower_bound.elements(),
                )));
            }
        }
    }

    if !validate_part_bounds_on_write {
        return Ok(());
    }

    let batch = &batch.desc;
    if !PartialOrder::less_equal(batch.lower(), truncate.lower())
        || PartialOrder::less_than(batch.upper(), truncate.upper())
    {
        return Err(InvalidUsage::InvalidBatchBounds {
            batch_lower: batch.lower().clone(),
            batch_upper: batch.upper().clone(),
            append_lower: truncate.lower().clone(),
            append_upper: truncate.upper().clone(),
        });
    }

    Ok(())
}

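/// A set of parts to delete from blob storage: blob keys that can be deleted
/// directly, plus hollow runs that must first be fetched and expanded into
/// their constituent parts.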
#[derive(Debug)]
pub(crate) struct PartDeletes<T> {
    blob_keys: BTreeSet<PartialBatchKey>,
    hollow_runs: BTreeMap<PartialBatchKey, HollowRunRef<T>>,
}

impl<T> Default for PartDeletes<T> {
    fn default() -> Self {
        Self {
            blob_keys: Default::default(),
            hollow_runs: Default::default(),
        }
    }
}

impl<T: Timestamp> PartDeletes<T> {
    pub fn add(&mut self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.insert(r.key.clone(), r.clone()).is_none(),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.insert(x.key.clone()),
            RunPart::Single(BatchPart::Inline { .. }) => {
                true
            }
        }
    }

    pub fn contains(&self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.contains_key(&r.key),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.contains(&x.key),
            RunPart::Single(BatchPart::Inline { .. }) => false,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn len(&self) -> usize {
        match self {
            Self {
                blob_keys,
                hollow_runs,
            } => blob_keys.len() + hollow_runs.len(),
        }
    }

    pub async fn delete(
        mut self,
        blob: &dyn Blob,
        shard_id: ShardId,
        concurrency: usize,
        metrics: &Metrics,
        delete_metrics: &RetryMetrics,
    ) where
        T: Codec64,
    {
        loop {
            let () = stream::iter(mem::take(&mut self.blob_keys))
                .map(|key| {
                    let key = key.complete(&shard_id);
                    async move {
                        retry_external(delete_metrics, || blob.delete(&key)).await;
                    }
                })
                .buffer_unordered(concurrency)
                .collect()
                .await;

            let Some((run_key, run_ref)) = self.hollow_runs.pop_first() else {
                break;
            };

            if let Some(run) = run_ref.get(shard_id, blob, metrics).await {
                for part in &run.parts {
                    self.add(part);
                }
                self.blob_keys.insert(run_key);
            };
        }
    }
}

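/// Sums the decoded diffs in the given column, returning `D::zero()` for an
/// empty column.
///
/// A worked mini-example (a sketch, assuming `D = i64` with its standard
/// little-endian `Codec64` encoding): the diffs `[1, -2, 4]` sum to `3`.
///
/// ```ignore
/// let diffs = Int64Array::from(vec![1, -2, 4]);
/// assert_eq!(diffs_sum::<i64>(&diffs), 3i64);
/// ```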
fn diffs_sum<D: Monoid + Codec64>(updates: &Int64Array) -> D {
    let mut sum = D::zero();
    for d in updates.values().iter() {
        let d = D::decode(d.to_le_bytes());
        sum.plus_equals(&d);
    }
    sum
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;

    use super::*;
    use crate::PersistLocation;
    use crate::cache::PersistClientCache;
    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
    use crate::internal::paths::{BlobKey, PartialBlobKey};
    use crate::tests::{all_ok, new_test_client};

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_flushing() {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let cache = PersistClientCache::new_no_metrics();

        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        cache.cfg.set_config(&MAX_RUNS, 3);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);

        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let (mut write, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut builder = write.builder(Antichain::from_elem(0));

        fn assert_writing(
            builder: &BatchBuilder<String, String, u64, i64>,
            expected_finished: &[bool],
        ) {
            let WritingRuns::Compacting(run) = &builder.builder.parts.writing_runs else {
                unreachable!("ordered run!")
            };

            let actual: Vec<_> = run.iter().map(|(_, p)| p.is_finished()).collect();
            assert_eq!(*expected_finished, actual);
        }

        assert_writing(&builder, &[]);

        let ((k, v), t, d) = &data[0];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false]);

        let ((k, v), t, d) = &data[1];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        let ((k, v), t, d) = &data[2];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[true, false, false]);

        let ((k, v), t, d) = &data[3];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        let batch = builder
            .finish(Antichain::from_elem(5))
            .await
            .expect("invalid usage");
        assert_eq!(batch.batch.runs().count(), 2);
        assert_eq!(batch.batch.part_count(), 4);
        write
            .append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
        assert_eq!(read.expect_snapshot_and_fetch(4).await, all_ok(&data, 4));
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_keys() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 3);
        for part in &batch.batch.parts {
            let part = part.expect_hollow_part();
            match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
                Ok((shard, PartialBlobKey::Batch(writer, _))) => {
                    assert_eq!(shard.to_string(), shard_id.to_string());
                    assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
                }
                _ => panic!("unparseable blob key"),
            }
        }
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_delete() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        cache.cfg.set_config(&BATCH_DELETE_ENABLED, true);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let part_key = batch.batch.parts[0]
            .expect_hollow_part()
            .key
            .complete(&shard_id);

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_some());

        batch.delete().await;

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_none());
    }

    #[mz_ore::test]
    fn untrimmable_columns() {
        let untrimmable = UntrimmableColumns {
            equals: vec!["abc".into(), "def".into()],
            prefixes: vec!["123".into(), "234".into()],
            suffixes: vec!["xyz".into()],
        };

        assert!(untrimmable.should_retain("abc"));
        assert!(untrimmable.should_retain("ABC"));
        assert!(untrimmable.should_retain("aBc"));
        assert!(!untrimmable.should_retain("abcd"));
        assert!(untrimmable.should_retain("deF"));
        assert!(!untrimmable.should_retain("defg"));

        assert!(untrimmable.should_retain("123"));
        assert!(untrimmable.should_retain("123-4"));
        assert!(untrimmable.should_retain("1234"));
        assert!(untrimmable.should_retain("234"));
        assert!(!untrimmable.should_retain("345"));

        assert!(untrimmable.should_retain("ijk_xyZ"));
        assert!(untrimmable.should_retain("ww-XYZ"));
        assert!(!untrimmable.should_retain("xya"));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        let mut batch = write.builder(Antichain::from_elem(0));
        batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
        let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();

        let batch = batch.into_transmittable_batch();
        let mut batch = write.batch_from_transmittable_batch(batch);
        batch
            .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
            .unwrap();
        write
            .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
            .await;

        let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
        let expected = vec![(((Ok("foo".to_owned())), Ok(())), 2, 1)];
        assert_eq!(actual, expected);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn structured_lowers() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg().set_config(&STRUCTURED_KEY_LOWER_LEN, 1024);
        cache.cfg().set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg().set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let [part] = batch.batch.parts.as_slice() else {
            panic!("expected single part")
        };
        assert!(part.structured_key_lower().is_some());
    }
}