// Source file: mz_persist/indexed/encoding.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data structures stored in Blobs and Logs in serialized form.
11
12// NB: Everything starting with Blob* is directly serialized as a Blob value.
13// Ditto for Log* and the Log. The others are used internally in these top-level
14// structs.
15
16use std::fmt::{self, Debug};
17use std::sync::Arc;
18
19use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
20use arrow::buffer::OffsetBuffer;
21use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
22use base64::Engine;
23use bytes::{BufMut, Bytes};
24use differential_dataflow::difference::Monoid;
25use differential_dataflow::trace::Description;
26use mz_ore::bytes::SegmentedBytes;
27use mz_ore::cast::CastFrom;
28use mz_ore::collections::CollectionExt;
29use mz_ore::soft_panic_or_log;
30use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
31use mz_persist_types::columnar::{
32    ColumnEncoder, Schema, codec_to_schema, data_type, schema_to_codec,
33};
34use mz_persist_types::parquet::EncodingConfig;
35use mz_persist_types::part::Part;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::{Codec, Codec64};
38use mz_proto::{RustType, TryFromProtoError};
39use proptest::arbitrary::Arbitrary;
40use proptest::prelude::*;
41use proptest::strategy::{BoxedStrategy, Just};
42use prost::Message;
43use serde::Serialize;
44use timely::PartialOrder;
45use timely::progress::{Antichain, Timestamp};
46use tracing::error;
47
48use crate::error::Error;
49use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
50use crate::generated::persist::{
51    ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
52    ProtoU64Description,
53};
54use crate::indexed::columnar::arrow::realloc_array;
55use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
56use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
57use crate::location::Blob;
58use crate::metrics::ColumnarMetrics;
59
/// Column format of a batch.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Rows are encoded to `ProtoRow` and then a batch is written down as a Parquet with a schema
    /// of `(k, v, t, d)`, where `k` are the serialized bytes.
    Row,
    /// Rows are encoded to `ProtoRow` and a columnar struct. The batch is written down as Parquet
    /// with a schema of `(k, k_c, v, v_c, t, d)`, where `k` are the serialized bytes and `k_c` is
    /// nested columnar data.
    ///
    /// The `usize` payload is the version of this migration format; versions 0 and 1 are
    /// deprecated and their structured columns are ignored (see
    /// [`BatchColumnarFormat::is_structured`]).
    Both(usize),
    /// Rows are encoded to a columnar struct. The batch is written down as Parquet
    /// with a schema of `(t, d, k_s, v_s)`, where `k_s` is nested columnar data.
    Structured,
}
74
75impl BatchColumnarFormat {
76    /// Returns a default value for [`BatchColumnarFormat`].
77    pub const fn default() -> Self {
78        BatchColumnarFormat::Both(2)
79    }
80
81    /// Returns a [`BatchColumnarFormat`] for a given `&str`, falling back to a default value if
82    /// provided `&str` is invalid.
83    pub fn from_str(s: &str) -> Self {
84        match s {
85            "row" => BatchColumnarFormat::Row,
86            "both" => BatchColumnarFormat::Both(0),
87            "both_v2" => BatchColumnarFormat::Both(2),
88            "structured" => BatchColumnarFormat::Structured,
89            x => {
90                let default = BatchColumnarFormat::default();
91                soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
92                default
93            }
94        }
95    }
96
97    /// Returns a string representation for the [`BatchColumnarFormat`].
98    pub const fn as_str(&self) -> &'static str {
99        match self {
100            BatchColumnarFormat::Row => "row",
101            BatchColumnarFormat::Both(0 | 1) => "both",
102            BatchColumnarFormat::Both(2) => "both_v2",
103            BatchColumnarFormat::Structured => "structured",
104            _ => panic!("unknown batch columnar format"),
105        }
106    }
107
108    /// Returns if we should encode a Batch in a structured format.
109    pub const fn is_structured(&self) -> bool {
110        match self {
111            BatchColumnarFormat::Row => false,
112            // The V0 format has been deprecated and we ignore its structured columns.
113            BatchColumnarFormat::Both(0 | 1) => false,
114            BatchColumnarFormat::Both(_) => true,
115            BatchColumnarFormat::Structured => true,
116        }
117    }
118}
119
120impl fmt::Display for BatchColumnarFormat {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        f.write_str(self.as_str())
123    }
124}
125
impl Arbitrary for BatchColumnarFormat {
    type Parameters = ();
    type Strategy = BoxedStrategy<BatchColumnarFormat>;

    // NOTE(review): this strategy only ever generates `Row`, `Both(0)`, and `Both(1)`;
    // `Both(2)` and `Structured` are never produced. Presumably intentional (to exercise
    // the legacy formats in proptests) — confirm before relying on full coverage.
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        proptest::strategy::Union::new(vec![
            Just(BatchColumnarFormat::Row).boxed(),
            Just(BatchColumnarFormat::Both(0)).boxed(),
            Just(BatchColumnarFormat::Both(1)).boxed(),
        ])
        .boxed()
    }
}
139
/// The metadata necessary to reconstruct a list of [BlobTraceBatchPart]s.
///
/// Invariants:
/// - The Description's time interval is non-empty.
/// - Keys for all trace batch parts are unique.
/// - Keys for all trace batch parts are stored in index order.
/// - The data in all of the trace batch parts is sorted and consolidated.
/// - All of the trace batch parts have the same desc as the metadata.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// The keys to retrieve the batch's data from the blob store.
    ///
    /// The set of keys can be empty to denote an empty batch.
    pub keys: Vec<String>,
    /// The format of the stored batch data.
    pub format: ProtoBatchFormat,
    /// The half-open time interval `[lower, upper)` this batch contains data
    /// for.
    pub desc: Description<u64>,
    /// The compaction level of this batch.
    pub level: u64,
    /// Size of the encoded batch, in bytes.
    pub size_bytes: u64,
}
164
/// The structure serialized and stored as a value in
/// [crate::location::Blob] storage for data keys corresponding to trace
/// data.
///
/// This batch represents the data that was originally written at some time in
/// [lower, upper) (more precisely !< lower and < upper). The individual record
/// times may have later been advanced by compaction to something <= since. This
/// means the ability to reconstruct the state of the collection at times <
/// since has been lost. However, there may still be records present in the
/// batch whose times are < since. Users iterating through updates must take
/// care to advance records with times < since to since in order to correctly
/// answer queries at times >= since.
///
/// Invariants:
/// - The [lower, upper) interval of times in desc is non-empty.
/// - The timestamp of each update is >= to desc.lower().
/// - The timestamp of each update is < desc.upper() iff desc.upper() >
///   desc.since(). Otherwise the timestamp of each update is <= desc.since().
/// - The values in updates are sorted by (key, value, time).
/// - The values in updates are "consolidated", i.e. (key, value, time) is
///   unique.
/// - All entries have a non-zero diff.
///
/// TODO: disallow empty trace batch parts in the future so there is one unique
/// way to represent an empty trace batch.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// Which updates are included in this batch.
    ///
    /// There may be other parts for the batch that also contain updates within
    /// the specified [lower, upper) range.
    pub desc: Description<T>,
    /// Index of this part in the list of parts that form the batch.
    ///
    /// Checked against the part's position in [TraceBatchMeta::keys] during validation.
    pub index: u64,
    /// The updates themselves.
    pub updates: BlobTraceUpdates,
}
202
/// The set of updates that are part of a [`BlobTraceBatchPart`].
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Legacy format. Keys and Values are encoded into bytes via [`Codec`], then stored in our own
    /// columnar-esque struct.
    ///
    /// Timestamps and diffs live inside the [`ColumnarRecords`].
    ///
    /// [`Codec`]: mz_persist_types::Codec
    Row(ColumnarRecords),
    /// Migration format. Keys and Values are encoded into bytes via [`Codec`] and structured into
    /// an Apache Arrow columnar format.
    ///
    /// [`Codec`]: mz_persist_types::Codec
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// New-style structured format, including structured representations of the K/V columns and
    /// the usual timestamp / diff encoding.
    Structured {
        /// Key-value data.
        key_values: ColumnarRecordsStructuredExt,
        /// Timestamp data.
        timestamps: Int64Array,
        /// Diffs.
        diffs: Int64Array,
    },
}
227
impl BlobTraceUpdates {
    /// Convert from a [Part].
    pub fn from_part(part: Part) -> Self {
        Self::Structured {
            key_values: ColumnarRecordsStructuredExt {
                key: part.key,
                val: part.val,
            },
            timestamps: part.time,
            diffs: part.diff,
        }
    }

    /// The number of updates.
    pub fn len(&self) -> usize {
        match self {
            BlobTraceUpdates::Row(c) => c.len(),
            BlobTraceUpdates::Both(c, _structured) => c.len(),
            BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
        }
    }

    /// The updates' timestamps as an integer array.
    pub fn timestamps(&self) -> &Int64Array {
        match self {
            BlobTraceUpdates::Row(c) => c.timestamps(),
            BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
            BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
        }
    }

    /// The updates' diffs as an integer array.
    pub fn diffs(&self) -> &Int64Array {
        match self {
            BlobTraceUpdates::Row(c) => c.diffs(),
            BlobTraceUpdates::Both(c, _structured) => c.diffs(),
            BlobTraceUpdates::Structured { diffs, .. } => diffs,
        }
    }

    /// Return the sum of the diffs in the blob.
    pub fn diffs_sum<D: Codec64 + Monoid>(&self) -> D {
        let mut sum = D::zero();
        // Each stored diff is an i64; reinterpret its little-endian bytes as a `D`.
        for d in self.diffs().values().iter() {
            let d = D::decode(d.to_le_bytes());
            sum.plus_equals(&d);
        }
        sum
    }

    /// Return the [`ColumnarRecords`] of the blob, or `None` for structured-only updates.
    pub fn records(&self) -> Option<&ColumnarRecords> {
        match self {
            BlobTraceUpdates::Row(c) => Some(c),
            BlobTraceUpdates::Both(c, _structured) => Some(c),
            BlobTraceUpdates::Structured { .. } => None,
        }
    }

    /// Return the [`ColumnarRecordsStructuredExt`] of the blob, or `None` for row-only updates.
    pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
        match self {
            BlobTraceUpdates::Row(_) => None,
            BlobTraceUpdates::Both(_, s) => Some(s),
            BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
        }
    }

    /// Return the estimated memory usage of the raw data.
    pub fn goodbytes(&self) -> usize {
        match self {
            BlobTraceUpdates::Row(c) => c.goodbytes(),
            // NB: we only report goodbytes for columnar records here, to avoid
            // dual-counting the same records. (This means that our goodput % is much lower
            // during the migration, which is an accurate reflection of reality.)
            BlobTraceUpdates::Both(c, _) => c.goodbytes(),
            BlobTraceUpdates::Structured {
                key_values,
                timestamps,
                diffs,
            } => {
                key_values.goodbytes()
                    + timestamps.values().to_byte_slice().len()
                    + diffs.values().to_byte_slice().len()
            }
        }
    }

    /// Return the [ColumnarRecords] of the blob, generating it if it does not exist.
    ///
    /// For structured-only updates the key/value columns are re-encoded to [`Codec`] bytes
    /// via the provided schemas, and `self` is upgraded in place to
    /// [`BlobTraceUpdates::Both`] so the work is not repeated on subsequent calls.
    pub fn get_or_make_codec<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecords {
        match self {
            BlobTraceUpdates::Row(records) => records,
            BlobTraceUpdates::Both(records, _) => records,
            BlobTraceUpdates::Structured {
                key_values,
                timestamps,
                diffs,
            } => {
                let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
                let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
                let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());

                // Cache the conversion by rewriting `self`, then re-borrow the stored copy.
                *self = BlobTraceUpdates::Both(records, key_values.clone());
                let BlobTraceUpdates::Both(records, _) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                records
            }
        }
    }

    /// Return the [`ColumnarRecordsStructuredExt`] of the blob, generating it if it does
    /// not exist.
    ///
    /// For row-only updates the codec bytes are decoded into structured arrays via the
    /// provided schemas, and `self` is upgraded in place to [`BlobTraceUpdates::Both`] so
    /// the work is not repeated on subsequent calls. The structured arrays are also
    /// migrated to the arrow types implied by the given schemas, if they differ.
    pub fn get_or_make_structured<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecordsStructuredExt {
        let structured = match self {
            BlobTraceUpdates::Row(records) => {
                let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
                let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");

                // Cache the conversion by rewriting `self`, then re-borrow the stored copy.
                *self = BlobTraceUpdates::Both(
                    records.clone(),
                    ColumnarRecordsStructuredExt { key, val },
                );
                let BlobTraceUpdates::Both(_, structured) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                structured
            }
            BlobTraceUpdates::Both(_, structured) => structured,
            BlobTraceUpdates::Structured { key_values, .. } => key_values,
        };

        // If the types don't match, attempt to migrate the array to the new type.
        // We expect this to succeed, since this should only be called with backwards-
        // compatible schemas... but if it fails we only log, and let some higher-level
        // code signal the error if it cares.
        let migrate = |array: &mut ArrayRef, to_type: DataType| {
            // TODO: Plumb down the SchemaCache and use it here for the array migrations.
            let from_type = array.data_type().clone();
            if from_type != to_type {
                if let Some(migration) = backward_compatible(&from_type, &to_type) {
                    *array = migration.migrate(Arc::clone(array));
                    if array.data_type() != &to_type {
                        error!(
                            ?from_type,
                            actual_type=?array.data_type(),
                            ?to_type,
                            "migration failed; returned non-matching data type!"
                        );
                    }
                } else {
                    error!(
                        ?from_type,
                        ?to_type,
                        "failed to migrate array type; backwards-incompatible schema migration?"
                    );
                }
            }
        };
        migrate(
            &mut structured.key,
            data_type::<K>(key_schema).expect("valid key schema"),
        );
        migrate(
            &mut structured.val,
            data_type::<V>(val_schema).expect("valid value schema"),
        );

        structured
    }

    /// If we have structured data, cast this data as a structured part.
    ///
    /// Returns `None` for [`BlobTraceUpdates::Row`], which has no structured columns.
    pub fn as_part(&self) -> Option<Part> {
        let ext = self.structured()?.clone();
        Some(Part {
            key: ext.key,
            val: ext.val,
            time: self.timestamps().clone(),
            diff: self.diffs().clone(),
        })
    }

    /// Convert this blob into a structured part, transforming the codec data if necessary.
    pub fn into_part<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> Part {
        let ext = self
            .get_or_make_structured::<K, V>(key_schema, val_schema)
            .clone();
        Part {
            key: ext.key,
            val: ext.val,
            time: self.timestamps().clone(),
            diff: self.diffs().clone(),
        }
    }

    /// Concatenate the given records together, column-by-column.
    ///
    /// With zero inputs, returns an empty [`BlobTraceUpdates::Structured`]. A single input
    /// is returned unchanged (in whatever format it arrived in). Otherwise every input is
    /// converted to the structured format (caching the conversion on the input via
    /// [`BlobTraceUpdates::get_or_make_structured`]) and the columns are concatenated.
    pub fn concat<K: Codec, V: Codec>(
        mut updates: Vec<BlobTraceUpdates>,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        metrics: &ColumnarMetrics,
    ) -> anyhow::Result<BlobTraceUpdates> {
        match updates.len() {
            0 => {
                return Ok(BlobTraceUpdates::Structured {
                    key_values: ColumnarRecordsStructuredExt {
                        key: Arc::new(key_schema.encoder().expect("valid schema").finish()),
                        val: Arc::new(val_schema.encoder().expect("valid schema").finish()),
                    },
                    timestamps: Int64Array::from_iter_values(vec![]),
                    diffs: Int64Array::from_iter_values(vec![]),
                });
            }
            1 => return Ok(updates.into_iter().into_element()),
            _ => {}
        }

        // Always get or make our structured format.
        let mut keys = Vec::with_capacity(updates.len());
        let mut vals = Vec::with_capacity(updates.len());
        for updates in &mut updates {
            let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
            keys.push(structured.key.as_ref());
            vals.push(structured.val.as_ref());
        }
        let key_values = ColumnarRecordsStructuredExt {
            key: ::arrow::compute::concat(&keys)?,
            val: ::arrow::compute::concat(&vals)?,
        };

        // If necessary, ensure we have `Codec` data, which internally includes
        // timestamps and diffs, otherwise get the timestamps and diffs separately.
        let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
        let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());

        for update in &updates {
            timestamps.push(update.timestamps());
            diffs.push(update.diffs());
        }
        let timestamps = ::arrow::compute::concat(&timestamps)?
            .as_primitive_opt::<Int64Type>()
            .ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
            .clone();
        let diffs = ::arrow::compute::concat(&diffs)?
            .as_primitive_opt::<Int64Type>()
            .ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
            .clone();

        let out = Self::Structured {
            key_values,
            timestamps,
            diffs,
        };
        metrics
            .arrow
            .concat_bytes
            .inc_by(u64::cast_from(out.goodbytes()));
        Ok(out)
    }

    /// See [RustType::from_proto].
    pub fn from_proto(
        lgbytes: &ColumnarMetrics,
        proto: ProtoColumnarRecords,
    ) -> Result<Self, TryFromProtoError> {
        // Empty offsets alongside a non-zero logical length is the sentinel for
        // "this codec column was not written"; `Ok(None)` marks the column absent.
        let binary_array = |data: Bytes, offsets: Vec<i32>| {
            if offsets.is_empty() && proto.len > 0 {
                return Ok(None);
            };
            match BinaryArray::try_new(
                OffsetBuffer::new(offsets.into()),
                ::arrow::buffer::Buffer::from(data),
                None,
            ) {
                Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
                Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
                    "Unable to decode binary array from repeated proto fields: {e:?}"
                ))),
            }
        };

        let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
        let codec_val = binary_array(proto.val_data, proto.val_offsets)?;

        let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
        let diffs = realloc_array(&proto.diffs.into(), lgbytes);
        let ext =
            ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;

        // The presence/absence of the codec and structured columns determines the variant;
        // any other combination (e.g. a key without a value) is invalid state.
        let updates = match (codec_key, codec_val, ext) {
            (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
                ext,
            ),
            (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
            ),
            (None, None, Some(ext)) => BlobTraceUpdates::Structured {
                key_values: ext,
                timestamps,
                diffs,
            },
            (k, v, ext) => {
                return Err(TryFromProtoError::InvalidPersistState(format!(
                    "unexpected mix of key/value columns: k={:?}, v={}, ext={}",
                    k.is_some(),
                    v.is_some(),
                    ext.is_some(),
                )));
            }
        };

        Ok(updates)
    }

    /// See [RustType::into_proto].
    pub fn into_proto(&self) -> ProtoColumnarRecords {
        let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
            None => (vec![], Bytes::new(), vec![], Bytes::new()),
            Some(records) => (
                records.keys().offsets().to_vec(),
                Bytes::copy_from_slice(records.keys().value_data()),
                records.vals().offsets().to_vec(),
                Bytes::copy_from_slice(records.vals().value_data()),
            ),
        };
        let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
            None => (None, None),
            Some((k, v)) => (Some(k), Some(v)),
        };

        ProtoColumnarRecords {
            len: self.len().into_proto(),
            key_offsets,
            key_data,
            val_offsets,
            val_data,
            timestamps: self.timestamps().values().to_vec(),
            diffs: self.diffs().values().to_vec(),
            key_structured: k_struct,
            val_structured: v_struct,
        }
    }

    /// Convert these updates into the specified batch format, re-encoding or discarding key-value
    /// data as necessary.
    ///
    /// Works on a clone, so `self` is left unchanged (the conversion is not cached here).
    pub fn as_structured<K: Codec, V: Codec>(
        &self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> Self {
        let mut this = self.clone();
        Self::Structured {
            key_values: this
                .get_or_make_structured::<K, V>(key_schema, val_schema)
                .clone(),
            timestamps: this.timestamps().clone(),
            diffs: this.diffs().clone(),
        }
    }
}
603
impl TraceBatchMeta {
    /// Asserts Self's documented invariants, returning an error if any are
    /// violated.
    pub fn validate(&self) -> Result<(), Error> {
        // TODO: It's unclear if the equal case (an empty desc) is
        // useful/harmful. Feel free to make this a less_than if empty descs end
        // up making sense.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        Ok(())
    }

    /// Assert that all of the [BlobTraceBatchPart]'s obey the required invariants.
    ///
    /// Fetches every part listed in `self.keys` from `blob`, checks that each part's
    /// desc matches this metadata's desc and that each part's index matches its position
    /// in `keys`, validates each part, and finally rejects any zero diffs.
    pub async fn validate_data(
        &self,
        blob: &dyn Blob,
        metrics: &ColumnarMetrics,
    ) -> Result<(), Error> {
        let mut batches = vec![];
        for (idx, key) in self.keys.iter().enumerate() {
            // A missing blob for a listed key is an error, not an empty batch.
            let value = blob
                .get(key)
                .await?
                .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
            let batch = BlobTraceBatchPart::decode(&value, metrics)?;
            if batch.desc != self.desc {
                return Err(format!(
                    "invalid trace batch part desc expected {:?} got {:?}",
                    &self.desc, &batch.desc
                )
                .into());
            }

            if batch.index != u64::cast_from(idx) {
                return Err(format!(
                    "invalid index for blob trace batch part at key {} expected {} got {}",
                    key, idx, batch.index
                )
                .into());
            }

            batch.validate()?;
            batches.push(batch);
        }

        for (batch_idx, batch) in batches.iter().enumerate() {
            for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
                // TODO: Don't assume diff is an i64, take a D type param instead.
                let diff: u64 = Codec64::decode(diff.to_le_bytes());

                // Check data invariants.
                if diff == 0 {
                    return Err(format!(
                        "update with 0 diff in batch {batch_idx} at row {row_idx}",
                    )
                    .into());
                }
            }
        }

        Ok(())
    }
}
669
impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
    /// Asserts the documented invariants, returning an error if any are
    /// violated.
    pub fn validate(&self) -> Result<(), Error> {
        // TODO: It's unclear if the equal case (an empty desc) is
        // useful/harmful. Feel free to make this a less_than if empty descs end
        // up making sense.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        // The batch is uncompacted iff its since has not advanced past its lower.
        let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());

        for time in self.updates.timestamps().values() {
            let ts = T::decode(time.to_le_bytes());
            // Check ts against desc.
            if !self.desc.lower().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is less than the batch lower: {:?}",
                    ts, self.desc
                )
                .into());
            }

            // when since is less than or equal to lower, the upper is a strict bound on the updates'
            // timestamp because no compaction has been performed. Because user batches are always
            // uncompacted, this ensures that new updates are recorded with valid timestamps.
            // Otherwise, we can make no assumptions about the timestamps
            if uncompacted && self.desc.upper().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is greater than or equal to the batch upper: {:?}",
                    ts, self.desc
                )
                .into());
            }
        }

        for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
            // TODO: Don't assume diff is an i64, take a D type param instead.
            let diff: u64 = Codec64::decode(diff.to_le_bytes());

            // Check data invariants.
            if diff == 0 {
                return Err(format!("update with 0 diff at row {row_idx}",).into());
            }
        }

        Ok(())
    }

    /// Encodes an BlobTraceBatchPart into the Parquet format.
    ///
    /// Panics (via `expect`) if the batch fails to encode.
    pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
    where
        B: BufMut + Send,
    {
        encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
    }

    /// Decodes a BlobTraceBatchPart from the Parquet format.
    pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
        decode_trace_parquet(buf.clone(), metrics)
    }

    /// Scans the part and returns a lower bound on the contained keys.
    ///
    /// Structured-only parts have no codec key data (`records()` is `None`), so this
    /// falls back to the empty slice; use [`Self::structured_key_lower`] for those.
    pub fn key_lower(&self) -> &[u8] {
        self.updates
            .records()
            .and_then(|r| r.keys().iter().flatten().min())
            .unwrap_or(&[])
    }

    /// Scans the part and returns a lower bound on the contained keys.
    ///
    /// Returns `None` when there is no structured key data or the key array is empty.
    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        self.updates.structured().and_then(|r| {
            let ord = ArrayOrd::new(&r.key);
            (0..r.key.len())
                .min_by_key(|i| ord.at(*i))
                .map(|i| ArrayBound::new(Arc::clone(&r.key), i))
        })
    }
}
751
752impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
753    fn from(x: ProtoU64Description) -> Self {
754        Description::new(
755            x.lower
756                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
757            x.upper
758                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
759            x.since
760                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
761        )
762    }
763}
764
765impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
766    fn from(x: ProtoU64Antichain) -> Self {
767        Antichain::from(
768            x.elements
769                .into_iter()
770                .map(|x| T::decode(u64::to_le_bytes(x)))
771                .collect::<Vec<_>>(),
772        )
773    }
774}
775
776impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
777    fn from(x: &Antichain<T>) -> Self {
778        ProtoU64Antichain {
779            elements: x
780                .elements()
781                .iter()
782                .map(|x| u64::from_le_bytes(T::encode(x)))
783                .collect(),
784        }
785    }
786}
787
788impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
789    fn from(x: &Description<T>) -> Self {
790        ProtoU64Description {
791            lower: Some(x.lower().into()),
792            upper: Some(x.upper().into()),
793            since: Some(x.since().into()),
794        }
795    }
796}
797
798/// Encodes the inline metadata for a trace batch into a base64 string.
799pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
800    let (format, format_metadata) = match &batch.updates {
801        // For the legacy Row format, we only write it as `ParquetKvtd`.
802        BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
803        // For the newer structured format we track some metadata about the version of the format.
804        BlobTraceUpdates::Both { .. } => {
805            let metadata = ProtoFormatMetadata::StructuredMigration(2);
806            (ProtoBatchFormat::ParquetStructured, Some(metadata))
807        }
808        BlobTraceUpdates::Structured { .. } => {
809            let metadata = ProtoFormatMetadata::StructuredMigration(3);
810            (ProtoBatchFormat::ParquetStructured, Some(metadata))
811        }
812    };
813
814    let inline = ProtoBatchPartInline {
815        format: format.into(),
816        desc: Some((&batch.desc).into()),
817        index: batch.index,
818        format_metadata,
819    };
820    let inline_encoded = inline.encode_to_vec();
821    base64::engine::general_purpose::STANDARD.encode(inline_encoded)
822}
823
824/// Decodes the inline metadata for a trace batch from a base64 string.
825pub fn decode_trace_inline_meta(
826    inline_base64: Option<&String>,
827) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
828    let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
829    let inline_encoded = base64::engine::general_purpose::STANDARD
830        .decode(inline_base64)
831        .map_err(|err| err.to_string())?;
832    let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
833    let format = ProtoBatchFormat::try_from(inline.format)
834        .map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
835    Ok((format, inline))
836}
837
// Tests for trace batch part encoding/validation: `validate` on parts and
// metadata, round-tripping parts through a blob store, and pinning the
// encoded Parquet sizes for representative workloads.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;

    use crate::error::Error;
    use crate::indexed::columnar::ColumnarRecordsBuilder;
    use crate::mem::{MemBlob, MemBlobConfig};
    use crate::metrics::ColumnarMetrics;
    use crate::workload::DataGenerator;

    use super::*;

    /// Constructs a `((key, val), time, diff)` update with an empty value and
    /// a diff of 1.
    fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
        ((key.into(), "".into()), ts, 1)
    }

    /// A `Description` over `[lower, upper)` with a since of 0.
    fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(0),
        )
    }

    /// A `TraceBatchMeta` with no keys and the given `[lower, upper)` desc.
    fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
        TraceBatchMeta {
            keys: vec![],
            format: ProtoBatchFormat::Unknown,
            desc: u64_desc(lower, upper),
            level: 1,
            size_bytes: 0,
        }
    }

    /// A `Description` over `[lower, upper)` with the given since.
    fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(since),
        )
    }

    /// Packs the given updates into the legacy `BlobTraceUpdates::Row` format.
    fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
        let mut builder = ColumnarRecordsBuilder::default();
        for ((k, v), t, d) in updates {
            assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
        }
        let updates = builder.finish(&ColumnarMetrics::disconnected());
        BlobTraceUpdates::Row(updates)
    }

    #[mz_ore::test]
    fn trace_batch_validate() {
        // Normal case
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Empty
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Invalid desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(2, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Empty desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Update "before" desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Update "after" desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Normal case: update "after" desc and within since
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Normal case: update "after" desc and at since
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(4, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Update "after" desc since
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(5, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Invalid update
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 1),
            index: 0,
            updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from("update with 0 diff at row 0"))
        );
    }

    #[mz_ore::test]
    fn trace_batch_meta_validate() {
        // Normal case
        let b = batch_meta(0, 1);
        assert_eq!(b.validate(), Ok(()));

        // Empty interval
        let b = batch_meta(0, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );

        // Invalid interval
        let b = batch_meta(2, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );
    }

    /// Encodes `batch` and writes it to `blob` under `key`, panicking on
    /// failure; returns the encoded length in bytes.
    async fn expect_set_trace_batch<T: Timestamp + Codec64>(
        blob: &dyn Blob,
        key: &str,
        batch: &BlobTraceBatchPart<T>,
    ) -> u64 {
        let mut val = Vec::new();
        let metrics = ColumnarMetrics::disconnected();
        let config = EncodingConfig::default();
        batch.encode(&mut val, &metrics, &config);
        let val = Bytes::from(val);
        let val_len = u64::cast_from(val.len());
        blob.set(key, val).await.expect("failed to set trace batch");
        val_len
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn trace_batch_meta_validate_data() -> Result<(), Error> {
        let metrics = ColumnarMetrics::disconnected();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let format = ProtoBatchFormat::ParquetKvtd;

        // Two parts sharing one desc, written to an in-memory blob store.
        let batch_desc = u64_desc_since(0, 3, 0);
        let batch0 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 0,
            updates: columnar_records(vec![
                (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
                (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
            ]),
        };
        let batch1 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 1,
            updates: columnar_records(vec![
                (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
                (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
            ]),
        };

        let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
        let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
        let size_bytes = batch0_size_bytes + batch1_size_bytes;
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };

        // Normal case:
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Ok(())
        );

        // Incorrect desc
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: u64_desc_since(1, 3, 0),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }"
            ))
        );
        // Key with no corresponding batch part
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into(), "b2".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from("no blob for trace batch at key: b2"))
        );
        // Batch parts not in index order
        let batch_meta = TraceBatchMeta {
            keys: vec!["b1".into(), "b0".into()],
            format,
            desc: batch_desc,
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid index for blob trace batch part at key b1 expected 0 got 1"
            ))
        );

        Ok(())
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn encoded_batch_sizes() {
        /// Encodes the generated data as a single trace batch part and
        /// returns the encoded size in bytes.
        fn sizes(data: DataGenerator) -> usize {
            let metrics = ColumnarMetrics::disconnected();
            let config = EncodingConfig::default();
            let updates: Vec<_> = data.batches().collect();
            let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
            let trace = BlobTraceBatchPart {
                desc: Description::new(
                    Antichain::from_elem(0u64),
                    Antichain::new(),
                    Antichain::from_elem(0u64),
                ),
                index: 0,
                updates,
            };
            let mut trace_buf = Vec::new();
            trace.encode(&mut trace_buf, &metrics, &config);
            trace_buf.len()
        }

        let record_size_bytes = DataGenerator::default().record_size_bytes;
        // Print all the sizes into one assert so we only have to update one
        // place if sizes change.
        assert_eq!(
            format!(
                "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
                sizes(DataGenerator::new(1, record_size_bytes, 1)),
                sizes(DataGenerator::new(25, record_size_bytes, 25)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
            ),
            "1/1=903 25/1=2649 1000/1=72881 1000/100=72881"
        );
    }
}