mz_persist_client/batch.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A handle to a batch of updates
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::marker::PhantomData;
16use std::mem;
17use std::sync::Arc;
18use std::time::Instant;
19
20use arrow::array::{Array, Int64Array};
21use bytes::Bytes;
22use differential_dataflow::difference::Semigroup;
23use differential_dataflow::lattice::Lattice;
24use differential_dataflow::trace::Description;
25use futures_util::stream::StreamExt;
26use futures_util::{FutureExt, stream};
27use mz_dyncfg::Config;
28use mz_ore::cast::CastFrom;
29use mz_ore::instrument;
30use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
31use mz_persist::location::Blob;
32use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
33use mz_persist_types::columnar::{ColumnDecoder, Schema};
34use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
35use mz_persist_types::part::{Part, PartBuilder};
36use mz_persist_types::schema::SchemaId;
37use mz_persist_types::stats::{
38    PartStats, TRUNCATE_LEN, TruncateBound, trim_to_budget, truncate_bytes,
39};
40use mz_persist_types::{Codec, Codec64};
41use mz_proto::RustType;
42use mz_timely_util::order::Reverse;
43use proptest_derive::Arbitrary;
44use semver::Version;
45use timely::PartialOrder;
46use timely::order::TotalOrder;
47use timely::progress::{Antichain, Timestamp};
48use tracing::{Instrument, debug_span, trace_span, warn};
49
50use crate::async_runtime::IsolatedRuntime;
51use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
52use crate::error::InvalidUsage;
53use crate::internal::compact::{CompactConfig, Compactor};
54use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
55use crate::internal::machine::retry_external;
56use crate::internal::merge::{MergeTree, Pending};
57use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
58use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
59use crate::internal::state::{
60    BatchPart, ENABLE_INCREMENTAL_COMPACTION, HollowBatch, HollowBatchPart, HollowRun,
61    HollowRunRef, ProtoInlineBatchPart, RunId, RunMeta, RunOrder, RunPart,
62};
63use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
64use crate::{PersistConfig, ShardId};
65
66include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));
67
68/// A handle to a batch of updates that has been written to blob storage but
69/// which has not yet been appended to a shard.
70///
71/// A [Batch] needs to be marked as consumed or it needs to be deleted via [Self::delete].
72/// Otherwise, a dangling batch will leak and backing blobs will remain in blob storage.
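///
/// A rough sketch of the two valid endings for a batch, assuming an existing
/// `write: WriteHandle<K, V, T, D>` and `batch: Batch<K, V, T, D>` (the exact
/// `compare_and_append_batch` signature lives on [crate::write::WriteHandle]):
///
/// ```ignore
/// // Either hand the batch to the shard via the write handle...
/// write
///     .compare_and_append_batch(&mut [&mut batch], lower, upper)
///     .await;
/// ```
///
/// ```ignore
/// // ...or, if it will never be appended, delete its backing blobs explicitly.
/// batch.delete().await;
/// ```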
73#[derive(Debug)]
74pub struct Batch<K, V, T, D> {
75    pub(crate) batch_delete_enabled: bool,
76    pub(crate) metrics: Arc<Metrics>,
77    pub(crate) shard_metrics: Arc<ShardMetrics>,
78
79    /// The version of Materialize which wrote this batch.
80    pub(crate) version: Version,
81
82    /// A handle to the data represented by this batch.
83    pub(crate) batch: HollowBatch<T>,
84
85    /// Handle to the [Blob] that the blobs of this batch were uploaded to.
86    pub(crate) blob: Arc<dyn Blob>,
87
88    // These provide a bit more safety against appending a batch with the wrong
89    // type to a shard.
90    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
91}
92
93impl<K, V, T, D> Drop for Batch<K, V, T, D> {
94    fn drop(&mut self) {
95        if self.batch.part_count() > 0 {
96            warn!(
97                "un-consumed Batch, with {} parts and dangling blob keys: {:?}",
98                self.batch.part_count(),
99                self.batch
100                    .parts
101                    .iter()
102                    .map(|x| x.printable_name())
103                    .collect::<Vec<_>>(),
104            );
105        }
106    }
107}
108
109impl<K, V, T, D> Batch<K, V, T, D>
110where
111    K: Debug + Codec,
112    V: Debug + Codec,
113    T: Timestamp + Lattice + Codec64,
114    D: Semigroup + Codec64,
115{
116    pub(crate) fn new(
117        batch_delete_enabled: bool,
118        metrics: Arc<Metrics>,
119        blob: Arc<dyn Blob>,
120        shard_metrics: Arc<ShardMetrics>,
121        version: Version,
122        batch: HollowBatch<T>,
123    ) -> Self {
124        Self {
125            batch_delete_enabled,
126            metrics,
127            shard_metrics,
128            version,
129            batch,
130            blob,
131            _phantom: PhantomData,
132        }
133    }
134
135    /// The `shard_id` of this [Batch].
136    pub fn shard_id(&self) -> ShardId {
137        self.shard_metrics.shard_id
138    }
139
140    /// The `upper` of this [Batch].
141    pub fn upper(&self) -> &Antichain<T> {
142        self.batch.desc.upper()
143    }
144
145    /// The `lower` of this [Batch].
146    pub fn lower(&self) -> &Antichain<T> {
147        self.batch.desc.lower()
148    }
149
150    /// Marks the blobs that this batch handle points to as consumed, likely
151    /// because they were appended to a shard.
152    ///
153    /// Consumers of a batch need to make this explicit, so that we can log
154    /// warnings in case a batch is not used.
155    pub(crate) fn mark_consumed(&mut self) {
156        self.batch.parts.clear();
157    }
158
159    /// Deletes the blobs that make up this batch from the given blob store and
160    /// marks them as deleted.
161    #[instrument(level = "debug", fields(shard = %self.shard_id()))]
162    pub async fn delete(mut self) {
163        if !self.batch_delete_enabled {
164            self.mark_consumed();
165            return;
166        }
167        let mut deletes = PartDeletes::default();
168        for part in self.batch.parts.drain(..) {
169            deletes.add(&part);
170        }
171        let () = deletes
172            .delete(
173                &*self.blob,
174                self.shard_id(),
175                usize::MAX,
176                &*self.metrics,
177                &*self.metrics.retries.external.batch_delete,
178            )
179            .await;
180    }
181
182    /// Returns the schemas of parts in this batch.
183    pub fn schemas(&self) -> impl Iterator<Item = SchemaId> + '_ {
184        self.batch.parts.iter().flat_map(|b| b.schema_id())
185    }
186
187    /// Turns this [`Batch`] into a `HollowBatch`.
188    ///
189    /// **NOTE**: If this batch is not eventually appended to a shard or
190    /// dropped, the data that it represents will have leaked.
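    ///
    /// A one-line sketch; the returned [HollowBatch] is the lower-level state
    /// representation of the same parts:
    ///
    /// ```ignore
    /// let hollow: HollowBatch<T> = batch.into_hollow_batch();
    /// ```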
191    pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
192        let ret = self.batch.clone();
193        self.mark_consumed();
194        ret
195    }
196
197    /// Turns this [`Batch`] into a [`ProtoBatch`], which can be used to
198    /// transfer this batch across process boundaries, for example when
199    /// exchanging data between timely workers.
200    ///
201    /// **NOTE**: If this batch is not eventually appended to a shard or
202    /// dropped, the data that it represents will have leaked. The caller is
203    /// responsible for turning this back into a [`Batch`] using
204    /// [`WriteHandle::batch_from_transmittable_batch`](crate::write::WriteHandle::batch_from_transmittable_batch).
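    ///
    /// A minimal sketch of the intended round trip, assuming a `write: WriteHandle<K, V, T, D>`
    /// on the receiving side:
    ///
    /// ```ignore
    /// // On the sending worker: encode the batch for exchange.
    /// let proto: ProtoBatch = batch.into_transmittable_batch();
    /// // On the receiving worker: rehydrate it before appending it to the shard.
    /// let batch = write.batch_from_transmittable_batch(proto);
    /// ```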
205    pub fn into_transmittable_batch(mut self) -> ProtoBatch {
206        let ret = ProtoBatch {
207            shard_id: self.shard_metrics.shard_id.into_proto(),
208            version: self.version.to_string(),
209            batch: Some(self.batch.into_proto()),
210        };
211        self.mark_consumed();
212        ret
213    }
214
215    pub(crate) async fn flush_to_blob(
216        &mut self,
217        cfg: &BatchBuilderConfig,
218        batch_metrics: &BatchWriteMetrics,
219        isolated_runtime: &Arc<IsolatedRuntime>,
220        write_schemas: &Schemas<K, V>,
221    ) {
222        // It's necessary for correctness to keep the parts in the same order.
223        // We could introduce concurrency here with FuturesOrdered, but it would
224        // be pretty unexpected to have inline writes in more than one part, so
225        // don't bother.
226        let mut parts = Vec::new();
227        for (run_meta, run_parts) in self.batch.runs() {
228            for part in run_parts {
229                let (updates, ts_rewrite, schema_id) = match part {
230                    RunPart::Single(BatchPart::Inline {
231                        updates,
232                        ts_rewrite,
233                        schema_id,
234                        deprecated_schema_id: _,
235                    }) => (updates, ts_rewrite, schema_id),
236                    other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
237                        parts.push(other.clone());
238                        continue;
239                    }
240                };
241                let updates = updates
242                    .decode::<T>(&self.metrics.columnar)
243                    .expect("valid inline part");
244                let diffs_sum =
245                    diffs_sum::<D>(updates.updates.diffs()).expect("inline parts are not empty");
246                let mut write_schemas = write_schemas.clone();
247                write_schemas.id = *schema_id;
248
249                let write_span =
250                    debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id)
251                        .or_current();
252                let handle = mz_ore::task::spawn(
253                    || "batch::flush_to_blob",
254                    BatchParts::write_hollow_part(
255                        cfg.clone(),
256                        Arc::clone(&self.blob),
257                        Arc::clone(&self.metrics),
258                        Arc::clone(&self.shard_metrics),
259                        batch_metrics.clone(),
260                        Arc::clone(isolated_runtime),
261                        updates,
262                        run_meta.order.unwrap_or(RunOrder::Unordered),
263                        ts_rewrite.clone(),
264                        D::encode(&diffs_sum),
265                        write_schemas,
266                    )
267                    .instrument(write_span),
268                );
269                let part = handle.await.expect("part write task failed");
270                parts.push(RunPart::Single(part));
271            }
272        }
273        self.batch.parts = parts;
274    }
275
276    /// The sum of the encoded sizes of all parts in the batch.
277    pub fn encoded_size_bytes(&self) -> usize {
278        self.batch.encoded_size_bytes()
279    }
280}
281
282impl<K, V, T, D> Batch<K, V, T, D>
283where
284    K: Debug + Codec,
285    V: Debug + Codec,
286    T: Timestamp + Lattice + Codec64 + TotalOrder,
287    D: Semigroup + Codec64,
288{
289    /// Efficiently rewrites the timestamps in this not-yet-committed batch.
290    ///
291    /// This [Batch] represents potentially large amounts of data, which may
292    /// have partly or entirely been spilled to s3. This call bulk edits the
293    /// timestamps of all data in this batch in a metadata-only operation (i.e.
294    /// without network calls).
295    ///
296    /// Specifically, every timestamp in the batch is logically advanced_by the
297    /// provided `frontier`.
298    ///
299    /// This method may be called multiple times, with later calls overriding
300    /// previous ones, but the rewrite frontier may not regress across calls.
301    ///
302    /// When this batch was created, it was given an `upper`, which bounds the
303    /// staged data it represents. To allow rewrite past this original `upper`,
304    /// this call accepts a new `upper` which replaces the previous one. Like
305    /// the rewrite frontier, the upper may not regress across calls.
306    ///
307    /// Multiple batches with various rewrite frontiers may be used in a single
308    /// [crate::write::WriteHandle::compare_and_append_batch] call. This is an
309    /// expected usage.
310    ///
311    /// This feature requires that the timestamp impls `TotalOrder`. This is
312    /// because we need to be able to verify that the contained data, after the
313    /// rewrite forward operation, still respects the new upper. It turns out
314    /// that, given the metadata persist currently collects during batch
315    /// collection, this is possible for totally ordered times, but it's known
316    /// to be _not possible_ for partially ordered times. It is believed that we
317    /// could fix this by collecting different metadata in batch creation (e.g.
318    /// the join of or an antichain of the original contained timestamps), but
319    /// the experience of database-issues#7825 has shaken our confidence in our own abilities
320    /// to reason about partially ordered times and anyway all the initial uses
321    /// have totally ordered times.
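    ///
    /// A small sketch with `u64` timestamps, assuming the batch was originally staged with an
    /// upper of `5`:
    ///
    /// ```ignore
    /// // Logically advance every update to at least ts 5 and extend the upper to 10.
    /// batch.rewrite_ts(&Antichain::from_elem(5u64), Antichain::from_elem(10u64))?;
    /// // A later call may move both frontiers forward again, but they may never regress.
    /// batch.rewrite_ts(&Antichain::from_elem(8u64), Antichain::from_elem(12u64))?;
    /// ```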
322    pub fn rewrite_ts(
323        &mut self,
324        frontier: &Antichain<T>,
325        new_upper: Antichain<T>,
326    ) -> Result<(), InvalidUsage<T>> {
327        self.batch
328            .rewrite_ts(frontier, new_upper)
329            .map_err(InvalidUsage::InvalidRewrite)
330    }
331}
332
333/// Indicates what work was done in a call to [BatchBuilder::add]
334#[derive(Debug)]
335pub enum Added {
336    /// A record was inserted into a pending batch part
337    Record,
338    /// A record was inserted into a pending batch part
339    /// and the part was sent to blob storage
340    RecordAndParts,
341}
342
343/// A snapshot of dynamic configs to make it easier to reason about an individual
344/// run of BatchBuilder.
345#[derive(Debug, Clone)]
346pub struct BatchBuilderConfig {
347    writer_key: WriterKey,
348    pub(crate) blob_target_size: usize,
349    pub(crate) batch_delete_enabled: bool,
350    pub(crate) batch_builder_max_outstanding_parts: usize,
351    pub(crate) inline_writes_single_max_bytes: usize,
352    pub(crate) stats_collection_enabled: bool,
353    pub(crate) stats_budget: usize,
354    pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
355    pub(crate) encoding_config: EncodingConfig,
356    pub(crate) preferred_order: RunOrder,
357    pub(crate) structured_key_lower_len: usize,
358    pub(crate) run_length_limit: usize,
359    pub(crate) enable_incremental_compaction: bool,
360    /// The number of runs to cap the built batch at, or None if we should
361    /// continue to generate one run per part for unordered batches.
362    /// See the config definition for details.
363    pub(crate) max_runs: Option<usize>,
364}
365
366// TODO: Remove this once we're comfortable that there aren't any bugs.
367pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
368    "persist_batch_delete_enabled",
369    true,
370    "Whether to actually delete blobs when batch delete is called (Materialize).",
371);
372
373pub(crate) const ENCODING_ENABLE_DICTIONARY: Config<bool> = Config::new(
374    "persist_encoding_enable_dictionary",
375    true,
376    "A feature flag to enable dictionary encoding for Parquet data (Materialize).",
377);
378
379pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new(
380    "persist_encoding_compression_format",
381    "none",
382    "A feature flag to enable compression of Parquet data (Materialize).",
383);
384
385pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config<usize> = Config::new(
386    "persist_batch_structured_key_lower_len",
387    256,
388    "The maximum size in proto bytes of any structured key-lower metadata to preserve. \
389    (If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
390);
391
392pub(crate) const MAX_RUN_LEN: Config<usize> = Config::new(
393    "persist_batch_max_run_len",
394    usize::MAX,
395    "The maximum length a run can have before it will be spilled as a hollow run \
396    into the blob store.",
397);
398
399pub(crate) const MAX_RUNS: Config<usize> = Config::new(
400    "persist_batch_max_runs",
401    1,
402    "The maximum number of runs a batch builder should generate for user batches. \
403    (Compaction outputs always generate a single run.) \
404    The minimum effective value is 2; below this, runs are not merged and each part forms its own run.",
405);
406
407/// A target maximum size of blob payloads in bytes. If a logical "batch" is
408/// bigger than this, it will be broken up into smaller, independent pieces.
409/// This is best-effort, not a guarantee (though as of 2022-06-09, we happen to
410/// always respect it). This target size doesn't apply for an individual update
411/// that exceeds it in size, but that scenario is almost certainly a mis-use of
412/// the system.
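///
/// For example, with the default of 128 * MiB, a logical batch holding roughly 1 GiB of
/// encoded updates would be split into about eight independently written parts.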
413pub(crate) const BLOB_TARGET_SIZE: Config<usize> = Config::new(
414    "persist_blob_target_size",
415    128 * MiB,
416    "A target maximum size of persist blob payloads in bytes (Materialize).",
417);
418
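/// A size threshold below which persist stores a freshly written part directly in the shard
/// metadata (an "inline" part) instead of uploading it to blob storage. For example, with the
/// default of 4096 bytes, a ~1 KiB write stays inline until the total inline budget below
/// forces it to be flushed out to blob.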
419pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config<usize> = Config::new(
420    "persist_inline_writes_single_max_bytes",
421    4096,
422    "The (exclusive) maximum size of a write that persist will inline in metadata.",
423);
424
425pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config<usize> = Config::new(
426    "persist_inline_writes_total_max_bytes",
427    1 * MiB,
428    "\
429    The (exclusive) maximum total size of inline writes in metadata before \
430    persist will backpressure them by flushing out to s3.",
431);
432
433impl BatchBuilderConfig {
434    /// Initialize a batch builder config based on a snapshot of the Persist config.
435    pub fn new(value: &PersistConfig, _shard_id: ShardId) -> Self {
436        let writer_key = WriterKey::for_version(&value.build_version);
437
438        let preferred_order = RunOrder::Structured;
439
440        BatchBuilderConfig {
441            writer_key,
442            blob_target_size: BLOB_TARGET_SIZE.get(value).clamp(1, usize::MAX),
443            batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
444            batch_builder_max_outstanding_parts: BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(value),
445            inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
446            stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
447            stats_budget: STATS_BUDGET_BYTES.get(value),
448            stats_untrimmable_columns: Arc::new(untrimmable_columns(value)),
449            encoding_config: EncodingConfig {
450                use_dictionary: ENCODING_ENABLE_DICTIONARY.get(value),
451                compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
452            },
453            preferred_order,
454            structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
455            run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
456            max_runs: match MAX_RUNS.get(value) {
457                limit @ 2.. => Some(limit),
458                _ => None,
459            },
460            enable_incremental_compaction: ENABLE_INCREMENTAL_COMPACTION.get(value),
461        }
462    }
463}
464
465/// A list of (lowercase) column names that persist will always retain
466/// stats for, even if it means going over the stats budget.
467#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
468pub(crate) struct UntrimmableColumns {
469    /// Always retain columns whose lowercased names exactly equal any of these strings.
470    pub equals: Vec<Cow<'static, str>>,
471    /// Always retain columns whose lowercased names start with any of these strings.
472    pub prefixes: Vec<Cow<'static, str>>,
473    /// Always retain columns whose lowercased names end with any of these strings.
474    pub suffixes: Vec<Cow<'static, str>>,
475}
476
477impl UntrimmableColumns {
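    /// Returns whether stats for the named column must be kept, even when trimming stats to
    /// the configured budget.
    ///
    /// A small illustrative sketch of the matching rules (all comparisons are against the
    /// lowercased name):
    ///
    /// ```ignore
    /// let cols = UntrimmableColumns {
    ///     equals: vec!["err".into()],
    ///     prefixes: vec!["ts_".into()],
    ///     suffixes: vec!["_key".into()],
    /// };
    /// assert!(cols.should_retain("ERR")); // exact match, case-insensitive
    /// assert!(cols.should_retain("ts_emitted")); // prefix match
    /// assert!(cols.should_retain("user_key")); // suffix match
    /// assert!(!cols.should_retain("payload")); // no rule matches
    /// ```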
478    pub(crate) fn should_retain(&self, name: &str) -> bool {
479        // TODO: see if there's a better way to match different formats than lowercasing
480        // https://github.com/MaterializeInc/database-issues/issues/6421#issue-1863623805
481        let name_lower = name.to_lowercase();
482        for s in &self.equals {
483            if *s == name_lower {
484                return true;
485            }
486        }
487        for s in &self.prefixes {
488            if name_lower.starts_with(s.as_ref()) {
489                return true;
490            }
491        }
492        for s in &self.suffixes {
493            if name_lower.ends_with(s.as_ref()) {
494                return true;
495            }
496        }
497        false
498    }
499}
500
501/// A builder for [Batches](Batch) that allows adding updates piece by piece and
502/// then finishing it.
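///
/// A rough sketch of typical usage with `u64` timestamps and `i64` diffs, assuming the batch
/// builder was obtained from a write handle (the `builder` method name and signature here are
/// an approximation, not a guarantee):
///
/// ```ignore
/// let mut builder = write.builder(Antichain::from_elem(0u64));
/// for (key, val, ts, diff) in updates {
///     builder.add(&key, &val, &ts, &diff).await?;
/// }
/// // Seal the staged updates into a Batch whose registered upper is 10.
/// let batch = builder.finish(Antichain::from_elem(10u64)).await?;
/// ```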
503#[derive(Debug)]
504pub struct BatchBuilder<K, V, T, D>
505where
506    K: Codec,
507    V: Codec,
508    T: Timestamp + Lattice + Codec64,
509{
510    inline_desc: Description<T>,
511    inclusive_upper: Antichain<Reverse<T>>,
512
513    records_builder: PartBuilder<K, K::Schema, V, V::Schema>,
514    pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
515}
516
517impl<K, V, T, D> BatchBuilder<K, V, T, D>
518where
519    K: Debug + Codec,
520    V: Debug + Codec,
521    T: Timestamp + Lattice + Codec64,
522    D: Semigroup + Codec64,
523{
524    pub(crate) fn new(
525        builder: BatchBuilderInternal<K, V, T, D>,
526        inline_desc: Description<T>,
527    ) -> Self {
528        let records_builder = PartBuilder::new(
529            builder.write_schemas.key.as_ref(),
530            builder.write_schemas.val.as_ref(),
531        );
532        Self {
533            inline_desc,
534            inclusive_upper: Antichain::new(),
535            records_builder,
536            builder,
537        }
538    }
539
540    /// Finish writing this batch and return a handle to the written batch.
541    ///
542    /// This fails if any of the updates in this batch are beyond the given
543    /// `upper`.
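    ///
    /// A minimal sketch of the failure case with `u64` timestamps: an update added at ts `7`
    /// cannot be sealed under a registered upper of `5`.
    ///
    /// ```ignore
    /// builder.add(&key, &val, &7u64, &1i64).await?;
    /// // Fails with InvalidUsage::UpdateBeyondUpper { ts: 7, .. }.
    /// let err = builder.finish(Antichain::from_elem(5u64)).await.unwrap_err();
    /// ```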
544    pub async fn finish(
545        mut self,
546        registered_upper: Antichain<T>,
547    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
548        if PartialOrder::less_than(&registered_upper, self.inline_desc.lower()) {
549            return Err(InvalidUsage::InvalidBounds {
550                lower: self.inline_desc.lower().clone(),
551                upper: registered_upper,
552            });
553        }
554
555        // When since is less than or equal to lower, the upper is a strict bound
556        // on the updates' timestamps because no advancement has been performed. Because user batches
557        // are always unadvanced, this ensures that new updates are recorded with valid timestamps.
558        // Otherwise, we can make no assumptions about the timestamps.
559        if PartialOrder::less_equal(self.inline_desc.since(), self.inline_desc.lower()) {
560            for ts in self.inclusive_upper.iter() {
561                if registered_upper.less_equal(&ts.0) {
562                    return Err(InvalidUsage::UpdateBeyondUpper {
563                        ts: ts.0.clone(),
564                        expected_upper: registered_upper.clone(),
565                    });
566                }
567            }
568        }
569
570        let updates = self.records_builder.finish();
571        self.builder
572            .flush_part(self.inline_desc.clone(), updates)
573            .await;
574
575        self.builder
576            .finish(Description::new(
577                self.inline_desc.lower().clone(),
578                registered_upper,
579                self.inline_desc.since().clone(),
580            ))
581            .await
582    }
583
584    /// Adds the given update to the batch.
585    ///
586    /// The update timestamp must be greater than or equal to the `lower` that was given
587    /// when creating this [BatchBuilder].
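    ///
    /// The returned [Added] reports whether this call also flushed a part to blob storage. A
    /// hedged sketch:
    ///
    /// ```ignore
    /// match builder.add(&key, &val, &ts, &diff).await? {
    ///     Added::Record => { /* update buffered in memory only */ }
    ///     Added::RecordAndParts => { /* buffer hit blob_target_size and was written out */ }
    /// }
    /// ```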
588    pub async fn add(
589        &mut self,
590        key: &K,
591        val: &V,
592        ts: &T,
593        diff: &D,
594    ) -> Result<Added, InvalidUsage<T>> {
595        if !self.inline_desc.lower().less_equal(ts) {
596            return Err(InvalidUsage::UpdateNotBeyondLower {
597                ts: ts.clone(),
598                lower: self.inline_desc.lower().clone(),
599            });
600        }
601        self.inclusive_upper.insert(Reverse(ts.clone()));
602
603        let added = {
604            self.records_builder
605                .push(key, val, ts.clone(), diff.clone());
606            if self.records_builder.goodbytes() >= self.builder.parts.cfg.blob_target_size {
607                let part = self.records_builder.finish_and_replace(
608                    self.builder.write_schemas.key.as_ref(),
609                    self.builder.write_schemas.val.as_ref(),
610                );
611                Some(part)
612            } else {
613                None
614            }
615        };
616
617        let added = if let Some(full_batch) = added {
618            self.builder
619                .flush_part(self.inline_desc.clone(), full_batch)
620                .await;
621            Added::RecordAndParts
622        } else {
623            Added::Record
624        };
625        Ok(added)
626    }
627}
628
629#[derive(Debug)]
630pub(crate) struct BatchBuilderInternal<K, V, T, D>
631where
632    K: Codec,
633    V: Codec,
634    T: Timestamp + Lattice + Codec64,
635{
636    shard_id: ShardId,
637    version: Version,
638    blob: Arc<dyn Blob>,
639    metrics: Arc<Metrics>,
640
641    write_schemas: Schemas<K, V>,
642    parts: BatchParts<T>,
643
644    // These provide a bit more safety against appending a batch with the wrong
645    // type to a shard.
646    _phantom: PhantomData<fn(K, V, T, D)>,
647}
648
649impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
650where
651    K: Debug + Codec,
652    V: Debug + Codec,
653    T: Timestamp + Lattice + Codec64,
654    D: Semigroup + Codec64,
655{
656    pub(crate) fn new(
657        _cfg: BatchBuilderConfig,
658        parts: BatchParts<T>,
659        metrics: Arc<Metrics>,
660        write_schemas: Schemas<K, V>,
661        blob: Arc<dyn Blob>,
662        shard_id: ShardId,
663        version: Version,
664    ) -> Self {
665        Self {
666            blob,
667            metrics,
668            write_schemas,
669            parts,
670            shard_id,
671            version,
672            _phantom: PhantomData,
673        }
674    }
675
676    /// Finish writing this batch and return a handle to the written batch.
677    ///
678    /// This fails if any of the updates in this batch are beyond the given
679    /// `upper`.
680    #[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
681    pub async fn finish(
682        self,
683        registered_desc: Description<T>,
684    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
685        let write_run_ids = self.parts.cfg.enable_incremental_compaction;
686        let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
687        let shard_metrics = Arc::clone(&self.parts.shard_metrics);
688        let runs = self.parts.finish().await;
689
690        let mut run_parts = vec![];
691        let mut run_splits = vec![];
692        let mut run_meta = vec![];
693        let total_updates = runs
694            .iter()
695            .map(|(_, _, num_updates)| num_updates)
696            .sum::<usize>();
697        for (order, parts, num_updates) in runs {
698            if parts.is_empty() {
699                continue;
700            }
701            if run_parts.len() != 0 {
702                run_splits.push(run_parts.len());
703            }
704            run_meta.push(RunMeta {
705                order: Some(order),
706                schema: self.write_schemas.id,
707                // Field has been deprecated but kept around to roundtrip state.
708                deprecated_schema: None,
709                id: if write_run_ids {
710                    Some(RunId::new())
711                } else {
712                    None
713                },
714                len: if write_run_ids {
715                    Some(num_updates)
716                } else {
717                    None
718                },
719            });
720            run_parts.extend(parts);
721        }
722        let desc = registered_desc;
723
724        let batch = Batch::new(
725            batch_delete_enabled,
726            Arc::clone(&self.metrics),
727            self.blob,
728            shard_metrics,
729            self.version,
730            HollowBatch::new(desc, run_parts, total_updates, run_meta, run_splits),
731        );
732
733        Ok(batch)
734    }
735
736    /// Flushes the current part to Blob storage, first consolidating and then
737    /// columnar-encoding the updates. It is the caller's responsibility to
738    /// chunk `columnar` to be no greater than
739    /// [BatchBuilderConfig::blob_target_size], and it must absolutely be less than
740    /// [mz_persist::indexed::columnar::KEY_VAL_DATA_MAX_LEN].
741    pub async fn flush_part(&mut self, part_desc: Description<T>, columnar: Part) {
742        let num_updates = columnar.len();
743        if num_updates == 0 {
744            return;
745        }
746        let diffs_sum = diffs_sum::<D>(&columnar.diff).expect("part is non empty");
747
748        let start = Instant::now();
749        self.parts
750            .write(&self.write_schemas, part_desc, columnar, diffs_sum)
751            .await;
752        self.metrics
753            .compaction
754            .batch
755            .step_part_writing
756            .inc_by(start.elapsed().as_secs_f64());
757    }
758}
759
760#[derive(Debug, Clone)]
761pub(crate) struct RunWithMeta<T> {
762    pub parts: Vec<RunPart<T>>,
763    pub num_updates: usize,
764}
765
766impl<T> RunWithMeta<T> {
767    pub fn new(parts: Vec<RunPart<T>>, num_updates: usize) -> Self {
768        Self { parts, num_updates }
769    }
770
771    pub fn single(part: RunPart<T>, num_updates: usize) -> Self {
772        Self {
773            parts: vec![part],
774            num_updates,
775        }
776    }
777}
778
779#[derive(Debug)]
780enum WritingRuns<T> {
781    /// Building a single run with the specified ordering. Parts are expected to be internally
782    /// sorted and added in order. Merging a vec of parts will shift them out to a hollow run
783    /// in blob, bounding the total length of a run in memory.
784    Ordered(RunOrder, MergeTree<Pending<RunWithMeta<T>>>),
785    /// Building multiple runs which may have different orders. Merging a vec of runs will cause
786    /// them to be compacted together, bounding the total number of runs we generate.
787    Compacting(MergeTree<(RunOrder, Pending<RunWithMeta<T>>)>),
788}
789
790// TODO: If this is dropped, cancel (and delete?) any writing parts and delete
791// any finished ones.
792#[derive(Debug)]
793pub(crate) struct BatchParts<T> {
794    cfg: BatchBuilderConfig,
795    metrics: Arc<Metrics>,
796    shard_metrics: Arc<ShardMetrics>,
797    shard_id: ShardId,
798    blob: Arc<dyn Blob>,
799    isolated_runtime: Arc<IsolatedRuntime>,
800    next_index: u64,
801    writing_runs: WritingRuns<T>,
802    batch_metrics: BatchWriteMetrics,
803}
804
805impl<T: Timestamp + Codec64> BatchParts<T> {
806    pub(crate) fn new_compacting<K, V, D>(
807        cfg: CompactConfig,
808        desc: Description<T>,
809        runs_per_compaction: usize,
810        metrics: Arc<Metrics>,
811        shard_metrics: Arc<ShardMetrics>,
812        shard_id: ShardId,
813        blob: Arc<dyn Blob>,
814        isolated_runtime: Arc<IsolatedRuntime>,
815        batch_metrics: &BatchWriteMetrics,
816        schemas: Schemas<K, V>,
817    ) -> Self
818    where
819        K: Codec + Debug,
820        V: Codec + Debug,
821        T: Lattice + Send + Sync,
822        D: Semigroup + Ord + Codec64 + Send + Sync,
823    {
824        let writing_runs = {
825            let cfg = cfg.clone();
826            let blob = Arc::clone(&blob);
827            let metrics = Arc::clone(&metrics);
828            let shard_metrics = Arc::clone(&shard_metrics);
829            let isolated_runtime = Arc::clone(&isolated_runtime);
830            // Clamping to prevent extreme values given weird configs.
831            let runs_per_compaction = runs_per_compaction.clamp(2, 1024);
832
833            let merge_fn = move |parts: Vec<(RunOrder, Pending<RunWithMeta<T>>)>| {
834                let blob = Arc::clone(&blob);
835                let metrics = Arc::clone(&metrics);
836                let shard_metrics = Arc::clone(&shard_metrics);
837                let cfg = cfg.clone();
838                let isolated_runtime = Arc::clone(&isolated_runtime);
839                let write_schemas = schemas.clone();
840                let compact_desc = desc.clone();
841                let handle = mz_ore::task::spawn(
842                    || "batch::compact_runs",
843                    async move {
844                        let runs: Vec<_> = stream::iter(parts)
845                            .then(|(order, parts)| async move {
846                                let completed_run = parts.into_result().await;
847                                (
848                                    RunMeta {
849                                        order: Some(order),
850                                        schema: schemas.id,
851                                        // Field has been deprecated but kept around to
852                                        // roundtrip state.
853                                        deprecated_schema: None,
854                                        id: if cfg.incremental_compaction {
855                                            Some(RunId::new())
856                                        } else {
857                                            None
858                                        },
859                                        len: if cfg.incremental_compaction {
860                                            Some(completed_run.num_updates)
861                                        } else {
862                                            None
863                                        },
864                                    },
865                                    completed_run.parts,
866                                )
867                            })
868                            .collect()
869                            .await;
870
871                        let run_refs: Vec<_> = runs
872                            .iter()
873                            .map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
874                            .collect();
875
876                        let output_batch = Compactor::<K, V, T, D>::compact_runs(
877                            &cfg,
878                            &shard_id,
879                            &compact_desc,
880                            run_refs,
881                            blob,
882                            metrics,
883                            shard_metrics,
884                            isolated_runtime,
885                            write_schemas,
886                        )
887                        .await
888                        .expect("successful compaction");
889
890                        assert_eq!(
891                            output_batch.run_meta.len(),
892                            1,
893                            "compaction is guaranteed to emit a single run"
894                        );
895                        let total_compacted_updates: usize = output_batch.len;
896
897                        RunWithMeta::new(output_batch.parts, total_compacted_updates)
898                    }
899                    .instrument(debug_span!("batch::compact_runs")),
900                );
901                (RunOrder::Structured, Pending::new(handle))
902            };
903            WritingRuns::Compacting(MergeTree::new(runs_per_compaction, merge_fn))
904        };
905        BatchParts {
906            cfg: cfg.batch,
907            metrics,
908            shard_metrics,
909            shard_id,
910            blob,
911            isolated_runtime,
912            next_index: 0,
913            writing_runs,
914            batch_metrics: batch_metrics.clone(),
915        }
916    }
917
918    pub(crate) fn new_ordered<D: Semigroup + Codec64>(
919        cfg: BatchBuilderConfig,
920        order: RunOrder,
921        metrics: Arc<Metrics>,
922        shard_metrics: Arc<ShardMetrics>,
923        shard_id: ShardId,
924        blob: Arc<dyn Blob>,
925        isolated_runtime: Arc<IsolatedRuntime>,
926        batch_metrics: &BatchWriteMetrics,
927    ) -> Self {
928        let writing_runs = {
929            let cfg = cfg.clone();
930            let blob = Arc::clone(&blob);
931            let metrics = Arc::clone(&metrics);
932            let writer_key = cfg.writer_key.clone();
933            // Don't spill "unordered" runs to S3, since we'll split them up into many single-element
934            // runs below.
935            let run_length_limit = (order == RunOrder::Unordered)
936                .then_some(usize::MAX)
937                .unwrap_or(cfg.run_length_limit);
938            let merge_fn = move |parts: Vec<Pending<RunWithMeta<T>>>| {
939                let blob = Arc::clone(&blob);
940                let writer_key = writer_key.clone();
941                let metrics = Arc::clone(&metrics);
942                let handle = mz_ore::task::spawn(
943                    || "batch::spill_run",
944                    async move {
945                        let completed_runs: Vec<RunWithMeta<T>> = stream::iter(parts)
946                            .then(|p| p.into_result())
947                            .collect()
948                            .await;
949
950                        let mut all_run_parts = Vec::new();
951                        let mut total_updates = 0;
952
953                        for completed_run in completed_runs {
954                            all_run_parts.extend(completed_run.parts);
955                            total_updates += completed_run.num_updates;
956                        }
957
958                        let run_ref = HollowRunRef::set::<D>(
959                            shard_id,
960                            blob.as_ref(),
961                            &writer_key,
962                            HollowRun {
963                                parts: all_run_parts,
964                            },
965                            &*metrics,
966                        )
967                        .await;
968
969                        RunWithMeta::single(RunPart::Many(run_ref), total_updates)
970                    }
971                    .instrument(debug_span!("batch::spill_run")),
972                );
973                Pending::new(handle)
974            };
975            WritingRuns::Ordered(order, MergeTree::new(run_length_limit, merge_fn))
976        };
977        BatchParts {
978            cfg,
979            metrics,
980            shard_metrics,
981            shard_id,
982            blob,
983            isolated_runtime,
984            next_index: 0,
985            writing_runs,
986            batch_metrics: batch_metrics.clone(),
987        }
988    }
989
990    pub(crate) fn expected_order(&self) -> RunOrder {
991        match self.writing_runs {
992            WritingRuns::Ordered(order, _) => order,
993            WritingRuns::Compacting(_) => RunOrder::Unordered,
994        }
995    }
996
997    pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
998        &mut self,
999        write_schemas: &Schemas<K, V>,
1000        desc: Description<T>,
1001        updates: Part,
1002        diffs_sum: D,
1003    ) {
1004        let batch_metrics = self.batch_metrics.clone();
1005        let index = self.next_index;
1006        self.next_index += 1;
1007        let num_updates = updates.len();
1008        let ts_rewrite = None;
1009        let schema_id = write_schemas.id;
1010
1011        // Parts smaller than this threshold (measured in goodbytes) are stored inline in
1012        // the shard metadata rather than being written out to blob storage.
1013        let inline_threshold = self.cfg.inline_writes_single_max_bytes;
1014
1015        let updates = BlobTraceUpdates::from_part(updates);
1016        let (name, write_future) = if updates.goodbytes() < inline_threshold {
1017            let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
1018            (
1019                "batch::inline_part",
1020                async move {
1021                    let start = Instant::now();
1022                    let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
1023                        desc: Some(desc.into_proto()),
1024                        index: index.into_proto(),
1025                        updates: Some(updates.into_proto()),
1026                    });
1027                    batch_metrics
1028                        .step_inline
1029                        .inc_by(start.elapsed().as_secs_f64());
1030
1031                    RunWithMeta::single(
1032                        RunPart::Single(BatchPart::Inline {
1033                            updates,
1034                            ts_rewrite,
1035                            schema_id,
1036                            // Field has been deprecated but kept around to roundtrip state.
1037                            deprecated_schema_id: None,
1038                        }),
1039                        num_updates,
1040                    )
1041                }
1042                .instrument(span)
1043                .boxed(),
1044            )
1045        } else {
1046            let part = BlobTraceBatchPart {
1047                desc,
1048                updates,
1049                index,
1050            };
1051            let cfg = self.cfg.clone();
1052            let blob = Arc::clone(&self.blob);
1053            let metrics = Arc::clone(&self.metrics);
1054            let shard_metrics = Arc::clone(&self.shard_metrics);
1055            let isolated_runtime = Arc::clone(&self.isolated_runtime);
1056            let expected_order = self.expected_order();
1057            let encoded_diffs_sum = D::encode(&diffs_sum);
1058            let write_schemas_clone = write_schemas.clone();
1059            let write_span =
1060                debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current();
1061            (
1062                "batch::write_part",
1063                async move {
1064                    let part = BatchParts::write_hollow_part(
1065                        cfg,
1066                        blob,
1067                        metrics,
1068                        shard_metrics,
1069                        batch_metrics,
1070                        isolated_runtime,
1071                        part,
1072                        expected_order,
1073                        ts_rewrite,
1074                        encoded_diffs_sum,
1075                        write_schemas_clone,
1076                    )
1077                    .await;
1078                    RunWithMeta::single(RunPart::Single(part), num_updates)
1079                }
1080                .instrument(write_span)
1081                .boxed(),
1082            )
1083        };
1084
1085        match &mut self.writing_runs {
1086            WritingRuns::Ordered(_order, run) => {
1087                let part = Pending::new(mz_ore::task::spawn(|| name, write_future));
1088                run.push(part);
1089
1090                // If there are more than the max outstanding parts, block on all but the
1091                // most recent.
1092                for part in run
1093                    .iter_mut()
1094                    .rev()
1095                    .skip(self.cfg.batch_builder_max_outstanding_parts)
1096                    .take_while(|p| !p.is_finished())
1097                {
1098                    self.batch_metrics.write_stalls.inc();
1099                    part.block_until_ready().await;
1100                }
1101            }
1102            WritingRuns::Compacting(batches) => {
1103                let run = Pending::Writing(mz_ore::task::spawn(|| name, write_future));
1104                batches.push((RunOrder::Unordered, run));
1105
1106                // Allow up to `max_outstanding_parts` (or one compaction) to be pending, and block
1107                // on the rest.
1108                let mut part_budget = self.cfg.batch_builder_max_outstanding_parts;
1109                let mut compaction_budget = 1;
1110                for (_, part) in batches
1111                    .iter_mut()
1112                    .rev()
1113                    .skip_while(|(order, _)| match order {
1114                        RunOrder::Unordered if part_budget > 0 => {
1115                            part_budget -= 1;
1116                            true
1117                        }
1118                        RunOrder::Structured | RunOrder::Codec if compaction_budget > 0 => {
1119                            compaction_budget -= 1;
1120                            true
1121                        }
1122                        _ => false,
1123                    })
1124                    .take_while(|(_, p)| !p.is_finished())
1125                {
1126                    self.batch_metrics.write_stalls.inc();
1127                    part.block_until_ready().await;
1128                }
1129            }
1130        }
1131    }
1132
1133    async fn write_hollow_part<K: Codec, V: Codec>(
1134        cfg: BatchBuilderConfig,
1135        blob: Arc<dyn Blob>,
1136        metrics: Arc<Metrics>,
1137        shard_metrics: Arc<ShardMetrics>,
1138        batch_metrics: BatchWriteMetrics,
1139        isolated_runtime: Arc<IsolatedRuntime>,
1140        mut updates: BlobTraceBatchPart<T>,
1141        run_order: RunOrder,
1142        ts_rewrite: Option<Antichain<T>>,
1143        diffs_sum: [u8; 8],
1144        write_schemas: Schemas<K, V>,
1145    ) -> BatchPart<T> {
1146        let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new());
1147        let key = partial_key.complete(&shard_metrics.shard_id);
1148        let goodbytes = updates.updates.goodbytes();
1149        let metrics_ = Arc::clone(&metrics);
1150        let schema_id = write_schemas.id;
1151
1152        let (stats, key_lower, structured_key_lower, (buf, encode_time)) = isolated_runtime
1153            .spawn_named(|| "batch::encode_part", async move {
1154                // Measure the expensive steps of the part build - re-encoding and stats collection.
1155                let stats = metrics_.columnar.arrow().measure_part_build(|| {
1156                    let stats = if cfg.stats_collection_enabled {
1157                        let ext = updates.updates.get_or_make_structured::<K, V>(
1158                            write_schemas.key.as_ref(),
1159                            write_schemas.val.as_ref(),
1160                        );
1161
1162                        let key_stats = write_schemas
1163                            .key
1164                            .decoder_any(ext.key.as_ref())
1165                            .expect("decoding just-encoded data")
1166                            .stats();
1167
1168                        let part_stats = PartStats { key: key_stats };
1169
1170                        // Collect stats about the updates, if stats collection is enabled.
1171                        let trimmed_start = Instant::now();
1172                        let mut trimmed_bytes = 0;
1173                        let trimmed_stats = LazyPartStats::encode(&part_stats, |s| {
1174                            trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| {
1175                                cfg.stats_untrimmable_columns.should_retain(s)
1176                            })
1177                        });
1178                        let trimmed_duration = trimmed_start.elapsed();
1179                        Some((trimmed_stats, trimmed_duration, trimmed_bytes))
1180                    } else {
1181                        None
1182                    };
1183
1184                    // Ensure the updates are in the specified columnar format before encoding.
1185                    updates.updates = updates.updates.as_structured::<K, V>(
1186                        write_schemas.key.as_ref(),
1187                        write_schemas.val.as_ref(),
1188                    );
1189
1190                    stats
1191                });
1192
1193                let key_lower = if let Some(records) = updates.updates.records() {
1194                    let key_bytes = records.keys();
1195                    if key_bytes.is_empty() {
1196                        &[]
1197                    } else if run_order == RunOrder::Codec {
1198                        key_bytes.value(0)
1199                    } else {
1200                        ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
1201                    }
1202                } else {
1203                    &[]
1204                };
1205                let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
1206                    .expect("lower bound always exists");
1207
1208                let structured_key_lower = if cfg.structured_key_lower_len > 0 {
1209                    updates.updates.structured().and_then(|ext| {
1210                        let min_key = if run_order == RunOrder::Structured {
1211                            0
1212                        } else {
1213                            let ord = ArrayOrd::new(ext.key.as_ref());
1214                            (0..ext.key.len())
1215                                .min_by_key(|i| ord.at(*i))
1216                                .expect("non-empty batch")
1217                        };
1218                        let lower = ArrayBound::new(Arc::clone(&ext.key), min_key)
1219                            .to_proto_lower(cfg.structured_key_lower_len);
1220                        if lower.is_none() {
1221                            batch_metrics.key_lower_too_big.inc()
1222                        }
1223                        lower.map(|proto| LazyProto::from(&proto))
1224                    })
1225                } else {
1226                    None
1227                };
1228
1229                let encode_start = Instant::now();
1230                let mut buf = Vec::new();
1231                updates.encode(&mut buf, &metrics_.columnar, &cfg.encoding_config);
1232
1233                // Drop batch as soon as we can to reclaim its memory.
1234                drop(updates);
1235                (
1236                    stats,
1237                    key_lower,
1238                    structured_key_lower,
1239                    (Bytes::from(buf), encode_start.elapsed()),
1240                )
1241            })
1242            .instrument(debug_span!("batch::encode_part"))
1243            .await
1244            .expect("part encode task failed");
1245        // Can't use the `CodecMetrics::encode` helper because of async.
1246        metrics.codecs.batch.encode_count.inc();
1247        metrics
1248            .codecs
1249            .batch
1250            .encode_seconds
1251            .inc_by(encode_time.as_secs_f64());
1252
1253        let start = Instant::now();
1254        let payload_len = buf.len();
1255        let () = retry_external(&metrics.retries.external.batch_set, || async {
1256            shard_metrics.blob_sets.inc();
1257            blob.set(&key, Bytes::clone(&buf)).await
1258        })
1259        .instrument(trace_span!("batch::set", payload_len))
1260        .await;
1261        batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
1262        batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
1263        batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
1264        match run_order {
1265            RunOrder::Unordered => batch_metrics.unordered.inc(),
1266            RunOrder::Codec => batch_metrics.codec_order.inc(),
1267            RunOrder::Structured => batch_metrics.structured_order.inc(),
1268        }
1269        let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
1270            batch_metrics
1271                .step_stats
1272                .inc_by(stats_step_timing.as_secs_f64());
1273            if trimmed_bytes > 0 {
1274                metrics.pushdown.parts_stats_trimmed_count.inc();
1275                metrics
1276                    .pushdown
1277                    .parts_stats_trimmed_bytes
1278                    .inc_by(u64::cast_from(trimmed_bytes));
1279            }
1280            stats
1281        });
1282
1283        BatchPart::Hollow(HollowBatchPart {
1284            key: partial_key,
1285            encoded_size_bytes: payload_len,
1286            key_lower,
1287            structured_key_lower,
1288            stats,
1289            ts_rewrite,
1290            diffs_sum: Some(diffs_sum),
1291            format: Some(BatchColumnarFormat::Structured),
1292            schema_id,
1293            // Field has been deprecated but kept around to roundtrip state.
1294            deprecated_schema_id: None,
1295        })
1296    }
1297
1298    #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
1299    pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec<RunPart<T>>, usize)> {
1300        match self.writing_runs {
1301            WritingRuns::Ordered(RunOrder::Unordered, run) => {
1302                let completed_runs = run.finish();
1303                let mut output = Vec::with_capacity(completed_runs.len());
1304                for completed_run in completed_runs {
1305                    let completed_run = completed_run.into_result().await;
1306                    // Each part becomes its own run in the unordered case.
1307                    for part in completed_run.parts {
1308                        output.push((RunOrder::Unordered, vec![part], completed_run.num_updates));
1309                    }
1310                }
1311                output
1312            }
1313            WritingRuns::Ordered(order, run) => {
1314                let completed_runs = run.finish();
1315                let mut all_parts = Vec::new();
1316                let mut all_update_counts = 0;
1317                for completed_run in completed_runs {
1318                    let completed_run = completed_run.into_result().await;
1319                    all_parts.extend(completed_run.parts);
1320                    all_update_counts += completed_run.num_updates;
1321                }
1322                vec![(order, all_parts, all_update_counts)]
1323            }
1324            WritingRuns::Compacting(batches) => {
1325                let runs = batches.finish();
1326                let mut output = Vec::new();
1327                for (order, run) in runs {
1328                    let completed_run = run.into_result().await;
1329                    output.push((order, completed_run.parts, completed_run.num_updates));
1330                }
1331                output
1332            }
1333        }
1334    }
1335}
1336
pub(crate) fn validate_truncate_batch<T: Timestamp>(
    batch: &HollowBatch<T>,
    truncate: &Description<T>,
    any_batch_rewrite: bool,
    enforce_matching_batch_boundaries: bool,
) -> Result<(), InvalidUsage<T>> {
    // If rewrite_ts is used, we don't allow truncation, to keep things simpler
    // to reason about.
    if any_batch_rewrite {
        // We allow a new upper to be specified at rewrite time, so that's easy:
        // it must match exactly. This is both consistent with the upper
        // requirement below and proves that there is no data to truncate past
        // the upper.
        if truncate.upper() != batch.desc.upper() {
            return Err(InvalidUsage::InvalidRewrite(format!(
                "rewritten batch might have data past {:?} up to {:?}",
                truncate.upper().elements(),
                batch.desc.upper().elements(),
            )));
        }
        // To prove that there is no data to truncate below the lower, require
        // that the lower is <= the rewrite ts.
        for part in batch.parts.iter() {
            let part_lower_bound = part.ts_rewrite().unwrap_or_else(|| batch.desc.lower());
            if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
                return Err(InvalidUsage::InvalidRewrite(format!(
                    "rewritten batch might have data below {:?} at {:?}",
                    truncate.lower().elements(),
                    part_lower_bound.elements(),
                )));
            }
        }
    }

    if !enforce_matching_batch_boundaries {
        return Ok(());
    }

    let batch = &batch.desc;
    if !PartialOrder::less_equal(batch.lower(), truncate.lower())
        || PartialOrder::less_than(batch.upper(), truncate.upper())
    {
        return Err(InvalidUsage::InvalidBatchBounds {
            batch_lower: batch.lower().clone(),
            batch_upper: batch.upper().clone(),
            append_lower: truncate.lower().clone(),
            append_upper: truncate.upper().clone(),
        });
    }

    Ok(())
}

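/// The set of blobs (individual parts and hollow runs) belonging to a batch
/// that are queued up for deletion from blob storage, deduplicated by key.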
#[derive(Debug)]
pub(crate) struct PartDeletes<T> {
    /// Keys to hollow parts or runs that we're ready to delete.
    blob_keys: BTreeSet<PartialBatchKey>,
    /// Keys to hollow runs that may not have had all their parts deleted (or added to `blob_keys`) yet.
    hollow_runs: BTreeMap<PartialBatchKey, HollowRunRef<T>>,
}

impl<T> Default for PartDeletes<T> {
    fn default() -> Self {
        Self {
            blob_keys: Default::default(),
            hollow_runs: Default::default(),
        }
    }
}

impl<T: Timestamp> PartDeletes<T> {
    /// Adds the part to the set to be deleted and returns true if it was newly
    /// inserted.
    pub fn add(&mut self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.insert(r.key.clone(), r.clone()).is_none(),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.insert(x.key.clone()),
            RunPart::Single(BatchPart::Inline { .. }) => {
                // Inline parts are stored in state, not blob storage, so
                // there's nothing to delete.
                true
            }
        }
    }

    pub fn contains(&self, part: &RunPart<T>) -> bool {
        match part {
            RunPart::Many(r) => self.hollow_runs.contains_key(&r.key),
            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.contains(&x.key),
            RunPart::Single(BatchPart::Inline { .. }) => false,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn len(&self) -> usize {
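        // Exhaustively destructure `self` so that adding a field to the struct
        // forces this length calculation to be revisited.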
        match self {
            Self {
                blob_keys,
                hollow_runs,
            } => blob_keys.len() + hollow_runs.len(),
        }
    }

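    /// Deletes all queued blobs from `blob` with the given `concurrency`.
    ///
    /// Hollow runs are expanded lazily: each pass of the loop deletes the
    /// currently known blob keys, then pops one hollow run, fetches it, and
    /// queues up its constituent parts (and the run blob itself) for the next
    /// pass.
    ///
    /// An illustrative sketch, not a doctest; `batch`, `blob`, `shard_id`,
    /// `metrics`, and `delete_retry_metrics` stand in for caller-provided
    /// values:
    ///
    /// ```ignore
    /// let mut deletes = PartDeletes::default();
    /// for part in &batch.parts {
    ///     deletes.add(part);
    /// }
    /// deletes
    ///     .delete(&*blob, shard_id, 16, &metrics, &delete_retry_metrics)
    ///     .await;
    /// ```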
    pub async fn delete(
        mut self,
        blob: &dyn Blob,
        shard_id: ShardId,
        concurrency: usize,
        metrics: &Metrics,
        delete_metrics: &RetryMetrics,
    ) where
        T: Codec64,
    {
        loop {
            let () = stream::iter(mem::take(&mut self.blob_keys))
                .map(|key| {
                    let key = key.complete(&shard_id);
                    async move {
                        retry_external(delete_metrics, || blob.delete(&key)).await;
                    }
                })
                .buffer_unordered(concurrency)
                .collect()
                .await;

            let Some((run_key, run_ref)) = self.hollow_runs.pop_first() else {
                break;
            };

            if let Some(run) = run_ref.get(shard_id, blob, metrics).await {
                // Queue up both the individual parts and the run itself for deletion.
                for part in &run.parts {
                    self.add(part);
                }
                self.blob_keys.insert(run_key);
            };
        }
    }
}

/// Returns the total sum of diffs or None if there were no updates.
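/// A minimal sketch of the expected behavior with an `i64` diff type
/// (illustrative only, not a doctest):
///
/// ```ignore
/// use arrow::array::Int64Array;
///
/// // 1 + (-1) + 2 == 2
/// assert_eq!(diffs_sum::<i64>(&Int64Array::from(vec![1i64, -1, 2])), Some(2));
/// // No updates means no sum.
/// assert_eq!(diffs_sum::<i64>(&Int64Array::from(Vec::<i64>::new())), None);
/// ```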
fn diffs_sum<D: Semigroup + Codec64>(updates: &Int64Array) -> Option<D> {
    let mut sum = None;
    for d in updates.values().iter() {
        let d = D::decode(d.to_le_bytes());
        match &mut sum {
            None => sum = Some(d),
            Some(x) => x.plus_equals(&d),
        }
    }

    sum
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;

    use super::*;
    use crate::PersistLocation;
    use crate::cache::PersistClientCache;
    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
    use crate::internal::paths::{BlobKey, PartialBlobKey};
    use crate::tests::{all_ok, new_test_client};

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn batch_builder_flushing() {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let cache = PersistClientCache::new_no_metrics();

        // Set blob_target_size to 0 so that each row gets forced into its own
        // part. Set max_outstanding to a small value that's >1 to test various
        // edge cases below.
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
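        // Cap the number of in-progress runs at 3, so that (as exercised
        // below) adding a fourth part forces the first three to spill into a
        // single compacted run.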
        cache.cfg.set_config(&MAX_RUNS, 3);
        cache
            .cfg
            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);

        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let (mut write, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // A new builder has no writing or finished parts.
        let mut builder = write.builder(Antichain::from_elem(0));

        fn assert_writing(
            builder: &BatchBuilder<String, String, u64, i64>,
            expected_finished: &[bool],
        ) {
            let WritingRuns::Compacting(run) = &builder.builder.parts.writing_runs else {
                unreachable!("ordered run!")
            };

            let actual: Vec<_> = run.iter().map(|(_, p)| p.is_finished()).collect();
            assert_eq!(*expected_finished, actual);
        }

        assert_writing(&builder, &[]);

        // We set blob_target_size to 0, so the first update gets forced out
        // into a run.
        let ((k, v), t, d) = &data[0];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false]);

        // We set batch_builder_max_outstanding_parts to 2, so we are allowed to
        // pipeline a second part.
        let ((k, v), t, d) = &data[1];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        // But now that we have 3 parts, the add call back-pressures until the
        // first one finishes.
        let ((k, v), t, d) = &data[2];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[true, false, false]);

        // Finally, pushing a fourth part will cause the first three to spill out into
        // a new compacted run.
        let ((k, v), t, d) = &data[3];
        builder.add(k, v, t, d).await.expect("invalid usage");
        assert_writing(&builder, &[false, false]);

        // Finish off the batch and verify that the keys and such get plumbed
        // correctly by reading the data back.
        let batch = builder
            .finish(Antichain::from_elem(5))
            .await
            .expect("invalid usage");
        assert_eq!(batch.batch.runs().count(), 2);
        assert_eq!(batch.batch.part_count(), 4);
        write
            .append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
            .await
            .expect("invalid usage")
            .expect("unexpected upper");
        assert_eq!(read.expect_snapshot_and_fetch(4).await, all_ok(&data, 4));
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn batch_builder_keys() {
        let cache = PersistClientCache::new_no_metrics();
        // Set blob_target_size to 0 so that each row gets forced into its own batch part
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        // Otherwise fails: expected hollow part!
        cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 3);
        for part in &batch.batch.parts {
            let part = part.expect_hollow_part();
            match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
                Ok((shard, PartialBlobKey::Batch(writer, _))) => {
                    assert_eq!(shard.to_string(), shard_id.to_string());
                    assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
                }
                _ => panic!("unparseable blob key"),
            }
        }
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn batch_delete() {
        let cache = PersistClientCache::new_no_metrics();
        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        cache.cfg.set_config(&BATCH_DELETE_ENABLED, true);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let part_key = batch.batch.parts[0]
            .expect_hollow_part()
            .key
            .complete(&shard_id);

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_some());

        batch.delete().await;

        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
        assert!(part_bytes.is_none());
    }

    #[mz_ore::test]
    fn untrimmable_columns() {
        let untrimmable = UntrimmableColumns {
            equals: vec!["abc".into(), "def".into()],
            prefixes: vec!["123".into(), "234".into()],
            suffixes: vec!["xyz".into()],
        };

        // equals
        assert!(untrimmable.should_retain("abc"));
        assert!(untrimmable.should_retain("ABC"));
        assert!(untrimmable.should_retain("aBc"));
        assert!(!untrimmable.should_retain("abcd"));
        assert!(untrimmable.should_retain("deF"));
        assert!(!untrimmable.should_retain("defg"));

        // prefix
        assert!(untrimmable.should_retain("123"));
        assert!(untrimmable.should_retain("123-4"));
        assert!(untrimmable.should_retain("1234"));
        assert!(untrimmable.should_retain("234"));
        assert!(!untrimmable.should_retain("345"));

        // suffix
        assert!(untrimmable.should_retain("ijk_xyZ"));
        assert!(untrimmable.should_retain("ww-XYZ"));
        assert!(!untrimmable.should_retain("xya"));
    }

    // NB: Most edge cases are exercised in datadriven tests.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        let mut batch = write.builder(Antichain::from_elem(0));
        batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
        let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();

        // Roundtrip through a transmittable batch.
        let batch = batch.into_transmittable_batch();
        let mut batch = write.batch_from_transmittable_batch(batch);
        batch
            .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
            .unwrap();
        write
            .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
            .await;

        let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
        let expected = vec![((Ok("foo".to_owned()), Ok(())), 2, 1)];
        assert_eq!(actual, expected);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn structured_lowers() {
        let cache = PersistClientCache::new_no_metrics();
        // Ensure structured data is calculated, and that we give some budget for a key lower.
        cache.cfg().set_config(&STRUCTURED_KEY_LOWER_LEN, 1024);
        // Otherwise fails: expected hollow part!
        cache.cfg().set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
        cache.cfg().set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
        let client = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed");
        let shard_id = ShardId::new();
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let batch = write
            .expect_batch(
                &[
                    (("1".into(), "one".into()), 1, 1),
                    (("2".into(), "two".into()), 2, 1),
                    (("3".into(), "three".into()), 3, 1),
                ],
                0,
                4,
            )
            .await;

        assert_eq!(batch.batch.part_count(), 1);
        let [part] = batch.batch.parts.as_slice() else {
            panic!("expected single part")
        };
        // Verifies that the structured key lower is stored and decoded.
        assert!(part.structured_key_lower().is_some());
    }
}