use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::Instant;

use arrow::array::{Array, Int64Array};
use bytes::Bytes;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::stream::StreamExt;
use futures_util::{FutureExt, stream};
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::Blob;
use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
use mz_persist_types::part::{Part, PartBuilder};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::stats::{
    PartStats, TRUNCATE_LEN, TruncateBound, trim_to_budget, truncate_bytes,
};
use mz_persist_types::{Codec, Codec64};
use mz_proto::RustType;
use mz_timely_util::order::Reverse;
use proptest_derive::Arbitrary;
use semver::Version;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug_span, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
use crate::error::InvalidUsage;
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
use crate::internal::machine::retry_external;
use crate::internal::merge::{MergeTree, Pending};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
use crate::internal::state::{
    BatchPart, ENABLE_INCREMENTAL_COMPACTION, HollowBatch, HollowBatchPart, HollowRun,
    HollowRunRef, ProtoInlineBatchPart, RunId, RunMeta, RunOrder, RunPart,
};
use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
use crate::{PersistConfig, ShardId};

include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));

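/// A handle to a batch of updates that has been written to blob storage but
/// not yet appended to a shard.
///
/// The blob keys behind this batch dangle until the batch is either appended
/// to the shard or explicitly deleted; dropping an un-consumed handle logs a
/// warning (see the `Drop` impl below).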
#[derive(Debug)]
pub struct Batch<K, V, T, D> {
    pub(crate) batch_delete_enabled: bool,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,

    pub(crate) version: Version,

    pub(crate) batch: HollowBatch<T>,

    pub(crate) blob: Arc<dyn Blob>,

    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}

impl<K, V, T, D> Drop for Batch<K, V, T, D> {
    fn drop(&mut self) {
        if self.batch.part_count() > 0 {
            warn!(
                "un-consumed Batch, with {} parts and dangling blob keys: {:?}",
                self.batch.part_count(),
                self.batch
                    .parts
                    .iter()
                    .map(|x| x.printable_name())
                    .collect::<Vec<_>>(),
            );
        }
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        batch_delete_enabled: bool,
        metrics: Arc<Metrics>,
        blob: Arc<dyn Blob>,
        shard_metrics: Arc<ShardMetrics>,
        version: Version,
        batch: HollowBatch<T>,
    ) -> Self {
        Self {
            batch_delete_enabled,
            metrics,
            shard_metrics,
            version,
            batch,
            blob,
            _phantom: PhantomData,
        }
    }

    pub fn shard_id(&self) -> ShardId {
        self.shard_metrics.shard_id
    }

    pub fn upper(&self) -> &Antichain<T> {
        self.batch.desc.upper()
    }

    pub fn lower(&self) -> &Antichain<T> {
        self.batch.desc.lower()
    }

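    /// Marks the parts in this batch as consumed (e.g. because they were
    /// handed off to the shard's state), so dropping the handle no longer
    /// warns about dangling blob keys.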
    pub(crate) fn mark_consumed(&mut self) {
        self.batch.parts.clear();
    }

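    /// Deletes the blobs backing this batch. If batch deletion is disabled by
    /// config, this instead just marks the batch as consumed and leaves the
    /// blobs behind.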
    #[instrument(level = "debug", fields(shard = %self.shard_id()))]
    pub async fn delete(mut self) {
        if !self.batch_delete_enabled {
            self.mark_consumed();
            return;
        }
        let mut deletes = PartDeletes::default();
        for part in self.batch.parts.drain(..) {
            deletes.add(&part);
        }
        let () = deletes
            .delete(
                &*self.blob,
                self.shard_id(),
                usize::MAX,
                &*self.metrics,
                &*self.metrics.retries.external.batch_delete,
            )
            .await;
    }

    pub fn schemas(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.batch.parts.iter().flat_map(|b| b.schema_id())
    }

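    /// Converts this `Batch` into its `HollowBatch` representation, marking
    /// the handle as consumed. The caller takes over responsibility for the
    /// underlying blobs.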
    pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
        let ret = self.batch.clone();
        self.mark_consumed();
        ret
    }

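    /// Converts this `Batch` into a `ProtoBatch`, which can be transmitted
    /// across processes and later turned back into a batch handle. Marks this
    /// handle as consumed.
    ///
    /// A minimal round-trip sketch (`ignore`d because it assumes an already
    /// opened `WriteHandle`):
    ///
    /// ```ignore
    /// let proto = batch.into_transmittable_batch();
    /// // ...send `proto` over the wire, then on the receiving side:
    /// let batch = write.batch_from_transmittable_batch(proto);
    /// ```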
    pub fn into_transmittable_batch(mut self) -> ProtoBatch {
        let ret = ProtoBatch {
            shard_id: self.shard_metrics.shard_id.into_proto(),
            version: self.version.to_string(),
            batch: Some(self.batch.into_proto()),
        };
        self.mark_consumed();
        ret
    }

    pub(crate) async fn flush_to_blob(
        &mut self,
        cfg: &BatchBuilderConfig,
        batch_metrics: &BatchWriteMetrics,
        isolated_runtime: &Arc<IsolatedRuntime>,
        write_schemas: &Schemas<K, V>,
    ) {
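        // Rewrite any inline parts as hollow parts written out to blob
        // storage, leaving parts that are already hollow (or whole hollow
        // runs) untouched.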
        let mut parts = Vec::new();
        for (run_meta, run_parts) in self.batch.runs() {
            for part in run_parts {
                let (updates, ts_rewrite, schema_id) = match part {
                    RunPart::Single(BatchPart::Inline {
                        updates,
                        ts_rewrite,
                        schema_id,
                        deprecated_schema_id: _,
                    }) => (updates, ts_rewrite, schema_id),
                    other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
                        parts.push(other.clone());
                        continue;
                    }
                };
                let updates = updates
                    .decode::<T>(&self.metrics.columnar)
                    .expect("valid inline part");
                let diffs_sum = diffs_sum::<D>(updates.updates.diffs());
                let mut write_schemas = write_schemas.clone();
                write_schemas.id = *schema_id;

                let write_span =
                    debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id)
                        .or_current();
                let handle = mz_ore::task::spawn(
                    || "batch::flush_to_blob",
                    BatchParts::write_hollow_part(
                        cfg.clone(),
                        Arc::clone(&self.blob),
                        Arc::clone(&self.metrics),
                        Arc::clone(&self.shard_metrics),
                        batch_metrics.clone(),
                        Arc::clone(isolated_runtime),
                        updates,
                        run_meta.order.unwrap_or(RunOrder::Unordered),
                        ts_rewrite.clone(),
                        D::encode(&diffs_sum),
                        write_schemas,
                    )
                    .instrument(write_span),
                );
                let part = handle.await.expect("part write task failed");
                parts.push(RunPart::Single(part));
            }
        }
        self.batch.parts = parts;
    }

    pub fn encoded_size_bytes(&self) -> usize {
        self.batch.encoded_size_bytes()
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + TotalOrder,
    D: Monoid + Codec64,
{
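    /// Rewrites the timestamps in this not-yet-appended batch, advancing
    /// update times up to `frontier` and replacing the batch's upper with
    /// `new_upper`. Validation is delegated to the hollow batch's
    /// `rewrite_ts`; failures surface as `InvalidUsage::InvalidRewrite`.
    ///
    /// A usage sketch, mirroring the `rewrite_ts_example` test below:
    ///
    /// ```ignore
    /// batch.rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))?;
    /// write.expect_compare_and_append_batch(&mut [&mut batch], 0, 3).await;
    /// ```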
    pub fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), InvalidUsage<T>> {
        self.batch
            .rewrite_ts(frontier, new_upper)
            .map_err(InvalidUsage::InvalidRewrite)
    }
}

#[derive(Debug)]
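/// Indicates what was done by a call to `BatchBuilder::add`: either the
/// record was only buffered, or buffering it also caused one or more parts to
/// be flushed out.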
pub enum Added {
    Record,
    RecordAndParts,
}

#[derive(Debug, Clone)]
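/// A configuration for writing a `Batch`, resolved from `PersistConfig` at
/// batch-creation time.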
pub struct BatchBuilderConfig {
    writer_key: WriterKey,
    pub(crate) blob_target_size: usize,
    pub(crate) batch_delete_enabled: bool,
    pub(crate) batch_builder_max_outstanding_parts: usize,
    pub(crate) inline_writes_single_max_bytes: usize,
    pub(crate) stats_collection_enabled: bool,
    pub(crate) stats_budget: usize,
    pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
    pub(crate) encoding_config: EncodingConfig,
    pub(crate) preferred_order: RunOrder,
    pub(crate) structured_key_lower_len: usize,
    pub(crate) run_length_limit: usize,
    pub(crate) enable_incremental_compaction: bool,
    pub(crate) max_runs: Option<usize>,
}

pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
    "persist_batch_delete_enabled",
    true,
    "Whether to actually delete blobs when batch delete is called (Materialize).",
);

pub(crate) const ENCODING_ENABLE_DICTIONARY: Config<bool> = Config::new(
    "persist_encoding_enable_dictionary",
    true,
    "A feature flag to enable dictionary encoding for Parquet data (Materialize).",
);

pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new(
    "persist_encoding_compression_format",
    "none",
    "A feature flag to enable compression of Parquet data (Materialize).",
);

pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config<usize> = Config::new(
    "persist_batch_structured_key_lower_len",
    256,
    "The maximum size in proto bytes of any structured key-lower metadata to preserve. \
    (If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
);

pub(crate) const MAX_RUN_LEN: Config<usize> = Config::new(
    "persist_batch_max_run_len",
    usize::MAX,
    "The maximum length a run can have before it will be spilled as a hollow run \
    into the blob store.",
);

pub(crate) const MAX_RUNS: Config<usize> = Config::new(
    "persist_batch_max_runs",
    1,
    "The maximum number of runs a batch builder should generate for user batches. \
    (Compaction outputs always generate a single run.) \
    The minimum value is 2; below this, compaction is disabled.",
);

pub(crate) const BLOB_TARGET_SIZE: Config<usize> = Config::new(
    "persist_blob_target_size",
    128 * MiB,
    "A target maximum size of persist blob payloads in bytes (Materialize).",
);

pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_single_max_bytes",
    4096,
    "The (exclusive) maximum size of a write that persist will inline in metadata.",
);

pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_total_max_bytes",
    1 * MiB,
    "\
    The (exclusive) maximum total size of inline writes in metadata before \
    persist will backpressure them by flushing out to s3.",
);

impl BatchBuilderConfig {
    pub fn new(value: &PersistConfig, _shard_id: ShardId) -> Self {
        let writer_key = WriterKey::for_version(&value.build_version);

        let preferred_order = RunOrder::Structured;

        BatchBuilderConfig {
            writer_key,
            blob_target_size: BLOB_TARGET_SIZE.get(value).clamp(1, usize::MAX),
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
            batch_builder_max_outstanding_parts: BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(value),
            inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
            stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
            stats_budget: STATS_BUDGET_BYTES.get(value),
            stats_untrimmable_columns: Arc::new(untrimmable_columns(value)),
            encoding_config: EncodingConfig {
                use_dictionary: ENCODING_ENABLE_DICTIONARY.get(value),
                compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
            },
            preferred_order,
            structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
            run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
            max_runs: match MAX_RUNS.get(value) {
                limit @ 2.. => Some(limit),
                _ => None,
            },
            enable_incremental_compaction: ENABLE_INCREMENTAL_COMPACTION.get(value),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
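/// Columns whose statistics should always be retained when trimming stats to
/// a budget, matched case-insensitively by exact name, prefix, or suffix.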
pub(crate) struct UntrimmableColumns {
    pub equals: Vec<Cow<'static, str>>,
    pub prefixes: Vec<Cow<'static, str>>,
    pub suffixes: Vec<Cow<'static, str>>,
}

impl UntrimmableColumns {
    pub(crate) fn should_retain(&self, name: &str) -> bool {
        let name_lower = name.to_lowercase();
        for s in &self.equals {
            if *s == name_lower {
                return true;
            }
        }
        for s in &self.prefixes {
            if name_lower.starts_with(s.as_ref()) {
                return true;
            }
        }
        for s in &self.suffixes {
            if name_lower.ends_with(s.as_ref()) {
                return true;
            }
        }
        false
    }
}

#[derive(Debug)]
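/// A builder for `Batch`es that allows adding updates piece by piece and then
/// finishing it into a batch that can be appended to a shard.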
pub struct BatchBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    inline_desc: Description<T>,
    inclusive_upper: Antichain<Reverse<T>>,

    records_builder: PartBuilder<K, K::Schema, V, V::Schema>,
    pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
}

impl<K, V, T, D> BatchBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        builder: BatchBuilderInternal<K, V, T, D>,
        inline_desc: Description<T>,
    ) -> Self {
        let records_builder = PartBuilder::new(
            builder.write_schemas.key.as_ref(),
            builder.write_schemas.val.as_ref(),
        );
        Self {
            inline_desc,
            inclusive_upper: Antichain::new(),
            records_builder,
            builder,
        }
    }

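    /// Completes the batch with the given `registered_upper`, flushing any
    /// buffered updates and returning a `Batch` that may be appended to the
    /// shard. Errors if the upper is below the batch's lower, or (when the
    /// since has not advanced past the lower) if any buffered update is not
    /// strictly before the upper.
    ///
    /// A minimal sketch of the add/finish flow, assuming an opened
    /// `WriteHandle<String, String, u64, i64>`:
    ///
    /// ```ignore
    /// let mut builder = write.builder(Antichain::from_elem(0));
    /// builder.add(&"k".to_owned(), &"v".to_owned(), &0, &1).await?;
    /// let batch = builder.finish(Antichain::from_elem(1)).await?;
    /// ```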
    pub async fn finish(
        mut self,
        registered_upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        if PartialOrder::less_than(&registered_upper, self.inline_desc.lower()) {
            return Err(InvalidUsage::InvalidBounds {
                lower: self.inline_desc.lower().clone(),
                upper: registered_upper,
            });
        }

        if PartialOrder::less_equal(self.inline_desc.since(), self.inline_desc.lower()) {
            for ts in self.inclusive_upper.iter() {
                if registered_upper.less_equal(&ts.0) {
                    return Err(InvalidUsage::UpdateBeyondUpper {
                        ts: ts.0.clone(),
                        expected_upper: registered_upper.clone(),
                    });
                }
            }
        }

        let updates = self.records_builder.finish();
        self.builder
            .flush_part(self.inline_desc.clone(), updates)
            .await;

        self.builder
            .finish(Description::new(
                self.inline_desc.lower().clone(),
                registered_upper,
                self.inline_desc.since().clone(),
            ))
            .await
    }

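    /// Adds the given update to the batch, buffering it in memory. Errors if
    /// `ts` is not beyond the batch's lower. Once the buffered columnar data
    /// reaches `blob_target_size`, it is flushed out as a part, which is
    /// reported via `Added::RecordAndParts`.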
    pub async fn add(
        &mut self,
        key: &K,
        val: &V,
        ts: &T,
        diff: &D,
    ) -> Result<Added, InvalidUsage<T>> {
        if !self.inline_desc.lower().less_equal(ts) {
            return Err(InvalidUsage::UpdateNotBeyondLower {
                ts: ts.clone(),
                lower: self.inline_desc.lower().clone(),
            });
        }
        self.inclusive_upper.insert(Reverse(ts.clone()));

        let added = {
            self.records_builder
                .push(key, val, ts.clone(), diff.clone());
            if self.records_builder.goodbytes() >= self.builder.parts.cfg.blob_target_size {
                let part = self.records_builder.finish_and_replace(
                    self.builder.write_schemas.key.as_ref(),
                    self.builder.write_schemas.val.as_ref(),
                );
                Some(part)
            } else {
                None
            }
        };

        let added = if let Some(full_batch) = added {
            self.builder
                .flush_part(self.inline_desc.clone(), full_batch)
                .await;
            Added::RecordAndParts
        } else {
            Added::Record
        };
        Ok(added)
    }
}

#[derive(Debug)]
pub(crate) struct BatchBuilderInternal<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    shard_id: ShardId,
    version: Version,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,

    write_schemas: Schemas<K, V>,
    parts: BatchParts<T>,

    _phantom: PhantomData<fn(K, V, T, D)>,
}

impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        _cfg: BatchBuilderConfig,
        parts: BatchParts<T>,
        metrics: Arc<Metrics>,
        write_schemas: Schemas<K, V>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        version: Version,
    ) -> Self {
        Self {
            blob,
            metrics,
            write_schemas,
            parts,
            shard_id,
            version,
            _phantom: PhantomData,
        }
    }

    #[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
    pub async fn finish(
        self,
        registered_desc: Description<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        let write_run_ids = self.parts.cfg.enable_incremental_compaction;
        let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
        let shard_metrics = Arc::clone(&self.parts.shard_metrics);
        let runs = self.parts.finish().await;

        let mut run_parts = vec![];
        let mut run_splits = vec![];
        let mut run_meta = vec![];
        let total_updates = runs
            .iter()
            .map(|(_, _, num_updates)| num_updates)
            .sum::<usize>();
        for (order, parts, num_updates) in runs {
            if parts.is_empty() {
                continue;
            }
            if run_parts.len() != 0 {
                run_splits.push(run_parts.len());
            }
            run_meta.push(RunMeta {
                order: Some(order),
                schema: self.write_schemas.id,
                deprecated_schema: None,
                id: if write_run_ids {
                    Some(RunId::new())
                } else {
                    None
                },
                len: if write_run_ids {
                    Some(num_updates)
                } else {
                    None
                },
            });
            run_parts.extend(parts);
        }
        let desc = registered_desc;

        let batch = Batch::new(
            batch_delete_enabled,
            Arc::clone(&self.metrics),
            self.blob,
            shard_metrics,
            self.version,
            HollowBatch::new(desc, run_parts, total_updates, run_meta, run_splits),
        );

        Ok(batch)
    }

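    /// Flushes the given columnar data out as a part of this batch (inline or
    /// to blob storage, depending on size), recording the time spent in
    /// metrics. No-op for empty parts.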
    pub async fn flush_part(&mut self, part_desc: Description<T>, columnar: Part) {
        let num_updates = columnar.len();
        if num_updates == 0 {
            return;
        }
        let diffs_sum = diffs_sum::<D>(&columnar.diff);

        let start = Instant::now();
        self.parts
            .write(&self.write_schemas, part_desc, columnar, diffs_sum)
            .await;
        self.metrics
            .compaction
            .batch
            .step_part_writing
            .inc_by(start.elapsed().as_secs_f64());
    }
}

#[derive(Debug, Clone)]
pub(crate) struct RunWithMeta<T> {
    pub parts: Vec<RunPart<T>>,
    pub num_updates: usize,
}

impl<T> RunWithMeta<T> {
    pub fn new(parts: Vec<RunPart<T>>, num_updates: usize) -> Self {
        Self { parts, num_updates }
    }

    pub fn single(part: RunPart<T>, num_updates: usize) -> Self {
        Self {
            parts: vec![part],
            num_updates,
        }
    }
}

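/// The set of runs currently being written out for a batch: either a single
/// ordered run (whose tail is spilled to `HollowRun`s once it exceeds the run
/// length limit) or a set of unordered runs that get merged into compacted
/// runs as they accumulate.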
#[derive(Debug)]
enum WritingRuns<T> {
    Ordered(RunOrder, MergeTree<Pending<RunWithMeta<T>>>),
    Compacting(MergeTree<(RunOrder, Pending<RunWithMeta<T>>)>),
}

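/// Writes the parts of a batch out to blob storage, with a bounded number of
/// outstanding writes at a time.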
#[derive(Debug)]
pub(crate) struct BatchParts<T> {
    cfg: BatchBuilderConfig,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    shard_id: ShardId,
    blob: Arc<dyn Blob>,
    isolated_runtime: Arc<IsolatedRuntime>,
    next_index: u64,
    writing_runs: WritingRuns<T>,
    batch_metrics: BatchWriteMetrics,
}

impl<T: Timestamp + Codec64> BatchParts<T> {
    pub(crate) fn new_compacting<K, V, D>(
        cfg: CompactConfig,
        desc: Description<T>,
        runs_per_compaction: usize,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
        schemas: Schemas<K, V>,
    ) -> Self
    where
        K: Codec + Debug,
        V: Codec + Debug,
        T: Lattice + Send + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let shard_metrics = Arc::clone(&shard_metrics);
            let isolated_runtime = Arc::clone(&isolated_runtime);
            let runs_per_compaction = runs_per_compaction.clamp(2, 1024);

            let merge_fn = move |parts: Vec<(RunOrder, Pending<RunWithMeta<T>>)>| {
                let blob = Arc::clone(&blob);
                let metrics = Arc::clone(&metrics);
                let shard_metrics = Arc::clone(&shard_metrics);
                let cfg = cfg.clone();
                let isolated_runtime = Arc::clone(&isolated_runtime);
                let write_schemas = schemas.clone();
                let compact_desc = desc.clone();
                let handle = mz_ore::task::spawn(
                    || "batch::compact_runs",
                    async move {
                        let runs: Vec<_> = stream::iter(parts)
                            .then(|(order, parts)| async move {
                                let completed_run = parts.into_result().await;
                                (
                                    RunMeta {
                                        order: Some(order),
                                        schema: schemas.id,
                                        deprecated_schema: None,
                                        id: if cfg.batch.enable_incremental_compaction {
                                            Some(RunId::new())
                                        } else {
                                            None
                                        },
                                        len: if cfg.batch.enable_incremental_compaction {
                                            Some(completed_run.num_updates)
                                        } else {
                                            None
                                        },
                                    },
                                    completed_run.parts,
                                )
                            })
                            .collect()
                            .await;

                        let run_refs: Vec<_> = runs
                            .iter()
                            .map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
                            .collect();

                        let output_batch = Compactor::<K, V, T, D>::compact_runs(
                            &cfg,
                            &shard_id,
                            &compact_desc,
                            run_refs,
                            blob,
                            metrics,
                            shard_metrics,
                            isolated_runtime,
                            write_schemas,
                        )
                        .await
                        .expect("successful compaction");

                        assert_eq!(
                            output_batch.run_meta.len(),
                            1,
                            "compaction is guaranteed to emit a single run"
                        );
                        let total_compacted_updates: usize = output_batch.len;

                        RunWithMeta::new(output_batch.parts, total_compacted_updates)
                    }
                    .instrument(debug_span!("batch::compact_runs")),
                );
                (RunOrder::Structured, Pending::new(handle))
            };
            WritingRuns::Compacting(MergeTree::new(runs_per_compaction, merge_fn))
        };
        BatchParts {
            cfg: cfg.batch,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

    pub(crate) fn new_ordered<D: Monoid + Codec64>(
        cfg: BatchBuilderConfig,
        order: RunOrder,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
    ) -> Self {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let writer_key = cfg.writer_key.clone();
            let run_length_limit = (order == RunOrder::Unordered)
                .then_some(usize::MAX)
                .unwrap_or(cfg.run_length_limit);
            let merge_fn = move |parts: Vec<Pending<RunWithMeta<T>>>| {
                let blob = Arc::clone(&blob);
                let writer_key = writer_key.clone();
                let metrics = Arc::clone(&metrics);
                let handle = mz_ore::task::spawn(
                    || "batch::spill_run",
                    async move {
                        let completed_runs: Vec<RunWithMeta<T>> = stream::iter(parts)
                            .then(|p| p.into_result())
                            .collect()
                            .await;

                        let mut all_run_parts = Vec::new();
                        let mut total_updates = 0;

                        for completed_run in completed_runs {
                            all_run_parts.extend(completed_run.parts);
                            total_updates += completed_run.num_updates;
                        }

                        let run_ref = HollowRunRef::set::<D>(
                            shard_id,
                            blob.as_ref(),
                            &writer_key,
                            HollowRun {
                                parts: all_run_parts,
                            },
                            &*metrics,
                        )
                        .await;

                        RunWithMeta::single(RunPart::Many(run_ref), total_updates)
                    }
                    .instrument(debug_span!("batch::spill_run")),
                );
                Pending::new(handle)
            };
            WritingRuns::Ordered(order, MergeTree::new(run_length_limit, merge_fn))
        };
        BatchParts {
            cfg,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

    pub(crate) fn expected_order(&self) -> RunOrder {
        match self.writing_runs {
            WritingRuns::Ordered(order, _) => order,
            WritingRuns::Compacting(_) => RunOrder::Unordered,
        }
    }

    pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
        &mut self,
        write_schemas: &Schemas<K, V>,
        desc: Description<T>,
        updates: Part,
        diffs_sum: D,
    ) {
        let batch_metrics = self.batch_metrics.clone();
        let index = self.next_index;
        self.next_index += 1;
        let num_updates = updates.len();
        let ts_rewrite = None;
        let schema_id = write_schemas.id;

        let inline_threshold = self.cfg.inline_writes_single_max_bytes;

        let updates = BlobTraceUpdates::from_part(updates);
        let (name, write_future) = if updates.goodbytes() < inline_threshold {
            let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
            (
                "batch::inline_part",
                async move {
                    let start = Instant::now();
                    let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
                        desc: Some(desc.into_proto()),
                        index: index.into_proto(),
                        updates: Some(updates.into_proto()),
                    });
                    batch_metrics
                        .step_inline
                        .inc_by(start.elapsed().as_secs_f64());

                    RunWithMeta::single(
                        RunPart::Single(BatchPart::Inline {
                            updates,
                            ts_rewrite,
                            schema_id,
                            deprecated_schema_id: None,
                        }),
                        num_updates,
                    )
                }
                .instrument(span)
                .boxed(),
            )
        } else {
            let part = BlobTraceBatchPart {
                desc,
                updates,
                index,
            };
            let cfg = self.cfg.clone();
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let shard_metrics = Arc::clone(&self.shard_metrics);
            let isolated_runtime = Arc::clone(&self.isolated_runtime);
            let expected_order = self.expected_order();
            let encoded_diffs_sum = D::encode(&diffs_sum);
            let write_schemas_clone = write_schemas.clone();
            let write_span =
                debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current();
            (
                "batch::write_part",
                async move {
                    let part = BatchParts::write_hollow_part(
                        cfg,
                        blob,
                        metrics,
                        shard_metrics,
                        batch_metrics,
                        isolated_runtime,
                        part,
                        expected_order,
                        ts_rewrite,
                        encoded_diffs_sum,
                        write_schemas_clone,
                    )
                    .await;
                    RunWithMeta::single(RunPart::Single(part), num_updates)
                }
                .instrument(write_span)
                .boxed(),
            )
        };

        match &mut self.writing_runs {
            WritingRuns::Ordered(_order, run) => {
                let part = Pending::new(mz_ore::task::spawn(|| name, write_future));
                run.push(part);

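                // Apply backpressure: allow up to the configured number of
                // outstanding part writes, and block on any older writes that
                // have not yet finished.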
                for part in run
                    .iter_mut()
                    .rev()
                    .skip(self.cfg.batch_builder_max_outstanding_parts)
                    .take_while(|p| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
            WritingRuns::Compacting(batches) => {
                let run = Pending::Writing(mz_ore::task::spawn(|| name, write_future));
                batches.push((RunOrder::Unordered, run));

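                // Apply backpressure: budget for a fixed number of
                // outstanding part writes plus one in-progress merge of runs,
                // and block on anything older that hasn't finished.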
                let mut part_budget = self.cfg.batch_builder_max_outstanding_parts;
                let mut compaction_budget = 1;
                for (_, part) in batches
                    .iter_mut()
                    .rev()
                    .skip_while(|(order, _)| match order {
                        RunOrder::Unordered if part_budget > 0 => {
                            part_budget -= 1;
                            true
                        }
                        RunOrder::Structured | RunOrder::Codec if compaction_budget > 0 => {
                            compaction_budget -= 1;
                            true
                        }
                        _ => false,
                    })
                    .take_while(|(_, p)| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
        }
    }

    async fn write_hollow_part<K: Codec, V: Codec>(
        cfg: BatchBuilderConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        batch_metrics: BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        mut updates: BlobTraceBatchPart<T>,
        run_order: RunOrder,
        ts_rewrite: Option<Antichain<T>>,
        diffs_sum: [u8; 8],
        write_schemas: Schemas<K, V>,
    ) -> BatchPart<T> {
        let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new());
        let key = partial_key.complete(&shard_metrics.shard_id);
        let goodbytes = updates.updates.goodbytes();
        let metrics_ = Arc::clone(&metrics);
        let schema_id = write_schemas.id;

        let (stats, key_lower, structured_key_lower, (buf, encode_time)) = isolated_runtime
            .spawn_named(|| "batch::encode_part", async move {
                let stats = metrics_.columnar.arrow().measure_part_build(|| {
                    let stats = if cfg.stats_collection_enabled {
                        let ext = updates.updates.get_or_make_structured::<K, V>(
                            write_schemas.key.as_ref(),
                            write_schemas.val.as_ref(),
                        );

                        let key_stats = write_schemas
                            .key
                            .decoder_any(ext.key.as_ref())
                            .expect("decoding just-encoded data")
                            .stats();

                        let part_stats = PartStats { key: key_stats };

                        let trimmed_start = Instant::now();
                        let mut trimmed_bytes = 0;
                        let trimmed_stats = LazyPartStats::encode(&part_stats, |s| {
                            trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| {
                                cfg.stats_untrimmable_columns.should_retain(s)
                            })
                        });
                        let trimmed_duration = trimmed_start.elapsed();
                        Some((trimmed_stats, trimmed_duration, trimmed_bytes))
                    } else {
                        None
                    };

                    updates.updates = updates.updates.as_structured::<K, V>(
                        write_schemas.key.as_ref(),
                        write_schemas.val.as_ref(),
                    );

                    stats
                });

                let key_lower = if let Some(records) = updates.updates.records() {
                    let key_bytes = records.keys();
                    if key_bytes.is_empty() {
                        &[]
                    } else if run_order == RunOrder::Codec {
                        key_bytes.value(0)
                    } else {
                        ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
                    }
                } else {
                    &[]
                };
                let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
                    .expect("lower bound always exists");

                let structured_key_lower = if cfg.structured_key_lower_len > 0 {
                    updates.updates.structured().and_then(|ext| {
                        let min_key = if run_order == RunOrder::Structured {
                            0
                        } else {
                            let ord = ArrayOrd::new(ext.key.as_ref());
                            (0..ext.key.len())
                                .min_by_key(|i| ord.at(*i))
                                .expect("non-empty batch")
                        };
                        let lower = ArrayBound::new(Arc::clone(&ext.key), min_key)
                            .to_proto_lower(cfg.structured_key_lower_len);
                        if lower.is_none() {
                            batch_metrics.key_lower_too_big.inc()
                        }
                        lower.map(|proto| LazyProto::from(&proto))
                    })
                } else {
                    None
                };

                let encode_start = Instant::now();
                let mut buf = Vec::new();
                updates.encode(&mut buf, &metrics_.columnar, &cfg.encoding_config);

                drop(updates);
                (
                    stats,
                    key_lower,
                    structured_key_lower,
                    (Bytes::from(buf), encode_start.elapsed()),
                )
            })
            .instrument(debug_span!("batch::encode_part"))
            .await
            .expect("part encode task failed");
        metrics.codecs.batch.encode_count.inc();
        metrics
            .codecs
            .batch
            .encode_seconds
            .inc_by(encode_time.as_secs_f64());

        let start = Instant::now();
        let payload_len = buf.len();
        let () = retry_external(&metrics.retries.external.batch_set, || async {
            shard_metrics.blob_sets.inc();
            blob.set(&key, Bytes::clone(&buf)).await
        })
        .instrument(trace_span!("batch::set", payload_len))
        .await;
        batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
        batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
        batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
        match run_order {
            RunOrder::Unordered => batch_metrics.unordered.inc(),
            RunOrder::Codec => batch_metrics.codec_order.inc(),
            RunOrder::Structured => batch_metrics.structured_order.inc(),
        }
        let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
            batch_metrics
                .step_stats
                .inc_by(stats_step_timing.as_secs_f64());
            if trimmed_bytes > 0 {
                metrics.pushdown.parts_stats_trimmed_count.inc();
                metrics
                    .pushdown
                    .parts_stats_trimmed_bytes
                    .inc_by(u64::cast_from(trimmed_bytes));
            }
            stats
        });

        BatchPart::Hollow(HollowBatchPart {
            key: partial_key,
            encoded_size_bytes: payload_len,
            key_lower,
            structured_key_lower,
            stats,
            ts_rewrite,
            diffs_sum: Some(diffs_sum),
            format: Some(BatchColumnarFormat::Structured),
            schema_id,
            deprecated_schema_id: None,
        })
    }

    #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
    pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec<RunPart<T>>, usize)> {
        match self.writing_runs {
            WritingRuns::Ordered(RunOrder::Unordered, run) => {
                let completed_runs = run.finish();
                let mut output = Vec::with_capacity(completed_runs.len());
                for completed_run in completed_runs {
                    let completed_run = completed_run.into_result().await;
                    for part in completed_run.parts {
                        output.push((RunOrder::Unordered, vec![part], completed_run.num_updates));
                    }
                }
                output
            }
            WritingRuns::Ordered(order, run) => {
                let completed_runs = run.finish();
                let mut all_parts = Vec::new();
                let mut all_update_counts = 0;
                for completed_run in completed_runs {
                    let completed_run = completed_run.into_result().await;
                    all_parts.extend(completed_run.parts);
                    all_update_counts += completed_run.num_updates;
                }
                vec![(order, all_parts, all_update_counts)]
            }
            WritingRuns::Compacting(batches) => {
                let runs = batches.finish();
                let mut output = Vec::new();
                for (order, run) in runs {
                    let completed_run = run.into_result().await;
                    output.push((order, completed_run.parts, completed_run.num_updates));
                }
                output
            }
        }
    }
}

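/// Validates that `truncate`'s bounds may be applied to `batch` without
/// losing data, returning an `InvalidUsage` error otherwise. Rewritten
/// batches are held to stricter bounds checks than regular ones.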
pub(crate) fn validate_truncate_batch<T: Timestamp>(
    batch: &HollowBatch<T>,
    truncate: &Description<T>,
    any_batch_rewrite: bool,
    validate_part_bounds_on_write: bool,
) -> Result<(), InvalidUsage<T>> {
    if any_batch_rewrite {
        if truncate.upper() != batch.desc.upper() {
            return Err(InvalidUsage::InvalidRewrite(format!(
                "rewritten batch might have data past {:?} up to {:?}",
                truncate.upper().elements(),
                batch.desc.upper().elements(),
            )));
        }
        for part in batch.parts.iter() {
            let part_lower_bound = part.ts_rewrite().unwrap_or_else(|| batch.desc.lower());
            if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
                return Err(InvalidUsage::InvalidRewrite(format!(
                    "rewritten batch might have data below {:?} at {:?}",
                    truncate.lower().elements(),
                    part_lower_bound.elements(),
                )));
            }
        }
    }

    if !validate_part_bounds_on_write {
        return Ok(());
    }

    let batch = &batch.desc;
    if !PartialOrder::less_equal(batch.lower(), truncate.lower())
        || PartialOrder::less_than(batch.upper(), truncate.upper())
    {
        return Err(InvalidUsage::InvalidBatchBounds {
            batch_lower: batch.lower().clone(),
            batch_upper: batch.upper().clone(),
            append_lower: truncate.lower().clone(),
            append_upper: truncate.upper().clone(),
        });
    }

    Ok(())
}

#[derive(Debug)]
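/// A set of parts to be deleted from blob storage: loose blob keys, plus
/// references to hollow runs that must themselves be fetched and walked to
/// find the keys of their constituent parts.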
pub(crate) struct PartDeletes<T> {
    blob_keys: BTreeSet<PartialBatchKey>,
    hollow_runs: BTreeMap<PartialBatchKey, HollowRunRef<T>>,
}

impl<T> Default for PartDeletes<T> {
    fn default() -> Self {
        Self {
            blob_keys: Default::default(),
            hollow_runs: Default::default(),
        }
    }
}

impl<T: Timestamp> PartDeletes<T> {
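    /// Adds the part to the set to be deleted. Returns true if it was newly
    /// added; inline parts have nothing to delete and always return true.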
    pub fn add(&mut self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.insert(r.key.clone(), r.clone()).is_none(),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.insert(x.key.clone()),
            RunPart::Single(BatchPart::Inline { .. }) => true,
        }
    }

    pub fn contains(&self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.contains_key(&r.key),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.contains(&x.key),
            RunPart::Single(BatchPart::Inline { .. }) => false,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn len(&self) -> usize {
        match self {
            Self {
                blob_keys,
                hollow_runs,
            } => blob_keys.len() + hollow_runs.len(),
        }
    }

    pub async fn delete(
        mut self,
        blob: &dyn Blob,
        shard_id: ShardId,
        concurrency: usize,
        metrics: &Metrics,
        delete_metrics: &RetryMetrics,
    ) where
        T: Codec64,
    {
        loop {
            let () = stream::iter(mem::take(&mut self.blob_keys))
                .map(|key| {
                    let key = key.complete(&shard_id);
                    async move {
                        retry_external(delete_metrics, || blob.delete(&key)).await;
                    }
                })
                .buffer_unordered(concurrency)
                .collect()
                .await;

            let Some((run_key, run_ref)) = self.hollow_runs.pop_first() else {
                break;
            };

            if let Some(run) = run_ref.get(shard_id, blob, metrics).await {
                for part in &run.parts {
                    self.add(part);
                }
                self.blob_keys.insert(run_key);
            };
        }
    }
}

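/// Sums the given column of `Codec64`-encoded diffs, decoding each `i64`
/// value as a `D`.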
fn diffs_sum<D: Monoid + Codec64>(updates: &Int64Array) -> D {
    let mut sum = D::zero();
    for d in updates.values().iter() {
        let d = D::decode(d.to_le_bytes());
        sum.plus_equals(&d);
    }
    sum
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;

    use super::*;
    use crate::PersistLocation;
    use crate::cache::PersistClientCache;
    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
    use crate::internal::paths::{BlobKey, PartialBlobKey};
    use crate::tests::{all_ok, new_test_client};

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_flushing() {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let cache = PersistClientCache::new_no_metrics();

        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        cache.cfg.set_config(&MAX_RUNS, 3);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);

        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let (mut write, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut builder = write.builder(Antichain::from_elem(0));

        fn assert_writing(
            builder: &BatchBuilder<String, String, u64, i64>,
            expected_finished: &[bool],
        ) {
            let WritingRuns::Compacting(run) = &builder.builder.parts.writing_runs else {
                unreachable!("ordered run!")
            };

            let actual: Vec<_> = run.iter().map(|(_, p)| p.is_finished()).collect();
            assert_eq!(*expected_finished, actual);
        }

        assert_writing(&builder, &[]);

        let ((k, v), t, d) = &data[0];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false]);

        let ((k, v), t, d) = &data[1];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        let ((k, v), t, d) = &data[2];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[true, false, false]);

        let ((k, v), t, d) = &data[3];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        let batch = builder
            .finish(Antichain::from_elem(5))
            .await
            .expect("invalid usage");
        assert_eq!(batch.batch.runs().count(), 2);
        assert_eq!(batch.batch.part_count(), 4);
        write
            .append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
        assert_eq!(read.expect_snapshot_and_fetch(4).await, all_ok(&data, 4));
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_keys() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 3);
        for part in &batch.batch.parts {
            let part = part.expect_hollow_part();
            match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
                Ok((shard, PartialBlobKey::Batch(writer, _))) => {
                    assert_eq!(shard.to_string(), shard_id.to_string());
                    assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
                }
                _ => panic!("unparseable blob key"),
            }
        }
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_delete() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        cache.cfg.set_config(&BATCH_DELETE_ENABLED, true);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let part_key = batch.batch.parts[0]
            .expect_hollow_part()
            .key
            .complete(&shard_id);

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_some());

        batch.delete().await;

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_none());
    }

    #[mz_ore::test]
    fn untrimmable_columns() {
        let untrimmable = UntrimmableColumns {
            equals: vec!["abc".into(), "def".into()],
            prefixes: vec!["123".into(), "234".into()],
            suffixes: vec!["xyz".into()],
        };

        assert!(untrimmable.should_retain("abc"));
        assert!(untrimmable.should_retain("ABC"));
        assert!(untrimmable.should_retain("aBc"));
        assert!(!untrimmable.should_retain("abcd"));
        assert!(untrimmable.should_retain("deF"));
        assert!(!untrimmable.should_retain("defg"));

        assert!(untrimmable.should_retain("123"));
        assert!(untrimmable.should_retain("123-4"));
        assert!(untrimmable.should_retain("1234"));
        assert!(untrimmable.should_retain("234"));
        assert!(!untrimmable.should_retain("345"));

        assert!(untrimmable.should_retain("ijk_xyZ"));
        assert!(untrimmable.should_retain("ww-XYZ"));
        assert!(!untrimmable.should_retain("xya"));
    }

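    // A small sanity check for `diffs_sum`, assuming `i64`'s `Monoid` and
    // `Codec64` impls (the same ones the other tests in this module use for
    // their diff type).
    #[mz_ore::test]
    fn diffs_sum_example() {
        let diffs = Int64Array::from(vec![1i64, -2, 3, 4]);
        assert_eq!(diffs_sum::<i64>(&diffs), 6);
    }
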
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        let mut batch = write.builder(Antichain::from_elem(0));
        batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
        let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();

        let batch = batch.into_transmittable_batch();
        let mut batch = write.batch_from_transmittable_batch(batch);
        batch
            .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
            .unwrap();
        write
            .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
            .await;

        let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
        let expected = vec![(((Ok("foo".to_owned())), Ok(())), 2, 1)];
        assert_eq!(actual, expected);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn structured_lowers() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg().set_config(&STRUCTURED_KEY_LOWER_LEN, 1024);
        cache.cfg().set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg().set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let [part] = batch.batch.parts.as_slice() else {
            panic!("expected single part")
        };
        assert!(part.structured_key_lower().is_some());
    }
}