1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::marker::PhantomData;
16use std::mem;
17use std::sync::Arc;
18use std::time::Instant;
19
20use arrow::array::{Array, Int64Array};
21use bytes::Bytes;
22use differential_dataflow::difference::Monoid;
23use differential_dataflow::lattice::Lattice;
24use differential_dataflow::trace::Description;
25use futures_util::stream::StreamExt;
26use futures_util::{FutureExt, stream};
27use mz_dyncfg::Config;
28use mz_ore::cast::CastFrom;
29use mz_ore::instrument;
30use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
31use mz_persist::location::Blob;
32use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
33use mz_persist_types::columnar::{ColumnDecoder, Schema};
34use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
35use mz_persist_types::part::{Part, PartBuilder};
36use mz_persist_types::schema::SchemaId;
37use mz_persist_types::stats::{
38 PartStats, TRUNCATE_LEN, TruncateBound, trim_to_budget, truncate_bytes,
39};
40use mz_persist_types::{Codec, Codec64};
41use mz_proto::RustType;
42use mz_timely_util::order::Reverse;
43use proptest_derive::Arbitrary;
44use semver::Version;
45use timely::PartialOrder;
46use timely::order::TotalOrder;
47use timely::progress::{Antichain, Timestamp};
48use tracing::{Instrument, debug_span, trace_span, warn};
49
50use crate::async_runtime::IsolatedRuntime;
51use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
52use crate::error::InvalidUsage;
53use crate::internal::compact::{CompactConfig, Compactor};
54use crate::internal::encoding::{
55 LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, Schemas,
56};
57use crate::internal::machine::retry_external;
58use crate::internal::merge::{MergeTree, Pending};
59use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
60use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
61use crate::internal::state::{
62 BatchPart, ENABLE_INCREMENTAL_COMPACTION, HollowBatch, HollowBatchPart, HollowRun,
63 HollowRunRef, ProtoInlineBatchPart, RunId, RunMeta, RunOrder, RunPart,
64};
65use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
66use crate::{PersistConfig, ShardId};
67
68include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));
69
70#[derive(Debug)]
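/// A handle to a batch of updates that has been written to blob storage but
/// not yet appended to a shard.
///
/// A `Batch` must be consumed (appended, turned into a [`ProtoBatch`], or
/// [`Batch::delete`]d); dropping an unconsumed batch only logs a warning and
/// leaves its blob keys dangling.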
76pub struct Batch<K, V, T, D> {
77 pub(crate) batch_delete_enabled: bool,
78 pub(crate) metrics: Arc<Metrics>,
79 pub(crate) shard_metrics: Arc<ShardMetrics>,
80
81 pub(crate) version: Version,
83
84 pub(crate) schemas: (Bytes, Bytes),
86
87 pub(crate) batch: HollowBatch<T>,
89
90 pub(crate) blob: Arc<dyn Blob>,
92
93 pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
96}
97
98impl<K, V, T, D> Drop for Batch<K, V, T, D> {
99 fn drop(&mut self) {
100 if self.batch.part_count() > 0 {
101 warn!(
102 "un-consumed Batch, with {} parts and dangling blob keys: {:?}",
103 self.batch.part_count(),
104 self.batch
105 .parts
106 .iter()
107 .map(|x| x.printable_name())
108 .collect::<Vec<_>>(),
109 );
110 }
111 }
112}
113
114impl<K, V, T, D> Batch<K, V, T, D>
115where
116 K: Debug + Codec,
117 V: Debug + Codec,
118 T: Timestamp + Lattice + Codec64,
119 D: Monoid + Codec64,
120{
121 pub(crate) fn new(
122 batch_delete_enabled: bool,
123 metrics: Arc<Metrics>,
124 blob: Arc<dyn Blob>,
125 shard_metrics: Arc<ShardMetrics>,
126 version: Version,
127 schemas: (Bytes, Bytes),
128 batch: HollowBatch<T>,
129 ) -> Self {
130 Self {
131 batch_delete_enabled,
132 metrics,
133 shard_metrics,
134 version,
135 schemas,
136 batch,
137 blob,
138 _phantom: PhantomData,
139 }
140 }
141
142 pub fn shard_id(&self) -> ShardId {
144 self.shard_metrics.shard_id
145 }
146
147 pub fn upper(&self) -> &Antichain<T> {
149 self.batch.desc.upper()
150 }
151
152 pub fn lower(&self) -> &Antichain<T> {
154 self.batch.desc.lower()
155 }
156
157 pub(crate) fn mark_consumed(&mut self) {
163 self.batch.parts.clear();
164 }
165
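    /// Deletes the blobs backing this batch, if batch deletes are enabled;
    /// otherwise the batch is simply marked as consumed.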
166 #[instrument(level = "debug", fields(shard = %self.shard_id()))]
169 pub async fn delete(mut self) {
170 if !self.batch_delete_enabled {
171 self.mark_consumed();
172 return;
173 }
174 let mut deletes = PartDeletes::default();
175 for part in self.batch.parts.drain(..) {
176 deletes.add(&part);
177 }
178 let () = deletes
179 .delete(
180 &*self.blob,
181 self.shard_id(),
182 usize::MAX,
183 &*self.metrics,
184 &*self.metrics.retries.external.batch_delete,
185 )
186 .await;
187 }
188
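    /// Returns the `SchemaId`s of the parts in this batch, where known.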
189 pub fn schemas(&self) -> impl Iterator<Item = SchemaId> + '_ {
191 self.batch.parts.iter().flat_map(|b| b.schema_id())
192 }
193
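    /// Consumes this batch, returning the underlying [`HollowBatch`].
    ///
    /// The caller takes over responsibility for the blob keys referenced by
    /// the returned batch.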
194 pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
199 let ret = self.batch.clone();
200 self.mark_consumed();
201 ret
202 }
203
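    /// Consumes this batch, returning a [`ProtoBatch`] that can be
    /// round-tripped through serialization and later turned back into a
    /// `Batch` by a (possibly different) writer.
    ///
    /// A minimal sketch of the round trip, assuming a write handle named
    /// `write` (see the `rewrite_ts_example` test below for a full version):
    ///
    /// ```ignore
    /// let proto = batch.into_transmittable_batch();
    /// let batch = write.batch_from_transmittable_batch(proto);
    /// ```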
204 pub fn into_transmittable_batch(mut self) -> ProtoBatch {
213 let ret = ProtoBatch {
214 shard_id: self.shard_metrics.shard_id.into_proto(),
215 version: self.version.to_string(),
216 batch: Some(self.batch.into_proto()),
217 key_schema: self.schemas.0.clone(),
218 val_schema: self.schemas.1.clone(),
219 };
220 self.mark_consumed();
221 ret
222 }
223
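    /// Rewrites any inline parts in this batch out to blob storage, leaving
    /// hollow parts and hollow runs untouched.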
224 pub(crate) async fn flush_to_blob(
225 &mut self,
226 cfg: &BatchBuilderConfig,
227 batch_metrics: &BatchWriteMetrics,
228 isolated_runtime: &Arc<IsolatedRuntime>,
229 write_schemas: &Schemas<K, V>,
230 ) {
231 let mut parts = Vec::new();
236 for (run_meta, run_parts) in self.batch.runs() {
237 for part in run_parts {
238 let (updates, ts_rewrite, schema_id) = match part {
239 RunPart::Single(BatchPart::Inline {
240 updates,
241 ts_rewrite,
242 schema_id,
243 deprecated_schema_id: _,
244 }) => (updates, ts_rewrite, schema_id),
245 other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
246 parts.push(other.clone());
247 continue;
248 }
249 };
250 let updates = updates
251 .decode::<T>(&self.metrics.columnar)
252 .expect("valid inline part");
253 let diffs_sum = diffs_sum::<D>(updates.updates.diffs());
254 let mut write_schemas = write_schemas.clone();
255 write_schemas.id = *schema_id;
256
257 let write_span =
258 debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id)
259 .or_current();
260 let handle = mz_ore::task::spawn(
261 || "batch::flush_to_blob",
262 BatchParts::write_hollow_part(
263 cfg.clone(),
264 Arc::clone(&self.blob),
265 Arc::clone(&self.metrics),
266 Arc::clone(&self.shard_metrics),
267 batch_metrics.clone(),
268 Arc::clone(isolated_runtime),
269 updates,
270 run_meta.order.unwrap_or(RunOrder::Unordered),
271 ts_rewrite.clone(),
272 D::encode(&diffs_sum),
273 write_schemas,
274 )
275 .instrument(write_span),
276 );
277 let part = handle.await;
278 parts.push(RunPart::Single(part));
279 }
280 }
281 self.batch.parts = parts;
282 }
283
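    /// Returns the sum of the encoded sizes of all parts in the batch.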
284 pub fn encoded_size_bytes(&self) -> usize {
286 self.batch.encoded_size_bytes()
287 }
288}
289
290impl<K, V, T, D> Batch<K, V, T, D>
291where
292 K: Debug + Codec,
293 V: Debug + Codec,
294 T: Timestamp + Lattice + Codec64 + TotalOrder,
295 D: Monoid + Codec64,
296{
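    /// Efficiently rewrites the timestamps in this not-yet-committed batch:
    /// updates not already beyond `frontier` are logically advanced to it,
    /// and the batch's upper is replaced with `new_upper`.
    ///
    /// Returns an [`InvalidUsage::InvalidRewrite`] error if the requested
    /// rewrite is not valid for this batch.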
297 pub fn rewrite_ts(
331 &mut self,
332 frontier: &Antichain<T>,
333 new_upper: Antichain<T>,
334 ) -> Result<(), InvalidUsage<T>> {
335 self.batch
336 .rewrite_ts(frontier, new_upper)
337 .map_err(InvalidUsage::InvalidRewrite)
338 }
339}
340
341#[derive(Debug)]
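/// The result of [`BatchBuilder::add`].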
343pub enum Added {
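    /// The update was buffered in the in-progress part.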
344 Record,
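    /// The update was buffered and the in-progress part grew large enough to
    /// be flushed out as one or more parts.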
346 RecordAndParts,
349}
350
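/// A snapshot of the dynamic configuration relevant to building a single
/// batch, captured when the batch builder is constructed.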
351#[derive(Debug, Clone)]
354pub struct BatchBuilderConfig {
355 writer_key: WriterKey,
356 pub(crate) blob_target_size: usize,
357 pub(crate) batch_delete_enabled: bool,
358 pub(crate) batch_builder_max_outstanding_parts: usize,
359 pub(crate) inline_writes_single_max_bytes: usize,
360 pub(crate) stats_collection_enabled: bool,
361 pub(crate) stats_budget: usize,
362 pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
363 pub(crate) encoding_config: EncodingConfig,
364 pub(crate) preferred_order: RunOrder,
365 pub(crate) structured_key_lower_len: usize,
366 pub(crate) run_length_limit: usize,
367 pub(crate) enable_incremental_compaction: bool,
368 pub(crate) max_runs: Option<usize>,
372}
373
374pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
376 "persist_batch_delete_enabled",
377 true,
378 "Whether to actually delete blobs when batch delete is called (Materialize).",
379);
380
381pub(crate) const ENCODING_ENABLE_DICTIONARY: Config<bool> = Config::new(
382 "persist_encoding_enable_dictionary",
383 true,
384 "A feature flag to enable dictionary encoding for Parquet data (Materialize).",
385);
386
387pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new(
388 "persist_encoding_compression_format",
389 "none",
390 "A feature flag to enable compression of Parquet data (Materialize).",
391);
392
393pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config<usize> = Config::new(
394 "persist_batch_structured_key_lower_len",
395 256,
396 "The maximum size in proto bytes of any structured key-lower metadata to preserve. \
397 (If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
398);
399
400pub(crate) const MAX_RUN_LEN: Config<usize> = Config::new(
401 "persist_batch_max_run_len",
402 usize::MAX,
403 "The maximum length a run can have before it will be spilled as a hollow run \
404 into the blob store.",
405);
406
407pub(crate) const MAX_RUNS: Config<usize> = Config::new(
408 "persist_batch_max_runs",
409 1,
410 "The maximum number of runs a batch builder should generate for user batches. \
411 (Compaction outputs always generate a single run.) \
412 The minimum value is 2; below this, compaction is disabled.",
413);
414
415pub(crate) const BLOB_TARGET_SIZE: Config<usize> = Config::new(
422 "persist_blob_target_size",
423 128 * MiB,
424 "A target maximum size of persist blob payloads in bytes (Materialize).",
425);
426
427pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config<usize> = Config::new(
428 "persist_inline_writes_single_max_bytes",
429 4096,
430 "The (exclusive) maximum size of a write that persist will inline in metadata.",
431);
432
433pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config<usize> = Config::new(
434 "persist_inline_writes_total_max_bytes",
435 1 * MiB,
436 "\
437 The (exclusive) maximum total size of inline writes in metadata before \
438 persist will backpressure them by flushing out to s3.",
439);
440
441impl BatchBuilderConfig {
442 pub fn new(value: &PersistConfig, _shard_id: ShardId) -> Self {
444 let writer_key = WriterKey::for_version(&value.build_version);
445
446 let preferred_order = RunOrder::Structured;
447
448 BatchBuilderConfig {
449 writer_key,
450 blob_target_size: BLOB_TARGET_SIZE.get(value).clamp(1, usize::MAX),
451 batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
452 batch_builder_max_outstanding_parts: BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(value),
453 inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
454 stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
455 stats_budget: STATS_BUDGET_BYTES.get(value),
456 stats_untrimmable_columns: Arc::new(untrimmable_columns(value)),
457 encoding_config: EncodingConfig {
458 use_dictionary: ENCODING_ENABLE_DICTIONARY.get(value),
459 compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
460 },
461 preferred_order,
462 structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
463 run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
464 max_runs: match MAX_RUNS.get(value) {
465 limit @ 2.. => Some(limit),
466 _ => None,
467 },
468 enable_incremental_compaction: ENABLE_INCREMENTAL_COMPACTION.get(value),
469 }
470 }
471}
472
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
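/// A set of (lowercase) column names whose statistics are always retained,
/// even when trimming stats to fit the stats budget.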
484pub(crate) struct UntrimmableColumns {
485 pub equals: Vec<Cow<'static, str>>,
487 pub prefixes: Vec<Cow<'static, str>>,
489 pub suffixes: Vec<Cow<'static, str>>,
491}
492
493impl UntrimmableColumns {
494 pub(crate) fn should_retain(&self, name: &str) -> bool {
495 let name_lower = name.to_lowercase();
498 for s in &self.equals {
499 if *s == name_lower {
500 return true;
501 }
502 }
503 for s in &self.prefixes {
504 if name_lower.starts_with(s.as_ref()) {
505 return true;
506 }
507 }
508 for s in &self.suffixes {
509 if name_lower.ends_with(s.as_ref()) {
510 return true;
511 }
512 }
513 false
514 }
515}
516
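/// A builder for [`Batch`]es that allows adding updates piece by piece and
/// then finishing it.
///
/// A minimal usage sketch, assuming a write handle named `write` (see the
/// tests at the bottom of this file for complete versions):
///
/// ```ignore
/// let mut builder = write.builder(Antichain::from_elem(0));
/// builder.add(&key, &val, &ts, &diff).await?;
/// let batch = builder.finish(Antichain::from_elem(upper)).await?;
/// ```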
517#[derive(Debug)]
520pub struct BatchBuilder<K, V, T, D>
521where
522 K: Codec,
523 V: Codec,
524 T: Timestamp + Lattice + Codec64,
525{
526 inline_desc: Description<T>,
527 inclusive_upper: Antichain<Reverse<T>>,
528
529 records_builder: PartBuilder<K, K::Schema, V, V::Schema>,
530 pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
531}
532
533impl<K, V, T, D> BatchBuilder<K, V, T, D>
534where
535 K: Debug + Codec,
536 V: Debug + Codec,
537 T: Timestamp + Lattice + Codec64,
538 D: Monoid + Codec64,
539{
540 pub(crate) fn new(
541 builder: BatchBuilderInternal<K, V, T, D>,
542 inline_desc: Description<T>,
543 ) -> Self {
544 let records_builder = PartBuilder::new(
545 builder.write_schemas.key.as_ref(),
546 builder.write_schemas.val.as_ref(),
547 );
548 Self {
549 inline_desc,
550 inclusive_upper: Antichain::new(),
551 records_builder,
552 builder,
553 }
554 }
555
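    /// Finishes writing this batch, flushing any buffered updates and
    /// returning a [`Batch`] with the given `registered_upper`.
    ///
    /// Returns an error if the upper is below the batch's lower or if a
    /// buffered update lies at or beyond the upper.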
556 pub async fn finish(
561 mut self,
562 registered_upper: Antichain<T>,
563 ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        if PartialOrder::less_than(&registered_upper, self.inline_desc.lower()) {
565 return Err(InvalidUsage::InvalidBounds {
566 lower: self.inline_desc.lower().clone(),
567 upper: registered_upper,
568 });
569 }
570
571 if PartialOrder::less_equal(self.inline_desc.since(), self.inline_desc.lower()) {
576 for ts in self.inclusive_upper.iter() {
577 if registered_upper.less_equal(&ts.0) {
578 return Err(InvalidUsage::UpdateBeyondUpper {
579 ts: ts.0.clone(),
580 expected_upper: registered_upper.clone(),
581 });
582 }
583 }
584 }
585
586 let updates = self.records_builder.finish();
587 self.builder
588 .flush_part(self.inline_desc.clone(), updates)
589 .await;
590
591 self.builder
592 .finish(Description::new(
593 self.inline_desc.lower().clone(),
594 registered_upper,
595 self.inline_desc.since().clone(),
596 ))
597 .await
598 }
599
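    /// Adds the given update to the batch.
    ///
    /// The update's timestamp must not be before the batch's lower bound. If
    /// the in-progress part reaches the blob target size, it is flushed to
    /// blob storage before this returns.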
600 pub async fn add(
605 &mut self,
606 key: &K,
607 val: &V,
608 ts: &T,
609 diff: &D,
610 ) -> Result<Added, InvalidUsage<T>> {
611 if !self.inline_desc.lower().less_equal(ts) {
612 return Err(InvalidUsage::UpdateNotBeyondLower {
613 ts: ts.clone(),
614 lower: self.inline_desc.lower().clone(),
615 });
616 }
617 self.inclusive_upper.insert(Reverse(ts.clone()));
618
619 let added = {
620 self.records_builder
621 .push(key, val, ts.clone(), diff.clone());
622 if self.records_builder.goodbytes() >= self.builder.parts.cfg.blob_target_size {
623 let part = self.records_builder.finish_and_replace(
624 self.builder.write_schemas.key.as_ref(),
625 self.builder.write_schemas.val.as_ref(),
626 );
627 Some(part)
628 } else {
629 None
630 }
631 };
632
633 let added = if let Some(full_batch) = added {
634 self.builder
635 .flush_part(self.inline_desc.clone(), full_batch)
636 .await;
637 Added::RecordAndParts
638 } else {
639 Added::Record
640 };
641 Ok(added)
642 }
643}
644
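/// The piece of [`BatchBuilder`] that writes finished parts out via
/// [`BatchParts`], independent of the in-progress columnar buffer.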
645#[derive(Debug)]
646pub(crate) struct BatchBuilderInternal<K, V, T, D>
647where
648 K: Codec,
649 V: Codec,
650 T: Timestamp + Lattice + Codec64,
651{
652 shard_id: ShardId,
653 version: Version,
654 blob: Arc<dyn Blob>,
655 metrics: Arc<Metrics>,
656
657 write_schemas: Schemas<K, V>,
658 parts: BatchParts<T>,
659
660 _phantom: PhantomData<fn(K, V, T, D)>,
663}
664
665impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
666where
667 K: Debug + Codec,
668 V: Debug + Codec,
669 T: Timestamp + Lattice + Codec64,
670 D: Monoid + Codec64,
671{
672 pub(crate) fn new(
673 _cfg: BatchBuilderConfig,
674 parts: BatchParts<T>,
675 metrics: Arc<Metrics>,
676 write_schemas: Schemas<K, V>,
677 blob: Arc<dyn Blob>,
678 shard_id: ShardId,
679 version: Version,
680 ) -> Self {
681 Self {
682 blob,
683 metrics,
684 write_schemas,
685 parts,
686 shard_id,
687 version,
688 _phantom: PhantomData,
689 }
690 }
691
692 #[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
697 pub async fn finish(
698 self,
699 registered_desc: Description<T>,
700 ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
701 let write_run_ids = self.parts.cfg.enable_incremental_compaction;
702 let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
703 let shard_metrics = Arc::clone(&self.parts.shard_metrics);
704 let runs = self.parts.finish().await;
705
706 let mut run_parts = vec![];
707 let mut run_splits = vec![];
708 let mut run_meta = vec![];
709 let total_updates = runs
710 .iter()
711 .map(|(_, _, num_updates)| num_updates)
712 .sum::<usize>();
713 for (order, parts, num_updates) in runs {
714 if parts.is_empty() {
715 continue;
716 }
717 if run_parts.len() != 0 {
718 run_splits.push(run_parts.len());
719 }
720 run_meta.push(RunMeta {
721 order: Some(order),
722 schema: self.write_schemas.id,
723 deprecated_schema: None,
725 id: if write_run_ids {
726 Some(RunId::new())
727 } else {
728 None
729 },
730 len: if write_run_ids {
731 Some(num_updates)
732 } else {
733 None
734 },
735 meta: MetadataMap::default(),
736 });
737 run_parts.extend(parts);
738 }
739 let desc = registered_desc;
740
741 let batch = Batch::new(
742 batch_delete_enabled,
743 Arc::clone(&self.metrics),
744 self.blob,
745 shard_metrics,
746 self.version,
747 (
748 K::encode_schema(&*self.write_schemas.key),
749 V::encode_schema(&*self.write_schemas.val),
750 ),
751 HollowBatch::new(desc, run_parts, total_updates, run_meta, run_splits),
752 );
753
754 Ok(batch)
755 }
756
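    /// Flushes the given columnar data out as a new part, unless it is
    /// empty, and records the time spent in metrics.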
757 pub async fn flush_part(&mut self, part_desc: Description<T>, columnar: Part) {
763 let num_updates = columnar.len();
764 if num_updates == 0 {
765 return;
766 }
767 let diffs_sum = diffs_sum::<D>(&columnar.diff);
768
769 let start = Instant::now();
770 self.parts
771 .write(&self.write_schemas, part_desc, columnar, diffs_sum)
772 .await;
773 self.metrics
774 .compaction
775 .batch
776 .step_part_writing
777 .inc_by(start.elapsed().as_secs_f64());
778 }
779}
780
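/// The parts of a single completed run, along with the number of updates
/// they contain.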
781#[derive(Debug, Clone)]
782pub(crate) struct RunWithMeta<T> {
783 pub parts: Vec<RunPart<T>>,
784 pub num_updates: usize,
785}
786
787impl<T> RunWithMeta<T> {
788 pub fn new(parts: Vec<RunPart<T>>, num_updates: usize) -> Self {
789 Self { parts, num_updates }
790 }
791
792 pub fn single(part: RunPart<T>, num_updates: usize) -> Self {
793 Self {
794 parts: vec![part],
795 num_updates,
796 }
797 }
798}
799
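/// The in-progress runs of a [`BatchParts`]: either a single run written in
/// the given order (with long runs spilled to blob as hollow runs), or a set
/// of runs that are compacted together once enough of them accumulate.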
800#[derive(Debug)]
801enum WritingRuns<T> {
802 Ordered(RunOrder, MergeTree<Pending<RunWithMeta<T>>>),
806 Compacting(MergeTree<(RunOrder, Pending<RunWithMeta<T>>)>),
809}
810
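/// Writes the parts and runs that make up a batch, uploading them to blob
/// storage (or inlining small ones in metadata) in the background and
/// applying backpressure when too many writes are outstanding.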
811#[derive(Debug)]
814pub(crate) struct BatchParts<T> {
815 cfg: BatchBuilderConfig,
816 metrics: Arc<Metrics>,
817 shard_metrics: Arc<ShardMetrics>,
818 shard_id: ShardId,
819 blob: Arc<dyn Blob>,
820 isolated_runtime: Arc<IsolatedRuntime>,
821 next_index: u64,
822 writing_runs: WritingRuns<T>,
823 batch_metrics: BatchWriteMetrics,
824}
825
826impl<T: Timestamp + Codec64> BatchParts<T> {
827 pub(crate) fn new_compacting<K, V, D>(
828 cfg: CompactConfig,
829 desc: Description<T>,
830 runs_per_compaction: usize,
831 metrics: Arc<Metrics>,
832 shard_metrics: Arc<ShardMetrics>,
833 shard_id: ShardId,
834 blob: Arc<dyn Blob>,
835 isolated_runtime: Arc<IsolatedRuntime>,
836 batch_metrics: &BatchWriteMetrics,
837 schemas: Schemas<K, V>,
838 ) -> Self
839 where
840 K: Codec + Debug,
841 V: Codec + Debug,
842 T: Lattice + Send + Sync,
843 D: Monoid + Ord + Codec64 + Send + Sync,
844 {
845 let writing_runs = {
846 let cfg = cfg.clone();
847 let blob = Arc::clone(&blob);
848 let metrics = Arc::clone(&metrics);
849 let shard_metrics = Arc::clone(&shard_metrics);
850 let isolated_runtime = Arc::clone(&isolated_runtime);
851 let runs_per_compaction = runs_per_compaction.clamp(2, 1024);
853
854 let merge_fn = move |parts: Vec<(RunOrder, Pending<RunWithMeta<T>>)>| {
855 let blob = Arc::clone(&blob);
856 let metrics = Arc::clone(&metrics);
857 let shard_metrics = Arc::clone(&shard_metrics);
858 let cfg = cfg.clone();
859 let isolated_runtime = Arc::clone(&isolated_runtime);
860 let write_schemas = schemas.clone();
861 let compact_desc = desc.clone();
862 let handle = mz_ore::task::spawn(
863 || "batch::compact_runs",
864 async move {
865 let runs: Vec<_> = stream::iter(parts)
866 .then(|(order, parts)| async move {
867 let completed_run = parts.into_result().await;
868 (
869 RunMeta {
870 order: Some(order),
871 schema: schemas.id,
872 deprecated_schema: None,
875 id: if cfg.batch.enable_incremental_compaction {
876 Some(RunId::new())
877 } else {
878 None
879 },
880 len: if cfg.batch.enable_incremental_compaction {
881 Some(completed_run.num_updates)
882 } else {
883 None
884 },
885 meta: MetadataMap::default(),
886 },
887 completed_run.parts,
888 )
889 })
890 .collect()
891 .await;
892
893 let run_refs: Vec<_> = runs
894 .iter()
895 .map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
896 .collect();
897
898 let output_batch = Compactor::<K, V, T, D>::compact_runs(
899 &cfg,
900 &shard_id,
901 &compact_desc,
902 run_refs,
903 blob,
904 metrics,
905 shard_metrics,
906 isolated_runtime,
907 write_schemas,
908 )
909 .await
910 .expect("successful compaction");
911
912 assert_eq!(
913 output_batch.run_meta.len(),
914 1,
915 "compaction is guaranteed to emit a single run"
916 );
917 let total_compacted_updates: usize = output_batch.len;
918
919 RunWithMeta::new(output_batch.parts, total_compacted_updates)
920 }
921 .instrument(debug_span!("batch::compact_runs")),
922 );
923 (RunOrder::Structured, Pending::new(handle))
924 };
925 WritingRuns::Compacting(MergeTree::new(runs_per_compaction, merge_fn))
926 };
927 BatchParts {
928 cfg: cfg.batch,
929 metrics,
930 shard_metrics,
931 shard_id,
932 blob,
933 isolated_runtime,
934 next_index: 0,
935 writing_runs,
936 batch_metrics: batch_metrics.clone(),
937 }
938 }
939
940 pub(crate) fn new_ordered<D: Monoid + Codec64>(
941 cfg: BatchBuilderConfig,
942 order: RunOrder,
943 metrics: Arc<Metrics>,
944 shard_metrics: Arc<ShardMetrics>,
945 shard_id: ShardId,
946 blob: Arc<dyn Blob>,
947 isolated_runtime: Arc<IsolatedRuntime>,
948 batch_metrics: &BatchWriteMetrics,
949 ) -> Self {
950 let writing_runs = {
951 let cfg = cfg.clone();
952 let blob = Arc::clone(&blob);
953 let metrics = Arc::clone(&metrics);
954 let writer_key = cfg.writer_key.clone();
955 let run_length_limit = (order == RunOrder::Unordered)
958 .then_some(usize::MAX)
959 .unwrap_or(cfg.run_length_limit);
960 let merge_fn = move |parts: Vec<Pending<RunWithMeta<T>>>| {
961 let blob = Arc::clone(&blob);
962 let writer_key = writer_key.clone();
963 let metrics = Arc::clone(&metrics);
964 let handle = mz_ore::task::spawn(
965 || "batch::spill_run",
966 async move {
967 let completed_runs: Vec<RunWithMeta<T>> = stream::iter(parts)
968 .then(|p| p.into_result())
969 .collect()
970 .await;
971
972 let mut all_run_parts = Vec::new();
973 let mut total_updates = 0;
974
975 for completed_run in completed_runs {
976 all_run_parts.extend(completed_run.parts);
977 total_updates += completed_run.num_updates;
978 }
979
980 let run_ref = HollowRunRef::set::<D>(
981 shard_id,
982 blob.as_ref(),
983 &writer_key,
984 HollowRun {
985 parts: all_run_parts,
986 },
987 &*metrics,
988 )
989 .await;
990
991 RunWithMeta::single(RunPart::Many(run_ref), total_updates)
992 }
993 .instrument(debug_span!("batch::spill_run")),
994 );
995 Pending::new(handle)
996 };
997 WritingRuns::Ordered(order, MergeTree::new(run_length_limit, merge_fn))
998 };
999 BatchParts {
1000 cfg,
1001 metrics,
1002 shard_metrics,
1003 shard_id,
1004 blob,
1005 isolated_runtime,
1006 next_index: 0,
1007 writing_runs,
1008 batch_metrics: batch_metrics.clone(),
1009 }
1010 }
1011
1012 pub(crate) fn expected_order(&self) -> RunOrder {
1013 match self.writing_runs {
1014 WritingRuns::Ordered(order, _) => order,
1015 WritingRuns::Compacting(_) => RunOrder::Unordered,
1016 }
1017 }
1018
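    /// Writes the given updates as a new part: small parts are kept inline
    /// in metadata, larger ones are uploaded to blob storage. Applies
    /// backpressure if too many writes are already in flight.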
1019 pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
1020 &mut self,
1021 write_schemas: &Schemas<K, V>,
1022 desc: Description<T>,
1023 updates: Part,
1024 diffs_sum: D,
1025 ) {
1026 let batch_metrics = self.batch_metrics.clone();
1027 let index = self.next_index;
1028 self.next_index += 1;
1029 let num_updates = updates.len();
1030 let ts_rewrite = None;
1031 let schema_id = write_schemas.id;
1032
1033 let inline_threshold = self.cfg.inline_writes_single_max_bytes;
1036
1037 let updates = BlobTraceUpdates::from_part(updates);
1038 let (name, write_future) = if updates.goodbytes() < inline_threshold {
1039 let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
1040 (
1041 "batch::inline_part",
1042 async move {
1043 let start = Instant::now();
1044 let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
1045 desc: Some(desc.into_proto()),
1046 index: index.into_proto(),
1047 updates: Some(updates.into_proto()),
1048 });
1049 batch_metrics
1050 .step_inline
1051 .inc_by(start.elapsed().as_secs_f64());
1052
1053 RunWithMeta::single(
1054 RunPart::Single(BatchPart::Inline {
1055 updates,
1056 ts_rewrite,
1057 schema_id,
1058 deprecated_schema_id: None,
1060 }),
1061 num_updates,
1062 )
1063 }
1064 .instrument(span)
1065 .boxed(),
1066 )
1067 } else {
1068 let part = BlobTraceBatchPart {
1069 desc,
1070 updates,
1071 index,
1072 };
1073 let cfg = self.cfg.clone();
1074 let blob = Arc::clone(&self.blob);
1075 let metrics = Arc::clone(&self.metrics);
1076 let shard_metrics = Arc::clone(&self.shard_metrics);
1077 let isolated_runtime = Arc::clone(&self.isolated_runtime);
1078 let expected_order = self.expected_order();
1079 let encoded_diffs_sum = D::encode(&diffs_sum);
1080 let write_schemas_clone = write_schemas.clone();
1081 let write_span =
1082 debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current();
1083 (
1084 "batch::write_part",
1085 async move {
1086 let part = BatchParts::write_hollow_part(
1087 cfg,
1088 blob,
1089 metrics,
1090 shard_metrics,
1091 batch_metrics,
1092 isolated_runtime,
1093 part,
1094 expected_order,
1095 ts_rewrite,
1096 encoded_diffs_sum,
1097 write_schemas_clone,
1098 )
1099 .await;
1100 RunWithMeta::single(RunPart::Single(part), num_updates)
1101 }
1102 .instrument(write_span)
1103 .boxed(),
1104 )
1105 };
1106
1107 match &mut self.writing_runs {
1108 WritingRuns::Ordered(_order, run) => {
1109 let part = Pending::new(mz_ore::task::spawn(|| name, write_future));
1110 run.push(part);
1111
1112 for part in run
1115 .iter_mut()
1116 .rev()
1117 .skip(self.cfg.batch_builder_max_outstanding_parts)
1118 .take_while(|p| !p.is_finished())
1119 {
1120 self.batch_metrics.write_stalls.inc();
1121 part.block_until_ready().await;
1122 }
1123 }
1124 WritingRuns::Compacting(batches) => {
1125 let run = Pending::Writing(mz_ore::task::spawn(|| name, write_future));
1126 batches.push((RunOrder::Unordered, run));
1127
1128 let mut part_budget = self.cfg.batch_builder_max_outstanding_parts;
1131 let mut compaction_budget = 1;
1132 for (_, part) in batches
1133 .iter_mut()
1134 .rev()
1135 .skip_while(|(order, _)| match order {
1136 RunOrder::Unordered if part_budget > 0 => {
1137 part_budget -= 1;
1138 true
1139 }
1140 RunOrder::Structured | RunOrder::Codec if compaction_budget > 0 => {
1141 compaction_budget -= 1;
1142 true
1143 }
1144 _ => false,
1145 })
1146 .take_while(|(_, p)| !p.is_finished())
1147 {
1148 self.batch_metrics.write_stalls.inc();
1149 part.block_until_ready().await;
1150 }
1151 }
1152 }
1153 }
1154
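    /// Encodes the given updates as a blob part: computes stats and key
    /// lower bounds, uploads the encoded bytes (with retries), and returns
    /// the resulting [`BatchPart`].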
1155 async fn write_hollow_part<K: Codec, V: Codec>(
1156 cfg: BatchBuilderConfig,
1157 blob: Arc<dyn Blob>,
1158 metrics: Arc<Metrics>,
1159 shard_metrics: Arc<ShardMetrics>,
1160 batch_metrics: BatchWriteMetrics,
1161 isolated_runtime: Arc<IsolatedRuntime>,
1162 mut updates: BlobTraceBatchPart<T>,
1163 run_order: RunOrder,
1164 ts_rewrite: Option<Antichain<T>>,
1165 diffs_sum: [u8; 8],
1166 write_schemas: Schemas<K, V>,
1167 ) -> BatchPart<T> {
1168 let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new());
1169 let key = partial_key.complete(&shard_metrics.shard_id);
1170 let goodbytes = updates.updates.goodbytes();
1171 let metrics_ = Arc::clone(&metrics);
1172 let schema_id = write_schemas.id;
1173
1174 let (stats, key_lower, structured_key_lower, (buf, encode_time)) = isolated_runtime
1175 .spawn_named(|| "batch::encode_part", async move {
1176 let stats = metrics_.columnar.arrow().measure_part_build(|| {
1178 let stats = if cfg.stats_collection_enabled {
1179 let ext = updates.updates.get_or_make_structured::<K, V>(
1180 write_schemas.key.as_ref(),
1181 write_schemas.val.as_ref(),
1182 );
1183
1184 let key_stats = write_schemas
1185 .key
1186 .decoder_any(ext.key.as_ref())
1187 .expect("decoding just-encoded data")
1188 .stats();
1189
1190 let part_stats = PartStats { key: key_stats };
1191
1192 let trimmed_start = Instant::now();
1194 let mut trimmed_bytes = 0;
1195 let trimmed_stats = LazyPartStats::encode(&part_stats, |s| {
1196 trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| {
1197 cfg.stats_untrimmable_columns.should_retain(s)
1198 })
1199 });
1200 let trimmed_duration = trimmed_start.elapsed();
1201 Some((trimmed_stats, trimmed_duration, trimmed_bytes))
1202 } else {
1203 None
1204 };
1205
1206 updates.updates = updates.updates.as_structured::<K, V>(
1208 write_schemas.key.as_ref(),
1209 write_schemas.val.as_ref(),
1210 );
1211
1212 stats
1213 });
1214
1215 let key_lower = if let Some(records) = updates.updates.records() {
1216 let key_bytes = records.keys();
1217 if key_bytes.is_empty() {
1218 &[]
1219 } else if run_order == RunOrder::Codec {
1220 key_bytes.value(0)
1221 } else {
1222 ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
1223 }
1224 } else {
1225 &[]
1226 };
1227 let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
1228 .expect("lower bound always exists");
1229
1230 let structured_key_lower = if cfg.structured_key_lower_len > 0 {
1231 updates.updates.structured().and_then(|ext| {
1232 let min_key = if run_order == RunOrder::Structured {
1233 0
1234 } else {
1235 let ord = ArrayOrd::new(ext.key.as_ref());
1236 (0..ext.key.len())
1237 .min_by_key(|i| ord.at(*i))
1238 .expect("non-empty batch")
1239 };
1240 let lower = ArrayBound::new(Arc::clone(&ext.key), min_key)
1241 .to_proto_lower(cfg.structured_key_lower_len);
1242 if lower.is_none() {
1243 batch_metrics.key_lower_too_big.inc()
1244 }
1245 lower.map(|proto| LazyProto::from(&proto))
1246 })
1247 } else {
1248 None
1249 };
1250
1251 let encode_start = Instant::now();
1252 let mut buf = Vec::new();
1253 updates.encode(&mut buf, &metrics_.columnar, &cfg.encoding_config);
1254
1255 drop(updates);
1257 (
1258 stats,
1259 key_lower,
1260 structured_key_lower,
1261 (Bytes::from(buf), encode_start.elapsed()),
1262 )
1263 })
1264 .instrument(debug_span!("batch::encode_part"))
1265 .await;
1266 metrics.codecs.batch.encode_count.inc();
1268 metrics
1269 .codecs
1270 .batch
1271 .encode_seconds
1272 .inc_by(encode_time.as_secs_f64());
1273
1274 let start = Instant::now();
1275 let payload_len = buf.len();
1276 let () = retry_external(&metrics.retries.external.batch_set, || async {
1277 shard_metrics.blob_sets.inc();
1278 blob.set(&key, Bytes::clone(&buf)).await
1279 })
1280 .instrument(trace_span!("batch::set", payload_len))
1281 .await;
1282 batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
1283 batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
1284 batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
1285 match run_order {
1286 RunOrder::Unordered => batch_metrics.unordered.inc(),
1287 RunOrder::Codec => batch_metrics.codec_order.inc(),
1288 RunOrder::Structured => batch_metrics.structured_order.inc(),
1289 }
1290 let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
1291 batch_metrics
1292 .step_stats
1293 .inc_by(stats_step_timing.as_secs_f64());
1294 if trimmed_bytes > 0 {
1295 metrics.pushdown.parts_stats_trimmed_count.inc();
1296 metrics
1297 .pushdown
1298 .parts_stats_trimmed_bytes
1299 .inc_by(u64::cast_from(trimmed_bytes));
1300 }
1301 stats
1302 });
1303
1304 let meta = MetadataMap::default();
1305 BatchPart::Hollow(HollowBatchPart {
1306 key: partial_key,
1307 meta,
1308 encoded_size_bytes: payload_len,
1309 key_lower,
1310 structured_key_lower,
1311 stats,
1312 ts_rewrite,
1313 diffs_sum: Some(diffs_sum),
1314 format: Some(BatchColumnarFormat::Structured),
1315 schema_id,
1316 deprecated_schema_id: None,
1318 })
1319 }
1320
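    /// Waits for all outstanding writes to complete and returns the finished
    /// runs as `(order, parts, num_updates)` triples.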
1321 #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
1322 pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec<RunPart<T>>, usize)> {
1323 match self.writing_runs {
1324 WritingRuns::Ordered(RunOrder::Unordered, run) => {
1325 let completed_runs = run.finish();
1326 let mut output = Vec::with_capacity(completed_runs.len());
1327 for completed_run in completed_runs {
1328 let completed_run = completed_run.into_result().await;
1329 for part in completed_run.parts {
1331 output.push((RunOrder::Unordered, vec![part], completed_run.num_updates));
1332 }
1333 }
1334 output
1335 }
1336 WritingRuns::Ordered(order, run) => {
1337 let completed_runs = run.finish();
1338 let mut all_parts = Vec::new();
1339 let mut all_update_counts = 0;
1340 for completed_run in completed_runs {
1341 let completed_run = completed_run.into_result().await;
1342 all_parts.extend(completed_run.parts);
1343 all_update_counts += completed_run.num_updates;
1344 }
1345 vec![(order, all_parts, all_update_counts)]
1346 }
1347 WritingRuns::Compacting(batches) => {
1348 let runs = batches.finish();
1349 let mut output = Vec::new();
1350 for (order, run) in runs {
1351 let completed_run = run.into_result().await;
1352 output.push((order, completed_run.parts, completed_run.num_updates));
1353 }
1354 output
1355 }
1356 }
1357 }
1358}
1359
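/// Validates that the given batch can be appended with its data logically
/// truncated to `truncate`: rewritten batches must line up exactly with the
/// truncated bounds, and (optionally) the truncated bounds must be covered by
/// the batch's own registered bounds.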
1360pub(crate) fn validate_truncate_batch<T: Timestamp>(
1361 batch: &HollowBatch<T>,
1362 truncate: &Description<T>,
1363 any_batch_rewrite: bool,
1364 validate_part_bounds_on_write: bool,
1365) -> Result<(), InvalidUsage<T>> {
1366 if any_batch_rewrite {
1369 if truncate.upper() != batch.desc.upper() {
1374 return Err(InvalidUsage::InvalidRewrite(format!(
1375 "rewritten batch might have data past {:?} up to {:?}",
1376 truncate.upper().elements(),
1377 batch.desc.upper().elements(),
1378 )));
1379 }
1380 for part in batch.parts.iter() {
1383 let part_lower_bound = part.ts_rewrite().unwrap_or_else(|| batch.desc.lower());
1384 if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
1385 return Err(InvalidUsage::InvalidRewrite(format!(
1386 "rewritten batch might have data below {:?} at {:?}",
1387 truncate.lower().elements(),
1388 part_lower_bound.elements(),
1389 )));
1390 }
1391 }
1392 }
1393
1394 if !validate_part_bounds_on_write {
1395 return Ok(());
1396 }
1397
1398 let batch = &batch.desc;
1399 if !PartialOrder::less_equal(batch.lower(), truncate.lower())
1400 || PartialOrder::less_than(batch.upper(), truncate.upper())
1401 {
1402 return Err(InvalidUsage::InvalidBatchBounds {
1403 batch_lower: batch.lower().clone(),
1404 batch_upper: batch.upper().clone(),
1405 append_lower: truncate.lower().clone(),
1406 append_upper: truncate.upper().clone(),
1407 });
1408 }
1409
1410 Ok(())
1411}
1412
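/// The blob keys and hollow runs backing a batch's parts, accumulated so
/// that they can be deleted together.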
1413#[derive(Debug)]
1414pub(crate) struct PartDeletes<T> {
1415 blob_keys: BTreeSet<PartialBatchKey>,
1417 hollow_runs: BTreeMap<PartialBatchKey, HollowRunRef<T>>,
1419}
1420
1421impl<T> Default for PartDeletes<T> {
1422 fn default() -> Self {
1423 Self {
1424 blob_keys: Default::default(),
1425 hollow_runs: Default::default(),
1426 }
1427 }
1428}
1429
1430impl<T: Timestamp> PartDeletes<T> {
1431 pub fn add(&mut self, part: &RunPart<T>) -> bool {
1434 match part {
1435 RunPart::Many(r) => self.hollow_runs.insert(r.key.clone(), r.clone()).is_none(),
1436 RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.insert(x.key.clone()),
1437 RunPart::Single(BatchPart::Inline { .. }) => {
1438 true
1440 }
1441 }
1442 }
1443
1444 pub fn contains(&self, part: &RunPart<T>) -> bool {
1445 match part {
1446 RunPart::Many(r) => self.hollow_runs.contains_key(&r.key),
1447 RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.contains(&x.key),
1448 RunPart::Single(BatchPart::Inline { .. }) => false,
1449 }
1450 }
1451
1452 pub fn is_empty(&self) -> bool {
1453 self.len() == 0
1454 }
1455
1456 pub fn len(&self) -> usize {
1457 match self {
1458 Self {
1459 blob_keys,
1460 hollow_runs,
1461 } => blob_keys.len() + hollow_runs.len(),
1462 }
1463 }
1464
1465 pub async fn delete(
1466 mut self,
1467 blob: &dyn Blob,
1468 shard_id: ShardId,
1469 concurrency: usize,
1470 metrics: &Metrics,
1471 delete_metrics: &RetryMetrics,
1472 ) where
1473 T: Codec64,
1474 {
1475 loop {
1476 let () = stream::iter(mem::take(&mut self.blob_keys))
1477 .map(|key| {
1478 let key = key.complete(&shard_id);
1479 async move {
1480 retry_external(delete_metrics, || blob.delete(&key)).await;
1481 }
1482 })
1483 .buffer_unordered(concurrency)
1484 .collect()
1485 .await;
1486
1487 let Some((run_key, run_ref)) = self.hollow_runs.pop_first() else {
1488 break;
1489 };
1490
1491 if let Some(run) = run_ref.get(shard_id, blob, metrics).await {
1492 for part in &run.parts {
1494 self.add(part);
1495 }
1496 self.blob_keys.insert(run_key);
1497 };
1498 }
1499 }
1500}
1501
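/// Sums the given column of encoded diffs, decoding each value as `D`.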
1502fn diffs_sum<D: Monoid + Codec64>(updates: &Int64Array) -> D {
1504 let mut sum = D::zero();
1505 for d in updates.values().iter() {
1506 let d = D::decode(d.to_le_bytes());
1507 sum.plus_equals(&d);
1508 }
1509 sum
1510}
1511
1512#[cfg(test)]
1513mod tests {
1514 use mz_dyncfg::ConfigUpdates;
1515
1516 use super::*;
1517 use crate::PersistLocation;
1518 use crate::cache::PersistClientCache;
1519 use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
1520 use crate::internal::paths::{BlobKey, PartialBlobKey};
1521 use crate::tests::{all_ok, new_test_client};
1522
1523 #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_flushing() {
1526 let data = vec![
1527 (("1".to_owned(), "one".to_owned()), 1, 1),
1528 (("2".to_owned(), "two".to_owned()), 2, 1),
1529 (("3".to_owned(), "three".to_owned()), 3, 1),
1530 (("4".to_owned(), "four".to_owned()), 4, 1),
1531 ];
1532
1533 let cache = PersistClientCache::new_no_metrics();
1534
1535 cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
1539 cache.cfg.set_config(&MAX_RUNS, 3);
1540 cache
1541 .cfg
1542 .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);
1543
1544 let client = cache
1545 .open(PersistLocation::new_in_mem())
1546 .await
1547 .expect("client construction failed");
1548 let (mut write, mut read) = client
1549 .expect_open::<String, String, u64, i64>(ShardId::new())
1550 .await;
1551
1552 let mut builder = write.builder(Antichain::from_elem(0));
1554
1555 fn assert_writing(
1556 builder: &BatchBuilder<String, String, u64, i64>,
1557 expected_finished: &[bool],
1558 ) {
1559 let WritingRuns::Compacting(run) = &builder.builder.parts.writing_runs else {
1560 unreachable!("ordered run!")
1561 };
1562
1563 let actual: Vec<_> = run.iter().map(|(_, p)| p.is_finished()).collect();
1564 assert_eq!(*expected_finished, actual);
1565 }
1566
1567 assert_writing(&builder, &[]);
1568
1569 let ((k, v), t, d) = &data[0];
1572 builder.add(k, v, t, d).await.expect("invalid usage");
1573 assert_writing(&builder, &[false]);
1574
1575 let ((k, v), t, d) = &data[1];
1578 builder.add(k, v, t, d).await.expect("invalid usage");
1579 assert_writing(&builder, &[false, false]);
1580
1581 let ((k, v), t, d) = &data[2];
1584 builder.add(k, v, t, d).await.expect("invalid usage");
1585 assert_writing(&builder, &[true, false, false]);
1586
1587 let ((k, v), t, d) = &data[3];
1590 builder.add(k, v, t, d).await.expect("invalid usage");
1591 assert_writing(&builder, &[false, false]);
1592
1593 let batch = builder
1596 .finish(Antichain::from_elem(5))
1597 .await
1598 .expect("invalid usage");
1599 assert_eq!(batch.batch.runs().count(), 2);
1600 assert_eq!(batch.batch.part_count(), 4);
1601 write
1602 .append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
1603 .await
1604 .expect("invalid usage")
1605 .expect("unexpected upper");
1606 assert_eq!(read.expect_snapshot_and_fetch(4).await, all_ok(&data, 4));
1607 }
1608
1609 #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_keys() {
1612 let cache = PersistClientCache::new_no_metrics();
1613 cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
1615 cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
1617 cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1618 cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1619 let client = cache
1620 .open(PersistLocation::new_in_mem())
1621 .await
1622 .expect("client construction failed");
1623 let shard_id = ShardId::new();
1624 let (mut write, _) = client
1625 .expect_open::<String, String, u64, i64>(shard_id)
1626 .await;
1627
1628 let batch = write
1629 .expect_batch(
1630 &[
1631 (("1".into(), "one".into()), 1, 1),
1632 (("2".into(), "two".into()), 2, 1),
1633 (("3".into(), "three".into()), 3, 1),
1634 ],
1635 0,
1636 4,
1637 )
1638 .await;
1639
1640 assert_eq!(batch.batch.part_count(), 3);
1641 for part in &batch.batch.parts {
1642 let part = part.expect_hollow_part();
1643 match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
1644 Ok((shard, PartialBlobKey::Batch(writer, _))) => {
1645 assert_eq!(shard.to_string(), shard_id.to_string());
1646 assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
1647 }
1648 _ => panic!("unparseable blob key"),
1649 }
1650 }
1651 }
1652
1653 #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_delete() {
1656 let cache = PersistClientCache::new_no_metrics();
1657 cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1658 cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1659 cache.cfg.set_config(&BATCH_DELETE_ENABLED, true);
1660 let client = cache
1661 .open(PersistLocation::new_in_mem())
1662 .await
1663 .expect("client construction failed");
1664 let shard_id = ShardId::new();
1665 let (mut write, _) = client
1666 .expect_open::<String, String, u64, i64>(shard_id)
1667 .await;
1668
1669 let batch = write
1670 .expect_batch(
1671 &[
1672 (("1".into(), "one".into()), 1, 1),
1673 (("2".into(), "two".into()), 2, 1),
1674 (("3".into(), "three".into()), 3, 1),
1675 ],
1676 0,
1677 4,
1678 )
1679 .await;
1680
1681 assert_eq!(batch.batch.part_count(), 1);
1682 let part_key = batch.batch.parts[0]
1683 .expect_hollow_part()
1684 .key
1685 .complete(&shard_id);
1686
1687 let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
1688 assert!(part_bytes.is_some());
1689
1690 batch.delete().await;
1691
1692 let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
1693 assert!(part_bytes.is_none());
1694 }
1695
1696 #[mz_ore::test]
1697 fn untrimmable_columns() {
1698 let untrimmable = UntrimmableColumns {
1699 equals: vec!["abc".into(), "def".into()],
1700 prefixes: vec!["123".into(), "234".into()],
1701 suffixes: vec!["xyz".into()],
1702 };
1703
1704 assert!(untrimmable.should_retain("abc"));
1706 assert!(untrimmable.should_retain("ABC"));
1707 assert!(untrimmable.should_retain("aBc"));
1708 assert!(!untrimmable.should_retain("abcd"));
1709 assert!(untrimmable.should_retain("deF"));
1710 assert!(!untrimmable.should_retain("defg"));
1711
1712 assert!(untrimmable.should_retain("123"));
1714 assert!(untrimmable.should_retain("123-4"));
1715 assert!(untrimmable.should_retain("1234"));
1716 assert!(untrimmable.should_retain("234"));
1717 assert!(!untrimmable.should_retain("345"));
1718
1719 assert!(untrimmable.should_retain("ijk_xyZ"));
1721 assert!(untrimmable.should_retain("ww-XYZ"));
1722 assert!(!untrimmable.should_retain("xya"));
1723 }
1724
1725 #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
1729 let client = new_test_client(&dyncfgs).await;
1730 let (mut write, read) = client
1731 .expect_open::<String, (), u64, i64>(ShardId::new())
1732 .await;
1733
1734 let mut batch = write.builder(Antichain::from_elem(0));
1735 batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
1736 let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();
1737
1738 let batch = batch.into_transmittable_batch();
1740 let mut batch = write.batch_from_transmittable_batch(batch);
1741 batch
1742 .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
1743 .unwrap();
1744 write
1745 .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
1746 .await;
1747
1748 let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
1749 let expected = vec![((("foo".to_owned()), ()), 2, 1)];
1750 assert_eq!(actual, expected);
1751 }
1752
1753 #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn structured_lowers() {
1756 let cache = PersistClientCache::new_no_metrics();
1757 cache.cfg().set_config(&STRUCTURED_KEY_LOWER_LEN, 1024);
1759 cache.cfg().set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1761 cache.cfg().set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1762 let client = cache
1763 .open(PersistLocation::new_in_mem())
1764 .await
1765 .expect("client construction failed");
1766 let shard_id = ShardId::new();
1767 let (mut write, _) = client
1768 .expect_open::<String, String, u64, i64>(shard_id)
1769 .await;
1770
1771 let batch = write
1772 .expect_batch(
1773 &[
1774 (("1".into(), "one".into()), 1, 1),
1775 (("2".into(), "two".into()), 2, 1),
1776 (("3".into(), "three".into()), 3, 1),
1777 ],
1778 0,
1779 4,
1780 )
1781 .await;
1782
1783 assert_eq!(batch.batch.part_count(), 1);
1784 let [part] = batch.batch.parts.as_slice() else {
1785 panic!("expected single part")
1786 };
1787 assert!(part.structured_key_lower().is_some());
1789 }
1790}