mz_persist/indexed/encoding.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data structures stored in Blobs and Logs in serialized form.

// NB: Everything starting with Blob* is directly serialized as a Blob value.
// Ditto for Log* and the Log. The others are used internally in these top-level
// structs.

use std::fmt::{self, Debug};
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
use base64::Engine;
use bytes::{BufMut, Bytes};
use differential_dataflow::difference::Monoid;
use differential_dataflow::trace::Description;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
use mz_persist_types::columnar::{
    ColumnEncoder, Schema, codec_to_schema, data_type, schema_to_codec,
};
use mz_persist_types::parquet::EncodingConfig;
use mz_persist_types::part::Part;
use mz_persist_types::schema::backward_compatible;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{RustType, TryFromProtoError};
use proptest::arbitrary::Arbitrary;
use proptest::prelude::*;
use proptest::strategy::{BoxedStrategy, Just};
use prost::Message;
use serde::Serialize;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::error;

use crate::error::Error;
use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
use crate::generated::persist::{
    ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
    ProtoU64Description,
};
use crate::indexed::columnar::arrow::realloc_array;
use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
use crate::location::Blob;
use crate::metrics::ColumnarMetrics;

/// Column format of a batch.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Rows are encoded to `ProtoRow` and then a batch is written down as Parquet with a schema
    /// of `(k, v, t, d)`, where `k` is the serialized bytes.
    Row,
    /// Rows are encoded to `ProtoRow` and a columnar struct. The batch is written down as Parquet
    /// with a schema of `(k, k_c, v, v_c, t, d)`, where `k` is the serialized bytes and `k_c` is
    /// the nested columnar data.
    Both(usize),
    /// Rows are encoded to a columnar struct. The batch is written down as Parquet
    /// with a schema of `(t, d, k_s, v_s)`, where `k_s` is the nested columnar data.
    Structured,
}

impl BatchColumnarFormat {
    /// Returns a default value for [`BatchColumnarFormat`].
    pub const fn default() -> Self {
        BatchColumnarFormat::Both(2)
    }

    /// Returns a [`BatchColumnarFormat`] for a given `&str`, falling back to a default value if
    /// the provided `&str` is invalid.
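    ///
    /// A minimal round-trip sketch; the import path is an assumption, adjust it to wherever this
    /// type is exported:
    ///
    /// ```ignore
    /// use mz_persist::indexed::encoding::BatchColumnarFormat;
    ///
    /// assert_eq!(BatchColumnarFormat::from_str("row"), BatchColumnarFormat::Row);
    /// assert_eq!(BatchColumnarFormat::from_str("both_v2"), BatchColumnarFormat::Both(2));
    /// ```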
    pub fn from_str(s: &str) -> Self {
        match s {
            "row" => BatchColumnarFormat::Row,
            "both" => BatchColumnarFormat::Both(0),
            "both_v2" => BatchColumnarFormat::Both(2),
            "structured" => BatchColumnarFormat::Structured,
            x => {
                let default = BatchColumnarFormat::default();
                soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
                default
            }
        }
    }

    /// Returns a string representation for the [`BatchColumnarFormat`].
    pub const fn as_str(&self) -> &'static str {
        match self {
            BatchColumnarFormat::Row => "row",
            BatchColumnarFormat::Both(0 | 1) => "both",
            BatchColumnarFormat::Both(2) => "both_v2",
            BatchColumnarFormat::Structured => "structured",
            _ => panic!("unknown batch columnar format"),
        }
    }

    /// Returns whether we should encode a Batch in a structured format.
    pub const fn is_structured(&self) -> bool {
        match self {
            BatchColumnarFormat::Row => false,
            // The V0 format has been deprecated and we ignore its structured columns.
            BatchColumnarFormat::Both(0 | 1) => false,
            BatchColumnarFormat::Both(_) => true,
            BatchColumnarFormat::Structured => true,
        }
    }
}

impl fmt::Display for BatchColumnarFormat {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

impl Arbitrary for BatchColumnarFormat {
    type Parameters = ();
    type Strategy = BoxedStrategy<BatchColumnarFormat>;

    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        proptest::strategy::Union::new(vec![
            Just(BatchColumnarFormat::Row).boxed(),
            Just(BatchColumnarFormat::Both(0)).boxed(),
            Just(BatchColumnarFormat::Both(1)).boxed(),
        ])
        .boxed()
    }
}

/// The metadata necessary to reconstruct a list of [BlobTraceBatchPart]s.
///
/// Invariants:
/// - The Description's time interval is non-empty.
/// - Keys for all trace batch parts are unique.
/// - Keys for all trace batch parts are stored in index order.
/// - The data in all of the trace batch parts is sorted and consolidated.
/// - All of the trace batch parts have the same desc as the metadata.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// The keys to retrieve the batch's data from the blob store.
    ///
    /// The set of keys can be empty to denote an empty batch.
    pub keys: Vec<String>,
    /// The format of the stored batch data.
    pub format: ProtoBatchFormat,
    /// The half-open time interval `[lower, upper)` this batch contains data
    /// for.
    pub desc: Description<u64>,
    /// The compaction level of the batch.
    pub level: u64,
    /// Size of the encoded batch.
    pub size_bytes: u64,
}

/// The structure serialized and stored as a value in
/// [crate::location::Blob] storage for data keys corresponding to trace
/// data.
///
/// This batch represents the data that was originally written at some time in
/// [lower, upper) (more precisely !< lower and < upper). The individual record
/// times may have later been advanced by compaction to something <= since. This
/// means the ability to reconstruct the state of the collection at times <
/// since has been lost. However, there may still be records present in the
/// batch whose times are < since. Users iterating through updates must take
/// care to advance records with times < since to since in order to correctly
/// answer queries at times >= since.
///
/// Invariants:
/// - The [lower, upper) interval of times in desc is non-empty.
/// - The timestamp of each update is >= desc.lower().
/// - The timestamp of each update is < desc.upper() iff desc.upper() >
///   desc.since(). Otherwise the timestamp of each update is <= desc.since().
/// - The values in updates are sorted by (key, value, time).
/// - The values in updates are "consolidated", i.e. (key, value, time) is
///   unique.
/// - All entries have a non-zero diff.
///
/// TODO: disallow empty trace batch parts in the future so there is one unique
/// way to represent an empty trace batch.
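///
/// A hedged sketch of the since-advancement rule described above, for `u64` times (the values
/// are illustrative, not taken from any real batch):
///
/// ```ignore
/// // A batch with desc [0, 10) compacted to since = 5 may still contain a
/// // record at t = 3; a reader answering queries at times >= 5 must treat
/// // that record as occurring at t = 5.
/// let (t, since) = (3u64, 5u64);
/// let effective_t = std::cmp::max(t, since);
/// assert_eq!(effective_t, 5);
/// ```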
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// Which updates are included in this batch.
    ///
    /// There may be other parts for the batch that also contain updates within
    /// the specified [lower, upper) range.
    pub desc: Description<T>,
    /// Index of this part in the list of parts that form the batch.
    pub index: u64,
    /// The updates themselves.
    pub updates: BlobTraceUpdates,
}

/// The set of updates that are part of a [`BlobTraceBatchPart`].
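///
/// A hedged construction sketch for the newest variant; `key_array` and `val_array` stand in for
/// schema-compatible Arrow `ArrayRef`s built elsewhere:
///
/// ```ignore
/// use arrow::array::Int64Array;
///
/// let updates = BlobTraceUpdates::Structured {
///     key_values: ColumnarRecordsStructuredExt { key: key_array, val: val_array },
///     timestamps: Int64Array::from(vec![0i64, 0]),
///     diffs: Int64Array::from(vec![1i64, 1]),
/// };
/// assert_eq!(updates.len(), 2);
/// ```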
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Legacy format. Keys and Values are encoded into bytes via [`Codec`], then stored in our own
    /// columnar-esque struct.
    ///
    /// [`Codec`]: mz_persist_types::Codec
    Row(ColumnarRecords),
    /// Migration format. Keys and Values are encoded into bytes via [`Codec`] and structured into
    /// an Apache Arrow columnar format.
    ///
    /// [`Codec`]: mz_persist_types::Codec
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// New-style structured format, including structured representations of the K/V columns and
    /// the usual timestamp / diff encoding.
    Structured {
        /// Key-value data.
        key_values: ColumnarRecordsStructuredExt,
        /// Timestamp data.
        timestamps: Int64Array,
        /// Diffs.
        diffs: Int64Array,
    },
}

impl BlobTraceUpdates {
    /// Convert from a [Part].
    pub fn from_part(part: Part) -> Self {
        Self::Structured {
            key_values: ColumnarRecordsStructuredExt {
                key: part.key,
                val: part.val,
            },
            timestamps: part.time,
            diffs: part.diff,
        }
    }

    /// The number of updates.
    pub fn len(&self) -> usize {
        match self {
            BlobTraceUpdates::Row(c) => c.len(),
            BlobTraceUpdates::Both(c, _structured) => c.len(),
            BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
        }
    }

    /// The updates' timestamps as an integer array.
    pub fn timestamps(&self) -> &Int64Array {
        match self {
            BlobTraceUpdates::Row(c) => c.timestamps(),
            BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
            BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
        }
    }

    /// The updates' diffs as an integer array.
    pub fn diffs(&self) -> &Int64Array {
        match self {
            BlobTraceUpdates::Row(c) => c.diffs(),
            BlobTraceUpdates::Both(c, _structured) => c.diffs(),
            BlobTraceUpdates::Structured { diffs, .. } => diffs,
        }
    }

    /// Return the sum of the diffs in the blob.
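    ///
    /// A minimal usage sketch, assuming `i64` diffs (`i64` implements both `Codec64` and
    /// `Monoid`):
    ///
    /// ```ignore
    /// let total: i64 = updates.diffs_sum::<i64>();
    /// ```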
    pub fn diffs_sum<D: Codec64 + Monoid>(&self) -> D {
        let mut sum = D::zero();
        for d in self.diffs().values().iter() {
            let d = D::decode(d.to_le_bytes());
            sum.plus_equals(&d);
        }
        sum
    }

    /// Return the [`ColumnarRecords`] of the blob.
    pub fn records(&self) -> Option<&ColumnarRecords> {
        match self {
            BlobTraceUpdates::Row(c) => Some(c),
            BlobTraceUpdates::Both(c, _structured) => Some(c),
            BlobTraceUpdates::Structured { .. } => None,
        }
    }

    /// Return the [`ColumnarRecordsStructuredExt`] of the blob.
    pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
        match self {
            BlobTraceUpdates::Row(_) => None,
            BlobTraceUpdates::Both(_, s) => Some(s),
            BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
        }
    }

    /// Return the estimated memory usage of the raw data.
    pub fn goodbytes(&self) -> usize {
        match self {
            BlobTraceUpdates::Row(c) => c.goodbytes(),
            // NB: we only report goodbytes for columnar records here, to avoid
            // double-counting the same records. (This means that our goodput % is much lower
            // during the migration, which is an accurate reflection of reality.)
            BlobTraceUpdates::Both(c, _) => c.goodbytes(),
            BlobTraceUpdates::Structured {
                key_values,
                timestamps,
                diffs,
            } => {
                key_values.goodbytes()
                    + timestamps.values().to_byte_slice().len()
                    + diffs.values().to_byte_slice().len()
            }
        }
    }

    /// Return the [ColumnarRecords] of the blob, generating it if it does not exist.
    pub fn get_or_make_codec<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecords {
        match self {
            BlobTraceUpdates::Row(records) => records,
            BlobTraceUpdates::Both(records, _) => records,
            BlobTraceUpdates::Structured {
                key_values,
                timestamps,
                diffs,
            } => {
                let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
                let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
                let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());

                *self = BlobTraceUpdates::Both(records, key_values.clone());
                let BlobTraceUpdates::Both(records, _) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                records
            }
        }
    }

    /// Return the [`ColumnarRecordsStructuredExt`] of the blob, generating it from the codec
    /// data if it does not exist.
    pub fn get_or_make_structured<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecordsStructuredExt {
        let structured = match self {
            BlobTraceUpdates::Row(records) => {
                let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
                let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");

                *self = BlobTraceUpdates::Both(
                    records.clone(),
                    ColumnarRecordsStructuredExt { key, val },
                );
                let BlobTraceUpdates::Both(_, structured) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                structured
            }
            BlobTraceUpdates::Both(_, structured) => structured,
            BlobTraceUpdates::Structured { key_values, .. } => key_values,
        };

        // If the types don't match, attempt to migrate the array to the new type.
        // We expect this to succeed, since this should only be called with backwards-
        // compatible schemas... but if it fails we only log, and let some higher-level
        // code signal the error if it cares.
        let migrate = |array: &mut ArrayRef, to_type: DataType| {
            // TODO: Plumb down the SchemaCache and use it here for the array migrations.
            let from_type = array.data_type().clone();
            if from_type != to_type {
                if let Some(migration) = backward_compatible(&from_type, &to_type) {
                    *array = migration.migrate(Arc::clone(array));
                    if array.data_type() != &to_type {
                        error!(
                            ?from_type,
                            actual_type=?array.data_type(),
                            ?to_type,
                            "migration failed; returned non-matching data type!"
                        );
                    }
                } else {
                    error!(
                        ?from_type,
                        ?to_type,
                        "failed to migrate array type; backwards-incompatible schema migration?"
                    );
                }
            }
        };
        migrate(
            &mut structured.key,
            data_type::<K>(key_schema).expect("valid key schema"),
        );
        migrate(
            &mut structured.val,
            data_type::<V>(val_schema).expect("valid value schema"),
        );

        structured
    }

    /// If we have structured data, cast this data as a structured part.
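    ///
    /// A usage sketch; this accessor never converts data, so it pairs with [Self::into_part],
    /// which does:
    ///
    /// ```ignore
    /// if let Some(part) = updates.as_part() {
    ///     assert_eq!(part.time.len(), updates.len());
    /// }
    /// ```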
    pub fn as_part(&self) -> Option<Part> {
        let ext = self.structured()?.clone();
        Some(Part {
            key: ext.key,
            val: ext.val,
            time: self.timestamps().clone(),
            diff: self.diffs().clone(),
        })
    }

    /// Convert this blob into a structured part, transforming the codec data if necessary.
    pub fn into_part<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> Part {
        let ext = self
            .get_or_make_structured::<K, V>(key_schema, val_schema)
            .clone();
        Part {
            key: ext.key,
            val: ext.val,
            time: self.timestamps().clone(),
            diff: self.diffs().clone(),
        }
    }

    /// Concatenate the given records together, column-by-column.
    ///
    /// The result is always in the structured format: any inputs that only carry codec-encoded
    /// data are converted first, and no codec data is carried over into the output.
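    ///
    /// A hedged usage sketch; `part_a`, `part_b`, the schemas, and `metrics` are assumed to
    /// exist in the caller:
    ///
    /// ```ignore
    /// let expected = part_a.len() + part_b.len();
    /// let merged = BlobTraceUpdates::concat::<K, V>(
    ///     vec![part_a, part_b],
    ///     &key_schema,
    ///     &val_schema,
    ///     &metrics,
    /// )?;
    /// assert_eq!(merged.len(), expected);
    /// ```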
    pub fn concat<K: Codec, V: Codec>(
        mut updates: Vec<BlobTraceUpdates>,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        metrics: &ColumnarMetrics,
    ) -> anyhow::Result<BlobTraceUpdates> {
        match updates.len() {
            0 => {
                return Ok(BlobTraceUpdates::Structured {
                    key_values: ColumnarRecordsStructuredExt {
                        key: Arc::new(key_schema.encoder().expect("valid schema").finish()),
                        val: Arc::new(val_schema.encoder().expect("valid schema").finish()),
                    },
                    timestamps: Int64Array::from_iter_values(vec![]),
                    diffs: Int64Array::from_iter_values(vec![]),
                });
            }
            1 => return Ok(updates.into_iter().into_element()),
            _ => {}
        }

        // Always get or make our structured format.
        let mut keys = Vec::with_capacity(updates.len());
        let mut vals = Vec::with_capacity(updates.len());
        for updates in &mut updates {
            let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
            keys.push(structured.key.as_ref());
            vals.push(structured.val.as_ref());
        }
        let key_values = ColumnarRecordsStructuredExt {
            key: ::arrow::compute::concat(&keys)?,
            val: ::arrow::compute::concat(&vals)?,
        };

        // Concatenate the timestamp and diff columns as well, checking that the
        // concatenation preserves the expected primitive types.
        let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
        let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());

        for update in &updates {
            timestamps.push(update.timestamps());
            diffs.push(update.diffs());
        }
        let timestamps = ::arrow::compute::concat(&timestamps)?
            .as_primitive_opt::<Int64Type>()
            .ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
            .clone();
        let diffs = ::arrow::compute::concat(&diffs)?
            .as_primitive_opt::<Int64Type>()
            .ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
            .clone();

        let out = Self::Structured {
            key_values,
            timestamps,
            diffs,
        };
        metrics
            .arrow
            .concat_bytes
            .inc_by(u64::cast_from(out.goodbytes()));
        Ok(out)
    }

    /// See [RustType::from_proto].
    pub fn from_proto(
        lgbytes: &ColumnarMetrics,
        proto: ProtoColumnarRecords,
    ) -> Result<Self, TryFromProtoError> {
        let binary_array = |data: Bytes, offsets: Vec<i32>| {
            if offsets.is_empty() && proto.len > 0 {
                return Ok(None);
            };
            match BinaryArray::try_new(
                OffsetBuffer::new(offsets.into()),
                ::arrow::buffer::Buffer::from(data),
                None,
            ) {
                Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
                Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
                    "Unable to decode binary array from repeated proto fields: {e:?}"
                ))),
            }
        };

        let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
        let codec_val = binary_array(proto.val_data, proto.val_offsets)?;

        let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
        let diffs = realloc_array(&proto.diffs.into(), lgbytes);
        let ext =
            ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;

        let updates = match (codec_key, codec_val, ext) {
            (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
                ext,
            ),
            (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
            ),
            (None, None, Some(ext)) => BlobTraceUpdates::Structured {
                key_values: ext,
                timestamps,
                diffs,
            },
            (k, v, ext) => {
                return Err(TryFromProtoError::InvalidPersistState(format!(
                    "unexpected mix of key/value columns: k={}, v={}, ext={}",
                    k.is_some(),
                    v.is_some(),
                    ext.is_some(),
                )));
            }
        };

        Ok(updates)
    }

    /// See [RustType::into_proto].
    pub fn into_proto(&self) -> ProtoColumnarRecords {
        let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
            None => (vec![], Bytes::new(), vec![], Bytes::new()),
            Some(records) => (
                records.keys().offsets().to_vec(),
                Bytes::copy_from_slice(records.keys().value_data()),
                records.vals().offsets().to_vec(),
                Bytes::copy_from_slice(records.vals().value_data()),
            ),
        };
        let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
            None => (None, None),
            Some((k, v)) => (Some(k), Some(v)),
        };

        ProtoColumnarRecords {
            len: self.len().into_proto(),
            key_offsets,
            key_data,
            val_offsets,
            val_data,
            timestamps: self.timestamps().values().to_vec(),
            diffs: self.diffs().values().to_vec(),
            key_structured: k_struct,
            val_structured: v_struct,
        }
    }

    /// Convert these updates into the structured format, generating the structured
    /// representation from the codec data if necessary and discarding any codec data from the
    /// result.
    pub fn as_structured<K: Codec, V: Codec>(
        &self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> Self {
        let mut this = self.clone();
        Self::Structured {
            key_values: this
                .get_or_make_structured::<K, V>(key_schema, val_schema)
                .clone(),
            timestamps: this.timestamps().clone(),
            diffs: this.diffs().clone(),
        }
    }
}

impl TraceBatchMeta {
    /// Asserts Self's documented invariants, returning an error if any are
    /// violated.
    pub fn validate(&self) -> Result<(), Error> {
        // TODO: It's unclear if the equal case (an empty desc) is
        // useful/harmful. Feel free to make this a less_than if empty descs end
        // up making sense.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        Ok(())
    }

    /// Assert that all of the [BlobTraceBatchPart]s obey the required invariants.
    pub async fn validate_data(
        &self,
        blob: &dyn Blob,
        metrics: &ColumnarMetrics,
    ) -> Result<(), Error> {
        let mut batches = vec![];
        for (idx, key) in self.keys.iter().enumerate() {
            let value = blob
                .get(key)
                .await?
                .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
            let batch = BlobTraceBatchPart::decode(&value, metrics)?;
            if batch.desc != self.desc {
                return Err(format!(
                    "invalid trace batch part desc expected {:?} got {:?}",
                    &self.desc, &batch.desc
                )
                .into());
            }

            if batch.index != u64::cast_from(idx) {
                return Err(format!(
                    "invalid index for blob trace batch part at key {} expected {} got {}",
                    key, idx, batch.index
                )
                .into());
            }

            batch.validate()?;
            batches.push(batch);
        }

        for (batch_idx, batch) in batches.iter().enumerate() {
            for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
                // TODO: Don't assume diff is an i64, take a D type param instead.
                let diff: u64 = Codec64::decode(diff.to_le_bytes());

                // Check data invariants.
                if diff == 0 {
                    return Err(format!(
                        "update with 0 diff in batch {batch_idx} at row {row_idx}",
                    )
                    .into());
                }
            }
        }

        Ok(())
    }
}

impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
    /// Asserts the documented invariants, returning an error if any are
    /// violated.
    pub fn validate(&self) -> Result<(), Error> {
        // TODO: It's unclear if the equal case (an empty desc) is
        // useful/harmful. Feel free to make this a less_than if empty descs end
        // up making sense.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());

        for time in self.updates.timestamps().values() {
            let ts = T::decode(time.to_le_bytes());
            // Check ts against desc.
            if !self.desc.lower().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is less than the batch lower: {:?}",
                    ts, self.desc
                )
                .into());
            }

            // When since is less than or equal to lower, the upper is a strict bound on the
            // updates' timestamps because no compaction has been performed. Because user batches
            // are always uncompacted, this ensures that new updates are recorded with valid
            // timestamps. Otherwise, we can make no assumptions about the timestamps.
            if uncompacted && self.desc.upper().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is greater than or equal to the batch upper: {:?}",
                    ts, self.desc
                )
                .into());
            }
        }

        for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
            // TODO: Don't assume diff is an i64, take a D type param instead.
            let diff: u64 = Codec64::decode(diff.to_le_bytes());

            // Check data invariants.
            if diff == 0 {
                return Err(format!("update with 0 diff at row {row_idx}",).into());
            }
        }

        Ok(())
    }

    /// Encodes a BlobTraceBatchPart into the Parquet format.
    pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
    where
        B: BufMut + Send,
    {
        encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
    }

    /// Decodes a BlobTraceBatchPart from the Parquet format.
    pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
        decode_trace_parquet(buf.clone(), metrics)
    }

    /// Scans the part and returns a lower bound on the contained keys.
    pub fn key_lower(&self) -> &[u8] {
        self.updates
            .records()
            .and_then(|r| r.keys().iter().flatten().min())
            .unwrap_or(&[])
    }

    /// Scans the part and returns a lower bound on the contained structured keys, if structured
    /// data is present.
    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        self.updates.structured().and_then(|r| {
            let ord = ArrayOrd::new(&r.key);
            (0..r.key.len())
                .min_by_key(|i| ord.at(*i))
                .map(|i| ArrayBound::new(Arc::clone(&r.key), i))
        })
    }
}

impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
    fn from(x: ProtoU64Description) -> Self {
        Description::new(
            x.lower
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
            x.upper
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
            x.since
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
        )
    }
}

impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
    fn from(x: ProtoU64Antichain) -> Self {
        Antichain::from(
            x.elements
                .into_iter()
                .map(|x| T::decode(u64::to_le_bytes(x)))
                .collect::<Vec<_>>(),
        )
    }
}

impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
    fn from(x: &Antichain<T>) -> Self {
        ProtoU64Antichain {
            elements: x
                .elements()
                .iter()
                .map(|x| u64::from_le_bytes(T::encode(x)))
                .collect(),
        }
    }
}

impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
    fn from(x: &Description<T>) -> Self {
        ProtoU64Description {
            lower: Some(x.lower().into()),
            upper: Some(x.upper().into()),
            since: Some(x.since().into()),
        }
    }
}

/// Encodes the inline metadata for a trace batch into a base64 string.
pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
    let (format, format_metadata) = match &batch.updates {
        // For the legacy Row format, we only write it as `ParquetKvtd`.
        BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
        // For the newer structured format we track some metadata about the version of the format.
        BlobTraceUpdates::Both { .. } => {
            let metadata = ProtoFormatMetadata::StructuredMigration(2);
            (ProtoBatchFormat::ParquetStructured, Some(metadata))
        }
        BlobTraceUpdates::Structured { .. } => {
            let metadata = ProtoFormatMetadata::StructuredMigration(3);
            (ProtoBatchFormat::ParquetStructured, Some(metadata))
        }
    };

    let inline = ProtoBatchPartInline {
        format: format.into(),
        desc: Some((&batch.desc).into()),
        index: batch.index,
        format_metadata,
    };
    let inline_encoded = inline.encode_to_vec();
    base64::engine::general_purpose::STANDARD.encode(inline_encoded)
}

/// Decodes the inline metadata for a trace batch from a base64 string.
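///
/// A round-trip sketch with [encode_trace_inline_meta]; `batch` is assumed to be a valid
/// [BlobTraceBatchPart]:
///
/// ```ignore
/// let encoded = encode_trace_inline_meta(&batch);
/// let (format, inline) = decode_trace_inline_meta(Some(&encoded))?;
/// assert_eq!(inline.index, batch.index);
/// ```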
pub fn decode_trace_inline_meta(
    inline_base64: Option<&String>,
) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
    let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
    let inline_encoded = base64::engine::general_purpose::STANDARD
        .decode(inline_base64)
        .map_err(|err| err.to_string())?;
    let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
    let format = ProtoBatchFormat::try_from(inline.format)
        .map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
    Ok((format, inline))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;

    use crate::error::Error;
    use crate::indexed::columnar::ColumnarRecordsBuilder;
    use crate::mem::{MemBlob, MemBlobConfig};
    use crate::metrics::ColumnarMetrics;
    use crate::workload::DataGenerator;

    use super::*;

    fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
        ((key.into(), "".into()), ts, 1)
    }

    fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(0),
        )
    }

    fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
        TraceBatchMeta {
            keys: vec![],
            format: ProtoBatchFormat::Unknown,
            desc: u64_desc(lower, upper),
            level: 1,
            size_bytes: 0,
        }
    }

    fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(since),
        )
    }

    fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
        let mut builder = ColumnarRecordsBuilder::default();
        for ((k, v), t, d) in updates {
            assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
        }
        let updates = builder.finish(&ColumnarMetrics::disconnected());
        BlobTraceUpdates::Row(updates)
    }

    #[mz_ore::test]
    fn trace_batch_validate() {
        // Normal case
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Empty
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Invalid desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(2, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Empty desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Update "before" desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Update "after" desc
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Normal case: update "after" desc and within since
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Normal case: update "after" desc and at since
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(4, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Update "after" desc since
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(5, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Invalid update
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 1),
            index: 0,
            updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from("update with 0 diff at row 0"))
        );
    }

    #[mz_ore::test]
    fn trace_batch_meta_validate() {
        // Normal case
        let b = batch_meta(0, 1);
        assert_eq!(b.validate(), Ok(()));

        // Empty interval
        let b = batch_meta(0, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );

        // Invalid interval
        let b = batch_meta(2, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );
    }

    async fn expect_set_trace_batch<T: Timestamp + Codec64>(
        blob: &dyn Blob,
        key: &str,
        batch: &BlobTraceBatchPart<T>,
    ) -> u64 {
        let mut val = Vec::new();
        let metrics = ColumnarMetrics::disconnected();
        let config = EncodingConfig::default();
        batch.encode(&mut val, &metrics, &config);
        let val = Bytes::from(val);
        let val_len = u64::cast_from(val.len());
        blob.set(key, val).await.expect("failed to set trace batch");
        val_len
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn trace_batch_meta_validate_data() -> Result<(), Error> {
        let metrics = ColumnarMetrics::disconnected();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let format = ProtoBatchFormat::ParquetKvtd;

        let batch_desc = u64_desc_since(0, 3, 0);
        let batch0 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 0,
            updates: columnar_records(vec![
                (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
                (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
            ]),
        };
        let batch1 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 1,
            updates: columnar_records(vec![
                (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
                (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
            ]),
        };

        let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
        let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
        let size_bytes = batch0_size_bytes + batch1_size_bytes;
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };

        // Normal case:
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Ok(())
        );

        // Incorrect desc
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: u64_desc_since(1, 3, 0),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }"
            ))
        );
        // Key with no corresponding batch part
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into(), "b2".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from("no blob for trace batch at key: b2"))
        );
        // Batch parts not in index order
        let batch_meta = TraceBatchMeta {
            keys: vec!["b1".into(), "b0".into()],
            format,
            desc: batch_desc,
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid index for blob trace batch part at key b1 expected 0 got 1"
            ))
        );

        Ok(())
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn encoded_batch_sizes() {
        fn sizes(data: DataGenerator) -> usize {
            let metrics = ColumnarMetrics::disconnected();
            let config = EncodingConfig::default();
            let updates: Vec<_> = data.batches().collect();
            let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
            let trace = BlobTraceBatchPart {
                desc: Description::new(
                    Antichain::from_elem(0u64),
                    Antichain::new(),
                    Antichain::from_elem(0u64),
                ),
                index: 0,
                updates,
            };
            let mut trace_buf = Vec::new();
            trace.encode(&mut trace_buf, &metrics, &config);
            trace_buf.len()
        }

        let record_size_bytes = DataGenerator::default().record_size_bytes;
        // Print all the sizes into one assert so we only have to update one
        // place if sizes change.
        assert_eq!(
            format!(
                "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
                sizes(DataGenerator::new(1, record_size_bytes, 1)),
                sizes(DataGenerator::new(25, record_size_bytes, 25)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
            ),
            "1/1=903 25/1=2649 1000/1=72881 1000/100=72881"
        );
    }
}