mz_persist_client/batch.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A handle to a batch of updates
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::marker::PhantomData;
16use std::mem;
17use std::sync::Arc;
18use std::time::Instant;
19
20use arrow::array::{Array, Int64Array};
21use bytes::Bytes;
22use differential_dataflow::difference::Monoid;
23use differential_dataflow::lattice::Lattice;
24use differential_dataflow::trace::Description;
25use futures_util::stream::StreamExt;
26use futures_util::{FutureExt, stream};
27use mz_dyncfg::Config;
28use mz_ore::cast::CastFrom;
29use mz_ore::instrument;
30use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
31use mz_persist::location::Blob;
32use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
33use mz_persist_types::columnar::{ColumnDecoder, Schema};
34use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
35use mz_persist_types::part::{Part, PartBuilder};
36use mz_persist_types::schema::SchemaId;
37use mz_persist_types::stats::{
38    PartStats, TRUNCATE_LEN, TruncateBound, trim_to_budget, truncate_bytes,
39};
40use mz_persist_types::{Codec, Codec64};
41use mz_proto::RustType;
42use mz_timely_util::order::Reverse;
43use proptest_derive::Arbitrary;
44use semver::Version;
45use timely::PartialOrder;
46use timely::order::TotalOrder;
47use timely::progress::{Antichain, Timestamp};
48use tracing::{Instrument, debug_span, trace_span, warn};
49
50use crate::async_runtime::IsolatedRuntime;
51use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
52use crate::error::InvalidUsage;
53use crate::internal::compact::{CompactConfig, Compactor};
54use crate::internal::encoding::{
55    LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, Schemas,
56};
57use crate::internal::machine::retry_external;
58use crate::internal::merge::{MergeTree, Pending};
59use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
60use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
61use crate::internal::state::{
62    BatchPart, ENABLE_INCREMENTAL_COMPACTION, HollowBatch, HollowBatchPart, HollowRun,
63    HollowRunRef, ProtoInlineBatchPart, RunId, RunMeta, RunOrder, RunPart,
64};
65use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
66use crate::{PersistConfig, ShardId};
67
68include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));
69
70/// A handle to a batch of updates that has been written to blob storage but
71/// which has not yet been appended to a shard.
72///
73/// A [Batch] needs to be marked as consumed or it needs to be deleted via [Self::delete].
74/// Otherwise, a dangling batch will leak and backing blobs will remain in blob storage.
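///
/// A minimal lifecycle sketch, assuming `batch` came from a [`BatchBuilder`] and that a
/// `WriteHandle` named `write` is in scope (both bindings are illustrative, not part of
/// this file):
///
/// ```ignore
/// // Either append the batch to the shard (see
/// // `WriteHandle::compare_and_append_batch` for the exact signature)...
/// let _ = write
///     .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper)
///     .await;
/// // ...or, if it will never be appended, delete it so its backing blobs are
/// // reclaimed instead of leaking.
/// // batch.delete().await;
/// ```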
75#[derive(Debug)]
76pub struct Batch<K, V, T, D> {
77    pub(crate) batch_delete_enabled: bool,
78    pub(crate) metrics: Arc<Metrics>,
79    pub(crate) shard_metrics: Arc<ShardMetrics>,
80
81    /// The version of Materialize which wrote this batch.
82    pub(crate) version: Version,
83
84    /// A handle to the data represented by this batch.
85    pub(crate) batch: HollowBatch<T>,
86
87    /// Handle to the [Blob] that the blobs of this batch were uploaded to.
88    pub(crate) blob: Arc<dyn Blob>,
89
90    // These provide a bit more safety against appending a batch with the wrong
91    // type to a shard.
92    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
93}
94
95impl<K, V, T, D> Drop for Batch<K, V, T, D> {
96    fn drop(&mut self) {
97        if self.batch.part_count() > 0 {
98            warn!(
99                "un-consumed Batch, with {} parts and dangling blob keys: {:?}",
100                self.batch.part_count(),
101                self.batch
102                    .parts
103                    .iter()
104                    .map(|x| x.printable_name())
105                    .collect::<Vec<_>>(),
106            );
107        }
108    }
109}
110
111impl<K, V, T, D> Batch<K, V, T, D>
112where
113    K: Debug + Codec,
114    V: Debug + Codec,
115    T: Timestamp + Lattice + Codec64,
116    D: Monoid + Codec64,
117{
118    pub(crate) fn new(
119        batch_delete_enabled: bool,
120        metrics: Arc<Metrics>,
121        blob: Arc<dyn Blob>,
122        shard_metrics: Arc<ShardMetrics>,
123        version: Version,
124        batch: HollowBatch<T>,
125    ) -> Self {
126        Self {
127            batch_delete_enabled,
128            metrics,
129            shard_metrics,
130            version,
131            batch,
132            blob,
133            _phantom: PhantomData,
134        }
135    }
136
137    /// The `shard_id` of this [Batch].
138    pub fn shard_id(&self) -> ShardId {
139        self.shard_metrics.shard_id
140    }
141
142    /// The `upper` of this [Batch].
143    pub fn upper(&self) -> &Antichain<T> {
144        self.batch.desc.upper()
145    }
146
147    /// The `lower` of this [Batch].
148    pub fn lower(&self) -> &Antichain<T> {
149        self.batch.desc.lower()
150    }
151
152    /// Marks the blobs that this batch handle points to as consumed, likely
153    /// because they were appended to a shard.
154    ///
155    /// Consumers of a batch need to make this explicit, so that we can log
156    /// warnings in case a batch is not used.
157    pub(crate) fn mark_consumed(&mut self) {
158        self.batch.parts.clear();
159    }
160
161    /// Deletes the blobs that make up this batch from the given blob store and
162    /// marks them as deleted.
163    #[instrument(level = "debug", fields(shard = %self.shard_id()))]
164    pub async fn delete(mut self) {
165        if !self.batch_delete_enabled {
166            self.mark_consumed();
167            return;
168        }
169        let mut deletes = PartDeletes::default();
170        for part in self.batch.parts.drain(..) {
171            deletes.add(&part);
172        }
173        let () = deletes
174            .delete(
175                &*self.blob,
176                self.shard_id(),
177                usize::MAX,
178                &*self.metrics,
179                &*self.metrics.retries.external.batch_delete,
180            )
181            .await;
182    }
183
184    /// Returns the schemas of parts in this batch.
185    pub fn schemas(&self) -> impl Iterator<Item = SchemaId> + '_ {
186        self.batch.parts.iter().flat_map(|b| b.schema_id())
187    }
188
189    /// Turns this [`Batch`] into a `HollowBatch`.
190    ///
191    /// **NOTE**: If this batch is not eventually appended to a shard or
192    /// dropped, the data that it represents will have leaked.
193    pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
194        let ret = self.batch.clone();
195        self.mark_consumed();
196        ret
197    }
198
199    /// Turns this [`Batch`] into a [`ProtoBatch`], which can be used to
200    /// transfer this batch across process boundaries, for example when
201    /// exchanging data between timely workers.
202    ///
203    /// **NOTE**: If this batch is not eventually appended to a shard or
204    /// dropped, the data that it represents will have leaked. The caller is
205    /// responsible for turning this back into a [`Batch`] using
206    /// [`WriteHandle::batch_from_transmittable_batch`](crate::write::WriteHandle::batch_from_transmittable_batch).
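    ///
    /// A hedged round-trip sketch, assuming a `WriteHandle` named `write` on the
    /// receiving worker (the bindings are illustrative):
    ///
    /// ```ignore
    /// // Sending side: turn the batch into a proto that can cross process
    /// // boundaries.
    /// let proto: ProtoBatch = batch.into_transmittable_batch();
    ///
    /// // Receiving side: reconstitute the batch before appending it; otherwise
    /// // the underlying blobs are leaked.
    /// let batch = write.batch_from_transmittable_batch(proto);
    /// ```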
207    pub fn into_transmittable_batch(mut self) -> ProtoBatch {
208        let ret = ProtoBatch {
209            shard_id: self.shard_metrics.shard_id.into_proto(),
210            version: self.version.to_string(),
211            batch: Some(self.batch.into_proto()),
212        };
213        self.mark_consumed();
214        ret
215    }
216
217    pub(crate) async fn flush_to_blob(
218        &mut self,
219        cfg: &BatchBuilderConfig,
220        batch_metrics: &BatchWriteMetrics,
221        isolated_runtime: &Arc<IsolatedRuntime>,
222        write_schemas: &Schemas<K, V>,
223    ) {
224        // It's necessary for correctness to keep the parts in the same order.
225        // We could introduce concurrency here with FuturesOrdered, but it would
226        // be pretty unexpected to have inline writes in more than one part, so
227        // don't bother.
228        let mut parts = Vec::new();
229        for (run_meta, run_parts) in self.batch.runs() {
230            for part in run_parts {
231                let (updates, ts_rewrite, schema_id) = match part {
232                    RunPart::Single(BatchPart::Inline {
233                        updates,
234                        ts_rewrite,
235                        schema_id,
236                        deprecated_schema_id: _,
237                    }) => (updates, ts_rewrite, schema_id),
238                    other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
239                        parts.push(other.clone());
240                        continue;
241                    }
242                };
243                let updates = updates
244                    .decode::<T>(&self.metrics.columnar)
245                    .expect("valid inline part");
246                let diffs_sum = diffs_sum::<D>(updates.updates.diffs());
247                let mut write_schemas = write_schemas.clone();
248                write_schemas.id = *schema_id;
249
250                let write_span =
251                    debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id)
252                        .or_current();
253                let handle = mz_ore::task::spawn(
254                    || "batch::flush_to_blob",
255                    BatchParts::write_hollow_part(
256                        cfg.clone(),
257                        Arc::clone(&self.blob),
258                        Arc::clone(&self.metrics),
259                        Arc::clone(&self.shard_metrics),
260                        batch_metrics.clone(),
261                        Arc::clone(isolated_runtime),
262                        updates,
263                        run_meta.order.unwrap_or(RunOrder::Unordered),
264                        ts_rewrite.clone(),
265                        D::encode(&diffs_sum),
266                        write_schemas,
267                    )
268                    .instrument(write_span),
269                );
270                let part = handle.await.expect("part write task failed");
271                parts.push(RunPart::Single(part));
272            }
273        }
274        self.batch.parts = parts;
275    }
276
277    /// The sum of the encoded sizes of all parts in the batch.
278    pub fn encoded_size_bytes(&self) -> usize {
279        self.batch.encoded_size_bytes()
280    }
281}
282
283impl<K, V, T, D> Batch<K, V, T, D>
284where
285    K: Debug + Codec,
286    V: Debug + Codec,
287    T: Timestamp + Lattice + Codec64 + TotalOrder,
288    D: Monoid + Codec64,
289{
290    /// Efficiently rewrites the timestamps in this not-yet-committed batch.
291    ///
292    /// This [Batch] represents potentially large amounts of data, which may
293    /// have partly or entirely been spilled to s3. This call bulk edits the
294    /// timestamps of all data in this batch in a metadata-only operation (i.e.
295    /// without network calls).
296    ///
297    /// Specifically, every timestamp in the batch is logically advanced_by the
298    /// provided `frontier`.
299    ///
300    /// This method may be called multiple times, with later calls overriding
301    /// previous ones, but the rewrite frontier may not regress across calls.
302    ///
303    /// When this batch was created, it was given an `upper`, which bounds the
304    /// staged data it represents. To allow rewrite past this original `upper`,
305    /// this call accepts a new `upper` which replaces the previous one. Like
306    /// the rewrite frontier, the upper may not regress across calls.
307    ///
308    /// Multiple batches with various rewrite frontiers may be used in a single
309    /// [crate::write::WriteHandle::compare_and_append_batch] call. This is an
310    /// expected usage.
311    ///
312    /// This feature requires that the timestamp impls `TotalOrder`. This is
313    /// because we need to be able to verify that the contained data, after the
314    /// rewrite forward operation, still respects the new upper. It turns out
315    /// that, given the metadata persist currently collects during batch
316    /// collection, this is possible for totally ordered times, but it's known
317    /// to be _not possible_ for partially ordered times. It is believed that we
318    /// could fix this by collecting different metadata in batch creation (e.g.
319    /// the join of or an antichain of the original contained timestamps), but
320    /// the experience of database-issues#7825 has shaken our confidence in our own abilities
321    /// to reason about partially ordered times and anyway all the initial uses
322    /// have totally ordered times.
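    ///
    /// A minimal sketch, using illustrative `u64` timestamps:
    ///
    /// ```ignore
    /// // Advance every update in the batch to at least time 5 and extend the
    /// // registered upper to 6. Later calls may move both forward, never back.
    /// batch.rewrite_ts(&Antichain::from_elem(5u64), Antichain::from_elem(6u64))?;
    /// ```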
323    pub fn rewrite_ts(
324        &mut self,
325        frontier: &Antichain<T>,
326        new_upper: Antichain<T>,
327    ) -> Result<(), InvalidUsage<T>> {
328        self.batch
329            .rewrite_ts(frontier, new_upper)
330            .map_err(InvalidUsage::InvalidRewrite)
331    }
332}
333
334/// Indicates what work was done in a call to [BatchBuilder::add]
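///
/// For example, a caller could use this to count how many parts were flushed to blob
/// storage while filling a builder (a sketch; `builder`, `key`, `val`, `ts`, and `diff`
/// are illustrative):
///
/// ```ignore
/// let mut flushed_parts = 0;
/// match builder.add(&key, &val, &ts, &diff).await? {
///     Added::Record => {}
///     Added::RecordAndParts => flushed_parts += 1,
/// }
/// ```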
335#[derive(Debug)]
336pub enum Added {
337    /// A record was inserted into a pending batch part
338    Record,
339    /// A record was inserted into a pending batch part
340    /// and the part was sent to blob storage
341    RecordAndParts,
342}
343
344/// A snapshot of dynamic configs to make it easier to reason about an individual
345/// run of BatchBuilder.
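///
/// A sketch of taking such a snapshot, assuming a [PersistConfig] named `persist_cfg`
/// and a [ShardId] named `shard_id` are in scope (illustrative bindings):
///
/// ```ignore
/// // Read the dynamic configs once so a single batch build sees a consistent
/// // set of settings.
/// let cfg = BatchBuilderConfig::new(&persist_cfg, shard_id);
/// assert!(cfg.blob_target_size >= 1); // clamped to at least 1 byte
/// ```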
346#[derive(Debug, Clone)]
347pub struct BatchBuilderConfig {
348    writer_key: WriterKey,
349    pub(crate) blob_target_size: usize,
350    pub(crate) batch_delete_enabled: bool,
351    pub(crate) batch_builder_max_outstanding_parts: usize,
352    pub(crate) inline_writes_single_max_bytes: usize,
353    pub(crate) stats_collection_enabled: bool,
354    pub(crate) stats_budget: usize,
355    pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
356    pub(crate) encoding_config: EncodingConfig,
357    pub(crate) preferred_order: RunOrder,
358    pub(crate) structured_key_lower_len: usize,
359    pub(crate) run_length_limit: usize,
360    pub(crate) enable_incremental_compaction: bool,
361    /// The number of runs to cap the built batch at, or None if we should
362    /// continue to generate one run per part for unordered batches.
363    /// See the config definition for details.
364    pub(crate) max_runs: Option<usize>,
365}
366
367// TODO: Remove this once we're comfortable that there aren't any bugs.
368pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
369    "persist_batch_delete_enabled",
370    true,
371    "Whether to actually delete blobs when batch delete is called (Materialize).",
372);
373
374pub(crate) const ENCODING_ENABLE_DICTIONARY: Config<bool> = Config::new(
375    "persist_encoding_enable_dictionary",
376    true,
377    "A feature flag to enable dictionary encoding for Parquet data (Materialize).",
378);
379
380pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new(
381    "persist_encoding_compression_format",
382    "none",
383    "A feature flag to enable compression of Parquet data (Materialize).",
384);
385
386pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config<usize> = Config::new(
387    "persist_batch_structured_key_lower_len",
388    256,
389    "The maximum size in proto bytes of any structured key-lower metadata to preserve. \
390    (If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
391);
392
393pub(crate) const MAX_RUN_LEN: Config<usize> = Config::new(
394    "persist_batch_max_run_len",
395    usize::MAX,
396    "The maximum length a run can have before it will be spilled as a hollow run \
397    into the blob store.",
398);
399
400pub(crate) const MAX_RUNS: Config<usize> = Config::new(
401    "persist_batch_max_runs",
402    1,
403    "The maximum number of runs a batch builder should generate for user batches. \
404    (Compaction outputs always generate a single run.) \
405    The minimum value is 2; below this, compaction is disabled.",
406);
407
408/// A target maximum size of blob payloads in bytes. If a logical "batch" is
409/// bigger than this, it will be broken up into smaller, independent pieces.
410/// This is best-effort, not a guarantee (though as of 2022-06-09, we happen to
411/// always respect it). This target size doesn't apply for an individual update
412/// that exceeds it in size, but that scenario is almost certainly a mis-use of
413/// the system.
414pub(crate) const BLOB_TARGET_SIZE: Config<usize> = Config::new(
415    "persist_blob_target_size",
416    128 * MiB,
417    "A target maximum size of persist blob payloads in bytes (Materialize).",
418);
419
420pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config<usize> = Config::new(
421    "persist_inline_writes_single_max_bytes",
422    4096,
423    "The (exclusive) maximum size of a write that persist will inline in metadata.",
424);
425
426pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config<usize> = Config::new(
427    "persist_inline_writes_total_max_bytes",
428    1 * MiB,
429    "\
430    The (exclusive) maximum total size of inline writes in metadata before \
431    persist will backpressure them by flushing out to s3.",
432);
433
434impl BatchBuilderConfig {
435    /// Initialize a batch builder config based on a snapshot of the Persist config.
436    pub fn new(value: &PersistConfig, _shard_id: ShardId) -> Self {
437        let writer_key = WriterKey::for_version(&value.build_version);
438
439        let preferred_order = RunOrder::Structured;
440
441        BatchBuilderConfig {
442            writer_key,
443            blob_target_size: BLOB_TARGET_SIZE.get(value).clamp(1, usize::MAX),
444            batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
445            batch_builder_max_outstanding_parts: BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(value),
446            inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
447            stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
448            stats_budget: STATS_BUDGET_BYTES.get(value),
449            stats_untrimmable_columns: Arc::new(untrimmable_columns(value)),
450            encoding_config: EncodingConfig {
451                use_dictionary: ENCODING_ENABLE_DICTIONARY.get(value),
452                compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
453            },
454            preferred_order,
455            structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
456            run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
457            max_runs: match MAX_RUNS.get(value) {
458                limit @ 2.. => Some(limit),
459                _ => None,
460            },
461            enable_incremental_compaction: ENABLE_INCREMENTAL_COMPACTION.get(value),
462        }
463    }
464}
465
466/// A list of (lowercase) column names that persist will always retain
467/// stats for, even if it means going over the stats budget.
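///
/// A small sketch of the matching rules (the column names are illustrative):
///
/// ```ignore
/// let cols = UntrimmableColumns {
///     equals: vec!["id".into()],
///     prefixes: vec!["mz_".into()],
///     suffixes: vec!["_key".into()],
/// };
/// assert!(cols.should_retain("ID"));          // case-insensitive exact match
/// assert!(cols.should_retain("mz_internal")); // prefix match
/// assert!(cols.should_retain("tenant_KEY"));  // suffix match
/// assert!(!cols.should_retain("payload"));
/// ```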
468#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
469pub(crate) struct UntrimmableColumns {
470    /// Always retain columns whose lowercased names exactly equal any of these strings.
471    pub equals: Vec<Cow<'static, str>>,
472    /// Always retain columns whose lowercased names start with any of these strings.
473    pub prefixes: Vec<Cow<'static, str>>,
474    /// Always retain columns whose lowercased names end with any of these strings.
475    pub suffixes: Vec<Cow<'static, str>>,
476}
477
478impl UntrimmableColumns {
479    pub(crate) fn should_retain(&self, name: &str) -> bool {
480        // TODO: see if there's a better way to match different formats than lowercasing
481        // https://github.com/MaterializeInc/database-issues/issues/6421#issue-1863623805
482        let name_lower = name.to_lowercase();
483        for s in &self.equals {
484            if *s == name_lower {
485                return true;
486            }
487        }
488        for s in &self.prefixes {
489            if name_lower.starts_with(s.as_ref()) {
490                return true;
491            }
492        }
493        for s in &self.suffixes {
494            if name_lower.ends_with(s.as_ref()) {
495                return true;
496            }
497        }
498        false
499    }
500}
501
502/// A builder for [Batches](Batch) that allows adding updates piece by piece and
503/// then finishing it.
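///
/// A hedged usage sketch, assuming the builder was obtained from a `WriteHandle`
/// (the `write.builder(..)` call, the updates, and the timestamps below are
/// illustrative):
///
/// ```ignore
/// let mut builder = write.builder(Antichain::from_elem(0u64));
/// for (key, val, ts, diff) in updates {
///     builder.add(&key, &val, &ts, &diff).await?;
/// }
/// // Finish with the exclusive upper; updates at or beyond it are rejected.
/// let batch = builder.finish(Antichain::from_elem(10u64)).await?;
/// ```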
504#[derive(Debug)]
505pub struct BatchBuilder<K, V, T, D>
506where
507    K: Codec,
508    V: Codec,
509    T: Timestamp + Lattice + Codec64,
510{
511    inline_desc: Description<T>,
512    inclusive_upper: Antichain<Reverse<T>>,
513
514    records_builder: PartBuilder<K, K::Schema, V, V::Schema>,
515    pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
516}
517
518impl<K, V, T, D> BatchBuilder<K, V, T, D>
519where
520    K: Debug + Codec,
521    V: Debug + Codec,
522    T: Timestamp + Lattice + Codec64,
523    D: Monoid + Codec64,
524{
525    pub(crate) fn new(
526        builder: BatchBuilderInternal<K, V, T, D>,
527        inline_desc: Description<T>,
528    ) -> Self {
529        let records_builder = PartBuilder::new(
530            builder.write_schemas.key.as_ref(),
531            builder.write_schemas.val.as_ref(),
532        );
533        Self {
534            inline_desc,
535            inclusive_upper: Antichain::new(),
536            records_builder,
537            builder,
538        }
539    }
540
541    /// Finish writing this batch and return a handle to the written batch.
542    ///
543    /// This fails if any of the updates in this batch are beyond the given
544    /// `upper`.
545    pub async fn finish(
546        mut self,
547        registered_upper: Antichain<T>,
548    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
549        if PartialOrder::less_than(&registered_upper, self.inline_desc.lower()) {
550            return Err(InvalidUsage::InvalidBounds {
551                lower: self.inline_desc.lower().clone(),
552                upper: registered_upper,
553            });
554        }
555
556        // When since is less than or equal to lower, the upper is a strict bound
557        // on the updates' timestamps because no advancement has been performed. Because user batches
558        // are always unadvanced, this ensures that new updates are recorded with valid timestamps.
559        // Otherwise, we can make no assumptions about the timestamps.
560        if PartialOrder::less_equal(self.inline_desc.since(), self.inline_desc.lower()) {
561            for ts in self.inclusive_upper.iter() {
562                if registered_upper.less_equal(&ts.0) {
563                    return Err(InvalidUsage::UpdateBeyondUpper {
564                        ts: ts.0.clone(),
565                        expected_upper: registered_upper.clone(),
566                    });
567                }
568            }
569        }
570
571        let updates = self.records_builder.finish();
572        self.builder
573            .flush_part(self.inline_desc.clone(), updates)
574            .await;
575
576        self.builder
577            .finish(Description::new(
578                self.inline_desc.lower().clone(),
579                registered_upper,
580                self.inline_desc.since().clone(),
581            ))
582            .await
583    }
584
585    /// Adds the given update to the batch.
586    ///
587    /// The update timestamp must be greater or equal to `lower` that was given
588    /// when creating this [BatchBuilder].
589    pub async fn add(
590        &mut self,
591        key: &K,
592        val: &V,
593        ts: &T,
594        diff: &D,
595    ) -> Result<Added, InvalidUsage<T>> {
596        if !self.inline_desc.lower().less_equal(ts) {
597            return Err(InvalidUsage::UpdateNotBeyondLower {
598                ts: ts.clone(),
599                lower: self.inline_desc.lower().clone(),
600            });
601        }
602        self.inclusive_upper.insert(Reverse(ts.clone()));
603
604        let added = {
605            self.records_builder
606                .push(key, val, ts.clone(), diff.clone());
607            if self.records_builder.goodbytes() >= self.builder.parts.cfg.blob_target_size {
608                let part = self.records_builder.finish_and_replace(
609                    self.builder.write_schemas.key.as_ref(),
610                    self.builder.write_schemas.val.as_ref(),
611                );
612                Some(part)
613            } else {
614                None
615            }
616        };
617
618        let added = if let Some(full_batch) = added {
619            self.builder
620                .flush_part(self.inline_desc.clone(), full_batch)
621                .await;
622            Added::RecordAndParts
623        } else {
624            Added::Record
625        };
626        Ok(added)
627    }
628}
629
630#[derive(Debug)]
631pub(crate) struct BatchBuilderInternal<K, V, T, D>
632where
633    K: Codec,
634    V: Codec,
635    T: Timestamp + Lattice + Codec64,
636{
637    shard_id: ShardId,
638    version: Version,
639    blob: Arc<dyn Blob>,
640    metrics: Arc<Metrics>,
641
642    write_schemas: Schemas<K, V>,
643    parts: BatchParts<T>,
644
645    // These provide a bit more safety against appending a batch with the wrong
646    // type to a shard.
647    _phantom: PhantomData<fn(K, V, T, D)>,
648}
649
650impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
651where
652    K: Debug + Codec,
653    V: Debug + Codec,
654    T: Timestamp + Lattice + Codec64,
655    D: Monoid + Codec64,
656{
657    pub(crate) fn new(
658        _cfg: BatchBuilderConfig,
659        parts: BatchParts<T>,
660        metrics: Arc<Metrics>,
661        write_schemas: Schemas<K, V>,
662        blob: Arc<dyn Blob>,
663        shard_id: ShardId,
664        version: Version,
665    ) -> Self {
666        Self {
667            blob,
668            metrics,
669            write_schemas,
670            parts,
671            shard_id,
672            version,
673            _phantom: PhantomData,
674        }
675    }
676
677    /// Finish writing this batch and return a handle to the written batch.
678    ///
679    /// This fails if any of the updates in this batch are beyond the given
680    /// `upper`.
681    #[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
682    pub async fn finish(
683        self,
684        registered_desc: Description<T>,
685    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
686        let write_run_ids = self.parts.cfg.enable_incremental_compaction;
687        let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
688        let shard_metrics = Arc::clone(&self.parts.shard_metrics);
689        let runs = self.parts.finish().await;
690
691        let mut run_parts = vec![];
692        let mut run_splits = vec![];
693        let mut run_meta = vec![];
694        let total_updates = runs
695            .iter()
696            .map(|(_, _, num_updates)| num_updates)
697            .sum::<usize>();
698        for (order, parts, num_updates) in runs {
699            if parts.is_empty() {
700                continue;
701            }
702            if run_parts.len() != 0 {
703                run_splits.push(run_parts.len());
704            }
705            run_meta.push(RunMeta {
706                order: Some(order),
707                schema: self.write_schemas.id,
708                // Field has been deprecated but kept around to roundtrip state.
709                deprecated_schema: None,
710                id: if write_run_ids {
711                    Some(RunId::new())
712                } else {
713                    None
714                },
715                len: if write_run_ids {
716                    Some(num_updates)
717                } else {
718                    None
719                },
720                meta: MetadataMap::default(),
721            });
722            run_parts.extend(parts);
723        }
724        let desc = registered_desc;
725
726        let batch = Batch::new(
727            batch_delete_enabled,
728            Arc::clone(&self.metrics),
729            self.blob,
730            shard_metrics,
731            self.version,
732            HollowBatch::new(desc, run_parts, total_updates, run_meta, run_splits),
733        );
734
735        Ok(batch)
736    }
737
738    /// Flushes the current part to Blob storage, first consolidating and then
739    /// columnar encoding the updates. It is the caller's responsibility to
740    /// chunk `current_part` so that it is no greater than
741    /// [BatchBuilderConfig::blob_target_size], and it must absolutely be less than
742    /// [mz_persist::indexed::columnar::KEY_VAL_DATA_MAX_LEN].
743    pub async fn flush_part(&mut self, part_desc: Description<T>, columnar: Part) {
744        let num_updates = columnar.len();
745        if num_updates == 0 {
746            return;
747        }
748        let diffs_sum = diffs_sum::<D>(&columnar.diff);
749
750        let start = Instant::now();
751        self.parts
752            .write(&self.write_schemas, part_desc, columnar, diffs_sum)
753            .await;
754        self.metrics
755            .compaction
756            .batch
757            .step_part_writing
758            .inc_by(start.elapsed().as_secs_f64());
759    }
760}
761
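/// A finished run of parts, along with the number of updates those parts contain.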
762#[derive(Debug, Clone)]
763pub(crate) struct RunWithMeta<T> {
764    pub parts: Vec<RunPart<T>>,
765    pub num_updates: usize,
766}
767
768impl<T> RunWithMeta<T> {
769    pub fn new(parts: Vec<RunPart<T>>, num_updates: usize) -> Self {
770        Self { parts, num_updates }
771    }
772
773    pub fn single(part: RunPart<T>, num_updates: usize) -> Self {
774        Self {
775            parts: vec![part],
776            num_updates,
777        }
778    }
779}
780
781#[derive(Debug)]
782enum WritingRuns<T> {
783    /// Building a single run with the specified ordering. Parts are expected to be internally
784    /// sorted and added in order. Merging a vec of parts will shift them out to a hollow run
785    /// in blob, bounding the total length of a run in memory.
786    Ordered(RunOrder, MergeTree<Pending<RunWithMeta<T>>>),
787    /// Building multiple runs which may have different orders. Merging a vec of runs will cause
788    /// them to be compacted together, bounding the total number of runs we generate.
789    Compacting(MergeTree<(RunOrder, Pending<RunWithMeta<T>>)>),
790}
791
792// TODO: If this is dropped, cancel (and delete?) any writing parts and delete
793// any finished ones.
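/// Accumulates the parts written for an in-progress batch, grouping them into runs
/// according to the [WritingRuns] strategy chosen at construction time.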
794#[derive(Debug)]
795pub(crate) struct BatchParts<T> {
796    cfg: BatchBuilderConfig,
797    metrics: Arc<Metrics>,
798    shard_metrics: Arc<ShardMetrics>,
799    shard_id: ShardId,
800    blob: Arc<dyn Blob>,
801    isolated_runtime: Arc<IsolatedRuntime>,
802    next_index: u64,
803    writing_runs: WritingRuns<T>,
804    batch_metrics: BatchWriteMetrics,
805}
806
807impl<T: Timestamp + Codec64> BatchParts<T> {
808    pub(crate) fn new_compacting<K, V, D>(
809        cfg: CompactConfig,
810        desc: Description<T>,
811        runs_per_compaction: usize,
812        metrics: Arc<Metrics>,
813        shard_metrics: Arc<ShardMetrics>,
814        shard_id: ShardId,
815        blob: Arc<dyn Blob>,
816        isolated_runtime: Arc<IsolatedRuntime>,
817        batch_metrics: &BatchWriteMetrics,
818        schemas: Schemas<K, V>,
819    ) -> Self
820    where
821        K: Codec + Debug,
822        V: Codec + Debug,
823        T: Lattice + Send + Sync,
824        D: Monoid + Ord + Codec64 + Send + Sync,
825    {
826        let writing_runs = {
827            let cfg = cfg.clone();
828            let blob = Arc::clone(&blob);
829            let metrics = Arc::clone(&metrics);
830            let shard_metrics = Arc::clone(&shard_metrics);
831            let isolated_runtime = Arc::clone(&isolated_runtime);
832            // Clamping to prevent extreme values given weird configs.
833            let runs_per_compaction = runs_per_compaction.clamp(2, 1024);
834
835            let merge_fn = move |parts: Vec<(RunOrder, Pending<RunWithMeta<T>>)>| {
836                let blob = Arc::clone(&blob);
837                let metrics = Arc::clone(&metrics);
838                let shard_metrics = Arc::clone(&shard_metrics);
839                let cfg = cfg.clone();
840                let isolated_runtime = Arc::clone(&isolated_runtime);
841                let write_schemas = schemas.clone();
842                let compact_desc = desc.clone();
843                let handle = mz_ore::task::spawn(
844                    || "batch::compact_runs",
845                    async move {
846                        let runs: Vec<_> = stream::iter(parts)
847                            .then(|(order, parts)| async move {
848                                let completed_run = parts.into_result().await;
849                                (
850                                    RunMeta {
851                                        order: Some(order),
852                                        schema: schemas.id,
853                                        // Field has been deprecated but kept around to
854                                        // roundtrip state.
855                                        deprecated_schema: None,
856                                        id: if cfg.batch.enable_incremental_compaction {
857                                            Some(RunId::new())
858                                        } else {
859                                            None
860                                        },
861                                        len: if cfg.batch.enable_incremental_compaction {
862                                            Some(completed_run.num_updates)
863                                        } else {
864                                            None
865                                        },
866                                        meta: MetadataMap::default(),
867                                    },
868                                    completed_run.parts,
869                                )
870                            })
871                            .collect()
872                            .await;
873
874                        let run_refs: Vec<_> = runs
875                            .iter()
876                            .map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
877                            .collect();
878
879                        let output_batch = Compactor::<K, V, T, D>::compact_runs(
880                            &cfg,
881                            &shard_id,
882                            &compact_desc,
883                            run_refs,
884                            blob,
885                            metrics,
886                            shard_metrics,
887                            isolated_runtime,
888                            write_schemas,
889                        )
890                        .await
891                        .expect("successful compaction");
892
893                        assert_eq!(
894                            output_batch.run_meta.len(),
895                            1,
896                            "compaction is guaranteed to emit a single run"
897                        );
898                        let total_compacted_updates: usize = output_batch.len;
899
900                        RunWithMeta::new(output_batch.parts, total_compacted_updates)
901                    }
902                    .instrument(debug_span!("batch::compact_runs")),
903                );
904                (RunOrder::Structured, Pending::new(handle))
905            };
906            WritingRuns::Compacting(MergeTree::new(runs_per_compaction, merge_fn))
907        };
908        BatchParts {
909            cfg: cfg.batch,
910            metrics,
911            shard_metrics,
912            shard_id,
913            blob,
914            isolated_runtime,
915            next_index: 0,
916            writing_runs,
917            batch_metrics: batch_metrics.clone(),
918        }
919    }
920
921    pub(crate) fn new_ordered<D: Monoid + Codec64>(
922        cfg: BatchBuilderConfig,
923        order: RunOrder,
924        metrics: Arc<Metrics>,
925        shard_metrics: Arc<ShardMetrics>,
926        shard_id: ShardId,
927        blob: Arc<dyn Blob>,
928        isolated_runtime: Arc<IsolatedRuntime>,
929        batch_metrics: &BatchWriteMetrics,
930    ) -> Self {
931        let writing_runs = {
932            let cfg = cfg.clone();
933            let blob = Arc::clone(&blob);
934            let metrics = Arc::clone(&metrics);
935            let writer_key = cfg.writer_key.clone();
936            // Don't spill "unordered" runs to S3, since we'll split them up into many single-element
937            // runs below.
938            let run_length_limit = (order == RunOrder::Unordered)
939                .then_some(usize::MAX)
940                .unwrap_or(cfg.run_length_limit);
941            let merge_fn = move |parts: Vec<Pending<RunWithMeta<T>>>| {
942                let blob = Arc::clone(&blob);
943                let writer_key = writer_key.clone();
944                let metrics = Arc::clone(&metrics);
945                let handle = mz_ore::task::spawn(
946                    || "batch::spill_run",
947                    async move {
948                        let completed_runs: Vec<RunWithMeta<T>> = stream::iter(parts)
949                            .then(|p| p.into_result())
950                            .collect()
951                            .await;
952
953                        let mut all_run_parts = Vec::new();
954                        let mut total_updates = 0;
955
956                        for completed_run in completed_runs {
957                            all_run_parts.extend(completed_run.parts);
958                            total_updates += completed_run.num_updates;
959                        }
960
961                        let run_ref = HollowRunRef::set::<D>(
962                            shard_id,
963                            blob.as_ref(),
964                            &writer_key,
965                            HollowRun {
966                                parts: all_run_parts,
967                            },
968                            &*metrics,
969                        )
970                        .await;
971
972                        RunWithMeta::single(RunPart::Many(run_ref), total_updates)
973                    }
974                    .instrument(debug_span!("batch::spill_run")),
975                );
976                Pending::new(handle)
977            };
978            WritingRuns::Ordered(order, MergeTree::new(run_length_limit, merge_fn))
979        };
980        BatchParts {
981            cfg,
982            metrics,
983            shard_metrics,
984            shard_id,
985            blob,
986            isolated_runtime,
987            next_index: 0,
988            writing_runs,
989            batch_metrics: batch_metrics.clone(),
990        }
991    }
992
993    pub(crate) fn expected_order(&self) -> RunOrder {
994        match self.writing_runs {
995            WritingRuns::Ordered(order, _) => order,
996            WritingRuns::Compacting(_) => RunOrder::Unordered,
997        }
998    }
999
1000    pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
1001        &mut self,
1002        write_schemas: &Schemas<K, V>,
1003        desc: Description<T>,
1004        updates: Part,
1005        diffs_sum: D,
1006    ) {
1007        let batch_metrics = self.batch_metrics.clone();
1008        let index = self.next_index;
1009        self.next_index += 1;
1010        let num_updates = updates.len();
1011        let ts_rewrite = None;
1012        let schema_id = write_schemas.id;
1013
1014        // Parts whose updates are smaller than this threshold (by goodbytes) are kept
1015        // inline in metadata rather than being written out to blob storage.
1016        let inline_threshold = self.cfg.inline_writes_single_max_bytes;
1017
1018        let updates = BlobTraceUpdates::from_part(updates);
1019        let (name, write_future) = if updates.goodbytes() < inline_threshold {
1020            let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
1021            (
1022                "batch::inline_part",
1023                async move {
1024                    let start = Instant::now();
1025                    let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
1026                        desc: Some(desc.into_proto()),
1027                        index: index.into_proto(),
1028                        updates: Some(updates.into_proto()),
1029                    });
1030                    batch_metrics
1031                        .step_inline
1032                        .inc_by(start.elapsed().as_secs_f64());
1033
1034                    RunWithMeta::single(
1035                        RunPart::Single(BatchPart::Inline {
1036                            updates,
1037                            ts_rewrite,
1038                            schema_id,
1039                            // Field has been deprecated but kept around to roundtrip state.
1040                            deprecated_schema_id: None,
1041                        }),
1042                        num_updates,
1043                    )
1044                }
1045                .instrument(span)
1046                .boxed(),
1047            )
1048        } else {
1049            let part = BlobTraceBatchPart {
1050                desc,
1051                updates,
1052                index,
1053            };
1054            let cfg = self.cfg.clone();
1055            let blob = Arc::clone(&self.blob);
1056            let metrics = Arc::clone(&self.metrics);
1057            let shard_metrics = Arc::clone(&self.shard_metrics);
1058            let isolated_runtime = Arc::clone(&self.isolated_runtime);
1059            let expected_order = self.expected_order();
1060            let encoded_diffs_sum = D::encode(&diffs_sum);
1061            let write_schemas_clone = write_schemas.clone();
1062            let write_span =
1063                debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current();
1064            (
1065                "batch::write_part",
1066                async move {
1067                    let part = BatchParts::write_hollow_part(
1068                        cfg,
1069                        blob,
1070                        metrics,
1071                        shard_metrics,
1072                        batch_metrics,
1073                        isolated_runtime,
1074                        part,
1075                        expected_order,
1076                        ts_rewrite,
1077                        encoded_diffs_sum,
1078                        write_schemas_clone,
1079                    )
1080                    .await;
1081                    RunWithMeta::single(RunPart::Single(part), num_updates)
1082                }
1083                .instrument(write_span)
1084                .boxed(),
1085            )
1086        };
1087
1088        match &mut self.writing_runs {
1089            WritingRuns::Ordered(_order, run) => {
1090                let part = Pending::new(mz_ore::task::spawn(|| name, write_future));
1091                run.push(part);
1092
1093                // If there are more than the max outstanding parts, block on all but the
1094                //  most recent.
1095                for part in run
1096                    .iter_mut()
1097                    .rev()
1098                    .skip(self.cfg.batch_builder_max_outstanding_parts)
1099                    .take_while(|p| !p.is_finished())
1100                {
1101                    self.batch_metrics.write_stalls.inc();
1102                    part.block_until_ready().await;
1103                }
1104            }
1105            WritingRuns::Compacting(batches) => {
1106                let run = Pending::Writing(mz_ore::task::spawn(|| name, write_future));
1107                batches.push((RunOrder::Unordered, run));
1108
1109                // Allow up to `max_outstanding_parts` (or one compaction) to be pending, and block
1110                // on the rest.
1111                let mut part_budget = self.cfg.batch_builder_max_outstanding_parts;
1112                let mut compaction_budget = 1;
1113                for (_, part) in batches
1114                    .iter_mut()
1115                    .rev()
1116                    .skip_while(|(order, _)| match order {
1117                        RunOrder::Unordered if part_budget > 0 => {
1118                            part_budget -= 1;
1119                            true
1120                        }
1121                        RunOrder::Structured | RunOrder::Codec if compaction_budget > 0 => {
1122                            compaction_budget -= 1;
1123                            true
1124                        }
1125                        _ => false,
1126                    })
1127                    .take_while(|(_, p)| !p.is_finished())
1128                {
1129                    self.batch_metrics.write_stalls.inc();
1130                    part.block_until_ready().await;
1131                }
1132            }
1133        }
1134    }
1135
1136    async fn write_hollow_part<K: Codec, V: Codec>(
1137        cfg: BatchBuilderConfig,
1138        blob: Arc<dyn Blob>,
1139        metrics: Arc<Metrics>,
1140        shard_metrics: Arc<ShardMetrics>,
1141        batch_metrics: BatchWriteMetrics,
1142        isolated_runtime: Arc<IsolatedRuntime>,
1143        mut updates: BlobTraceBatchPart<T>,
1144        run_order: RunOrder,
1145        ts_rewrite: Option<Antichain<T>>,
1146        diffs_sum: [u8; 8],
1147        write_schemas: Schemas<K, V>,
1148    ) -> BatchPart<T> {
1149        let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new());
1150        let key = partial_key.complete(&shard_metrics.shard_id);
1151        let goodbytes = updates.updates.goodbytes();
1152        let metrics_ = Arc::clone(&metrics);
1153        let schema_id = write_schemas.id;
1154
1155        let (stats, key_lower, structured_key_lower, (buf, encode_time)) = isolated_runtime
1156            .spawn_named(|| "batch::encode_part", async move {
1157                // Measure the expensive steps of the part build - re-encoding and stats collection.
1158                let stats = metrics_.columnar.arrow().measure_part_build(|| {
1159                    let stats = if cfg.stats_collection_enabled {
1160                        let ext = updates.updates.get_or_make_structured::<K, V>(
1161                            write_schemas.key.as_ref(),
1162                            write_schemas.val.as_ref(),
1163                        );
1164
1165                        let key_stats = write_schemas
1166                            .key
1167                            .decoder_any(ext.key.as_ref())
1168                            .expect("decoding just-encoded data")
1169                            .stats();
1170
1171                        let part_stats = PartStats { key: key_stats };
1172
1173                        // Collect stats about the updates, if stats collection is enabled.
1174                        let trimmed_start = Instant::now();
1175                        let mut trimmed_bytes = 0;
1176                        let trimmed_stats = LazyPartStats::encode(&part_stats, |s| {
1177                            trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| {
1178                                cfg.stats_untrimmable_columns.should_retain(s)
1179                            })
1180                        });
1181                        let trimmed_duration = trimmed_start.elapsed();
1182                        Some((trimmed_stats, trimmed_duration, trimmed_bytes))
1183                    } else {
1184                        None
1185                    };
1186
1187                    // Ensure the updates are in the specified columnar format before encoding.
1188                    updates.updates = updates.updates.as_structured::<K, V>(
1189                        write_schemas.key.as_ref(),
1190                        write_schemas.val.as_ref(),
1191                    );
1192
1193                    stats
1194                });
1195
1196                let key_lower = if let Some(records) = updates.updates.records() {
1197                    let key_bytes = records.keys();
1198                    if key_bytes.is_empty() {
1199                        &[]
1200                    } else if run_order == RunOrder::Codec {
1201                        key_bytes.value(0)
1202                    } else {
1203                        ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
1204                    }
1205                } else {
1206                    &[]
1207                };
1208                let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
1209                    .expect("lower bound always exists");
1210
1211                let structured_key_lower = if cfg.structured_key_lower_len > 0 {
1212                    updates.updates.structured().and_then(|ext| {
1213                        let min_key = if run_order == RunOrder::Structured {
1214                            0
1215                        } else {
1216                            let ord = ArrayOrd::new(ext.key.as_ref());
1217                            (0..ext.key.len())
1218                                .min_by_key(|i| ord.at(*i))
1219                                .expect("non-empty batch")
1220                        };
1221                        let lower = ArrayBound::new(Arc::clone(&ext.key), min_key)
1222                            .to_proto_lower(cfg.structured_key_lower_len);
1223                        if lower.is_none() {
1224                            batch_metrics.key_lower_too_big.inc()
1225                        }
1226                        lower.map(|proto| LazyProto::from(&proto))
1227                    })
1228                } else {
1229                    None
1230                };
1231
1232                let encode_start = Instant::now();
1233                let mut buf = Vec::new();
1234                updates.encode(&mut buf, &metrics_.columnar, &cfg.encoding_config);
1235
1236                // Drop batch as soon as we can to reclaim its memory.
1237                drop(updates);
1238                (
1239                    stats,
1240                    key_lower,
1241                    structured_key_lower,
1242                    (Bytes::from(buf), encode_start.elapsed()),
1243                )
1244            })
1245            .instrument(debug_span!("batch::encode_part"))
1246            .await
1247            .expect("part encode task failed");
1248        // Can't use the `CodecMetrics::encode` helper because of async.
1249        metrics.codecs.batch.encode_count.inc();
1250        metrics
1251            .codecs
1252            .batch
1253            .encode_seconds
1254            .inc_by(encode_time.as_secs_f64());
1255
1256        let start = Instant::now();
1257        let payload_len = buf.len();
1258        let () = retry_external(&metrics.retries.external.batch_set, || async {
1259            shard_metrics.blob_sets.inc();
1260            blob.set(&key, Bytes::clone(&buf)).await
1261        })
1262        .instrument(trace_span!("batch::set", payload_len))
1263        .await;
1264        batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
1265        batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
1266        batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
1267        match run_order {
1268            RunOrder::Unordered => batch_metrics.unordered.inc(),
1269            RunOrder::Codec => batch_metrics.codec_order.inc(),
1270            RunOrder::Structured => batch_metrics.structured_order.inc(),
1271        }
1272        let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
1273            batch_metrics
1274                .step_stats
1275                .inc_by(stats_step_timing.as_secs_f64());
1276            if trimmed_bytes > 0 {
1277                metrics.pushdown.parts_stats_trimmed_count.inc();
1278                metrics
1279                    .pushdown
1280                    .parts_stats_trimmed_bytes
1281                    .inc_by(u64::cast_from(trimmed_bytes));
1282            }
1283            stats
1284        });
1285
1286        let meta = MetadataMap::default();
1287        BatchPart::Hollow(HollowBatchPart {
1288            key: partial_key,
1289            meta,
1290            encoded_size_bytes: payload_len,
1291            key_lower,
1292            structured_key_lower,
1293            stats,
1294            ts_rewrite,
1295            diffs_sum: Some(diffs_sum),
1296            format: Some(BatchColumnarFormat::Structured),
1297            schema_id,
1298            // Field has been deprecated but kept around to roundtrip state.
1299            deprecated_schema_id: None,
1300        })
1301    }
1302
1303    #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
1304    pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec<RunPart<T>>, usize)> {
1305        match self.writing_runs {
1306            WritingRuns::Ordered(RunOrder::Unordered, run) => {
1307                let completed_runs = run.finish();
1308                let mut output = Vec::with_capacity(completed_runs.len());
1309                for completed_run in completed_runs {
1310                    let completed_run = completed_run.into_result().await;
1311                    // In the unordered case, each part becomes its own run.
1312                    for part in completed_run.parts {
1313                        output.push((RunOrder::Unordered, vec![part], completed_run.num_updates));
1314                    }
1315                }
1316                output
1317            }
1318            WritingRuns::Ordered(order, run) => {
1319                let completed_runs = run.finish();
1320                let mut all_parts = Vec::new();
1321                let mut all_update_counts = 0;
1322                for completed_run in completed_runs {
1323                    let completed_run = completed_run.into_result().await;
1324                    all_parts.extend(completed_run.parts);
1325                    all_update_counts += completed_run.num_updates;
1326                }
1327                vec![(order, all_parts, all_update_counts)]
1328            }
1329            WritingRuns::Compacting(batches) => {
1330                let runs = batches.finish();
1331                let mut output = Vec::new();
1332                for (order, run) in runs {
1333                    let completed_run = run.into_result().await;
1334                    output.push((order, completed_run.parts, completed_run.num_updates));
1335                }
1336                output
1337            }
1338        }
1339    }
1340}
1341
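/// Validates that `batch` may be appended to a shard as the narrower description
/// `truncate`: rewritten batches must match the truncated upper exactly and must not
/// contain data below the truncated lower, and (when part-bound validation is enabled)
/// the truncated bounds must lie within the batch's own lower and upper.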
1342pub(crate) fn validate_truncate_batch<T: Timestamp>(
1343    batch: &HollowBatch<T>,
1344    truncate: &Description<T>,
1345    any_batch_rewrite: bool,
1346    validate_part_bounds_on_write: bool,
1347) -> Result<(), InvalidUsage<T>> {
1348    // If rewrite_ts is used, we don't allow truncation, to keep things simpler
1349    // to reason about.
1350    if any_batch_rewrite {
1351        // We allow a new upper to be specified at rewrite time, so that's easy:
1352        // it must match exactly. This is both consistent with the upper
1353        // requirement below and proves that there is no data to truncate past
1354        // the upper.
1355        if truncate.upper() != batch.desc.upper() {
1356            return Err(InvalidUsage::InvalidRewrite(format!(
1357                "rewritten batch might have data past {:?} up to {:?}",
1358                truncate.upper().elements(),
1359                batch.desc.upper().elements(),
1360            )));
1361        }
1362        // To prove that there is no data to truncate below the lower, require
1363        // that the lower is <= the rewrite ts.
1364        for part in batch.parts.iter() {
1365            let part_lower_bound = part.ts_rewrite().unwrap_or_else(|| batch.desc.lower());
1366            if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
1367                return Err(InvalidUsage::InvalidRewrite(format!(
1368                    "rewritten batch might have data below {:?} at {:?}",
1369                    truncate.lower().elements(),
1370                    part_lower_bound.elements(),
1371                )));
1372            }
1373        }
1374    }
1375
1376    if !validate_part_bounds_on_write {
1377        return Ok(());
1378    }
1379
1380    let batch = &batch.desc;
1381    if !PartialOrder::less_equal(batch.lower(), truncate.lower())
1382        || PartialOrder::less_than(batch.upper(), truncate.upper())
1383    {
1384        return Err(InvalidUsage::InvalidBatchBounds {
1385            batch_lower: batch.lower().clone(),
1386            batch_upper: batch.upper().clone(),
1387            append_lower: truncate.lower().clone(),
1388            append_upper: truncate.upper().clone(),
1389        });
1390    }
1391
1392    Ok(())
1393}
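
// A minimal, self-contained sketch (with illustrative `u64` bounds, not taken
// from any real shard) of the containment rule enforced above: the truncate
// description's lower may move up and its upper may move down, but neither may
// escape the batch's own bounds.
#[cfg(test)]
#[mz_ore::test]
fn validate_truncate_bounds_sketch() {
    let batch_desc = Description::new(
        Antichain::from_elem(0u64),
        Antichain::from_elem(10),
        Antichain::from_elem(0),
    );
    let truncate = Description::new(
        Antichain::from_elem(2u64),
        Antichain::from_elem(10),
        Antichain::from_elem(0),
    );
    // Mirrors the check above: the batch lower must be <= the truncate lower...
    assert!(PartialOrder::less_equal(batch_desc.lower(), truncate.lower()));
    // ...and the batch upper must not be strictly below the truncate upper.
    assert!(!PartialOrder::less_than(batch_desc.upper(), truncate.upper()));
}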
1394
1395#[derive(Debug)]
1396pub(crate) struct PartDeletes<T> {
1397    /// Keys to hollow parts or runs that we're ready to delete.
1398    blob_keys: BTreeSet<PartialBatchKey>,
1399    /// Keys to hollow runs that may not have had all their parts deleted (or added to blob_keys) yet.
1400    hollow_runs: BTreeMap<PartialBatchKey, HollowRunRef<T>>,
1401}
1402
1403impl<T> Default for PartDeletes<T> {
1404    fn default() -> Self {
1405        Self {
1406            blob_keys: Default::default(),
1407            hollow_runs: Default::default(),
1408        }
1409    }
1410}
1411
1412impl<T: Timestamp> PartDeletes<T> {
1413    /// Adds the part to the set to be deleted. Returns `true` if it was newly
1414    /// inserted (inline parts have nothing to delete and always return `true`).
1415    pub fn add(&mut self, part: &RunPart<T>) -> bool {
1416        match part {
1417            RunPart::Many(r) => self.hollow_runs.insert(r.key.clone(), r.clone()).is_none(),
1418            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.insert(x.key.clone()),
1419            RunPart::Single(BatchPart::Inline { .. }) => {
1420                // Nothing to delete.
1421                true
1422            }
1423        }
1424    }
1425
1426    pub fn contains(&self, part: &RunPart<T>) -> bool {
1427        match part {
1428            RunPart::Many(r) => self.hollow_runs.contains_key(&r.key),
1429            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.contains(&x.key),
1430            RunPart::Single(BatchPart::Inline { .. }) => false,
1431        }
1432    }
1433
1434    pub fn is_empty(&self) -> bool {
1435        self.len() == 0
1436    }
1437
1438    pub fn len(&self) -> usize {
1439        match self {
1440            Self {
1441                blob_keys,
1442                hollow_runs,
1443            } => blob_keys.len() + hollow_runs.len(),
1444        }
1445    }
1446
1447    pub async fn delete(
1448        mut self,
1449        blob: &dyn Blob,
1450        shard_id: ShardId,
1451        concurrency: usize,
1452        metrics: &Metrics,
1453        delete_metrics: &RetryMetrics,
1454    ) where
1455        T: Codec64,
1456    {
1457        loop {
1458            let () = stream::iter(mem::take(&mut self.blob_keys))
1459                .map(|key| {
1460                    let key = key.complete(&shard_id);
1461                    async move {
1462                        retry_external(delete_metrics, || blob.delete(&key)).await;
1463                    }
1464                })
1465                .buffer_unordered(concurrency)
1466                .collect()
1467                .await;
1468
1469            let Some((run_key, run_ref)) = self.hollow_runs.pop_first() else {
1470                break;
1471            };
1472
1473            if let Some(run) = run_ref.get(shard_id, blob, metrics).await {
1474                // Queue up both the individual parts and the run itself for deletion.
1475                for part in &run.parts {
1476                    self.add(part);
1477                }
1478                self.blob_keys.insert(run_key);
1479            };
1480        }
1481    }
1482}
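
// A simplified, self-contained sketch of the worklist shape used by `delete`
// above: directly-deletable keys are drained first, then one indirect run
// reference at a time is expanded into more keys (plus the run's own key),
// until both collections are empty. The string keys and the `deleted` vec are
// stand-ins for real blob keys and the blob store.
#[cfg(test)]
#[mz_ore::test]
fn part_deletes_worklist_sketch() {
    let mut blob_keys = BTreeSet::from(["part-a".to_string(), "part-b".to_string()]);
    let mut hollow_runs = BTreeMap::from([(
        "run-1".to_string(),
        vec!["part-c".to_string(), "part-d".to_string()],
    )]);
    let mut deleted = Vec::new();
    loop {
        // Delete everything that's directly deletable.
        deleted.extend(mem::take(&mut blob_keys));
        // Then expand one run reference, if any remain.
        let Some((run_key, run_parts)) = hollow_runs.pop_first() else {
            break;
        };
        // The run's parts become directly deletable, as does the run itself.
        blob_keys.extend(run_parts);
        blob_keys.insert(run_key);
    }
    assert_eq!(deleted.len(), 5);
}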
1483
1484/// Returns the total sum of diffs, or `D::zero()` if there were no updates.
1485fn diffs_sum<D: Monoid + Codec64>(updates: &Int64Array) -> D {
1486    let mut sum = D::zero();
1487    for d in updates.values().iter() {
1488        let d = D::decode(d.to_le_bytes());
1489        sum.plus_equals(&d);
1490    }
1491    sum
1492}
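
// A minimal sketch of `diffs_sum`, assuming the usual `i64` diff type (which
// implements both `Monoid` and `Codec64`): each array value is decoded and the
// decoded diffs are summed.
#[cfg(test)]
#[mz_ore::test]
fn diffs_sum_i64_sketch() {
    let diffs = Int64Array::from(vec![2i64, -1, 1]);
    let total: i64 = diffs_sum(&diffs);
    assert_eq!(total, 2);
}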
1493
1494#[cfg(test)]
1495mod tests {
1496    use mz_dyncfg::ConfigUpdates;
1497
1498    use super::*;
1499    use crate::PersistLocation;
1500    use crate::cache::PersistClientCache;
1501    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
1502    use crate::internal::paths::{BlobKey, PartialBlobKey};
1503    use crate::tests::{all_ok, new_test_client};
1504
1505    #[mz_ore::test(tokio::test)]
1506    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1507    async fn batch_builder_flushing() {
1508        let data = vec![
1509            (("1".to_owned(), "one".to_owned()), 1, 1),
1510            (("2".to_owned(), "two".to_owned()), 2, 1),
1511            (("3".to_owned(), "three".to_owned()), 3, 1),
1512            (("4".to_owned(), "four".to_owned()), 4, 1),
1513        ];
1514
1515        let cache = PersistClientCache::new_no_metrics();
1516
1517        // Set blob_target_size to 0 so that each row gets forced into its own
1518        // batch part. Set max_outstanding to a small value that's >1 to test
1519        // various edge cases below.
1520        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
1521        cache.cfg.set_config(&MAX_RUNS, 3);
1522        cache
1523            .cfg
1524            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);
1525
1526        let client = cache
1527            .open(PersistLocation::new_in_mem())
1528            .await
1529            .expect("client construction failed");
1530        let (mut write, mut read) = client
1531            .expect_open::<String, String, u64, i64>(ShardId::new())
1532            .await;
1533
1534        // A new builder has no writing or finished parts.
1535        let mut builder = write.builder(Antichain::from_elem(0));
1536
1537        fn assert_writing(
1538            builder: &BatchBuilder<String, String, u64, i64>,
1539            expected_finished: &[bool],
1540        ) {
1541            let WritingRuns::Compacting(run) = &builder.builder.parts.writing_runs else {
1542                unreachable!("ordered run!")
1543            };
1544
1545            let actual: Vec<_> = run.iter().map(|(_, p)| p.is_finished()).collect();
1546            assert_eq!(*expected_finished, actual);
1547        }
1548
1549        assert_writing(&builder, &[]);
1550
1551        // We set blob_target_size to 0, so the first update gets forced out
1552        // into a run.
1553        let ((k, v), t, d) = &data[0];
1554        builder.add(k, v, t, d).await.expect("invalid usage");
1555        assert_writing(&builder, &[false]);
1556
1557        // We set batch_builder_max_outstanding_parts to 2, so we are allowed to
1558        // pipeline a second part.
1559        let ((k, v), t, d) = &data[1];
1560        builder.add(k, v, t, d).await.expect("invalid usage");
1561        assert_writing(&builder, &[false, false]);
1562
1563        // But now that we have 3 parts, the add call back-pressures until the
1564        // first one finishes.
1565        let ((k, v), t, d) = &data[2];
1566        builder.add(k, v, t, d).await.expect("invalid usage");
1567        assert_writing(&builder, &[true, false, false]);
1568
1569        // Finally, pushing a fourth part will cause the first three to spill out into
1570        // a new compacted run.
1571        let ((k, v), t, d) = &data[3];
1572        builder.add(k, v, t, d).await.expect("invalid usage");
1573        assert_writing(&builder, &[false, false]);
1574
1575        // Finish off the batch and verify that the keys and such get plumbed
1576        // correctly by reading the data back.
1577        let batch = builder
1578            .finish(Antichain::from_elem(5))
1579            .await
1580            .expect("invalid usage");
1581        assert_eq!(batch.batch.runs().count(), 2);
1582        assert_eq!(batch.batch.part_count(), 4);
1583        write
1584            .append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
1585            .await
1586            .expect("invalid usage")
1587            .expect("unexpected upper");
1588        assert_eq!(read.expect_snapshot_and_fetch(4).await, all_ok(&data, 4));
1589    }
1590
1591    #[mz_ore::test(tokio::test)]
1592    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1593    async fn batch_builder_keys() {
1594        let cache = PersistClientCache::new_no_metrics();
1595        // Set blob_target_size to 0 so that each row gets forced into its own batch part
1596        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
1597        // Otherwise fails: expected hollow part!
1598        cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
1599        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1600        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1601        let client = cache
1602            .open(PersistLocation::new_in_mem())
1603            .await
1604            .expect("client construction failed");
1605        let shard_id = ShardId::new();
1606        let (mut write, _) = client
1607            .expect_open::<String, String, u64, i64>(shard_id)
1608            .await;
1609
1610        let batch = write
1611            .expect_batch(
1612                &[
1613                    (("1".into(), "one".into()), 1, 1),
1614                    (("2".into(), "two".into()), 2, 1),
1615                    (("3".into(), "three".into()), 3, 1),
1616                ],
1617                0,
1618                4,
1619            )
1620            .await;
1621
1622        assert_eq!(batch.batch.part_count(), 3);
1623        for part in &batch.batch.parts {
1624            let part = part.expect_hollow_part();
1625            match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
1626                Ok((shard, PartialBlobKey::Batch(writer, _))) => {
1627                    assert_eq!(shard.to_string(), shard_id.to_string());
1628                    assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
1629                }
1630                _ => panic!("unparseable blob key"),
1631            }
1632        }
1633    }
1634
1635    #[mz_ore::test(tokio::test)]
1636    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1637    async fn batch_delete() {
1638        let cache = PersistClientCache::new_no_metrics();
1639        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1640        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1641        cache.cfg.set_config(&BATCH_DELETE_ENABLED, true);
1642        let client = cache
1643            .open(PersistLocation::new_in_mem())
1644            .await
1645            .expect("client construction failed");
1646        let shard_id = ShardId::new();
1647        let (mut write, _) = client
1648            .expect_open::<String, String, u64, i64>(shard_id)
1649            .await;
1650
1651        let batch = write
1652            .expect_batch(
1653                &[
1654                    (("1".into(), "one".into()), 1, 1),
1655                    (("2".into(), "two".into()), 2, 1),
1656                    (("3".into(), "three".into()), 3, 1),
1657                ],
1658                0,
1659                4,
1660            )
1661            .await;
1662
1663        assert_eq!(batch.batch.part_count(), 1);
1664        let part_key = batch.batch.parts[0]
1665            .expect_hollow_part()
1666            .key
1667            .complete(&shard_id);
1668
1669        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
1670        assert!(part_bytes.is_some());
1671
1672        batch.delete().await;
1673
1674        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
1675        assert!(part_bytes.is_none());
1676    }
1677
1678    #[mz_ore::test]
1679    fn untrimmable_columns() {
1680        let untrimmable = UntrimmableColumns {
1681            equals: vec!["abc".into(), "def".into()],
1682            prefixes: vec!["123".into(), "234".into()],
1683            suffixes: vec!["xyz".into()],
1684        };
1685
1686        // equals
1687        assert!(untrimmable.should_retain("abc"));
1688        assert!(untrimmable.should_retain("ABC"));
1689        assert!(untrimmable.should_retain("aBc"));
1690        assert!(!untrimmable.should_retain("abcd"));
1691        assert!(untrimmable.should_retain("deF"));
1692        assert!(!untrimmable.should_retain("defg"));
1693
1694        // prefix
1695        assert!(untrimmable.should_retain("123"));
1696        assert!(untrimmable.should_retain("123-4"));
1697        assert!(untrimmable.should_retain("1234"));
1698        assert!(untrimmable.should_retain("234"));
1699        assert!(!untrimmable.should_retain("345"));
1700
1701        // suffix
1702        assert!(untrimmable.should_retain("ijk_xyZ"));
1703        assert!(untrimmable.should_retain("ww-XYZ"));
1704        assert!(!untrimmable.should_retain("xya"));
1705    }
1706
1707    // NB: Most edge cases are exercised in datadriven tests.
1708    #[mz_persist_proc::test(tokio::test)]
1709    #[cfg_attr(miri, ignore)] // too slow
1710    async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
1711        let client = new_test_client(&dyncfgs).await;
1712        let (mut write, read) = client
1713            .expect_open::<String, (), u64, i64>(ShardId::new())
1714            .await;
1715
1716        let mut batch = write.builder(Antichain::from_elem(0));
1717        batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
1718        let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();
1719
1720        // Roundtrip through a transmittable batch.
1721        let batch = batch.into_transmittable_batch();
1722        let mut batch = write.batch_from_transmittable_batch(batch);
1723        batch
1724            .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
1725            .unwrap();
1726        write
1727            .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
1728            .await;
1729
1730        let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
1731        let expected = vec![((Ok("foo".to_owned()), Ok(())), 2, 1)];
1732        assert_eq!(actual, expected);
1733    }
1734
1735    #[mz_ore::test(tokio::test)]
1736    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1737    async fn structured_lowers() {
1738        let cache = PersistClientCache::new_no_metrics();
1739        // Ensure structured data is calculated, and that we give some budget for a key lower.
1740        cache.cfg().set_config(&STRUCTURED_KEY_LOWER_LEN, 1024);
1741        // Otherwise fails: expected hollow part!
1742        cache.cfg().set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1743        cache.cfg().set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1744        let client = cache
1745            .open(PersistLocation::new_in_mem())
1746            .await
1747            .expect("client construction failed");
1748        let shard_id = ShardId::new();
1749        let (mut write, _) = client
1750            .expect_open::<String, String, u64, i64>(shard_id)
1751            .await;
1752
1753        let batch = write
1754            .expect_batch(
1755                &[
1756                    (("1".into(), "one".into()), 1, 1),
1757                    (("2".into(), "two".into()), 2, 1),
1758                    (("3".into(), "three".into()), 3, 1),
1759                ],
1760                0,
1761                4,
1762            )
1763            .await;
1764
1765        assert_eq!(batch.batch.part_count(), 1);
1766        let [part] = batch.batch.parts.as_slice() else {
1767            panic!("expected single part")
1768        };
1769        // Verifies that the structured key lower is stored and decoded.
1770        assert!(part.structured_key_lower().is_some());
1771    }
1772}