mz_persist_client/batch.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A handle to a batch of updates
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::marker::PhantomData;
16use std::mem;
17use std::sync::Arc;
18use std::time::Instant;
19
20use arrow::array::{Array, Int64Array};
21use bytes::Bytes;
22use differential_dataflow::difference::Semigroup;
23use differential_dataflow::lattice::Lattice;
24use differential_dataflow::trace::Description;
25use futures_util::stream::StreamExt;
26use futures_util::{FutureExt, stream};
27use mz_dyncfg::Config;
28use mz_ore::cast::CastFrom;
29use mz_ore::instrument;
30use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
31use mz_persist::location::Blob;
32use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
33use mz_persist_types::columnar::{ColumnDecoder, Schema};
34use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
35use mz_persist_types::part::{Part, PartBuilder};
36use mz_persist_types::schema::SchemaId;
37use mz_persist_types::stats::{
38    PartStats, TRUNCATE_LEN, TruncateBound, trim_to_budget, truncate_bytes,
39};
40use mz_persist_types::{Codec, Codec64};
41use mz_proto::RustType;
42use mz_timely_util::order::Reverse;
43use proptest_derive::Arbitrary;
44use semver::Version;
45use timely::PartialOrder;
46use timely::order::TotalOrder;
47use timely::progress::{Antichain, Timestamp};
48use tracing::{Instrument, debug_span, trace_span, warn};
49
50use crate::async_runtime::IsolatedRuntime;
51use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
52use crate::error::InvalidUsage;
53use crate::internal::compact::{CompactConfig, Compactor};
54use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
55use crate::internal::machine::retry_external;
56use crate::internal::merge::{MergeTree, Pending};
57use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
58use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
59use crate::internal::state::{
60    BatchPart, HollowBatch, HollowBatchPart, HollowRun, HollowRunRef, ProtoInlineBatchPart,
61    RunMeta, RunOrder, RunPart,
62};
63use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
64use crate::{PersistConfig, ShardId};
65
66include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));
67
68/// A handle to a batch of updates that has been written to blob storage but
69/// which has not yet been appended to a shard.
70///
71/// A [Batch] needs to be marked as consumed or it needs to be deleted via [Self::delete].
72/// Otherwise, a dangling batch will leak and backing blobs will remain in blob storage.
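///
/// A rough sketch of the intended lifecycle, assuming a `write` handle for the
/// same shard; the `batch`, `unused_batch`, `lower`, and `upper` names below are
/// illustrative (the append call shown is
/// [crate::write::WriteHandle::compare_and_append_batch]):
///
/// ```ignore
/// // Either hand the batch to an append call, which marks it consumed...
/// write
///     .compare_and_append_batch(&mut [&mut batch], lower, upper)
///     .await??;
/// // ...or delete it explicitly so that its backing blobs don't leak.
/// unused_batch.delete().await;
/// ```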
73#[derive(Debug)]
74pub struct Batch<K, V, T, D> {
75    pub(crate) batch_delete_enabled: bool,
76    pub(crate) metrics: Arc<Metrics>,
77    pub(crate) shard_metrics: Arc<ShardMetrics>,
78
79    /// The version of Materialize which wrote this batch.
80    pub(crate) version: Version,
81
82    /// A handle to the data represented by this batch.
83    pub(crate) batch: HollowBatch<T>,
84
85    /// Handle to the [Blob] that the blobs of this batch were uploaded to.
86    pub(crate) blob: Arc<dyn Blob>,
87
88    // These provide a bit more safety against appending a batch with the wrong
89    // type to a shard.
90    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
91}
92
93impl<K, V, T, D> Drop for Batch<K, V, T, D> {
94    fn drop(&mut self) {
95        if self.batch.part_count() > 0 {
96            warn!(
97                "un-consumed Batch, with {} parts and dangling blob keys: {:?}",
98                self.batch.part_count(),
99                self.batch
100                    .parts
101                    .iter()
102                    .map(|x| x.printable_name())
103                    .collect::<Vec<_>>(),
104            );
105        }
106    }
107}
108
109impl<K, V, T, D> Batch<K, V, T, D>
110where
111    K: Debug + Codec,
112    V: Debug + Codec,
113    T: Timestamp + Lattice + Codec64,
114    D: Semigroup + Codec64,
115{
116    pub(crate) fn new(
117        batch_delete_enabled: bool,
118        metrics: Arc<Metrics>,
119        blob: Arc<dyn Blob>,
120        shard_metrics: Arc<ShardMetrics>,
121        version: Version,
122        batch: HollowBatch<T>,
123    ) -> Self {
124        Self {
125            batch_delete_enabled,
126            metrics,
127            shard_metrics,
128            version,
129            batch,
130            blob,
131            _phantom: PhantomData,
132        }
133    }
134
135    /// The `shard_id` of this [Batch].
136    pub fn shard_id(&self) -> ShardId {
137        self.shard_metrics.shard_id
138    }
139
140    /// The `upper` of this [Batch].
141    pub fn upper(&self) -> &Antichain<T> {
142        self.batch.desc.upper()
143    }
144
145    /// The `lower` of this [Batch].
146    pub fn lower(&self) -> &Antichain<T> {
147        self.batch.desc.lower()
148    }
149
150    /// Marks the blobs that this batch handle points to as consumed, likely
151    /// because they were appended to a shard.
152    ///
153    /// Consumers of a blob need to make this explicit, so that we can log
154    /// warnings in case a batch is not used.
155    pub(crate) fn mark_consumed(&mut self) {
156        self.batch.parts.clear();
157    }
158
159    /// Deletes the blobs that make up this batch from the given blob store and
160    /// marks them as deleted.
161    #[instrument(level = "debug", fields(shard = %self.shard_id()))]
162    pub async fn delete(mut self) {
163        if !self.batch_delete_enabled {
164            self.mark_consumed();
165            return;
166        }
167        let mut deletes = PartDeletes::default();
168        for part in self.batch.parts.drain(..) {
169            deletes.add(&part);
170        }
171        let () = deletes
172            .delete(
173                &*self.blob,
174                self.shard_id(),
175                usize::MAX,
176                &*self.metrics,
177                &*self.metrics.retries.external.batch_delete,
178            )
179            .await;
180    }
181
182    /// Returns the schema IDs of the parts in this batch.
183    pub fn schemas(&self) -> impl Iterator<Item = SchemaId> + '_ {
184        self.batch.parts.iter().flat_map(|b| b.schema_id())
185    }
186
187    /// Turns this [`Batch`] into a `HollowBatch`.
188    ///
189    /// **NOTE**: If this batch is not eventually appended to a shard or
190    /// dropped, the data that it represents will have leaked.
191    pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
192        let ret = self.batch.clone();
193        self.mark_consumed();
194        ret
195    }
196
197    /// Turns this [`Batch`] into a [`ProtoBatch`], which can be used to
198    /// transfer this batch across process boundaries, for example when
199    /// exchanging data between timely workers.
200    ///
201    /// **NOTE**: If this batch is not eventually appended to a shard or
202    /// dropped, the data that it represents will have leaked. The caller is
203    /// responsible for turning this back into a [`Batch`] using
204    /// [`WriteHandle::batch_from_transmittable_batch`](crate::write::WriteHandle::batch_from_transmittable_batch).
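    ///
    /// A minimal sketch of the round trip; the `batch` and `write` handles are
    /// illustrative and must refer to the same shard:
    ///
    /// ```ignore
    /// // On the sending worker: turn the batch into a proto and ship it.
    /// let proto = batch.into_transmittable_batch();
    /// // On the receiving worker: rehydrate it against a write handle before
    /// // appending it to the shard.
    /// let batch = write.batch_from_transmittable_batch(proto);
    /// ```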
205    pub fn into_transmittable_batch(mut self) -> ProtoBatch {
206        let ret = ProtoBatch {
207            shard_id: self.shard_metrics.shard_id.into_proto(),
208            version: self.version.to_string(),
209            batch: Some(self.batch.into_proto()),
210        };
211        self.mark_consumed();
212        ret
213    }
214
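    /// Rewrites any inline parts of this batch out to blob storage, so that the
    /// batch ends up referencing only non-inline (hollow) parts.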
215    pub(crate) async fn flush_to_blob(
216        &mut self,
217        cfg: &BatchBuilderConfig,
218        batch_metrics: &BatchWriteMetrics,
219        isolated_runtime: &Arc<IsolatedRuntime>,
220        write_schemas: &Schemas<K, V>,
221    ) {
222        // It's necessary for correctness to keep the parts in the same order.
223        // We could introduce concurrency here with FuturesOrdered, but it would
224        // be pretty unexpected to have inline writes in more than one part, so
225        // don't bother.
226        let mut parts = Vec::new();
227        for (run_meta, run_parts) in self.batch.runs() {
228            for part in run_parts {
229                let (updates, ts_rewrite, schema_id) = match part {
230                    RunPart::Single(BatchPart::Inline {
231                        updates,
232                        ts_rewrite,
233                        schema_id,
234                        deprecated_schema_id: _,
235                    }) => (updates, ts_rewrite, schema_id),
236                    other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
237                        parts.push(other.clone());
238                        continue;
239                    }
240                };
241                let updates = updates
242                    .decode::<T>(&self.metrics.columnar)
243                    .expect("valid inline part");
244                let diffs_sum =
245                    diffs_sum::<D>(updates.updates.diffs()).expect("inline parts are not empty");
246                let mut write_schemas = write_schemas.clone();
247                write_schemas.id = *schema_id;
248
249                let write_span =
250                    debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id)
251                        .or_current();
252                let handle = mz_ore::task::spawn(
253                    || "batch::flush_to_blob",
254                    BatchParts::write_hollow_part(
255                        cfg.clone(),
256                        Arc::clone(&self.blob),
257                        Arc::clone(&self.metrics),
258                        Arc::clone(&self.shard_metrics),
259                        batch_metrics.clone(),
260                        Arc::clone(isolated_runtime),
261                        updates,
262                        run_meta.order.unwrap_or(RunOrder::Unordered),
263                        ts_rewrite.clone(),
264                        D::encode(&diffs_sum),
265                        write_schemas,
266                    )
267                    .instrument(write_span),
268                );
269                let part = handle.await.expect("part write task failed");
270                parts.push(RunPart::Single(part));
271            }
272        }
273        self.batch.parts = parts;
274    }
275}
276
277impl<K, V, T, D> Batch<K, V, T, D>
278where
279    K: Debug + Codec,
280    V: Debug + Codec,
281    T: Timestamp + Lattice + Codec64 + TotalOrder,
282    D: Semigroup + Codec64,
283{
284    /// Efficiently rewrites the timestamps in this not-yet-committed batch.
285    ///
286    /// This [Batch] represents potentially large amounts of data, which may
287    /// have partly or entirely been spilled to s3. This call bulk edits the
288    /// timestamps of all data in this batch in a metadata-only operation (i.e.
289    /// without network calls).
290    ///
291    /// Specifically, every timestamp in the batch is logically advanced_by the
292    /// provided `frontier`.
293    ///
294    /// This method may be called multiple times, with later calls overriding
295    /// previous ones, but the rewrite frontier may not regress across calls.
296    ///
297    /// When this batch was created, it was given an `upper`, which bounds the
298    /// staged data it represents. To allow rewrite past this original `upper`,
299    /// this call accepts a new `upper` which replaces the previous one. Like
300    /// the rewrite frontier, the upper may not regress across calls.
301    ///
302    /// Multiple batches with various rewrite frontiers may be used in a single
303    /// [crate::write::WriteHandle::compare_and_append_batch] call. This is an
304    /// expected usage.
305    ///
306    /// This feature requires that the timestamp impls `TotalOrder`. This is
307    /// because we need to be able to verify that the contained data, after the
308    /// rewrite forward operation, still respects the new upper. It turns out
309    /// that, given the metadata persist currently collects during batch
310    /// collection, this is possible for totally ordered times, but it's known
311    /// to be _not possible_ for partially ordered times. It is believed that we
312    /// could fix this by collecting different metadata in batch creation (e.g.
313    /// the join of, or an antichain of, the original contained timestamps), but
314    /// the experience of database-issues#7825 has shaken our confidence in our own
315    /// ability to reason about partially ordered times, and anyway all the initial
316    /// uses have totally ordered times.
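    ///
    /// A sketch of the expected call pattern, using an illustrative `u64`
    /// timestamp:
    ///
    /// ```ignore
    /// // The batch was staged with upper [5], but the append will now happen
    /// // at 10: advance all contained updates to 10 and raise the upper to 11.
    /// batch.rewrite_ts(&Antichain::from_elem(10u64), Antichain::from_elem(11))?;
    /// ```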
317    pub fn rewrite_ts(
318        &mut self,
319        frontier: &Antichain<T>,
320        new_upper: Antichain<T>,
321    ) -> Result<(), InvalidUsage<T>> {
322        self.batch
323            .rewrite_ts(frontier, new_upper)
324            .map_err(InvalidUsage::InvalidRewrite)
325    }
326}
327
328/// Indicates what work was done in a call to [BatchBuilder::add]
329#[derive(Debug)]
330pub enum Added {
331    /// A record was inserted into a pending batch part
332    Record,
333    /// A record was inserted into a pending batch part
334    /// and the part was sent to blob storage
335    RecordAndParts,
336}
337
338/// A snapshot of dynamic configs to make it easier to reason about an individual
339/// run of BatchBuilder.
340#[derive(Debug, Clone)]
341pub struct BatchBuilderConfig {
342    writer_key: WriterKey,
343    pub(crate) blob_target_size: usize,
344    pub(crate) batch_delete_enabled: bool,
345    pub(crate) batch_builder_max_outstanding_parts: usize,
346    pub(crate) inline_writes_single_max_bytes: usize,
347    pub(crate) stats_collection_enabled: bool,
348    pub(crate) stats_budget: usize,
349    pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
350    pub(crate) encoding_config: EncodingConfig,
351    pub(crate) preferred_order: RunOrder,
352    pub(crate) structured_key_lower_len: usize,
353    pub(crate) run_length_limit: usize,
354    /// The number of runs to cap the built batch at, or None if we should
355    /// continue to generate one run per part for unordered batches.
356    /// See the config definition for details.
357    pub(crate) max_runs: Option<usize>,
358}
359
360// TODO: Remove this once we're comfortable that there aren't any bugs.
361pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
362    "persist_batch_delete_enabled",
363    true,
364    "Whether to actually delete blobs when batch delete is called (Materialize).",
365);
366
367pub(crate) const ENCODING_ENABLE_DICTIONARY: Config<bool> = Config::new(
368    "persist_encoding_enable_dictionary",
369    false,
370    "A feature flag to enable dictionary encoding for Parquet data (Materialize).",
371);
372
373pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new(
374    "persist_encoding_compression_format",
375    "none",
376    "A feature flag to enable compression of Parquet data (Materialize).",
377);
378
379pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config<usize> = Config::new(
380    "persist_batch_structured_key_lower_len",
381    256,
382    "The maximum size in proto bytes of any structured key-lower metadata to preserve. \
383    (If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
384);
385
386pub(crate) const MAX_RUN_LEN: Config<usize> = Config::new(
387    "persist_batch_max_run_len",
388    usize::MAX,
389    "The maximum length a run can have before it will be spilled as a hollow run \
390    into the blob store.",
391);
392
393pub(crate) const MAX_RUNS: Config<usize> = Config::new(
394    "persist_batch_max_runs",
395    1,
396    "The maximum number of runs a batch builder should generate for user batches. \
397    (Compaction outputs always generate a single run.) \
398    Values below 2 disable this in-builder compaction of runs.",
399);
400
401/// A target maximum size of blob payloads in bytes. If a logical "batch" is
402/// bigger than this, it will be broken up into smaller, independent pieces.
403/// This is best-effort, not a guarantee (though as of 2022-06-09, we happen to
404/// always respect it). This target size doesn't apply for an individual update
405/// that exceeds it in size, but that scenario is almost certainly a misuse of
406/// the system.
407pub(crate) const BLOB_TARGET_SIZE: Config<usize> = Config::new(
408    "persist_blob_target_size",
409    128 * MiB,
410    "A target maximum size of persist blob payloads in bytes (Materialize).",
411);
412
413pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config<usize> = Config::new(
414    "persist_inline_writes_single_max_bytes",
415    4096,
416    "The (exclusive) maximum size of a write that persist will inline in metadata.",
417);
418
419pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config<usize> = Config::new(
420    "persist_inline_writes_total_max_bytes",
421    1 * MiB,
422    "\
423    The (exclusive) maximum total size of inline writes in metadata before \
424    persist will backpressure them by flushing out to s3.",
425);
426
427impl BatchBuilderConfig {
428    /// Initialize a batch builder config based on a snapshot of the Persist config.
429    pub fn new(value: &PersistConfig, _shard_id: ShardId) -> Self {
430        let writer_key = WriterKey::for_version(&value.build_version);
431
432        let preferred_order = RunOrder::Structured;
433
434        BatchBuilderConfig {
435            writer_key,
436            blob_target_size: BLOB_TARGET_SIZE.get(value).clamp(1, usize::MAX),
437            batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
438            batch_builder_max_outstanding_parts: BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(value),
439            inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
440            stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
441            stats_budget: STATS_BUDGET_BYTES.get(value),
442            stats_untrimmable_columns: Arc::new(untrimmable_columns(value)),
443            encoding_config: EncodingConfig {
444                use_dictionary: ENCODING_ENABLE_DICTIONARY.get(value),
445                compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
446            },
447            preferred_order,
448            structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
449            run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
450            max_runs: match MAX_RUNS.get(value) {
451                limit @ 2.. => Some(limit),
452                _ => None,
453            },
454        }
455    }
456}
457
458/// A list of (lowercase) column names that persist will always retain
459/// stats for, even if it means going over the stats budget.
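///
/// For example (illustrative values): with `equals = ["err"]`, `prefixes = ["mz_"]`,
/// and `suffixes = ["_key"]`, the columns `err`, `MZ_internal`, and `user_key` are
/// all retained, since matching happens against the lowercased column name.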
460#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
461pub(crate) struct UntrimmableColumns {
462    /// Always retain columns whose lowercased names exactly equal any of these strings.
463    pub equals: Vec<Cow<'static, str>>,
464    /// Always retain columns whose lowercased names start with any of these strings.
465    pub prefixes: Vec<Cow<'static, str>>,
466    /// Always retain columns whose lowercased names end with any of these strings.
467    pub suffixes: Vec<Cow<'static, str>>,
468}
469
470impl UntrimmableColumns {
471    pub(crate) fn should_retain(&self, name: &str) -> bool {
472        // TODO: see if there's a better way to match different formats than lowercasing
473        // https://github.com/MaterializeInc/database-issues/issues/6421#issue-1863623805
474        let name_lower = name.to_lowercase();
475        for s in &self.equals {
476            if *s == name_lower {
477                return true;
478            }
479        }
480        for s in &self.prefixes {
481            if name_lower.starts_with(s.as_ref()) {
482                return true;
483            }
484        }
485        for s in &self.suffixes {
486            if name_lower.ends_with(s.as_ref()) {
487                return true;
488            }
489        }
490        false
491    }
492}
493
494/// A builder for [Batches](Batch) that allows adding updates piece by piece and
495/// then finishing it.
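///
/// A minimal sketch of the intended usage, assuming a `write` handle for a shard
/// with `String` keys and values, `u64` times, and `i64` diffs (all names here
/// are illustrative):
///
/// ```ignore
/// let mut builder = write.builder(Antichain::from_elem(0u64));
/// builder.add(&"k".to_owned(), &"v".to_owned(), &1, &1).await?;
/// let batch = builder.finish(Antichain::from_elem(2)).await?;
/// ```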
496#[derive(Debug)]
497pub struct BatchBuilder<K, V, T, D>
498where
499    K: Codec,
500    V: Codec,
501    T: Timestamp + Lattice + Codec64,
502{
503    inline_desc: Description<T>,
504    inclusive_upper: Antichain<Reverse<T>>,
505
506    // Reusable buffers for encoding data. Should be cleared after use!
507    pub(crate) key_buf: Vec<u8>,
508    pub(crate) val_buf: Vec<u8>,
509
510    records_builder: PartBuilder<K, K::Schema, V, V::Schema>,
511    pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
512}
513
514impl<K, V, T, D> BatchBuilder<K, V, T, D>
515where
516    K: Debug + Codec,
517    V: Debug + Codec,
518    T: Timestamp + Lattice + Codec64,
519    D: Semigroup + Codec64,
520{
521    pub(crate) fn new(
522        builder: BatchBuilderInternal<K, V, T, D>,
523        inline_desc: Description<T>,
524    ) -> Self {
525        let records_builder = PartBuilder::new(
526            builder.write_schemas.key.as_ref(),
527            builder.write_schemas.val.as_ref(),
528        );
529        Self {
530            inline_desc,
531            inclusive_upper: Antichain::new(),
532            key_buf: vec![],
533            val_buf: vec![],
534            records_builder,
535            builder,
536        }
537    }
538
539    /// Finish writing this batch and return a handle to the written batch.
540    ///
541    /// This fails if any of the updates in this batch are beyond the given
542    /// `upper`.
543    pub async fn finish(
544        mut self,
545        registered_upper: Antichain<T>,
546    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
547        if PartialOrder::less_than(&registered_upper, self.inline_desc.lower()) {
548            return Err(InvalidUsage::InvalidBounds {
549                lower: self.inline_desc.lower().clone(),
550                upper: registered_upper,
551            });
552        }
553
554        // When since is less than or equal to lower, the upper is a strict bound on the
555        // updates' timestamps because no advancement has been performed. Because user
556        // batches are always unadvanced, this ensures that new updates are recorded
557        // with valid timestamps. Otherwise, we can make no assumptions about the timestamps.
558        if PartialOrder::less_equal(self.inline_desc.since(), self.inline_desc.lower()) {
559            for ts in self.inclusive_upper.iter() {
560                if registered_upper.less_equal(&ts.0) {
561                    return Err(InvalidUsage::UpdateBeyondUpper {
562                        ts: ts.0.clone(),
563                        expected_upper: registered_upper.clone(),
564                    });
565                }
566            }
567        }
568
569        let updates = self.records_builder.finish();
570        self.builder
571            .flush_part(self.inline_desc.clone(), updates)
572            .await;
573
574        self.builder
575            .finish(Description::new(
576                self.inline_desc.lower().clone(),
577                registered_upper,
578                self.inline_desc.since().clone(),
579            ))
580            .await
581    }
582
583    /// Adds the given update to the batch.
584    ///
585    /// The update timestamp must be greater than or equal to the `lower` that was
586    /// given when creating this [BatchBuilder].
587    pub async fn add(
588        &mut self,
589        key: &K,
590        val: &V,
591        ts: &T,
592        diff: &D,
593    ) -> Result<Added, InvalidUsage<T>> {
594        if !self.inline_desc.lower().less_equal(ts) {
595            return Err(InvalidUsage::UpdateNotBeyondLower {
596                ts: ts.clone(),
597                lower: self.inline_desc.lower().clone(),
598            });
599        }
600        self.inclusive_upper.insert(Reverse(ts.clone()));
601
602        let added = {
603            self.records_builder
604                .push(key, val, ts.clone(), diff.clone());
605            if self.records_builder.goodbytes() >= self.builder.parts.cfg.blob_target_size {
606                let part = self.records_builder.finish_and_replace(
607                    self.builder.write_schemas.key.as_ref(),
608                    self.builder.write_schemas.val.as_ref(),
609                );
610                Some(part)
611            } else {
612                None
613            }
614        };
615
616        let added = if let Some(full_batch) = added {
617            self.builder
618                .flush_part(self.inline_desc.clone(), full_batch)
619                .await;
620            Added::RecordAndParts
621        } else {
622            Added::Record
623        };
624        self.key_buf.clear();
625        self.val_buf.clear();
626        Ok(added)
627    }
628}
629
630#[derive(Debug)]
631pub(crate) struct BatchBuilderInternal<K, V, T, D>
632where
633    K: Codec,
634    V: Codec,
635    T: Timestamp + Lattice + Codec64,
636{
637    shard_id: ShardId,
638    version: Version,
639    blob: Arc<dyn Blob>,
640    metrics: Arc<Metrics>,
641
642    write_schemas: Schemas<K, V>,
643    num_updates: usize,
644    parts: BatchParts<T>,
645
646    // These provide a bit more safety against appending a batch with the wrong
647    // type to a shard.
648    _phantom: PhantomData<fn(K, V, T, D)>,
649}
650
651impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
652where
653    K: Debug + Codec,
654    V: Debug + Codec,
655    T: Timestamp + Lattice + Codec64,
656    D: Semigroup + Codec64,
657{
658    pub(crate) fn new(
659        _cfg: BatchBuilderConfig,
660        parts: BatchParts<T>,
661        metrics: Arc<Metrics>,
662        write_schemas: Schemas<K, V>,
663        blob: Arc<dyn Blob>,
664        shard_id: ShardId,
665        version: Version,
666    ) -> Self {
667        Self {
668            blob,
669            metrics,
670            write_schemas,
671            num_updates: 0,
672            parts,
673            shard_id,
674            version,
675            _phantom: PhantomData,
676        }
677    }
678
679    /// Finish writing this batch and return a handle to the written batch.
680    ///
681    /// This fails if any of the updates in this batch are beyond the given
682    /// `upper`.
683    #[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
684    pub async fn finish(
685        self,
686        registered_desc: Description<T>,
687    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
688        let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
689        let shard_metrics = Arc::clone(&self.parts.shard_metrics);
690        let runs = self.parts.finish().await;
691
692        let mut run_parts = vec![];
693        let mut run_splits = vec![];
694        let mut run_meta = vec![];
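        // `run_splits` records the indexes at which `run_parts` is split into
        // separate runs: e.g. two runs of two parts each are stored as four parts
        // with run_splits = [2].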
695        for (order, parts) in runs {
696            if parts.is_empty() {
697                continue;
698            }
699            if !run_parts.is_empty() {
700                run_splits.push(run_parts.len());
701            }
702            run_meta.push(RunMeta {
703                order: Some(order),
704                schema: self.write_schemas.id,
705                // Field has been deprecated but kept around to roundtrip state.
706                deprecated_schema: None,
707            });
708            run_parts.extend(parts);
709        }
710        let desc = registered_desc;
711
712        let batch = Batch::new(
713            batch_delete_enabled,
714            Arc::clone(&self.metrics),
715            self.blob,
716            shard_metrics,
717            self.version,
718            HollowBatch::new(desc, run_parts, self.num_updates, run_meta, run_splits),
719        );
720
721        Ok(batch)
722    }
723
724    /// Flushes the current part to Blob storage, first consolidating and then
725    /// columnar encoding the updates. It is the caller's responsibility to
726    /// chunk `current_part` to be no greater than
727    /// [BatchBuilderConfig::blob_target_size], and it must absolutely be less than
728    /// [mz_persist::indexed::columnar::KEY_VAL_DATA_MAX_LEN].
729    pub async fn flush_part(&mut self, part_desc: Description<T>, columnar: Part) {
730        let num_updates = columnar.len();
731        if num_updates == 0 {
732            return;
733        }
734        let diffs_sum = diffs_sum::<D>(&columnar.diff).expect("part is non empty");
735
736        let start = Instant::now();
737        self.parts
738            .write(&self.write_schemas, part_desc, columnar, diffs_sum)
739            .await;
740        self.metrics
741            .compaction
742            .batch
743            .step_part_writing
744            .inc_by(start.elapsed().as_secs_f64());
745
746        self.num_updates += num_updates;
747    }
748}
749
750#[derive(Debug)]
751enum WritingRuns<T> {
752    /// Building a single run with the specified ordering. Parts are expected to be internally
753    /// sorted and added in order. Merging a vec of parts will shift them out to a hollow run
754    /// in blob, bounding the total length of a run in memory.
755    Ordered(RunOrder, MergeTree<Pending<RunPart<T>>>),
756    /// Building multiple runs which may have different orders. Merging a vec of runs will cause
757    /// them to be compacted together, bounding the total number of runs we generate.
758    Compacting(MergeTree<(RunOrder, Pending<Vec<RunPart<T>>>)>),
759}
760
761// TODO: If this is dropped, cancel (and delete?) any writing parts and delete
762// any finished ones.
763#[derive(Debug)]
764pub(crate) struct BatchParts<T> {
765    cfg: BatchBuilderConfig,
766    metrics: Arc<Metrics>,
767    shard_metrics: Arc<ShardMetrics>,
768    shard_id: ShardId,
769    blob: Arc<dyn Blob>,
770    isolated_runtime: Arc<IsolatedRuntime>,
771    next_index: u64,
772    writing_runs: WritingRuns<T>,
773    batch_metrics: BatchWriteMetrics,
774}
775
776impl<T: Timestamp + Codec64> BatchParts<T> {
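    /// Create a [BatchParts] that merges runs together (by compacting them) once
    /// more than `runs_per_compaction` of them accumulate, bounding the number of
    /// runs in the finished batch.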
777    pub(crate) fn new_compacting<K, V, D>(
778        cfg: CompactConfig,
779        desc: Description<T>,
780        runs_per_compaction: usize,
781        metrics: Arc<Metrics>,
782        shard_metrics: Arc<ShardMetrics>,
783        shard_id: ShardId,
784        blob: Arc<dyn Blob>,
785        isolated_runtime: Arc<IsolatedRuntime>,
786        batch_metrics: &BatchWriteMetrics,
787        schemas: Schemas<K, V>,
788    ) -> Self
789    where
790        K: Codec + Debug,
791        V: Codec + Debug,
792        T: Lattice + Send + Sync,
793        D: Semigroup + Ord + Codec64 + Send + Sync,
794    {
795        let writing_runs = {
796            let cfg = cfg.clone();
797            let blob = Arc::clone(&blob);
798            let metrics = Arc::clone(&metrics);
799            let shard_metrics = Arc::clone(&shard_metrics);
800            let isolated_runtime = Arc::clone(&isolated_runtime);
801            // Clamping to prevent extreme values given weird configs.
802            let runs_per_compaction = runs_per_compaction.clamp(2, 1024);
803
804            let merge_fn = move |parts: Vec<(RunOrder, Pending<Vec<RunPart<T>>>)>| {
805                let blob = Arc::clone(&blob);
806                let metrics = Arc::clone(&metrics);
807                let shard_metrics = Arc::clone(&shard_metrics);
808                let cfg = cfg.clone();
809                let isolated_runtime = Arc::clone(&isolated_runtime);
810                let write_schemas = schemas.clone();
811                let compact_desc = desc.clone();
812                let handle = mz_ore::task::spawn(
813                    || "batch::compact_runs",
814                    async move {
815                        let runs: Vec<_> = stream::iter(parts)
816                            .then(|(order, parts)| async move {
817                                (
818                                    RunMeta {
819                                        order: Some(order),
820                                        schema: schemas.id,
821                                        // Field has been deprecated but kept around to
822                                        // roundtrip state.
823                                        deprecated_schema: None,
824                                    },
825                                    parts.into_result().await,
826                                )
827                            })
828                            .collect()
829                            .await;
830
831                        let run_refs: Vec<_> = runs
832                            .iter()
833                            .map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
834                            .collect();
835
836                        let output_batch = Compactor::<K, V, T, D>::compact_runs(
837                            &cfg,
838                            &shard_id,
839                            &compact_desc,
840                            run_refs,
841                            blob,
842                            metrics,
843                            shard_metrics,
844                            isolated_runtime,
845                            write_schemas,
846                        )
847                        .await
848                        .expect("successful compaction");
849
850                        assert_eq!(
851                            output_batch.run_meta.len(),
852                            1,
853                            "compaction is guaranteed to emit a single run"
854                        );
855                        output_batch.parts
856                    }
857                    .instrument(debug_span!("batch::compact_runs")),
858                );
859                (RunOrder::Structured, Pending::new(handle))
860            };
861            WritingRuns::Compacting(MergeTree::new(runs_per_compaction, merge_fn))
862        };
863        BatchParts {
864            cfg: cfg.batch,
865            metrics,
866            shard_metrics,
867            shard_id,
868            blob,
869            isolated_runtime,
870            next_index: 0,
871            writing_runs,
872            batch_metrics: batch_metrics.clone(),
873        }
874    }
875
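    /// Create a [BatchParts] that writes a single run in the given order,
    /// spilling overly long runs out to blob storage as [HollowRun]s.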
876    pub(crate) fn new_ordered(
877        cfg: BatchBuilderConfig,
878        order: RunOrder,
879        metrics: Arc<Metrics>,
880        shard_metrics: Arc<ShardMetrics>,
881        shard_id: ShardId,
882        blob: Arc<dyn Blob>,
883        isolated_runtime: Arc<IsolatedRuntime>,
884        batch_metrics: &BatchWriteMetrics,
885    ) -> Self {
886        let writing_runs = {
887            let cfg = cfg.clone();
888            let blob = Arc::clone(&blob);
889            let metrics = Arc::clone(&metrics);
890            let writer_key = cfg.writer_key.clone();
891            // Don't spill "unordered" runs to S3, since we'll split them up into many single-element
892            // runs below.
893            let run_length_limit = (order == RunOrder::Unordered)
894                .then_some(usize::MAX)
895                .unwrap_or(cfg.run_length_limit);
896            let merge_fn = move |parts| {
897                let blob = Arc::clone(&blob);
898                let writer_key = writer_key.clone();
899                let metrics = Arc::clone(&metrics);
900                let handle = mz_ore::task::spawn(
901                    || "batch::spill_run",
902                    async move {
903                        let parts = stream::iter(parts)
904                            .then(|p: Pending<RunPart<T>>| p.into_result())
905                            .collect()
906                            .await;
907                        let run_ref = HollowRunRef::set(
908                            shard_id,
909                            blob.as_ref(),
910                            &writer_key,
911                            HollowRun { parts },
912                            &*metrics,
913                        )
914                        .await;
915
916                        RunPart::Many(run_ref)
917                    }
918                    .instrument(debug_span!("batch::spill_run")),
919                );
920                Pending::new(handle)
921            };
922            WritingRuns::Ordered(order, MergeTree::new(run_length_limit, merge_fn))
923        };
924        BatchParts {
925            cfg,
926            metrics,
927            shard_metrics,
928            shard_id,
929            blob,
930            isolated_runtime,
931            next_index: 0,
932            writing_runs,
933            batch_metrics: batch_metrics.clone(),
934        }
935    }
936
937    pub(crate) fn expected_order(&self) -> RunOrder {
938        match self.writing_runs {
939            WritingRuns::Ordered(order, _) => order,
940            WritingRuns::Compacting(_) => RunOrder::Unordered,
941        }
942    }
943
944    pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
945        &mut self,
946        write_schemas: &Schemas<K, V>,
947        desc: Description<T>,
948        updates: Part,
949        diffs_sum: D,
950    ) {
951        let batch_metrics = self.batch_metrics.clone();
952        let index = self.next_index;
953        self.next_index += 1;
954        let ts_rewrite = None;
955        let schema_id = write_schemas.id;
956
957        // Parts whose data is smaller than this threshold are stored inline in
958        // metadata rather than being written out to blob storage.
959        let inline_threshold = self.cfg.inline_writes_single_max_bytes;
960
961        let updates = BlobTraceUpdates::from_part(updates);
962        let (name, write_future) = if updates.goodbytes() < inline_threshold {
963            let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
964            (
965                "batch::inline_part",
966                async move {
967                    let start = Instant::now();
968                    let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
969                        desc: Some(desc.into_proto()),
970                        index: index.into_proto(),
971                        updates: Some(updates.into_proto()),
972                    });
973                    batch_metrics
974                        .step_inline
975                        .inc_by(start.elapsed().as_secs_f64());
976
977                    RunPart::Single(BatchPart::Inline {
978                        updates,
979                        ts_rewrite,
980                        schema_id,
981                        // Field has been deprecated but kept around to roundtrip state.
982                        deprecated_schema_id: None,
983                    })
984                }
985                .instrument(span)
986                .boxed(),
987            )
988        } else {
989            let part = BlobTraceBatchPart {
990                desc,
991                updates,
992                index,
993            };
994            let write_span =
995                debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current();
996            (
997                "batch::write_part",
998                BatchParts::write_hollow_part(
999                    self.cfg.clone(),
1000                    Arc::clone(&self.blob),
1001                    Arc::clone(&self.metrics),
1002                    Arc::clone(&self.shard_metrics),
1003                    batch_metrics.clone(),
1004                    Arc::clone(&self.isolated_runtime),
1005                    part,
1006                    self.expected_order(),
1007                    ts_rewrite,
1008                    D::encode(&diffs_sum),
1009                    write_schemas.clone(),
1010                )
1011                .map(RunPart::Single)
1012                .instrument(write_span)
1013                .boxed(),
1014            )
1015        };
1016
1017        match &mut self.writing_runs {
1018            WritingRuns::Ordered(_order, run) => {
1019                let part = Pending::new(mz_ore::task::spawn(|| name, write_future));
1020                run.push(part);
1021
1022                // If there are more than the max outstanding parts, block on all but
1023                // the most recent ones.
1024                for part in run
1025                    .iter_mut()
1026                    .rev()
1027                    .skip(self.cfg.batch_builder_max_outstanding_parts)
1028                    .take_while(|p| !p.is_finished())
1029                {
1030                    self.batch_metrics.write_stalls.inc();
1031                    part.block_until_ready().await;
1032                }
1033            }
1034            WritingRuns::Compacting(batches) => {
1035                let run = Pending::Writing(mz_ore::task::spawn(|| name, async move {
1036                    vec![write_future.await]
1037                }));
1038                batches.push((RunOrder::Unordered, run));
1039
1040                // Allow up to `max_outstanding_parts` (or one compaction) to be pending, and block
1041                // on the rest.
1042                let mut part_budget = self.cfg.batch_builder_max_outstanding_parts;
1043                let mut compaction_budget = 1;
1044                for (_, part) in batches
1045                    .iter_mut()
1046                    .rev()
1047                    .skip_while(|(order, _)| match order {
1048                        RunOrder::Unordered if part_budget > 0 => {
1049                            part_budget -= 1;
1050                            true
1051                        }
1052                        RunOrder::Structured | RunOrder::Codec if compaction_budget > 0 => {
1053                            compaction_budget -= 1;
1054                            true
1055                        }
1056                        _ => false,
1057                    })
1058                    .take_while(|(_, p)| !p.is_finished())
1059                {
1060                    self.batch_metrics.write_stalls.inc();
1061                    part.block_until_ready().await;
1062                }
1063            }
1064        }
1065    }
1066
1067    async fn write_hollow_part<K: Codec, V: Codec>(
1068        cfg: BatchBuilderConfig,
1069        blob: Arc<dyn Blob>,
1070        metrics: Arc<Metrics>,
1071        shard_metrics: Arc<ShardMetrics>,
1072        batch_metrics: BatchWriteMetrics,
1073        isolated_runtime: Arc<IsolatedRuntime>,
1074        mut updates: BlobTraceBatchPart<T>,
1075        run_order: RunOrder,
1076        ts_rewrite: Option<Antichain<T>>,
1077        diffs_sum: [u8; 8],
1078        write_schemas: Schemas<K, V>,
1079    ) -> BatchPart<T> {
1080        let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new());
1081        let key = partial_key.complete(&shard_metrics.shard_id);
1082        let goodbytes = updates.updates.goodbytes();
1083        let metrics_ = Arc::clone(&metrics);
1084        let schema_id = write_schemas.id;
1085
1086        let (stats, key_lower, structured_key_lower, (buf, encode_time)) = isolated_runtime
1087            .spawn_named(|| "batch::encode_part", async move {
1088                // Measure the expensive steps of the part build - re-encoding and stats collection.
1089                let stats = metrics_.columnar.arrow().measure_part_build(|| {
1090                    let stats = if cfg.stats_collection_enabled {
1091                        let ext = updates.updates.get_or_make_structured::<K, V>(
1092                            write_schemas.key.as_ref(),
1093                            write_schemas.val.as_ref(),
1094                        );
1095
1096                        let key_stats = write_schemas
1097                            .key
1098                            .decoder_any(ext.key.as_ref())
1099                            .expect("decoding just-encoded data")
1100                            .stats();
1101
1102                        let part_stats = PartStats { key: key_stats };
1103
1104                        // Collect stats about the updates, if stats collection is enabled.
1105                        let trimmed_start = Instant::now();
1106                        let mut trimmed_bytes = 0;
1107                        let trimmed_stats = LazyPartStats::encode(&part_stats, |s| {
1108                            trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| {
1109                                cfg.stats_untrimmable_columns.should_retain(s)
1110                            })
1111                        });
1112                        let trimmed_duration = trimmed_start.elapsed();
1113                        Some((trimmed_stats, trimmed_duration, trimmed_bytes))
1114                    } else {
1115                        None
1116                    };
1117
1118                    // Ensure the updates are in the specified columnar format before encoding.
1119                    updates.updates = updates.updates.as_structured::<K, V>(
1120                        write_schemas.key.as_ref(),
1121                        write_schemas.val.as_ref(),
1122                    );
1123
1124                    stats
1125                });
1126
1127                let key_lower = if let Some(records) = updates.updates.records() {
1128                    let key_bytes = records.keys();
1129                    if key_bytes.is_empty() {
1130                        &[]
1131                    } else if run_order == RunOrder::Codec {
1132                        key_bytes.value(0)
1133                    } else {
1134                        ::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
1135                    }
1136                } else {
1137                    &[]
1138                };
1139                let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
1140                    .expect("lower bound always exists");
1141
1142                let structured_key_lower = if cfg.structured_key_lower_len > 0 {
1143                    updates.updates.structured().and_then(|ext| {
1144                        let min_key = if run_order == RunOrder::Structured {
1145                            0
1146                        } else {
1147                            let ord = ArrayOrd::new(ext.key.as_ref());
1148                            (0..ext.key.len())
1149                                .min_by_key(|i| ord.at(*i))
1150                                .expect("non-empty batch")
1151                        };
1152                        let lower = ArrayBound::new(Arc::clone(&ext.key), min_key)
1153                            .to_proto_lower(cfg.structured_key_lower_len);
1154                        if lower.is_none() {
1155                            batch_metrics.key_lower_too_big.inc()
1156                        }
1157                        lower.map(|proto| LazyProto::from(&proto))
1158                    })
1159                } else {
1160                    None
1161                };
1162
1163                let encode_start = Instant::now();
1164                let mut buf = Vec::new();
1165                updates.encode(&mut buf, &metrics_.columnar, &cfg.encoding_config);
1166
1167                // Drop batch as soon as we can to reclaim its memory.
1168                drop(updates);
1169                (
1170                    stats,
1171                    key_lower,
1172                    structured_key_lower,
1173                    (Bytes::from(buf), encode_start.elapsed()),
1174                )
1175            })
1176            .instrument(debug_span!("batch::encode_part"))
1177            .await
1178            .expect("part encode task failed");
1179        // Can't use the `CodecMetrics::encode` helper because of async.
1180        metrics.codecs.batch.encode_count.inc();
1181        metrics
1182            .codecs
1183            .batch
1184            .encode_seconds
1185            .inc_by(encode_time.as_secs_f64());
1186
1187        let start = Instant::now();
1188        let payload_len = buf.len();
1189        let () = retry_external(&metrics.retries.external.batch_set, || async {
1190            shard_metrics.blob_sets.inc();
1191            blob.set(&key, Bytes::clone(&buf)).await
1192        })
1193        .instrument(trace_span!("batch::set", payload_len))
1194        .await;
1195        batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
1196        batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
1197        batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
1198        match run_order {
1199            RunOrder::Unordered => batch_metrics.unordered.inc(),
1200            RunOrder::Codec => batch_metrics.codec_order.inc(),
1201            RunOrder::Structured => batch_metrics.structured_order.inc(),
1202        }
1203        let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
1204            batch_metrics
1205                .step_stats
1206                .inc_by(stats_step_timing.as_secs_f64());
1207            if trimmed_bytes > 0 {
1208                metrics.pushdown.parts_stats_trimmed_count.inc();
1209                metrics
1210                    .pushdown
1211                    .parts_stats_trimmed_bytes
1212                    .inc_by(u64::cast_from(trimmed_bytes));
1213            }
1214            stats
1215        });
1216
1217        BatchPart::Hollow(HollowBatchPart {
1218            key: partial_key,
1219            encoded_size_bytes: payload_len,
1220            key_lower,
1221            structured_key_lower,
1222            stats,
1223            ts_rewrite,
1224            diffs_sum: Some(diffs_sum),
1225            format: Some(BatchColumnarFormat::Structured),
1226            schema_id,
1227            // Field has been deprecated but kept around to roundtrip state.
1228            deprecated_schema_id: None,
1229        })
1230    }
1231
1232    #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
1233    pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec<RunPart<T>>)> {
1234        match self.writing_runs {
1235            WritingRuns::Ordered(RunOrder::Unordered, run) => {
1236                let parts = run.finish();
1237                let mut output = Vec::with_capacity(parts.len());
1238                for part in parts {
1239                    output.push((RunOrder::Unordered, vec![part.into_result().await]));
1240                }
1241                output
1242            }
1243            WritingRuns::Ordered(order, run) => {
1244                let parts = run.finish();
1245                let mut output = Vec::with_capacity(parts.len());
1246                for part in parts {
1247                    output.push(part.into_result().await);
1248                }
1249                vec![(order, output)]
1250            }
1251            WritingRuns::Compacting(batches) => {
1252                let runs = batches.finish();
1253                let mut output = Vec::with_capacity(runs.len());
1254                for (order, run) in runs {
1255                    let run = run.into_result().await;
1256                    output.push((order, run))
1257                }
1258                output
1259            }
1260        }
1261    }
1262}
1263
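/// Validates that `truncate`'s bounds fall within the batch's registered bounds
/// (with extra restrictions if the batch's timestamps have been rewritten),
/// returning an [InvalidUsage] error otherwise.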
1264pub(crate) fn validate_truncate_batch<T: Timestamp>(
1265    batch: &HollowBatch<T>,
1266    truncate: &Description<T>,
1267    any_batch_rewrite: bool,
1268) -> Result<(), InvalidUsage<T>> {
1269    // If rewrite_ts is used, we don't allow truncation, to keep things simpler
1270    // to reason about.
1271    if any_batch_rewrite {
1272        // We allow a new upper to be specified at rewrite time, so that's easy:
1273        // it must match exactly. This is both consistent with the upper
1274        // requirement below and proves that there is no data to truncate past
1275        // the upper.
1276        if truncate.upper() != batch.desc.upper() {
1277            return Err(InvalidUsage::InvalidRewrite(format!(
1278                "rewritten batch might have data past {:?} up to {:?}",
1279                truncate.upper().elements(),
1280                batch.desc.upper().elements(),
1281            )));
1282        }
1283        // To prove that there is no data to truncate below the lower, require
1284        // that the lower is <= the rewrite ts.
1285        for part in batch.parts.iter() {
1286            let part_lower_bound = part.ts_rewrite().unwrap_or_else(|| batch.desc.lower());
1287            if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
1288                return Err(InvalidUsage::InvalidRewrite(format!(
1289                    "rewritten batch might have data below {:?} at {:?}",
1290                    truncate.lower().elements(),
1291                    part_lower_bound.elements(),
1292                )));
1293            }
1294        }
1295    }
1296
1297    let batch = &batch.desc;
1298    if !PartialOrder::less_equal(batch.lower(), truncate.lower())
1299        || PartialOrder::less_than(batch.upper(), truncate.upper())
1300    {
1301        return Err(InvalidUsage::InvalidBatchBounds {
1302            batch_lower: batch.lower().clone(),
1303            batch_upper: batch.upper().clone(),
1304            append_lower: truncate.lower().clone(),
1305            append_upper: truncate.upper().clone(),
1306        });
1307    }
1308    Ok(())
1309}
1310
1311#[derive(Debug)]
1312pub(crate) struct PartDeletes<T> {
1313    /// Keys to hollow parts or runs that we're ready to delete.
1314    blob_keys: BTreeSet<PartialBatchKey>,
1315    /// Keys to hollow runs that may not have had all their parts deleted (or added to blob_keys) yet.
1316    hollow_runs: BTreeMap<PartialBatchKey, HollowRunRef<T>>,
1317}
1318
1319impl<T> Default for PartDeletes<T> {
1320    fn default() -> Self {
1321        Self {
1322            blob_keys: Default::default(),
1323            hollow_runs: Default::default(),
1324        }
1325    }
1326}
1327
1328impl<T: Timestamp> PartDeletes<T> {
1329    // Adds the part to the set to be deleted and returns true if it was newly
1330    // inserted.
1331    pub fn add(&mut self, part: &RunPart<T>) -> bool {
1332        match part {
1333            RunPart::Many(r) => self.hollow_runs.insert(r.key.clone(), r.clone()).is_none(),
1334            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.insert(x.key.clone()),
1335            RunPart::Single(BatchPart::Inline { .. }) => {
1336                // Nothing to delete.
1337                true
1338            }
1339        }
1340    }
1341
1342    pub fn contains(&self, part: &RunPart<T>) -> bool {
1343        match part {
1344            RunPart::Many(r) => self.hollow_runs.contains_key(&r.key),
1345            RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.contains(&x.key),
1346            RunPart::Single(BatchPart::Inline { .. }) => false,
1347        }
1348    }
1349
1350    pub fn is_empty(&self) -> bool {
1351        self.len() == 0
1352    }
1353
1354    pub fn len(&self) -> usize {
1355        match self {
1356            Self {
1357                blob_keys,
1358                hollow_runs,
1359            } => blob_keys.len() + hollow_runs.len(),
1360        }
1361    }
1362
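    /// Deletes all queued blobs, fetching any hollow runs so that their
    /// constituent parts (and the run blobs themselves) are deleted as well.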
1363    pub async fn delete(
1364        mut self,
1365        blob: &dyn Blob,
1366        shard_id: ShardId,
1367        concurrency: usize,
1368        metrics: &Metrics,
1369        delete_metrics: &RetryMetrics,
1370    ) where
1371        T: Codec64,
1372    {
1373        loop {
1374            let () = stream::iter(mem::take(&mut self.blob_keys))
1375                .map(|key| {
1376                    let key = key.complete(&shard_id);
1377                    async move {
1378                        retry_external(delete_metrics, || blob.delete(&key)).await;
1379                    }
1380                })
1381                .buffer_unordered(concurrency)
1382                .collect()
1383                .await;
1384
1385            let Some((run_key, run_ref)) = self.hollow_runs.pop_first() else {
1386                break;
1387            };
1388
1389            if let Some(run) = run_ref.get(shard_id, blob, metrics).await {
1390                // Queue up all the individual parts, as well as the run itself, for deletion.
1391                for part in &run.parts {
1392                    self.add(part);
1393                }
1394                self.blob_keys.insert(run_key);
1395            };
1396        }
1397    }
1398}
1399
1400/// Returns the total sum of diffs, or `None` if there were no updates.
1401fn diffs_sum<D: Semigroup + Codec64>(updates: &Int64Array) -> Option<D> {
1402    let mut sum = None;
1403    for d in updates.values().iter() {
1404        let d = D::decode(d.to_le_bytes());
1405        match &mut sum {
1406            None => sum = Some(d),
1407            Some(x) => x.plus_equals(&d),
1408        }
1409    }
1410
1411    sum
1412}
1413
1414#[cfg(test)]
1415mod tests {
1416    use mz_dyncfg::ConfigUpdates;
1417    use timely::order::Product;
1418
1419    use super::*;
1420    use crate::PersistLocation;
1421    use crate::cache::PersistClientCache;
1422    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
1423    use crate::internal::paths::{BlobKey, PartialBlobKey};
1424    use crate::tests::{all_ok, new_test_client};
1425
1426    #[mz_ore::test(tokio::test)]
1427    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1428    async fn batch_builder_flushing() {
1429        let data = vec![
1430            (("1".to_owned(), "one".to_owned()), 1, 1),
1431            (("2".to_owned(), "two".to_owned()), 2, 1),
1432            (("3".to_owned(), "three".to_owned()), 3, 1),
1433            (("4".to_owned(), "four".to_owned()), 4, 1),
1434        ];
1435
1436        let cache = PersistClientCache::new_no_metrics();
1437
1438        // Set blob_target_size to 0 so that each row gets forced into its own
1439        // part, set max_outstanding to a small value that's >1, and set max_runs
1440        // to 3 to exercise the flushing and run-merging edge cases below.
1441        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
1442        cache.cfg.set_config(&MAX_RUNS, 3);
1443        cache
1444            .cfg
1445            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);
1446
1447        let client = cache
1448            .open(PersistLocation::new_in_mem())
1449            .await
1450            .expect("client construction failed");
1451        let (mut write, mut read) = client
1452            .expect_open::<String, String, u64, i64>(ShardId::new())
1453            .await;
1454
1455        // A new builder has no writing or finished parts.
1456        let mut builder = write.builder(Antichain::from_elem(0));
1457
1458        fn assert_writing(
1459            builder: &BatchBuilder<String, String, u64, i64>,
1460            expected_finished: &[bool],
1461        ) {
1462            let WritingRuns::Compacting(run) = &builder.builder.parts.writing_runs else {
1463                unreachable!("ordered run!")
1464            };
1465
1466            let actual: Vec<_> = run.iter().map(|(_, p)| p.is_finished()).collect();
1467            assert_eq!(*expected_finished, actual);
1468        }
1469
1470        assert_writing(&builder, &[]);
1471
1472        // We set blob_target_size to 0, so the first update gets forced out
1473        // into a run.
1474        let ((k, v), t, d) = &data[0];
1475        builder.add(k, v, t, d).await.expect("invalid usage");
1476        assert_writing(&builder, &[false]);
1477
1478        // We set batch_builder_max_outstanding_parts to 2, so we are allowed to
1479        // pipeline a second part.
1480        let ((k, v), t, d) = &data[1];
1481        builder.add(k, v, t, d).await.expect("invalid usage");
1482        assert_writing(&builder, &[false, false]);
1483
1484        // But now that we have 3 parts, the add call back-pressures until the
1485        // first one finishes.
1486        let ((k, v), t, d) = &data[2];
1487        builder.add(k, v, t, d).await.expect("invalid usage");
1488        assert_writing(&builder, &[true, false, false]);
1489
1490        // Finally, pushing a fourth part will cause the first three to spill out into
1491        // a new compacted run.
1492        let ((k, v), t, d) = &data[3];
1493        builder.add(k, v, t, d).await.expect("invalid usage");
1494        assert_writing(&builder, &[false, false]);
1495
1496        // Finish off the batch and verify that the keys and such get plumbed
1497        // correctly by reading the data back.
1498        let batch = builder
1499            .finish(Antichain::from_elem(5))
1500            .await
1501            .expect("invalid usage");
1502        assert_eq!(batch.batch.runs().count(), 2);
1503        assert_eq!(batch.batch.part_count(), 4);
1504        write
1505            .append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
1506            .await
1507            .expect("invalid usage")
1508            .expect("unexpected upper");
1509        assert_eq!(read.expect_snapshot_and_fetch(4).await, all_ok(&data, 4));
1510    }
1511
1512    #[mz_ore::test(tokio::test)]
1513    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1514    async fn batch_builder_keys() {
1515        let cache = PersistClientCache::new_no_metrics();
1516        // Set blob_target_size to 0 so that each row gets forced into its own batch part
1517        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
1518        // Otherwise fails: expected hollow part!
1519        cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
1520        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1521        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1522        let client = cache
1523            .open(PersistLocation::new_in_mem())
1524            .await
1525            .expect("client construction failed");
1526        let shard_id = ShardId::new();
1527        let (mut write, _) = client
1528            .expect_open::<String, String, u64, i64>(shard_id)
1529            .await;
1530
1531        let batch = write
1532            .expect_batch(
1533                &[
1534                    (("1".into(), "one".into()), 1, 1),
1535                    (("2".into(), "two".into()), 2, 1),
1536                    (("3".into(), "three".into()), 3, 1),
1537                ],
1538                0,
1539                4,
1540            )
1541            .await;
1542
1543        assert_eq!(batch.batch.part_count(), 3);
1544        for part in &batch.batch.parts {
1545            let part = part.expect_hollow_part();
1546            match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
1547                Ok((shard, PartialBlobKey::Batch(writer, _))) => {
1548                    assert_eq!(shard.to_string(), shard_id.to_string());
1549                    assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
1550                }
1551                _ => panic!("unparseable blob key"),
1552            }
1553        }
1554    }
1555
1556    #[mz_ore::test(tokio::test)]
1557    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1558    async fn batch_delete() {
1559        let cache = PersistClientCache::new_no_metrics();
1560        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1561        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1562        cache.cfg.set_config(&BATCH_DELETE_ENABLED, true);
1563        let client = cache
1564            .open(PersistLocation::new_in_mem())
1565            .await
1566            .expect("client construction failed");
1567        let shard_id = ShardId::new();
1568        let (mut write, _) = client
1569            .expect_open::<String, String, u64, i64>(shard_id)
1570            .await;
1571
1572        let batch = write
1573            .expect_batch(
1574                &[
1575                    (("1".into(), "one".into()), 1, 1),
1576                    (("2".into(), "two".into()), 2, 1),
1577                    (("3".into(), "three".into()), 3, 1),
1578                ],
1579                0,
1580                4,
1581            )
1582            .await;
1583
1584        assert_eq!(batch.batch.part_count(), 1);
1585        let part_key = batch.batch.parts[0]
1586            .expect_hollow_part()
1587            .key
1588            .complete(&shard_id);
1589
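        // The backing blob exists until the batch is deleted, after which its
        // key should no longer resolve.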
1590        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
1591        assert!(part_bytes.is_some());
1592
1593        batch.delete().await;
1594
1595        let part_bytes = client.blob.get(&part_key).await.expect("invalid usage");
1596        assert!(part_bytes.is_none());
1597    }
1598
1599    #[mz_ore::test(tokio::test)]
1600    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1601    async fn batch_builder_partial_order() {
1602        let cache = PersistClientCache::new_no_metrics();
1603        // Set blob_target_size to 0 so that each row gets forced into its own batch part
1604        cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
1605        // Otherwise fails: expected hollow part!
1606        cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
1607        cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1608        cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1609        let client = cache
1610            .open(PersistLocation::new_in_mem())
1611            .await
1612            .expect("client construction failed");
1613        let shard_id = ShardId::new();
1614        let (mut write, _) = client
1615            .expect_open::<String, String, Product<u32, u32>, i64>(shard_id)
1616            .await;
1617
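        // With a partially ordered timestamp (Product), the batch bounds are
        // genuine antichains: the upper below has two incomparable elements,
        // one just past each of the two updates.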
1618        let batch = write
1619            .batch(
1620                &[
1621                    (("1".to_owned(), "one".to_owned()), Product::new(0, 10), 1),
1622                    (("2".to_owned(), "two".to_owned()), Product::new(10, 0), 1),
1623                ],
1624                Antichain::from_elem(Product::new(0, 0)),
1625                Antichain::from_iter([Product::new(0, 11), Product::new(10, 1)]),
1626            )
1627            .await
1628            .expect("invalid usage");
1629
1630        assert_eq!(batch.batch.part_count(), 2);
1631        for part in &batch.batch.parts {
1632            let part = part.expect_hollow_part();
1633            match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
1634                Ok((shard, PartialBlobKey::Batch(writer, _))) => {
1635                    assert_eq!(shard.to_string(), shard_id.to_string());
1636                    assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
1637                }
1638                _ => panic!("unparseable blob key"),
1639            }
1640        }
1641    }
1642
1643    #[mz_ore::test]
1644    fn untrimmable_columns() {
1645        let untrimmable = UntrimmableColumns {
1646            equals: vec!["abc".into(), "def".into()],
1647            prefixes: vec!["123".into(), "234".into()],
1648            suffixes: vec!["xyz".into()],
1649        };
1650
1651        // equals
1652        assert!(untrimmable.should_retain("abc"));
1653        assert!(untrimmable.should_retain("ABC"));
1654        assert!(untrimmable.should_retain("aBc"));
1655        assert!(!untrimmable.should_retain("abcd"));
1656        assert!(untrimmable.should_retain("deF"));
1657        assert!(!untrimmable.should_retain("defg"));
1658
1659        // prefix
1660        assert!(untrimmable.should_retain("123"));
1661        assert!(untrimmable.should_retain("123-4"));
1662        assert!(untrimmable.should_retain("1234"));
1663        assert!(untrimmable.should_retain("234"));
1664        assert!(!untrimmable.should_retain("345"));
1665
1666        // suffix
1667        assert!(untrimmable.should_retain("ijk_xyZ"));
1668        assert!(untrimmable.should_retain("ww-XYZ"));
1669        assert!(!untrimmable.should_retain("xya"));
1670    }
1671
1672    // NB: Most edge cases are exercised in datadriven tests.
1673    #[mz_persist_proc::test(tokio::test)]
1674    #[cfg_attr(miri, ignore)] // too slow
1675    async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
1676        let client = new_test_client(&dyncfgs).await;
1677        let (mut write, read) = client
1678            .expect_open::<String, (), u64, i64>(ShardId::new())
1679            .await;
1680
1681        let mut batch = write.builder(Antichain::from_elem(0));
1682        batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
1683        let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();
1684
1685        // Roundtrip through a transmittable batch.
1686        let batch = batch.into_transmittable_batch();
1687        let mut batch = write.batch_from_transmittable_batch(batch);
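        // Rewrite the batch's timestamps forward to 2 and extend its upper to
        // 3; the update originally written at 0 should be read back at 2.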
1688        batch
1689            .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
1690            .unwrap();
1691        write
1692            .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
1693            .await;
1694
1695        let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
1696        let expected = vec![((Ok("foo".to_owned()), Ok(())), 2, 1)];
1697        assert_eq!(actual, expected);
1698    }
1699
1700    #[mz_ore::test(tokio::test)]
1701    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1702    async fn structured_lowers() {
1703        let cache = PersistClientCache::new_no_metrics();
1704        // Ensure structured data is calculated, and that we give some budget for a key lower.
1705        cache.cfg().set_config(&STRUCTURED_KEY_LOWER_LEN, 1024);
1706        // Otherwise fails: expected hollow part!
1707        cache.cfg().set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1708        cache.cfg().set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1709        let client = cache
1710            .open(PersistLocation::new_in_mem())
1711            .await
1712            .expect("client construction failed");
1713        let shard_id = ShardId::new();
1714        let (mut write, _) = client
1715            .expect_open::<String, String, u64, i64>(shard_id)
1716            .await;
1717
1718        let batch = write
1719            .expect_batch(
1720                &[
1721                    (("1".into(), "one".into()), 1, 1),
1722                    (("2".into(), "two".into()), 2, 1),
1723                    (("3".into(), "three".into()), 3, 1),
1724                ],
1725                0,
1726                4,
1727            )
1728            .await;
1729
1730        assert_eq!(batch.batch.part_count(), 1);
1731        let [part] = batch.batch.parts.as_slice() else {
1732            panic!("expected single part")
1733        };
1734        // Verifies that the structured key lower is stored and decoded.
1735        assert!(part.structured_key_lower().is_some());
1736    }
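
    // A minimal example exercising `diffs_sum` directly (a sketch; the i64
    // diff type matches the tests above): the diff column is folded with
    // `plus_equals`, and an empty column yields `None`.
    #[mz_ore::test]
    fn diffs_sum_example() {
        let diffs = Int64Array::from(vec![1i64, -1, 2, 3]);
        assert_eq!(diffs_sum::<i64>(&diffs), Some(5));

        let empty = Int64Array::from(Vec::<i64>::new());
        assert_eq!(diffs_sum::<i64>(&empty), None);
    }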
1737}