// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A handle to a batch of updates

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::Instant;

use arrow::array::{Array, Int64Array};
use bytes::Bytes;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::stream::StreamExt;
use futures_util::{FutureExt, stream};
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::Blob;
use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
use mz_persist_types::part::{Part, PartBuilder};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::stats::{
    PartStats, TRUNCATE_LEN, TruncateBound, trim_to_budget, truncate_bytes,
};
use mz_persist_types::{Codec, Codec64};
use mz_proto::RustType;
use mz_timely_util::order::Reverse;
use proptest_derive::Arbitrary;
use semver::Version;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug_span, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
use crate::error::InvalidUsage;
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{
    LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, Schemas,
};
use crate::internal::machine::retry_external;
use crate::internal::merge::{MergeTree, Pending};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
use crate::internal::state::{
    BatchPart, ENABLE_INCREMENTAL_COMPACTION, HollowBatch, HollowBatchPart, HollowRun,
    HollowRunRef, ProtoInlineBatchPart, RunId, RunMeta, RunOrder, RunPart,
};
use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
use crate::{PersistConfig, ShardId};

include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));

/// A handle to a batch of updates that has been written to blob storage but
/// which has not yet been appended to a shard.
///
/// A [Batch] needs to be marked as consumed or it needs to be deleted via [Self::delete].
/// Otherwise, a dangling batch will leak and backing blobs will remain in blob storage.
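///
/// Illustrative sketch of the expected lifecycle (not a doctest; `write` is
/// assumed to be a [`WriteHandle`](crate::write::WriteHandle) for the same
/// shard, and `data`/`lower`/`upper` are placeholders):
///
/// ```ignore
/// // Stage updates into blob storage without yet changing the shard.
/// let mut batch = write.batch(&data, lower.clone(), upper.clone()).await?;
/// // Either append it to the shard (which marks the handle consumed)...
/// let _ = write
///     .compare_and_append_batch(&mut [&mut batch], lower, upper)
///     .await;
/// // ...or, if it will never be appended, delete its backing blobs:
/// // batch.delete().await;
/// ```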
#[derive(Debug)]
pub struct Batch<K, V, T, D> {
    pub(crate) batch_delete_enabled: bool,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,

    /// The version of Materialize which wrote this batch.
    pub(crate) version: Version,

    /// The encoded schemas of the data in the batch.
    pub(crate) schemas: (Bytes, Bytes),

    /// A handle to the data represented by this batch.
    pub(crate) batch: HollowBatch<T>,

    /// Handle to the [Blob] that the blobs of this batch were uploaded to.
    pub(crate) blob: Arc<dyn Blob>,

    // These provide a bit more safety against appending a batch with the wrong
    // type to a shard.
    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}

impl<K, V, T, D> Drop for Batch<K, V, T, D> {
    fn drop(&mut self) {
        if self.batch.part_count() > 0 {
            warn!(
                "un-consumed Batch, with {} parts and dangling blob keys: {:?}",
                self.batch.part_count(),
                self.batch
                    .parts
                    .iter()
                    .map(|x| x.printable_name())
                    .collect::<Vec<_>>(),
            );
        }
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        batch_delete_enabled: bool,
        metrics: Arc<Metrics>,
        blob: Arc<dyn Blob>,
        shard_metrics: Arc<ShardMetrics>,
        version: Version,
        schemas: (Bytes, Bytes),
        batch: HollowBatch<T>,
    ) -> Self {
        Self {
            batch_delete_enabled,
            metrics,
            shard_metrics,
            version,
            schemas,
            batch,
            blob,
            _phantom: PhantomData,
        }
    }

    /// The `shard_id` of this [Batch].
    pub fn shard_id(&self) -> ShardId {
        self.shard_metrics.shard_id
    }

    /// The `upper` of this [Batch].
    pub fn upper(&self) -> &Antichain<T> {
        self.batch.desc.upper()
    }

    /// The `lower` of this [Batch].
    pub fn lower(&self) -> &Antichain<T> {
        self.batch.desc.lower()
    }

    /// Marks the blobs that this batch handle points to as consumed, likely
    /// because they were appended to a shard.
    ///
    /// Consumers of a blob need to make this explicit, so that we can log
    /// warnings in case a batch is not used.
    pub(crate) fn mark_consumed(&mut self) {
        self.batch.parts.clear();
    }

    /// Deletes the blobs that make up this batch from the given blob store and
    /// marks them as deleted.
    #[instrument(level = "debug", fields(shard = %self.shard_id()))]
    pub async fn delete(mut self) {
        if !self.batch_delete_enabled {
            self.mark_consumed();
            return;
        }
        let mut deletes = PartDeletes::default();
        for part in self.batch.parts.drain(..) {
            deletes.add(&part);
        }
        let () = deletes
            .delete(
                &*self.blob,
                self.shard_id(),
                usize::MAX,
                &*self.metrics,
                &*self.metrics.retries.external.batch_delete,
            )
            .await;
    }

    /// Returns the schemas of parts in this batch.
    pub fn schemas(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.batch.parts.iter().flat_map(|b| b.schema_id())
    }

    /// Turns this [`Batch`] into a `HollowBatch`.
    ///
    /// **NOTE**: If this batch is not eventually appended to a shard or
    /// dropped, the data that it represents will have leaked.
    pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
        let ret = self.batch.clone();
        self.mark_consumed();
        ret
    }

    /// Turns this [`Batch`] into a [`ProtoBatch`], which can be used to
    /// transfer this batch across process boundaries, for example when
    /// exchanging data between timely workers.
    ///
    /// **NOTE**: If this batch is not eventually appended to a shard or
    /// dropped, the data that it represents will have leaked. The caller is
    /// responsible for turning this back into a [`Batch`] using
    /// [`WriteHandle::batch_from_transmittable_batch`](crate::write::WriteHandle::batch_from_transmittable_batch).
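    ///
    /// Illustrative sketch (not a doctest; `write` is assumed to be a
    /// [`WriteHandle`](crate::write::WriteHandle) for the same shard):
    ///
    /// ```ignore
    /// // On the sending worker: downgrade to a serializable proto.
    /// let proto: ProtoBatch = batch.into_transmittable_batch();
    /// // On the receiving worker: rehydrate it before appending.
    /// let batch = write.batch_from_transmittable_batch(proto);
    /// ```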
    pub fn into_transmittable_batch(mut self) -> ProtoBatch {
        let ret = ProtoBatch {
            shard_id: self.shard_metrics.shard_id.into_proto(),
            version: self.version.to_string(),
            batch: Some(self.batch.into_proto()),
            key_schema: self.schemas.0.clone(),
            val_schema: self.schemas.1.clone(),
        };
        self.mark_consumed();
        ret
    }

    pub(crate) async fn flush_to_blob(
        &mut self,
        cfg: &BatchBuilderConfig,
        batch_metrics: &BatchWriteMetrics,
        isolated_runtime: &Arc<IsolatedRuntime>,
        write_schemas: &Schemas<K, V>,
    ) {
        // It's necessary for correctness to keep the parts in the same order.
        // We could introduce concurrency here with FuturesOrdered, but it would
        // be pretty unexpected to have inline writes in more than one part, so
        // don't bother.
        let mut parts = Vec::new();
        for (run_meta, run_parts) in self.batch.runs() {
            for part in run_parts {
                let (updates, ts_rewrite, schema_id) = match part {
                    RunPart::Single(BatchPart::Inline {
                        updates,
                        ts_rewrite,
                        schema_id,
                        deprecated_schema_id: _,
                    }) => (updates, ts_rewrite, schema_id),
                    other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
                        parts.push(other.clone());
                        continue;
                    }
                };
                let updates = updates
                    .decode::<T>(&self.metrics.columnar)
                    .expect("valid inline part");
                let diffs_sum = diffs_sum::<D>(updates.updates.diffs());
                let mut write_schemas = write_schemas.clone();
                write_schemas.id = *schema_id;

                let write_span =
                    debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id)
                        .or_current();
                let handle = mz_ore::task::spawn(
                    || "batch::flush_to_blob",
                    BatchParts::write_hollow_part(
                        cfg.clone(),
                        Arc::clone(&self.blob),
                        Arc::clone(&self.metrics),
                        Arc::clone(&self.shard_metrics),
                        batch_metrics.clone(),
                        Arc::clone(isolated_runtime),
                        updates,
                        run_meta.order.unwrap_or(RunOrder::Unordered),
                        ts_rewrite.clone(),
                        D::encode(&diffs_sum),
                        write_schemas,
                    )
                    .instrument(write_span),
                );
                let part = handle.await;
                parts.push(RunPart::Single(part));
            }
        }
        self.batch.parts = parts;
    }

    /// The sum of the encoded sizes of all parts in the batch.
    pub fn encoded_size_bytes(&self) -> usize {
        self.batch.encoded_size_bytes()
    }
}

impl<K, V, T, D> Batch<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + TotalOrder,
    D: Monoid + Codec64,
{
    /// Efficiently rewrites the timestamps in this not-yet-committed batch.
    ///
    /// This [Batch] represents potentially large amounts of data, which may
    /// have partly or entirely been spilled to s3. This call bulk edits the
    /// timestamps of all data in this batch in a metadata-only operation (i.e.
    /// without network calls).
    ///
    /// Specifically, every timestamp in the batch is logically advanced_by the
    /// provided `frontier`.
    ///
    /// This method may be called multiple times, with later calls overriding
    /// previous ones, but the rewrite frontier may not regress across calls.
    ///
    /// When this batch was created, it was given an `upper`, which bounds the
    /// staged data it represents. To allow rewrite past this original `upper`,
    /// this call accepts a new `upper` which replaces the previous one. Like
    /// the rewrite frontier, the upper may not regress across calls.
    ///
    /// Multiple batches with various rewrite frontiers may be used in a single
    /// [crate::write::WriteHandle::compare_and_append_batch] call. This is an
    /// expected usage.
    ///
    /// This feature requires that the timestamp impls `TotalOrder`. This is
    /// because we need to be able to verify that the contained data, after the
    /// rewrite forward operation, still respects the new upper. It turns out
    /// that, given the metadata persist currently collects during batch
    /// collection, this is possible for totally ordered times, but it's known
    /// to be _not possible_ for partially ordered times. It is believed that we
    /// could fix this by collecting different metadata in batch creation (e.g.
    /// the join of or an antichain of the original contained timestamps), but
    /// the experience of database-issues#7825 has shaken our confidence in our own abilities
    /// to reason about partially ordered times and anyway all the initial uses
    /// have totally ordered times.
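    ///
    /// Illustrative sketch (not a doctest; `batch` is assumed to be a
    /// not-yet-appended [Batch] with `u64` timestamps):
    ///
    /// ```ignore
    /// // Advance every staged update to at least time 5 and extend the
    /// // registered upper to 10; neither frontier may regress in later calls.
    /// batch.rewrite_ts(&Antichain::from_elem(5), Antichain::from_elem(10))?;
    /// ```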
    pub fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), InvalidUsage<T>> {
        self.batch
            .rewrite_ts(frontier, new_upper)
            .map_err(InvalidUsage::InvalidRewrite)
    }
}

/// Indicates what work was done in a call to [BatchBuilder::add]
#[derive(Debug)]
pub enum Added {
    /// A record was inserted into a pending batch part
    Record,
    /// A record was inserted into a pending batch part
    /// and the part was sent to blob storage
    RecordAndParts,
}

/// A snapshot of dynamic configs to make it easier to reason about an individual
/// run of BatchBuilder.
#[derive(Debug, Clone)]
pub struct BatchBuilderConfig {
    writer_key: WriterKey,
    pub(crate) blob_target_size: usize,
    pub(crate) batch_delete_enabled: bool,
    pub(crate) batch_builder_max_outstanding_parts: usize,
    pub(crate) inline_writes_single_max_bytes: usize,
    pub(crate) stats_collection_enabled: bool,
    pub(crate) stats_budget: usize,
    pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
    pub(crate) encoding_config: EncodingConfig,
    pub(crate) preferred_order: RunOrder,
    pub(crate) structured_key_lower_len: usize,
    pub(crate) run_length_limit: usize,
    pub(crate) enable_incremental_compaction: bool,
    /// The number of runs to cap the built batch at, or None if we should
    /// continue to generate one run per part for unordered batches.
    /// See the config definition for details.
    pub(crate) max_runs: Option<usize>,
}

// TODO: Remove this once we're comfortable that there aren't any bugs.
pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
    "persist_batch_delete_enabled",
    true,
    "Whether to actually delete blobs when batch delete is called (Materialize).",
);

pub(crate) const ENCODING_ENABLE_DICTIONARY: Config<bool> = Config::new(
    "persist_encoding_enable_dictionary",
    true,
    "A feature flag to enable dictionary encoding for Parquet data (Materialize).",
);

pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new(
    "persist_encoding_compression_format",
    "none",
    "A feature flag to enable compression of Parquet data (Materialize).",
);

pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config<usize> = Config::new(
    "persist_batch_structured_key_lower_len",
    256,
    "The maximum size in proto bytes of any structured key-lower metadata to preserve. \
    (If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
);

pub(crate) const MAX_RUN_LEN: Config<usize> = Config::new(
    "persist_batch_max_run_len",
    usize::MAX,
    "The maximum length a run can have before it will be spilled as a hollow run \
    into the blob store.",
);

pub(crate) const MAX_RUNS: Config<usize> = Config::new(
    "persist_batch_max_runs",
    1,
    "The maximum number of runs a batch builder should generate for user batches. \
    (Compaction outputs always generate a single run.) \
    The minimum value is 2; below this, compaction is disabled.",
);

/// A target maximum size of blob payloads in bytes. If a logical "batch" is
/// bigger than this, it will be broken up into smaller, independent pieces.
/// This is best-effort, not a guarantee (though as of 2022-06-09, we happen to
/// always respect it). This target size doesn't apply for an individual update
/// that exceeds it in size, but that scenario is almost certainly a mis-use of
/// the system.
pub(crate) const BLOB_TARGET_SIZE: Config<usize> = Config::new(
    "persist_blob_target_size",
    128 * MiB,
    "A target maximum size of persist blob payloads in bytes (Materialize).",
);

pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_single_max_bytes",
    4096,
    "The (exclusive) maximum size of a write that persist will inline in metadata.",
);

pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config<usize> = Config::new(
    "persist_inline_writes_total_max_bytes",
    1 * MiB,
    "\
    The (exclusive) maximum total size of inline writes in metadata before \
    persist will backpressure them by flushing out to s3.",
);

impl BatchBuilderConfig {
    /// Initialize a batch builder config based on a snapshot of the Persist config.
    pub fn new(value: &PersistConfig, _shard_id: ShardId) -> Self {
        let writer_key = WriterKey::for_version(&value.build_version);

        let preferred_order = RunOrder::Structured;

        BatchBuilderConfig {
            writer_key,
            blob_target_size: BLOB_TARGET_SIZE.get(value).clamp(1, usize::MAX),
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
            batch_builder_max_outstanding_parts: BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(value),
            inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
            stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
            stats_budget: STATS_BUDGET_BYTES.get(value),
            stats_untrimmable_columns: Arc::new(untrimmable_columns(value)),
            encoding_config: EncodingConfig {
                use_dictionary: ENCODING_ENABLE_DICTIONARY.get(value),
                compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
            },
            preferred_order,
            structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
            run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
            max_runs: match MAX_RUNS.get(value) {
                limit @ 2.. => Some(limit),
                _ => None,
            },
            enable_incremental_compaction: ENABLE_INCREMENTAL_COMPACTION.get(value),
        }
    }
}

/// A list of (lowercase) column names that persist will always retain
/// stats for, even if it means going over the stats budget.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
pub(crate) struct UntrimmableColumns {
    /// Always retain columns whose lowercased names exactly equal any of these strings.
    pub equals: Vec<Cow<'static, str>>,
    /// Always retain columns whose lowercased names start with any of these strings.
    pub prefixes: Vec<Cow<'static, str>>,
    /// Always retain columns whose lowercased names end with any of these strings.
    pub suffixes: Vec<Cow<'static, str>>,
}

impl UntrimmableColumns {
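    /// Returns whether stats for a column with the given name must always be
    /// retained, based on case-insensitive exact, prefix, and suffix matches
    /// against the configured lists. For example (illustrative, not a
    /// doctest): with `equals = ["err"]` and `suffixes = ["_key"]`, both
    /// `"ERR"` and `"tenant_key"` are retained while `"value"` is not.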
    pub(crate) fn should_retain(&self, name: &str) -> bool {
        // TODO: see if there's a better way to match different formats than lowercasing
        // https://github.com/MaterializeInc/database-issues/issues/6421#issue-1863623805
        let name_lower = name.to_lowercase();
        for s in &self.equals {
            if *s == name_lower {
                return true;
            }
        }
        for s in &self.prefixes {
            if name_lower.starts_with(s.as_ref()) {
                return true;
            }
        }
        for s in &self.suffixes {
            if name_lower.ends_with(s.as_ref()) {
                return true;
            }
        }
        false
    }
}

/// A builder for [Batches](Batch) that allows adding updates piece by piece and
/// then finishing it.
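///
/// Illustrative sketch (not a doctest; `write` is assumed to be a
/// [`WriteHandle`](crate::write::WriteHandle) whose `builder` method hands out
/// one of these, and `updates` is any iterator of `(K, V, T, D)`):
///
/// ```ignore
/// let mut builder = write.builder(Antichain::from_elem(0));
/// for (k, v, t, d) in updates {
///     // Each call reports whether a full part was also flushed to blob.
///     match builder.add(&k, &v, &t, &d).await? {
///         Added::Record | Added::RecordAndParts => {}
///     }
/// }
/// let batch = builder.finish(Antichain::from_elem(upper_ts)).await?;
/// ```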
#[derive(Debug)]
pub struct BatchBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    inline_desc: Description<T>,
    inclusive_upper: Antichain<Reverse<T>>,

    records_builder: PartBuilder<K, K::Schema, V, V::Schema>,
    pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
}

impl<K, V, T, D> BatchBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        builder: BatchBuilderInternal<K, V, T, D>,
        inline_desc: Description<T>,
    ) -> Self {
        let records_builder = PartBuilder::new(
            builder.write_schemas.key.as_ref(),
            builder.write_schemas.val.as_ref(),
        );
        Self {
            inline_desc,
            inclusive_upper: Antichain::new(),
            records_builder,
            builder,
        }
    }

    /// Finish writing this batch and return a handle to the written batch.
    ///
    /// This fails if any of the updates in this batch are beyond the given
    /// `upper`.
    pub async fn finish(
        mut self,
        registered_upper: Antichain<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        if PartialOrder::less_than(&registered_upper, self.inline_desc.lower()) {
            return Err(InvalidUsage::InvalidBounds {
                lower: self.inline_desc.lower().clone(),
                upper: registered_upper,
            });
        }

        // When since is less than or equal to lower, the upper is a strict bound
        // on the updates' timestamps because no advancement has been performed.
        // Because user batches are always unadvanced, this ensures that new
        // updates are recorded with valid timestamps. Otherwise, we can make no
        // assumptions about the timestamps.
        if PartialOrder::less_equal(self.inline_desc.since(), self.inline_desc.lower()) {
            for ts in self.inclusive_upper.iter() {
                if registered_upper.less_equal(&ts.0) {
                    return Err(InvalidUsage::UpdateBeyondUpper {
                        ts: ts.0.clone(),
                        expected_upper: registered_upper.clone(),
                    });
                }
            }
        }

        let updates = self.records_builder.finish();
        self.builder
            .flush_part(self.inline_desc.clone(), updates)
            .await;

        self.builder
            .finish(Description::new(
                self.inline_desc.lower().clone(),
                registered_upper,
                self.inline_desc.since().clone(),
            ))
            .await
    }

    /// Adds the given update to the batch.
    ///
    /// The update timestamp must be greater than or equal to the `lower` that
    /// was given when creating this [BatchBuilder].
    pub async fn add(
        &mut self,
        key: &K,
        val: &V,
        ts: &T,
        diff: &D,
    ) -> Result<Added, InvalidUsage<T>> {
        if !self.inline_desc.lower().less_equal(ts) {
            return Err(InvalidUsage::UpdateNotBeyondLower {
                ts: ts.clone(),
                lower: self.inline_desc.lower().clone(),
            });
        }
        self.inclusive_upper.insert(Reverse(ts.clone()));

        let added = {
            self.records_builder
                .push(key, val, ts.clone(), diff.clone());
            if self.records_builder.goodbytes() >= self.builder.parts.cfg.blob_target_size {
                let part = self.records_builder.finish_and_replace(
                    self.builder.write_schemas.key.as_ref(),
                    self.builder.write_schemas.val.as_ref(),
                );
                Some(part)
            } else {
                None
            }
        };

        let added = if let Some(full_batch) = added {
            self.builder
                .flush_part(self.inline_desc.clone(), full_batch)
                .await;
            Added::RecordAndParts
        } else {
            Added::Record
        };
        Ok(added)
    }
}

#[derive(Debug)]
pub(crate) struct BatchBuilderInternal<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    shard_id: ShardId,
    version: Version,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,

    write_schemas: Schemas<K, V>,
    parts: BatchParts<T>,

    // These provide a bit more safety against appending a batch with the wrong
    // type to a shard.
    _phantom: PhantomData<fn(K, V, T, D)>,
}

impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    pub(crate) fn new(
        _cfg: BatchBuilderConfig,
        parts: BatchParts<T>,
        metrics: Arc<Metrics>,
        write_schemas: Schemas<K, V>,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        version: Version,
    ) -> Self {
        Self {
            blob,
            metrics,
            write_schemas,
            parts,
            shard_id,
            version,
            _phantom: PhantomData,
        }
    }

    /// Finish writing this batch and return a handle to the written batch.
    ///
    /// This fails if any of the updates in this batch are beyond the given
    /// `upper`.
    #[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
    pub async fn finish(
        self,
        registered_desc: Description<T>,
    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        let write_run_ids = self.parts.cfg.enable_incremental_compaction;
        let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
        let shard_metrics = Arc::clone(&self.parts.shard_metrics);
        let runs = self.parts.finish().await;

        let mut run_parts = vec![];
        let mut run_splits = vec![];
        let mut run_meta = vec![];
        let total_updates = runs
            .iter()
            .map(|(_, _, num_updates)| num_updates)
            .sum::<usize>();
        for (order, parts, num_updates) in runs {
            if parts.is_empty() {
                continue;
            }
            if !run_parts.is_empty() {
                run_splits.push(run_parts.len());
            }
            run_meta.push(RunMeta {
                order: Some(order),
                schema: self.write_schemas.id,
                // Field has been deprecated but kept around to roundtrip state.
                deprecated_schema: None,
                id: if write_run_ids {
                    Some(RunId::new())
                } else {
                    None
                },
                len: if write_run_ids {
                    Some(num_updates)
                } else {
                    None
                },
                meta: MetadataMap::default(),
            });
            run_parts.extend(parts);
        }
        let desc = registered_desc;

        let batch = Batch::new(
            batch_delete_enabled,
            Arc::clone(&self.metrics),
            self.blob,
            shard_metrics,
            self.version,
            (
                K::encode_schema(&*self.write_schemas.key),
                V::encode_schema(&*self.write_schemas.val),
            ),
            HollowBatch::new(desc, run_parts, total_updates, run_meta, run_splits),
        );

        Ok(batch)
    }

    /// Flushes the current part to Blob storage, first consolidating and then
    /// columnar encoding the updates. It is the caller's responsibility to
    /// chunk `columnar` so that it is no greater than
    /// [BatchBuilderConfig::blob_target_size], and it must absolutely be less
    /// than [mz_persist::indexed::columnar::KEY_VAL_DATA_MAX_LEN].
    pub async fn flush_part(&mut self, part_desc: Description<T>, columnar: Part) {
        let num_updates = columnar.len();
        if num_updates == 0 {
            return;
        }
        let diffs_sum = diffs_sum::<D>(&columnar.diff);

        let start = Instant::now();
        self.parts
            .write(&self.write_schemas, part_desc, columnar, diffs_sum)
            .await;
        self.metrics
            .compaction
            .batch
            .step_part_writing
            .inc_by(start.elapsed().as_secs_f64());
    }
}

#[derive(Debug, Clone)]
pub(crate) struct RunWithMeta<T> {
    pub parts: Vec<RunPart<T>>,
    pub num_updates: usize,
}

impl<T> RunWithMeta<T> {
    pub fn new(parts: Vec<RunPart<T>>, num_updates: usize) -> Self {
        Self { parts, num_updates }
    }

    pub fn single(part: RunPart<T>, num_updates: usize) -> Self {
        Self {
            parts: vec![part],
            num_updates,
        }
    }
}

#[derive(Debug)]
enum WritingRuns<T> {
    /// Building a single run with the specified ordering. Parts are expected to be internally
    /// sorted and added in order. Merging a vec of parts will shift them out to a hollow run
    /// in blob, bounding the total length of a run in memory.
    Ordered(RunOrder, MergeTree<Pending<RunWithMeta<T>>>),
    /// Building multiple runs which may have different orders. Merging a vec of runs will cause
    /// them to be compacted together, bounding the total number of runs we generate.
    Compacting(MergeTree<(RunOrder, Pending<RunWithMeta<T>>)>),
}

// TODO: If this is dropped, cancel (and delete?) any writing parts and delete
// any finished ones.
#[derive(Debug)]
pub(crate) struct BatchParts<T> {
    cfg: BatchBuilderConfig,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    shard_id: ShardId,
    blob: Arc<dyn Blob>,
    isolated_runtime: Arc<IsolatedRuntime>,
    next_index: u64,
    writing_runs: WritingRuns<T>,
    batch_metrics: BatchWriteMetrics,
}

impl<T: Timestamp + Codec64> BatchParts<T> {
    pub(crate) fn new_compacting<K, V, D>(
        cfg: CompactConfig,
        desc: Description<T>,
        runs_per_compaction: usize,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
        schemas: Schemas<K, V>,
    ) -> Self
    where
        K: Codec + Debug,
        V: Codec + Debug,
        T: Lattice + Send + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let shard_metrics = Arc::clone(&shard_metrics);
            let isolated_runtime = Arc::clone(&isolated_runtime);
            // Clamping to prevent extreme values given weird configs.
            let runs_per_compaction = runs_per_compaction.clamp(2, 1024);

            let merge_fn = move |parts: Vec<(RunOrder, Pending<RunWithMeta<T>>)>| {
                let blob = Arc::clone(&blob);
                let metrics = Arc::clone(&metrics);
                let shard_metrics = Arc::clone(&shard_metrics);
                let cfg = cfg.clone();
                let isolated_runtime = Arc::clone(&isolated_runtime);
                let write_schemas = schemas.clone();
                let compact_desc = desc.clone();
                let handle = mz_ore::task::spawn(
                    || "batch::compact_runs",
                    async move {
                        let runs: Vec<_> = stream::iter(parts)
                            .then(|(order, parts)| async move {
                                let completed_run = parts.into_result().await;
                                (
                                    RunMeta {
                                        order: Some(order),
                                        schema: schemas.id,
                                        // Field has been deprecated but kept around to
                                        // roundtrip state.
                                        deprecated_schema: None,
                                        id: if cfg.batch.enable_incremental_compaction {
                                            Some(RunId::new())
                                        } else {
                                            None
                                        },
                                        len: if cfg.batch.enable_incremental_compaction {
                                            Some(completed_run.num_updates)
                                        } else {
                                            None
                                        },
                                        meta: MetadataMap::default(),
                                    },
                                    completed_run.parts,
                                )
                            })
                            .collect()
                            .await;

                        let run_refs: Vec<_> = runs
                            .iter()
                            .map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
                            .collect();

                        let output_batch = Compactor::<K, V, T, D>::compact_runs(
                            &cfg,
                            &shard_id,
                            &compact_desc,
                            run_refs,
                            blob,
                            metrics,
                            shard_metrics,
                            isolated_runtime,
                            write_schemas,
                        )
                        .await
                        .expect("successful compaction");

                        assert_eq!(
                            output_batch.run_meta.len(),
                            1,
                            "compaction is guaranteed to emit a single run"
                        );
                        let total_compacted_updates: usize = output_batch.len;

                        RunWithMeta::new(output_batch.parts, total_compacted_updates)
                    }
                    .instrument(debug_span!("batch::compact_runs")),
                );
                (RunOrder::Structured, Pending::new(handle))
            };
            WritingRuns::Compacting(MergeTree::new(runs_per_compaction, merge_fn))
        };
        BatchParts {
            cfg: cfg.batch,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

    pub(crate) fn new_ordered<D: Monoid + Codec64>(
        cfg: BatchBuilderConfig,
        order: RunOrder,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        shard_id: ShardId,
        blob: Arc<dyn Blob>,
        isolated_runtime: Arc<IsolatedRuntime>,
        batch_metrics: &BatchWriteMetrics,
    ) -> Self {
        let writing_runs = {
            let cfg = cfg.clone();
            let blob = Arc::clone(&blob);
            let metrics = Arc::clone(&metrics);
            let writer_key = cfg.writer_key.clone();
            // Don't spill "unordered" runs to S3, since we'll split them up into many single-element
            // runs below.
            let run_length_limit = (order == RunOrder::Unordered)
                .then_some(usize::MAX)
                .unwrap_or(cfg.run_length_limit);
            let merge_fn = move |parts: Vec<Pending<RunWithMeta<T>>>| {
                let blob = Arc::clone(&blob);
                let writer_key = writer_key.clone();
                let metrics = Arc::clone(&metrics);
                let handle = mz_ore::task::spawn(
                    || "batch::spill_run",
                    async move {
                        let completed_runs: Vec<RunWithMeta<T>> = stream::iter(parts)
                            .then(|p| p.into_result())
                            .collect()
                            .await;

                        let mut all_run_parts = Vec::new();
                        let mut total_updates = 0;

                        for completed_run in completed_runs {
                            all_run_parts.extend(completed_run.parts);
                            total_updates += completed_run.num_updates;
                        }

                        let run_ref = HollowRunRef::set::<D>(
                            shard_id,
                            blob.as_ref(),
                            &writer_key,
                            HollowRun {
                                parts: all_run_parts,
                            },
                            &*metrics,
                        )
                        .await;

                        RunWithMeta::single(RunPart::Many(run_ref), total_updates)
                    }
                    .instrument(debug_span!("batch::spill_run")),
                );
                Pending::new(handle)
            };
            WritingRuns::Ordered(order, MergeTree::new(run_length_limit, merge_fn))
        };
        BatchParts {
            cfg,
            metrics,
            shard_metrics,
            shard_id,
            blob,
            isolated_runtime,
            next_index: 0,
            writing_runs,
            batch_metrics: batch_metrics.clone(),
        }
    }

    pub(crate) fn expected_order(&self) -> RunOrder {
        match self.writing_runs {
            WritingRuns::Ordered(order, _) => order,
            WritingRuns::Compacting(_) => RunOrder::Unordered,
        }
    }

    pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
        &mut self,
        write_schemas: &Schemas<K, V>,
        desc: Description<T>,
        updates: Part,
        diffs_sum: D,
    ) {
        let batch_metrics = self.batch_metrics.clone();
        let index = self.next_index;
        self.next_index += 1;
        let num_updates = updates.len();
        let ts_rewrite = None;
        let schema_id = write_schemas.id;

        // Parts whose data is smaller than this threshold are kept inline in
        // the batch metadata instead of being written out to blob storage.
        let inline_threshold = self.cfg.inline_writes_single_max_bytes;

        let updates = BlobTraceUpdates::from_part(updates);
        let (name, write_future) = if updates.goodbytes() < inline_threshold {
            let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
            (
                "batch::inline_part",
                async move {
                    let start = Instant::now();
                    let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
                        desc: Some(desc.into_proto()),
                        index: index.into_proto(),
                        updates: Some(updates.into_proto()),
                    });
                    batch_metrics
                        .step_inline
                        .inc_by(start.elapsed().as_secs_f64());

                    RunWithMeta::single(
                        RunPart::Single(BatchPart::Inline {
                            updates,
                            ts_rewrite,
                            schema_id,
                            // Field has been deprecated but kept around to roundtrip state.
                            deprecated_schema_id: None,
                        }),
                        num_updates,
                    )
                }
                .instrument(span)
                .boxed(),
            )
        } else {
            let part = BlobTraceBatchPart {
                desc,
                updates,
                index,
            };
            let cfg = self.cfg.clone();
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let shard_metrics = Arc::clone(&self.shard_metrics);
            let isolated_runtime = Arc::clone(&self.isolated_runtime);
            let expected_order = self.expected_order();
            let encoded_diffs_sum = D::encode(&diffs_sum);
            let write_schemas_clone = write_schemas.clone();
            let write_span =
                debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current();
            (
                "batch::write_part",
                async move {
                    let part = BatchParts::write_hollow_part(
                        cfg,
                        blob,
                        metrics,
                        shard_metrics,
                        batch_metrics,
                        isolated_runtime,
                        part,
                        expected_order,
                        ts_rewrite,
                        encoded_diffs_sum,
                        write_schemas_clone,
                    )
                    .await;
                    RunWithMeta::single(RunPart::Single(part), num_updates)
                }
                .instrument(write_span)
                .boxed(),
            )
        };

        match &mut self.writing_runs {
            WritingRuns::Ordered(_order, run) => {
                let part = Pending::new(mz_ore::task::spawn(|| name, write_future));
                run.push(part);

                // If there are more than the max outstanding parts, block on all
                // but the most recent `batch_builder_max_outstanding_parts` of them.
                for part in run
                    .iter_mut()
                    .rev()
                    .skip(self.cfg.batch_builder_max_outstanding_parts)
                    .take_while(|p| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
            WritingRuns::Compacting(batches) => {
                let run = Pending::Writing(mz_ore::task::spawn(|| name, write_future));
                batches.push((RunOrder::Unordered, run));

                // Allow up to `max_outstanding_parts` (or one compaction) to be pending, and block
                // on the rest.
                let mut part_budget = self.cfg.batch_builder_max_outstanding_parts;
                let mut compaction_budget = 1;
                for (_, part) in batches
                    .iter_mut()
                    .rev()
                    .skip_while(|(order, _)| match order {
                        RunOrder::Unordered if part_budget > 0 => {
                            part_budget -= 1;
                            true
                        }
                        RunOrder::Structured | RunOrder::Codec if compaction_budget > 0 => {
                            compaction_budget -= 1;
                            true
                        }
                        _ => false,
                    })
                    .take_while(|(_, p)| !p.is_finished())
                {
                    self.batch_metrics.write_stalls.inc();
                    part.block_until_ready().await;
                }
            }
        }
    }

    async fn write_hollow_part<K: Codec, V: Codec>(
        cfg: BatchBuilderConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        batch_metrics: BatchWriteMetrics,
        isolated_runtime: Arc<IsolatedRuntime>,
        mut updates: BlobTraceBatchPart<T>,
        run_order: RunOrder,
        ts_rewrite: Option<Antichain<T>>,
        diffs_sum: [u8; 8],
        write_schemas: Schemas<K, V>,
    ) -> BatchPart<T> {
        let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new());
        let key = partial_key.complete(&shard_metrics.shard_id);
        let goodbytes = updates.updates.goodbytes();
        let metrics_ = Arc::clone(&metrics);
        let schema_id = write_schemas.id;

        let (stats, key_lower, structured_key_lower, (buf, encode_time)) = isolated_runtime
            .spawn_named(|| "batch::encode_part", async move {
                // Measure the expensive steps of the part build - re-encoding and stats collection.
                let stats = metrics_.columnar.arrow().measure_part_build(|| {
                    let stats = if cfg.stats_collection_enabled {
                        let ext = updates.updates.get_or_make_structured::<K, V>(
                            write_schemas.key.as_ref(),
                            write_schemas.val.as_ref(),
                        );

                        let key_stats = write_schemas
                            .key
                            .decoder_any(ext.key.as_ref())
                            .expect("decoding just-encoded data")
                            .stats();

                        let part_stats = PartStats { key: key_stats };

                        // Collect stats about the updates, if stats collection is enabled.
                        let trimmed_start = Instant::now();
                        let mut trimmed_bytes = 0;
                        let trimmed_stats = LazyPartStats::encode(&part_stats, |s| {
                            trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| {
                                cfg.stats_untrimmable_columns.should_retain(s)
                            })
                        });
                        let trimmed_duration = trimmed_start.elapsed();
                        Some((trimmed_stats, trimmed_duration, trimmed_bytes))
                    } else {
                        None
                    };

                    // Ensure the updates are in the specified columnar format before encoding.
                    updates.updates = updates.updates.as_structured::<K, V>(
                        write_schemas.key.as_ref(),
                        write_schemas.val.as_ref(),
                    );

                    stats
                });

                let key_lower = if let Some(records) = updates.updates.records() {
                    let key_bytes = records.keys();
                    if key_bytes.is_empty() {
                        &[]
                    } else if run_order == RunOrder::Codec {
                        key_bytes.value(0)
                    } else {
                        ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
                    }
                } else {
                    &[]
                };
                let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
                    .expect("lower bound always exists");

                let structured_key_lower = if cfg.structured_key_lower_len > 0 {
                    updates.updates.structured().and_then(|ext| {
                        let min_key = if run_order == RunOrder::Structured {
                            0
                        } else {
                            let ord = ArrayOrd::new(ext.key.as_ref());
                            (0..ext.key.len())
                                .min_by_key(|i| ord.at(*i))
                                .expect("non-empty batch")
                        };
                        let lower = ArrayBound::new(Arc::clone(&ext.key), min_key)
                            .to_proto_lower(cfg.structured_key_lower_len);
                        if lower.is_none() {
                            batch_metrics.key_lower_too_big.inc()
                        }
                        lower.map(|proto| LazyProto::from(&proto))
                    })
                } else {
                    None
                };

                let encode_start = Instant::now();
                let mut buf = Vec::new();
                updates.encode(&mut buf, &metrics_.columnar, &cfg.encoding_config);

                // Drop batch as soon as we can to reclaim its memory.
                drop(updates);
                (
                    stats,
                    key_lower,
                    structured_key_lower,
                    (Bytes::from(buf), encode_start.elapsed()),
                )
            })
            .instrument(debug_span!("batch::encode_part"))
            .await;
        // Can't use the `CodecMetrics::encode` helper because of async.
        metrics.codecs.batch.encode_count.inc();
        metrics
            .codecs
            .batch
            .encode_seconds
            .inc_by(encode_time.as_secs_f64());

        let start = Instant::now();
        let payload_len = buf.len();
        let () = retry_external(&metrics.retries.external.batch_set, || async {
            shard_metrics.blob_sets.inc();
            blob.set(&key, Bytes::clone(&buf)).await
        })
        .instrument(trace_span!("batch::set", payload_len))
        .await;
        batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
        batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
        batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
        match run_order {
            RunOrder::Unordered => batch_metrics.unordered.inc(),
            RunOrder::Codec => batch_metrics.codec_order.inc(),
            RunOrder::Structured => batch_metrics.structured_order.inc(),
        }
        let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
            batch_metrics
                .step_stats
                .inc_by(stats_step_timing.as_secs_f64());
            if trimmed_bytes > 0 {
                metrics.pushdown.parts_stats_trimmed_count.inc();
                metrics
                    .pushdown
                    .parts_stats_trimmed_bytes
                    .inc_by(u64::cast_from(trimmed_bytes));
            }
            stats
        });

        let meta = MetadataMap::default();
        BatchPart::Hollow(HollowBatchPart {
            key: partial_key,
            meta,
            encoded_size_bytes: payload_len,
            key_lower,
            structured_key_lower,
            stats,
            ts_rewrite,
            diffs_sum: Some(diffs_sum),
            format: Some(BatchColumnarFormat::Structured),
            schema_id,
            // Field has been deprecated but kept around to roundtrip state.
            deprecated_schema_id: None,
        })
    }

    #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
    pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec<RunPart<T>>, usize)> {
        match self.writing_runs {
            WritingRuns::Ordered(RunOrder::Unordered, run) => {
                let completed_runs = run.finish();
                let mut output = Vec::with_capacity(completed_runs.len());
                for completed_run in completed_runs {
                    let completed_run = completed_run.into_result().await;
                    // In the unordered case, each part becomes its own run.
                    for part in completed_run.parts {
                        output.push((RunOrder::Unordered, vec![part], completed_run.num_updates));
                    }
                }
                output
            }
            WritingRuns::Ordered(order, run) => {
                let completed_runs = run.finish();
                let mut all_parts = Vec::new();
                let mut all_update_counts = 0;
                for completed_run in completed_runs {
                    let completed_run = completed_run.into_result().await;
                    all_parts.extend(completed_run.parts);
                    all_update_counts += completed_run.num_updates;
                }
                vec![(order, all_parts, all_update_counts)]
            }
            WritingRuns::Compacting(batches) => {
                let runs = batches.finish();
                let mut output = Vec::new();
                for (order, run) in runs {
                    let completed_run = run.into_result().await;
                    output.push((order, completed_run.parts, completed_run.num_updates));
                }
                output
            }
        }
    }
}

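/// Checks that `batch` may be appended as a truncation to the `truncate`
/// description: if any timestamp rewrite was applied, the uppers must match
/// exactly and the requested lower must not cut below any part's rewrite
/// frontier; if `validate_part_bounds_on_write` is set, the batch's own bounds
/// must additionally cover the requested ones.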
1360pub(crate) fn validate_truncate_batch<T: Timestamp>(
1361    batch: &HollowBatch<T>,
1362    truncate: &Description<T>,
1363    any_batch_rewrite: bool,
1364    validate_part_bounds_on_write: bool,
1365) -> Result<(), InvalidUsage<T>> {
1366    // If rewrite_ts is used, we don't allow truncation, to keep things simpler
1367    // to reason about.
1368    if any_batch_rewrite {
1369        // We allow a new upper to be specified at rewrite time, so that's easy:
1370        // it must match exactly. This is both consistent with the upper
1371        // requirement below and proves that there is no data to truncate past
1372        // the upper.
1373        if truncate.upper() != batch.desc.upper() {
1374            return Err(InvalidUsage::InvalidRewrite(format!(
1375                "rewritten batch might have data past {:?} up to {:?}",
1376                truncate.upper().elements(),
1377                batch.desc.upper().elements(),
1378            )));
1379        }
1380        // To prove that there is no data to truncate below the lower, require
1381        // that the lower is <= the rewrite ts.
1382        for part in batch.parts.iter() {
1383            let part_lower_bound = part.ts_rewrite().unwrap_or_else(|| batch.desc.lower());
1384            if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
1385                return Err(InvalidUsage::InvalidRewrite(format!(
1386                    "rewritten batch might have data below {:?} at {:?}",
1387                    truncate.lower().elements(),
1388                    part_lower_bound.elements(),
1389                )));
1390            }
1391        }
1392    }
1393
1394    if !validate_part_bounds_on_write {
1395        return Ok(());
1396    }
1397
1398    let batch = &batch.desc;
1399    if !PartialOrder::less_equal(batch.lower(), truncate.lower())
1400        || PartialOrder::less_than(batch.upper(), truncate.upper())
1401    {
1402        return Err(InvalidUsage::InvalidBatchBounds {
1403            batch_lower: batch.lower().clone(),
1404            batch_upper: batch.upper().clone(),
1405            append_lower: truncate.lower().clone(),
1406            append_upper: truncate.upper().clone(),
1407        });
1408    }
1409
1410    Ok(())
1411}
1412
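/// An accumulated set of blob keys and hollow runs to be deleted from blob
/// storage.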
1413#[derive(Debug)]
1414pub(crate) struct PartDeletes<T> {
1415    /// Keys to hollow parts or runs that we're ready to delete.
1416    blob_keys: BTreeSet<PartialBatchKey>,
1417    /// Keys to hollow runs that may not have had all their parts deleted (or added to blob_keys) yet.
1418    hollow_runs: BTreeMap<PartialBatchKey, HollowRunRef<T>>,
1419}
1420
1421impl<T> Default for PartDeletes<T> {
1422    fn default() -> Self {
1423        Self {
1424            blob_keys: Default::default(),
1425            hollow_runs: Default::default(),
1426        }
1427    }
1428}
1429
1430impl<T: Timestamp> PartDeletes<T> {
1431    /// Adds the part to the set to be deleted and returns true if it was
1432    /// newly inserted.
1433    pub fn add(&mut self, part: &RunPart<T>) -> bool {
1434        match part {
1435            RunPart::Many(r) => self.hollow_runs.insert(r.key.clone(), r.clone()).is_none(),
1436            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.insert(x.key.clone()),
1437            RunPart::Single(BatchPart::Inline { .. }) => {
1438                // Nothing to delete.
1439                true
1440            }
1441        }
1442    }
1443
1444    pub fn contains(&self, part: &RunPart<T>) -> bool {
1445        match part {
1446            RunPart::Many(r) => self.hollow_runs.contains_key(&r.key),
1447            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.contains(&x.key),
1448            RunPart::Single(BatchPart::Inline { .. }) => false,
1449        }
1450    }
1451
1452    pub fn is_empty(&self) -> bool {
1453        self.len() == 0
1454    }
1455
1456    pub fn len(&self) -> usize {
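        // Destructure rather than use field access so that adding a new field
        // to the struct forces this count to be revisited.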
1457        match self {
1458            Self {
1459                blob_keys,
1460                hollow_runs,
1461            } => blob_keys.len() + hollow_runs.len(),
1462        }
1463    }
1464
1465    pub async fn delete(
1466        mut self,
1467        blob: &dyn Blob,
1468        shard_id: ShardId,
1469        concurrency: usize,
1470        metrics: &Metrics,
1471        delete_metrics: &RetryMetrics,
1472    ) where
1473        T: Codec64,
1474    {
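        // Alternate between deleting the currently-known blob keys in
        // parallel (at most `concurrency` at a time) and expanding one hollow
        // run into the keys of its constituent parts, until no hollow runs
        // remain.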
1475        loop {
1476            let () = stream::iter(mem::take(&mut self.blob_keys))
1477                .map(|key| {
1478                    let key = key.complete(&shard_id);
1479                    async move {
1480                        retry_external(delete_metrics, || blob.delete(&key)).await;
1481                    }
1482                })
1483                .buffer_unordered(concurrency)
1484                .collect()
1485                .await;
1486
1487            let Some((run_key, run_ref)) = self.hollow_runs.pop_first() else {
1488                break;
1489            };
1490
1491            if let Some(run) = run_ref.get(shard_id, blob, metrics).await {
1492                // Queue up both all the individual parts and the run itself for deletion.
1493                for part in &run.parts {
1494                    self.add(part);
1495                }
1496                self.blob_keys.insert(run_key);
1497            };
1498        }
1499    }
1500}
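
// A minimal usage sketch for `PartDeletes` (illustrative only; `batch`,
// `blob`, `shard_id`, `metrics`, and `delete_retry_metrics` are assumed to be
// in scope with the appropriate types):
//
//     let mut deletes = PartDeletes::default();
//     for part in batch.parts.iter() {
//         deletes.add(part);
//     }
//     deletes
//         .delete(blob, shard_id, 16, &metrics, &delete_retry_metrics)
//         .await;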
1501
1502/// Returns the sum of all diffs, or the monoid zero if there were no updates.
1503fn diffs_sum<D: Monoid + Codec64>(updates: &Int64Array) -> D {
1504    let mut sum = D::zero();
1505    for d in updates.values().iter() {
1506        let d = D::decode(d.to_le_bytes());
1507        sum.plus_equals(&d);
1508    }
1509    sum
1510}
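
// A minimal usage sketch for `diffs_sum` (illustrative values): `i64`
// implements both `Monoid` and `Codec64`, so summing a column of i64 diffs
// looks like:
//
//     let diffs = arrow::array::Int64Array::from(vec![1i64, -2, 4]);
//     let total: i64 = diffs_sum(&diffs);
//     assert_eq!(total, 3);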
1511
1512#[cfg(test)]
1513mod tests {
1514    use mz_dyncfg::ConfigUpdates;
1515
1516    use super::*;
1517    use crate::PersistLocation;
1518    use crate::cache::PersistClientCache;
1519    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
1520    use crate::internal::paths::{BlobKey, PartialBlobKey};
1521    use crate::tests::{all_ok, new_test_client};
1522
1523    #[mz_ore::test(tokio::test)]
1524    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1525    async fn batch_builder_flushing() {
1526        let data = vec![
1527            (("1".to_owned(), "one".to_owned()), 1, 1),
1528            (("2".to_owned(), "two".to_owned()), 2, 1),
1529            (("3".to_owned(), "three".to_owned()), 3, 1),
1530            (("4".to_owned(), "four".to_owned()), 4, 1),
1531        ];
1532
1533        let cache = PersistClientCache::new_no_metrics();
1534
1535        // Set blob_target_size to 0 so that each row gets forced into its own part,
1536        // and MAX_RUNS to 3 so finished runs get merged into a compacted run. Set
1537        // max_outstanding to a small value that's >1 to test various edge cases below.
1538        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
1539        cache.cfg.set_config(&MAX_RUNS, 3);
1540        cache
1541            .cfg
1542            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);
1543
1544        let client = cache
1545            .open(PersistLocation::new_in_mem())
1546            .await
1547            .expect("client construction failed");
1548        let (mut write, mut read) = client
1549            .expect_open::<String, String, u64, i64>(ShardId::new())
1550            .await;
1551
1552        // A new builder has no writing or finished parts.
1553        let mut builder = write.builder(Antichain::from_elem(0));
1554
1555        fn assert_writing(
1556            builder: &BatchBuilder<String, String, u64, i64>,
1557            expected_finished: &[bool],
1558        ) {
1559            let WritingRuns::Compacting(run) = &builder.builder.parts.writing_runs else {
1560                unreachable!("ordered run!")
1561            };
1562
1563            let actual: Vec<_> = run.iter().map(|(_, p)| p.is_finished()).collect();
1564            assert_eq!(*expected_finished, actual);
1565        }
1566
1567        assert_writing(&builder, &[]);
1568
1569        // We set blob_target_size to 0, so the first update gets forced out
1570        // into a run.
1571        let ((k, v), t, d) = &data[0];
1572        builder.add(k, v, t, d).await.expect("invalid usage");
1573        assert_writing(&builder, &[false]);
1574
1575        // We set batch_builder_max_outstanding_parts to 2, so we are allowed to
1576        // pipeline a second part.
1577        let ((k, v), t, d) = &data[1];
1578        builder.add(k, v, t, d).await.expect("invalid usage");
1579        assert_writing(&builder, &[false, false]);
1580
1581        // But now that we have 3 parts, the add call back-pressures until the
1582        // first one finishes.
1583        let ((k, v), t, d) = &data[2];
1584        builder.add(k, v, t, d).await.expect("invalid usage");
1585        assert_writing(&builder, &[true, false, false]);
1586
1587        // Finally, pushing a fourth part will cause the first three to spill out into
1588        // a new compacted run.
1589        let ((k, v), t, d) = &data[3];
1590        builder.add(k, v, t, d).await.expect("invalid usage");
1591        assert_writing(&builder, &[false, false]);
1592
1593        // Finish off the batch and verify that the keys and such get plumbed
1594        // correctly by reading the data back.
1595        let batch = builder
1596            .finish(Antichain::from_elem(5))
1597            .await
1598            .expect("invalid usage");
1599        assert_eq!(batch.batch.runs().count(), 2);
1600        assert_eq!(batch.batch.part_count(), 4);
1601        write
1602            .append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
1603            .await
1604            .expect("invalid usage")
1605            .expect("unexpected upper");
1606        assert_eq!(read.expect_snapshot_and_fetch(4).await, all_ok(&data, 4));
1607    }
1608
1609    #[mz_ore::test(tokio::test)]
1610    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1611    async fn batch_builder_keys() {
1612        let cache = PersistClientCache::new_no_metrics();
1613        // Set blob_target_size to 0 so that each row gets forced into its own batch part
1614        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
1615        // Otherwise fails: expected hollow part!
1616        cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
1617        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1618        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1619        let client = cache
1620            .open(PersistLocation::new_in_mem())
1621            .await
1622            .expect("client construction failed");
1623        let shard_id = ShardId::new();
1624        let (mut write, _) = client
1625            .expect_open::<String, String, u64, i64>(shard_id)
1626            .await;
1627
1628        let batch = write
1629            .expect_batch(
1630                &[
1631                    (("1".into(), "one".into()), 1, 1),
1632                    (("2".into(), "two".into()), 2, 1),
1633                    (("3".into(), "three".into()), 3, 1),
1634                ],
1635                0,
1636                4,
1637            )
1638            .await;
1639
1640        assert_eq!(batch.batch.part_count(), 3);
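        // Every part key should parse back into a batch blob key for this
        // shard, tagged with the writer key for the current build version.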
1641        for part in &batch.batch.parts {
1642            let part = part.expect_hollow_part();
1643            match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
1644                Ok((shard, PartialBlobKey::Batch(writer, _))) => {
1645                    assert_eq!(shard.to_string(), shard_id.to_string());
1646                    assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
1647                }
1648                _ => panic!("unparseable blob key"),
1649            }
1650        }
1651    }
1652
1653    #[mz_ore::test(tokio::test)]
1654    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1655    async fn batch_delete() {
1656        let cache = PersistClientCache::new_no_metrics();
1657        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1658        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1659        cache.cfg.set_config(&BATCH_DELETE_ENABLED, true);
1660        let client = cache
1661            .open(PersistLocation::new_in_mem())
1662            .await
1663            .expect("client construction failed");
1664        let shard_id = ShardId::new();
1665        let (mut write, _) = client
1666            .expect_open::<String, String, u64, i64>(shard_id)
1667            .await;
1668
1669        let batch = write
1670            .expect_batch(
1671                &[
1672                    (("1".into(), "one".into()), 1, 1),
1673                    (("2".into(), "two".into()), 2, 1),
1674                    (("3".into(), "three".into()), 3, 1),
1675                ],
1676                0,
1677                4,
1678            )
1679            .await;
1680
1681        assert_eq!(batch.batch.part_count(), 1);
1682        let part_key = batch.batch.parts[0]
1683            .expect_hollow_part()
1684            .key
1685            .complete(&shard_id);
1686
1687        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
1688        assert!(part_bytes.is_some());
1689
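        // Since BATCH_DELETE_ENABLED is set, deleting the batch should also
        // delete the blob backing its part.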
1690        batch.delete().await;
1691
1692        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
1693        assert!(part_bytes.is_none());
1694    }
1695
1696    #[mz_ore::test]
1697    fn untrimmable_columns() {
1698        let untrimmable = UntrimmableColumns {
1699            equals: vec!["abc".into(), "def".into()],
1700            prefixes: vec!["123".into(), "234".into()],
1701            suffixes: vec!["xyz".into()],
1702        };
1703
1704        // equals
1705        assert!(untrimmable.should_retain("abc"));
1706        assert!(untrimmable.should_retain("ABC"));
1707        assert!(untrimmable.should_retain("aBc"));
1708        assert!(!untrimmable.should_retain("abcd"));
1709        assert!(untrimmable.should_retain("deF"));
1710        assert!(!untrimmable.should_retain("defg"));
1711
1712        // prefix
1713        assert!(untrimmable.should_retain("123"));
1714        assert!(untrimmable.should_retain("123-4"));
1715        assert!(untrimmable.should_retain("1234"));
1716        assert!(untrimmable.should_retain("234"));
1717        assert!(!untrimmable.should_retain("345"));
1718
1719        // suffix
1720        assert!(untrimmable.should_retain("ijk_xyZ"));
1721        assert!(untrimmable.should_retain("ww-XYZ"));
1722        assert!(!untrimmable.should_retain("xya"));
1723    }
1724
1725    // NB: Most edge cases are exercised in datadriven tests.
1726    #[mz_persist_proc::test(tokio::test)]
1727    #[cfg_attr(miri, ignore)] // too slow
1728    async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
1729        let client = new_test_client(&dyncfgs).await;
1730        let (mut write, read) = client
1731            .expect_open::<String, (), u64, i64>(ShardId::new())
1732            .await;
1733
1734        let mut batch = write.builder(Antichain::from_elem(0));
1735        batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
1736        let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();
1737
1738        // Roundtrip through a transmittable batch.
1739        let batch = batch.into_transmittable_batch();
1740        let mut batch = write.batch_from_transmittable_batch(batch);
1741        batch
1742            .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
1743            .unwrap();
1744        write
1745            .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
1746            .await;
1747
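        // The update was added at ts 0 but rewritten to ts 2, so that's where
        // it should be read back.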
1748        let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
1749        let expected = vec![(("foo".to_owned(), ()), 2, 1)];
1750        assert_eq!(actual, expected);
1751    }
1752
1753    #[mz_ore::test(tokio::test)]
1754    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1755    async fn structured_lowers() {
1756        let cache = PersistClientCache::new_no_metrics();
1757        // Ensure structured data is calculated, and that we give some budget for a key lower.
1758        cache.cfg().set_config(&STRUCTURED_KEY_LOWER_LEN, 1024);
1759        // Otherwise fails: expected hollow part!
1760        cache.cfg().set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1761        cache.cfg().set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1762        let client = cache
1763            .open(PersistLocation::new_in_mem())
1764            .await
1765            .expect("client construction failed");
1766        let shard_id = ShardId::new();
1767        let (mut write, _) = client
1768            .expect_open::<String, String, u64, i64>(shard_id)
1769            .await;
1770
1771        let batch = write
1772            .expect_batch(
1773                &[
1774                    (("1".into(), "one".into()), 1, 1),
1775                    (("2".into(), "two".into()), 2, 1),
1776                    (("3".into(), "three".into()), 3, 1),
1777                ],
1778                0,
1779                4,
1780            )
1781            .await;
1782
1783        assert_eq!(batch.batch.part_count(), 1);
1784        let [part] = batch.batch.parts.as_slice() else {
1785            panic!("expected single part")
1786        };
1787        // Verifies that the structured key lower is stored and decoded.
1788        assert!(part.structured_key_lower().is_some());
1789    }
1790}