use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::Instant;

use arrow::array::{Array, Int64Array};
use bytes::Bytes;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::stream::StreamExt;
use futures_util::{FutureExt, stream};
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::Blob;
use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
use mz_persist_types::part::{Part, PartBuilder};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::stats::{
    PartStats, TRUNCATE_LEN, TruncateBound, trim_to_budget, truncate_bytes,
};
use mz_persist_types::{Codec, Codec64};
use mz_proto::RustType;
use mz_timely_util::order::Reverse;
use proptest_derive::Arbitrary;
use semver::Version;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug_span, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
use crate::error::InvalidUsage;
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{
    LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, Schemas,
};
use crate::internal::machine::retry_external;
use crate::internal::merge::{MergeTree, Pending};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
use crate::internal::state::{
    BatchPart, ENABLE_INCREMENTAL_COMPACTION, HollowBatch, HollowBatchPart, HollowRun,
    HollowRunRef, ProtoInlineBatchPart, RunId, RunMeta, RunOrder, RunPart,
};
use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
use crate::{PersistConfig, ShardId};

include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));

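/// A handle to a batch of updates that has been written to blob storage but
/// which has not yet been appended to a shard.
///
/// If this batch is never appended, it must be cleaned up via
/// [Batch::delete]; dropping it without consuming it leaks the blobs it
/// points at.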
#[derive(Debug)]
pub struct Batch<K, V, T, D> {
    pub(crate) batch_delete_enabled: bool,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,

    pub(crate) version: Version,

    pub(crate) batch: HollowBatch<T>,

    pub(crate) blob: Arc<dyn Blob>,

    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}

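// Batches hold onto external blob storage resources: dropping one that still
// has parts means those blobs can no longer be cleaned up, so warn loudly.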
impl<K, V, T, D> Drop for Batch<K, V, T, D> {
    fn drop(&mut self) {
        if self.batch.part_count() > 0 {
            warn!(
                "un-consumed Batch, with {} parts and dangling blob keys: {:?}",
                self.batch.part_count(),
                self.batch
                    .parts
                    .iter()
                    .map(|x| x.printable_name())
                    .collect::<Vec<_>>(),
            );
        }
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        batch_delete_enabled: bool,
        metrics: Arc<Metrics>,
        blob: Arc<dyn Blob>,
        shard_metrics: Arc<ShardMetrics>,
        version: Version,
        batch: HollowBatch<T>,
    ) -> Self {
        Self {
            batch_delete_enabled,
            metrics,
            shard_metrics,
            version,
            batch,
            blob,
            _phantom: PhantomData,
        }
    }

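    /// The `ShardId` of the shard this batch was written against.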
    pub fn shard_id(&self) -> ShardId {
        self.shard_metrics.shard_id
    }

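    /// The exclusive upper bound of times in this batch.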
    pub fn upper(&self) -> &Antichain<T> {
        self.batch.desc.upper()
    }

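    /// The inclusive lower bound of times in this batch.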
    pub fn lower(&self) -> &Antichain<T> {
        self.batch.desc.lower()
    }

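    /// Marks the blobs that this batch handle points to as consumed: the
    /// handle no longer owns them, so dropping it will not warn and
    /// [Self::delete] will have nothing to do.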
    pub(crate) fn mark_consumed(&mut self) {
        self.batch.parts.clear();
    }

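    /// Deletes the blobs that make up this batch from blob storage. If batch
    /// deletion is disabled by config, the parts are instead just marked
    /// consumed.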
    #[instrument(level = "debug", fields(shard = %self.shard_id()))]
    pub async fn delete(mut self) {
        if !self.batch_delete_enabled {
            self.mark_consumed();
            return;
        }
        let mut deletes = PartDeletes::default();
        for part in self.batch.parts.drain(..) {
            deletes.add(&part);
        }
        let () = deletes
            .delete(
                &*self.blob,
                self.shard_id(),
                usize::MAX,
                &*self.metrics,
                &*self.metrics.retries.external.batch_delete,
            )
            .await;
    }

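    /// An iterator over the schema ids referenced by this batch's parts.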
    pub fn schemas(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.batch.parts.iter().flat_map(|b| b.schema_id())
    }

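    /// Turns this [Batch] into a [HollowBatch], marking this handle as
    /// consumed; the caller takes over responsibility for the batch's blobs.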
    pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
        let ret = self.batch.clone();
        self.mark_consumed();
        ret
    }

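    /// Turns this [Batch] into a [ProtoBatch], which can be transmitted to
    /// another process and turned back into a [Batch] there. Marks this
    /// handle as consumed, so the receiving process becomes responsible for
    /// the blobs.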
    pub fn into_transmittable_batch(mut self) -> ProtoBatch {
        let ret = ProtoBatch {
            shard_id: self.shard_metrics.shard_id.into_proto(),
            version: self.version.to_string(),
            batch: Some(self.batch.into_proto()),
        };
        self.mark_consumed();
        ret
    }

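    /// Rewrites any inline parts in this batch out to blob storage as hollow
    /// parts, leaving parts that are already hollow (or hollow runs) as-is.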
    pub(crate) async fn flush_to_blob(
        &mut self,
        cfg: &BatchBuilderConfig,
        batch_metrics: &BatchWriteMetrics,
        isolated_runtime: &Arc<IsolatedRuntime>,
        write_schemas: &Schemas<K, V>,
    ) {
        let mut parts = Vec::new();
        for (run_meta, run_parts) in self.batch.runs() {
            for part in run_parts {
                let (updates, ts_rewrite, schema_id) = match part {
                    RunPart::Single(BatchPart::Inline {
                        updates,
                        ts_rewrite,
                        schema_id,
                        deprecated_schema_id: _,
                    }) => (updates, ts_rewrite, schema_id),
                    other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
                        parts.push(other.clone());
                        continue;
                    }
                };
                let updates = updates
                    .decode::<T>(&self.metrics.columnar)
                    .expect("valid inline part");
                let diffs_sum = diffs_sum::<D>(updates.updates.diffs());
                let mut write_schemas = write_schemas.clone();
                write_schemas.id = *schema_id;

                let write_span =
                    debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id)
                        .or_current();
                let handle = mz_ore::task::spawn(
                    || "batch::flush_to_blob",
                    BatchParts::write_hollow_part(
                        cfg.clone(),
                        Arc::clone(&self.blob),
                        Arc::clone(&self.metrics),
                        Arc::clone(&self.shard_metrics),
                        batch_metrics.clone(),
                        Arc::clone(isolated_runtime),
                        updates,
                        run_meta.order.unwrap_or(RunOrder::Unordered),
                        ts_rewrite.clone(),
                        D::encode(&diffs_sum),
                        write_schemas,
                    )
                    .instrument(write_span),
                );
                let part = handle.await.expect("part write task failed");
                parts.push(RunPart::Single(part));
            }
        }
        self.batch.parts = parts;
    }

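    /// The sum of the encoded sizes of all parts in the batch.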
    pub fn encoded_size_bytes(&self) -> usize {
        self.batch.encoded_size_bytes()
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + TotalOrder,
    D: Monoid + Codec64,
{
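    /// Forwards the timestamps of every update in this batch to `frontier`
    /// and replaces the batch's registered `upper` with `new_upper`.
    ///
    /// This allows a batch that was written against one set of times to be
    /// appended at a later time without rewriting the (potentially large)
    /// underlying data: the rewrite is recorded in the batch's metadata and
    /// applied when the parts are read. A sketch of the intended usage,
    /// mirroring the `rewrite_ts_example` test below:
    ///
    /// ```ignore
    /// // Write an update at time 0 with upper 1, then forward it to time 2
    /// // with upper 3 before appending.
    /// let mut builder = write.builder(Antichain::from_elem(0));
    /// builder.add(&"foo".to_owned(), &(), &0, &1).await?;
    /// let batch = builder.finish(Antichain::from_elem(1)).await?;
    /// let mut batch = write.batch_from_transmittable_batch(batch.into_transmittable_batch());
    /// batch.rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))?;
    /// ```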
    pub fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), InvalidUsage<T>> {
        self.batch
            .rewrite_ts(frontier, new_upper)
            .map_err(InvalidUsage::InvalidRewrite)
    }
}

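/// Indicates what work was done in a call to [BatchBuilder::add]: either the
/// update was simply recorded in memory, or recording it also caused one or
/// more filled parts to be flushed out.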
#[derive(Debug)]
pub enum Added {
    Record,
    RecordAndParts,
}

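/// A snapshot of the dynamic configuration that affects batch building,
/// captured once when a builder is created so that a single batch is built
/// with consistent settings.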
#[derive(Debug, Clone)]
pub struct BatchBuilderConfig {
    writer_key: WriterKey,
    pub(crate) blob_target_size: usize,
    pub(crate) batch_delete_enabled: bool,
    pub(crate) batch_builder_max_outstanding_parts: usize,
    pub(crate) inline_writes_single_max_bytes: usize,
    pub(crate) stats_collection_enabled: bool,
    pub(crate) stats_budget: usize,
    pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
    pub(crate) encoding_config: EncodingConfig,
    pub(crate) preferred_order: RunOrder,
    pub(crate) structured_key_lower_len: usize,
    pub(crate) run_length_limit: usize,
    pub(crate) enable_incremental_compaction: bool,
    pub(crate) max_runs: Option<usize>,
}

pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
    "persist_batch_delete_enabled",
    true,
    "Whether to actually delete blobs when batch delete is called (Materialize).",
);

pub(crate) const ENCODING_ENABLE_DICTIONARY: Config<bool> = Config::new(
    "persist_encoding_enable_dictionary",
    true,
    "A feature flag to enable dictionary encoding for Parquet data (Materialize).",
);

pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new(
    "persist_encoding_compression_format",
    "none",
    "A feature flag to enable compression of Parquet data (Materialize).",
);

pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config<usize> = Config::new(
    "persist_batch_structured_key_lower_len",
    256,
    "The maximum size in proto bytes of any structured key-lower metadata to preserve. \
    (If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
);

pub(crate) const MAX_RUN_LEN: Config<usize> = Config::new(
    "persist_batch_max_run_len",
    usize::MAX,
    "The maximum length a run can have before it will be spilled as a hollow run \
    into the blob store.",
);

pub(crate) const MAX_RUNS: Config<usize> = Config::new(
    "persist_batch_max_runs",
    1,
    "The maximum number of runs a batch builder should generate for user batches. \
    (Compaction outputs always generate a single run.) \
    The minimum value is 2; below this, compaction is disabled.",
);

pub(crate) const BLOB_TARGET_SIZE: Config<usize> = Config::new(
    "persist_blob_target_size",
    128 * MiB,
    "A target maximum size of persist blob payloads in bytes (Materialize).",
);

pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_single_max_bytes",
    4096,
    "The (exclusive) maximum size of a write that persist will inline in metadata.",
);

pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_total_max_bytes",
    1 * MiB,
    "\
    The (exclusive) maximum total size of inline writes in metadata before \
    persist will backpressure them by flushing out to s3.",
);

impl BatchBuilderConfig {
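    /// Initializes a batch builder config based on a snapshot of the
    /// [PersistConfig].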
    pub fn new(value: &PersistConfig, _shard_id: ShardId) -> Self {
        let writer_key = WriterKey::for_version(&value.build_version);

        let preferred_order = RunOrder::Structured;

        BatchBuilderConfig {
            writer_key,
            blob_target_size: BLOB_TARGET_SIZE.get(value).clamp(1, usize::MAX),
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
            batch_builder_max_outstanding_parts: BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(value),
            inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
            stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
            stats_budget: STATS_BUDGET_BYTES.get(value),
            stats_untrimmable_columns: Arc::new(untrimmable_columns(value)),
            encoding_config: EncodingConfig {
                use_dictionary: ENCODING_ENABLE_DICTIONARY.get(value),
                compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
            },
            preferred_order,
            structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
            run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
            max_runs: match MAX_RUNS.get(value) {
                limit @ 2.. => Some(limit),
                _ => None,
            },
            enable_incremental_compaction: ENABLE_INCREMENTAL_COMPACTION.get(value),
        }
    }
}

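/// Columns whose statistics should never be trimmed from part stats, matched
/// case-insensitively by exact name, prefix, or suffix.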
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
pub(crate) struct UntrimmableColumns {
    pub equals: Vec<Cow<'static, str>>,
    pub prefixes: Vec<Cow<'static, str>>,
    pub suffixes: Vec<Cow<'static, str>>,
}

impl UntrimmableColumns {
    pub(crate) fn should_retain(&self, name: &str) -> bool {
        let name_lower = name.to_lowercase();
        for s in &self.equals {
            if *s == name_lower {
                return true;
            }
        }
        for s in &self.prefixes {
            if name_lower.starts_with(s.as_ref()) {
                return true;
            }
        }
        for s in &self.suffixes {
            if name_lower.ends_with(s.as_ref()) {
                return true;
            }
        }
        false
    }
}

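/// A builder for [Batch]es that allows adding updates piece by piece and
/// then finishing it.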
#[derive(Debug)]
pub struct BatchBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    inline_desc: Description<T>,
    inclusive_upper: Antichain<Reverse<T>>,

    records_builder: PartBuilder<K, K::Schema, V, V::Schema>,
    pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
}

impl<K, V, T, D> BatchBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        builder: BatchBuilderInternal<K, V, T, D>,
        inline_desc: Description<T>,
    ) -> Self {
        let records_builder = PartBuilder::new(
            builder.write_schemas.key.as_ref(),
            builder.write_schemas.val.as_ref(),
        );
        Self {
            inline_desc,
            inclusive_upper: Antichain::new(),
            records_builder,
            builder,
        }
    }

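    /// Finishes this batch with the given `registered_upper`, flushing any
    /// buffered updates and returning a [Batch] handle.
    ///
    /// Returns an error if `registered_upper` is not beyond the batch's
    /// lower, or if some recorded update is not strictly before the upper
    /// (the latter is only checkable while the `since` has not advanced past
    /// the `lower`).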
    pub async fn finish(
        mut self,
        registered_upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        if PartialOrder::less_than(&registered_upper, self.inline_desc.lower()) {
            return Err(InvalidUsage::InvalidBounds {
                lower: self.inline_desc.lower().clone(),
                upper: registered_upper,
            });
        }

        if PartialOrder::less_equal(self.inline_desc.since(), self.inline_desc.lower()) {
            for ts in self.inclusive_upper.iter() {
                if registered_upper.less_equal(&ts.0) {
                    return Err(InvalidUsage::UpdateBeyondUpper {
                        ts: ts.0.clone(),
                        expected_upper: registered_upper.clone(),
                    });
                }
            }
        }

        let updates = self.records_builder.finish();
        self.builder
            .flush_part(self.inline_desc.clone(), updates)
            .await;

        self.builder
            .finish(Description::new(
                self.inline_desc.lower().clone(),
                registered_upper,
                self.inline_desc.since().clone(),
            ))
            .await
    }

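    /// Adds the given update to the batch, flushing a filled part out to
    /// blob storage in the background when enough data has accumulated.
    ///
    /// The update's timestamp must be beyond the batch's lower.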
    pub async fn add(
        &mut self,
        key: &K,
        val: &V,
        ts: &T,
        diff: &D,
    ) -> Result<Added, InvalidUsage<T>> {
        if !self.inline_desc.lower().less_equal(ts) {
            return Err(InvalidUsage::UpdateNotBeyondLower {
                ts: ts.clone(),
                lower: self.inline_desc.lower().clone(),
            });
        }
        self.inclusive_upper.insert(Reverse(ts.clone()));

        let added = {
            self.records_builder
                .push(key, val, ts.clone(), diff.clone());
            if self.records_builder.goodbytes() >= self.builder.parts.cfg.blob_target_size {
                let part = self.records_builder.finish_and_replace(
                    self.builder.write_schemas.key.as_ref(),
                    self.builder.write_schemas.val.as_ref(),
                );
                Some(part)
            } else {
                None
            }
        };

        let added = if let Some(full_batch) = added {
            self.builder
                .flush_part(self.inline_desc.clone(), full_batch)
                .await;
            Added::RecordAndParts
        } else {
            Added::Record
        };
        Ok(added)
    }
}

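/// The internal state behind a [BatchBuilder]: the write schemas, the blob
/// handle, and the in-progress runs of parts.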
#[derive(Debug)]
pub(crate) struct BatchBuilderInternal<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    shard_id: ShardId,
    version: Version,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,

    write_schemas: Schemas<K, V>,
    parts: BatchParts<T>,

    _phantom: PhantomData<fn(K, V, T, D)>,
}

impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        _cfg: BatchBuilderConfig,
        parts: BatchParts<T>,
        metrics: Arc<Metrics>,
        write_schemas: Schemas<K, V>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        version: Version,
    ) -> Self {
        Self {
            blob,
            metrics,
            write_schemas,
            parts,
            shard_id,
            version,
            _phantom: PhantomData,
        }
    }

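    /// Finishes this batch: waits for all outstanding part writes, assembles
    /// the completed runs into a [HollowBatch] described by
    /// `registered_desc`, and wraps it in a [Batch] handle.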
    #[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
    pub async fn finish(
        self,
        registered_desc: Description<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        let write_run_ids = self.parts.cfg.enable_incremental_compaction;
        let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
        let shard_metrics = Arc::clone(&self.parts.shard_metrics);
        let runs = self.parts.finish().await;

        let mut run_parts = vec![];
        let mut run_splits = vec![];
        let mut run_meta = vec![];
        let total_updates = runs
            .iter()
            .map(|(_, _, num_updates)| num_updates)
            .sum::<usize>();
        for (order, parts, num_updates) in runs {
            if parts.is_empty() {
                continue;
            }
            if !run_parts.is_empty() {
                run_splits.push(run_parts.len());
            }
            run_meta.push(RunMeta {
                order: Some(order),
                schema: self.write_schemas.id,
                deprecated_schema: None,
                id: if write_run_ids {
                    Some(RunId::new())
                } else {
                    None
                },
                len: if write_run_ids {
                    Some(num_updates)
                } else {
                    None
                },
                meta: MetadataMap::default(),
            });
            run_parts.extend(parts);
        }
        let desc = registered_desc;

        let batch = Batch::new(
            batch_delete_enabled,
            Arc::clone(&self.metrics),
            self.blob,
            shard_metrics,
            self.version,
            HollowBatch::new(desc, run_parts, total_updates, run_meta, run_splits),
        );

        Ok(batch)
    }

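    /// Hands the given columnar part off to be written; empty parts are
    /// skipped. The write itself proceeds in the background, subject to
    /// backpressure.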
    pub async fn flush_part(&mut self, part_desc: Description<T>, columnar: Part) {
        let num_updates = columnar.len();
        if num_updates == 0 {
            return;
        }
        let diffs_sum = diffs_sum::<D>(&columnar.diff);

        let start = Instant::now();
        self.parts
            .write(&self.write_schemas, part_desc, columnar, diffs_sum)
            .await;
        self.metrics
            .compaction
            .batch
            .step_part_writing
            .inc_by(start.elapsed().as_secs_f64());
    }
}

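/// A completed run of parts along with the number of updates it contains.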
#[derive(Debug, Clone)]
pub(crate) struct RunWithMeta<T> {
    pub parts: Vec<RunPart<T>>,
    pub num_updates: usize,
}

impl<T> RunWithMeta<T> {
    pub fn new(parts: Vec<RunPart<T>>, num_updates: usize) -> Self {
        Self { parts, num_updates }
    }

    pub fn single(part: RunPart<T>, num_updates: usize) -> Self {
        Self {
            parts: vec![part],
            num_updates,
        }
    }
}

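/// The in-progress runs of a batch that is being written.
///
/// In `Ordered` mode, parts are added in run order, and overly long runs are
/// spilled to blob storage as [HollowRun]s. In `Compacting` mode, individual
/// parts are treated as single-part runs and are merged together via
/// compaction as enough of them accumulate.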
#[derive(Debug)]
enum WritingRuns<T> {
    Ordered(RunOrder, MergeTree<Pending<RunWithMeta<T>>>),
    Compacting(MergeTree<(RunOrder, Pending<RunWithMeta<T>>)>),
}

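/// Writes out a batch's parts as they are added, tracking them as
/// in-progress runs and applying backpressure when too many writes are
/// outstanding at once.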
#[derive(Debug)]
pub(crate) struct BatchParts<T> {
    cfg: BatchBuilderConfig,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    shard_id: ShardId,
    blob: Arc<dyn Blob>,
    isolated_runtime: Arc<IsolatedRuntime>,
    next_index: u64,
    writing_runs: WritingRuns<T>,
    batch_metrics: BatchWriteMetrics,
}

impl<T: Timestamp + Codec64> BatchParts<T> {
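    /// Creates a [BatchParts] that merges the written runs together via
    /// [Compactor::compact_runs] once `runs_per_compaction` of them (clamped
    /// to a reasonable range) have accumulated.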
    pub(crate) fn new_compacting<K, V, D>(
        cfg: CompactConfig,
        desc: Description<T>,
        runs_per_compaction: usize,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
        schemas: Schemas<K, V>,
    ) -> Self
    where
        K: Codec + Debug,
        V: Codec + Debug,
        T: Lattice + Send + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let shard_metrics = Arc::clone(&shard_metrics);
            let isolated_runtime = Arc::clone(&isolated_runtime);
            let runs_per_compaction = runs_per_compaction.clamp(2, 1024);

            let merge_fn = move |parts: Vec<(RunOrder, Pending<RunWithMeta<T>>)>| {
                let blob = Arc::clone(&blob);
                let metrics = Arc::clone(&metrics);
                let shard_metrics = Arc::clone(&shard_metrics);
                let cfg = cfg.clone();
                let isolated_runtime = Arc::clone(&isolated_runtime);
                let write_schemas = schemas.clone();
                let compact_desc = desc.clone();
                let handle = mz_ore::task::spawn(
                    || "batch::compact_runs",
                    async move {
                        let runs: Vec<_> = stream::iter(parts)
                            .then(|(order, parts)| async move {
                                let completed_run = parts.into_result().await;
                                (
                                    RunMeta {
                                        order: Some(order),
                                        schema: schemas.id,
                                        deprecated_schema: None,
                                        id: if cfg.batch.enable_incremental_compaction {
                                            Some(RunId::new())
                                        } else {
                                            None
                                        },
                                        len: if cfg.batch.enable_incremental_compaction {
                                            Some(completed_run.num_updates)
                                        } else {
                                            None
                                        },
                                        meta: MetadataMap::default(),
                                    },
                                    completed_run.parts,
                                )
                            })
                            .collect()
                            .await;

                        let run_refs: Vec<_> = runs
                            .iter()
                            .map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
                            .collect();

                        let output_batch = Compactor::<K, V, T, D>::compact_runs(
                            &cfg,
                            &shard_id,
                            &compact_desc,
                            run_refs,
                            blob,
                            metrics,
                            shard_metrics,
                            isolated_runtime,
                            write_schemas,
                        )
                        .await
                        .expect("successful compaction");

                        assert_eq!(
                            output_batch.run_meta.len(),
                            1,
                            "compaction is guaranteed to emit a single run"
                        );
                        let total_compacted_updates: usize = output_batch.len;

                        RunWithMeta::new(output_batch.parts, total_compacted_updates)
                    }
                    .instrument(debug_span!("batch::compact_runs")),
                );
                (RunOrder::Structured, Pending::new(handle))
            };
            WritingRuns::Compacting(MergeTree::new(runs_per_compaction, merge_fn))
        };
        BatchParts {
            cfg: cfg.batch,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

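    /// Creates a [BatchParts] that writes parts in run order, spilling any
    /// run that grows past the configured length limit to blob storage as a
    /// [HollowRun].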
    pub(crate) fn new_ordered<D: Monoid + Codec64>(
        cfg: BatchBuilderConfig,
        order: RunOrder,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
    ) -> Self {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let writer_key = cfg.writer_key.clone();
            let run_length_limit = (order == RunOrder::Unordered)
                .then_some(usize::MAX)
                .unwrap_or(cfg.run_length_limit);
            let merge_fn = move |parts: Vec<Pending<RunWithMeta<T>>>| {
                let blob = Arc::clone(&blob);
                let writer_key = writer_key.clone();
                let metrics = Arc::clone(&metrics);
                let handle = mz_ore::task::spawn(
                    || "batch::spill_run",
                    async move {
                        let completed_runs: Vec<RunWithMeta<T>> = stream::iter(parts)
                            .then(|p| p.into_result())
                            .collect()
                            .await;

                        let mut all_run_parts = Vec::new();
                        let mut total_updates = 0;

                        for completed_run in completed_runs {
                            all_run_parts.extend(completed_run.parts);
                            total_updates += completed_run.num_updates;
                        }

                        let run_ref = HollowRunRef::set::<D>(
                            shard_id,
                            blob.as_ref(),
                            &writer_key,
                            HollowRun {
                                parts: all_run_parts,
                            },
                            &*metrics,
                        )
                        .await;

                        RunWithMeta::single(RunPart::Many(run_ref), total_updates)
                    }
                    .instrument(debug_span!("batch::spill_run")),
                );
                Pending::new(handle)
            };
            WritingRuns::Ordered(order, MergeTree::new(run_length_limit, merge_fn))
        };
        BatchParts {
            cfg,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

    pub(crate) fn expected_order(&self) -> RunOrder {
        match self.writing_runs {
            WritingRuns::Ordered(order, _) => order,
            WritingRuns::Compacting(_) => RunOrder::Unordered,
        }
    }

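    /// Writes the given part: parts smaller than the inline threshold are
    /// kept inline in metadata, larger ones are uploaded to blob storage.
    /// Either way the work happens in the background, and this blocks once
    /// too many writes are outstanding.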
    pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
        &mut self,
        write_schemas: &Schemas<K, V>,
        desc: Description<T>,
        updates: Part,
        diffs_sum: D,
    ) {
        let batch_metrics = self.batch_metrics.clone();
        let index = self.next_index;
        self.next_index += 1;
        let num_updates = updates.len();
        let ts_rewrite = None;
        let schema_id = write_schemas.id;

        let inline_threshold = self.cfg.inline_writes_single_max_bytes;

        let updates = BlobTraceUpdates::from_part(updates);
        let (name, write_future) = if updates.goodbytes() < inline_threshold {
            let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
            (
                "batch::inline_part",
                async move {
                    let start = Instant::now();
                    let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
                        desc: Some(desc.into_proto()),
                        index: index.into_proto(),
                        updates: Some(updates.into_proto()),
                    });
                    batch_metrics
                        .step_inline
                        .inc_by(start.elapsed().as_secs_f64());

                    RunWithMeta::single(
                        RunPart::Single(BatchPart::Inline {
                            updates,
                            ts_rewrite,
                            schema_id,
                            deprecated_schema_id: None,
                        }),
                        num_updates,
                    )
                }
                .instrument(span)
                .boxed(),
            )
        } else {
            let part = BlobTraceBatchPart {
                desc,
                updates,
                index,
            };
            let cfg = self.cfg.clone();
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let shard_metrics = Arc::clone(&self.shard_metrics);
            let isolated_runtime = Arc::clone(&self.isolated_runtime);
            let expected_order = self.expected_order();
            let encoded_diffs_sum = D::encode(&diffs_sum);
            let write_schemas_clone = write_schemas.clone();
            let write_span =
                debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current();
            (
                "batch::write_part",
                async move {
                    let part = BatchParts::write_hollow_part(
                        cfg,
                        blob,
                        metrics,
                        shard_metrics,
                        batch_metrics,
                        isolated_runtime,
                        part,
                        expected_order,
                        ts_rewrite,
                        encoded_diffs_sum,
                        write_schemas_clone,
                    )
                    .await;
                    RunWithMeta::single(RunPart::Single(part), num_updates)
                }
                .instrument(write_span)
                .boxed(),
            )
        };

        match &mut self.writing_runs {
            WritingRuns::Ordered(_order, run) => {
                let part = Pending::new(mz_ore::task::spawn(|| name, write_future));
                run.push(part);

                for part in run
                    .iter_mut()
                    .rev()
                    .skip(self.cfg.batch_builder_max_outstanding_parts)
                    .take_while(|p| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
            WritingRuns::Compacting(batches) => {
                let run = Pending::Writing(mz_ore::task::spawn(|| name, write_future));
                batches.push((RunOrder::Unordered, run));

                let mut part_budget = self.cfg.batch_builder_max_outstanding_parts;
                let mut compaction_budget = 1;
                for (_, part) in batches
                    .iter_mut()
                    .rev()
                    .skip_while(|(order, _)| match order {
                        RunOrder::Unordered if part_budget > 0 => {
                            part_budget -= 1;
                            true
                        }
                        RunOrder::Structured | RunOrder::Codec if compaction_budget > 0 => {
                            compaction_budget -= 1;
                            true
                        }
                        _ => false,
                    })
                    .take_while(|(_, p)| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
        }
    }

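    /// Encodes the given part to Parquet, computing its stats and key
    /// bounds, uploads it to blob storage, and returns the resulting
    /// [BatchPart::Hollow] metadata.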
    async fn write_hollow_part<K: Codec, V: Codec>(
        cfg: BatchBuilderConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        batch_metrics: BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        mut updates: BlobTraceBatchPart<T>,
        run_order: RunOrder,
        ts_rewrite: Option<Antichain<T>>,
        diffs_sum: [u8; 8],
        write_schemas: Schemas<K, V>,
    ) -> BatchPart<T> {
        let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new());
        let key = partial_key.complete(&shard_metrics.shard_id);
        let goodbytes = updates.updates.goodbytes();
        let metrics_ = Arc::clone(&metrics);
        let schema_id = write_schemas.id;

        let (stats, key_lower, structured_key_lower, (buf, encode_time)) = isolated_runtime
            .spawn_named(|| "batch::encode_part", async move {
                let stats = metrics_.columnar.arrow().measure_part_build(|| {
                    let stats = if cfg.stats_collection_enabled {
                        let ext = updates.updates.get_or_make_structured::<K, V>(
                            write_schemas.key.as_ref(),
                            write_schemas.val.as_ref(),
                        );

                        let key_stats = write_schemas
                            .key
                            .decoder_any(ext.key.as_ref())
                            .expect("decoding just-encoded data")
                            .stats();

                        let part_stats = PartStats { key: key_stats };

                        let trimmed_start = Instant::now();
                        let mut trimmed_bytes = 0;
                        let trimmed_stats = LazyPartStats::encode(&part_stats, |s| {
                            trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| {
                                cfg.stats_untrimmable_columns.should_retain(s)
                            })
                        });
                        let trimmed_duration = trimmed_start.elapsed();
                        Some((trimmed_stats, trimmed_duration, trimmed_bytes))
                    } else {
                        None
                    };

                    updates.updates = updates.updates.as_structured::<K, V>(
                        write_schemas.key.as_ref(),
                        write_schemas.val.as_ref(),
                    );

                    stats
                });

                let key_lower = if let Some(records) = updates.updates.records() {
                    let key_bytes = records.keys();
                    if key_bytes.is_empty() {
                        &[]
                    } else if run_order == RunOrder::Codec {
                        key_bytes.value(0)
                    } else {
                        ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
                    }
                } else {
                    &[]
                };
                let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
                    .expect("lower bound always exists");

                let structured_key_lower = if cfg.structured_key_lower_len > 0 {
                    updates.updates.structured().and_then(|ext| {
                        let min_key = if run_order == RunOrder::Structured {
                            0
                        } else {
                            let ord = ArrayOrd::new(ext.key.as_ref());
                            (0..ext.key.len())
                                .min_by_key(|i| ord.at(*i))
                                .expect("non-empty batch")
                        };
                        let lower = ArrayBound::new(Arc::clone(&ext.key), min_key)
                            .to_proto_lower(cfg.structured_key_lower_len);
                        if lower.is_none() {
                            batch_metrics.key_lower_too_big.inc()
                        }
                        lower.map(|proto| LazyProto::from(&proto))
                    })
                } else {
                    None
                };

                let encode_start = Instant::now();
                let mut buf = Vec::new();
                updates.encode(&mut buf, &metrics_.columnar, &cfg.encoding_config);

                drop(updates);
                (
                    stats,
                    key_lower,
                    structured_key_lower,
                    (Bytes::from(buf), encode_start.elapsed()),
                )
            })
            .instrument(debug_span!("batch::encode_part"))
            .await
            .expect("part encode task failed");
        metrics.codecs.batch.encode_count.inc();
        metrics
            .codecs
            .batch
            .encode_seconds
            .inc_by(encode_time.as_secs_f64());

        let start = Instant::now();
        let payload_len = buf.len();
        let () = retry_external(&metrics.retries.external.batch_set, || async {
            shard_metrics.blob_sets.inc();
            blob.set(&key, Bytes::clone(&buf)).await
        })
        .instrument(trace_span!("batch::set", payload_len))
        .await;
        batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
        batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
        batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
        match run_order {
            RunOrder::Unordered => batch_metrics.unordered.inc(),
            RunOrder::Codec => batch_metrics.codec_order.inc(),
            RunOrder::Structured => batch_metrics.structured_order.inc(),
        }
        let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
            batch_metrics
                .step_stats
                .inc_by(stats_step_timing.as_secs_f64());
            if trimmed_bytes > 0 {
                metrics.pushdown.parts_stats_trimmed_count.inc();
                metrics
                    .pushdown
                    .parts_stats_trimmed_bytes
                    .inc_by(u64::cast_from(trimmed_bytes));
            }
            stats
        });

        let meta = MetadataMap::default();
        BatchPart::Hollow(HollowBatchPart {
            key: partial_key,
            meta,
            encoded_size_bytes: payload_len,
            key_lower,
            structured_key_lower,
            stats,
            ts_rewrite,
            diffs_sum: Some(diffs_sum),
            format: Some(BatchColumnarFormat::Structured),
            schema_id,
            deprecated_schema_id: None,
        })
    }

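    /// Waits for all outstanding writes to complete and returns the finished
    /// runs as `(order, parts, update count)` triples.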
    #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
    pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec<RunPart<T>>, usize)> {
        match self.writing_runs {
            WritingRuns::Ordered(RunOrder::Unordered, run) => {
                let completed_runs = run.finish();
                let mut output = Vec::with_capacity(completed_runs.len());
                for completed_run in completed_runs {
                    let completed_run = completed_run.into_result().await;
                    for part in completed_run.parts {
                        output.push((RunOrder::Unordered, vec![part], completed_run.num_updates));
                    }
                }
                output
            }
            WritingRuns::Ordered(order, run) => {
                let completed_runs = run.finish();
                let mut all_parts = Vec::new();
                let mut all_update_counts = 0;
                for completed_run in completed_runs {
                    let completed_run = completed_run.into_result().await;
                    all_parts.extend(completed_run.parts);
                    all_update_counts += completed_run.num_updates;
                }
                vec![(order, all_parts, all_update_counts)]
            }
            WritingRuns::Compacting(batches) => {
                let runs = batches.finish();
                let mut output = Vec::new();
                for (order, run) in runs {
                    let completed_run = run.into_result().await;
                    output.push((order, completed_run.parts, completed_run.num_updates));
                }
                output
            }
        }
    }
}

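/// Validates that it is sensible to truncate the given batch to the given
/// description: the truncated bounds must lie within the batch's bounds, and
/// a batch with rewritten timestamps must not be truncated in a way that
/// could drop or invent data.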
pub(crate) fn validate_truncate_batch<T: Timestamp>(
    batch: &HollowBatch<T>,
    truncate: &Description<T>,
    any_batch_rewrite: bool,
    validate_part_bounds_on_write: bool,
) -> Result<(), InvalidUsage<T>> {
    if any_batch_rewrite {
        if truncate.upper() != batch.desc.upper() {
            return Err(InvalidUsage::InvalidRewrite(format!(
                "rewritten batch might have data past {:?} up to {:?}",
                truncate.upper().elements(),
                batch.desc.upper().elements(),
            )));
        }
        for part in batch.parts.iter() {
            let part_lower_bound = part.ts_rewrite().unwrap_or_else(|| batch.desc.lower());
            if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
                return Err(InvalidUsage::InvalidRewrite(format!(
                    "rewritten batch might have data below {:?} at {:?}",
                    truncate.lower().elements(),
                    part_lower_bound.elements(),
                )));
            }
        }
    }

    if !validate_part_bounds_on_write {
        return Ok(());
    }

    let batch = &batch.desc;
    if !PartialOrder::less_equal(batch.lower(), truncate.lower())
        || PartialOrder::less_than(batch.upper(), truncate.upper())
    {
        return Err(InvalidUsage::InvalidBatchBounds {
            batch_lower: batch.lower().clone(),
            batch_upper: batch.upper().clone(),
            append_lower: truncate.lower().clone(),
            append_upper: truncate.upper().clone(),
        });
    }

    Ok(())
}

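/// The set of parts to delete from blob storage, deduplicated by key: plain
/// blob keys, plus hollow runs whose contents must be deleted recursively.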
#[derive(Debug)]
pub(crate) struct PartDeletes<T> {
    blob_keys: BTreeSet<PartialBatchKey>,
    hollow_runs: BTreeMap<PartialBatchKey, HollowRunRef<T>>,
}

impl<T> Default for PartDeletes<T> {
    fn default() -> Self {
        Self {
            blob_keys: Default::default(),
            hollow_runs: Default::default(),
        }
    }
}

impl<T: Timestamp> PartDeletes<T> {
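    /// Adds the given part to the set to be deleted, returning whether it
    /// was newly added. Inline parts have no backing blob, so there is
    /// nothing to delete for them.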
    pub fn add(&mut self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.insert(r.key.clone(), r.clone()).is_none(),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.insert(x.key.clone()),
            RunPart::Single(BatchPart::Inline { .. }) => true,
        }
    }

    pub fn contains(&self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.contains_key(&r.key),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.contains(&x.key),
            RunPart::Single(BatchPart::Inline { .. }) => false,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn len(&self) -> usize {
        match self {
            Self {
                blob_keys,
                hollow_runs,
            } => blob_keys.len() + hollow_runs.len(),
        }
    }

    pub async fn delete(
        mut self,
        blob: &dyn Blob,
        shard_id: ShardId,
        concurrency: usize,
        metrics: &Metrics,
        delete_metrics: &RetryMetrics,
    ) where
        T: Codec64,
    {
        loop {
            let () = stream::iter(mem::take(&mut self.blob_keys))
                .map(|key| {
                    let key = key.complete(&shard_id);
                    async move {
                        retry_external(delete_metrics, || blob.delete(&key)).await;
                    }
                })
                .buffer_unordered(concurrency)
                .collect()
                .await;

            let Some((run_key, run_ref)) = self.hollow_runs.pop_first() else {
                break;
            };

            if let Some(run) = run_ref.get(shard_id, blob, metrics).await {
                for part in &run.parts {
                    self.add(part);
                }
                self.blob_keys.insert(run_key);
            };
        }
    }
}

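/// Sums the diffs in the given array, decoding each `i64` as a
/// `Codec64`-encoded `D`.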
fn diffs_sum<D: Monoid + Codec64>(updates: &Int64Array) -> D {
    let mut sum = D::zero();
    for d in updates.values().iter() {
        let d = D::decode(d.to_le_bytes());
        sum.plus_equals(&d);
    }
    sum
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;

    use super::*;
    use crate::PersistLocation;
    use crate::cache::PersistClientCache;
    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
    use crate::internal::paths::{BlobKey, PartialBlobKey};
    use crate::tests::{all_ok, new_test_client};

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_flushing() {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let cache = PersistClientCache::new_no_metrics();

        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        cache.cfg.set_config(&MAX_RUNS, 3);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);

        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let (mut write, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut builder = write.builder(Antichain::from_elem(0));

        fn assert_writing(
            builder: &BatchBuilder<String, String, u64, i64>,
            expected_finished: &[bool],
        ) {
            let WritingRuns::Compacting(run) = &builder.builder.parts.writing_runs else {
                unreachable!("ordered run!")
            };

            let actual: Vec<_> = run.iter().map(|(_, p)| p.is_finished()).collect();
            assert_eq!(*expected_finished, actual);
        }

        assert_writing(&builder, &[]);

        let ((k, v), t, d) = &data[0];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false]);

        let ((k, v), t, d) = &data[1];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        let ((k, v), t, d) = &data[2];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[true, false, false]);

        let ((k, v), t, d) = &data[3];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        let batch = builder
            .finish(Antichain::from_elem(5))
            .await
            .expect("invalid usage");
        assert_eq!(batch.batch.runs().count(), 2);
        assert_eq!(batch.batch.part_count(), 4);
        write
            .append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
        assert_eq!(read.expect_snapshot_and_fetch(4).await, all_ok(&data, 4));
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_keys() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 3);
        for part in &batch.batch.parts {
            let part = part.expect_hollow_part();
            match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
                Ok((shard, PartialBlobKey::Batch(writer, _))) => {
                    assert_eq!(shard.to_string(), shard_id.to_string());
                    assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
                }
                _ => panic!("unparseable blob key"),
            }
        }
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_delete() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        cache.cfg.set_config(&BATCH_DELETE_ENABLED, true);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let part_key = batch.batch.parts[0]
            .expect_hollow_part()
            .key
            .complete(&shard_id);

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_some());

        batch.delete().await;

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_none());
    }

    #[mz_ore::test]
    fn untrimmable_columns() {
        let untrimmable = UntrimmableColumns {
            equals: vec!["abc".into(), "def".into()],
            prefixes: vec!["123".into(), "234".into()],
            suffixes: vec!["xyz".into()],
        };

        assert!(untrimmable.should_retain("abc"));
        assert!(untrimmable.should_retain("ABC"));
        assert!(untrimmable.should_retain("aBc"));
        assert!(!untrimmable.should_retain("abcd"));
        assert!(untrimmable.should_retain("deF"));
        assert!(!untrimmable.should_retain("defg"));

        assert!(untrimmable.should_retain("123"));
        assert!(untrimmable.should_retain("123-4"));
        assert!(untrimmable.should_retain("1234"));
        assert!(untrimmable.should_retain("234"));
        assert!(!untrimmable.should_retain("345"));

        assert!(untrimmable.should_retain("ijk_xyZ"));
        assert!(untrimmable.should_retain("ww-XYZ"));
        assert!(!untrimmable.should_retain("xya"));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        let mut batch = write.builder(Antichain::from_elem(0));
        batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
        let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();

        let batch = batch.into_transmittable_batch();
        let mut batch = write.batch_from_transmittable_batch(batch);
        batch
            .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
            .unwrap();
        write
            .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
            .await;

        let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
        let expected = vec![((Ok("foo".to_owned()), Ok(())), 2, 1)];
        assert_eq!(actual, expected);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn structured_lowers() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg().set_config(&STRUCTURED_KEY_LOWER_LEN, 1024);
        cache.cfg().set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg().set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let [part] = batch.batch.parts.as_slice() else {
            panic!("expected single part")
        };
        assert!(part.structured_key_lower().is_some());
    }
}