Skip to main content

mz_persist/indexed/
encoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data structures stored in Blobs and Logs in serialized form.
11
12// NB: Everything starting with Blob* is directly serialized as a Blob value.
13// Ditto for Log* and the Log. The others are used internally in these top-level
14// structs.
15
16use std::fmt::{self, Debug};
17use std::sync::Arc;
18
19use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
20use arrow::buffer::OffsetBuffer;
21use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
22use base64::Engine;
23use bytes::{BufMut, Bytes};
24use differential_dataflow::difference::Monoid;
25use differential_dataflow::trace::Description;
26use mz_ore::bytes::SegmentedBytes;
27use mz_ore::cast::CastFrom;
28use mz_ore::collections::CollectionExt;
29use mz_ore::soft_panic_or_log;
30use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
31use mz_persist_types::columnar::{
32    ColumnEncoder, Schema, codec_to_schema, data_type, schema_to_codec,
33};
34use mz_persist_types::parquet::EncodingConfig;
35use mz_persist_types::part::Part;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::{Codec, Codec64};
38use mz_proto::{RustType, TryFromProtoError};
39use proptest::arbitrary::Arbitrary;
40use proptest::prelude::*;
41use proptest::strategy::{BoxedStrategy, Just};
42use prost::Message;
43use serde::Serialize;
44use timely::PartialOrder;
45use timely::progress::{Antichain, Timestamp};
46use tracing::error;
47
48use crate::error::Error;
49use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
50use crate::generated::persist::{
51    ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
52    ProtoU64Description,
53};
54use crate::indexed::columnar::arrow::realloc_array;
55use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
56use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
57use crate::location::Blob;
58use crate::metrics::ColumnarMetrics;
59
/// Column format of a batch.
///
/// The textual names understood by [`BatchColumnarFormat::from_str`] and produced by
/// [`BatchColumnarFormat::as_str`] are `"row"`, `"both"`, `"both_v2"`, and `"structured"`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Rows are encoded to `ProtoRow` and then a batch is written down as a Parquet with a schema
    /// of `(k, v, t, d)`, where `k` are the serialized bytes.
    Row,
    /// Rows are encoded to `ProtoRow` and a columnar struct. The batch is written down as Parquet
    /// with a schema of `(k, k_c, v, v_c, t, d)`, where `k` are the serialized bytes and `k_c` is
    /// nested columnar data.
    ///
    /// The wrapped `usize` is the version of this format. Versions `0` and `1` are named
    /// `"both"` and are not treated as structured (see
    /// [`BatchColumnarFormat::is_structured`]); version `2` is named `"both_v2"`.
    Both(usize),
    /// Rows are encoded to a columnar struct. The batch is written down as Parquet
    /// with a schema of `(t, d, k_s, v_s)`, where `k_s` is nested columnar data.
    Structured,
}
74
75impl BatchColumnarFormat {
76    /// Returns a default value for [`BatchColumnarFormat`].
77    pub const fn default() -> Self {
78        BatchColumnarFormat::Both(2)
79    }
80
81    /// Returns a [`BatchColumnarFormat`] for a given `&str`, falling back to a default value if
82    /// provided `&str` is invalid.
83    pub fn from_str(s: &str) -> Self {
84        match s {
85            "row" => BatchColumnarFormat::Row,
86            "both" => BatchColumnarFormat::Both(0),
87            "both_v2" => BatchColumnarFormat::Both(2),
88            "structured" => BatchColumnarFormat::Structured,
89            x => {
90                let default = BatchColumnarFormat::default();
91                soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
92                default
93            }
94        }
95    }
96
97    /// Returns a string representation for the [`BatchColumnarFormat`].
98    pub const fn as_str(&self) -> &'static str {
99        match self {
100            BatchColumnarFormat::Row => "row",
101            BatchColumnarFormat::Both(0 | 1) => "both",
102            BatchColumnarFormat::Both(2) => "both_v2",
103            BatchColumnarFormat::Structured => "structured",
104            BatchColumnarFormat::Both(_) => panic!("unknown batch columnar format"),
105        }
106    }
107
108    /// Returns if we should encode a Batch in a structured format.
109    pub const fn is_structured(&self) -> bool {
110        match self {
111            BatchColumnarFormat::Row => false,
112            // The V0 format has been deprecated and we ignore its structured columns.
113            BatchColumnarFormat::Both(0 | 1) => false,
114            BatchColumnarFormat::Both(_) => true,
115            BatchColumnarFormat::Structured => true,
116        }
117    }
118}
119
120impl fmt::Display for BatchColumnarFormat {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        f.write_str(self.as_str())
123    }
124}
125
126impl Arbitrary for BatchColumnarFormat {
127    type Parameters = ();
128    type Strategy = BoxedStrategy<BatchColumnarFormat>;
129
130    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
131        proptest::strategy::Union::new(vec![
132            Just(BatchColumnarFormat::Row).boxed(),
133            Just(BatchColumnarFormat::Both(0)).boxed(),
134            Just(BatchColumnarFormat::Both(1)).boxed(),
135            Just(BatchColumnarFormat::Both(2)).boxed(),
136            Just(BatchColumnarFormat::Structured).boxed(),
137        ])
138        .boxed()
139    }
140}
141
/// The metadata necessary to reconstruct a list of [BlobTraceBatchPart]s.
///
/// Invariants:
/// - The Description's time interval is non-empty.
/// - Keys for all trace batch parts are unique.
/// - Keys for all trace batch parts are stored in index order.
/// - The data in all of the trace batch parts is sorted and consolidated.
/// - All of the trace batch parts have the same desc as the metadata.
///
/// [TraceBatchMeta::validate] checks only the first of these. [TraceBatchMeta::validate_data]
/// additionally fetches every part, compares its desc and index against this metadata, and
/// runs [BlobTraceBatchPart::validate] on it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// The keys to retrieve the batch's data from the blob store.
    ///
    /// The set of keys can be empty to denote an empty batch.
    pub keys: Vec<String>,
    /// The format of the stored batch data.
    pub format: ProtoBatchFormat,
    /// The half-open time interval `[lower, upper)` this batch contains data
    /// for.
    pub desc: Description<u64>,
    /// The compaction level of each batch.
    pub level: u64,
    /// Size of the encoded batch.
    pub size_bytes: u64,
}
166
/// The structure serialized and stored as a value in
/// [crate::location::Blob] storage for data keys corresponding to trace
/// data.
///
/// This batch represents the data that was originally written at some time in
/// [lower, upper) (more precisely !< lower and < upper). The individual record
/// times may have later been advanced by compaction to something <= since. This
/// means the ability to reconstruct the state of the collection at times <
/// since has been lost. However, there may still be records present in the
/// batch whose times are < since. Users iterating through updates must take
/// care to advance records with times < since to since in order to correctly
/// answer queries at times >= since.
///
/// Invariants:
/// - The [lower, upper) interval of times in desc is non-empty.
/// - The timestamp of each update is >= to desc.lower().
/// - The timestamp of each update is < desc.upper() iff desc.upper() >
///   desc.since(). Otherwise the timestamp of each update is <= desc.since().
/// - The values in updates are sorted by (key, value, time).
/// - The values in updates are "consolidated", i.e. (key, value, time) is
///   unique.
/// - All entries have a non-zero diff.
///
/// [BlobTraceBatchPart::validate] checks the desc, the lower bound on timestamps, and the
/// non-zero diffs. It enforces the upper bound on timestamps only when desc.since() <=
/// desc.lower(), and it does not check that the updates are sorted or consolidated.
///
/// TODO: disallow empty trace batch parts in the future so there is one unique
/// way to represent an empty trace batch.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// Which updates are included in this batch.
    ///
    /// There may be other parts for the batch that also contain updates within
    /// the specified [lower, upper) range.
    pub desc: Description<T>,
    /// Index of this part in the list of parts that form the batch.
    pub index: u64,
    /// The updates themselves.
    pub updates: BlobTraceUpdates,
}
204
/// The set of updates that are part of a [`BlobTraceBatchPart`].
///
/// Every variant exposes its timestamps and diffs as [`Int64Array`]s; see
/// [`BlobTraceUpdates::timestamps`] and [`BlobTraceUpdates::diffs`].
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Legacy format. Keys and Values are encoded into bytes via [`Codec`], then stored in our own
    /// columnar-esque struct.
    ///
    /// The timestamps and diffs are read from the [`ColumnarRecords`] itself.
    ///
    /// [`Codec`]: mz_persist_types::Codec
    Row(ColumnarRecords),
    /// Migration format. Keys and Values are encoded into bytes via [`Codec`] and structured into
    /// an Apache Arrow columnar format.
    ///
    /// The timestamps and diffs are read from the [`ColumnarRecords`], not from the structured
    /// extension.
    ///
    /// [`Codec`]: mz_persist_types::Codec
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// New-style structured format, including structured representations of the K/V columns and
    /// the usual timestamp / diff encoding.
    Structured {
        /// Key-value data.
        key_values: ColumnarRecordsStructuredExt,
        /// Timestamp data.
        timestamps: Int64Array,
        /// Diffs.
        diffs: Int64Array,
    },
}
229
230impl BlobTraceUpdates {
231    /// Convert from a [Part].
232    pub fn from_part(part: Part) -> Self {
233        Self::Structured {
234            key_values: ColumnarRecordsStructuredExt {
235                key: part.key,
236                val: part.val,
237            },
238            timestamps: part.time,
239            diffs: part.diff,
240        }
241    }
242
243    /// The number of updates.
244    pub fn len(&self) -> usize {
245        match self {
246            BlobTraceUpdates::Row(c) => c.len(),
247            BlobTraceUpdates::Both(c, _structured) => c.len(),
248            BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
249        }
250    }
251
252    /// The updates' timestamps as an integer array.
253    pub fn timestamps(&self) -> &Int64Array {
254        match self {
255            BlobTraceUpdates::Row(c) => c.timestamps(),
256            BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
257            BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
258        }
259    }
260
261    /// The updates' diffs as an integer array.
262    pub fn diffs(&self) -> &Int64Array {
263        match self {
264            BlobTraceUpdates::Row(c) => c.diffs(),
265            BlobTraceUpdates::Both(c, _structured) => c.diffs(),
266            BlobTraceUpdates::Structured { diffs, .. } => diffs,
267        }
268    }
269
270    /// Return the sum of the diffs in the blob.
271    pub fn diffs_sum<D: Codec64 + Monoid>(&self) -> D {
272        let mut sum = D::zero();
273        for d in self.diffs().values().iter() {
274            let d = D::decode(d.to_le_bytes());
275            sum.plus_equals(&d);
276        }
277        sum
278    }
279
280    /// Return the [`ColumnarRecords`] of the blob.
281    pub fn records(&self) -> Option<&ColumnarRecords> {
282        match self {
283            BlobTraceUpdates::Row(c) => Some(c),
284            BlobTraceUpdates::Both(c, _structured) => Some(c),
285            BlobTraceUpdates::Structured { .. } => None,
286        }
287    }
288
289    /// Return the [`ColumnarRecordsStructuredExt`] of the blob.
290    pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
291        match self {
292            BlobTraceUpdates::Row(_) => None,
293            BlobTraceUpdates::Both(_, s) => Some(s),
294            BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
295        }
296    }
297
298    /// Return the estimated memory usage of the raw data.
299    pub fn goodbytes(&self) -> usize {
300        match self {
301            BlobTraceUpdates::Row(c) => c.goodbytes(),
302            // NB: we only report goodbytes for columnar records here, to avoid
303            // dual-counting the same records. (This means that our goodput % is much lower
304            // during the migration, which is an accurate reflection of reality.)
305            BlobTraceUpdates::Both(c, _) => c.goodbytes(),
306            BlobTraceUpdates::Structured {
307                key_values,
308                timestamps,
309                diffs,
310            } => {
311                key_values.goodbytes()
312                    + timestamps.values().to_byte_slice().len()
313                    + diffs.values().to_byte_slice().len()
314            }
315        }
316    }
317
318    /// Return the [ColumnarRecords] of the blob, generating it if it does not exist.
319    pub fn get_or_make_codec<K: Codec, V: Codec>(
320        &mut self,
321        key_schema: &K::Schema,
322        val_schema: &V::Schema,
323    ) -> &ColumnarRecords {
324        match self {
325            BlobTraceUpdates::Row(records) => records,
326            BlobTraceUpdates::Both(records, _) => records,
327            BlobTraceUpdates::Structured {
328                key_values,
329                timestamps,
330                diffs,
331            } => {
332                let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
333                let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
334                let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());
335
336                *self = BlobTraceUpdates::Both(records, key_values.clone());
337                let BlobTraceUpdates::Both(records, _) = self else {
338                    unreachable!("set to BlobTraceUpdates::Both in previous line")
339                };
340                records
341            }
342        }
343    }
344
    /// Return the [`ColumnarRecordsStructuredExt`] of the blob, generating it if it does not
    /// exist.
    ///
    /// For row-only updates the key and value columns are decoded with `codec_to_schema` and
    /// `self` is replaced in place by a [`BlobTraceUpdates::Both`]. In every case, the key and
    /// value arrays are then migrated in place to the data types of the given schemas when
    /// their current data types differ.
    ///
    /// # Panics
    ///
    /// Panics if the codec data cannot be decoded with the given schemas, or if a data type
    /// cannot be derived from a schema. A failed array migration is only logged.
    pub fn get_or_make_structured<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecordsStructuredExt {
        let structured = match self {
            BlobTraceUpdates::Row(records) => {
                let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
                let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");

                // Keep the codec records and attach the freshly decoded structured columns.
                *self = BlobTraceUpdates::Both(
                    records.clone(),
                    ColumnarRecordsStructuredExt { key, val },
                );
                // Re-borrow the structured data out of the value we just stored.
                let BlobTraceUpdates::Both(_, structured) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                structured
            }
            BlobTraceUpdates::Both(_, structured) => structured,
            BlobTraceUpdates::Structured { key_values, .. } => key_values,
        };

        // If the types don't match, attempt to migrate the array to the new type.
        // We expect this to succeed, since this should only be called with backwards-
        // compatible schemas... but if it fails we only log, and let some higher-level
        // code signal the error if it cares.
        let migrate = |array: &mut ArrayRef, to_type: DataType| {
            // TODO: Plumb down the SchemaCache and use it here for the array migrations.
            let from_type = array.data_type().clone();
            if from_type != to_type {
                if let Some(migration) = backward_compatible(&from_type, &to_type) {
                    // The migrated array replaces the original even if the resulting type
                    // turns out not to match; that mismatch is only logged below.
                    *array = migration.migrate(Arc::clone(array));
                    if array.data_type() != &to_type {
                        error!(
                            ?from_type,
                            actual_type=?array.data_type(),
                            ?to_type,
                            "migration failed; returned non-matching data type!"
                        );
                    }
                } else {
                    // No migration is available: the array is left untouched.
                    error!(
                        ?from_type,
                        ?to_type,
                        "failed to migrate array type; backwards-incompatible schema migration?"
                    );
                }
            }
        };
        migrate(
            &mut structured.key,
            data_type::<K>(key_schema).expect("valid key schema"),
        );
        migrate(
            &mut structured.val,
            data_type::<V>(val_schema).expect("valid value schema"),
        );

        structured
    }
407
408    /// If we have structured data, cast this data as a structured part.
409    pub fn as_part(&self) -> Option<Part> {
410        let ext = self.structured()?.clone();
411        Some(Part {
412            key: ext.key,
413            val: ext.val,
414            time: self.timestamps().clone(),
415            diff: self.diffs().clone(),
416        })
417    }
418
419    /// Convert this blob into a structured part, transforming the codec data if necessary.
420    pub fn into_part<K: Codec, V: Codec>(
421        &mut self,
422        key_schema: &K::Schema,
423        val_schema: &V::Schema,
424    ) -> Part {
425        let ext = self
426            .get_or_make_structured::<K, V>(key_schema, val_schema)
427            .clone();
428        Part {
429            key: ext.key,
430            val: ext.val,
431            time: self.timestamps().clone(),
432            diff: self.diffs().clone(),
433        }
434    }
435
436    /// Concatenate the given records together, column-by-column.
437    ///
438    /// If `ensure_codec` is true, then we'll ensure the returned [`BlobTraceUpdates`] includes
439    /// [`Codec`] data, re-encoding structured data if necessary.
440    pub fn concat<K: Codec, V: Codec>(
441        mut updates: Vec<BlobTraceUpdates>,
442        key_schema: &K::Schema,
443        val_schema: &V::Schema,
444        metrics: &ColumnarMetrics,
445    ) -> anyhow::Result<BlobTraceUpdates> {
446        match updates.len() {
447            0 => {
448                return Ok(BlobTraceUpdates::Structured {
449                    key_values: ColumnarRecordsStructuredExt {
450                        key: Arc::new(key_schema.encoder().expect("valid schema").finish()),
451                        val: Arc::new(val_schema.encoder().expect("valid schema").finish()),
452                    },
453                    timestamps: Int64Array::from_iter_values(vec![]),
454                    diffs: Int64Array::from_iter_values(vec![]),
455                });
456            }
457            1 => return Ok(updates.into_iter().into_element()),
458            _ => {}
459        }
460
461        // Always get or make our structured format.
462        let mut keys = Vec::with_capacity(updates.len());
463        let mut vals = Vec::with_capacity(updates.len());
464        for updates in &mut updates {
465            let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
466            keys.push(structured.key.as_ref());
467            vals.push(structured.val.as_ref());
468        }
469        let key_values = ColumnarRecordsStructuredExt {
470            key: ::arrow::compute::concat(&keys)?,
471            val: ::arrow::compute::concat(&vals)?,
472        };
473
474        // If necessary, ensure we have `Codec` data, which internally includes
475        // timestamps and diffs, otherwise get the timestamps and diffs separately.
476        let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
477        let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());
478
479        for update in &updates {
480            timestamps.push(update.timestamps());
481            diffs.push(update.diffs());
482        }
483        let timestamps = ::arrow::compute::concat(&timestamps)?
484            .as_primitive_opt::<Int64Type>()
485            .ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
486            .clone();
487        let diffs = ::arrow::compute::concat(&diffs)?
488            .as_primitive_opt::<Int64Type>()
489            .ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
490            .clone();
491
492        let out = Self::Structured {
493            key_values,
494            timestamps,
495            diffs,
496        };
497        metrics
498            .arrow
499            .concat_bytes
500            .inc_by(u64::cast_from(out.goodbytes()));
501        Ok(out)
502    }
503
    /// See [RustType::from_proto].
    ///
    /// Decodes a [ProtoColumnarRecords] into whichever variant matches the columns that are
    /// present: codec key/value columns together with structured columns give
    /// [`BlobTraceUpdates::Both`], codec columns alone give [`BlobTraceUpdates::Row`], and
    /// structured columns alone give [`BlobTraceUpdates::Structured`]. Any other combination is
    /// reported as [`TryFromProtoError::InvalidPersistState`].
    ///
    /// Every decoded array is passed through `realloc_array` with the provided metrics.
    pub fn from_proto(
        lgbytes: &ColumnarMetrics,
        proto: ProtoColumnarRecords,
    ) -> Result<Self, TryFromProtoError> {
        // Builds one codec-encoded binary column from its raw bytes and offsets. A column
        // with no offsets in a non-empty part is treated as absent (`None`).
        //
        // NOTE(review): when `offsets` is empty and `proto.len == 0` this falls through to
        // `OffsetBuffer::new`, which per the arrow docs asserts on an empty buffer -- confirm
        // that this case cannot occur or is handled as intended.
        let binary_array = |data: Bytes, offsets: Vec<i32>| {
            if offsets.is_empty() && proto.len > 0 {
                return Ok(None);
            };
            match BinaryArray::try_new(
                OffsetBuffer::new(offsets.into()),
                ::arrow::buffer::Buffer::from(data),
                None,
            ) {
                Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
                Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
                    "Unable to decode binary array from repeated proto fields: {e:?}"
                ))),
            }
        };

        let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
        let codec_val = binary_array(proto.val_data, proto.val_offsets)?;

        let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
        let diffs = realloc_array(&proto.diffs.into(), lgbytes);
        let ext =
            ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;

        // Pick the variant from which columns were present; the codec key and value columns
        // must either both be present or both be absent.
        let updates = match (codec_key, codec_val, ext) {
            (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
                ext,
            ),
            (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
            ),
            (None, None, Some(ext)) => BlobTraceUpdates::Structured {
                key_values: ext,
                timestamps,
                diffs,
            },
            (k, v, ext) => {
                return Err(TryFromProtoError::InvalidPersistState(format!(
                    "unexpected mix of key/value columns: k={:?}, v={}, ext={}",
                    k.is_some(),
                    v.is_some(),
                    ext.is_some(),
                )));
            }
        };

        Ok(updates)
    }
558
559    /// See [RustType::into_proto].
560    pub fn into_proto(&self) -> ProtoColumnarRecords {
561        let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
562            None => (vec![], Bytes::new(), vec![], Bytes::new()),
563            Some(records) => (
564                records.keys().offsets().to_vec(),
565                Bytes::copy_from_slice(records.keys().value_data()),
566                records.vals().offsets().to_vec(),
567                Bytes::copy_from_slice(records.vals().value_data()),
568            ),
569        };
570        let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
571            None => (None, None),
572            Some((k, v)) => (Some(k), Some(v)),
573        };
574
575        ProtoColumnarRecords {
576            len: self.len().into_proto(),
577            key_offsets,
578            key_data,
579            val_offsets,
580            val_data,
581            timestamps: self.timestamps().values().to_vec(),
582            diffs: self.diffs().values().to_vec(),
583            key_structured: k_struct,
584            val_structured: v_struct,
585        }
586    }
587
588    /// Convert these updates into the specified batch format, re-encoding or discarding key-value
589    /// data as necessary.
590    pub fn as_structured<K: Codec, V: Codec>(
591        &self,
592        key_schema: &K::Schema,
593        val_schema: &V::Schema,
594    ) -> Self {
595        let mut this = self.clone();
596        Self::Structured {
597            key_values: this
598                .get_or_make_structured::<K, V>(key_schema, val_schema)
599                .clone(),
600            timestamps: this.timestamps().clone(),
601            diffs: this.diffs().clone(),
602        }
603    }
604}
605
606impl TraceBatchMeta {
607    /// Asserts Self's documented invariants, returning an error if any are
608    /// violated.
609    pub fn validate(&self) -> Result<(), Error> {
610        // TODO: It's unclear if the equal case (an empty desc) is
611        // useful/harmful. Feel free to make this a less_than if empty descs end
612        // up making sense.
613        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
614            return Err(format!("invalid desc: {:?}", &self.desc).into());
615        }
616
617        Ok(())
618    }
619
620    /// Assert that all of the [BlobTraceBatchPart]'s obey the required invariants.
621    pub async fn validate_data(
622        &self,
623        blob: &dyn Blob,
624        metrics: &ColumnarMetrics,
625    ) -> Result<(), Error> {
626        let mut batches = vec![];
627        for (idx, key) in self.keys.iter().enumerate() {
628            let value = blob
629                .get(key)
630                .await?
631                .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
632            let batch = BlobTraceBatchPart::decode(&value, metrics)?;
633            if batch.desc != self.desc {
634                return Err(format!(
635                    "invalid trace batch part desc expected {:?} got {:?}",
636                    &self.desc, &batch.desc
637                )
638                .into());
639            }
640
641            if batch.index != u64::cast_from(idx) {
642                return Err(format!(
643                    "invalid index for blob trace batch part at key {} expected {} got {}",
644                    key, idx, batch.index
645                )
646                .into());
647            }
648
649            batch.validate()?;
650            batches.push(batch);
651        }
652
653        for (batch_idx, batch) in batches.iter().enumerate() {
654            for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
655                // TODO: Don't assume diff is an i64, take a D type param instead.
656                let diff: u64 = Codec64::decode(diff.to_le_bytes());
657
658                // Check data invariants.
659                if diff == 0 {
660                    return Err(format!(
661                        "update with 0 diff in batch {batch_idx} at row {row_idx}",
662                    )
663                    .into());
664                }
665            }
666        }
667
668        Ok(())
669    }
670}
671
672impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
673    /// Asserts the documented invariants, returning an error if any are
674    /// violated.
675    pub fn validate(&self) -> Result<(), Error> {
676        // TODO: It's unclear if the equal case (an empty desc) is
677        // useful/harmful. Feel free to make this a less_than if empty descs end
678        // up making sense.
679        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
680            return Err(format!("invalid desc: {:?}", &self.desc).into());
681        }
682
683        let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());
684
685        for time in self.updates.timestamps().values() {
686            let ts = T::decode(time.to_le_bytes());
687            // Check ts against desc.
688            if !self.desc.lower().less_equal(&ts) {
689                return Err(format!(
690                    "timestamp {:?} is less than the batch lower: {:?}",
691                    ts, self.desc
692                )
693                .into());
694            }
695
696            // when since is less than or equal to lower, the upper is a strict bound on the updates'
697            // timestamp because no compaction has been performed. Because user batches are always
698            // uncompacted, this ensures that new updates are recorded with valid timestamps.
699            // Otherwise, we can make no assumptions about the timestamps
700            if uncompacted && self.desc.upper().less_equal(&ts) {
701                return Err(format!(
702                    "timestamp {:?} is greater than or equal to the batch upper: {:?}",
703                    ts, self.desc
704                )
705                .into());
706            }
707        }
708
709        for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
710            // TODO: Don't assume diff is an i64, take a D type param instead.
711            let diff: u64 = Codec64::decode(diff.to_le_bytes());
712
713            // Check data invariants.
714            if diff == 0 {
715                return Err(format!("update with 0 diff at row {row_idx}",).into());
716            }
717        }
718
719        Ok(())
720    }
721
722    /// Encodes an BlobTraceBatchPart into the Parquet format.
723    pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
724    where
725        B: BufMut + Send,
726    {
727        encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
728    }
729
730    /// Decodes a BlobTraceBatchPart from the Parquet format.
731    pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
732        decode_trace_parquet(buf.clone(), metrics)
733    }
734
735    /// Scans the part and returns a lower bound on the contained keys.
736    pub fn key_lower(&self) -> &[u8] {
737        self.updates
738            .records()
739            .and_then(|r| r.keys().iter().flatten().min())
740            .unwrap_or(&[])
741    }
742
743    /// Scans the part and returns a lower bound on the contained keys.
744    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
745        self.updates.structured().and_then(|r| {
746            let ord = ArrayOrd::new(&r.key);
747            (0..r.key.len())
748                .min_by_key(|i| ord.at(*i))
749                .map(|i| ArrayBound::new(Arc::clone(&r.key), i))
750        })
751    }
752}
753
754impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
755    fn from(x: ProtoU64Description) -> Self {
756        Description::new(
757            x.lower
758                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
759            x.upper
760                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
761            x.since
762                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
763        )
764    }
765}
766
767impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
768    fn from(x: ProtoU64Antichain) -> Self {
769        Antichain::from(
770            x.elements
771                .into_iter()
772                .map(|x| T::decode(u64::to_le_bytes(x)))
773                .collect::<Vec<_>>(),
774        )
775    }
776}
777
778impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
779    fn from(x: &Antichain<T>) -> Self {
780        ProtoU64Antichain {
781            elements: x
782                .elements()
783                .iter()
784                .map(|x| u64::from_le_bytes(T::encode(x)))
785                .collect(),
786        }
787    }
788}
789
790impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
791    fn from(x: &Description<T>) -> Self {
792        ProtoU64Description {
793            lower: Some(x.lower().into()),
794            upper: Some(x.upper().into()),
795            since: Some(x.since().into()),
796        }
797    }
798}
799
800/// Encodes the inline metadata for a trace batch into a base64 string.
801pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
802    let (format, format_metadata) = match &batch.updates {
803        // For the legacy Row format, we only write it as `ParquetKvtd`.
804        BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
805        // For the newer structured format we track some metadata about the version of the format.
806        BlobTraceUpdates::Both { .. } => {
807            let metadata = ProtoFormatMetadata::StructuredMigration(2);
808            (ProtoBatchFormat::ParquetStructured, Some(metadata))
809        }
810        BlobTraceUpdates::Structured { .. } => {
811            let metadata = ProtoFormatMetadata::StructuredMigration(3);
812            (ProtoBatchFormat::ParquetStructured, Some(metadata))
813        }
814    };
815
816    let inline = ProtoBatchPartInline {
817        format: format.into(),
818        desc: Some((&batch.desc).into()),
819        index: batch.index,
820        format_metadata,
821    };
822    let inline_encoded = inline.encode_to_vec();
823    base64::engine::general_purpose::STANDARD.encode(inline_encoded)
824}
825
826/// Decodes the inline metadata for a trace batch from a base64 string.
827pub fn decode_trace_inline_meta(
828    inline_base64: Option<&String>,
829) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
830    let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
831    let inline_encoded = base64::engine::general_purpose::STANDARD
832        .decode(inline_base64)
833        .map_err(|err| err.to_string())?;
834    let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
835    let format = ProtoBatchFormat::try_from(inline.format)
836        .map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
837    Ok((format, inline))
838}
839
840#[cfg(test)]
841mod tests {
842    use std::sync::Arc;
843
844    use bytes::Bytes;
845
846    use crate::error::Error;
847    use crate::indexed::columnar::ColumnarRecordsBuilder;
848    use crate::mem::{MemBlob, MemBlobConfig};
849    use crate::metrics::ColumnarMetrics;
850    use crate::workload::DataGenerator;
851
852    use super::*;
853
854    fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
855        ((key.into(), "".into()), ts, 1)
856    }
857
858    fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
859        Description::new(
860            Antichain::from_elem(lower),
861            Antichain::from_elem(upper),
862            Antichain::from_elem(0),
863        )
864    }
865
866    fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
867        TraceBatchMeta {
868            keys: vec![],
869            format: ProtoBatchFormat::Unknown,
870            desc: u64_desc(lower, upper),
871            level: 1,
872            size_bytes: 0,
873        }
874    }
875
876    fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
877        Description::new(
878            Antichain::from_elem(lower),
879            Antichain::from_elem(upper),
880            Antichain::from_elem(since),
881        )
882    }
883
884    fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
885        let mut builder = ColumnarRecordsBuilder::default();
886        for ((k, v), t, d) in updates {
887            assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
888        }
889        let updates = builder.finish(&ColumnarMetrics::disconnected());
890        BlobTraceUpdates::Row(updates)
891    }
892
893    #[mz_ore::test]
894    fn trace_batch_validate() {
895        // Normal case
896        let b = BlobTraceBatchPart {
897            desc: u64_desc(0, 2),
898            index: 0,
899            updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
900        };
901        assert_eq!(b.validate(), Ok(()));
902
903        // Empty
904        let b = BlobTraceBatchPart {
905            desc: u64_desc(0, 2),
906            index: 0,
907            updates: columnar_records(vec![]),
908        };
909        assert_eq!(b.validate(), Ok(()));
910
911        // Invalid desc
912        let b = BlobTraceBatchPart {
913            desc: u64_desc(2, 0),
914            index: 0,
915            updates: columnar_records(vec![]),
916        };
917        assert_eq!(
918            b.validate(),
919            Err(Error::from(
920                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
921            ))
922        );
923
924        // Empty desc
925        let b = BlobTraceBatchPart {
926            desc: u64_desc(0, 0),
927            index: 0,
928            updates: columnar_records(vec![]),
929        };
930        assert_eq!(
931            b.validate(),
932            Err(Error::from(
933                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
934            ))
935        );
936
937        // Update "before" desc
938        let b = BlobTraceBatchPart {
939            desc: u64_desc(1, 2),
940            index: 0,
941            updates: columnar_records(vec![update_with_key(0, "0")]),
942        };
943        assert_eq!(
944            b.validate(),
945            Err(Error::from(
946                "timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
947            ))
948        );
949
950        // Update "after" desc
951        let b = BlobTraceBatchPart {
952            desc: u64_desc(1, 2),
953            index: 0,
954            updates: columnar_records(vec![update_with_key(2, "0")]),
955        };
956        assert_eq!(
957            b.validate(),
958            Err(Error::from(
959                "timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
960            ))
961        );
962
963        // Normal case: update "after" desc and within since
964        let b = BlobTraceBatchPart {
965            desc: u64_desc_since(1, 2, 4),
966            index: 0,
967            updates: columnar_records(vec![update_with_key(2, "0")]),
968        };
969        assert_eq!(b.validate(), Ok(()));
970
971        // Normal case: update "after" desc and at since
972        let b = BlobTraceBatchPart {
973            desc: u64_desc_since(1, 2, 4),
974            index: 0,
975            updates: columnar_records(vec![update_with_key(4, "0")]),
976        };
977        assert_eq!(b.validate(), Ok(()));
978
979        // Update "after" desc since
980        let b = BlobTraceBatchPart {
981            desc: u64_desc_since(1, 2, 4),
982            index: 0,
983            updates: columnar_records(vec![update_with_key(5, "0")]),
984        };
985        assert_eq!(b.validate(), Ok(()));
986
987        // Invalid update
988        let b = BlobTraceBatchPart {
989            desc: u64_desc(0, 1),
990            index: 0,
991            updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
992        };
993        assert_eq!(
994            b.validate(),
995            Err(Error::from("update with 0 diff at row 0"))
996        );
997    }
998
999    #[mz_ore::test]
1000    fn trace_batch_meta_validate() {
1001        // Normal case
1002        let b = batch_meta(0, 1);
1003        assert_eq!(b.validate(), Ok(()));
1004
1005        // Empty interval
1006        let b = batch_meta(0, 0);
1007        assert_eq!(
1008            b.validate(),
1009            Err(Error::from(
1010                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
1011            )),
1012        );
1013
1014        // Invalid interval
1015        let b = batch_meta(2, 0);
1016        assert_eq!(
1017            b.validate(),
1018            Err(Error::from(
1019                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
1020            )),
1021        );
1022    }
1023
1024    async fn expect_set_trace_batch<T: Timestamp + Codec64>(
1025        blob: &dyn Blob,
1026        key: &str,
1027        batch: &BlobTraceBatchPart<T>,
1028    ) -> u64 {
1029        let mut val = Vec::new();
1030        let metrics = ColumnarMetrics::disconnected();
1031        let config = EncodingConfig::default();
1032        batch.encode(&mut val, &metrics, &config);
1033        let val = Bytes::from(val);
1034        let val_len = u64::cast_from(val.len());
1035        blob.set(key, val).await.expect("failed to set trace batch");
1036        val_len
1037    }
1038
1039    #[mz_ore::test(tokio::test)]
1040    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1041    async fn trace_batch_meta_validate_data() -> Result<(), Error> {
1042        let metrics = ColumnarMetrics::disconnected();
1043        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1044        let format = ProtoBatchFormat::ParquetKvtd;
1045
1046        let batch_desc = u64_desc_since(0, 3, 0);
1047        let batch0 = BlobTraceBatchPart {
1048            desc: batch_desc.clone(),
1049            index: 0,
1050            updates: columnar_records(vec![
1051                (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
1052                (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
1053            ]),
1054        };
1055        let batch1 = BlobTraceBatchPart {
1056            desc: batch_desc.clone(),
1057            index: 1,
1058            updates: columnar_records(vec![
1059                (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
1060                (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
1061            ]),
1062        };
1063
1064        let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
1065        let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
1066        let size_bytes = batch0_size_bytes + batch1_size_bytes;
1067        let batch_meta = TraceBatchMeta {
1068            keys: vec!["b0".into(), "b1".into()],
1069            format,
1070            desc: batch_desc.clone(),
1071            level: 0,
1072            size_bytes,
1073        };
1074
1075        // Normal case:
1076        assert_eq!(
1077            batch_meta.validate_data(blob.as_ref(), &metrics).await,
1078            Ok(())
1079        );
1080
1081        // Incorrect desc
1082        let batch_meta = TraceBatchMeta {
1083            keys: vec!["b0".into(), "b1".into()],
1084            format,
1085            desc: u64_desc_since(1, 3, 0),
1086            level: 0,
1087            size_bytes,
1088        };
1089        assert_eq!(
1090            batch_meta.validate_data(blob.as_ref(), &metrics).await,
1091            Err(Error::from(
1092                "invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }"
1093            ))
1094        );
1095        // Key with no corresponding batch part
1096        let batch_meta = TraceBatchMeta {
1097            keys: vec!["b0".into(), "b1".into(), "b2".into()],
1098            format,
1099            desc: batch_desc.clone(),
1100            level: 0,
1101            size_bytes,
1102        };
1103        assert_eq!(
1104            batch_meta.validate_data(blob.as_ref(), &metrics).await,
1105            Err(Error::from("no blob for trace batch at key: b2"))
1106        );
1107        // Batch parts not in index order
1108        let batch_meta = TraceBatchMeta {
1109            keys: vec!["b1".into(), "b0".into()],
1110            format,
1111            desc: batch_desc,
1112            level: 0,
1113            size_bytes,
1114        };
1115        assert_eq!(
1116            batch_meta.validate_data(blob.as_ref(), &metrics).await,
1117            Err(Error::from(
1118                "invalid index for blob trace batch part at key b1 expected 0 got 1"
1119            ))
1120        );
1121
1122        Ok(())
1123    }
1124
1125    #[mz_ore::test]
1126    #[cfg_attr(miri, ignore)] // too slow
1127    fn encoded_batch_sizes() {
1128        fn sizes(data: DataGenerator) -> usize {
1129            let metrics = ColumnarMetrics::disconnected();
1130            let config = EncodingConfig::default();
1131            let updates: Vec<_> = data.batches().collect();
1132            let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
1133            let trace = BlobTraceBatchPart {
1134                desc: Description::new(
1135                    Antichain::from_elem(0u64),
1136                    Antichain::new(),
1137                    Antichain::from_elem(0u64),
1138                ),
1139                index: 0,
1140                updates,
1141            };
1142            let mut trace_buf = Vec::new();
1143            trace.encode(&mut trace_buf, &metrics, &config);
1144            trace_buf.len()
1145        }
1146
1147        let record_size_bytes = DataGenerator::default().record_size_bytes;
1148        // Print all the sizes into one assert so we only have to update one
1149        // place if sizes change.
1150        assert_eq!(
1151            format!(
1152                "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
1153                sizes(DataGenerator::new(1, record_size_bytes, 1)),
1154                sizes(DataGenerator::new(25, record_size_bytes, 25)),
1155                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
1156                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
1157            ),
1158            "1/1=903 25/1=2649 1000/1=72881 1000/100=72881"
1159        );
1160    }
1161}