mz_persist/indexed/encoding.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data structures stored in Blobs and Logs in serialized form.

// NB: Everything starting with Blob* is directly serialized as a Blob value.
// Ditto for Log* and the Log. The others are used internally in these top-level
// structs.

use std::fmt::{self, Debug};
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
use bytes::{BufMut, Bytes};
use differential_dataflow::trace::Description;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
use mz_persist_types::columnar::{
    ColumnEncoder, Schema, codec_to_schema, data_type, schema_to_codec,
};
use mz_persist_types::parquet::EncodingConfig;
use mz_persist_types::part::Part;
use mz_persist_types::schema::backward_compatible;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{RustType, TryFromProtoError};
use proptest::arbitrary::Arbitrary;
use proptest::prelude::*;
use proptest::strategy::{BoxedStrategy, Just};
use prost::Message;
use serde::Serialize;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::error;

use crate::error::Error;
use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
use crate::generated::persist::{
    ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
    ProtoU64Description,
};
use crate::indexed::columnar::arrow::realloc_array;
use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
use crate::location::Blob;
use crate::metrics::ColumnarMetrics;

/// Column format of a batch.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Rows are encoded to `ProtoRow` and then a batch is written down as Parquet with a schema
    /// of `(k, v, t, d)`, where `k` and `v` are the serialized bytes.
    Row,
    /// Rows are encoded to `ProtoRow` and a columnar struct. The batch is written down as Parquet
    /// with a schema of `(k, k_c, v, v_c, t, d)`, where `k` and `v` are the serialized bytes and
    /// `k_c` and `v_c` are nested columnar data.
    Both(usize),
    /// Rows are encoded to a columnar struct. The batch is written down as Parquet
    /// with a schema of `(t, d, k_s, v_s)`, where `k_s` and `v_s` are nested columnar data.
    Structured,
}

impl BatchColumnarFormat {
    /// Returns a default value for [`BatchColumnarFormat`].
    pub const fn default() -> Self {
        BatchColumnarFormat::Both(2)
    }

    /// Returns a [`BatchColumnarFormat`] for a given `&str`, falling back to a default value if
    /// the provided `&str` is invalid.
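    ///
    /// A hedged round-trip sketch (the mappings shown follow the match arms below):
    ///
    /// ```ignore
    /// let format = BatchColumnarFormat::from_str("both_v2");
    /// assert_eq!(format, BatchColumnarFormat::Both(2));
    /// assert_eq!(format.as_str(), "both_v2");
    /// ```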
    pub fn from_str(s: &str) -> Self {
        match s {
            "row" => BatchColumnarFormat::Row,
            "both" => BatchColumnarFormat::Both(0),
            "both_v2" => BatchColumnarFormat::Both(2),
            "structured" => BatchColumnarFormat::Structured,
            x => {
                let default = BatchColumnarFormat::default();
                soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
                default
            }
        }
    }

    /// Returns a string representation for the [`BatchColumnarFormat`].
    pub const fn as_str(&self) -> &'static str {
        match self {
            BatchColumnarFormat::Row => "row",
            BatchColumnarFormat::Both(0 | 1) => "both",
            BatchColumnarFormat::Both(2) => "both_v2",
            BatchColumnarFormat::Structured => "structured",
            _ => panic!("unknown batch columnar format"),
        }
    }

    /// Returns if we should encode a Batch in a structured format.
    pub const fn is_structured(&self) -> bool {
        match self {
            BatchColumnarFormat::Row => false,
            // The V0 format has been deprecated and we ignore its structured columns.
            BatchColumnarFormat::Both(0 | 1) => false,
            BatchColumnarFormat::Both(_) => true,
            BatchColumnarFormat::Structured => true,
        }
    }
}

impl fmt::Display for BatchColumnarFormat {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

impl Arbitrary for BatchColumnarFormat {
    type Parameters = ();
    type Strategy = BoxedStrategy<BatchColumnarFormat>;

    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        proptest::strategy::Union::new(vec![
            Just(BatchColumnarFormat::Row).boxed(),
            Just(BatchColumnarFormat::Both(0)).boxed(),
            Just(BatchColumnarFormat::Both(1)).boxed(),
        ])
        .boxed()
    }
}

/// The metadata necessary to reconstruct a list of [BlobTraceBatchPart]s.
///
/// Invariants:
/// - The Description's time interval is non-empty.
/// - Keys for all trace batch parts are unique.
/// - Keys for all trace batch parts are stored in index order.
/// - The data in all of the trace batch parts is sorted and consolidated.
/// - All of the trace batch parts have the same desc as the metadata.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// The keys to retrieve the batch's data from the blob store.
    ///
    /// The set of keys can be empty to denote an empty batch.
    pub keys: Vec<String>,
    /// The format of the stored batch data.
    pub format: ProtoBatchFormat,
    /// The half-open time interval `[lower, upper)` this batch contains data
    /// for.
    pub desc: Description<u64>,
    /// The compaction level of each batch.
    pub level: u64,
    /// Size of the encoded batch.
    pub size_bytes: u64,
}

/// The structure serialized and stored as a value in
/// [crate::location::Blob] storage for data keys corresponding to trace
/// data.
///
/// This batch represents the data that was originally written at some time in
/// [lower, upper) (more precisely !< lower and < upper). The individual record
/// times may have later been advanced by compaction to something <= since. This
/// means the ability to reconstruct the state of the collection at times <
/// since has been lost. However, there may still be records present in the
/// batch whose times are < since. Users iterating through updates must take
/// care to advance records with times < since to since in order to correctly
/// answer queries at times >= since.
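///
/// A hedged sketch of that read-side rule for a totally ordered timestamp
/// (the names here are illustrative, not an API of this crate):
///
/// ```ignore
/// // Advance each record's time to `since` before answering queries at t >= since.
/// let effective_ts = if ts < since { since } else { ts };
/// ```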
///
/// Invariants:
/// - The [lower, upper) interval of times in desc is non-empty.
/// - The timestamp of each update is >= desc.lower().
/// - The timestamp of each update is < desc.upper() iff desc.upper() >
///   desc.since(). Otherwise the timestamp of each update is <= desc.since().
/// - The values in updates are sorted by (key, value, time).
/// - The values in updates are "consolidated", i.e. (key, value, time) is
///   unique.
/// - All entries have a non-zero diff.
///
/// TODO: disallow empty trace batch parts in the future so there is one unique
/// way to represent an empty trace batch.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// Which updates are included in this batch.
    ///
    /// There may be other parts for the batch that also contain updates within
    /// the specified [lower, upper) range.
    pub desc: Description<T>,
    /// Index of this part in the list of parts that form the batch.
    pub index: u64,
    /// The updates themselves.
    pub updates: BlobTraceUpdates,
}

/// The set of updates that are part of a [`BlobTraceBatchPart`].
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Legacy format. Keys and Values are encoded into bytes via [`Codec`], then stored in our own
    /// columnar-esque struct.
    ///
    /// [`Codec`]: mz_persist_types::Codec
    Row(ColumnarRecords),
    /// Migration format. Keys and Values are encoded into bytes via [`Codec`] and structured into
    /// an Apache Arrow columnar format.
    ///
    /// [`Codec`]: mz_persist_types::Codec
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// New-style structured format, including structured representations of the K/V columns and
    /// the usual timestamp / diff encoding.
    Structured {
        /// Key-value data.
        key_values: ColumnarRecordsStructuredExt,
        /// Timestamp data.
        timestamps: Int64Array,
        /// Diffs.
        diffs: Int64Array,
    },
}

impl BlobTraceUpdates {
    /// Convert from a [Part].
    pub fn from_part(part: Part) -> Self {
        Self::Structured {
            key_values: ColumnarRecordsStructuredExt {
                key: part.key,
                val: part.val,
            },
            timestamps: part.time,
            diffs: part.diff,
        }
    }

    /// The number of updates.
    pub fn len(&self) -> usize {
        match self {
            BlobTraceUpdates::Row(c) => c.len(),
            BlobTraceUpdates::Both(c, _structured) => c.len(),
            BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
        }
    }

    /// The updates' timestamps as an integer array.
    pub fn timestamps(&self) -> &Int64Array {
        match self {
            BlobTraceUpdates::Row(c) => c.timestamps(),
            BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
            BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
        }
    }

    /// The updates' diffs as an integer array.
    pub fn diffs(&self) -> &Int64Array {
        match self {
            BlobTraceUpdates::Row(c) => c.diffs(),
            BlobTraceUpdates::Both(c, _structured) => c.diffs(),
            BlobTraceUpdates::Structured { diffs, .. } => diffs,
        }
    }

    /// Return the [`ColumnarRecords`] of the blob.
    pub fn records(&self) -> Option<&ColumnarRecords> {
        match self {
            BlobTraceUpdates::Row(c) => Some(c),
            BlobTraceUpdates::Both(c, _structured) => Some(c),
            BlobTraceUpdates::Structured { .. } => None,
        }
    }

    /// Return the [`ColumnarRecordsStructuredExt`] of the blob.
    pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
        match self {
            BlobTraceUpdates::Row(_) => None,
            BlobTraceUpdates::Both(_, s) => Some(s),
            BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
        }
    }

    /// Return the estimated memory usage of the raw data.
    pub fn goodbytes(&self) -> usize {
        match self {
            BlobTraceUpdates::Row(c) => c.goodbytes(),
            // NB: we only report goodbytes for columnar records here, to avoid
            // dual-counting the same records. (This means that our goodput % is much lower
            // during the migration, which is an accurate reflection of reality.)
            BlobTraceUpdates::Both(c, _) => c.goodbytes(),
            BlobTraceUpdates::Structured {
                key_values,
                timestamps,
                diffs,
            } => {
                key_values.goodbytes()
                    + timestamps.values().to_byte_slice().len()
                    + diffs.values().to_byte_slice().len()
            }
        }
    }

    /// Return the [ColumnarRecords] of the blob, generating it if it does not exist.
    pub fn get_or_make_codec<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecords {
        match self {
            BlobTraceUpdates::Row(records) => records,
            BlobTraceUpdates::Both(records, _) => records,
            BlobTraceUpdates::Structured {
                key_values,
                timestamps,
                diffs,
            } => {
                let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
                let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
                let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());

                *self = BlobTraceUpdates::Both(records, key_values.clone());
                let BlobTraceUpdates::Both(records, _) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                records
            }
        }
    }

    /// Return the [`ColumnarRecordsStructuredExt`] of the blob, generating it if it does not
    /// exist.
    pub fn get_or_make_structured<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecordsStructuredExt {
        let structured = match self {
            BlobTraceUpdates::Row(records) => {
                let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
                let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");

                *self = BlobTraceUpdates::Both(
                    records.clone(),
                    ColumnarRecordsStructuredExt { key, val },
                );
                let BlobTraceUpdates::Both(_, structured) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                structured
            }
            BlobTraceUpdates::Both(_, structured) => structured,
            BlobTraceUpdates::Structured { key_values, .. } => key_values,
        };

        // If the types don't match, attempt to migrate the array to the new type.
        // We expect this to succeed, since this should only be called with backwards-
        // compatible schemas... but if it fails we only log, and let some higher-level
        // code signal the error if it cares.
        let migrate = |array: &mut ArrayRef, to_type: DataType| {
            // TODO: Plumb down the SchemaCache and use it here for the array migrations.
            let from_type = array.data_type().clone();
            if from_type != to_type {
                if let Some(migration) = backward_compatible(&from_type, &to_type) {
                    *array = migration.migrate(Arc::clone(array));
                } else {
                    error!(
                        ?from_type,
                        ?to_type,
                        "failed to migrate array type; backwards-incompatible schema migration?"
                    );
                }
            }
        };
        migrate(
            &mut structured.key,
            data_type::<K>(key_schema).expect("valid key schema"),
        );
        migrate(
            &mut structured.val,
            data_type::<V>(val_schema).expect("valid value schema"),
        );

        structured
    }

    /// Convert this blob into a structured part, transforming the codec data if necessary.
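    ///
    /// A hedged usage sketch (assumes key/value types `K`, `V` with matching schemas,
    /// and a mutable `updates: BlobTraceUpdates` in scope):
    ///
    /// ```ignore
    /// let part: Part = updates.into_part::<K, V>(&key_schema, &val_schema);
    /// assert_eq!(part.time.len(), part.diff.len());
    /// ```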
    pub fn into_part<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> Part {
        let ext = self
            .get_or_make_structured::<K, V>(key_schema, val_schema)
            .clone();
        Part {
            key: ext.key,
            val: ext.val,
            time: self.timestamps().clone(),
            diff: self.diffs().clone(),
        }
    }

    /// Concatenate the given updates together, column-by-column.
    ///
    /// A single input is returned unchanged; otherwise the result is in the
    /// [`BlobTraceUpdates::Structured`] format, converting inputs that only carry
    /// [`Codec`] data via [`Self::get_or_make_structured`] as necessary.
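    ///
    /// A hedged usage sketch (assumes two or more `parts`, matching schemas, and a
    /// metrics handle are in scope):
    ///
    /// ```ignore
    /// let merged = BlobTraceUpdates::concat::<K, V>(parts, &key_schema, &val_schema, &metrics)?;
    /// assert!(merged.structured().is_some());
    /// ```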
    pub fn concat<K: Codec, V: Codec>(
        mut updates: Vec<BlobTraceUpdates>,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        metrics: &ColumnarMetrics,
    ) -> anyhow::Result<BlobTraceUpdates> {
        match updates.len() {
            0 => {
                return Ok(BlobTraceUpdates::Structured {
                    key_values: ColumnarRecordsStructuredExt {
                        key: Arc::new(key_schema.encoder().expect("valid schema").finish()),
                        val: Arc::new(val_schema.encoder().expect("valid schema").finish()),
                    },
                    timestamps: Int64Array::from_iter_values(vec![]),
                    diffs: Int64Array::from_iter_values(vec![]),
                });
            }
            1 => return Ok(updates.into_iter().into_element()),
            _ => {}
        }

        // Always get or make our structured format.
        let mut keys = Vec::with_capacity(updates.len());
        let mut vals = Vec::with_capacity(updates.len());
        for updates in &mut updates {
            let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
            keys.push(structured.key.as_ref());
            vals.push(structured.val.as_ref());
        }
        let key_values = ColumnarRecordsStructuredExt {
            key: ::arrow::compute::concat(&keys)?,
            val: ::arrow::compute::concat(&vals)?,
        };
        // Concatenate the timestamps and diffs from all the parts.
        let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
        let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());

        for update in &updates {
            timestamps.push(update.timestamps());
            diffs.push(update.diffs());
        }
        let timestamps = ::arrow::compute::concat(&timestamps)?
            .as_primitive_opt::<Int64Type>()
            .ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
            .clone();
        let diffs = ::arrow::compute::concat(&diffs)?
            .as_primitive_opt::<Int64Type>()
            .ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
            .clone();

        let out = Self::Structured {
            key_values,
            timestamps,
            diffs,
        };
        metrics
            .arrow
            .concat_bytes
            .inc_by(u64::cast_from(out.goodbytes()));
        Ok(out)
    }

    /// See [RustType::from_proto].
    pub fn from_proto(
        lgbytes: &ColumnarMetrics,
        proto: ProtoColumnarRecords,
    ) -> Result<Self, TryFromProtoError> {
        let binary_array = |data: Bytes, offsets: Vec<i32>| {
            if offsets.is_empty() && proto.len > 0 {
                return Ok(None);
            };
            match BinaryArray::try_new(
                OffsetBuffer::new(offsets.into()),
                ::arrow::buffer::Buffer::from_bytes(data.into()),
                None,
            ) {
                Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
                Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
                    "Unable to decode binary array from repeated proto fields: {e:?}"
                ))),
            }
        };

        let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
        let codec_val = binary_array(proto.val_data, proto.val_offsets)?;

        let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
        let diffs = realloc_array(&proto.diffs.into(), lgbytes);
        let ext =
            ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;

        let updates = match (codec_key, codec_val, ext) {
            (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
                ext,
            ),
            (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
            ),
            (None, None, Some(ext)) => BlobTraceUpdates::Structured {
                key_values: ext,
                timestamps,
                diffs,
            },
            (k, v, ext) => {
                return Err(TryFromProtoError::InvalidPersistState(format!(
                    "unexpected mix of key/value columns: k={}, v={}, ext={}",
                    k.is_some(),
                    v.is_some(),
                    ext.is_some(),
                )));
            }
        };

        Ok(updates)
    }

    /// See [RustType::into_proto].
    pub fn into_proto(&self) -> ProtoColumnarRecords {
        let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
            None => (vec![], Bytes::new(), vec![], Bytes::new()),
            Some(records) => (
                records.keys().offsets().to_vec(),
                Bytes::copy_from_slice(records.keys().value_data()),
                records.vals().offsets().to_vec(),
                Bytes::copy_from_slice(records.vals().value_data()),
            ),
        };
        let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
            None => (None, None),
            Some((k, v)) => (Some(k), Some(v)),
        };

        ProtoColumnarRecords {
            len: self.len().into_proto(),
            key_offsets,
            key_data,
            val_offsets,
            val_data,
            timestamps: self.timestamps().values().to_vec(),
            diffs: self.diffs().values().to_vec(),
            key_structured: k_struct,
            val_structured: v_struct,
        }
    }

    /// Convert these updates into the [`BlobTraceUpdates::Structured`] format,
    /// re-encoding [`Codec`] data as necessary and discarding it from the result.
    pub fn as_structured<K: Codec, V: Codec>(
        &self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> Self {
        let mut this = self.clone();
        Self::Structured {
            key_values: this
                .get_or_make_structured::<K, V>(key_schema, val_schema)
                .clone(),
            timestamps: this.timestamps().clone(),
            diffs: this.diffs().clone(),
        }
    }
}

impl TraceBatchMeta {
    /// Asserts Self's documented invariants, returning an error if any are
    /// violated.
    pub fn validate(&self) -> Result<(), Error> {
        // TODO: It's unclear if the equal case (an empty desc) is
        // useful/harmful. Feel free to make this a less_than if empty descs end
        // up making sense.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        Ok(())
    }

    /// Assert that all of the [BlobTraceBatchPart]s obey the required invariants.
    pub async fn validate_data(
        &self,
        blob: &dyn Blob,
        metrics: &ColumnarMetrics,
    ) -> Result<(), Error> {
        let mut batches = vec![];
        for (idx, key) in self.keys.iter().enumerate() {
            let value = blob
                .get(key)
                .await?
                .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
            let batch = BlobTraceBatchPart::decode(&value, metrics)?;
            if batch.desc != self.desc {
                return Err(format!(
                    "invalid trace batch part desc expected {:?} got {:?}",
                    &self.desc, &batch.desc
                )
                .into());
            }

            if batch.index != u64::cast_from(idx) {
                return Err(format!(
                    "invalid index for blob trace batch part at key {} expected {} got {}",
                    key, idx, batch.index
                )
                .into());
            }

            batch.validate()?;
            batches.push(batch);
        }

        for (batch_idx, batch) in batches.iter().enumerate() {
            for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
                // TODO: Don't assume diff is an i64, take a D type param instead.
                let diff: u64 = Codec64::decode(diff.to_le_bytes());

                // Check data invariants.
                if diff == 0 {
                    return Err(format!(
                        "update with 0 diff in batch {batch_idx} at row {row_idx}",
                    )
                    .into());
                }
            }
        }

        Ok(())
    }
}

impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
    /// Asserts the documented invariants, returning an error if any are
    /// violated.
    pub fn validate(&self) -> Result<(), Error> {
        // TODO: It's unclear if the equal case (an empty desc) is
        // useful/harmful. Feel free to make this a less_than if empty descs end
        // up making sense.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());

        for time in self.updates.timestamps().values() {
            let ts = T::decode(time.to_le_bytes());
            // Check ts against desc.
            if !self.desc.lower().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is less than the batch lower: {:?}",
                    ts, self.desc
                )
                .into());
            }

            // When since is less than or equal to lower, the upper is a strict bound on the
            // updates' timestamps because no compaction has been performed. Because user batches
            // are always uncompacted, this ensures that new updates are recorded with valid
            // timestamps. Otherwise, we can make no assumptions about the timestamps.
            if uncompacted && self.desc.upper().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is greater than or equal to the batch upper: {:?}",
                    ts, self.desc
                )
                .into());
            }
        }

        for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
            // TODO: Don't assume diff is an i64, take a D type param instead.
            let diff: u64 = Codec64::decode(diff.to_le_bytes());

            // Check data invariants.
            if diff == 0 {
                return Err(format!("update with 0 diff at row {row_idx}").into());
            }
        }

        Ok(())
    }

    /// Encodes a BlobTraceBatchPart into the Parquet format.
    pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
    where
        B: BufMut + Send,
    {
        encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
    }

    /// Decodes a BlobTraceBatchPart from the Parquet format.
    pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
        decode_trace_parquet(buf.clone(), metrics)
    }

    /// Scans the part and returns a lower bound on the contained codec-encoded keys.
    pub fn key_lower(&self) -> &[u8] {
        self.updates
            .records()
            .and_then(|r| r.keys().iter().flatten().min())
            .unwrap_or(&[])
    }

    /// Scans the part and returns a lower bound on the contained structured keys.
    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        self.updates.structured().and_then(|r| {
            let ord = ArrayOrd::new(&r.key);
            (0..r.key.len())
                .min_by_key(|i| ord.at(*i))
                .map(|i| ArrayBound::new(Arc::clone(&r.key), i))
        })
    }
}

impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
    fn from(x: ProtoU64Description) -> Self {
        Description::new(
            x.lower
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
            x.upper
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
            x.since
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
        )
    }
}

impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
    fn from(x: ProtoU64Antichain) -> Self {
        Antichain::from(
            x.elements
                .into_iter()
                .map(|x| T::decode(u64::to_le_bytes(x)))
                .collect::<Vec<_>>(),
        )
    }
}

impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
    fn from(x: &Antichain<T>) -> Self {
        ProtoU64Antichain {
            elements: x
                .elements()
                .iter()
                .map(|x| u64::from_le_bytes(T::encode(x)))
                .collect(),
        }
    }
}

impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
    fn from(x: &Description<T>) -> Self {
        ProtoU64Description {
            lower: Some(x.lower().into()),
            upper: Some(x.upper().into()),
            since: Some(x.since().into()),
        }
    }
}

/// Encodes the inline metadata for a trace batch into a base64 string.
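///
/// A hedged round-trip sketch with [decode_trace_inline_meta] (assumes a
/// `batch: BlobTraceBatchPart<u64>` is in scope):
///
/// ```ignore
/// let encoded = encode_trace_inline_meta(&batch);
/// let (_format, inline) = decode_trace_inline_meta(Some(&encoded))?;
/// assert_eq!(inline.index, batch.index);
/// ```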
pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
    let (format, format_metadata) = match &batch.updates {
        // For the legacy Row format, we only write it as `ParquetKvtd`.
        BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
        // For the newer structured format we track some metadata about the version of the format.
        BlobTraceUpdates::Both { .. } => {
            let metadata = ProtoFormatMetadata::StructuredMigration(2);
            (ProtoBatchFormat::ParquetStructured, Some(metadata))
        }
        BlobTraceUpdates::Structured { .. } => {
            let metadata = ProtoFormatMetadata::StructuredMigration(3);
            (ProtoBatchFormat::ParquetStructured, Some(metadata))
        }
    };

    let inline = ProtoBatchPartInline {
        format: format.into(),
        desc: Some((&batch.desc).into()),
        index: batch.index,
        format_metadata,
    };
    let inline_encoded = inline.encode_to_vec();
    base64::encode(inline_encoded)
}

/// Decodes the inline metadata for a trace batch from a base64 string.
pub fn decode_trace_inline_meta(
    inline_base64: Option<&String>,
) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
    let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
    let inline_encoded = base64::decode(inline_base64).map_err(|err| err.to_string())?;
    let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
    let format = ProtoBatchFormat::try_from(inline.format)
        .map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
    Ok((format, inline))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;

    use crate::error::Error;
    use crate::indexed::columnar::ColumnarRecordsBuilder;
    use crate::mem::{MemBlob, MemBlobConfig};
    use crate::metrics::ColumnarMetrics;
    use crate::workload::DataGenerator;

    use super::*;

    fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
        ((key.into(), "".into()), ts, 1)
    }

    fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(0),
        )
    }

    fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
        TraceBatchMeta {
            keys: vec![],
            format: ProtoBatchFormat::Unknown,
            desc: u64_desc(lower, upper),
            level: 1,
            size_bytes: 0,
        }
    }

    fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(since),
        )
    }

    fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
        let mut builder = ColumnarRecordsBuilder::default();
        for ((k, v), t, d) in updates {
            assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
        }
        let updates = builder.finish(&ColumnarMetrics::disconnected());
        BlobTraceUpdates::Row(updates)
    }

    #[mz_ore::test]
    fn trace_batch_validate() {
        // Normal case
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Empty
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Invalid desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(2, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Empty desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Update "before" desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Update "after" desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Normal case: update "after" desc and within since
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Normal case: update "after" desc and at since
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(4, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Update "after" desc since
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(5, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Invalid update
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 1),
            index: 0,
            updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from("update with 0 diff at row 0"))
        );
    }

    #[mz_ore::test]
    fn trace_batch_meta_validate() {
        // Normal case
        let b = batch_meta(0, 1);
        assert_eq!(b.validate(), Ok(()));

        // Empty interval
        let b = batch_meta(0, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );

        // Invalid interval
        let b = batch_meta(2, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );
    }

    async fn expect_set_trace_batch<T: Timestamp + Codec64>(
        blob: &dyn Blob,
        key: &str,
        batch: &BlobTraceBatchPart<T>,
    ) -> u64 {
        let mut val = Vec::new();
        let metrics = ColumnarMetrics::disconnected();
        let config = EncodingConfig::default();
        batch.encode(&mut val, &metrics, &config);
        let val = Bytes::from(val);
        let val_len = u64::cast_from(val.len());
        blob.set(key, val).await.expect("failed to set trace batch");
        val_len
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn trace_batch_meta_validate_data() -> Result<(), Error> {
        let metrics = ColumnarMetrics::disconnected();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let format = ProtoBatchFormat::ParquetKvtd;

        let batch_desc = u64_desc_since(0, 3, 0);
        let batch0 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 0,
            updates: columnar_records(vec![
                (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
                (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
            ]),
        };
        let batch1 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 1,
            updates: columnar_records(vec![
                (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
                (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
            ]),
        };

        let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
        let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
        let size_bytes = batch0_size_bytes + batch1_size_bytes;
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };

        // Normal case:
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Ok(())
        );

        // Incorrect desc
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: u64_desc_since(1, 3, 0),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }"
            ))
        );
        // Key with no corresponding batch part
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into(), "b2".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from("no blob for trace batch at key: b2"))
        );
        // Batch parts not in index order
        let batch_meta = TraceBatchMeta {
            keys: vec!["b1".into(), "b0".into()],
            format,
            desc: batch_desc,
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid index for blob trace batch part at key b1 expected 0 got 1"
            ))
        );

        Ok(())
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn encoded_batch_sizes() {
        fn sizes(data: DataGenerator) -> usize {
            let metrics = ColumnarMetrics::disconnected();
            let config = EncodingConfig::default();
            let updates: Vec<_> = data.batches().collect();
            let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
            let trace = BlobTraceBatchPart {
                desc: Description::new(
                    Antichain::from_elem(0u64),
                    Antichain::new(),
                    Antichain::from_elem(0u64),
                ),
                index: 0,
                updates,
            };
            let mut trace_buf = Vec::new();
            trace.encode(&mut trace_buf, &metrics, &config);
            trace_buf.len()
        }

        let record_size_bytes = DataGenerator::default().record_size_bytes;
        // Print all the sizes into one assert so we only have to update one
        // place if sizes change.
        assert_eq!(
            format!(
                "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
                sizes(DataGenerator::new(1, record_size_bytes, 1)),
                sizes(DataGenerator::new(25, record_size_bytes, 25)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
            ),
            "1/1=867 25/1=2613 1000/1=72845 1000/100=72845"
        );
    }
}