use std::fmt::Debug;
use std::ops::AddAssign;
use std::sync::Arc;
use anyhow::anyhow;
use arrow::array::{
make_array, Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray,
BooleanBufferBuilder, BooleanBuilder, FixedSizeBinaryArray, FixedSizeBinaryBuilder,
Float32Array, Float32Builder, Float64Array, Float64Builder, Int16Array, Int16Builder,
Int32Array, Int32Builder, Int64Array, Int64Builder, ListArray, ListBuilder, MapArray,
StringArray, StringBuilder, StructArray, UInt16Array, UInt16Builder, UInt32Array,
UInt32Builder, UInt64Array, UInt64Builder, UInt8Array, UInt8Builder,
};
use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field, Fields, ToByteSlice};
use bytes::{BufMut, Bytes};
use chrono::Timelike;
use dec::{Context, Decimal, OrderedDecimal};
use itertools::{EitherOrBoth, Itertools};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, FixedSizeCodec, Schema2};
use mz_persist_types::stats::{
ColumnNullStats, ColumnStatKinds, ColumnarStats, FixedSizeBytesStatsKind, OptionStats,
PrimitiveStats, StructStats,
};
use mz_persist_types::stats2::ColumnarStatsBuilder;
use mz_persist_types::Codec;
use mz_proto::chrono::ProtoNaiveTime;
use mz_proto::{ProtoType, RustType, TryFromProtoError};
use prost::Message;
use timely::Container;
use uuid::Uuid;
use crate::adt::array::{ArrayDimension, PackedArrayDimension};
use crate::adt::date::Date;
use crate::adt::datetime::PackedNaiveTime;
use crate::adt::interval::PackedInterval;
use crate::adt::jsonb::{JsonbPacker, JsonbRef};
use crate::adt::mz_acl_item::{PackedAclItem, PackedMzAclItem};
use crate::adt::numeric::{Numeric, PackedNumeric};
use crate::adt::range::{Range, RangeInner, RangeLowerBound, RangeUpperBound};
use crate::adt::timestamp::{CheckedTimestamp, PackedNaiveDateTime};
use crate::row::proto_datum::DatumType;
use crate::row::{
ProtoArray, ProtoArrayDimension, ProtoDatum, ProtoDatumOther, ProtoDict, ProtoDictElement,
ProtoNumeric, ProtoRange, ProtoRangeInner, ProtoRow,
};
use crate::stats2::{fixed_stats_from_column, numeric_stats_from_column, stats_for_json};
use crate::{Datum, ProtoRelationDesc, RelationDesc, Row, RowPacker, ScalarType, Timestamp};
/// Byte widths for the fixed-size binary Arrow columns used below.
///
/// Scoped into a module so the `clippy::as_conversions` allow covers only
/// these constant casts: each is a small `usize` constant narrowed to the
/// `i32` that arrow's `FixedSizeBinaryBuilder` expects.
#[allow(clippy::as_conversions)]
mod fixed_binary_sizes {
    use super::*;

    pub const TIME_FIXED_BYTES: i32 = PackedNaiveTime::SIZE as i32;
    pub const TIMESTAMP_FIXED_BYTES: i32 = PackedNaiveDateTime::SIZE as i32;
    pub const INTERVAL_FIXED_BYTES: i32 = PackedInterval::SIZE as i32;
    pub const ACL_ITEM_FIXED_BYTES: i32 = PackedAclItem::SIZE as i32;
    pub const _MZ_ACL_ITEM_FIXED_BYTES: i32 = PackedMzAclItem::SIZE as i32;
    pub const ARRAY_DIMENSION_FIXED_BYTES: i32 = PackedArrayDimension::SIZE as i32;

    // A `Uuid` is encoded as its raw 16 bytes; the assert keeps this constant
    // in sync with the actual in-memory size of `Uuid`.
    pub const UUID_FIXED_BYTES: i32 = 16;
    static_assertions::const_assert_eq!(UUID_FIXED_BYTES as usize, std::mem::size_of::<Uuid>());
}
use fixed_binary_sizes::*;
pub fn preserves_order(scalar_type: &ScalarType) -> bool {
match scalar_type {
ScalarType::Bool
| ScalarType::Int16
| ScalarType::Int32
| ScalarType::Int64
| ScalarType::UInt16
| ScalarType::UInt32
| ScalarType::UInt64
| ScalarType::Numeric { .. }
| ScalarType::Date
| ScalarType::Time
| ScalarType::Timestamp { .. }
| ScalarType::TimestampTz { .. }
| ScalarType::Interval
| ScalarType::Bytes
| ScalarType::String
| ScalarType::Uuid
| ScalarType::MzTimestamp
| ScalarType::MzAclItem
| ScalarType::AclItem => true,
ScalarType::Record { fields, .. } => fields
.iter()
.all(|(_, field_type)| preserves_order(&field_type.scalar_type)),
ScalarType::Float32 | ScalarType::Float64 => false,
ScalarType::PgLegacyChar
| ScalarType::PgLegacyName
| ScalarType::Char { .. }
| ScalarType::VarChar { .. }
| ScalarType::Jsonb
| ScalarType::Array(_)
| ScalarType::List { .. }
| ScalarType::Oid
| ScalarType::Map { .. }
| ScalarType::RegProc
| ScalarType::RegType
| ScalarType::RegClass
| ScalarType::Int2Vector
| ScalarType::Range { .. } => false,
}
}
/// Encoder for a single column of [`Datum`]s: pairs a type-specialized
/// [`DatumColumnEncoder`] with the column's nullability so that pushing
/// `Datum::Null` into a non-nullable column can be caught eagerly.
#[derive(Debug)]
struct DatumEncoder {
    // Whether this column is allowed to contain `Datum::Null`.
    nullable: bool,
    // The underlying per-type encoder that accumulates values.
    encoder: DatumColumnEncoder,
}
impl DatumEncoder {
    /// Number of "useful" data bytes buffered so far; delegates to
    /// [`DatumColumnEncoder::goodbytes`].
    fn goodbytes(&self) -> usize {
        self.encoder.goodbytes()
    }

    /// Appends `datum` to the column.
    ///
    /// Panics if `datum` is `Datum::Null` and this column is not nullable.
    fn push(&mut self, datum: Datum) {
        if datum.is_null() {
            assert!(self.nullable, "tried pushing Null into non-nullable column");
        }
        self.encoder.push(datum);
    }

    /// Appends a null entry; delegates to [`DatumColumnEncoder::push_invalid`].
    fn push_invalid(&mut self) {
        self.encoder.push_invalid();
    }

    /// Consumes the encoder, producing the finished Arrow array.
    fn finish(self) -> ArrayRef {
        self.encoder.finish()
    }
}
/// A per-type encoder that accumulates [`Datum`]s into an Arrow array.
///
/// Scalar variants wrap an arrow builder directly. Container-like variants
/// (`Jsonb`, `Array`, `List`, `Map`, `Record`) track offsets/lengths and
/// validity by hand and assemble the final arrays in `finish`.
#[derive(Debug)]
enum DatumColumnEncoder {
    Bool(BooleanBuilder),
    U8(UInt8Builder),
    U16(UInt16Builder),
    U32(UInt32Builder),
    U64(UInt64Builder),
    I16(Int16Builder),
    I32(Int32Builder),
    I64(Int64Builder),
    F32(Float32Builder),
    F64(Float64Builder),
    Numeric {
        // Exact `PackedNumeric` bytes for each value.
        binary_values: BinaryBuilder,
        // Lossy `f64` approximation stored alongside the exact bytes;
        // presumably for cheap comparisons/statistics — see `push` for how
        // out-of-range values saturate to ±infinity.
        approx_values: Float64Builder,
        // Shared context used to convert `Numeric` values to `f64`.
        numeric_context: Context<Numeric>,
    },
    Bytes(BinaryBuilder),
    String(StringBuilder),
    // Days since the Postgres epoch.
    Date(Int32Builder),
    Time(FixedSizeBinaryBuilder),
    Timestamp(FixedSizeBinaryBuilder),
    TimestampTz(FixedSizeBinaryBuilder),
    MzTimestamp(UInt64Builder),
    Interval(FixedSizeBinaryBuilder),
    Uuid(FixedSizeBinaryBuilder),
    AclItem(FixedSizeBinaryBuilder),
    MzAclItem(BinaryBuilder),
    // Ranges are round-tripped through their `ProtoDatum` encoding.
    Range(BinaryBuilder),
    Jsonb {
        // End offset of each serialized JSON value within `buf`.
        offsets: Vec<i32>,
        // Concatenated serialized JSON values.
        buf: Vec<u8>,
        // Validity bitmap; created lazily on the first null (see
        // `push_invalid`).
        nulls: Option<BooleanBufferBuilder>,
    },
    Array {
        // Per-row list of packed `ArrayDimension`s.
        dims: ListBuilder<FixedSizeBinaryBuilder>,
        // Number of elements contributed by each row to `vals`.
        val_lengths: Vec<usize>,
        // Encoder for the flattened element values.
        vals: Box<DatumColumnEncoder>,
        nulls: Option<BooleanBufferBuilder>,
    },
    List {
        // Number of elements contributed by each row to `values`.
        lengths: Vec<usize>,
        values: Box<DatumColumnEncoder>,
        nulls: Option<BooleanBufferBuilder>,
    },
    Map {
        // Number of entries contributed by each row.
        lengths: Vec<usize>,
        keys: StringBuilder,
        vals: Box<DatumColumnEncoder>,
        nulls: Option<BooleanBufferBuilder>,
    },
    Record {
        // One encoder per record field.
        fields: Vec<DatumEncoder>,
        nulls: Option<BooleanBufferBuilder>,
        // Count of rows pushed so far; used to size and backfill the lazily
        // created null buffer (see `push_invalid`).
        length: usize,
    },
    // Empty records carry no data; validity alone is tracked.
    RecordEmpty(BooleanBuilder),
}
impl DatumColumnEncoder {
/// Returns the number of bytes of actual value data buffered so far,
/// ignoring builder allocation slop, validity bitmaps, and offset buffers.
///
/// Booleans are counted as one byte per value; fixed-size binary columns
/// are counted as `len * element_size`.
fn goodbytes(&self) -> usize {
    match self {
        DatumColumnEncoder::Bool(a) => a.len(),
        DatumColumnEncoder::U8(a) => a.values_slice().to_byte_slice().len(),
        DatumColumnEncoder::U16(a) => a.values_slice().to_byte_slice().len(),
        DatumColumnEncoder::U32(a) => a.values_slice().to_byte_slice().len(),
        DatumColumnEncoder::U64(a) => a.values_slice().to_byte_slice().len(),
        DatumColumnEncoder::I16(a) => a.values_slice().to_byte_slice().len(),
        DatumColumnEncoder::I32(a) => a.values_slice().to_byte_slice().len(),
        DatumColumnEncoder::I64(a) => a.values_slice().to_byte_slice().len(),
        DatumColumnEncoder::F32(a) => a.values_slice().to_byte_slice().len(),
        DatumColumnEncoder::F64(a) => a.values_slice().to_byte_slice().len(),
        // Numerics store both the exact binary bytes and the f64 approximation.
        DatumColumnEncoder::Numeric {
            binary_values,
            approx_values,
            ..
        } => {
            binary_values.values_slice().len()
                + approx_values.values_slice().to_byte_slice().len()
        }
        DatumColumnEncoder::Bytes(a) => a.values_slice().len(),
        DatumColumnEncoder::String(a) => a.values_slice().len(),
        DatumColumnEncoder::Date(a) => a.values_slice().to_byte_slice().len(),
        DatumColumnEncoder::Time(a) => a.len() * PackedNaiveTime::SIZE,
        DatumColumnEncoder::Timestamp(a) => a.len() * PackedNaiveDateTime::SIZE,
        DatumColumnEncoder::TimestampTz(a) => a.len() * PackedNaiveDateTime::SIZE,
        DatumColumnEncoder::MzTimestamp(a) => a.values_slice().to_byte_slice().len(),
        DatumColumnEncoder::Interval(a) => a.len() * PackedInterval::SIZE,
        DatumColumnEncoder::Uuid(a) => a.len() * size_of::<Uuid>(),
        DatumColumnEncoder::AclItem(a) => a.len() * PackedAclItem::SIZE,
        DatumColumnEncoder::MzAclItem(a) => a.values_slice().len(),
        DatumColumnEncoder::Range(a) => a.values_slice().len(),
        DatumColumnEncoder::Jsonb { buf, .. } => buf.len(),
        // Containers recurse into their value encoders.
        DatumColumnEncoder::Array { dims, vals, .. } => {
            dims.len() * PackedArrayDimension::SIZE + vals.goodbytes()
        }
        DatumColumnEncoder::List { values, .. } => values.goodbytes(),
        DatumColumnEncoder::Map { keys, vals, .. } => {
            keys.values_slice().len() + vals.goodbytes()
        }
        DatumColumnEncoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(),
        DatumColumnEncoder::RecordEmpty(a) => a.len(),
    }
}
/// Appends `datum` to this column.
///
/// `Datum::Null` is routed to [`Self::push_invalid`]; any other mismatch
/// between the datum's variant and the encoder's variant is a bug and
/// panics.
fn push<'e, 'd>(&'e mut self, datum: Datum<'d>) {
    match (self, datum) {
        (DatumColumnEncoder::Bool(bool_builder), Datum::True) => {
            bool_builder.append_value(true)
        }
        (DatumColumnEncoder::Bool(bool_builder), Datum::False) => {
            bool_builder.append_value(false)
        }
        (DatumColumnEncoder::U8(builder), Datum::UInt8(val)) => builder.append_value(val),
        (DatumColumnEncoder::U16(builder), Datum::UInt16(val)) => builder.append_value(val),
        (DatumColumnEncoder::U32(builder), Datum::UInt32(val)) => builder.append_value(val),
        (DatumColumnEncoder::U64(builder), Datum::UInt64(val)) => builder.append_value(val),
        (DatumColumnEncoder::I16(builder), Datum::Int16(val)) => builder.append_value(val),
        (DatumColumnEncoder::I32(builder), Datum::Int32(val)) => builder.append_value(val),
        (DatumColumnEncoder::I64(builder), Datum::Int64(val)) => builder.append_value(val),
        (DatumColumnEncoder::F32(builder), Datum::Float32(val)) => builder.append_value(*val),
        (DatumColumnEncoder::F64(builder), Datum::Float64(val)) => builder.append_value(*val),
        (
            DatumColumnEncoder::Numeric {
                approx_values,
                binary_values,
                numeric_context,
            },
            Datum::Numeric(val),
        ) => {
            // Store a lossy f64 approximation next to the exact bytes. If
            // the value doesn't fit in an f64, saturate to ±infinity (and
            // clear the context's error status so later conversions aren't
            // affected).
            let float_approx = numeric_context.try_into_f64(val.0).unwrap_or_else(|_| {
                numeric_context.clear_status();
                if val.0.is_negative() {
                    f64::NEG_INFINITY
                } else {
                    f64::INFINITY
                }
            });
            let packed = PackedNumeric::from_value(val.0);
            approx_values.append_value(float_approx);
            binary_values.append_value(packed.as_bytes());
        }
        (DatumColumnEncoder::String(builder), Datum::String(val)) => builder.append_value(val),
        (DatumColumnEncoder::Bytes(builder), Datum::Bytes(val)) => builder.append_value(val),
        (DatumColumnEncoder::Date(builder), Datum::Date(val)) => {
            // Dates are stored as days since the Postgres epoch.
            builder.append_value(val.pg_epoch_days())
        }
        (DatumColumnEncoder::Time(builder), Datum::Time(val)) => {
            let packed = PackedNaiveTime::from_value(val);
            builder
                .append_value(packed.as_bytes())
                .expect("known correct size");
        }
        (DatumColumnEncoder::Timestamp(builder), Datum::Timestamp(val)) => {
            let packed = PackedNaiveDateTime::from_value(val.to_naive());
            builder
                .append_value(packed.as_bytes())
                .expect("known correct size");
        }
        (DatumColumnEncoder::TimestampTz(builder), Datum::TimestampTz(val)) => {
            // Stored as the naive (UTC) datetime; re-attached to UTC on decode.
            let packed = PackedNaiveDateTime::from_value(val.to_naive());
            builder
                .append_value(packed.as_bytes())
                .expect("known correct size");
        }
        (DatumColumnEncoder::MzTimestamp(builder), Datum::MzTimestamp(val)) => {
            builder.append_value(val.into());
        }
        (DatumColumnEncoder::Interval(builder), Datum::Interval(val)) => {
            let packed = PackedInterval::from_value(val);
            builder
                .append_value(packed.as_bytes())
                .expect("known correct size");
        }
        (DatumColumnEncoder::Uuid(builder), Datum::Uuid(val)) => builder
            .append_value(val.as_bytes())
            .expect("known correct size"),
        (DatumColumnEncoder::AclItem(builder), Datum::AclItem(val)) => {
            let packed = PackedAclItem::from_value(val);
            builder
                .append_value(packed.as_bytes())
                .expect("known correct size");
        }
        (DatumColumnEncoder::MzAclItem(builder), Datum::MzAclItem(val)) => {
            let packed = PackedMzAclItem::from_value(val);
            builder.append_value(packed.as_bytes());
        }
        (DatumColumnEncoder::Range(builder), d @ Datum::Range(_)) => {
            // Ranges round-trip through their protobuf representation.
            let proto = ProtoDatum::from(d);
            let bytes = proto.encode_to_vec();
            builder.append_value(&bytes);
        }
        (
            DatumColumnEncoder::Jsonb {
                offsets,
                buf,
                nulls,
            },
            d @ Datum::JsonNull
            | d @ Datum::True
            | d @ Datum::False
            | d @ Datum::Numeric(_)
            | d @ Datum::String(_)
            | d @ Datum::List(_)
            | d @ Datum::Map(_),
        ) => {
            // Rebind as mutable so we can pass `&mut buf` to `to_writer`.
            let mut buf = buf;
            let json = JsonbRef::from_datum(d);
            json.to_writer(&mut buf)
                .expect("failed to serialize Datum to jsonb");
            // Record the end offset of this value within `buf`.
            let offset: i32 = buf.len().try_into().expect("wrote more than 4GB of JSON");
            offsets.push(offset);
            if let Some(nulls) = nulls {
                nulls.append(true);
            }
        }
        (
            DatumColumnEncoder::Array {
                dims,
                val_lengths,
                vals,
                nulls,
            },
            Datum::Array(array),
        ) => {
            // Record the array's dimensions as packed fixed-size entries.
            for dimension in array.dims() {
                let packed = PackedArrayDimension::from_value(dimension);
                dims.values()
                    .append_value(packed.as_bytes())
                    .expect("known correct size");
            }
            dims.append(true);
            // Flatten the elements into the shared value encoder, tracking
            // how many this row contributed.
            let mut count = 0;
            for datum in &array.elements() {
                count += 1;
                vals.push(datum);
            }
            val_lengths.push(count);
            if let Some(nulls) = nulls {
                nulls.append(true);
            }
        }
        (
            DatumColumnEncoder::List {
                lengths,
                values,
                nulls,
            },
            Datum::List(list),
        ) => {
            let mut count = 0;
            for datum in &list {
                count += 1;
                values.push(datum);
            }
            lengths.push(count);
            if let Some(nulls) = nulls {
                nulls.append(true);
            }
        }
        (
            DatumColumnEncoder::Map {
                lengths,
                keys,
                vals,
                nulls,
            },
            Datum::Map(map),
        ) => {
            let mut count = 0;
            for (key, datum) in &map {
                count += 1;
                keys.append_value(key);
                vals.push(datum);
            }
            lengths.push(count);
            if let Some(nulls) = nulls {
                nulls.append(true);
            }
        }
        (
            DatumColumnEncoder::Record {
                fields,
                nulls,
                length,
            },
            Datum::List(records),
        ) => {
            // `zip_eq` panics if the record has a different arity than the
            // encoder; the assert below double-checks the count.
            let mut count = 0;
            for (datum, encoder) in records.into_iter().zip_eq(fields.iter_mut()) {
                count += 1;
                encoder.push(datum);
            }
            assert_eq!(count, fields.len());
            length.add_assign(1);
            if let Some(nulls) = nulls.as_mut() {
                nulls.append(true);
            }
        }
        (DatumColumnEncoder::RecordEmpty(builder), Datum::List(records)) => {
            // An empty record must really be empty.
            assert_none!(records.into_iter().next());
            builder.append_value(true);
        }
        // Nulls are handled uniformly for all encoders.
        (encoder, Datum::Null) => encoder.push_invalid(),
        (encoder, datum) => panic!("can't encode {datum:?} into {encoder:?}"),
    }
}
/// Appends a null entry to this column.
///
/// Builder-backed variants simply append a null. Variants that track
/// validity manually create their null buffer lazily on the first null,
/// backfilling `true` for every previously pushed (valid) row.
fn push_invalid(&mut self) {
    match self {
        DatumColumnEncoder::Bool(builder) => builder.append_null(),
        DatumColumnEncoder::U8(builder) => builder.append_null(),
        DatumColumnEncoder::U16(builder) => builder.append_null(),
        DatumColumnEncoder::U32(builder) => builder.append_null(),
        DatumColumnEncoder::U64(builder) => builder.append_null(),
        DatumColumnEncoder::I16(builder) => builder.append_null(),
        DatumColumnEncoder::I32(builder) => builder.append_null(),
        DatumColumnEncoder::I64(builder) => builder.append_null(),
        DatumColumnEncoder::F32(builder) => builder.append_null(),
        DatumColumnEncoder::F64(builder) => builder.append_null(),
        DatumColumnEncoder::Numeric {
            approx_values,
            binary_values,
            numeric_context: _,
        } => {
            // Keep both halves of the numeric encoding in lockstep.
            approx_values.append_null();
            binary_values.append_null();
        }
        DatumColumnEncoder::String(builder) => builder.append_null(),
        DatumColumnEncoder::Bytes(builder) => builder.append_null(),
        DatumColumnEncoder::Date(builder) => builder.append_null(),
        DatumColumnEncoder::Time(builder) => builder.append_null(),
        DatumColumnEncoder::Timestamp(builder) => builder.append_null(),
        DatumColumnEncoder::TimestampTz(builder) => builder.append_null(),
        DatumColumnEncoder::MzTimestamp(builder) => builder.append_null(),
        DatumColumnEncoder::Interval(builder) => builder.append_null(),
        DatumColumnEncoder::Uuid(builder) => builder.append_null(),
        DatumColumnEncoder::AclItem(builder) => builder.append_null(),
        DatumColumnEncoder::MzAclItem(builder) => builder.append_null(),
        DatumColumnEncoder::Range(builder) => builder.append_null(),
        DatumColumnEncoder::Jsonb {
            offsets,
            buf: _,
            nulls,
        } => {
            let nulls = nulls.get_or_insert_with(|| {
                // NOTE(review): the row count here is `offsets.len() - 1`,
                // which presumes `offsets` carries a leading sentinel 0 —
                // confirm against the encoder's constructor (not visible in
                // this chunk).
                let mut buf = BooleanBufferBuilder::new(offsets.len());
                buf.append_n(offsets.len() - 1, true);
                buf
            });
            // A null value contributes no bytes: repeat the previous offset.
            offsets.push(offsets.last().copied().unwrap_or(0));
            nulls.append(false);
        }
        DatumColumnEncoder::Array {
            dims,
            val_lengths,
            vals: _,
            nulls,
        } => {
            let nulls = nulls.get_or_insert_with(|| {
                let mut buf = BooleanBufferBuilder::new(dims.len() + 1);
                buf.append_n(dims.len(), true);
                buf
            });
            dims.append_null();
            val_lengths.push(0);
            nulls.append(false);
        }
        DatumColumnEncoder::List {
            lengths,
            values: _,
            nulls,
        } => {
            let nulls = nulls.get_or_insert_with(|| {
                let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
                buf.append_n(lengths.len(), true);
                buf
            });
            lengths.push(0);
            nulls.append(false);
        }
        DatumColumnEncoder::Map {
            lengths,
            keys: _,
            vals: _,
            nulls,
        } => {
            let nulls = nulls.get_or_insert_with(|| {
                let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
                buf.append_n(lengths.len(), true);
                buf
            });
            lengths.push(0);
            nulls.append(false);
        }
        DatumColumnEncoder::Record {
            fields,
            nulls,
            length,
        } => {
            let nulls = nulls.get_or_insert_with(|| {
                let mut buf = BooleanBufferBuilder::new(*length + 1);
                buf.append_n(*length, true);
                buf
            });
            nulls.append(false);
            length.add_assign(1);
            // Every child column still needs an entry to stay aligned.
            for field in fields {
                field.push_invalid();
            }
        }
        DatumColumnEncoder::RecordEmpty(builder) => builder.append_null(),
    }
}
/// Consumes the encoder and assembles the final Arrow array.
fn finish(self) -> ArrayRef {
    match self {
        DatumColumnEncoder::Bool(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::U8(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::U16(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::U32(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::U64(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::I16(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::I32(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::I64(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::F32(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::F64(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::Numeric {
            mut approx_values,
            mut binary_values,
            numeric_context: _,
        } => {
            // Numerics become a two-field struct: the f64 approximation and
            // the exact packed bytes. Both halves must agree on length and
            // validity, which `push`/`push_invalid` maintain in lockstep.
            let approx_array = approx_values.finish();
            let binary_array = binary_values.finish();
            assert_eq!(approx_array.len(), binary_array.len());
            debug_assert_eq!(approx_array.logical_nulls(), binary_array.logical_nulls());
            let fields = Fields::from(vec![
                Field::new("approx", approx_array.data_type().clone(), true),
                Field::new("binary", binary_array.data_type().clone(), true),
            ]);
            let nulls = approx_array.logical_nulls();
            let array = StructArray::new(
                fields,
                vec![Arc::new(approx_array), Arc::new(binary_array)],
                nulls,
            );
            Arc::new(array)
        }
        DatumColumnEncoder::String(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::Bytes(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::Date(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::Time(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::Timestamp(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::TimestampTz(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::MzTimestamp(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::Interval(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::Uuid(mut builder) => {
            let array = builder.finish();
            Arc::new(array)
        }
        DatumColumnEncoder::AclItem(mut builder) => Arc::new(builder.finish()),
        DatumColumnEncoder::MzAclItem(mut builder) => Arc::new(builder.finish()),
        DatumColumnEncoder::Range(mut builder) => Arc::new(builder.finish()),
        DatumColumnEncoder::Jsonb {
            offsets,
            buf,
            mut nulls,
        } => {
            // Assemble a StringArray directly from the raw byte buffer and
            // the manually tracked offsets/validity.
            let values = Buffer::from_vec(buf);
            let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
            let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
            let array = StringArray::new(offsets, values, nulls);
            Arc::new(array)
        }
        DatumColumnEncoder::Array {
            mut dims,
            val_lengths,
            vals,
            mut nulls,
        } => {
            // The same null buffer is applied to both the values list and
            // the outer struct, so "dims" and "vals" stay consistent.
            let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
            let vals = vals.finish();
            let field = Field::new_list_field(vals.data_type().clone(), true);
            let val_offsets = OffsetBuffer::from_lengths(val_lengths);
            let values =
                ListArray::new(Arc::new(field), val_offsets, Arc::new(vals), nulls.clone());
            let dims = dims.finish();
            assert_eq!(values.len(), dims.len());
            let fields = Fields::from(vec![
                Field::new("dims", dims.data_type().clone(), true),
                Field::new("vals", values.data_type().clone(), true),
            ]);
            let array = StructArray::new(fields, vec![Arc::new(dims), Arc::new(values)], nulls);
            Arc::new(array)
        }
        DatumColumnEncoder::List {
            lengths,
            values,
            mut nulls,
        } => {
            let values = values.finish();
            let field = Field::new_list_field(values.data_type().clone(), true);
            let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
            let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
            let array = ListArray::new(Arc::new(field), offsets, values, nulls);
            Arc::new(array)
        }
        DatumColumnEncoder::Map {
            lengths,
            mut keys,
            vals,
            mut nulls,
        } => {
            // Maps are encoded as a list of non-null {key, val} structs;
            // keys themselves must never be null.
            let keys = keys.finish();
            let vals = vals.finish();
            let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
            let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
            assert_none!(keys.logical_nulls());
            let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false));
            let val_field = Arc::new(Field::new("val", vals.data_type().clone(), true));
            let fields = Fields::from(vec![Arc::clone(&key_field), Arc::clone(&val_field)]);
            let entries = StructArray::new(fields, vec![Arc::new(keys), vals], None);
            let field = Field::new("map_entries", entries.data_type().clone(), false);
            let array = ListArray::new(Arc::new(field), offsets, Arc::new(entries), nulls);
            Arc::new(array)
        }
        DatumColumnEncoder::Record {
            fields,
            mut nulls,
            length: _,
        } => {
            // Record fields are named by their ordinal position ("0", "1", ...).
            let (fields, arrays): (Vec<_>, Vec<_>) = fields
                .into_iter()
                .enumerate()
                .map(|(tag, encoder)| {
                    let nullable = true;
                    let array = encoder.finish();
                    let field =
                        Field::new(tag.to_string(), array.data_type().clone(), nullable);
                    (field, array)
                })
                .unzip();
            let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
            let array = StructArray::new(Fields::from(fields), arrays, nulls);
            Arc::new(array)
        }
        DatumColumnEncoder::RecordEmpty(mut builder) => Arc::new(builder.finish()),
    }
}
}
/// A per-type decoder that reads [`Datum`]s back out of the Arrow arrays
/// produced by [`DatumColumnEncoder`]; the variants mirror that enum.
#[derive(Debug)]
enum DatumColumnDecoder {
    Bool(BooleanArray),
    U8(UInt8Array),
    U16(UInt16Array),
    U32(UInt32Array),
    U64(UInt64Array),
    I16(Int16Array),
    I32(Int32Array),
    I64(Int64Array),
    F32(Float32Array),
    F64(Float64Array),
    // Only the exact "binary" half of the numeric encoding; the f64
    // approximation is not needed to reconstruct the value.
    Numeric(BinaryArray),
    Bytes(BinaryArray),
    String(StringArray),
    // Days since the Postgres epoch.
    Date(Int32Array),
    Time(FixedSizeBinaryArray),
    Timestamp(FixedSizeBinaryArray),
    TimestampTz(FixedSizeBinaryArray),
    MzTimestamp(UInt64Array),
    Interval(FixedSizeBinaryArray),
    Uuid(FixedSizeBinaryArray),
    // Serialized JSON text, re-packed via `JsonbPacker`.
    Json(StringArray),
    Array {
        // Row -> range of packed dimensions in `dims`.
        dim_offsets: OffsetBuffer<i32>,
        dims: FixedSizeBinaryArray,
        // Row -> range of flattened element values in `vals`.
        val_offsets: OffsetBuffer<i32>,
        vals: Box<DatumColumnDecoder>,
        nulls: Option<NullBuffer>,
    },
    List {
        offsets: OffsetBuffer<i32>,
        values: Box<DatumColumnDecoder>,
        nulls: Option<NullBuffer>,
    },
    Map {
        offsets: OffsetBuffer<i32>,
        keys: StringArray,
        vals: Box<DatumColumnDecoder>,
        nulls: Option<NullBuffer>,
    },
    RecordEmpty(BooleanArray),
    Record {
        fields: Vec<Box<DatumColumnDecoder>>,
        nulls: Option<NullBuffer>,
    },
    // Protobuf-encoded `ProtoDatum` ranges.
    Range(BinaryArray),
    MzAclItem(BinaryArray),
    AclItem(FixedSizeBinaryArray),
}
impl DatumColumnDecoder {
/// Decodes the value at `idx`, pushing it (or `Datum::Null`) into `packer`.
///
/// Scalar variants produce an `Option<Datum>` that the final `match` pushes
/// (`None` becomes `Datum::Null`). Container variants (`Range`, `Json`,
/// `Array`, `List`, `Map`, `Record`) push directly into the packer and
/// return early.
fn get<'a>(&'a self, idx: usize, packer: &'a mut RowPacker) {
    let datum = match self {
        DatumColumnDecoder::Bool(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(|x| if x { Datum::True } else { Datum::False }),
        DatumColumnDecoder::U8(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(Datum::UInt8),
        DatumColumnDecoder::U16(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(Datum::UInt16),
        DatumColumnDecoder::U32(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(Datum::UInt32),
        DatumColumnDecoder::U64(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(Datum::UInt64),
        DatumColumnDecoder::I16(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(Datum::Int16),
        DatumColumnDecoder::I32(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(Datum::Int32),
        DatumColumnDecoder::I64(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(Datum::Int64),
        DatumColumnDecoder::F32(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(|x| Datum::Float32(ordered_float::OrderedFloat(x))),
        DatumColumnDecoder::F64(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(|x| Datum::Float64(ordered_float::OrderedFloat(x))),
        DatumColumnDecoder::Numeric(array) => array.is_valid(idx).then(|| {
            let val = array.value(idx);
            let val = PackedNumeric::from_bytes(val)
                .expect("failed to roundtrip Numeric")
                .into_value();
            Datum::Numeric(OrderedDecimal(val))
        }),
        DatumColumnDecoder::String(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(Datum::String),
        DatumColumnDecoder::Bytes(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(Datum::Bytes),
        DatumColumnDecoder::Date(array) => {
            array.is_valid(idx).then(|| array.value(idx)).map(|x| {
                let date = Date::from_pg_epoch(x).expect("failed to roundtrip");
                Datum::Date(date)
            })
        }
        DatumColumnDecoder::Time(array) => {
            array.is_valid(idx).then(|| array.value(idx)).map(|x| {
                let packed = PackedNaiveTime::from_bytes(x).expect("failed to roundtrip time");
                Datum::Time(packed.into_value())
            })
        }
        DatumColumnDecoder::Timestamp(array) => {
            array.is_valid(idx).then(|| array.value(idx)).map(|x| {
                let packed = PackedNaiveDateTime::from_bytes(x)
                    .expect("failed to roundtrip PackedNaiveDateTime");
                let timestamp = CheckedTimestamp::from_timestamplike(packed.into_value())
                    .expect("failed to roundtrip timestamp")
                Datum::Timestamp(timestamp)
            })
        }
        DatumColumnDecoder::TimestampTz(array) => {
            array.is_valid(idx).then(|| array.value(idx)).map(|x| {
                let packed = PackedNaiveDateTime::from_bytes(x)
                    .expect("failed to roundtrip PackedNaiveDateTime");
                // Encoded as a naive datetime; re-attach UTC here.
                let timestamp =
                    CheckedTimestamp::from_timestamplike(packed.into_value().and_utc())
                        .expect("failed to roundtrip timestamp");
                Datum::TimestampTz(timestamp)
            })
        }
        DatumColumnDecoder::MzTimestamp(array) => array
            .is_valid(idx)
            .then(|| array.value(idx))
            .map(|x| Datum::MzTimestamp(Timestamp::from(x))),
        DatumColumnDecoder::Interval(array) => {
            array.is_valid(idx).then(|| array.value(idx)).map(|x| {
                let packed =
                    PackedInterval::from_bytes(x).expect("failed to roundtrip interval");
                Datum::Interval(packed.into_value())
            })
        }
        DatumColumnDecoder::Uuid(array) => {
            array.is_valid(idx).then(|| array.value(idx)).map(|x| {
                let uuid = Uuid::from_slice(x).expect("failed to roundtrip uuid");
                Datum::Uuid(uuid)
            })
        }
        DatumColumnDecoder::AclItem(array) => {
            array.is_valid(idx).then(|| array.value(idx)).map(|x| {
                let packed =
                    PackedAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
                Datum::AclItem(packed.into_value())
            })
        }
        DatumColumnDecoder::MzAclItem(array) => {
            array.is_valid(idx).then(|| array.value(idx)).map(|x| {
                let packed =
                    PackedMzAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
                Datum::MzAclItem(packed.into_value())
            })
        }
        DatumColumnDecoder::Range(array) => {
            let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
                packer.push(Datum::Null);
                return;
            };
            let proto = ProtoDatum::decode(val).expect("failed to roundtrip Range");
            packer
                .try_push_proto(&proto)
                .expect("failed to pack ProtoRange");
            return;
        }
        DatumColumnDecoder::Json(array) => {
            let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
                packer.push(Datum::Null);
                return;
            };
            JsonbPacker::new(packer)
                .pack_str(val)
                .expect("failed to roundtrip JSON");
            return;
        }
        DatumColumnDecoder::Array {
            dim_offsets,
            dims,
            val_offsets,
            vals,
            nulls,
        } => {
            let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
            if !is_valid {
                packer.push(Datum::Null);
                return;
            }
            // Dimensions for this row live at dim_offsets[idx]..dim_offsets[idx + 1].
            let start: usize = dim_offsets[idx]
                .try_into()
                .expect("unexpected negative offset");
            let end: usize = dim_offsets[idx + 1]
                .try_into()
                .expect("unexpected negative offset");
            let dimensions = (start..end).map(|idx| {
                PackedArrayDimension::from_bytes(dims.value(idx))
                    .expect("failed to roundtrip ArrayDimension")
                    .into_value()
            });
            // Flattened elements live at val_offsets[idx]..val_offsets[idx + 1].
            let start: usize = val_offsets[idx]
                .try_into()
                .expect("unexpected negative offset");
            let end: usize = val_offsets[idx + 1]
                .try_into()
                .expect("unexpected negative offset");
            packer
                .push_array_with_row_major(dimensions, |packer| {
                    for x in start..end {
                        vals.get(x, packer);
                    }
                    end - start
                })
                .expect("failed to pack Array");
            return;
        }
        DatumColumnDecoder::List {
            offsets,
            values,
            nulls,
        } => {
            let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
            if !is_valid {
                packer.push(Datum::Null);
                return;
            }
            let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
            let end: usize = offsets[idx + 1]
                .try_into()
                .expect("unexpected negative offset");
            packer.push_list_with(|packer| {
                for idx in start..end {
                    values.get(idx, packer)
                }
            });
            return;
        }
        DatumColumnDecoder::Map {
            offsets,
            keys,
            vals,
            nulls,
        } => {
            let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
            if !is_valid {
                packer.push(Datum::Null);
                return;
            }
            let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
            let end: usize = offsets[idx + 1]
                .try_into()
                .expect("unexpected negative offset");
            packer.push_dict_with(|packer| {
                for idx in start..end {
                    packer.push(Datum::String(keys.value(idx)));
                    vals.get(idx, packer);
                }
            });
            return;
        }
        DatumColumnDecoder::RecordEmpty(array) => array.is_valid(idx).then(Datum::empty_list),
        DatumColumnDecoder::Record { fields, nulls } => {
            let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
            if !is_valid {
                packer.push(Datum::Null);
                return;
            }
            packer.push_list_with(|packer| {
                for field in fields {
                    field.get(idx, packer);
                }
            });
            return;
        }
    };
    match datum {
        Some(d) => packer.push(d),
        None => packer.push(Datum::Null),
    }
}
/// Derives column statistics from the decoded array.
///
/// Container types and the ACL/range types do not collect statistics and
/// report [`ColumnStatKinds::None`].
fn stats(&self) -> ColumnStatKinds {
    match self {
        DatumColumnDecoder::Bool(a) => PrimitiveStats::<bool>::from_column(a).into(),
        DatumColumnDecoder::U8(a) => PrimitiveStats::<u8>::from_column(a).into(),
        DatumColumnDecoder::U16(a) => PrimitiveStats::<u16>::from_column(a).into(),
        DatumColumnDecoder::U32(a) => PrimitiveStats::<u32>::from_column(a).into(),
        DatumColumnDecoder::U64(a) => PrimitiveStats::<u64>::from_column(a).into(),
        DatumColumnDecoder::I16(a) => PrimitiveStats::<i16>::from_column(a).into(),
        DatumColumnDecoder::I32(a) => PrimitiveStats::<i32>::from_column(a).into(),
        DatumColumnDecoder::I64(a) => PrimitiveStats::<i64>::from_column(a).into(),
        DatumColumnDecoder::F32(a) => PrimitiveStats::<f32>::from_column(a).into(),
        DatumColumnDecoder::F64(a) => PrimitiveStats::<f64>::from_column(a).into(),
        DatumColumnDecoder::Numeric(a) => numeric_stats_from_column(a),
        DatumColumnDecoder::String(a) => PrimitiveStats::<String>::from_column(a).into(),
        DatumColumnDecoder::Bytes(a) => PrimitiveStats::<Vec<u8>>::from_column(a).into(),
        // Dates are stored as i32 days, so i32 stats apply directly.
        DatumColumnDecoder::Date(a) => PrimitiveStats::<i32>::from_column(a).into(),
        DatumColumnDecoder::Time(a) => {
            fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedTime)
        }
        DatumColumnDecoder::Timestamp(a) => {
            fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
        }
        DatumColumnDecoder::TimestampTz(a) => {
            fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
        }
        DatumColumnDecoder::MzTimestamp(a) => PrimitiveStats::<u64>::from_column(a).into(),
        DatumColumnDecoder::Interval(a) => {
            fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedInterval)
        }
        DatumColumnDecoder::Uuid(a) => {
            fixed_stats_from_column(a, FixedSizeBytesStatsKind::Uuid)
        }
        DatumColumnDecoder::AclItem(_)
        | DatumColumnDecoder::MzAclItem(_)
        | DatumColumnDecoder::Range(_) => ColumnStatKinds::None,
        DatumColumnDecoder::Json(a) => stats_for_json(a.iter()).values,
        DatumColumnDecoder::Array { .. }
        | DatumColumnDecoder::List { .. }
        | DatumColumnDecoder::Map { .. }
        | DatumColumnDecoder::Record { .. }
        | DatumColumnDecoder::RecordEmpty(_) => ColumnStatKinds::None,
    }
}
}
impl Schema2<Row> for RelationDesc {
type ArrowColumn = arrow::array::StructArray;
type Statistics = OptionStats<StructStats>;
type Decoder = RowColumnarDecoder;
type Encoder = RowColumnarEncoder;
fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
RowColumnarDecoder::new(col, self)
}
fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
RowColumnarEncoder::new(self)
.ok_or_else(|| anyhow::anyhow!("Cannot encode a RelationDesc with no columns"))
}
}
/// A [`ColumnDecoder`] that reads [`Row`]s out of an Arrow [`StructArray`].
#[derive(Debug)]
pub struct RowColumnarDecoder {
    // Number of rows in the decoded column.
    len: usize,
    // Per-column `(name, null count if the column is nullable, decoder)`
    // triples, in relation-desc order.
    decoders: Vec<(Arc<str>, Option<usize>, DatumColumnDecoder)>,
    // Logical null buffer of the outer struct array, if present.
    nullability: Option<NullBuffer>,
}
/// Returns `column` with `null_mask` (the parent struct's nulls) folded into
/// its own null buffer, so child decoders observe parent-level nulls.
///
/// When there is no mask this is a cheap `Arc` clone.
fn mask_nulls(column: &ArrayRef, null_mask: Option<&NullBuffer>) -> ArrayRef {
    match null_mask {
        None => Arc::clone(column),
        Some(_) => {
            // Union the parent's mask with the column's own nulls and rebuild
            // the array data with only the null buffer swapped out.
            let merged = NullBuffer::union(null_mask, column.nulls());
            let data = column
                .to_data()
                .into_builder()
                .nulls(merged)
                .build()
                .expect("changed only null mask");
            make_array(data)
        }
    }
}
impl RowColumnarDecoder {
    /// Creates a [`RowColumnarDecoder`] that reads [`Row`]s out of the
    /// provided [`StructArray`].
    ///
    /// Returns an error if the number of columns in `col` doesn't match
    /// `desc`, or if any child column can't be decoded as its declared
    /// [`ScalarType`].
    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
        let arrays = col.columns();
        let desc_columns = desc.typ().columns();
        if arrays.len() != desc_columns.len() {
            anyhow::bail!("provided array has {arrays:?}, relation desc has {desc_columns:?}");
        }

        let null_mask = col.nulls();
        let mut decoders = Vec::with_capacity(desc_columns.len());
        for (col_idx, (col_name, col_type)) in desc.iter().enumerate() {
            // Fields of the StructArray are named by ordinal position, not by
            // the RelationDesc's column name.
            let field_name = col_idx.to_string();
            let column = col.column_by_name(&field_name).ok_or_else(|| {
                anyhow::anyhow!(
                    "StructArray did not contain column name {field_name}, found {:?}",
                    col.column_names()
                )
            })?;
            // Fold the outer (Row-level) nulls into the child column.
            let column = mask_nulls(column, null_mask);
            let null_count = col_type.nullable.then(|| column.null_count());
            let decoder = array_to_decoder(&column, &col_type.scalar_type)?;
            decoders.push((col_name.as_str().into(), null_count, decoder));
        }

        Ok(RowColumnarDecoder {
            len: col.len(),
            decoders,
            nullability: col.logical_nulls(),
        })
    }

    /// Returns the number of null `Row`s in the column this decoder reads.
    pub fn null_count(&self) -> usize {
        match self.nullability.as_ref() {
            Some(nulls) => nulls.null_count(),
            None => 0,
        }
    }
}
impl ColumnDecoder<Row> for RowColumnarDecoder {
    /// Decodes the `Row` at `idx` into `val`, re-packing it column by column.
    fn decode(&self, idx: usize, val: &mut Row) {
        let mut packer = val.packer();
        for (_name, _null_count, decoder) in &self.decoders {
            decoder.get(idx, &mut packer);
        }
    }

    fn is_null(&self, idx: usize) -> bool {
        self.nullability
            .as_ref()
            .map_or(false, |nulls| nulls.is_null(idx))
    }

    fn stats(&self) -> StructStats {
        let cols = self
            .decoders
            .iter()
            .map(|(name, null_count, decoder)| {
                let stats = ColumnarStats {
                    nulls: null_count.map(|count| ColumnNullStats { count }),
                    values: decoder.stats(),
                };
                (name.to_string(), stats)
            })
            .collect();
        StructStats {
            len: self.len,
            cols,
        }
    }
}
/// An encoder for [`Row`]s whose schema is described by a [`RelationDesc`].
#[derive(Debug)]
pub struct RowColumnarEncoder {
    // One encoder per column of the RelationDesc, in column order.
    encoders: Vec<DatumEncoder>,
    // Original column index and name for each encoder; the index (not the
    // name) names the resulting arrow fields.
    col_names: Vec<(usize, Arc<str>)>,
    // Validity bitmap for the outer struct: one bit per appended Row, false
    // for Rows appended via `append_null`.
    nullability: BooleanBufferBuilder,
}
impl RowColumnarEncoder {
    /// Creates a [`RowColumnarEncoder`] for the provided [`RelationDesc`].
    ///
    /// Returns `None` if the [`RelationDesc`] has no columns.
    pub fn new(desc: &RelationDesc) -> Option<Self> {
        if desc.typ().columns().is_empty() {
            return None;
        }

        let mut col_names = Vec::new();
        let mut encoders = Vec::new();
        for (idx, (col_name, col_type)) in desc.iter().enumerate() {
            let encoder =
                scalar_type_to_encoder(&col_type.scalar_type).expect("failed to create encoder");
            encoders.push(DatumEncoder {
                nullable: col_type.nullable,
                encoder,
            });
            col_names.push((idx, col_name.as_str().into()));
        }

        Some(RowColumnarEncoder {
            encoders,
            col_names,
            nullability: BooleanBufferBuilder::new(100),
        })
    }
}
impl ColumnEncoder<Row> for RowColumnarEncoder {
    type FinishedColumn = StructArray;

    fn goodbytes(&self) -> usize {
        self.encoders.iter().map(|e| e.goodbytes()).sum()
    }

    /// Appends `val`, pushing each of its datums into the corresponding
    /// per-column encoder.
    ///
    /// Panics if the number of datums in `val` differs from the number of
    /// columns this encoder was created with.
    fn append(&mut self, val: &Row) {
        let mut num_datums = 0;
        for (datum, encoder) in val.iter().zip(self.encoders.iter_mut()) {
            encoder.push(datum);
            num_datums += 1;
        }
        assert_eq!(
            num_datums,
            self.encoders.len(),
            "tried to encode {val:?}, but only have {:?}",
            self.encoders
        );
        // `zip` above stops at the shorter iterator, so the assert_eq only
        // catches rows that are too short. Also reject rows that carry *more*
        // datums than we have encoders, which would otherwise be silently
        // dropped.
        assert!(
            val.iter().nth(num_datums).is_none(),
            "tried to encode {val:?}, which has more datums than encoders: {:?}",
            self.encoders
        );
        self.nullability.append(true);
    }

    /// Appends a null `Row`, pushing a placeholder value into every child
    /// column so all columns stay the same length.
    fn append_null(&mut self) {
        for encoder in self.encoders.iter_mut() {
            encoder.push_invalid();
        }
        self.nullability.append(false);
    }

    fn finish(self) -> Self::FinishedColumn {
        let RowColumnarEncoder {
            encoders,
            col_names,
            nullability,
            ..
        } = self;

        let (arrays, fields): (Vec<_>, Vec<_>) = col_names
            .iter()
            .zip_eq(encoders)
            .map(|((col_idx, _col_name), encoder)| {
                let array = encoder.finish();
                // Fields are named by ordinal position. All fields are marked
                // nullable because `append_null` pushes placeholder values
                // into every child column.
                let nullable = true;
                let field = Field::new(col_idx.to_string(), array.data_type().clone(), nullable);
                (array, field)
            })
            .unzip();

        let null_buffer = NullBuffer::from(BooleanBuffer::from(nullability));
        StructArray::new(Fields::from(fields), arrays, Some(null_buffer))
    }
}
/// Downcasts a dynamically typed [`Array`] to the concrete array type `T`,
/// returning a descriptive error (instead of panicking) when the runtime
/// type does not match.
#[inline]
fn downcast_array<T: 'static>(array: &Arc<dyn Array>) -> Result<&T, anyhow::Error> {
    match array.as_any().downcast_ref::<T>() {
        Some(concrete) => Ok(concrete),
        None => Err(anyhow!(
            "expected {}, found {array:?}",
            std::any::type_name::<T>()
        )),
    }
}
/// Converts an arrow [`Array`] into a [`DatumColumnDecoder`] for the given
/// [`ScalarType`], checking that the array's physical [`DataType`] is the one
/// used to encode that `ScalarType`.
///
/// Recurses into container types (array, list, map, record) to build decoders
/// for their child columns.
///
/// # Errors
///
/// Returns an error if a downcast to the expected concrete array type fails,
/// or (after a soft-panic/log) if the `(DataType, ScalarType)` pairing is not
/// a supported one.
fn array_to_decoder(
    array: &Arc<dyn Array>,
    col_ty: &ScalarType,
) -> Result<DatumColumnDecoder, anyhow::Error> {
    let decoder = match (array.data_type(), col_ty) {
        (DataType::Boolean, ScalarType::Bool) => {
            let array = downcast_array::<BooleanArray>(array)?;
            DatumColumnDecoder::Bool(array.clone())
        }
        (DataType::UInt8, ScalarType::PgLegacyChar) => {
            let array = downcast_array::<UInt8Array>(array)?;
            DatumColumnDecoder::U8(array.clone())
        }
        (DataType::UInt16, ScalarType::UInt16) => {
            let array = downcast_array::<UInt16Array>(array)?;
            DatumColumnDecoder::U16(array.clone())
        }
        // The OID-like types all share the u32 encoding.
        (
            DataType::UInt32,
            ScalarType::UInt32
            | ScalarType::Oid
            | ScalarType::RegClass
            | ScalarType::RegProc
            | ScalarType::RegType,
        ) => {
            let array = downcast_array::<UInt32Array>(array)?;
            DatumColumnDecoder::U32(array.clone())
        }
        (DataType::UInt64, ScalarType::UInt64) => {
            let array = downcast_array::<UInt64Array>(array)?;
            DatumColumnDecoder::U64(array.clone())
        }
        (DataType::Int16, ScalarType::Int16) => {
            let array = downcast_array::<Int16Array>(array)?;
            DatumColumnDecoder::I16(array.clone())
        }
        (DataType::Int32, ScalarType::Int32) => {
            let array = downcast_array::<Int32Array>(array)?;
            DatumColumnDecoder::I32(array.clone())
        }
        (DataType::Int64, ScalarType::Int64) => {
            let array = downcast_array::<Int64Array>(array)?;
            DatumColumnDecoder::I64(array.clone())
        }
        (DataType::Float32, ScalarType::Float32) => {
            let array = downcast_array::<Float32Array>(array)?;
            DatumColumnDecoder::F32(array.clone())
        }
        (DataType::Float64, ScalarType::Float64) => {
            let array = downcast_array::<Float64Array>(array)?;
            DatumColumnDecoder::F64(array.clone())
        }
        // Numerics are encoded as a struct; decoding needs only the "binary"
        // child (the exact packed representation). The encoder also writes an
        // approximation column, which is ignored here.
        (DataType::Struct(_), ScalarType::Numeric { .. }) => {
            let array = downcast_array::<StructArray>(array)?;
            let binary_values = array
                .column_by_name("binary")
                .expect("missing binary column");
            let array = downcast_array::<BinaryArray>(binary_values)?;
            DatumColumnDecoder::Numeric(array.clone())
        }
        // All string-ish types share the UTF-8 encoding.
        (
            DataType::Utf8,
            ScalarType::String
            | ScalarType::PgLegacyName
            | ScalarType::Char { .. }
            | ScalarType::VarChar { .. },
        ) => {
            let array = downcast_array::<StringArray>(array)?;
            DatumColumnDecoder::String(array.clone())
        }
        (DataType::Binary, ScalarType::Bytes) => {
            let array = downcast_array::<BinaryArray>(array)?;
            DatumColumnDecoder::Bytes(array.clone())
        }
        (DataType::Int32, ScalarType::Date) => {
            let array = downcast_array::<Int32Array>(array)?;
            DatumColumnDecoder::Date(array.clone())
        }
        // Fixed-width temporal and similar types are stored as fixed-size
        // binary in their "packed" representations; the pattern also checks
        // the expected byte width.
        (DataType::FixedSizeBinary(TIME_FIXED_BYTES), ScalarType::Time) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
            DatumColumnDecoder::Time(array.clone())
        }
        (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), ScalarType::Timestamp { .. }) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
            DatumColumnDecoder::Timestamp(array.clone())
        }
        (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), ScalarType::TimestampTz { .. }) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
            DatumColumnDecoder::TimestampTz(array.clone())
        }
        (DataType::UInt64, ScalarType::MzTimestamp) => {
            let array = downcast_array::<UInt64Array>(array)?;
            DatumColumnDecoder::MzTimestamp(array.clone())
        }
        (DataType::FixedSizeBinary(INTERVAL_FIXED_BYTES), ScalarType::Interval) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
            DatumColumnDecoder::Interval(array.clone())
        }
        (DataType::FixedSizeBinary(UUID_FIXED_BYTES), ScalarType::Uuid) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
            DatumColumnDecoder::Uuid(array.clone())
        }
        (DataType::FixedSizeBinary(ACL_ITEM_FIXED_BYTES), ScalarType::AclItem) => {
            let array = downcast_array::<FixedSizeBinaryArray>(array)?;
            DatumColumnDecoder::AclItem(array.clone())
        }
        (DataType::Binary, ScalarType::MzAclItem) => {
            let array = downcast_array::<BinaryArray>(array)?;
            DatumColumnDecoder::MzAclItem(array.clone())
        }
        (DataType::Binary, ScalarType::Range { .. }) => {
            let array = downcast_array::<BinaryArray>(array)?;
            DatumColumnDecoder::Range(array.clone())
        }
        // JSON is stored as its string rendering.
        (DataType::Utf8, ScalarType::Jsonb) => {
            let array = downcast_array::<StringArray>(array)?;
            DatumColumnDecoder::Json(array.clone())
        }
        // An Int2Vector is decoded as an array with a fixed Int16 element
        // type.
        (DataType::Struct(_), s @ ScalarType::Array(_) | s @ ScalarType::Int2Vector) => {
            let element_type = match s {
                ScalarType::Array(inner) => inner,
                ScalarType::Int2Vector => &ScalarType::Int16,
                _ => unreachable!("checked above"),
            };
            let array = downcast_array::<StructArray>(array)?;
            let nulls = array.nulls().cloned();
            // "dims" is a list (per array value) of packed array dimensions.
            let dims = array
                .column_by_name("dims")
                .expect("missing dimensions column");
            let dims = downcast_array::<ListArray>(dims).cloned()?;
            let dim_offsets = dims.offsets().clone();
            let dims = downcast_array::<FixedSizeBinaryArray>(dims.values()).cloned()?;
            // "vals" is the flattened list of array elements, decoded
            // recursively as `element_type`.
            let vals = array.column_by_name("vals").expect("missing values column");
            let vals = downcast_array::<ListArray>(vals)?;
            let val_offsets = vals.offsets().clone();
            let vals = array_to_decoder(vals.values(), element_type)?;
            DatumColumnDecoder::Array {
                dim_offsets,
                dims,
                val_offsets,
                vals: Box::new(vals),
                nulls,
            }
        }
        (DataType::List(_), ScalarType::List { element_type, .. }) => {
            let array = downcast_array::<ListArray>(array)?;
            let inner_decoder = array_to_decoder(array.values(), &*element_type)?;
            DatumColumnDecoder::List {
                offsets: array.offsets().clone(),
                values: Box::new(inner_decoder),
                nulls: array.nulls().cloned(),
            }
        }
        // Maps encoded with arrow's native MapArray type.
        (DataType::Map(_, true), ScalarType::Map { value_type, .. }) => {
            let array = downcast_array::<MapArray>(array)?;
            let keys = downcast_array::<StringArray>(array.keys())?;
            let vals = array_to_decoder(array.values(), value_type)?;
            DatumColumnDecoder::Map {
                offsets: array.offsets().clone(),
                keys: keys.clone(),
                vals: Box::new(vals),
                nulls: array.nulls().cloned(),
            }
        }
        // Maps encoded as a List of two-column {key, value} structs —
        // presumably an alternate/older writer layout for maps; produces the
        // same `Map` decoder as the arm above. TODO(review): confirm which
        // writers emit this shape.
        (DataType::List(_), ScalarType::Map { value_type, .. }) => {
            let array: &ListArray = downcast_array(array)?;
            let entries: &StructArray = downcast_array(array.values())?;
            let [keys, values]: &[ArrayRef; 2] = entries.columns().try_into()?;
            let keys: &StringArray = downcast_array(keys)?;
            let vals: DatumColumnDecoder = array_to_decoder(values, value_type)?;
            DatumColumnDecoder::Map {
                offsets: array.offsets().clone(),
                keys: keys.clone(),
                vals: Box::new(vals),
                nulls: array.nulls().cloned(),
            }
        }
        // A record with no fields is stored as a placeholder boolean column.
        (DataType::Boolean, ScalarType::Record { fields, .. }) if fields.is_empty() => {
            let empty_record_array = downcast_array::<BooleanArray>(array)?;
            DatumColumnDecoder::RecordEmpty(empty_record_array.clone())
        }
        (DataType::Struct(_), ScalarType::Record { fields, .. }) => {
            let record_array = downcast_array::<StructArray>(array)?;
            let null_mask = record_array.nulls();
            let mut decoders = Vec::with_capacity(fields.len());
            for (tag, (_name, col_type)) in fields.iter().enumerate() {
                // Like the outer Row columns, record fields are named by
                // ordinal position.
                let inner_array = record_array
                    .column_by_name(&tag.to_string())
                    .ok_or_else(|| anyhow::anyhow!("no column named '{tag}'"))?;
                // Fold record-level nulls into the child column.
                let inner_array = mask_nulls(inner_array, null_mask);
                let inner_decoder = array_to_decoder(&inner_array, &col_type.scalar_type)?;
                decoders.push(Box::new(inner_decoder));
            }
            DatumColumnDecoder::Record {
                fields: decoders,
                nulls: record_array.nulls().cloned(),
            }
        }
        // Unsupported pairing: soft-panic in debug/CI, log and error in
        // production.
        (x, y) => {
            let msg = format!("can't decode column of {x:?} for scalar type {y:?}");
            mz_ore::soft_panic_or_log!("{msg}");
            anyhow::bail!("{msg}");
        }
    };
    Ok(decoder)
}
/// Creates a [`DatumColumnEncoder`] that can encode [`Datum`]s of the given
/// [`ScalarType`], recursing into container types to create encoders for
/// their elements.
///
/// The arrow types chosen here must stay in sync with the pairings accepted
/// by `array_to_decoder`.
fn scalar_type_to_encoder(col_ty: &ScalarType) -> Result<DatumColumnEncoder, anyhow::Error> {
    let encoder = match &col_ty {
        ScalarType::Bool => DatumColumnEncoder::Bool(BooleanBuilder::new()),
        ScalarType::PgLegacyChar => DatumColumnEncoder::U8(UInt8Builder::new()),
        ScalarType::UInt16 => DatumColumnEncoder::U16(UInt16Builder::new()),
        // The OID-like types all share the u32 encoding.
        ScalarType::UInt32
        | ScalarType::Oid
        | ScalarType::RegClass
        | ScalarType::RegProc
        | ScalarType::RegType => DatumColumnEncoder::U32(UInt32Builder::new()),
        ScalarType::UInt64 => DatumColumnEncoder::U64(UInt64Builder::new()),
        ScalarType::Int16 => DatumColumnEncoder::I16(Int16Builder::new()),
        ScalarType::Int32 => DatumColumnEncoder::I32(Int32Builder::new()),
        ScalarType::Int64 => DatumColumnEncoder::I64(Int64Builder::new()),
        ScalarType::Float32 => DatumColumnEncoder::F32(Float32Builder::new()),
        ScalarType::Float64 => DatumColumnEncoder::F64(Float64Builder::new()),
        // Numerics are written twice: an approximate f64 and the exact packed
        // binary representation (only the latter is read back by the decoder).
        ScalarType::Numeric { .. } => DatumColumnEncoder::Numeric {
            approx_values: Float64Builder::new(),
            binary_values: BinaryBuilder::new(),
            numeric_context: crate::adt::numeric::cx_datum().clone(),
        },
        // All string-ish types share the UTF-8 encoding.
        ScalarType::String
        | ScalarType::PgLegacyName
        | ScalarType::Char { .. }
        | ScalarType::VarChar { .. } => DatumColumnEncoder::String(StringBuilder::new()),
        ScalarType::Bytes => DatumColumnEncoder::Bytes(BinaryBuilder::new()),
        ScalarType::Date => DatumColumnEncoder::Date(Int32Builder::new()),
        // Fixed-width types are stored as fixed-size binary in their "packed"
        // representations.
        ScalarType::Time => DatumColumnEncoder::Time(FixedSizeBinaryBuilder::new(TIME_FIXED_BYTES)),
        ScalarType::Timestamp { .. } => {
            DatumColumnEncoder::Timestamp(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
        }
        ScalarType::TimestampTz { .. } => {
            DatumColumnEncoder::TimestampTz(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
        }
        ScalarType::MzTimestamp => DatumColumnEncoder::MzTimestamp(UInt64Builder::new()),
        ScalarType::Interval => {
            DatumColumnEncoder::Interval(FixedSizeBinaryBuilder::new(INTERVAL_FIXED_BYTES))
        }
        ScalarType::Uuid => DatumColumnEncoder::Uuid(FixedSizeBinaryBuilder::new(UUID_FIXED_BYTES)),
        ScalarType::AclItem => {
            DatumColumnEncoder::AclItem(FixedSizeBinaryBuilder::new(ACL_ITEM_FIXED_BYTES))
        }
        ScalarType::MzAclItem => DatumColumnEncoder::MzAclItem(BinaryBuilder::new()),
        ScalarType::Range { .. } => DatumColumnEncoder::Range(BinaryBuilder::new()),
        // JSON is built with hand-maintained offsets into a shared buffer;
        // offset lists always begin with 0.
        ScalarType::Jsonb => DatumColumnEncoder::Jsonb {
            offsets: vec![0],
            buf: Vec::new(),
            nulls: None,
        },
        // An Int2Vector is encoded as an array with a fixed Int16 element
        // type.
        s @ ScalarType::Array(_) | s @ ScalarType::Int2Vector => {
            let element_type = match s {
                ScalarType::Array(inner) => inner,
                ScalarType::Int2Vector => &ScalarType::Int16,
                _ => unreachable!("checked above"),
            };
            let inner = scalar_type_to_encoder(element_type)?;
            // Packed dimensions and flattened values are built side by side.
            DatumColumnEncoder::Array {
                dims: ListBuilder::new(FixedSizeBinaryBuilder::new(ARRAY_DIMENSION_FIXED_BYTES)),
                val_lengths: Vec::new(),
                vals: Box::new(inner),
                nulls: None,
            }
        }
        ScalarType::List { element_type, .. } => {
            let inner = scalar_type_to_encoder(&*element_type)?;
            DatumColumnEncoder::List {
                lengths: Vec::new(),
                values: Box::new(inner),
                nulls: None,
            }
        }
        ScalarType::Map { value_type, .. } => {
            let inner = scalar_type_to_encoder(&*value_type)?;
            DatumColumnEncoder::Map {
                lengths: Vec::new(),
                keys: StringBuilder::new(),
                vals: Box::new(inner),
                nulls: None,
            }
        }
        // A record with no fields carries no data; a boolean builder acts as
        // a placeholder column.
        ScalarType::Record { fields, .. } if fields.is_empty() => {
            DatumColumnEncoder::RecordEmpty(BooleanBuilder::new())
        }
        ScalarType::Record { fields, .. } => {
            let encoders = fields
                .iter()
                .map(|(_name, ty)| {
                    scalar_type_to_encoder(&ty.scalar_type).map(|e| DatumEncoder {
                        nullable: ty.nullable,
                        encoder: e,
                    })
                })
                .collect::<Result<_, _>>()?;
            DatumColumnEncoder::Record {
                fields: encoders,
                nulls: None,
                length: 0,
            }
        }
    };
    Ok(encoder)
}
impl Codec for Row {
type Storage = ProtoRow;
type Schema = RelationDesc;
fn codec_name() -> String {
"protobuf[Row]".into()
}
fn encode<B>(&self, buf: &mut B)
where
B: BufMut,
{
self.into_proto()
.encode(buf)
.expect("no required fields means no initialization errors");
}
fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Row, String> {
let mut row = Row::with_capacity(buf.len());
<Self as Codec>::decode_from(&mut row, buf, &mut None, schema)?;
Ok(row)
}
fn decode_from<'a>(
&mut self,
buf: &'a [u8],
storage: &mut Option<ProtoRow>,
schema: &RelationDesc,
) -> Result<(), String> {
let mut proto = storage.take().unwrap_or_default();
proto.clear();
proto.merge(buf).map_err(|err| err.to_string())?;
let ret = self.decode_from_proto(&proto, schema);
storage.replace(proto);
ret
}
fn validate(row: &Self, desc: &Self::Schema) -> Result<(), String> {
for x in Itertools::zip_longest(desc.iter_types(), row.iter()) {
match x {
EitherOrBoth::Both(typ, datum) if datum.is_instance_of(typ) => continue,
_ => return Err(format!("row {:?} did not match desc {:?}", row, desc)),
};
}
Ok(())
}
fn encode_schema(schema: &Self::Schema) -> Bytes {
schema.into_proto().encode_to_vec().into()
}
fn decode_schema(buf: &Bytes) -> Self::Schema {
let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
proto.into_rust().expect("valid schema")
}
}
impl<'a> From<Datum<'a>> for ProtoDatum {
    /// Converts a [`Datum`] into its protobuf representation, recursing into
    /// container datums (arrays, lists, maps, ranges).
    fn from(x: Datum<'a>) -> Self {
        let datum_type = match x {
            // Payload-less datums are encoded via the `ProtoDatumOther` enum.
            Datum::False => DatumType::Other(ProtoDatumOther::False.into()),
            Datum::True => DatumType::Other(ProtoDatumOther::True.into()),
            // Narrow integer types are widened to the proto field's width;
            // the decode path validates the range on the way back.
            Datum::Int16(x) => DatumType::Int16(x.into()),
            Datum::Int32(x) => DatumType::Int32(x),
            Datum::UInt8(x) => DatumType::Uint8(x.into()),
            Datum::UInt16(x) => DatumType::Uint16(x.into()),
            Datum::UInt32(x) => DatumType::Uint32(x),
            Datum::UInt64(x) => DatumType::Uint64(x),
            Datum::Int64(x) => DatumType::Int64(x),
            Datum::Float32(x) => DatumType::Float32(x.into_inner()),
            Datum::Float64(x) => DatumType::Float64(x.into_inner()),
            Datum::Date(x) => DatumType::Date(x.into_proto()),
            // Times are stored as seconds-from-midnight plus nanoseconds.
            Datum::Time(x) => DatumType::Time(ProtoNaiveTime {
                secs: x.num_seconds_from_midnight(),
                frac: x.nanosecond(),
            }),
            Datum::Timestamp(x) => DatumType::Timestamp(x.into_proto()),
            Datum::TimestampTz(x) => DatumType::TimestampTz(x.into_proto()),
            Datum::Interval(x) => DatumType::Interval(x.into_proto()),
            Datum::Bytes(x) => DatumType::Bytes(Bytes::copy_from_slice(x)),
            Datum::String(x) => DatumType::String(x.to_owned()),
            // Arrays are flattened: elements go into a nested ProtoRow and
            // the shape is captured by an explicit dimension list.
            Datum::Array(x) => DatumType::Array(ProtoArray {
                elements: Some(ProtoRow {
                    datums: x.elements().iter().map(|x| x.into()).collect(),
                }),
                dims: x
                    .dims()
                    .into_iter()
                    .map(|x| ProtoArrayDimension {
                        lower_bound: i64::cast_from(x.lower_bound),
                        length: u64::cast_from(x.length),
                    })
                    .collect(),
            }),
            Datum::List(x) => DatumType::List(ProtoRow {
                datums: x.iter().map(|x| x.into()).collect(),
            }),
            Datum::Map(x) => DatumType::Dict(ProtoDict {
                elements: x
                    .iter()
                    .map(|(k, v)| ProtoDictElement {
                        key: k.to_owned(),
                        val: Some(v.into()),
                    })
                    .collect(),
            }),
            // Finite numerics are stored as packed BCD; the special values
            // NaN and +/-Inf get their own `ProtoDatumOther` variants.
            Datum::Numeric(x) => {
                let mut x = x.0.clone();
                if let Some((bcd, scale)) = x.to_packed_bcd() {
                    DatumType::Numeric(ProtoNumeric { bcd, scale })
                } else if x.is_nan() {
                    DatumType::Other(ProtoDatumOther::NumericNaN.into())
                } else if x.is_infinite() {
                    if x.is_negative() {
                        DatumType::Other(ProtoDatumOther::NumericNegInf.into())
                    } else {
                        DatumType::Other(ProtoDatumOther::NumericPosInf.into())
                    }
                } else if x.is_special() {
                    panic!("internal error: unhandled special numeric value: {}", x);
                } else {
                    panic!(
                        "internal error: to_packed_bcd returned None for non-special value: {}",
                        x
                    )
                }
            }
            Datum::JsonNull => DatumType::Other(ProtoDatumOther::JsonNull.into()),
            Datum::Uuid(x) => DatumType::Uuid(x.as_bytes().to_vec()),
            Datum::MzTimestamp(x) => DatumType::MzTimestamp(x.into()),
            Datum::Dummy => DatumType::Other(ProtoDatumOther::Dummy.into()),
            Datum::Null => DatumType::Other(ProtoDatumOther::Null.into()),
            // Range bounds hold datums, so convert them recursively; the
            // nested message is boxed because the type is recursive.
            Datum::Range(super::Range { inner }) => DatumType::Range(Box::new(ProtoRange {
                inner: inner.map(|RangeInner { lower, upper }| {
                    Box::new(ProtoRangeInner {
                        lower_inclusive: lower.inclusive,
                        lower: lower.bound.map(|bound| Box::new(bound.datum().into())),
                        upper_inclusive: upper.inclusive,
                        upper: upper.bound.map(|bound| Box::new(bound.datum().into())),
                    })
                }),
            })),
            Datum::MzAclItem(x) => DatumType::MzAclItem(x.into_proto()),
            Datum::AclItem(x) => DatumType::AclItem(x.into_proto()),
        };
        ProtoDatum {
            datum_type: Some(datum_type),
        }
    }
}
impl RowPacker<'_> {
    /// Packs the datum described by a [`ProtoDatum`] onto the end of the row.
    ///
    /// Returns an error if the proto is malformed (unknown datum type,
    /// out-of-range narrowed integer, invalid UUID bytes, ...). On error the
    /// packer may be left partially written.
    pub(crate) fn try_push_proto(&mut self, x: &ProtoDatum) -> Result<(), String> {
        match &x.datum_type {
            // Payload-less datums arrive via the `ProtoDatumOther` enum.
            Some(DatumType::Other(o)) => match ProtoDatumOther::try_from(*o) {
                Ok(ProtoDatumOther::Unknown) => return Err("unknown datum type".into()),
                Ok(ProtoDatumOther::Null) => self.push(Datum::Null),
                Ok(ProtoDatumOther::False) => self.push(Datum::False),
                Ok(ProtoDatumOther::True) => self.push(Datum::True),
                Ok(ProtoDatumOther::JsonNull) => self.push(Datum::JsonNull),
                Ok(ProtoDatumOther::Dummy) => {
                    // NOTE(review): a decoded Dummy is logged as an error
                    // (when the `tracing_` feature is on) but still pushed —
                    // presumably Dummy is not expected on this path; confirm.
                    #[cfg(feature = "tracing_")]
                    tracing::error!("protobuf decoding found Dummy datum");
                    self.push(Datum::Dummy);
                }
                Ok(ProtoDatumOther::NumericPosInf) => self.push(Datum::from(Numeric::infinity())),
                Ok(ProtoDatumOther::NumericNegInf) => self.push(Datum::from(-Numeric::infinity())),
                Ok(ProtoDatumOther::NumericNaN) => self.push(Datum::from(Numeric::nan())),
                Err(_) => return Err(format!("unknown datum type: {}", o)),
            },
            // Narrow integers are widened in the proto representation, so
            // re-check the range before converting back.
            Some(DatumType::Int16(x)) => {
                let x = i16::try_from(*x)
                    .map_err(|_| format!("int16 field stored with out of range value: {}", *x))?;
                self.push(Datum::Int16(x))
            }
            Some(DatumType::Int32(x)) => self.push(Datum::Int32(*x)),
            Some(DatumType::Int64(x)) => self.push(Datum::Int64(*x)),
            Some(DatumType::Uint8(x)) => {
                let x = u8::try_from(*x)
                    .map_err(|_| format!("uint8 field stored with out of range value: {}", *x))?;
                self.push(Datum::UInt8(x))
            }
            Some(DatumType::Uint16(x)) => {
                let x = u16::try_from(*x)
                    .map_err(|_| format!("uint16 field stored with out of range value: {}", *x))?;
                self.push(Datum::UInt16(x))
            }
            Some(DatumType::Uint32(x)) => self.push(Datum::UInt32(*x)),
            Some(DatumType::Uint64(x)) => self.push(Datum::UInt64(*x)),
            Some(DatumType::Float32(x)) => self.push(Datum::Float32((*x).into())),
            Some(DatumType::Float64(x)) => self.push(Datum::Float64((*x).into())),
            Some(DatumType::Bytes(x)) => self.push(Datum::Bytes(x)),
            Some(DatumType::String(x)) => self.push(Datum::String(x)),
            Some(DatumType::Uuid(x)) => {
                // UUIDs are stored as raw bytes; reject anything that isn't
                // exactly a valid UUID slice.
                let u = Uuid::from_slice(x).map_err(|err| err.to_string())?;
                self.push(Datum::Uuid(u));
            }
            Some(DatumType::Date(x)) => self.push(Datum::Date(x.clone().into_rust()?)),
            Some(DatumType::Time(x)) => self.push(Datum::Time(x.clone().into_rust()?)),
            Some(DatumType::Timestamp(x)) => self.push(Datum::Timestamp(x.clone().into_rust()?)),
            Some(DatumType::TimestampTz(x)) => {
                self.push(Datum::TimestampTz(x.clone().into_rust()?))
            }
            Some(DatumType::Interval(x)) => self.push(Datum::Interval(
                x.clone()
                    .into_rust()
                    .map_err(|e: TryFromProtoError| e.to_string())?,
            )),
            // Lists recurse: each nested proto datum is pushed in order.
            Some(DatumType::List(x)) => self.push_list_with(|row| -> Result<(), String> {
                for d in x.datums.iter() {
                    row.try_push_proto(d)?;
                }
                Ok(())
            })?,
            // Arrays: rebuild the dimension list, then push the flattened
            // elements (decoded into a temporary Row) alongside it.
            Some(DatumType::Array(x)) => {
                let dims = x
                    .dims
                    .iter()
                    .map(|x| ArrayDimension {
                        lower_bound: isize::cast_from(x.lower_bound),
                        length: usize::cast_from(x.length),
                    })
                    .collect::<Vec<_>>();
                match x.elements.as_ref() {
                    None => self.push_array(&dims, [].iter()),
                    Some(elements) => {
                        let elements_row = Row::try_from(elements)?;
                        self.push_array(&dims, elements_row.iter())
                    }
                }
                .map_err(|err| err.to_string())?
            }
            // Dicts: for each element push the key then recurse on the value.
            Some(DatumType::Dict(x)) => self.push_dict_with(|row| -> Result<(), String> {
                for e in x.elements.iter() {
                    row.push(Datum::from(e.key.as_str()));
                    let val = e
                        .val
                        .as_ref()
                        .ok_or_else(|| format!("missing val for key: {}", e.key))?;
                    row.try_push_proto(val)?;
                }
                Ok(())
            })?,
            // Finite numerics were stored as packed BCD.
            Some(DatumType::Numeric(x)) => {
                let n = Decimal::from_packed_bcd(&x.bcd, x.scale).map_err(|err| err.to_string())?;
                self.push(Datum::from(n))
            }
            Some(DatumType::MzTimestamp(x)) => self.push(Datum::MzTimestamp((*x).into())),
            // Ranges: the bound datums are decoded lazily via closures handed
            // to `push_range_with`.
            Some(DatumType::Range(inner)) => {
                let ProtoRange { inner } = &**inner;
                match inner {
                    None => self.push_range(Range { inner: None }).unwrap(),
                    Some(inner) => {
                        let ProtoRangeInner {
                            lower_inclusive,
                            lower,
                            upper_inclusive,
                            upper,
                        } = &**inner;

                        self.push_range_with(
                            RangeLowerBound {
                                inclusive: *lower_inclusive,
                                bound: lower
                                    .as_ref()
                                    .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
                            },
                            RangeUpperBound {
                                inclusive: *upper_inclusive,
                                bound: upper
                                    .as_ref()
                                    .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
                            },
                        )
                        .expect("decoding ProtoRow must succeed");
                    }
                }
            }
            Some(DatumType::MzAclItem(x)) => self.push(Datum::MzAclItem(x.clone().into_rust()?)),
            Some(DatumType::AclItem(x)) => self.push(Datum::AclItem(x.clone().into_rust()?)),
            None => return Err("unknown datum type".into()),
        };
        Ok(())
    }
}
impl TryFrom<&ProtoRow> for Row {
    type Error = String;

    /// Decodes a [`ProtoRow`] by pushing each of its datums in order.
    fn try_from(x: &ProtoRow) -> Result<Self, Self::Error> {
        let mut row = Row::default();
        let mut packer = row.packer();
        for datum in &x.datums {
            packer.try_push_proto(datum)?;
        }
        Ok(row)
    }
}
impl RustType<ProtoRow> for Row {
    fn into_proto(&self) -> ProtoRow {
        let datums = self.iter().map(|x| x.into()).collect();
        ProtoRow { datums }
    }

    fn from_proto(proto: ProtoRow) -> Result<Self, TryFromProtoError> {
        // Delegate to the `TryFrom<&ProtoRow>` impl, which performs the same
        // datum-by-datum decode; previously the loop was duplicated here,
        // letting the two decode paths drift apart.
        Row::try_from(&proto).map_err(TryFromProtoError::RowConversionError)
    }
}
#[cfg(test)]
mod tests {
use arrow::array::{make_array, ArrayData};
use arrow::compute::SortOptions;
use arrow::datatypes::ArrowNativeType;
use arrow::row::SortField;
use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
use mz_ore::assert_err;
use mz_ore::collections::CollectionExt;
use mz_persist::indexed::columnar::arrow::realloc_array;
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
use mz_persist_types::columnar::{codec_to_schema2, schema2_to_codec};
use mz_persist_types::Codec;
use mz_proto::{ProtoType, RustType};
use proptest::prelude::*;
use proptest::strategy::Strategy;
use uuid::Uuid;
use super::*;
use crate::adt::array::ArrayDimension;
use crate::adt::interval::Interval;
use crate::adt::numeric::Numeric;
use crate::adt::timestamp::CheckedTimestamp;
use crate::fixed_length::ToDatumIter;
use crate::relation::arb_relation_desc;
use crate::{arb_datum_for_column, arb_row_for_relation, ColumnName, ColumnType, RowArena};
use crate::{Datum, RelationDesc, Row, ScalarType};
#[track_caller]
fn roundtrip_datum<'a>(
ty: ColumnType,
datum: impl Iterator<Item = Datum<'a>>,
metrics: &ColumnarMetrics,
) {
let desc = RelationDesc::builder().with_column("a", ty).finish();
let rows = datum.map(|d| Row::pack_slice(&[d])).collect();
roundtrip_rows(&desc, rows, metrics)
}
    /// Round-trips `rows` through the columnar encoding for `desc` and
    /// asserts a battery of invariants:
    ///
    /// * the arrow column survives a proto round-trip,
    /// * every decoded row equals the original,
    /// * collected stats (bounds and null counts) are consistent with the data,
    /// * Codec <-> columnar conversions are lossless,
    /// * our `ArrayOrd` ordering agrees with arrow's row-format ordering,
    /// * proto-serialized lower bounds respect their max encoded length.
    #[track_caller]
    fn roundtrip_rows(desc: &RelationDesc, rows: Vec<Row>, metrics: &ColumnarMetrics) {
        let mut encoder = <RelationDesc as Schema2<Row>>::encoder(desc).unwrap();
        for row in &rows {
            encoder.append(row);
        }
        let col = encoder.finish();
        let col = realloc_array(&col, metrics);

        // The encoded array must survive a round-trip through its proto
        // representation, both as a StructArray and as a dyn Array.
        {
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        let decoder = <RelationDesc as Schema2<Row>>::decoder(desc, col.clone()).unwrap();

        // Collect per-column (lower, upper) bounds and null counts from the
        // stats the decoder reports.
        let stats = decoder.stats();
        let arena = RowArena::new();
        let (stats, stat_nulls): (Vec<_>, Vec<_>) = desc
            .iter()
            .map(|(name, ty)| {
                let col_stats = stats.cols.get(name.as_str()).unwrap();
                let lower_upper =
                    crate::stats2::col_values(&ty.scalar_type, &col_stats.values, &arena);
                let null_count = col_stats.nulls.map_or(0, |n| n.count);
                (lower_upper, null_count)
            })
            .unzip();
        let mut actual_nulls = vec![0usize; stats.len()];

        // Decode every row, check it against the original, and check each
        // datum against the column's reported bounds.
        let mut rnd_row = Row::default();
        for (idx, og_row) in rows.iter().enumerate() {
            decoder.decode(idx, &mut rnd_row);
            assert_eq!(og_row, &rnd_row);

            for (c_idx, (rnd_datum, ty)) in rnd_row.iter().zip_eq(desc.typ().columns()).enumerate()
            {
                let lower_upper = stats[c_idx];
                if rnd_datum.is_null() {
                    actual_nulls[c_idx] += 1;
                } else if let Some((lower, upper)) = lower_upper {
                    assert!(rnd_datum >= lower, "{rnd_datum:?} is not >= {lower:?}");
                    assert!(rnd_datum <= upper, "{rnd_datum:?} is not <= {upper:?}");
                } else {
                    // Types for which we don't collect min/max stats.
                    match &ty.scalar_type {
                        ScalarType::Jsonb => (),
                        ScalarType::AclItem
                        | ScalarType::MzAclItem
                        | ScalarType::Range { .. }
                        | ScalarType::Array(_)
                        | ScalarType::Map { .. }
                        | ScalarType::List { .. }
                        | ScalarType::Record { .. }
                        | ScalarType::Int2Vector => (),
                        other => panic!("should have collected stats for {other:?}"),
                    }
                }
            }
        }

        // The null counts reported in stats must match the nulls we actually
        // decoded.
        for (col_idx, (stats_count, actual_count)) in
            stat_nulls.iter().zip_eq(actual_nulls.iter()).enumerate()
        {
            assert_eq!(
                stats_count, actual_count,
                "column {col_idx} has incorrect number of nulls!"
            );
        }

        // Converting to the Codec format and back must be lossless.
        let codec = schema2_to_codec::<Row>(desc, &col).unwrap();
        let col2 = codec_to_schema2::<Row>(desc, &codec).unwrap();
        assert_eq!(col2.as_ref(), &col);

        // Sort the data via arrow's row format...
        let converter = arrow::row::RowConverter::new(vec![SortField::new_with_options(
            col.data_type().clone(),
            SortOptions {
                descending: false,
                nulls_first: false,
            },
        )])
        .expect("sortable");
        let rows = converter
            .convert_columns(&[Arc::new(col.clone())])
            .expect("convertible");
        let mut row_vec = rows.iter().collect::<Vec<_>>();
        row_vec.sort();
        let row_col = converter
            .convert_rows(row_vec)
            .expect("convertible")
            .into_element();
        assert_eq!(row_col.len(), col.len());

        // ...and via our own ArrayOrd; the two orderings must agree.
        let ord = ArrayOrd::new(&col);
        let mut indices = (0..u64::usize_as(col.len())).collect::<Vec<_>>();
        indices.sort_by_key(|i| ord.at(i.as_usize()));
        let indices = UInt64Array::from(indices);
        let ord_col = ::arrow::compute::take(&col, &indices, None).expect("takeable");
        assert_eq!(row_col.as_ref(), ord_col.as_ref());

        // After sorting, the leading columns whose types preserve order must
        // come back in non-decreasing Row order.
        let ordered_prefix_len = desc
            .iter()
            .take_while(|(_, c)| preserves_order(&c.scalar_type))
            .count();
        let decoder = <RelationDesc as Schema2<Row>>::decoder_any(desc, ord_col.as_ref()).unwrap();
        let rows = (0..ord_col.len()).map(|i| {
            let mut row = Row::default();
            decoder.decode(i, &mut row);
            row
        });
        for (a, b) in rows.tuple_windows() {
            let a_prefix = a.iter().take(ordered_prefix_len);
            let b_prefix = b.iter().take(ordered_prefix_len);
            assert!(
                a_prefix.cmp(b_prefix).is_le(),
                "ordering should be consistent on preserves_order columns: {:#?}\n{:?}\n{:?}",
                desc.iter().take(ordered_prefix_len).collect_vec(),
                a.to_datum_iter().take(ordered_prefix_len).collect_vec(),
                b.to_datum_iter().take(ordered_prefix_len).collect_vec()
            );
        }

        // Total goodbytes must equal the sum of per-index goodbytes.
        assert_eq!(
            ord.goodbytes(),
            (0..col.len()).map(|i| ord.at(i).goodbytes()).sum::<usize>(),
            "total size should match the sum of the sizes at each index"
        );

        // The minimum element's bound must survive a (possibly truncating)
        // proto round-trip while staying a valid lower bound.
        if !ord_col.is_empty() {
            let min_idx = indices.values()[0].as_usize();
            let lower_bound = ArrayBound::new(ord_col, min_idx);
            let max_encoded_len = 1000;
            if let Some(proto) = lower_bound.to_proto_lower(max_encoded_len) {
                assert!(
                    proto.encoded_len() <= max_encoded_len,
                    "should respect the max len"
                );
                let array_data = proto.into_rust().expect("valid array");
                let new_lower_bound = ArrayBound::new(make_array(array_data), 0);
                assert!(
                    new_lower_bound.get() <= lower_bound.get(),
                    "proto-roundtripped bound should be <= the original"
                );
            }
        }
    }
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn proptest_datums() {
let strat = any::<ColumnType>().prop_flat_map(|ty| {
proptest::collection::vec(arb_datum_for_column(&ty), 0..16)
.prop_map(move |d| (ty.clone(), d))
});
let metrics = ColumnarMetrics::disconnected();
proptest!(|((ty, datums) in strat)| {
roundtrip_datum(ty.clone(), datums.iter().map(Datum::from), &metrics);
})
}
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn proptest_non_empty_relation_descs() {
let strat = arb_relation_desc(1..8).prop_flat_map(|desc| {
proptest::collection::vec(arb_row_for_relation(&desc), 0..12)
.prop_map(move |rows| (desc.clone(), rows))
});
let metrics = ColumnarMetrics::disconnected();
proptest!(|((desc, rows) in strat)| {
roundtrip_rows(&desc, rows, &metrics)
})
}
#[mz_ore::test]
fn empty_relation_desc_returns_error() {
let empty_desc = RelationDesc::empty();
let result = <RelationDesc as Schema2<Row>>::encoder(&empty_desc);
assert_err!(result);
}
#[mz_ore::test]
fn smoketest_collections() {
let mut row = Row::default();
let mut packer = row.packer();
let metrics = ColumnarMetrics::disconnected();
packer
.push_array(
&[ArrayDimension {
lower_bound: 0,
length: 3,
}],
[Datum::UInt32(4), Datum::UInt32(5), Datum::UInt32(6)],
)
.unwrap();
let array = row.unpack_first();
roundtrip_datum(
ScalarType::Array(Box::new(ScalarType::UInt32)).nullable(true),
[array].into_iter(),
&metrics,
);
}
#[mz_ore::test]
fn smoketest_row() {
    // A relation with one nullable column of each "interesting" shape:
    // scalar primitives, a list, and a map.
    let desc = RelationDesc::builder()
        .with_column("a", ScalarType::Int64.nullable(true))
        .with_column("b", ScalarType::String.nullable(true))
        .with_column("c", ScalarType::Bool.nullable(true))
        .with_column(
            "d",
            ScalarType::List {
                element_type: Box::new(ScalarType::UInt32),
                custom_id: None,
            }
            .nullable(true),
        )
        .with_column(
            "e",
            ScalarType::Map {
                value_type: Box::new(ScalarType::Int16),
                custom_id: None,
            }
            .nullable(true),
        )
        .finish();

    // One row with a concrete value in every column...
    let mut filled = Row::default();
    {
        let mut p = filled.packer();
        p.push(Datum::Int64(100));
        p.push(Datum::String("hello world"));
        p.push(Datum::True);
        p.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
        p.push_dict([("bar", Datum::Int16(9)), ("foo", Datum::Int16(3))]);
    }
    // ...and one that is NULL in every column.
    let all_null = Row::pack_slice(&[
        Datum::Null,
        Datum::Null,
        Datum::Null,
        Datum::Null,
        Datum::Null,
    ]);

    let mut encoder = <RelationDesc as Schema2<Row>>::encoder(&desc).unwrap();
    encoder.append(&filled);
    encoder.append(&all_null);
    let col = encoder.finish();

    // Decoding each index must reproduce the original row exactly.
    let decoder = <RelationDesc as Schema2<Row>>::decoder(&desc, col).unwrap();
    for (idx, expected) in [(0, &filled), (1, &all_null)] {
        let mut decoded = Row::default();
        decoder.decode(idx, &mut decoded);
        assert_eq!(expected, &decoded);
    }
}
#[mz_ore::test]
fn test_nested_list() {
    // A single non-nullable column of type list<list<i64>>.
    let desc = RelationDesc::builder()
        .with_column(
            "a",
            ScalarType::List {
                element_type: Box::new(ScalarType::List {
                    element_type: Box::new(ScalarType::Int64),
                    custom_id: None,
                }),
                custom_id: None,
            }
            .nullable(false),
        )
        .finish();

    // Pack a ragged list-of-lists so inner lists of differing lengths
    // exercise the nested offset bookkeeping.
    let mut original = Row::default();
    original.packer().push_list_with(|outer| {
        outer.push_list([Datum::Int64(1), Datum::Int64(2)]);
        outer.push_list([Datum::Int64(5)]);
        outer.push_list([Datum::Int64(9), Datum::Int64(99), Datum::Int64(999)]);
    });

    // Encode, decode, and verify the row survives intact.
    let mut encoder = <RelationDesc as Schema2<Row>>::encoder(&desc).unwrap();
    encoder.append(&original);
    let decoder = <RelationDesc as Schema2<Row>>::decoder(&desc, encoder.finish()).unwrap();
    let mut decoded = Row::default();
    decoder.decode(0, &mut decoded);
    assert_eq!(original, decoded);
}
#[mz_ore::test]
fn test_record() {
    // One nullable record column mixing a non-null scalar, a nullable
    // scalar, and a list-typed field.
    let desc = RelationDesc::builder()
        .with_column(
            "a",
            ScalarType::Record {
                fields: [
                    (ColumnName::from("foo"), ScalarType::Int64.nullable(false)),
                    (ColumnName::from("bar"), ScalarType::String.nullable(true)),
                    (
                        ColumnName::from("baz"),
                        ScalarType::List {
                            element_type: Box::new(ScalarType::UInt32),
                            custom_id: None,
                        }
                        .nullable(false),
                    ),
                ]
                .into(),
                custom_id: None,
            }
            .nullable(true),
        )
        .finish();

    // Records are packed like lists: one entry per field, in declaration order.
    let mut record_row = Row::default();
    record_row.packer().push_list_with(|fields| {
        fields.push(Datum::Int64(42));
        fields.push(Datum::Null);
        fields.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
    });
    // A top-level NULL for the (nullable) record column itself.
    let null_row = Row::pack_slice(&[Datum::Null]);

    let mut encoder = <RelationDesc as Schema2<Row>>::encoder(&desc).unwrap();
    encoder.append(&record_row);
    encoder.append(&null_row);
    let decoder = <RelationDesc as Schema2<Row>>::decoder(&desc, encoder.finish()).unwrap();

    // Both the populated record and the NULL record must roundtrip.
    for (idx, expected) in [(0, &record_row), (1, &null_row)] {
        let mut decoded = Row::default();
        decoder.decode(idx, &mut decoded);
        assert_eq!(expected, &decoded);
    }
}
#[mz_ore::test]
// Skipped under miri: packing/decoding this many datums is too slow there.
#[cfg_attr(miri, ignore)] fn roundtrip() {
    // Packs a single row containing one datum of (nearly) every flavor --
    // booleans, all int/float widths, temporal types, interval, bytes,
    // string, numeric edge cases (finite, +/-inf, NaN), JSON null, UUID,
    // Dummy, NULL, plus an array, a nested list, and a large dict -- then
    // checks that the proto encoding of the row is lossless.
    let mut row = Row::default();
    let mut packer = row.packer();
    packer.extend([
        Datum::False,
        Datum::True,
        Datum::Int16(1),
        Datum::Int32(2),
        Datum::Int64(3),
        Datum::Float32(4f32.into()),
        Datum::Float64(5f64.into()),
        Datum::Date(
            NaiveDate::from_ymd_opt(6, 7, 8)
                .unwrap()
                .try_into()
                .unwrap(),
        ),
        Datum::Time(NaiveTime::from_hms_opt(9, 10, 11).unwrap()),
        Datum::Timestamp(
            CheckedTimestamp::from_timestamplike(
                // `13 % 12` == 1 keeps the month in chrono's valid 1..=12
                // range while preserving the "counting upward" literals.
                NaiveDate::from_ymd_opt(12, 13 % 12, 14)
                    .unwrap()
                    .and_time(NaiveTime::from_hms_opt(15, 16, 17).unwrap()),
            )
            .unwrap(),
        ),
        Datum::TimestampTz(
            CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                // Likewise, `19 % 12` == 7 keeps the month argument valid.
                NaiveDate::from_ymd_opt(18, 19 % 12, 20)
                    .unwrap()
                    .and_time(NaiveTime::from_hms_opt(21, 22, 23).unwrap()),
                Utc,
            ))
            .unwrap(),
        ),
        Datum::Interval(Interval {
            months: 24,
            days: 42,
            micros: 25,
        }),
        Datum::Bytes(&[26, 27]),
        Datum::String("28"),
        Datum::from(Numeric::from(29)),
        // Numeric special values: +inf, -inf, NaN must all survive encoding.
        Datum::from(Numeric::infinity()),
        Datum::from(-Numeric::infinity()),
        Datum::from(Numeric::nan()),
        Datum::JsonNull,
        Datum::Uuid(Uuid::from_u128(30)),
        Datum::Dummy,
        Datum::Null,
    ]);
    // A 1-D array with a non-default lower bound (2 rather than 1).
    packer
        .push_array(
            &[ArrayDimension {
                lower_bound: 2,
                length: 2,
            }],
            vec![Datum::Int32(31), Datum::Int32(32)],
        )
        .expect("valid array");
    // A list containing a nested list between scalar elements.
    packer.push_list_with(|packer| {
        packer.push(Datum::String("33"));
        packer.push_list_with(|packer| {
            packer.push(Datum::String("34"));
            packer.push(Datum::String("35"));
        });
        packer.push(Datum::String("36"));
        packer.push(Datum::String("37"));
    });
    // A 20-entry dict with ascending string keys ("38", "40", ...), large
    // enough to exercise non-trivial dict encoding.
    packer.push_dict_with(|row| {
        let mut i = 38;
        for _ in 0..20 {
            row.push(Datum::String(&i.to_string()));
            row.push(Datum::Int32(i + 1));
            i += 2;
        }
    });
    // Build a desc with one column per datum, all declared as nullable
    // Int32 regardless of the datum's actual type -- presumably
    // `Row::decode` only consults the column count here, not the column
    // types. NOTE(review): confirm decode really ignores the types.
    let mut desc = RelationDesc::builder();
    for (idx, _) in row.iter().enumerate() {
        desc = desc.with_column(idx.to_string(), ScalarType::Int32.nullable(true));
    }
    let desc = desc.finish();
    let encoded = row.encode_to_vec();
    assert_eq!(Row::decode(&encoded, &desc), Ok(row));
}
}