mz_persist_types/columnar.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Columnar understanding of persisted data
11//!
12//! For efficiency/performance, we directly expose the columnar structure of
13//! persist's internal encoding to users during encoding and decoding. Interally
14//! we use the [`arrow`] crate that gets durably written as parquet data.
15//!
16//! Some of the requirements that led to this design:
17//! - Support a separation of data and schema because Row is not
18//!   self-describing: e.g. a Datum::Null can be one of many possible column
19//!   types. A RelationDesc is necessary to describe a Row schema.
20//! - Narrow down [`arrow::datatypes::DataType`] (the arrow "logical" types) to a
21//!   set we want to support in persist.
22//! - Do `dyn Any` downcasting of columns once per part, not once per update.
23//!
24//! Finally, the [Schema] trait maps an implementor of [Codec] to the underlying
25//! column structure. It also provides a [ColumnEncoder] and [ColumnDecoder] for
26//! amortizing any downcasting that does need to happen.
27
28use anyhow::anyhow;
29use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder};
30use arrow::datatypes::DataType;
31use std::fmt::Debug;
32use std::sync::Arc;
33
34use crate::Codec;
35use crate::stats::{DynStats, StructStats};
36
37/// A __stable__ encoding for a type that gets durably persisted in an
38/// [`arrow::array::FixedSizeBinaryArray`].
39pub trait FixedSizeCodec<T>: Debug + PartialEq + Eq {
40    /// Number of bytes the encoded format requires.
41    const SIZE: usize;
42
43    /// Returns the encoded bytes as a slice.
44    fn as_bytes(&self) -> &[u8];
45    /// Create an instance of `self` from a slice.
46    ///
47    /// Note: It is the responsibility of the caller to make sure the provided
48    /// data is valid for `self`.
49    fn from_bytes(val: &[u8]) -> Result<Self, String>
50    where
51        Self: Sized;
52
53    /// Encode a type of `T` into this format.
54    fn from_value(value: T) -> Self;
55    /// Decode an instance of `T` from this format.
56    fn into_value(self) -> T;
57}
58
59/// A decoder for values of a fixed schema.
60///
61/// This allows us to amortize the cost of downcasting columns into concrete
62/// types.
63pub trait ColumnDecoder<T> {
64    /// Decode the value at `idx` into the buffer `val`.
65    ///
66    /// Behavior for when the value at `idx` is null is implementation-defined.
67    /// Panics if decoding an `idx` that is out-of-bounds.
68    fn decode(&self, idx: usize, val: &mut T);
69
70    /// Returns if the value at `idx` is null.
71    fn is_null(&self, idx: usize) -> bool;
72
73    /// Returns the number of bytes used by this decoder.
74    fn goodbytes(&self) -> usize;
75
76    /// Returns statistics for the column. This structure is defined by Persist,
77    /// but the contents are determined by the client; Persist will preserve
78    /// them in the part metadata and make them available to readers.
79    ///
80    /// TODO: For now, we require that the stats be structured as a non-nullable
81    /// struct. For a single column, map them to a struct with a single column
82    /// named the empty string. Fix this restriction if we end up with non-test
83    /// code that isn't naturally a struct.
84    fn stats(&self) -> StructStats;
85}
86
87/// An encoder for values of a fixed schema
88///
89/// This allows us to amortize the cost of downcasting columns into concrete
90/// types.
91pub trait ColumnEncoder<T> {
92    /// Type of column that this encoder returns when finalized.
93    type FinishedColumn: arrow::array::Array + Debug + 'static;
94
95    /// The amount of "actual data" encoded by this encoder so far.
96    fn goodbytes(&self) -> usize;
97
98    /// Appends `val` onto this encoder.
99    fn append(&mut self, val: &T);
100
101    /// Appends a null value onto this encoder.
102    fn append_null(&mut self);
103
104    /// Finish this encoder, returning an immutable column.
105    fn finish(self) -> Self::FinishedColumn;
106}
107
108/// Description of a type that we encode into Persist.
109pub trait Schema<T>: Debug + Send + Sync {
110    /// The type of column we decode from, and encoder will finish into.
111    type ArrowColumn: arrow::array::Array + Debug + Clone + 'static;
112    /// Statistics we collect for a schema of this type.
113    type Statistics: DynStats + 'static;
114
115    /// Type that is able to decode values of `T` from [`Self::ArrowColumn`].
116    type Decoder: ColumnDecoder<T> + Debug + Send + Sync;
117    /// Type that is able to encoder values of `T`.
118    type Encoder: ColumnEncoder<T, FinishedColumn = Self::ArrowColumn> + Debug + Send + Sync;
119
120    /// Returns a type that is able to decode instances of `T` from the provider column.
121    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error>;
122    /// Returns a type that is able to decode instances of `T` from a type erased
123    /// [`arrow::array::Array`], erroring if the provided array is not [`Self::ArrowColumn`].
124    fn decoder_any(&self, col: &dyn arrow::array::Array) -> Result<Self::Decoder, anyhow::Error> {
125        let col = col
126            .as_any()
127            .downcast_ref::<Self::ArrowColumn>()
128            .ok_or_else(|| {
129                anyhow::anyhow!(
130                    "failed downcasting to {}",
131                    std::any::type_name::<Self::ArrowColumn>()
132                )
133            })?
134            .clone();
135        self.decoder(col)
136    }
137
138    /// Returns a type that can encode values of `T`.
139    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error>;
140}
141
142/// Returns the data type of arrays generated by this schema.
143///
144/// This obtains the dsta type by encoding an empty array and checking its type.
145/// The caller is generally expected to make sure that all columns generated by
146/// this schema have the same datatype.
147pub fn data_type<A: Codec>(schema: &A::Schema) -> anyhow::Result<DataType> {
148    let array = schema.encoder()?.finish();
149    Ok(Array::data_type(&array).clone())
150}
151
152/// Helper to convert from codec-encoded data to structured data.
153pub fn codec_to_schema<A: Codec + Default>(
154    schema: &A::Schema,
155    data: &BinaryArray,
156) -> anyhow::Result<ArrayRef> {
157    let mut encoder = Schema::encoder(schema)?;
158
159    let mut value: A = A::default();
160    let mut storage = Some(A::Storage::default());
161
162    for bytes in data.iter() {
163        if let Some(bytes) = bytes {
164            A::decode_from(&mut value, bytes, &mut storage, schema).map_err(|e| {
165                anyhow!(
166                    "unable to decode bytes with {} codec: {e:#?}",
167                    A::codec_name()
168                )
169            })?;
170            encoder.append(&value);
171        } else {
172            encoder.append_null();
173        }
174    }
175
176    Ok(Arc::new(encoder.finish()))
177}
178
179/// Helper to convert from structured data to codec-encoded data.
180pub fn schema_to_codec<A: Codec + Default>(
181    schema: &A::Schema,
182    data: &dyn Array,
183) -> anyhow::Result<BinaryArray> {
184    let len = data.len();
185    let decoder = Schema::decoder_any(schema, data)?;
186    let mut builder = BinaryBuilder::new();
187
188    let mut value: A = A::default();
189    let mut buffer = vec![];
190
191    for i in 0..len {
192        // The binary encoding of key/value types can never be null.
193        // Defer to the implementation-defined behaviour for null entries in that case.
194        decoder.decode(i, &mut value);
195        Codec::encode(&value, &mut buffer);
196        builder.append_value(&buffer);
197        buffer.clear()
198    }
199
200    Ok(BinaryBuilder::finish(&mut builder))
201}