use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::Instant;

use arrow::array::{Array, Int64Array};
use bytes::Bytes;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::stream::StreamExt;
use futures_util::{FutureExt, stream};
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::Blob;
use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
use mz_persist_types::part::{Part, PartBuilder};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::stats::{
    PartStats, TRUNCATE_LEN, TruncateBound, trim_to_budget, truncate_bytes,
};
use mz_persist_types::{Codec, Codec64};
use mz_proto::RustType;
use mz_timely_util::order::Reverse;
use proptest_derive::Arbitrary;
use semver::Version;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug_span, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
use crate::error::InvalidUsage;
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
use crate::internal::machine::retry_external;
use crate::internal::merge::{MergeTree, Pending};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
use crate::internal::state::{
    BatchPart, ENABLE_INCREMENTAL_COMPACTION, HollowBatch, HollowBatchPart, HollowRun,
    HollowRunRef, ProtoInlineBatchPart, RunId, RunMeta, RunOrder, RunPart,
};
use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
use crate::{PersistConfig, ShardId};

include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));

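/// A handle to a batch of updates that has been written to blob storage but
/// which has not yet been appended to a shard.
///
/// A [Batch] needs to be marked as consumed or deleted via [Self::delete];
/// otherwise the blobs backing it dangle, and the [Drop] impl below logs a
/// warning.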
#[derive(Debug)]
pub struct Batch<K, V, T, D> {
    pub(crate) batch_delete_enabled: bool,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,

    pub(crate) version: Version,

    pub(crate) batch: HollowBatch<T>,

    pub(crate) blob: Arc<dyn Blob>,

    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}

impl<K, V, T, D> Drop for Batch<K, V, T, D> {
    fn drop(&mut self) {
        if self.batch.part_count() > 0 {
            warn!(
                "un-consumed Batch, with {} parts and dangling blob keys: {:?}",
                self.batch.part_count(),
                self.batch
                    .parts
                    .iter()
                    .map(|x| x.printable_name())
                    .collect::<Vec<_>>(),
            );
        }
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64,
{
    pub(crate) fn new(
        batch_delete_enabled: bool,
        metrics: Arc<Metrics>,
        blob: Arc<dyn Blob>,
        shard_metrics: Arc<ShardMetrics>,
        version: Version,
        batch: HollowBatch<T>,
    ) -> Self {
        Self {
            batch_delete_enabled,
            metrics,
            shard_metrics,
            version,
            batch,
            blob,
            _phantom: PhantomData,
        }
    }

    /// The `shard_id` of this [Batch].
    pub fn shard_id(&self) -> ShardId {
        self.shard_metrics.shard_id
    }

    /// The `upper` of this [Batch].
    pub fn upper(&self) -> &Antichain<T> {
        self.batch.desc.upper()
    }

    /// The `lower` of this [Batch].
    pub fn lower(&self) -> &Antichain<T> {
        self.batch.desc.lower()
    }

    /// Marks the blobs that this batch handle points to as consumed, e.g.
    /// because they were appended to a shard, so that the [Drop] warning
    /// above becomes a no-op.
    pub(crate) fn mark_consumed(&mut self) {
        self.batch.parts.clear();
    }

    /// Deletes the blobs that make up this batch from blob storage and marks
    /// this handle as consumed.
    #[instrument(level = "debug", fields(shard = %self.shard_id()))]
    pub async fn delete(mut self) {
        if !self.batch_delete_enabled {
            self.mark_consumed();
            return;
        }
        let mut deletes = PartDeletes::default();
        for part in self.batch.parts.drain(..) {
            deletes.add(&part);
        }
        let () = deletes
            .delete(
                &*self.blob,
                self.shard_id(),
                usize::MAX,
                &*self.metrics,
                &*self.metrics.retries.external.batch_delete,
            )
            .await;
    }

    /// Returns the schemas of the parts in this batch.
    pub fn schemas(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.batch.parts.iter().flat_map(|b| b.schema_id())
    }

    /// Turns this [Batch] into a [HollowBatch], marking this handle as
    /// consumed.
    pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
        let ret = self.batch.clone();
        self.mark_consumed();
        ret
    }

    /// Turns this [Batch] into a [ProtoBatch], which can be serialized and
    /// sent to another process to be turned back into a [Batch], marking
    /// this handle as consumed.
    pub fn into_transmittable_batch(mut self) -> ProtoBatch {
        let ret = ProtoBatch {
            shard_id: self.shard_metrics.shard_id.into_proto(),
            version: self.version.to_string(),
            batch: Some(self.batch.into_proto()),
        };
        self.mark_consumed();
        ret
    }

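    /// Flushes any inline parts in this batch out to blob storage, leaving
    /// the batch's logical contents unchanged.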
    pub(crate) async fn flush_to_blob(
        &mut self,
        cfg: &BatchBuilderConfig,
        batch_metrics: &BatchWriteMetrics,
        isolated_runtime: &Arc<IsolatedRuntime>,
        write_schemas: &Schemas<K, V>,
    ) {
        let mut parts = Vec::new();
        for (run_meta, run_parts) in self.batch.runs() {
            for part in run_parts {
                let (updates, ts_rewrite, schema_id) = match part {
                    RunPart::Single(BatchPart::Inline {
                        updates,
                        ts_rewrite,
                        schema_id,
                        deprecated_schema_id: _,
                    }) => (updates, ts_rewrite, schema_id),
                    other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
                        parts.push(other.clone());
                        continue;
                    }
                };
                let updates = updates
                    .decode::<T>(&self.metrics.columnar)
                    .expect("valid inline part");
                let diffs_sum =
                    diffs_sum::<D>(updates.updates.diffs()).expect("inline parts are not empty");
                let mut write_schemas = write_schemas.clone();
                write_schemas.id = *schema_id;

                let write_span =
                    debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id)
                        .or_current();
                let handle = mz_ore::task::spawn(
                    || "batch::flush_to_blob",
                    BatchParts::write_hollow_part(
                        cfg.clone(),
                        Arc::clone(&self.blob),
                        Arc::clone(&self.metrics),
                        Arc::clone(&self.shard_metrics),
                        batch_metrics.clone(),
                        Arc::clone(isolated_runtime),
                        updates,
                        run_meta.order.unwrap_or(RunOrder::Unordered),
                        ts_rewrite.clone(),
                        D::encode(&diffs_sum),
                        write_schemas,
                    )
                    .instrument(write_span),
                );
                let part = handle.await.expect("part write task failed");
                parts.push(RunPart::Single(part));
            }
        }
        self.batch.parts = parts;
    }

    pub fn encoded_size_bytes(&self) -> usize {
        self.batch.encoded_size_bytes()
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + TotalOrder,
    D: Semigroup + Codec64,
{
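    /// Efficiently rewrites the timestamps in this not-yet-appended batch as
    /// a metadata-only operation: every update's timestamp is advanced to the
    /// given `frontier` and the batch's upper is replaced with `new_upper`.
    ///
    /// This requires a totally ordered timestamp, so that the rewritten
    /// updates can be validated against the new bounds.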
    pub fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), InvalidUsage<T>> {
        self.batch
            .rewrite_ts(frontier, new_upper)
            .map_err(InvalidUsage::InvalidRewrite)
    }
}

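/// Indicates what work was done in a call to [BatchBuilder::add].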
#[derive(Debug)]
pub enum Added {
    /// A record was inserted into a pending batch part.
    Record,
    /// A record was inserted into a pending batch part and that part was
    /// flushed to blob storage.
    RecordAndParts,
}

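/// A snapshot of the dynamic configuration relevant to batch building, taken
/// once so that an individual batch builder sees a consistent set of values.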
#[derive(Debug, Clone)]
pub struct BatchBuilderConfig {
    writer_key: WriterKey,
    pub(crate) blob_target_size: usize,
    pub(crate) batch_delete_enabled: bool,
    pub(crate) batch_builder_max_outstanding_parts: usize,
    pub(crate) inline_writes_single_max_bytes: usize,
    pub(crate) stats_collection_enabled: bool,
    pub(crate) stats_budget: usize,
    pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
    pub(crate) encoding_config: EncodingConfig,
    pub(crate) preferred_order: RunOrder,
    pub(crate) structured_key_lower_len: usize,
    pub(crate) run_length_limit: usize,
    pub(crate) enable_incremental_compaction: bool,
    pub(crate) max_runs: Option<usize>,
}

pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
    "persist_batch_delete_enabled",
    true,
    "Whether to actually delete blobs when batch delete is called (Materialize).",
);

pub(crate) const ENCODING_ENABLE_DICTIONARY: Config<bool> = Config::new(
    "persist_encoding_enable_dictionary",
    true,
    "A feature flag to enable dictionary encoding for Parquet data (Materialize).",
);

pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new(
    "persist_encoding_compression_format",
    "none",
    "A feature flag to enable compression of Parquet data (Materialize).",
);

pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config<usize> = Config::new(
    "persist_batch_structured_key_lower_len",
    256,
    "The maximum size in proto bytes of any structured key-lower metadata to preserve. \
    (If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
);

pub(crate) const MAX_RUN_LEN: Config<usize> = Config::new(
    "persist_batch_max_run_len",
    usize::MAX,
    "The maximum length a run can have before it will be spilled as a hollow run \
    into the blob store.",
);

pub(crate) const MAX_RUNS: Config<usize> = Config::new(
    "persist_batch_max_runs",
    1,
    "The maximum number of runs a batch builder should generate for user batches. \
    (Compaction outputs always generate a single run.) \
    The minimum value is 2; below this, compaction is disabled.",
);

pub(crate) const BLOB_TARGET_SIZE: Config<usize> = Config::new(
    "persist_blob_target_size",
    128 * MiB,
    "A target maximum size of persist blob payloads in bytes (Materialize).",
);

pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_single_max_bytes",
    4096,
    "The (exclusive) maximum size of a write that persist will inline in metadata.",
);

pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_total_max_bytes",
    1 * MiB,
    "\
    The (exclusive) maximum total size of inline writes in metadata before \
    persist will backpressure them by flushing out to s3.",
);
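
// A sketch of how these knobs can be overridden at runtime through the dyncfg
// system (see the tests at the bottom of this file for real usage):
//
//     let cache = PersistClientCache::new_no_metrics();
//     cache.cfg.set_config(&BLOB_TARGET_SIZE, 0); // flush a part per update
//     cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0); // never inline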

impl BatchBuilderConfig {
    pub fn new(value: &PersistConfig, _shard_id: ShardId) -> Self {
        let writer_key = WriterKey::for_version(&value.build_version);

        let preferred_order = RunOrder::Structured;

        BatchBuilderConfig {
            writer_key,
            blob_target_size: BLOB_TARGET_SIZE.get(value).clamp(1, usize::MAX),
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
            batch_builder_max_outstanding_parts: BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(value),
            inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
            stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
            stats_budget: STATS_BUDGET_BYTES.get(value),
            stats_untrimmable_columns: Arc::new(untrimmable_columns(value)),
            encoding_config: EncodingConfig {
                use_dictionary: ENCODING_ENABLE_DICTIONARY.get(value),
                compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
            },
            preferred_order,
            structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
            run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
            max_runs: match MAX_RUNS.get(value) {
                limit @ 2.. => Some(limit),
                _ => None,
            },
            enable_incremental_compaction: ENABLE_INCREMENTAL_COMPACTION.get(value),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
pub(crate) struct UntrimmableColumns {
    /// Column names that are always retained when they match exactly
    /// (case-insensitively).
    pub equals: Vec<Cow<'static, str>>,
    /// Prefixes that, when matched case-insensitively, mark a column as
    /// untrimmable.
    pub prefixes: Vec<Cow<'static, str>>,
    /// Suffixes that, when matched case-insensitively, mark a column as
    /// untrimmable.
    pub suffixes: Vec<Cow<'static, str>>,
}

impl UntrimmableColumns {
    pub(crate) fn should_retain(&self, name: &str) -> bool {
        let name_lower = name.to_lowercase();
        for s in &self.equals {
            if *s == name_lower {
                return true;
            }
        }
        for s in &self.prefixes {
            if name_lower.starts_with(s.as_ref()) {
                return true;
            }
        }
        for s in &self.suffixes {
            if name_lower.ends_with(s.as_ref()) {
                return true;
            }
        }
        false
    }
}

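/// A builder for [Batch]es that allows adding updates piece by piece and
/// then finishing it.
///
/// A sketch of the expected flow, assuming a `write` handle like the ones in
/// this crate's tests:
///
/// ```ignore
/// let mut builder = write.builder(Antichain::from_elem(0));
/// builder.add(&key, &val, &ts, &diff).await?;
/// let batch = builder.finish(Antichain::from_elem(upper)).await?;
/// ```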
#[derive(Debug)]
pub struct BatchBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    inline_desc: Description<T>,
    inclusive_upper: Antichain<Reverse<T>>,

    records_builder: PartBuilder<K, K::Schema, V, V::Schema>,
    pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
}

impl<K, V, T, D> BatchBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64,
{
    pub(crate) fn new(
        builder: BatchBuilderInternal<K, V, T, D>,
        inline_desc: Description<T>,
    ) -> Self {
        let records_builder = PartBuilder::new(
            builder.write_schemas.key.as_ref(),
            builder.write_schemas.val.as_ref(),
        );
        Self {
            inline_desc,
            inclusive_upper: Antichain::new(),
            records_builder,
            builder,
        }
    }

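    /// Finish writing this batch and return a handle to the written batch.
    ///
    /// This fails if any of the updates in this batch are beyond the given
    /// `registered_upper`.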
    pub async fn finish(
        mut self,
        registered_upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        if PartialOrder::less_than(&registered_upper, self.inline_desc.lower()) {
            return Err(InvalidUsage::InvalidBounds {
                lower: self.inline_desc.lower().clone(),
                upper: registered_upper,
            });
        }

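        // Only validate the updates against the registered upper while the
        // since hasn't advanced past the lower; beyond that, timestamps may
        // have been logically advanced and the check below wouldn't apply.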
        if PartialOrder::less_equal(self.inline_desc.since(), self.inline_desc.lower()) {
            for ts in self.inclusive_upper.iter() {
                if registered_upper.less_equal(&ts.0) {
                    return Err(InvalidUsage::UpdateBeyondUpper {
                        ts: ts.0.clone(),
                        expected_upper: registered_upper.clone(),
                    });
                }
            }
        }

        let updates = self.records_builder.finish();
        self.builder
            .flush_part(self.inline_desc.clone(), updates)
            .await;

        self.builder
            .finish(Description::new(
                self.inline_desc.lower().clone(),
                registered_upper,
                self.inline_desc.since().clone(),
            ))
            .await
    }

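    /// Adds the given update to the batch.
    ///
    /// The update timestamp must be beyond the lower that was given when
    /// creating this [BatchBuilder].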
    pub async fn add(
        &mut self,
        key: &K,
        val: &V,
        ts: &T,
        diff: &D,
    ) -> Result<Added, InvalidUsage<T>> {
        if !self.inline_desc.lower().less_equal(ts) {
            return Err(InvalidUsage::UpdateNotBeyondLower {
                ts: ts.clone(),
                lower: self.inline_desc.lower().clone(),
            });
        }
        self.inclusive_upper.insert(Reverse(ts.clone()));

        let added = {
            self.records_builder
                .push(key, val, ts.clone(), diff.clone());
            if self.records_builder.goodbytes() >= self.builder.parts.cfg.blob_target_size {
                let part = self.records_builder.finish_and_replace(
                    self.builder.write_schemas.key.as_ref(),
                    self.builder.write_schemas.val.as_ref(),
                );
                Some(part)
            } else {
                None
            }
        };

        let added = if let Some(full_batch) = added {
            self.builder
                .flush_part(self.inline_desc.clone(), full_batch)
                .await;
            Added::RecordAndParts
        } else {
            Added::Record
        };
        Ok(added)
    }
}

#[derive(Debug)]
pub(crate) struct BatchBuilderInternal<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    shard_id: ShardId,
    version: Version,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,

    write_schemas: Schemas<K, V>,
    parts: BatchParts<T>,

    _phantom: PhantomData<fn(K, V, T, D)>,
}

impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64,
{
    pub(crate) fn new(
        _cfg: BatchBuilderConfig,
        parts: BatchParts<T>,
        metrics: Arc<Metrics>,
        write_schemas: Schemas<K, V>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        version: Version,
    ) -> Self {
        Self {
            blob,
            metrics,
            write_schemas,
            parts,
            shard_id,
            version,
            _phantom: PhantomData,
        }
    }

    #[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
    pub async fn finish(
        self,
        registered_desc: Description<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        let write_run_ids = self.parts.cfg.enable_incremental_compaction;
        let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
        let shard_metrics = Arc::clone(&self.parts.shard_metrics);
        let runs = self.parts.finish().await;

        let mut run_parts = vec![];
        let mut run_splits = vec![];
        let mut run_meta = vec![];
        let total_updates = runs
            .iter()
            .map(|(_, _, num_updates)| num_updates)
            .sum::<usize>();
        for (order, parts, num_updates) in runs {
            if parts.is_empty() {
                continue;
            }
            if !run_parts.is_empty() {
                run_splits.push(run_parts.len());
            }
            run_meta.push(RunMeta {
                order: Some(order),
                schema: self.write_schemas.id,
                deprecated_schema: None,
                id: if write_run_ids {
                    Some(RunId::new())
                } else {
                    None
                },
                len: if write_run_ids {
                    Some(num_updates)
                } else {
                    None
                },
            });
            run_parts.extend(parts);
        }
        let desc = registered_desc;

        let batch = Batch::new(
            batch_delete_enabled,
            Arc::clone(&self.metrics),
            self.blob,
            shard_metrics,
            self.version,
            HollowBatch::new(desc, run_parts, total_updates, run_meta, run_splits),
        );

        Ok(batch)
    }

    pub async fn flush_part(&mut self, part_desc: Description<T>, columnar: Part) {
        let num_updates = columnar.len();
        if num_updates == 0 {
            return;
        }
        let diffs_sum = diffs_sum::<D>(&columnar.diff).expect("part is non-empty");

        let start = Instant::now();
        self.parts
            .write(&self.write_schemas, part_desc, columnar, diffs_sum)
            .await;
        self.metrics
            .compaction
            .batch
            .step_part_writing
            .inc_by(start.elapsed().as_secs_f64());
    }
}

#[derive(Debug, Clone)]
pub(crate) struct RunWithMeta<T> {
    pub parts: Vec<RunPart<T>>,
    pub num_updates: usize,
}

impl<T> RunWithMeta<T> {
    pub fn new(parts: Vec<RunPart<T>>, num_updates: usize) -> Self {
        Self { parts, num_updates }
    }

    pub fn single(part: RunPart<T>, num_updates: usize) -> Self {
        Self {
            parts: vec![part],
            num_updates,
        }
    }
}

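/// The state of runs as they're written into a batch: either a single
/// ordered run or a set of runs that are compacted together in the
/// background.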
#[derive(Debug)]
enum WritingRuns<T> {
    /// Building a single run with the given ordering; if the run grows past
    /// the length limit, a suffix of parts is spilled to blob storage as a
    /// [HollowRun].
    Ordered(RunOrder, MergeTree<Pending<RunWithMeta<T>>>),
    /// Building multiple runs; when enough runs accumulate, they are merged
    /// together by a background compaction into a single ordered run.
    Compacting(MergeTree<(RunOrder, Pending<RunWithMeta<T>>)>),
}

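/// Writes out parts for a batch as it's built up, tracking which runs those
/// parts belong to and applying backpressure on writers.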
#[derive(Debug)]
pub(crate) struct BatchParts<T> {
    cfg: BatchBuilderConfig,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    shard_id: ShardId,
    blob: Arc<dyn Blob>,
    isolated_runtime: Arc<IsolatedRuntime>,
    next_index: u64,
    writing_runs: WritingRuns<T>,
    batch_metrics: BatchWriteMetrics,
}

impl<T: Timestamp + Codec64> BatchParts<T> {
    pub(crate) fn new_compacting<K, V, D>(
        cfg: CompactConfig,
        desc: Description<T>,
        runs_per_compaction: usize,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
        schemas: Schemas<K, V>,
    ) -> Self
    where
        K: Codec + Debug,
        V: Codec + Debug,
        T: Lattice + Send + Sync,
        D: Semigroup + Ord + Codec64 + Send + Sync,
    {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let shard_metrics = Arc::clone(&shard_metrics);
            let isolated_runtime = Arc::clone(&isolated_runtime);
            let runs_per_compaction = runs_per_compaction.clamp(2, 1024);

            let merge_fn = move |parts: Vec<(RunOrder, Pending<RunWithMeta<T>>)>| {
                let blob = Arc::clone(&blob);
                let metrics = Arc::clone(&metrics);
                let shard_metrics = Arc::clone(&shard_metrics);
                let cfg = cfg.clone();
                let isolated_runtime = Arc::clone(&isolated_runtime);
                let write_schemas = schemas.clone();
                let compact_desc = desc.clone();
                let handle = mz_ore::task::spawn(
                    || "batch::compact_runs",
                    async move {
                        let runs: Vec<_> = stream::iter(parts)
                            .then(|(order, parts)| async move {
                                let completed_run = parts.into_result().await;
                                (
                                    RunMeta {
                                        order: Some(order),
                                        schema: schemas.id,
                                        deprecated_schema: None,
                                        id: if cfg.incremental_compaction {
                                            Some(RunId::new())
                                        } else {
                                            None
                                        },
                                        len: if cfg.incremental_compaction {
                                            Some(completed_run.num_updates)
                                        } else {
                                            None
                                        },
                                    },
                                    completed_run.parts,
                                )
                            })
                            .collect()
                            .await;

                        let run_refs: Vec<_> = runs
                            .iter()
                            .map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
                            .collect();

                        let output_batch = Compactor::<K, V, T, D>::compact_runs(
                            &cfg,
                            &shard_id,
                            &compact_desc,
                            run_refs,
                            blob,
                            metrics,
                            shard_metrics,
                            isolated_runtime,
                            write_schemas,
                        )
                        .await
                        .expect("successful compaction");

                        assert_eq!(
                            output_batch.run_meta.len(),
                            1,
                            "compaction is guaranteed to emit a single run"
                        );
                        let total_compacted_updates: usize = output_batch.len;

                        RunWithMeta::new(output_batch.parts, total_compacted_updates)
                    }
                    .instrument(debug_span!("batch::compact_runs")),
                );
                (RunOrder::Structured, Pending::new(handle))
            };
            WritingRuns::Compacting(MergeTree::new(runs_per_compaction, merge_fn))
        };
        BatchParts {
            cfg: cfg.batch,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

    pub(crate) fn new_ordered<D: Semigroup + Codec64>(
        cfg: BatchBuilderConfig,
        order: RunOrder,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
    ) -> Self {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let writer_key = cfg.writer_key.clone();
            // Unordered runs are never spilled, so they have no effective
            // length limit.
            let run_length_limit = (order == RunOrder::Unordered)
                .then_some(usize::MAX)
                .unwrap_or(cfg.run_length_limit);
            let merge_fn = move |parts: Vec<Pending<RunWithMeta<T>>>| {
                let blob = Arc::clone(&blob);
                let writer_key = writer_key.clone();
                let metrics = Arc::clone(&metrics);
                let handle = mz_ore::task::spawn(
                    || "batch::spill_run",
                    async move {
                        let completed_runs: Vec<RunWithMeta<T>> = stream::iter(parts)
                            .then(|p| p.into_result())
                            .collect()
                            .await;

                        let mut all_run_parts = Vec::new();
                        let mut total_updates = 0;

                        for completed_run in completed_runs {
                            all_run_parts.extend(completed_run.parts);
                            total_updates += completed_run.num_updates;
                        }

                        let run_ref = HollowRunRef::set::<D>(
                            shard_id,
                            blob.as_ref(),
                            &writer_key,
                            HollowRun {
                                parts: all_run_parts,
                            },
                            &*metrics,
                        )
                        .await;

                        RunWithMeta::single(RunPart::Many(run_ref), total_updates)
                    }
                    .instrument(debug_span!("batch::spill_run")),
                );
                Pending::new(handle)
            };
            WritingRuns::Ordered(order, MergeTree::new(run_length_limit, merge_fn))
        };
        BatchParts {
            cfg,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

    pub(crate) fn expected_order(&self) -> RunOrder {
        match self.writing_runs {
            WritingRuns::Ordered(order, _) => order,
            WritingRuns::Compacting(_) => RunOrder::Unordered,
        }
    }

    pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
        &mut self,
        write_schemas: &Schemas<K, V>,
        desc: Description<T>,
        updates: Part,
        diffs_sum: D,
    ) {
        let batch_metrics = self.batch_metrics.clone();
        let index = self.next_index;
        self.next_index += 1;
        let num_updates = updates.len();
        let ts_rewrite = None;
        let schema_id = write_schemas.id;

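        // Parts that fit under the inline threshold are stored directly in
        // the batch metadata rather than being written out to blob storage.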
        let inline_threshold = self.cfg.inline_writes_single_max_bytes;

        let updates = BlobTraceUpdates::from_part(updates);
        let (name, write_future) = if updates.goodbytes() < inline_threshold {
            let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
            (
                "batch::inline_part",
                async move {
                    let start = Instant::now();
                    let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
                        desc: Some(desc.into_proto()),
                        index: index.into_proto(),
                        updates: Some(updates.into_proto()),
                    });
                    batch_metrics
                        .step_inline
                        .inc_by(start.elapsed().as_secs_f64());

                    RunWithMeta::single(
                        RunPart::Single(BatchPart::Inline {
                            updates,
                            ts_rewrite,
                            schema_id,
                            deprecated_schema_id: None,
                        }),
                        num_updates,
                    )
                }
                .instrument(span)
                .boxed(),
            )
        } else {
            let part = BlobTraceBatchPart {
                desc,
                updates,
                index,
            };
            let cfg = self.cfg.clone();
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let shard_metrics = Arc::clone(&self.shard_metrics);
            let isolated_runtime = Arc::clone(&self.isolated_runtime);
            let expected_order = self.expected_order();
            let encoded_diffs_sum = D::encode(&diffs_sum);
            let write_schemas_clone = write_schemas.clone();
            let write_span =
                debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current();
            (
                "batch::write_part",
                async move {
                    let part = BatchParts::write_hollow_part(
                        cfg,
                        blob,
                        metrics,
                        shard_metrics,
                        batch_metrics,
                        isolated_runtime,
                        part,
                        expected_order,
                        ts_rewrite,
                        encoded_diffs_sum,
                        write_schemas_clone,
                    )
                    .await;
                    RunWithMeta::single(RunPart::Single(part), num_updates)
                }
                .instrument(write_span)
                .boxed(),
            )
        };

        match &mut self.writing_runs {
            WritingRuns::Ordered(_order, run) => {
                let part = Pending::new(mz_ore::task::spawn(|| name, write_future));
                run.push(part);

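                // Apply backpressure: skip the newest writes (up to the
                // configured number of outstanding parts) and wait for any
                // older writes that are still in flight.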
                for part in run
                    .iter_mut()
                    .rev()
                    .skip(self.cfg.batch_builder_max_outstanding_parts)
                    .take_while(|p| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
            WritingRuns::Compacting(batches) => {
                let run = Pending::Writing(mz_ore::task::spawn(|| name, write_future));
                batches.push((RunOrder::Unordered, run));

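                // Allow up to `max_outstanding_parts` uncompacted part writes
                // plus a single in-progress compaction to be pending before
                // blocking on older work.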
                let mut part_budget = self.cfg.batch_builder_max_outstanding_parts;
                let mut compaction_budget = 1;
                for (_, part) in batches
                    .iter_mut()
                    .rev()
                    .skip_while(|(order, _)| match order {
                        RunOrder::Unordered if part_budget > 0 => {
                            part_budget -= 1;
                            true
                        }
                        RunOrder::Structured | RunOrder::Codec if compaction_budget > 0 => {
                            compaction_budget -= 1;
                            true
                        }
                        _ => false,
                    })
                    .take_while(|(_, p)| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
        }
    }

    async fn write_hollow_part<K: Codec, V: Codec>(
        cfg: BatchBuilderConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        batch_metrics: BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        mut updates: BlobTraceBatchPart<T>,
        run_order: RunOrder,
        ts_rewrite: Option<Antichain<T>>,
        diffs_sum: [u8; 8],
        write_schemas: Schemas<K, V>,
    ) -> BatchPart<T> {
        let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new());
        let key = partial_key.complete(&shard_metrics.shard_id);
        let goodbytes = updates.updates.goodbytes();
        let metrics_ = Arc::clone(&metrics);
        let schema_id = write_schemas.id;

        let (stats, key_lower, structured_key_lower, (buf, encode_time)) = isolated_runtime
            .spawn_named(|| "batch::encode_part", async move {
                let stats = metrics_.columnar.arrow().measure_part_build(|| {
                    let stats = if cfg.stats_collection_enabled {
                        let ext = updates.updates.get_or_make_structured::<K, V>(
                            write_schemas.key.as_ref(),
                            write_schemas.val.as_ref(),
                        );

                        let key_stats = write_schemas
                            .key
                            .decoder_any(ext.key.as_ref())
                            .expect("decoding just-encoded data")
                            .stats();

                        let part_stats = PartStats { key: key_stats };

                        let trimmed_start = Instant::now();
                        let mut trimmed_bytes = 0;
                        let trimmed_stats = LazyPartStats::encode(&part_stats, |s| {
                            trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| {
                                cfg.stats_untrimmable_columns.should_retain(s)
                            })
                        });
                        let trimmed_duration = trimmed_start.elapsed();
                        Some((trimmed_stats, trimmed_duration, trimmed_bytes))
                    } else {
                        None
                    };

                    updates.updates = updates.updates.as_structured::<K, V>(
                        write_schemas.key.as_ref(),
                        write_schemas.val.as_ref(),
                    );

                    stats
                });

                let key_lower = if let Some(records) = updates.updates.records() {
                    let key_bytes = records.keys();
                    if key_bytes.is_empty() {
                        &[]
                    } else if run_order == RunOrder::Codec {
                        key_bytes.value(0)
                    } else {
                        ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
                    }
                } else {
                    &[]
                };
                let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
                    .expect("lower bound always exists");

                let structured_key_lower = if cfg.structured_key_lower_len > 0 {
                    updates.updates.structured().and_then(|ext| {
                        let min_key = if run_order == RunOrder::Structured {
                            0
                        } else {
                            let ord = ArrayOrd::new(ext.key.as_ref());
                            (0..ext.key.len())
                                .min_by_key(|i| ord.at(*i))
                                .expect("non-empty batch")
                        };
                        let lower = ArrayBound::new(Arc::clone(&ext.key), min_key)
                            .to_proto_lower(cfg.structured_key_lower_len);
                        if lower.is_none() {
                            batch_metrics.key_lower_too_big.inc()
                        }
                        lower.map(|proto| LazyProto::from(&proto))
                    })
                } else {
                    None
                };

                let encode_start = Instant::now();
                let mut buf = Vec::new();
                updates.encode(&mut buf, &metrics_.columnar, &cfg.encoding_config);

                // Drop the decoded representation eagerly now that it's been
                // encoded.
                drop(updates);
                (
                    stats,
                    key_lower,
                    structured_key_lower,
                    (Bytes::from(buf), encode_start.elapsed()),
                )
            })
            .instrument(debug_span!("batch::encode_part"))
            .await
            .expect("part encode task failed");
        metrics.codecs.batch.encode_count.inc();
        metrics
            .codecs
            .batch
            .encode_seconds
            .inc_by(encode_time.as_secs_f64());

        let start = Instant::now();
        let payload_len = buf.len();
        let () = retry_external(&metrics.retries.external.batch_set, || async {
            shard_metrics.blob_sets.inc();
            blob.set(&key, Bytes::clone(&buf)).await
        })
        .instrument(trace_span!("batch::set", payload_len))
        .await;
        batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
        batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
        batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
        match run_order {
            RunOrder::Unordered => batch_metrics.unordered.inc(),
            RunOrder::Codec => batch_metrics.codec_order.inc(),
            RunOrder::Structured => batch_metrics.structured_order.inc(),
        }
        let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
            batch_metrics
                .step_stats
                .inc_by(stats_step_timing.as_secs_f64());
            if trimmed_bytes > 0 {
                metrics.pushdown.parts_stats_trimmed_count.inc();
                metrics
                    .pushdown
                    .parts_stats_trimmed_bytes
                    .inc_by(u64::cast_from(trimmed_bytes));
            }
            stats
        });

        BatchPart::Hollow(HollowBatchPart {
            key: partial_key,
            encoded_size_bytes: payload_len,
            key_lower,
            structured_key_lower,
            stats,
            ts_rewrite,
            diffs_sum: Some(diffs_sum),
            format: Some(BatchColumnarFormat::Structured),
            schema_id,
            deprecated_schema_id: None,
        })
    }

    #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
    pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec<RunPart<T>>, usize)> {
        match self.writing_runs {
            WritingRuns::Ordered(RunOrder::Unordered, run) => {
                let completed_runs = run.finish();
                let mut output = Vec::with_capacity(completed_runs.len());
                for completed_run in completed_runs {
                    let completed_run = completed_run.into_result().await;
                    // Each part of an unordered run is its own logical run.
                    for part in completed_run.parts {
                        output.push((RunOrder::Unordered, vec![part], completed_run.num_updates));
                    }
                }
                output
            }
            WritingRuns::Ordered(order, run) => {
                let completed_runs = run.finish();
                let mut all_parts = Vec::new();
                let mut all_update_counts = 0;
                for completed_run in completed_runs {
                    let completed_run = completed_run.into_result().await;
                    all_parts.extend(completed_run.parts);
                    all_update_counts += completed_run.num_updates;
                }
                vec![(order, all_parts, all_update_counts)]
            }
            WritingRuns::Compacting(batches) => {
                let runs = batches.finish();
                let mut output = Vec::new();
                for (order, run) in runs {
                    let completed_run = run.into_result().await;
                    output.push((order, completed_run.parts, completed_run.num_updates));
                }
                output
            }
        }
    }
}

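/// Validates that the given batch can be appended over the `truncate`
/// description, applying stricter checks when any part has had its
/// timestamps rewritten.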
pub(crate) fn validate_truncate_batch<T: Timestamp>(
    batch: &HollowBatch<T>,
    truncate: &Description<T>,
    any_batch_rewrite: bool,
    enforce_matching_batch_boundaries: bool,
) -> Result<(), InvalidUsage<T>> {
    // If rewrite_ts was used, the truncation bounds must cover any region
    // the rewrite may have moved data into.
    if any_batch_rewrite {
        if truncate.upper() != batch.desc.upper() {
            return Err(InvalidUsage::InvalidRewrite(format!(
                "rewritten batch might have data past {:?} up to {:?}",
                truncate.upper().elements(),
                batch.desc.upper().elements(),
            )));
        }
        for part in batch.parts.iter() {
            let part_lower_bound = part.ts_rewrite().unwrap_or_else(|| batch.desc.lower());
            if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
                return Err(InvalidUsage::InvalidRewrite(format!(
                    "rewritten batch might have data below {:?} at {:?}",
                    truncate.lower().elements(),
                    part_lower_bound.elements(),
                )));
            }
        }
    }

    if !enforce_matching_batch_boundaries {
        return Ok(());
    }

    let batch = &batch.desc;
    if !PartialOrder::less_equal(batch.lower(), truncate.lower())
        || PartialOrder::less_than(batch.upper(), truncate.upper())
    {
        return Err(InvalidUsage::InvalidBatchBounds {
            batch_lower: batch.lower().clone(),
            batch_upper: batch.upper().clone(),
            append_lower: truncate.lower().clone(),
            append_upper: truncate.upper().clone(),
        });
    }

    Ok(())
}

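/// The set of parts in a batch that need to be deleted from blob storage,
/// along with any hollow runs that must be fetched and expanded to find the
/// parts they reference.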
#[derive(Debug)]
pub(crate) struct PartDeletes<T> {
    blob_keys: BTreeSet<PartialBatchKey>,
    hollow_runs: BTreeMap<PartialBatchKey, HollowRunRef<T>>,
}

impl<T> Default for PartDeletes<T> {
    fn default() -> Self {
        Self {
            blob_keys: Default::default(),
            hollow_runs: Default::default(),
        }
    }
}

impl<T: Timestamp> PartDeletes<T> {
    /// Adds the part to the set to be deleted and returns true if it was
    /// newly inserted.
    pub fn add(&mut self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.insert(r.key.clone(), r.clone()).is_none(),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.insert(x.key.clone()),
            RunPart::Single(BatchPart::Inline { .. }) => {
                // Nothing to delete.
                true
            }
        }
    }

    pub fn contains(&self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.contains_key(&r.key),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.contains(&x.key),
            RunPart::Single(BatchPart::Inline { .. }) => false,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn len(&self) -> usize {
        match self {
            Self {
                blob_keys,
                hollow_runs,
            } => blob_keys.len() + hollow_runs.len(),
        }
    }

    pub async fn delete(
        mut self,
        blob: &dyn Blob,
        shard_id: ShardId,
        concurrency: usize,
        metrics: &Metrics,
        delete_metrics: &RetryMetrics,
    ) where
        T: Codec64,
    {
        loop {
            let () = stream::iter(mem::take(&mut self.blob_keys))
                .map(|key| {
                    let key = key.complete(&shard_id);
                    async move {
                        retry_external(delete_metrics, || blob.delete(&key)).await;
                    }
                })
                .buffer_unordered(concurrency)
                .collect()
                .await;

            let Some((run_key, run_ref)) = self.hollow_runs.pop_first() else {
                break;
            };

            if let Some(run) = run_ref.get(shard_id, blob, metrics).await {
                // Queue up the parts in this run, along with the run itself,
                // for deletion on the next pass through the loop.
                for part in &run.parts {
                    self.add(part);
                }
                self.blob_keys.insert(run_key);
            };
        }
    }
}

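/// Returns the total sum of diff entries in the given array, or `None` if
/// the array is empty.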
fn diffs_sum<D: Semigroup + Codec64>(updates: &Int64Array) -> Option<D> {
    let mut sum = None;
    for d in updates.values().iter() {
        let d = D::decode(d.to_le_bytes());
        match &mut sum {
            None => sum = Some(d),
            Some(x) => x.plus_equals(&d),
        }
    }

    sum
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;

    use super::*;
    use crate::PersistLocation;
    use crate::cache::PersistClientCache;
    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
    use crate::internal::paths::{BlobKey, PartialBlobKey};
    use crate::tests::{all_ok, new_test_client};

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_flushing() {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let cache = PersistClientCache::new_no_metrics();

        // Set blob_target_size to 0 so that each row gets forced into its own
        // batch part, and allow a few runs to accumulate before compaction
        // kicks in.
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        cache.cfg.set_config(&MAX_RUNS, 3);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);

        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let (mut write, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut builder = write.builder(Antichain::from_elem(0));

        fn assert_writing(
            builder: &BatchBuilder<String, String, u64, i64>,
            expected_finished: &[bool],
        ) {
            let WritingRuns::Compacting(run) = &builder.builder.parts.writing_runs else {
                unreachable!("ordered run!")
            };

            let actual: Vec<_> = run.iter().map(|(_, p)| p.is_finished()).collect();
            assert_eq!(*expected_finished, actual);
        }

        assert_writing(&builder, &[]);

        let ((k, v), t, d) = &data[0];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false]);

        let ((k, v), t, d) = &data[1];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        let ((k, v), t, d) = &data[2];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[true, false, false]);

        let ((k, v), t, d) = &data[3];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        let batch = builder
            .finish(Antichain::from_elem(5))
            .await
            .expect("invalid usage");
        assert_eq!(batch.batch.runs().count(), 2);
        assert_eq!(batch.batch.part_count(), 4);
        write
            .append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
        assert_eq!(read.expect_snapshot_and_fetch(4).await, all_ok(&data, 4));
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_keys() {
        let cache = PersistClientCache::new_no_metrics();
        // Set blob_target_size to 0 so that each row gets forced into its own
        // batch part.
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        // Disable structured key lowers and inline writes so that every part
        // is a hollow part with a blob key.
        cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 3);
        for part in &batch.batch.parts {
            let part = part.expect_hollow_part();
            match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
                Ok((shard, PartialBlobKey::Batch(writer, _))) => {
                    assert_eq!(shard.to_string(), shard_id.to_string());
                    assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
                }
                _ => panic!("unparseable blob key"),
            }
        }
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_delete() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        cache.cfg.set_config(&BATCH_DELETE_ENABLED, true);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let part_key = batch.batch.parts[0]
            .expect_hollow_part()
            .key
            .complete(&shard_id);

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_some());

        batch.delete().await;

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_none());
    }

    #[mz_ore::test]
    fn untrimmable_columns() {
        let untrimmable = UntrimmableColumns {
            equals: vec!["abc".into(), "def".into()],
            prefixes: vec!["123".into(), "234".into()],
            suffixes: vec!["xyz".into()],
        };

        // equals
        assert!(untrimmable.should_retain("abc"));
        assert!(untrimmable.should_retain("ABC"));
        assert!(untrimmable.should_retain("aBc"));
        assert!(!untrimmable.should_retain("abcd"));
        assert!(untrimmable.should_retain("deF"));
        assert!(!untrimmable.should_retain("defg"));

        // prefixes
        assert!(untrimmable.should_retain("123"));
        assert!(untrimmable.should_retain("123-4"));
        assert!(untrimmable.should_retain("1234"));
        assert!(untrimmable.should_retain("234"));
        assert!(!untrimmable.should_retain("345"));

        // suffixes
        assert!(untrimmable.should_retain("ijk_xyZ"));
        assert!(untrimmable.should_retain("ww-XYZ"));
        assert!(!untrimmable.should_retain("xya"));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        let mut batch = write.builder(Antichain::from_elem(0));
        batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
        let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();

        // Roundtrip through a transmittable batch.
        let batch = batch.into_transmittable_batch();
        let mut batch = write.batch_from_transmittable_batch(batch);
        batch
            .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
            .unwrap();
        write
            .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
            .await;

        let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
        let expected = vec![((Ok("foo".to_owned()), Ok(())), 2, 1)];
        assert_eq!(actual, expected);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn structured_lowers() {
        let cache = PersistClientCache::new_no_metrics();
        // Give a generous budget for any structured key lower.
        cache.cfg().set_config(&STRUCTURED_KEY_LOWER_LEN, 1024);
        // Disable inline writes so the part is hollow.
        cache.cfg().set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg().set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let [part] = batch.batch.parts.as_slice() else {
            panic!("expected single part")
        };
        // We should have calculated a structured key lower for the part.
        assert!(part.structured_key_lower().is_some());
    }
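
    /// A minimal sketch of how [diffs_sum] accumulates encoded diffs,
    /// assuming `i64`'s [Codec64] impl round-trips little-endian bytes.
    #[mz_ore::test]
    fn diffs_sum_example() {
        let diffs = Int64Array::from(vec![1i64, -2, 4]);
        assert_eq!(diffs_sum::<i64>(&diffs), Some(3));

        // An empty array has no sum at all, rather than a sum of zero.
        let empty = Int64Array::from(Vec::<i64>::new());
        assert_eq!(diffs_sum::<i64>(&empty), None);
    }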
}