mz_persist_types/columnar.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Columnar understanding of persisted data
11//!
12//! For efficiency/performance, we directly expose the columnar structure of
13//! persist's internal encoding to users during encoding and decoding. Internally
14//! we use the [`arrow`] crate that gets durably written as parquet data.
15//!
16//! Some of the requirements that led to this design:
17//! - Support a separation of data and schema because Row is not
18//! self-describing: e.g. a Datum::Null can be one of many possible column
19//! types. A RelationDesc is necessary to describe a Row schema.
20//! - Narrow down [`arrow::datatypes::DataType`] (the arrow "logical" types) to a
21//! set we want to support in persist.
22//! - Do `dyn Any` downcasting of columns once per part, not once per update.
23//!
24//! Finally, the [Schema] trait maps an implementor of [Codec] to the underlying
25//! column structure. It also provides a [ColumnEncoder] and [ColumnDecoder] for
26//! amortizing any downcasting that does need to happen.
27
28use anyhow::anyhow;
29use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder};
30use arrow::datatypes::DataType;
31use std::fmt::Debug;
32use std::sync::Arc;
33
34use crate::Codec;
35use crate::stats::{DynStats, StructStats};
36
/// A __stable__, fixed-width byte encoding of `T` that is durably persisted in
/// an [`arrow::array::FixedSizeBinaryArray`].
///
/// An implementor is the encoded form itself: it can expose its raw bytes, be
/// rebuilt from raw bytes, and be converted to and from a `T`.
pub trait FixedSizeCodec<T>: Debug + PartialEq + Eq {
    /// Width, in bytes, of the encoded format.
    const SIZE: usize;

    /// Borrows the encoded representation as a byte slice.
    fn as_bytes(&self) -> &[u8];

    /// Rebuilds an instance of `Self` from a byte slice.
    ///
    /// Note: the caller is responsible for making sure the provided bytes are
    /// valid for `Self`.
    ///
    /// # Errors
    ///
    /// Failures are reported as a `String`; which inputs are rejected is up to
    /// the implementor.
    fn from_bytes(val: &[u8]) -> Result<Self, String>
    where
        Self: Sized;

    /// Encodes a `T` into this format.
    fn from_value(value: T) -> Self;

    /// Consumes the encoded form and decodes it back into a `T`.
    fn into_value(self) -> T;
}
58
59/// A decoder for values of a fixed schema.
60///
61/// This allows us to amortize the cost of downcasting columns into concrete
62/// types.
63pub trait ColumnDecoder<T> {
64 /// Decode the value at `idx` into the buffer `val`.
65 ///
66 /// Behavior for when the value at `idx` is null is implementation-defined.
67 /// Panics if decoding an `idx` that is out-of-bounds.
68 fn decode(&self, idx: usize, val: &mut T);
69
70 /// Returns if the value at `idx` is null.
71 fn is_null(&self, idx: usize) -> bool;
72
73 /// Returns the number of bytes used by this decoder.
74 fn goodbytes(&self) -> usize;
75
76 /// Returns statistics for the column. This structure is defined by Persist,
77 /// but the contents are determined by the client; Persist will preserve
78 /// them in the part metadata and make them available to readers.
79 ///
80 /// TODO: For now, we require that the stats be structured as a non-nullable
81 /// struct. For a single column, map them to a struct with a single column
82 /// named the empty string. Fix this restriction if we end up with non-test
83 /// code that isn't naturally a struct.
84 fn stats(&self) -> StructStats;
85}
86
87/// An encoder for values of a fixed schema
88///
89/// This allows us to amortize the cost of downcasting columns into concrete
90/// types.
91pub trait ColumnEncoder<T> {
92 /// Type of column that this encoder returns when finalized.
93 type FinishedColumn: arrow::array::Array + Debug + 'static;
94
95 /// The amount of "actual data" encoded by this encoder so far.
96 fn goodbytes(&self) -> usize;
97
98 /// Appends `val` onto this encoder.
99 fn append(&mut self, val: &T);
100
101 /// Appends a null value onto this encoder.
102 fn append_null(&mut self);
103
104 /// Finish this encoder, returning an immutable column.
105 fn finish(self) -> Self::FinishedColumn;
106}
107
108/// Description of a type that we encode into Persist.
109pub trait Schema<T>: Debug + Send + Sync {
110 /// The type of column we decode from, and encoder will finish into.
111 type ArrowColumn: arrow::array::Array + Debug + Clone + 'static;
112 /// Statistics we collect for a schema of this type.
113 type Statistics: DynStats + 'static;
114
115 /// Type that is able to decode values of `T` from [`Self::ArrowColumn`].
116 type Decoder: ColumnDecoder<T> + Debug + Send + Sync;
117 /// Type that is able to encoder values of `T`.
118 type Encoder: ColumnEncoder<T, FinishedColumn = Self::ArrowColumn> + Debug + Send + Sync;
119
120 /// Returns a type that is able to decode instances of `T` from the provider column.
121 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error>;
122 /// Returns a type that is able to decode instances of `T` from a type erased
123 /// [`arrow::array::Array`], erroring if the provided array is not [`Self::ArrowColumn`].
124 fn decoder_any(&self, col: &dyn arrow::array::Array) -> Result<Self::Decoder, anyhow::Error> {
125 let col = col
126 .as_any()
127 .downcast_ref::<Self::ArrowColumn>()
128 .ok_or_else(|| {
129 anyhow::anyhow!(
130 "failed downcasting to {}",
131 std::any::type_name::<Self::ArrowColumn>()
132 )
133 })?
134 .clone();
135 self.decoder(col)
136 }
137
138 /// Returns a type that can encode values of `T`.
139 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error>;
140}
141
142/// Returns the data type of arrays generated by this schema.
143///
144/// This obtains the dsta type by encoding an empty array and checking its type.
145/// The caller is generally expected to make sure that all columns generated by
146/// this schema have the same datatype.
147pub fn data_type<A: Codec>(schema: &A::Schema) -> anyhow::Result<DataType> {
148 let array = schema.encoder()?.finish();
149 Ok(Array::data_type(&array).clone())
150}
151
152/// Helper to convert from codec-encoded data to structured data.
153pub fn codec_to_schema<A: Codec + Default>(
154 schema: &A::Schema,
155 data: &BinaryArray,
156) -> anyhow::Result<ArrayRef> {
157 let mut encoder = Schema::encoder(schema)?;
158
159 let mut value: A = A::default();
160 let mut storage = Some(A::Storage::default());
161
162 for bytes in data.iter() {
163 if let Some(bytes) = bytes {
164 A::decode_from(&mut value, bytes, &mut storage, schema).map_err(|e| {
165 anyhow!(
166 "unable to decode bytes with {} codec: {e:#?}",
167 A::codec_name()
168 )
169 })?;
170 encoder.append(&value);
171 } else {
172 encoder.append_null();
173 }
174 }
175
176 Ok(Arc::new(encoder.finish()))
177}
178
179/// Helper to convert from structured data to codec-encoded data.
180pub fn schema_to_codec<A: Codec + Default>(
181 schema: &A::Schema,
182 data: &dyn Array,
183) -> anyhow::Result<BinaryArray> {
184 let len = data.len();
185 let decoder = Schema::decoder_any(schema, data)?;
186 let mut builder = BinaryBuilder::new();
187
188 let mut value: A = A::default();
189 let mut buffer = vec![];
190
191 for i in 0..len {
192 // The binary encoding of key/value types can never be null.
193 // Defer to the implementation-defined behaviour for null entries in that case.
194 decoder.decode(i, &mut value);
195 Codec::encode(&value, &mut buffer);
196 builder.append_value(&buffer);
197 buffer.clear()
198 }
199
200 Ok(BinaryBuilder::finish(&mut builder))
201}