use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::Instant;

use arrow::array::{Array, Int64Array};
use bytes::Bytes;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::stream::StreamExt;
use futures_util::{FutureExt, stream};
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::Blob;
use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
use mz_persist_types::part::{Part, PartBuilder};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::stats::{
    PartStats, TRUNCATE_LEN, TruncateBound, trim_to_budget, truncate_bytes,
};
use mz_persist_types::{Codec, Codec64};
use mz_proto::RustType;
use mz_timely_util::order::Reverse;
use proptest_derive::Arbitrary;
use semver::Version;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug_span, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
use crate::error::InvalidUsage;
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
use crate::internal::machine::retry_external;
use crate::internal::merge::{MergeTree, Pending};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
use crate::internal::state::{
    BatchPart, HollowBatch, HollowBatchPart, HollowRun, HollowRunRef, ProtoInlineBatchPart,
    RunMeta, RunOrder, RunPart,
};
use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
use crate::{PersistConfig, ShardId};

include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));

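/// A handle to a batch of updates that has been written to blob storage but
/// which has not yet been appended to a shard.
///
/// The batch must be marked consumed before it is dropped, either by appending
/// it to the shard or by explicitly calling [Batch::delete]; otherwise the
/// backing blobs leak, and the [Drop] impl below logs a warning.
///
/// A minimal lifecycle sketch, following the usage in this file's tests (the
/// `write` handle and its methods come from the crate's write-side API):
///
/// ```ignore
/// let mut builder = write.builder(Antichain::from_elem(0));
/// builder.add(&key, &val, &ts, &diff).await.expect("invalid usage");
/// let batch = builder.finish(Antichain::from_elem(5)).await.expect("invalid usage");
/// write
///     .append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
///     .await
///     .expect("invalid usage")
///     .expect("unexpected upper");
/// ```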
#[derive(Debug)]
pub struct Batch<K, V, T, D> {
    pub(crate) batch_delete_enabled: bool,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,

    pub(crate) version: Version,

    pub(crate) batch: HollowBatch<T>,

    pub(crate) blob: Arc<dyn Blob>,

    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}

impl<K, V, T, D> Drop for Batch<K, V, T, D> {
    fn drop(&mut self) {
        if self.batch.part_count() > 0 {
            warn!(
                "un-consumed Batch, with {} parts and dangling blob keys: {:?}",
                self.batch.part_count(),
                self.batch
                    .parts
                    .iter()
                    .map(|x| x.printable_name())
                    .collect::<Vec<_>>(),
            );
        }
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64,
{
    pub(crate) fn new(
        batch_delete_enabled: bool,
        metrics: Arc<Metrics>,
        blob: Arc<dyn Blob>,
        shard_metrics: Arc<ShardMetrics>,
        version: Version,
        batch: HollowBatch<T>,
    ) -> Self {
        Self {
            batch_delete_enabled,
            metrics,
            shard_metrics,
            version,
            batch,
            blob,
            _phantom: PhantomData,
        }
    }

    pub fn shard_id(&self) -> ShardId {
        self.shard_metrics.shard_id
    }

    pub fn upper(&self) -> &Antichain<T> {
        self.batch.desc.upper()
    }

    pub fn lower(&self) -> &Antichain<T> {
        self.batch.desc.lower()
    }

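    /// Marks the blobs that this batch handle points to as consumed, so that
    /// dropping the handle no longer warns about dangling blob keys; intended
    /// for when ownership of the parts has been passed elsewhere (e.g. the
    /// batch has been appended to the shard).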
    pub(crate) fn mark_consumed(&mut self) {
        self.batch.parts.clear();
    }

    #[instrument(level = "debug", fields(shard = %self.shard_id()))]
    pub async fn delete(mut self) {
        if !self.batch_delete_enabled {
            self.mark_consumed();
            return;
        }
        let mut deletes = PartDeletes::default();
        for part in self.batch.parts.drain(..) {
            deletes.add(&part);
        }
        let () = deletes
            .delete(
                &*self.blob,
                self.shard_id(),
                usize::MAX,
                &*self.metrics,
                &*self.metrics.retries.external.batch_delete,
            )
            .await;
    }

    pub fn schemas(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.batch.parts.iter().flat_map(|b| b.schema_id())
    }

    pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
        let ret = self.batch.clone();
        self.mark_consumed();
        ret
    }

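    /// Turns this batch into a [ProtoBatch], which can be used to transmit
    /// this batch (e.g. over the network) to another process, where it can be
    /// turned back into a batch handle (see `batch_from_transmittable_batch`
    /// in the `rewrite_ts_example` test below).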
    pub fn into_transmittable_batch(mut self) -> ProtoBatch {
        let ret = ProtoBatch {
            shard_id: self.shard_metrics.shard_id.into_proto(),
            version: self.version.to_string(),
            batch: Some(self.batch.into_proto()),
        };
        self.mark_consumed();
        ret
    }

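    /// Rewrites any inline parts of this batch as hollow parts in blob
    /// storage; hollow parts and runs are passed through unchanged.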
    pub(crate) async fn flush_to_blob(
        &mut self,
        cfg: &BatchBuilderConfig,
        batch_metrics: &BatchWriteMetrics,
        isolated_runtime: &Arc<IsolatedRuntime>,
        write_schemas: &Schemas<K, V>,
    ) {
        let mut parts = Vec::new();
        for (run_meta, run_parts) in self.batch.runs() {
            for part in run_parts {
                let (updates, ts_rewrite, schema_id) = match part {
                    RunPart::Single(BatchPart::Inline {
                        updates,
                        ts_rewrite,
                        schema_id,
                        deprecated_schema_id: _,
                    }) => (updates, ts_rewrite, schema_id),
                    other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
                        parts.push(other.clone());
                        continue;
                    }
                };
                let updates = updates
                    .decode::<T>(&self.metrics.columnar)
                    .expect("valid inline part");
                let diffs_sum =
                    diffs_sum::<D>(updates.updates.diffs()).expect("inline parts are not empty");
                let mut write_schemas = write_schemas.clone();
                write_schemas.id = *schema_id;

                let write_span =
                    debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id)
                        .or_current();
                let handle = mz_ore::task::spawn(
                    || "batch::flush_to_blob",
                    BatchParts::write_hollow_part(
                        cfg.clone(),
                        Arc::clone(&self.blob),
                        Arc::clone(&self.metrics),
                        Arc::clone(&self.shard_metrics),
                        batch_metrics.clone(),
                        Arc::clone(isolated_runtime),
                        updates,
                        run_meta.order.unwrap_or(RunOrder::Unordered),
                        ts_rewrite.clone(),
                        D::encode(&diffs_sum),
                        write_schemas,
                    )
                    .instrument(write_span),
                );
                let part = handle.await.expect("part write task failed");
                parts.push(RunPart::Single(part));
            }
        }
        self.batch.parts = parts;
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + TotalOrder,
    D: Semigroup + Codec64,
{
    pub fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), InvalidUsage<T>> {
        self.batch
            .rewrite_ts(frontier, new_upper)
            .map_err(InvalidUsage::InvalidRewrite)
    }
}

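/// Indicates what was done in a call to [BatchBuilder::add]: either the update
/// was only buffered in the current part ([Added::Record]), or buffering it
/// also caused a full part to be flushed out to blob storage
/// ([Added::RecordAndParts]).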
#[derive(Debug)]
pub enum Added {
    Record,
    RecordAndParts,
}

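/// A config struct for batch building, with values snapshotted out of the
/// dynamic [PersistConfig] at the time the builder is created.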
#[derive(Debug, Clone)]
pub struct BatchBuilderConfig {
    writer_key: WriterKey,
    pub(crate) blob_target_size: usize,
    pub(crate) batch_delete_enabled: bool,
    pub(crate) batch_builder_max_outstanding_parts: usize,
    pub(crate) inline_writes_single_max_bytes: usize,
    pub(crate) stats_collection_enabled: bool,
    pub(crate) stats_budget: usize,
    pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
    pub(crate) encoding_config: EncodingConfig,
    pub(crate) preferred_order: RunOrder,
    pub(crate) structured_key_lower_len: usize,
    pub(crate) run_length_limit: usize,
    pub(crate) max_runs: Option<usize>,
}

pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
    "persist_batch_delete_enabled",
    true,
    "Whether to actually delete blobs when batch delete is called (Materialize).",
);

pub(crate) const ENCODING_ENABLE_DICTIONARY: Config<bool> = Config::new(
    "persist_encoding_enable_dictionary",
    false,
    "A feature flag to enable dictionary encoding for Parquet data (Materialize).",
);

pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new(
    "persist_encoding_compression_format",
    "none",
    "A feature flag to enable compression of Parquet data (Materialize).",
);

pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config<usize> = Config::new(
    "persist_batch_structured_key_lower_len",
    256,
    "The maximum size in proto bytes of any structured key-lower metadata to preserve. \
    (If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
);

pub(crate) const MAX_RUN_LEN: Config<usize> = Config::new(
    "persist_batch_max_run_len",
    usize::MAX,
    "The maximum length a run can have before it will be spilled as a hollow run \
    into the blob store.",
);

pub(crate) const MAX_RUNS: Config<usize> = Config::new(
    "persist_batch_max_runs",
    1,
    "The maximum number of runs a batch builder should generate for user batches. \
    (Compaction outputs always generate a single run.) \
    The minimum value is 2; below this, compaction is disabled.",
);

pub(crate) const BLOB_TARGET_SIZE: Config<usize> = Config::new(
    "persist_blob_target_size",
    128 * MiB,
    "A target maximum size of persist blob payloads in bytes (Materialize).",
);

pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_single_max_bytes",
    4096,
    "The (exclusive) maximum size of a write that persist will inline in metadata.",
);

pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_total_max_bytes",
    1 * MiB,
    "\
    The (exclusive) maximum total size of inline writes in metadata before \
    persist will backpressure them by flushing out to s3.",
);

impl BatchBuilderConfig {
    pub fn new(value: &PersistConfig, _shard_id: ShardId) -> Self {
        let writer_key = WriterKey::for_version(&value.build_version);

        let preferred_order = RunOrder::Structured;

        BatchBuilderConfig {
            writer_key,
            blob_target_size: BLOB_TARGET_SIZE.get(value).clamp(1, usize::MAX),
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
            batch_builder_max_outstanding_parts: BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(value),
            inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
            stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
            stats_budget: STATS_BUDGET_BYTES.get(value),
            stats_untrimmable_columns: Arc::new(untrimmable_columns(value)),
            encoding_config: EncodingConfig {
                use_dictionary: ENCODING_ENABLE_DICTIONARY.get(value),
                compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
            },
            preferred_order,
            structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
            run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
            max_runs: match MAX_RUNS.get(value) {
                limit @ 2.. => Some(limit),
                _ => None,
            },
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
pub(crate) struct UntrimmableColumns {
    pub equals: Vec<Cow<'static, str>>,
    pub prefixes: Vec<Cow<'static, str>>,
    pub suffixes: Vec<Cow<'static, str>>,
}

impl UntrimmableColumns {
    pub(crate) fn should_retain(&self, name: &str) -> bool {
        let name_lower = name.to_lowercase();
        for s in &self.equals {
            if *s == name_lower {
                return true;
            }
        }
        for s in &self.prefixes {
            if name_lower.starts_with(s.as_ref()) {
                return true;
            }
        }
        for s in &self.suffixes {
            if name_lower.ends_with(s.as_ref()) {
                return true;
            }
        }
        false
    }
}

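/// A builder for [Batch]es that allows adding updates piece by piece and then
/// finishing it.
///
/// Updates are buffered in a columnar [PartBuilder]; whenever the buffered
/// part reaches `blob_target_size`, it is flushed out via the internal
/// [BatchParts]. A minimal sketch, following the tests in this file:
///
/// ```ignore
/// let mut builder = write.builder(Antichain::from_elem(0));
/// for ((k, v), t, d) in data.iter() {
///     builder.add(k, v, t, d).await.expect("invalid usage");
/// }
/// let batch = builder.finish(Antichain::from_elem(upper)).await.expect("invalid usage");
/// ```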
#[derive(Debug)]
pub struct BatchBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    inline_desc: Description<T>,
    inclusive_upper: Antichain<Reverse<T>>,

    pub(crate) key_buf: Vec<u8>,
    pub(crate) val_buf: Vec<u8>,

    records_builder: PartBuilder<K, K::Schema, V, V::Schema>,
    pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
}

impl<K, V, T, D> BatchBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64,
{
    pub(crate) fn new(
        builder: BatchBuilderInternal<K, V, T, D>,
        inline_desc: Description<T>,
    ) -> Self {
        let records_builder = PartBuilder::new(
            builder.write_schemas.key.as_ref(),
            builder.write_schemas.val.as_ref(),
        );
        Self {
            inline_desc,
            inclusive_upper: Antichain::new(),
            key_buf: vec![],
            val_buf: vec![],
            records_builder,
            builder,
        }
    }

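    /// Finish writing this batch and return a handle to the written batch.
    ///
    /// This fails if any of the updates in this batch are beyond the given
    /// `registered_upper`.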
    pub async fn finish(
        mut self,
        registered_upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        if PartialOrder::less_than(&registered_upper, self.inline_desc.lower()) {
            return Err(InvalidUsage::InvalidBounds {
                lower: self.inline_desc.lower().clone(),
                upper: registered_upper,
            });
        }

        if PartialOrder::less_equal(self.inline_desc.since(), self.inline_desc.lower()) {
            for ts in self.inclusive_upper.iter() {
                if registered_upper.less_equal(&ts.0) {
                    return Err(InvalidUsage::UpdateBeyondUpper {
                        ts: ts.0.clone(),
                        expected_upper: registered_upper.clone(),
                    });
                }
            }
        }

        let updates = self.records_builder.finish();
        self.builder
            .flush_part(self.inline_desc.clone(), updates)
            .await;

        self.builder
            .finish(Description::new(
                self.inline_desc.lower().clone(),
                registered_upper,
                self.inline_desc.since().clone(),
            ))
            .await
    }

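    /// Adds the given update to the batch.
    ///
    /// The update timestamp must be beyond the lower that was given when
    /// creating this [BatchBuilder]; otherwise an [InvalidUsage] error is
    /// returned. The returned [Added] indicates whether the update caused a
    /// part to be flushed to blob storage.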
    pub async fn add(
        &mut self,
        key: &K,
        val: &V,
        ts: &T,
        diff: &D,
    ) -> Result<Added, InvalidUsage<T>> {
        if !self.inline_desc.lower().less_equal(ts) {
            return Err(InvalidUsage::UpdateNotBeyondLower {
                ts: ts.clone(),
                lower: self.inline_desc.lower().clone(),
            });
        }
        self.inclusive_upper.insert(Reverse(ts.clone()));

        let added = {
            self.records_builder
                .push(key, val, ts.clone(), diff.clone());
            if self.records_builder.goodbytes() >= self.builder.parts.cfg.blob_target_size {
                let part = self.records_builder.finish_and_replace(
                    self.builder.write_schemas.key.as_ref(),
                    self.builder.write_schemas.val.as_ref(),
                );
                Some(part)
            } else {
                None
            }
        };

        let added = if let Some(full_batch) = added {
            self.builder
                .flush_part(self.inline_desc.clone(), full_batch)
                .await;
            Added::RecordAndParts
        } else {
            Added::Record
        };
        self.key_buf.clear();
        self.val_buf.clear();
        Ok(added)
    }
}

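/// The internal state of a [BatchBuilder]: the parts written so far, plus the
/// metadata needed to assemble them into a [Batch] in `finish`.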
#[derive(Debug)]
pub(crate) struct BatchBuilderInternal<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    shard_id: ShardId,
    version: Version,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,

    write_schemas: Schemas<K, V>,
    num_updates: usize,
    parts: BatchParts<T>,

    _phantom: PhantomData<fn(K, V, T, D)>,
}

impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64,
{
    pub(crate) fn new(
        _cfg: BatchBuilderConfig,
        parts: BatchParts<T>,
        metrics: Arc<Metrics>,
        write_schemas: Schemas<K, V>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        version: Version,
    ) -> Self {
        Self {
            blob,
            metrics,
            write_schemas,
            num_updates: 0,
            parts,
            shard_id,
            version,
            _phantom: PhantomData,
        }
    }

    #[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
    pub async fn finish(
        self,
        registered_desc: Description<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
        let shard_metrics = Arc::clone(&self.parts.shard_metrics);
        let runs = self.parts.finish().await;

        let mut run_parts = vec![];
        let mut run_splits = vec![];
        let mut run_meta = vec![];
        for (order, parts) in runs {
            if parts.is_empty() {
                continue;
            }
            if !run_parts.is_empty() {
                run_splits.push(run_parts.len());
            }
            run_meta.push(RunMeta {
                order: Some(order),
                schema: self.write_schemas.id,
                deprecated_schema: None,
            });
            run_parts.extend(parts);
        }
        let desc = registered_desc;

        let batch = Batch::new(
            batch_delete_enabled,
            Arc::clone(&self.metrics),
            self.blob,
            shard_metrics,
            self.version,
            HollowBatch::new(desc, run_parts, self.num_updates, run_meta, run_splits),
        );

        Ok(batch)
    }

    pub async fn flush_part(&mut self, part_desc: Description<T>, columnar: Part) {
        let num_updates = columnar.len();
        if num_updates == 0 {
            return;
        }
        let diffs_sum = diffs_sum::<D>(&columnar.diff).expect("part is non-empty");

        let start = Instant::now();
        self.parts
            .write(&self.write_schemas, part_desc, columnar, diffs_sum)
            .await;
        self.metrics
            .compaction
            .batch
            .step_part_writing
            .inc_by(start.elapsed().as_secs_f64());

        self.num_updates += num_updates;
    }
}

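/// How [BatchParts] organizes finished parts into runs: either by appending
/// them to runs with a known order (spilling over-long runs to blob storage as
/// [HollowRun]s), or by collecting unordered parts and merging them into
/// ordered runs via compaction.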
#[derive(Debug)]
enum WritingRuns<T> {
    Ordered(RunOrder, MergeTree<Pending<RunPart<T>>>),
    Compacting(MergeTree<(RunOrder, Pending<Vec<RunPart<T>>>)>),
}

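/// Writes the parts of a batch, inlining small parts in metadata and uploading
/// the rest to blob storage, while bounding the number of outstanding writes
/// and grouping the results into runs.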
#[derive(Debug)]
pub(crate) struct BatchParts<T> {
    cfg: BatchBuilderConfig,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    shard_id: ShardId,
    blob: Arc<dyn Blob>,
    isolated_runtime: Arc<IsolatedRuntime>,
    next_index: u64,
    writing_runs: WritingRuns<T>,
    batch_metrics: BatchWriteMetrics,
}

impl<T: Timestamp + Codec64> BatchParts<T> {
    pub(crate) fn new_compacting<K, V, D>(
        cfg: CompactConfig,
        desc: Description<T>,
        runs_per_compaction: usize,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
        schemas: Schemas<K, V>,
    ) -> Self
    where
        K: Codec + Debug,
        V: Codec + Debug,
        T: Lattice + Send + Sync,
        D: Semigroup + Ord + Codec64 + Send + Sync,
    {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let shard_metrics = Arc::clone(&shard_metrics);
            let isolated_runtime = Arc::clone(&isolated_runtime);
            let runs_per_compaction = runs_per_compaction.clamp(2, 1024);

            let merge_fn = move |parts: Vec<(RunOrder, Pending<Vec<RunPart<T>>>)>| {
                let blob = Arc::clone(&blob);
                let metrics = Arc::clone(&metrics);
                let shard_metrics = Arc::clone(&shard_metrics);
                let cfg = cfg.clone();
                let isolated_runtime = Arc::clone(&isolated_runtime);
                let write_schemas = schemas.clone();
                let compact_desc = desc.clone();
                let handle = mz_ore::task::spawn(
                    || "batch::compact_runs",
                    async move {
                        let runs: Vec<_> = stream::iter(parts)
                            .then(|(order, parts)| async move {
                                (
                                    RunMeta {
                                        order: Some(order),
                                        schema: schemas.id,
                                        deprecated_schema: None,
                                    },
                                    parts.into_result().await,
                                )
                            })
                            .collect()
                            .await;

                        let run_refs: Vec<_> = runs
                            .iter()
                            .map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
                            .collect();

                        let output_batch = Compactor::<K, V, T, D>::compact_runs(
                            &cfg,
                            &shard_id,
                            &compact_desc,
                            run_refs,
                            blob,
                            metrics,
                            shard_metrics,
                            isolated_runtime,
                            write_schemas,
                        )
                        .await
                        .expect("successful compaction");

                        assert_eq!(
                            output_batch.run_meta.len(),
                            1,
                            "compaction is guaranteed to emit a single run"
                        );
                        output_batch.parts
                    }
                    .instrument(debug_span!("batch::compact_runs")),
                );
                (RunOrder::Structured, Pending::new(handle))
            };
            WritingRuns::Compacting(MergeTree::new(runs_per_compaction, merge_fn))
        };
        BatchParts {
            cfg: cfg.batch,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

    pub(crate) fn new_ordered(
        cfg: BatchBuilderConfig,
        order: RunOrder,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
    ) -> Self {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let writer_key = cfg.writer_key.clone();
            let run_length_limit = (order == RunOrder::Unordered)
                .then_some(usize::MAX)
                .unwrap_or(cfg.run_length_limit);
            let merge_fn = move |parts| {
                let blob = Arc::clone(&blob);
                let writer_key = writer_key.clone();
                let metrics = Arc::clone(&metrics);
                let handle = mz_ore::task::spawn(
                    || "batch::spill_run",
                    async move {
                        let parts = stream::iter(parts)
                            .then(|p: Pending<RunPart<T>>| p.into_result())
                            .collect()
                            .await;
                        let run_ref = HollowRunRef::set(
                            shard_id,
                            blob.as_ref(),
                            &writer_key,
                            HollowRun { parts },
                            &*metrics,
                        )
                        .await;

                        RunPart::Many(run_ref)
                    }
                    .instrument(debug_span!("batch::spill_run")),
                );
                Pending::new(handle)
            };
            WritingRuns::Ordered(order, MergeTree::new(run_length_limit, merge_fn))
        };
        BatchParts {
            cfg,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

    pub(crate) fn expected_order(&self) -> RunOrder {
        match self.writing_runs {
            WritingRuns::Ordered(order, _) => order,
            WritingRuns::Compacting(_) => RunOrder::Unordered,
        }
    }

    pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
        &mut self,
        write_schemas: &Schemas<K, V>,
        desc: Description<T>,
        updates: Part,
        diffs_sum: D,
    ) {
        let batch_metrics = self.batch_metrics.clone();
        let index = self.next_index;
        self.next_index += 1;
        let ts_rewrite = None;
        let schema_id = write_schemas.id;

        let inline_threshold = self.cfg.inline_writes_single_max_bytes;

        let updates = BlobTraceUpdates::from_part(updates);
        let (name, write_future) = if updates.goodbytes() < inline_threshold {
            let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
            (
                "batch::inline_part",
                async move {
                    let start = Instant::now();
                    let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
                        desc: Some(desc.into_proto()),
                        index: index.into_proto(),
                        updates: Some(updates.into_proto()),
                    });
                    batch_metrics
                        .step_inline
                        .inc_by(start.elapsed().as_secs_f64());

                    RunPart::Single(BatchPart::Inline {
                        updates,
                        ts_rewrite,
                        schema_id,
                        deprecated_schema_id: None,
                    })
                }
                .instrument(span)
                .boxed(),
            )
        } else {
            let part = BlobTraceBatchPart {
                desc,
                updates,
                index,
            };
            let write_span =
                debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current();
            (
                "batch::write_part",
                BatchParts::write_hollow_part(
                    self.cfg.clone(),
                    Arc::clone(&self.blob),
                    Arc::clone(&self.metrics),
                    Arc::clone(&self.shard_metrics),
                    batch_metrics.clone(),
                    Arc::clone(&self.isolated_runtime),
                    part,
                    self.expected_order(),
                    ts_rewrite,
                    D::encode(&diffs_sum),
                    write_schemas.clone(),
                )
                .map(RunPart::Single)
                .instrument(write_span)
                .boxed(),
            )
        };

        match &mut self.writing_runs {
            WritingRuns::Ordered(_order, run) => {
                let part = Pending::new(mz_ore::task::spawn(|| name, write_future));
                run.push(part);

                for part in run
                    .iter_mut()
                    .rev()
                    .skip(self.cfg.batch_builder_max_outstanding_parts)
                    .take_while(|p| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
            WritingRuns::Compacting(batches) => {
                let run = Pending::Writing(mz_ore::task::spawn(|| name, async move {
                    vec![write_future.await]
                }));
                batches.push((RunOrder::Unordered, run));

                let mut part_budget = self.cfg.batch_builder_max_outstanding_parts;
                let mut compaction_budget = 1;
                for (_, part) in batches
                    .iter_mut()
                    .rev()
                    .skip_while(|(order, _)| match order {
                        RunOrder::Unordered if part_budget > 0 => {
                            part_budget -= 1;
                            true
                        }
                        RunOrder::Structured | RunOrder::Codec if compaction_budget > 0 => {
                            compaction_budget -= 1;
                            true
                        }
                        _ => false,
                    })
                    .take_while(|(_, p)| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
        }
    }

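    /// Encodes the given part, uploads it to blob storage (with retries), and
    /// returns the [BatchPart::Hollow] metadata (key, key bounds, trimmed
    /// stats, encoded size) that points at it.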
    async fn write_hollow_part<K: Codec, V: Codec>(
        cfg: BatchBuilderConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        batch_metrics: BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        mut updates: BlobTraceBatchPart<T>,
        run_order: RunOrder,
        ts_rewrite: Option<Antichain<T>>,
        diffs_sum: [u8; 8],
        write_schemas: Schemas<K, V>,
    ) -> BatchPart<T> {
        let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new());
        let key = partial_key.complete(&shard_metrics.shard_id);
        let goodbytes = updates.updates.goodbytes();
        let metrics_ = Arc::clone(&metrics);
        let schema_id = write_schemas.id;

        let (stats, key_lower, structured_key_lower, (buf, encode_time)) = isolated_runtime
            .spawn_named(|| "batch::encode_part", async move {
                let stats = metrics_.columnar.arrow().measure_part_build(|| {
                    let stats = if cfg.stats_collection_enabled {
                        let ext = updates.updates.get_or_make_structured::<K, V>(
                            write_schemas.key.as_ref(),
                            write_schemas.val.as_ref(),
                        );

                        let key_stats = write_schemas
                            .key
                            .decoder_any(ext.key.as_ref())
                            .expect("decoding just-encoded data")
                            .stats();

                        let part_stats = PartStats { key: key_stats };

                        let trimmed_start = Instant::now();
                        let mut trimmed_bytes = 0;
                        let trimmed_stats = LazyPartStats::encode(&part_stats, |s| {
                            trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| {
                                cfg.stats_untrimmable_columns.should_retain(s)
                            })
                        });
                        let trimmed_duration = trimmed_start.elapsed();
                        Some((trimmed_stats, trimmed_duration, trimmed_bytes))
                    } else {
                        None
                    };

                    updates.updates = updates.updates.as_structured::<K, V>(
                        write_schemas.key.as_ref(),
                        write_schemas.val.as_ref(),
                    );

                    stats
                });

                let key_lower = if let Some(records) = updates.updates.records() {
                    let key_bytes = records.keys();
                    if key_bytes.is_empty() {
                        &[]
                    } else if run_order == RunOrder::Codec {
                        key_bytes.value(0)
                    } else {
                        ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
                    }
                } else {
                    &[]
                };
                let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
                    .expect("lower bound always exists");

                let structured_key_lower = if cfg.structured_key_lower_len > 0 {
                    updates.updates.structured().and_then(|ext| {
                        let min_key = if run_order == RunOrder::Structured {
                            0
                        } else {
                            let ord = ArrayOrd::new(ext.key.as_ref());
                            (0..ext.key.len())
                                .min_by_key(|i| ord.at(*i))
                                .expect("non-empty batch")
                        };
                        let lower = ArrayBound::new(Arc::clone(&ext.key), min_key)
                            .to_proto_lower(cfg.structured_key_lower_len);
                        if lower.is_none() {
                            batch_metrics.key_lower_too_big.inc()
                        }
                        lower.map(|proto| LazyProto::from(&proto))
                    })
                } else {
                    None
                };

                let encode_start = Instant::now();
                let mut buf = Vec::new();
                updates.encode(&mut buf, &metrics_.columnar, &cfg.encoding_config);

                drop(updates);
                (
                    stats,
                    key_lower,
                    structured_key_lower,
                    (Bytes::from(buf), encode_start.elapsed()),
                )
            })
            .instrument(debug_span!("batch::encode_part"))
            .await
            .expect("part encode task failed");
        metrics.codecs.batch.encode_count.inc();
        metrics
            .codecs
            .batch
            .encode_seconds
            .inc_by(encode_time.as_secs_f64());

        let start = Instant::now();
        let payload_len = buf.len();
        let () = retry_external(&metrics.retries.external.batch_set, || async {
            shard_metrics.blob_sets.inc();
            blob.set(&key, Bytes::clone(&buf)).await
        })
        .instrument(trace_span!("batch::set", payload_len))
        .await;
        batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
        batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
        batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
        match run_order {
            RunOrder::Unordered => batch_metrics.unordered.inc(),
            RunOrder::Codec => batch_metrics.codec_order.inc(),
            RunOrder::Structured => batch_metrics.structured_order.inc(),
        }
        let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
            batch_metrics
                .step_stats
                .inc_by(stats_step_timing.as_secs_f64());
            if trimmed_bytes > 0 {
                metrics.pushdown.parts_stats_trimmed_count.inc();
                metrics
                    .pushdown
                    .parts_stats_trimmed_bytes
                    .inc_by(u64::cast_from(trimmed_bytes));
            }
            stats
        });

        BatchPart::Hollow(HollowBatchPart {
            key: partial_key,
            encoded_size_bytes: payload_len,
            key_lower,
            structured_key_lower,
            stats,
            ts_rewrite,
            diffs_sum: Some(diffs_sum),
            format: Some(BatchColumnarFormat::Structured),
            schema_id,
            deprecated_schema_id: None,
        })
    }

    #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
    pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec<RunPart<T>>)> {
        match self.writing_runs {
            WritingRuns::Ordered(RunOrder::Unordered, run) => {
                let parts = run.finish();
                let mut output = Vec::with_capacity(parts.len());
                for part in parts {
                    output.push((RunOrder::Unordered, vec![part.into_result().await]));
                }
                output
            }
            WritingRuns::Ordered(order, run) => {
                let parts = run.finish();
                let mut output = Vec::with_capacity(parts.len());
                for part in parts {
                    output.push(part.into_result().await);
                }
                vec![(order, output)]
            }
            WritingRuns::Compacting(batches) => {
                let runs = batches.finish();
                let mut output = Vec::with_capacity(runs.len());
                for (order, run) in runs {
                    let run = run.into_result().await;
                    output.push((order, run))
                }
                output
            }
        }
    }
}

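/// Validates that the batch is well-formed for appending with the given
/// `truncate` description: any timestamp rewrites must stay within bounds, and
/// the truncate bounds must lie within the batch's registered bounds.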
pub(crate) fn validate_truncate_batch<T: Timestamp>(
    batch: &HollowBatch<T>,
    truncate: &Description<T>,
    any_batch_rewrite: bool,
) -> Result<(), InvalidUsage<T>> {
    if any_batch_rewrite {
        if truncate.upper() != batch.desc.upper() {
            return Err(InvalidUsage::InvalidRewrite(format!(
                "rewritten batch might have data past {:?} up to {:?}",
                truncate.upper().elements(),
                batch.desc.upper().elements(),
            )));
        }
        for part in batch.parts.iter() {
            let part_lower_bound = part.ts_rewrite().unwrap_or_else(|| batch.desc.lower());
            if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
                return Err(InvalidUsage::InvalidRewrite(format!(
                    "rewritten batch might have data below {:?} at {:?}",
                    truncate.lower().elements(),
                    part_lower_bound.elements(),
                )));
            }
        }
    }

    let batch = &batch.desc;
    if !PartialOrder::less_equal(batch.lower(), truncate.lower())
        || PartialOrder::less_than(batch.upper(), truncate.upper())
    {
        return Err(InvalidUsage::InvalidBatchBounds {
            batch_lower: batch.lower().clone(),
            batch_upper: batch.upper().clone(),
            append_lower: truncate.lower().clone(),
            append_upper: truncate.upper().clone(),
        });
    }
    Ok(())
}

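/// The set of blobs to delete when a batch is deleted: the keys of individual
/// parts, plus references to hollow runs, whose contents are fetched and
/// deleted recursively.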
#[derive(Debug)]
pub(crate) struct PartDeletes<T> {
    blob_keys: BTreeSet<PartialBatchKey>,
    hollow_runs: BTreeMap<PartialBatchKey, HollowRunRef<T>>,
}

impl<T> Default for PartDeletes<T> {
    fn default() -> Self {
        Self {
            blob_keys: Default::default(),
            hollow_runs: Default::default(),
        }
    }
}

impl<T: Timestamp> PartDeletes<T> {
    pub fn add(&mut self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.insert(r.key.clone(), r.clone()).is_none(),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.insert(x.key.clone()),
            RunPart::Single(BatchPart::Inline { .. }) => {
                // Inline parts live in metadata, not in blob storage, so there
                // is nothing to delete.
                true
            }
        }
    }

    pub fn contains(&self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.contains_key(&r.key),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.contains(&x.key),
            RunPart::Single(BatchPart::Inline { .. }) => false,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn len(&self) -> usize {
        match self {
            Self {
                blob_keys,
                hollow_runs,
            } => blob_keys.len() + hollow_runs.len(),
        }
    }

    pub async fn delete(
        mut self,
        blob: &dyn Blob,
        shard_id: ShardId,
        concurrency: usize,
        metrics: &Metrics,
        delete_metrics: &RetryMetrics,
    ) where
        T: Codec64,
    {
        loop {
            let () = stream::iter(mem::take(&mut self.blob_keys))
                .map(|key| {
                    let key = key.complete(&shard_id);
                    async move {
                        retry_external(delete_metrics, || blob.delete(&key)).await;
                    }
                })
                .buffer_unordered(concurrency)
                .collect()
                .await;

            let Some((run_key, run_ref)) = self.hollow_runs.pop_first() else {
                break;
            };

            if let Some(run) = run_ref.get(shard_id, blob, metrics).await {
                for part in &run.parts {
                    self.add(part);
                }
                self.blob_keys.insert(run_key);
            };
        }
    }
}

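/// Returns the sum of the given diffs, or `None` if the array is empty.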
fn diffs_sum<D: Semigroup + Codec64>(updates: &Int64Array) -> Option<D> {
    let mut sum = None;
    for d in updates.values().iter() {
        let d = D::decode(d.to_le_bytes());
        match &mut sum {
            None => sum = Some(d),
            Some(x) => x.plus_equals(&d),
        }
    }

    sum
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;
    use timely::order::Product;

    use super::*;
    use crate::PersistLocation;
    use crate::cache::PersistClientCache;
    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
    use crate::internal::paths::{BlobKey, PartialBlobKey};
    use crate::tests::{all_ok, new_test_client};

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_flushing() {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let cache = PersistClientCache::new_no_metrics();

        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        cache.cfg.set_config(&MAX_RUNS, 3);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);

        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let (mut write, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut builder = write.builder(Antichain::from_elem(0));

        fn assert_writing(
            builder: &BatchBuilder<String, String, u64, i64>,
            expected_finished: &[bool],
        ) {
            let WritingRuns::Compacting(run) = &builder.builder.parts.writing_runs else {
                unreachable!("ordered run!")
            };

            let actual: Vec<_> = run.iter().map(|(_, p)| p.is_finished()).collect();
            assert_eq!(*expected_finished, actual);
        }

        assert_writing(&builder, &[]);

        let ((k, v), t, d) = &data[0];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false]);

        let ((k, v), t, d) = &data[1];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        let ((k, v), t, d) = &data[2];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[true, false, false]);

        let ((k, v), t, d) = &data[3];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        let batch = builder
            .finish(Antichain::from_elem(5))
            .await
            .expect("invalid usage");
        assert_eq!(batch.batch.runs().count(), 2);
        assert_eq!(batch.batch.part_count(), 4);
        write
            .append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
        assert_eq!(read.expect_snapshot_and_fetch(4).await, all_ok(&data, 4));
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_keys() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 3);
        for part in &batch.batch.parts {
            let part = part.expect_hollow_part();
            match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
                Ok((shard, PartialBlobKey::Batch(writer, _))) => {
                    assert_eq!(shard.to_string(), shard_id.to_string());
                    assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
                }
                _ => panic!("unparseable blob key"),
            }
        }
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_delete() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        cache.cfg.set_config(&BATCH_DELETE_ENABLED, true);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let part_key = batch.batch.parts[0]
            .expect_hollow_part()
            .key
            .complete(&shard_id);

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_some());

        batch.delete().await;

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_none());
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_partial_order() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, Product<u32, u32>, i64>(shard_id)
            .await;

        let batch = write
            .batch(
                &[
                    (("1".to_owned(), "one".to_owned()), Product::new(0, 10), 1),
                    (("2".to_owned(), "two".to_owned()), Product::new(10, 0), 1),
                ],
                Antichain::from_elem(Product::new(0, 0)),
                Antichain::from_iter([Product::new(0, 11), Product::new(10, 1)]),
            )
            .await
            .expect("invalid usage");

        assert_eq!(batch.batch.part_count(), 2);
        for part in &batch.batch.parts {
            let part = part.expect_hollow_part();
            match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
                Ok((shard, PartialBlobKey::Batch(writer, _))) => {
                    assert_eq!(shard.to_string(), shard_id.to_string());
                    assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
                }
                _ => panic!("unparseable blob key"),
            }
        }
    }

    #[mz_ore::test]
    fn untrimmable_columns() {
        let untrimmable = UntrimmableColumns {
            equals: vec!["abc".into(), "def".into()],
            prefixes: vec!["123".into(), "234".into()],
            suffixes: vec!["xyz".into()],
        };

        assert!(untrimmable.should_retain("abc"));
        assert!(untrimmable.should_retain("ABC"));
        assert!(untrimmable.should_retain("aBc"));
        assert!(!untrimmable.should_retain("abcd"));
        assert!(untrimmable.should_retain("deF"));
        assert!(!untrimmable.should_retain("defg"));

        assert!(untrimmable.should_retain("123"));
        assert!(untrimmable.should_retain("123-4"));
        assert!(untrimmable.should_retain("1234"));
        assert!(untrimmable.should_retain("234"));
        assert!(!untrimmable.should_retain("345"));

        assert!(untrimmable.should_retain("ijk_xyZ"));
        assert!(untrimmable.should_retain("ww-XYZ"));
        assert!(!untrimmable.should_retain("xya"));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        let mut batch = write.builder(Antichain::from_elem(0));
        batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
        let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();

        let batch = batch.into_transmittable_batch();
        let mut batch = write.batch_from_transmittable_batch(batch);
        batch
            .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
            .unwrap();
        write
            .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
            .await;

        let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
        let expected = vec![((Ok("foo".to_owned()), Ok(())), 2, 1)];
        assert_eq!(actual, expected);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn structured_lowers() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg().set_config(&STRUCTURED_KEY_LOWER_LEN, 1024);
        cache.cfg().set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg().set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let [part] = batch.batch.parts.as_slice() else {
            panic!("expected single part")
        };
        assert!(part.structured_key_lower().is_some());
    }
}