// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Columnar understanding of persisted data
//!
//! For efficiency/performance, we directly expose the columnar structure of
//! persist's internal encoding to users during encoding and decoding. Internally
//! we use the [`arrow`] crate, whose arrays get durably written as parquet data.
//!
//! Some of the requirements that led to this design:
//! - Support a separation of data and schema because Row is not
//! self-describing: e.g. a Datum::Null can be one of many possible column
//! types. A RelationDesc is necessary to describe a Row schema.
//! - Narrow down [`arrow::datatypes::DataType`] (the arrow "logical" types) to a
//! set we want to support in persist.
//! - Do `dyn Any` downcasting of columns once per part, not once per update.
//!
//! Finally, the [Schema2] trait maps an implementor of [Codec] to the underlying
//! column structure. It also provides a [ColumnEncoder] and [ColumnDecoder] for
//! amortizing any downcasting that does need to happen.
use anyhow::anyhow;
use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder};
use arrow::datatypes::DataType;
use std::fmt::Debug;
use std::sync::Arc;
use crate::stats::{DynStats, StructStats};
use crate::Codec;
/// A __stable__, fixed-width byte encoding of `T` that is durably persisted
/// in an [`arrow::array::FixedSizeBinaryArray`].
///
/// Implementors are the encoded representation; `T` is the logical value
/// they are converted from and back into.
pub trait FixedSizeCodec<T>: Debug + PartialEq + Eq {
    /// How many bytes the encoded representation occupies.
    const SIZE: usize;

    /// Borrows the encoded representation as a byte slice.
    fn as_bytes(&self) -> &[u8];

    /// Builds an instance of `Self` out of a byte slice.
    ///
    /// Note: the caller is responsible for making sure the provided data is
    /// valid for `Self`.
    ///
    /// # Errors
    ///
    /// Returns a `String` describing the problem when no instance can be
    /// produced from `val`; the exact conditions are up to the implementor.
    fn from_bytes(val: &[u8]) -> Result<Self, String>
    where
        Self: Sized;

    /// Encodes a `T` into this format.
    fn from_value(value: T) -> Self;

    /// Consumes the encoded form and decodes it back into a `T`.
    fn into_value(self) -> T;
}
/// Decodes values of `T` out of a column with a fixed schema.
///
/// Going through a decoder lets the cost of downcasting columns into
/// concrete types be amortized across many values.
pub trait ColumnDecoder<T> {
    /// Decodes the value stored at `idx`, writing it into the buffer `val`.
    ///
    /// What happens when the value at `idx` is null is
    /// implementation-defined.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out-of-bounds.
    fn decode(&self, idx: usize, val: &mut T);

    /// Reports whether the value stored at `idx` is null.
    fn is_null(&self, idx: usize) -> bool;

    /// Produces statistics for the column.
    ///
    /// The structure is defined by Persist, but the contents are determined
    /// by the client; Persist will preserve them in the part metadata and
    /// make them available to readers.
    ///
    /// TODO: For now, we require that the stats be structured as a
    /// non-nullable struct. For a single column, map them to a struct with a
    /// single column named the empty string. Fix this restriction if we end
    /// up with non-test code that isn't naturally a struct.
    fn stats(&self) -> StructStats;
}
/// Encodes values of `T` into a column with a fixed schema.
///
/// Going through an encoder lets the cost of downcasting columns into
/// concrete types be amortized across many values.
pub trait ColumnEncoder<T> {
    /// The column type handed back by [`ColumnEncoder::finish`].
    type FinishedColumn: arrow::array::Array + Debug + 'static;

    /// How much "actual data" has been encoded by this encoder so far.
    fn goodbytes(&self) -> usize;

    /// Pushes `val` onto the end of this encoder.
    fn append(&mut self, val: &T);

    /// Pushes a null value onto the end of this encoder.
    fn append_null(&mut self);

    /// Consumes the encoder and returns the resulting immutable column.
    fn finish(self) -> Self::FinishedColumn;
}
/// Describes a type `T` that we encode into Persist, tying it to an arrow
/// column type together with the encoder and decoder for that column.
pub trait Schema2<T>: Debug + Send + Sync {
    /// The column type that [`Self::Decoder`] reads from and that
    /// [`Self::Encoder`] finishes into.
    type ArrowColumn: arrow::array::Array + Debug + Clone + 'static;

    /// Statistics collected for a schema of this type.
    type Statistics: DynStats + 'static;

    /// Decoder able to read values of `T` out of a [`Self::ArrowColumn`].
    type Decoder: ColumnDecoder<T> + Debug + Send + Sync;

    /// Encoder able to write values of `T`.
    type Encoder: ColumnEncoder<T, FinishedColumn = Self::ArrowColumn> + Debug;

    /// Builds a decoder that reads instances of `T` from the provided column.
    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error>;

    /// Builds a decoder that reads instances of `T` from a type-erased
    /// [`arrow::array::Array`].
    ///
    /// # Errors
    ///
    /// Fails when `col` is not a [`Self::ArrowColumn`]; otherwise returns
    /// whatever [`Schema2::decoder`] returns for a clone of the downcast
    /// column.
    fn decoder_any(&self, col: &dyn arrow::array::Array) -> Result<Self::Decoder, anyhow::Error> {
        let typed = col.as_any().downcast_ref::<Self::ArrowColumn>();
        match typed {
            // `decoder` takes the column by value, so hand it a clone.
            Some(column) => self.decoder(column.clone()),
            None => Err(anyhow::anyhow!(
                "failed downcasting to {}",
                std::any::type_name::<Self::ArrowColumn>()
            )),
        }
    }

    /// Builds an encoder that accepts values of `T`.
    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error>;
}
/// Reports the arrow [`DataType`] of the arrays generated by `schema`.
///
/// The data type is obtained by finishing a freshly created encoder, which
/// yields an empty array, and inspecting that array's type. The caller is
/// generally expected to make sure that all columns generated by this schema
/// have the same datatype.
///
/// # Errors
///
/// Propagates the error if the schema fails to create an encoder.
pub fn data_type<A: Codec>(schema: &A::Schema) -> anyhow::Result<DataType> {
    let encoder = schema.encoder()?;
    // Nothing was appended, so this column is empty but still carries its type.
    let empty_column = encoder.finish();
    let dtype = Array::data_type(&empty_column);
    Ok(dtype.clone())
}
/// Converts a column of codec-encoded bytes into structured data.
///
/// Each non-null entry of `data` is decoded into a reused value of `A` and
/// appended to an encoder obtained from `schema`; null entries are appended
/// as nulls. The finished column is returned behind an [`Arc`].
///
/// # Errors
///
/// Fails if the encoder cannot be created, or if any entry cannot be decoded,
/// in which case the error names the codec and includes the decode error.
pub fn codec_to_schema2<A: Codec + Default>(
    schema: &A::Schema,
    data: &BinaryArray,
) -> anyhow::Result<ArrayRef> {
    let mut encoder = Schema2::encoder(schema)?;
    // One value and one storage slot are shared by every `decode_from` call.
    let mut decoded: A = A::default();
    let mut storage = Some(A::Storage::default());

    for entry in data.iter() {
        match entry {
            None => encoder.append_null(),
            Some(encoded) => {
                A::decode_from(&mut decoded, encoded, &mut storage, schema).map_err(|e| {
                    anyhow!(
                        "unable to decode bytes with {} codec: {e:#?}",
                        A::codec_name()
                    )
                })?;
                encoder.append(&decoded);
            }
        }
    }

    let column = encoder.finish();
    Ok(Arc::new(column))
}
/// Converts a column of structured data into codec-encoded bytes.
///
/// A decoder for `data` is obtained from `schema`. Every non-null index is
/// decoded into a reused value of `A`, encoded with [`Codec::encode`] into a
/// scratch buffer, and appended to the output; null indexes are appended as
/// nulls.
///
/// # Errors
///
/// Fails if `schema` cannot produce a decoder for `data`.
pub fn schema2_to_codec<A: Codec + Default>(
    schema: &A::Schema,
    data: &dyn Array,
) -> anyhow::Result<BinaryArray> {
    let len = data.len();
    let decoder = Schema2::decoder_any(schema, data)?;
    let mut builder = BinaryBuilder::new();
    let mut value: A = A::default();
    // Scratch space for the encoded bytes, emptied after every row.
    let mut scratch = Vec::new();

    for idx in 0..len {
        if decoder.is_null(idx) {
            builder.append_null();
            continue;
        }
        decoder.decode(idx, &mut value);
        Codec::encode(&value, &mut scratch);
        builder.append_value(&scratch);
        scratch.clear();
    }

    Ok(builder.finish())
}