mz_persist/indexed/
encoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data structures stored in Blobs and Logs in serialized form.
11
12// NB: Everything starting with Blob* is directly serialized as a Blob value.
13// Ditto for Log* and the Log. The others are used internally in these top-level
14// structs.
15
16use std::fmt::{self, Debug};
17use std::sync::Arc;
18
19use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
20use arrow::buffer::OffsetBuffer;
21use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
22use base64::Engine;
23use bytes::{BufMut, Bytes};
24use differential_dataflow::trace::Description;
25use mz_ore::bytes::SegmentedBytes;
26use mz_ore::cast::CastFrom;
27use mz_ore::collections::CollectionExt;
28use mz_ore::soft_panic_or_log;
29use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
30use mz_persist_types::columnar::{
31    ColumnEncoder, Schema, codec_to_schema, data_type, schema_to_codec,
32};
33use mz_persist_types::parquet::EncodingConfig;
34use mz_persist_types::part::Part;
35use mz_persist_types::schema::backward_compatible;
36use mz_persist_types::{Codec, Codec64};
37use mz_proto::{RustType, TryFromProtoError};
38use proptest::arbitrary::Arbitrary;
39use proptest::prelude::*;
40use proptest::strategy::{BoxedStrategy, Just};
41use prost::Message;
42use serde::Serialize;
43use timely::PartialOrder;
44use timely::progress::{Antichain, Timestamp};
45use tracing::error;
46
47use crate::error::Error;
48use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
49use crate::generated::persist::{
50    ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
51    ProtoU64Description,
52};
53use crate::indexed::columnar::arrow::realloc_array;
54use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
55use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
56use crate::location::Blob;
57use crate::metrics::ColumnarMetrics;
58
/// Column format of a batch.
///
/// `PartialOrd`/`Ord` are derived, so the declaration order of the variants (and the numeric
/// payload of [`BatchColumnarFormat::Both`]) determines how two formats compare.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Rows are encoded to `ProtoRow` and then a batch is written down as a Parquet with a schema
    /// of `(k, v, t, d)`, where `k` are the serialized bytes.
    Row,
    /// Rows are encoded to `ProtoRow` and a columnar struct. The batch is written down as Parquet
    /// with a schema of `(k, k_c, v, v_c, t, d)`, where `k` are the serialized bytes and `k_c` is
    /// nested columnar data.
    ///
    /// The payload is a version number: the inherent methods on this type render `0`/`1` as
    /// `"both"` and `2` as `"both_v2"`, and only versions other than `0`/`1` count as structured.
    Both(usize),
    /// Rows are encoded to a columnar struct. The batch is written down as Parquet
    /// with a schema of `(t, d, k_s, v_s)`, where `k_s` is nested columnar data.
    Structured,
}
73
74impl BatchColumnarFormat {
75    /// Returns a default value for [`BatchColumnarFormat`].
76    pub const fn default() -> Self {
77        BatchColumnarFormat::Both(2)
78    }
79
80    /// Returns a [`BatchColumnarFormat`] for a given `&str`, falling back to a default value if
81    /// provided `&str` is invalid.
82    pub fn from_str(s: &str) -> Self {
83        match s {
84            "row" => BatchColumnarFormat::Row,
85            "both" => BatchColumnarFormat::Both(0),
86            "both_v2" => BatchColumnarFormat::Both(2),
87            "structured" => BatchColumnarFormat::Structured,
88            x => {
89                let default = BatchColumnarFormat::default();
90                soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
91                default
92            }
93        }
94    }
95
96    /// Returns a string representation for the [`BatchColumnarFormat`].
97    pub const fn as_str(&self) -> &'static str {
98        match self {
99            BatchColumnarFormat::Row => "row",
100            BatchColumnarFormat::Both(0 | 1) => "both",
101            BatchColumnarFormat::Both(2) => "both_v2",
102            _ => panic!("unknown batch columnar format"),
103        }
104    }
105
106    /// Returns if we should encode a Batch in a structured format.
107    pub const fn is_structured(&self) -> bool {
108        match self {
109            BatchColumnarFormat::Row => false,
110            // The V0 format has been deprecated and we ignore its structured columns.
111            BatchColumnarFormat::Both(0 | 1) => false,
112            BatchColumnarFormat::Both(_) => true,
113            BatchColumnarFormat::Structured => true,
114        }
115    }
116}
117
118impl fmt::Display for BatchColumnarFormat {
119    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120        f.write_str(self.as_str())
121    }
122}
123
124impl Arbitrary for BatchColumnarFormat {
125    type Parameters = ();
126    type Strategy = BoxedStrategy<BatchColumnarFormat>;
127
128    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
129        proptest::strategy::Union::new(vec![
130            Just(BatchColumnarFormat::Row).boxed(),
131            Just(BatchColumnarFormat::Both(0)).boxed(),
132            Just(BatchColumnarFormat::Both(1)).boxed(),
133        ])
134        .boxed()
135    }
136}
137
/// The metadata necessary to reconstruct a list of [BlobTraceBatchPart]s.
///
/// Invariants:
/// - The Description's time interval is non-empty.
/// - Keys for all trace batch parts are unique.
/// - Keys for all trace batch parts are stored in index order.
/// - The data in all of the trace batch parts is sorted and consolidated.
/// - All of the trace batch parts have the same desc as the metadata.
///
/// `validate` checks the first invariant; `validate_data` fetches every part and checks its
/// desc, its index (against the position of its key in `keys`) and that no diff is zero.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// The keys to retrieve the batch's data from the blob store.
    ///
    /// The set of keys can be empty to denote an empty batch.
    pub keys: Vec<String>,
    /// The format of the stored batch data.
    pub format: ProtoBatchFormat,
    /// The half-open time interval `[lower, upper)` this batch contains data
    /// for.
    pub desc: Description<u64>,
    /// The compaction level of each batch.
    pub level: u64,
    /// Size of the encoded batch.
    // NOTE(review): presumably measured in bytes, per the field name; confirm against writers.
    pub size_bytes: u64,
}
162
/// The structure serialized and stored as a value in
/// [crate::location::Blob] storage for data keys corresponding to trace
/// data.
///
/// This batch represents the data that was originally written at some time in
/// [lower, upper) (more precisely !< lower and < upper). The individual record
/// times may have later been advanced by compaction to something <= since. This
/// means the ability to reconstruct the state of the collection at times <
/// since has been lost. However, there may still be records present in the
/// batch whose times are < since. Users iterating through updates must take
/// care to advance records with times < since to since in order to correctly
/// answer queries at times >= since.
///
/// Invariants:
/// - The [lower, upper) interval of times in desc is non-empty.
/// - The timestamp of each update is >= to desc.lower().
/// - The timestamp of each update is < desc.upper() iff desc.upper() >
///   desc.since(). Otherwise the timestamp of each update is <= desc.since().
/// - The values in updates are sorted by (key, value, time).
/// - The values in updates are "consolidated", i.e. (key, value, time) is
///   unique.
/// - All entries have a non-zero diff.
///
/// The `validate` method on this type checks the desc, timestamp-bound and non-zero-diff
/// invariants; it does not check sortedness or consolidation.
///
/// TODO: disallow empty trace batch parts in the future so there is one unique
/// way to represent an empty trace batch.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// Which updates are included in this batch.
    ///
    /// There may be other parts for the batch that also contain updates within
    /// the specified [lower, upper) range.
    pub desc: Description<T>,
    /// Index of this part in the list of parts that form the batch.
    pub index: u64,
    /// The updates themselves.
    pub updates: BlobTraceUpdates,
}
200
/// The set of updates that are part of a [`BlobTraceBatchPart`].
///
/// Every variant carries timestamps and diffs: the `Row` and `Both` variants hold them inside
/// their [`ColumnarRecords`], while `Structured` stores them as separate arrays.
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Legacy format. Keys and Values are encoded into bytes via [`Codec`], then stored in our own
    /// columnar-esque struct.
    ///
    /// [`Codec`]: mz_persist_types::Codec
    Row(ColumnarRecords),
    /// Migration format. Keys and Values are encoded into bytes via [`Codec`] and structured into
    /// an Apache Arrow columnar format.
    ///
    /// [`Codec`]: mz_persist_types::Codec
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// New-style structured format, including structured representations of the K/V columns and
    /// the usual timestamp / diff encoding.
    Structured {
        /// Key-value data.
        key_values: ColumnarRecordsStructuredExt,
        /// Timestamp data.
        timestamps: Int64Array,
        /// Diffs.
        diffs: Int64Array,
    },
}
225
226impl BlobTraceUpdates {
227    /// Convert from a [Part].
228    pub fn from_part(part: Part) -> Self {
229        Self::Structured {
230            key_values: ColumnarRecordsStructuredExt {
231                key: part.key,
232                val: part.val,
233            },
234            timestamps: part.time,
235            diffs: part.diff,
236        }
237    }
238
239    /// The number of updates.
240    pub fn len(&self) -> usize {
241        match self {
242            BlobTraceUpdates::Row(c) => c.len(),
243            BlobTraceUpdates::Both(c, _structured) => c.len(),
244            BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
245        }
246    }
247
248    /// The updates' timestamps as an integer array.
249    pub fn timestamps(&self) -> &Int64Array {
250        match self {
251            BlobTraceUpdates::Row(c) => c.timestamps(),
252            BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
253            BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
254        }
255    }
256
257    /// The updates' diffs as an integer array.
258    pub fn diffs(&self) -> &Int64Array {
259        match self {
260            BlobTraceUpdates::Row(c) => c.diffs(),
261            BlobTraceUpdates::Both(c, _structured) => c.diffs(),
262            BlobTraceUpdates::Structured { diffs, .. } => diffs,
263        }
264    }
265
266    /// Return the [`ColumnarRecords`] of the blob.
267    pub fn records(&self) -> Option<&ColumnarRecords> {
268        match self {
269            BlobTraceUpdates::Row(c) => Some(c),
270            BlobTraceUpdates::Both(c, _structured) => Some(c),
271            BlobTraceUpdates::Structured { .. } => None,
272        }
273    }
274
275    /// Return the [`ColumnarRecordsStructuredExt`] of the blob.
276    pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
277        match self {
278            BlobTraceUpdates::Row(_) => None,
279            BlobTraceUpdates::Both(_, s) => Some(s),
280            BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
281        }
282    }
283
284    /// Return the estimated memory usage of the raw data.
285    pub fn goodbytes(&self) -> usize {
286        match self {
287            BlobTraceUpdates::Row(c) => c.goodbytes(),
288            // NB: we only report goodbytes for columnar records here, to avoid
289            // dual-counting the same records. (This means that our goodput % is much lower
290            // during the migration, which is an accurate reflection of reality.)
291            BlobTraceUpdates::Both(c, _) => c.goodbytes(),
292            BlobTraceUpdates::Structured {
293                key_values,
294                timestamps,
295                diffs,
296            } => {
297                key_values.goodbytes()
298                    + timestamps.values().to_byte_slice().len()
299                    + diffs.values().to_byte_slice().len()
300            }
301        }
302    }
303
304    /// Return the [ColumnarRecords] of the blob, generating it if it does not exist.
305    pub fn get_or_make_codec<K: Codec, V: Codec>(
306        &mut self,
307        key_schema: &K::Schema,
308        val_schema: &V::Schema,
309    ) -> &ColumnarRecords {
310        match self {
311            BlobTraceUpdates::Row(records) => records,
312            BlobTraceUpdates::Both(records, _) => records,
313            BlobTraceUpdates::Structured {
314                key_values,
315                timestamps,
316                diffs,
317            } => {
318                let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
319                let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
320                let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());
321
322                *self = BlobTraceUpdates::Both(records, key_values.clone());
323                let BlobTraceUpdates::Both(records, _) = self else {
324                    unreachable!("set to BlobTraceUpdates::Both in previous line")
325                };
326                records
327            }
328        }
329    }
330
331    /// Return the [`ColumnarRecordsStructuredExt`] of the blob.
332    pub fn get_or_make_structured<K: Codec, V: Codec>(
333        &mut self,
334        key_schema: &K::Schema,
335        val_schema: &V::Schema,
336    ) -> &ColumnarRecordsStructuredExt {
337        let structured = match self {
338            BlobTraceUpdates::Row(records) => {
339                let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
340                let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");
341
342                *self = BlobTraceUpdates::Both(
343                    records.clone(),
344                    ColumnarRecordsStructuredExt { key, val },
345                );
346                let BlobTraceUpdates::Both(_, structured) = self else {
347                    unreachable!("set to BlobTraceUpdates::Both in previous line")
348                };
349                structured
350            }
351            BlobTraceUpdates::Both(_, structured) => structured,
352            BlobTraceUpdates::Structured { key_values, .. } => key_values,
353        };
354
355        // If the types don't match, attempt to migrate the array to the new type.
356        // We expect this to succeed, since this should only be called with backwards-
357        // compatible schemas... but if it fails we only log, and let some higher-level
358        // code signal the error if it cares.
359        let migrate = |array: &mut ArrayRef, to_type: DataType| {
360            // TODO: Plumb down the SchemaCache and use it here for the array migrations.
361            let from_type = array.data_type().clone();
362            if from_type != to_type {
363                if let Some(migration) = backward_compatible(&from_type, &to_type) {
364                    *array = migration.migrate(Arc::clone(array));
365                } else {
366                    error!(
367                        ?from_type,
368                        ?to_type,
369                        "failed to migrate array type; backwards-incompatible schema migration?"
370                    );
371                }
372            }
373        };
374        migrate(
375            &mut structured.key,
376            data_type::<K>(key_schema).expect("valid key schema"),
377        );
378        migrate(
379            &mut structured.val,
380            data_type::<V>(val_schema).expect("valid value schema"),
381        );
382
383        structured
384    }
385
386    /// Convert this blob into a structured part, transforming the codec data if necessary.
387    pub fn into_part<K: Codec, V: Codec>(
388        &mut self,
389        key_schema: &K::Schema,
390        val_schema: &V::Schema,
391    ) -> Part {
392        let ext = self
393            .get_or_make_structured::<K, V>(key_schema, val_schema)
394            .clone();
395        Part {
396            key: ext.key,
397            val: ext.val,
398            time: self.timestamps().clone(),
399            diff: self.diffs().clone(),
400        }
401    }
402
403    /// Concatenate the given records together, column-by-column.
404    ///
405    /// If `ensure_codec` is true, then we'll ensure the returned [`BlobTraceUpdates`] includes
406    /// [`Codec`] data, re-encoding structured data if necessary.
407    pub fn concat<K: Codec, V: Codec>(
408        mut updates: Vec<BlobTraceUpdates>,
409        key_schema: &K::Schema,
410        val_schema: &V::Schema,
411        metrics: &ColumnarMetrics,
412    ) -> anyhow::Result<BlobTraceUpdates> {
413        match updates.len() {
414            0 => {
415                return Ok(BlobTraceUpdates::Structured {
416                    key_values: ColumnarRecordsStructuredExt {
417                        key: Arc::new(key_schema.encoder().expect("valid schema").finish()),
418                        val: Arc::new(val_schema.encoder().expect("valid schema").finish()),
419                    },
420                    timestamps: Int64Array::from_iter_values(vec![]),
421                    diffs: Int64Array::from_iter_values(vec![]),
422                });
423            }
424            1 => return Ok(updates.into_iter().into_element()),
425            _ => {}
426        }
427
428        // Always get or make our structured format.
429        let mut keys = Vec::with_capacity(updates.len());
430        let mut vals = Vec::with_capacity(updates.len());
431        for updates in &mut updates {
432            let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
433            keys.push(structured.key.as_ref());
434            vals.push(structured.val.as_ref());
435        }
436        let key_values = ColumnarRecordsStructuredExt {
437            key: ::arrow::compute::concat(&keys)?,
438            val: ::arrow::compute::concat(&vals)?,
439        };
440
441        // If necessary, ensure we have `Codec` data, which internally includes
442        // timestamps and diffs, otherwise get the timestamps and diffs separately.
443        let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
444        let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());
445
446        for update in &updates {
447            timestamps.push(update.timestamps());
448            diffs.push(update.diffs());
449        }
450        let timestamps = ::arrow::compute::concat(&timestamps)?
451            .as_primitive_opt::<Int64Type>()
452            .ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
453            .clone();
454        let diffs = ::arrow::compute::concat(&diffs)?
455            .as_primitive_opt::<Int64Type>()
456            .ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
457            .clone();
458
459        let out = Self::Structured {
460            key_values,
461            timestamps,
462            diffs,
463        };
464        metrics
465            .arrow
466            .concat_bytes
467            .inc_by(u64::cast_from(out.goodbytes()));
468        Ok(out)
469    }
470
471    /// See [RustType::from_proto].
472    pub fn from_proto(
473        lgbytes: &ColumnarMetrics,
474        proto: ProtoColumnarRecords,
475    ) -> Result<Self, TryFromProtoError> {
476        let binary_array = |data: Bytes, offsets: Vec<i32>| {
477            if offsets.is_empty() && proto.len > 0 {
478                return Ok(None);
479            };
480            match BinaryArray::try_new(
481                OffsetBuffer::new(offsets.into()),
482                ::arrow::buffer::Buffer::from_bytes(data.into()),
483                None,
484            ) {
485                Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
486                Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
487                    "Unable to decode binary array from repeated proto fields: {e:?}"
488                ))),
489            }
490        };
491
492        let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
493        let codec_val = binary_array(proto.val_data, proto.val_offsets)?;
494
495        let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
496        let diffs = realloc_array(&proto.diffs.into(), lgbytes);
497        let ext =
498            ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;
499
500        let updates = match (codec_key, codec_val, ext) {
501            (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
502                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
503                ext,
504            ),
505            (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
506                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
507            ),
508            (None, None, Some(ext)) => BlobTraceUpdates::Structured {
509                key_values: ext,
510                timestamps,
511                diffs,
512            },
513            (k, v, ext) => {
514                return Err(TryFromProtoError::InvalidPersistState(format!(
515                    "unexpected mix of key/value columns: k={:?}, v={}, ext={}",
516                    k.is_some(),
517                    v.is_some(),
518                    ext.is_some(),
519                )));
520            }
521        };
522
523        Ok(updates)
524    }
525
526    /// See [RustType::into_proto].
527    pub fn into_proto(&self) -> ProtoColumnarRecords {
528        let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
529            None => (vec![], Bytes::new(), vec![], Bytes::new()),
530            Some(records) => (
531                records.keys().offsets().to_vec(),
532                Bytes::copy_from_slice(records.keys().value_data()),
533                records.vals().offsets().to_vec(),
534                Bytes::copy_from_slice(records.vals().value_data()),
535            ),
536        };
537        let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
538            None => (None, None),
539            Some((k, v)) => (Some(k), Some(v)),
540        };
541
542        ProtoColumnarRecords {
543            len: self.len().into_proto(),
544            key_offsets,
545            key_data,
546            val_offsets,
547            val_data,
548            timestamps: self.timestamps().values().to_vec(),
549            diffs: self.diffs().values().to_vec(),
550            key_structured: k_struct,
551            val_structured: v_struct,
552        }
553    }
554
555    /// Convert these updates into the specified batch format, re-encoding or discarding key-value
556    /// data as necessary.
557    pub fn as_structured<K: Codec, V: Codec>(
558        &self,
559        key_schema: &K::Schema,
560        val_schema: &V::Schema,
561    ) -> Self {
562        let mut this = self.clone();
563        Self::Structured {
564            key_values: this
565                .get_or_make_structured::<K, V>(key_schema, val_schema)
566                .clone(),
567            timestamps: this.timestamps().clone(),
568            diffs: this.diffs().clone(),
569        }
570    }
571}
572
573impl TraceBatchMeta {
574    /// Asserts Self's documented invariants, returning an error if any are
575    /// violated.
576    pub fn validate(&self) -> Result<(), Error> {
577        // TODO: It's unclear if the equal case (an empty desc) is
578        // useful/harmful. Feel free to make this a less_than if empty descs end
579        // up making sense.
580        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
581            return Err(format!("invalid desc: {:?}", &self.desc).into());
582        }
583
584        Ok(())
585    }
586
587    /// Assert that all of the [BlobTraceBatchPart]'s obey the required invariants.
588    pub async fn validate_data(
589        &self,
590        blob: &dyn Blob,
591        metrics: &ColumnarMetrics,
592    ) -> Result<(), Error> {
593        let mut batches = vec![];
594        for (idx, key) in self.keys.iter().enumerate() {
595            let value = blob
596                .get(key)
597                .await?
598                .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
599            let batch = BlobTraceBatchPart::decode(&value, metrics)?;
600            if batch.desc != self.desc {
601                return Err(format!(
602                    "invalid trace batch part desc expected {:?} got {:?}",
603                    &self.desc, &batch.desc
604                )
605                .into());
606            }
607
608            if batch.index != u64::cast_from(idx) {
609                return Err(format!(
610                    "invalid index for blob trace batch part at key {} expected {} got {}",
611                    key, idx, batch.index
612                )
613                .into());
614            }
615
616            batch.validate()?;
617            batches.push(batch);
618        }
619
620        for (batch_idx, batch) in batches.iter().enumerate() {
621            for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
622                // TODO: Don't assume diff is an i64, take a D type param instead.
623                let diff: u64 = Codec64::decode(diff.to_le_bytes());
624
625                // Check data invariants.
626                if diff == 0 {
627                    return Err(format!(
628                        "update with 0 diff in batch {batch_idx} at row {row_idx}",
629                    )
630                    .into());
631                }
632            }
633        }
634
635        Ok(())
636    }
637}
638
639impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
640    /// Asserts the documented invariants, returning an error if any are
641    /// violated.
642    pub fn validate(&self) -> Result<(), Error> {
643        // TODO: It's unclear if the equal case (an empty desc) is
644        // useful/harmful. Feel free to make this a less_than if empty descs end
645        // up making sense.
646        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
647            return Err(format!("invalid desc: {:?}", &self.desc).into());
648        }
649
650        let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());
651
652        for time in self.updates.timestamps().values() {
653            let ts = T::decode(time.to_le_bytes());
654            // Check ts against desc.
655            if !self.desc.lower().less_equal(&ts) {
656                return Err(format!(
657                    "timestamp {:?} is less than the batch lower: {:?}",
658                    ts, self.desc
659                )
660                .into());
661            }
662
663            // when since is less than or equal to lower, the upper is a strict bound on the updates'
664            // timestamp because no compaction has been performed. Because user batches are always
665            // uncompacted, this ensures that new updates are recorded with valid timestamps.
666            // Otherwise, we can make no assumptions about the timestamps
667            if uncompacted && self.desc.upper().less_equal(&ts) {
668                return Err(format!(
669                    "timestamp {:?} is greater than or equal to the batch upper: {:?}",
670                    ts, self.desc
671                )
672                .into());
673            }
674        }
675
676        for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
677            // TODO: Don't assume diff is an i64, take a D type param instead.
678            let diff: u64 = Codec64::decode(diff.to_le_bytes());
679
680            // Check data invariants.
681            if diff == 0 {
682                return Err(format!("update with 0 diff at row {row_idx}",).into());
683            }
684        }
685
686        Ok(())
687    }
688
689    /// Encodes an BlobTraceBatchPart into the Parquet format.
690    pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
691    where
692        B: BufMut + Send,
693    {
694        encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
695    }
696
697    /// Decodes a BlobTraceBatchPart from the Parquet format.
698    pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
699        decode_trace_parquet(buf.clone(), metrics)
700    }
701
702    /// Scans the part and returns a lower bound on the contained keys.
703    pub fn key_lower(&self) -> &[u8] {
704        self.updates
705            .records()
706            .and_then(|r| r.keys().iter().flatten().min())
707            .unwrap_or(&[])
708    }
709
710    /// Scans the part and returns a lower bound on the contained keys.
711    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
712        self.updates.structured().and_then(|r| {
713            let ord = ArrayOrd::new(&r.key);
714            (0..r.key.len())
715                .min_by_key(|i| ord.at(*i))
716                .map(|i| ArrayBound::new(Arc::clone(&r.key), i))
717        })
718    }
719}
720
721impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
722    fn from(x: ProtoU64Description) -> Self {
723        Description::new(
724            x.lower
725                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
726            x.upper
727                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
728            x.since
729                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
730        )
731    }
732}
733
734impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
735    fn from(x: ProtoU64Antichain) -> Self {
736        Antichain::from(
737            x.elements
738                .into_iter()
739                .map(|x| T::decode(u64::to_le_bytes(x)))
740                .collect::<Vec<_>>(),
741        )
742    }
743}
744
745impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
746    fn from(x: &Antichain<T>) -> Self {
747        ProtoU64Antichain {
748            elements: x
749                .elements()
750                .iter()
751                .map(|x| u64::from_le_bytes(T::encode(x)))
752                .collect(),
753        }
754    }
755}
756
757impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
758    fn from(x: &Description<T>) -> Self {
759        ProtoU64Description {
760            lower: Some(x.lower().into()),
761            upper: Some(x.upper().into()),
762            since: Some(x.since().into()),
763        }
764    }
765}
766
767/// Encodes the inline metadata for a trace batch into a base64 string.
768pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
769    let (format, format_metadata) = match &batch.updates {
770        // For the legacy Row format, we only write it as `ParquetKvtd`.
771        BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
772        // For the newer structured format we track some metadata about the version of the format.
773        BlobTraceUpdates::Both { .. } => {
774            let metadata = ProtoFormatMetadata::StructuredMigration(2);
775            (ProtoBatchFormat::ParquetStructured, Some(metadata))
776        }
777        BlobTraceUpdates::Structured { .. } => {
778            let metadata = ProtoFormatMetadata::StructuredMigration(3);
779            (ProtoBatchFormat::ParquetStructured, Some(metadata))
780        }
781    };
782
783    let inline = ProtoBatchPartInline {
784        format: format.into(),
785        desc: Some((&batch.desc).into()),
786        index: batch.index,
787        format_metadata,
788    };
789    let inline_encoded = inline.encode_to_vec();
790    base64::engine::general_purpose::STANDARD.encode(inline_encoded)
791}
792
793/// Decodes the inline metadata for a trace batch from a base64 string.
794pub fn decode_trace_inline_meta(
795    inline_base64: Option<&String>,
796) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
797    let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
798    let inline_encoded = base64::engine::general_purpose::STANDARD
799        .decode(inline_base64)
800        .map_err(|err| err.to_string())?;
801    let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
802    let format = ProtoBatchFormat::try_from(inline.format)
803        .map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
804    Ok((format, inline))
805}
806
807#[cfg(test)]
808mod tests {
809    use std::sync::Arc;
810
811    use bytes::Bytes;
812
813    use crate::error::Error;
814    use crate::indexed::columnar::ColumnarRecordsBuilder;
815    use crate::mem::{MemBlob, MemBlobConfig};
816    use crate::metrics::ColumnarMetrics;
817    use crate::workload::DataGenerator;
818
819    use super::*;
820
821    fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
822        ((key.into(), "".into()), ts, 1)
823    }
824
825    fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
826        Description::new(
827            Antichain::from_elem(lower),
828            Antichain::from_elem(upper),
829            Antichain::from_elem(0),
830        )
831    }
832
833    fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
834        TraceBatchMeta {
835            keys: vec![],
836            format: ProtoBatchFormat::Unknown,
837            desc: u64_desc(lower, upper),
838            level: 1,
839            size_bytes: 0,
840        }
841    }
842
843    fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
844        Description::new(
845            Antichain::from_elem(lower),
846            Antichain::from_elem(upper),
847            Antichain::from_elem(since),
848        )
849    }
850
851    fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
852        let mut builder = ColumnarRecordsBuilder::default();
853        for ((k, v), t, d) in updates {
854            assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
855        }
856        let updates = builder.finish(&ColumnarMetrics::disconnected());
857        BlobTraceUpdates::Row(updates)
858    }
859
860    #[mz_ore::test]
861    fn trace_batch_validate() {
862        // Normal case
863        let b = BlobTraceBatchPart {
864            desc: u64_desc(0, 2),
865            index: 0,
866            updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
867        };
868        assert_eq!(b.validate(), Ok(()));
869
870        // Empty
871        let b = BlobTraceBatchPart {
872            desc: u64_desc(0, 2),
873            index: 0,
874            updates: columnar_records(vec![]),
875        };
876        assert_eq!(b.validate(), Ok(()));
877
878        // Invalid desc
879        let b = BlobTraceBatchPart {
880            desc: u64_desc(2, 0),
881            index: 0,
882            updates: columnar_records(vec![]),
883        };
884        assert_eq!(
885            b.validate(),
886            Err(Error::from(
887                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
888            ))
889        );
890
891        // Empty desc
892        let b = BlobTraceBatchPart {
893            desc: u64_desc(0, 0),
894            index: 0,
895            updates: columnar_records(vec![]),
896        };
897        assert_eq!(
898            b.validate(),
899            Err(Error::from(
900                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
901            ))
902        );
903
904        // Update "before" desc
905        let b = BlobTraceBatchPart {
906            desc: u64_desc(1, 2),
907            index: 0,
908            updates: columnar_records(vec![update_with_key(0, "0")]),
909        };
910        assert_eq!(
911            b.validate(),
912            Err(Error::from(
913                "timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
914            ))
915        );
916
917        // Update "after" desc
918        let b = BlobTraceBatchPart {
919            desc: u64_desc(1, 2),
920            index: 0,
921            updates: columnar_records(vec![update_with_key(2, "0")]),
922        };
923        assert_eq!(
924            b.validate(),
925            Err(Error::from(
926                "timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
927            ))
928        );
929
930        // Normal case: update "after" desc and within since
931        let b = BlobTraceBatchPart {
932            desc: u64_desc_since(1, 2, 4),
933            index: 0,
934            updates: columnar_records(vec![update_with_key(2, "0")]),
935        };
936        assert_eq!(b.validate(), Ok(()));
937
938        // Normal case: update "after" desc and at since
939        let b = BlobTraceBatchPart {
940            desc: u64_desc_since(1, 2, 4),
941            index: 0,
942            updates: columnar_records(vec![update_with_key(4, "0")]),
943        };
944        assert_eq!(b.validate(), Ok(()));
945
946        // Update "after" desc since
947        let b = BlobTraceBatchPart {
948            desc: u64_desc_since(1, 2, 4),
949            index: 0,
950            updates: columnar_records(vec![update_with_key(5, "0")]),
951        };
952        assert_eq!(b.validate(), Ok(()));
953
954        // Invalid update
955        let b = BlobTraceBatchPart {
956            desc: u64_desc(0, 1),
957            index: 0,
958            updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
959        };
960        assert_eq!(
961            b.validate(),
962            Err(Error::from("update with 0 diff at row 0"))
963        );
964    }
965
966    #[mz_ore::test]
967    fn trace_batch_meta_validate() {
968        // Normal case
969        let b = batch_meta(0, 1);
970        assert_eq!(b.validate(), Ok(()));
971
972        // Empty interval
973        let b = batch_meta(0, 0);
974        assert_eq!(
975            b.validate(),
976            Err(Error::from(
977                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
978            )),
979        );
980
981        // Invalid interval
982        let b = batch_meta(2, 0);
983        assert_eq!(
984            b.validate(),
985            Err(Error::from(
986                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
987            )),
988        );
989    }
990
991    async fn expect_set_trace_batch<T: Timestamp + Codec64>(
992        blob: &dyn Blob,
993        key: &str,
994        batch: &BlobTraceBatchPart<T>,
995    ) -> u64 {
996        let mut val = Vec::new();
997        let metrics = ColumnarMetrics::disconnected();
998        let config = EncodingConfig::default();
999        batch.encode(&mut val, &metrics, &config);
1000        let val = Bytes::from(val);
1001        let val_len = u64::cast_from(val.len());
1002        blob.set(key, val).await.expect("failed to set trace batch");
1003        val_len
1004    }
1005
1006    #[mz_ore::test(tokio::test)]
1007    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1008    async fn trace_batch_meta_validate_data() -> Result<(), Error> {
1009        let metrics = ColumnarMetrics::disconnected();
1010        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1011        let format = ProtoBatchFormat::ParquetKvtd;
1012
1013        let batch_desc = u64_desc_since(0, 3, 0);
1014        let batch0 = BlobTraceBatchPart {
1015            desc: batch_desc.clone(),
1016            index: 0,
1017            updates: columnar_records(vec![
1018                (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
1019                (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
1020            ]),
1021        };
1022        let batch1 = BlobTraceBatchPart {
1023            desc: batch_desc.clone(),
1024            index: 1,
1025            updates: columnar_records(vec![
1026                (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
1027                (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
1028            ]),
1029        };
1030
1031        let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
1032        let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
1033        let size_bytes = batch0_size_bytes + batch1_size_bytes;
1034        let batch_meta = TraceBatchMeta {
1035            keys: vec!["b0".into(), "b1".into()],
1036            format,
1037            desc: batch_desc.clone(),
1038            level: 0,
1039            size_bytes,
1040        };
1041
1042        // Normal case:
1043        assert_eq!(
1044            batch_meta.validate_data(blob.as_ref(), &metrics).await,
1045            Ok(())
1046        );
1047
1048        // Incorrect desc
1049        let batch_meta = TraceBatchMeta {
1050            keys: vec!["b0".into(), "b1".into()],
1051            format,
1052            desc: u64_desc_since(1, 3, 0),
1053            level: 0,
1054            size_bytes,
1055        };
1056        assert_eq!(
1057            batch_meta.validate_data(blob.as_ref(), &metrics).await,
1058            Err(Error::from(
1059                "invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }"
1060            ))
1061        );
1062        // Key with no corresponding batch part
1063        let batch_meta = TraceBatchMeta {
1064            keys: vec!["b0".into(), "b1".into(), "b2".into()],
1065            format,
1066            desc: batch_desc.clone(),
1067            level: 0,
1068            size_bytes,
1069        };
1070        assert_eq!(
1071            batch_meta.validate_data(blob.as_ref(), &metrics).await,
1072            Err(Error::from("no blob for trace batch at key: b2"))
1073        );
1074        // Batch parts not in index order
1075        let batch_meta = TraceBatchMeta {
1076            keys: vec!["b1".into(), "b0".into()],
1077            format,
1078            desc: batch_desc,
1079            level: 0,
1080            size_bytes,
1081        };
1082        assert_eq!(
1083            batch_meta.validate_data(blob.as_ref(), &metrics).await,
1084            Err(Error::from(
1085                "invalid index for blob trace batch part at key b1 expected 0 got 1"
1086            ))
1087        );
1088
1089        Ok(())
1090    }
1091
1092    #[mz_ore::test]
1093    #[cfg_attr(miri, ignore)] // too slow
1094    fn encoded_batch_sizes() {
1095        fn sizes(data: DataGenerator) -> usize {
1096            let metrics = ColumnarMetrics::disconnected();
1097            let config = EncodingConfig::default();
1098            let updates: Vec<_> = data.batches().collect();
1099            let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
1100            let trace = BlobTraceBatchPart {
1101                desc: Description::new(
1102                    Antichain::from_elem(0u64),
1103                    Antichain::new(),
1104                    Antichain::from_elem(0u64),
1105                ),
1106                index: 0,
1107                updates,
1108            };
1109            let mut trace_buf = Vec::new();
1110            trace.encode(&mut trace_buf, &metrics, &config);
1111            trace_buf.len()
1112        }
1113
1114        let record_size_bytes = DataGenerator::default().record_size_bytes;
1115        // Print all the sizes into one assert so we only have to update one
1116        // place if sizes change.
1117        assert_eq!(
1118            format!(
1119                "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
1120                sizes(DataGenerator::new(1, record_size_bytes, 1)),
1121                sizes(DataGenerator::new(25, record_size_bytes, 25)),
1122                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
1123                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
1124            ),
1125            "1/1=867 25/1=2613 1000/1=72845 1000/100=72845"
1126        );
1127    }
1128}