#![allow(missing_docs)] use std::fmt::Debug;
use anyhow::Context;
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
Int8Array, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use mz_ore::cast::CastFrom;
use mz_ore::metrics::{IntCounter, MetricsRegistry};
use mz_ore::{assert_none, metric};
use mz_proto::{ProtoType, RustType, TryFromProtoError};
use proptest::prelude::*;
use proptest::strategy::{Strategy, Union};
use proptest_derive::Arbitrary;
use prost::Message;
use crate::columnar::{ColumnDecoder, Schema};
use crate::part::Part;
use crate::stats::bytes::any_bytes_stats;
use crate::stats::primitive::any_primitive_stats;
pub mod bytes;
pub mod json;
pub mod primitive;
pub mod structured;
pub use bytes::{AtomicBytesStats, BytesStats, FixedSizeBytesStats, FixedSizeBytesStatsKind};
pub use json::{JsonMapElementStats, JsonStats};
pub use primitive::{
truncate_bytes, truncate_string, PrimitiveStats, PrimitiveStatsVariants, TruncateBound,
TRUNCATE_LEN,
};
pub use structured::StructStats;
include!(concat!(env!("OUT_DIR"), "/mz_persist_types.stats.rs"));
/// Statistics for a single column: an optional null count plus the stats of
/// the column's values.
#[derive(Clone, Debug)]
pub struct ColumnarStats {
    /// Null count for a nullable column; `None` marks the column as
    /// non-nullable.
    pub nulls: Option<ColumnNullStats>,
    /// Statistics describing the column's values.
    pub values: ColumnStatKinds,
}
impl ColumnarStats {
pub fn one_column_struct(len: usize, col: ColumnStatKinds) -> StructStats {
let col = ColumnarStats {
nulls: None,
values: col,
};
StructStats {
len,
cols: [("".to_owned(), col)].into_iter().collect(),
}
}
pub fn as_non_null_values(&self) -> Option<&ColumnStatKinds> {
match self.nulls {
None => Some(&self.values),
Some(_) => None,
}
}
pub fn into_non_null_values(self) -> Option<ColumnStatKinds> {
match self.nulls {
None => Some(self.values),
Some(_) => None,
}
}
pub fn into_struct_stats(self) -> Option<StructStats> {
match self.into_non_null_values()? {
ColumnStatKinds::Struct(stats) => Some(stats),
_ => None,
}
}
fn try_as_stats<'a, T, F>(&'a self, map: F) -> Result<T, anyhow::Error>
where
F: FnOnce(&'a ColumnStatKinds) -> Result<T, anyhow::Error>,
{
let inner = map(&self.values)?;
match self.nulls {
Some(nulls) => Err(anyhow::anyhow!(
"expected non-nullable stats, found nullable {nulls:?}"
)),
None => Ok(inner),
}
}
fn try_as_option_stats<'a, T, F>(&'a self, map: F) -> Result<OptionStats<T>, anyhow::Error>
where
F: FnOnce(&'a ColumnStatKinds) -> Result<T, anyhow::Error>,
{
let inner = map(&self.values)?;
match self.nulls {
Some(nulls) => Ok(OptionStats {
none: nulls.count,
some: inner,
}),
None => Err(anyhow::anyhow!(
"expected nullable stats, found non-nullable"
)),
}
}
pub fn try_as_optional_struct(&self) -> Result<OptionStats<&StructStats>, anyhow::Error> {
self.try_as_option_stats(|values| match values {
ColumnStatKinds::Struct(inner) => Ok(inner),
other => anyhow::bail!("expected StructStats found {other:?}"),
})
}
pub fn try_as_optional_bytes(&self) -> Result<OptionStats<&BytesStats>, anyhow::Error> {
self.try_as_option_stats(|values| match values {
ColumnStatKinds::Bytes(inner) => Ok(inner),
other => anyhow::bail!("expected BytesStats found {other:?}"),
})
}
pub fn try_as_string(&self) -> Result<&PrimitiveStats<String>, anyhow::Error> {
self.try_as_stats(|values| match values {
ColumnStatKinds::Primitive(PrimitiveStatsVariants::String(inner)) => Ok(inner),
other => anyhow::bail!("expected PrimitiveStats<String> found {other:?}"),
})
}
}
impl DynStats for ColumnarStats {
    /// Renders the value stats as JSON, annotated with the null count when
    /// the column is nullable.
    ///
    /// If the values render as a JSON object, a `"nulls"` key is added only
    /// when the count is non-zero. Otherwise the values are wrapped as
    /// `{"nulls": .., "not nulls": ..}`.
    fn debug_json(&self) -> serde_json::Value {
        let values_json = self.values.debug_json();
        let Some(nulls) = self.nulls else {
            // Non-nullable: nothing to annotate.
            return values_json;
        };
        match values_json {
            serde_json::Value::Object(mut fields) => {
                if nulls.count > 0 {
                    fields.insert("nulls".to_owned(), nulls.count.into());
                }
                serde_json::Value::Object(fields)
            }
            other => serde_json::json!({"nulls": nulls.count, "not nulls": other}),
        }
    }

    fn into_columnar_stats(self) -> ColumnarStats {
        // Already in the columnar representation.
        self
    }
}
/// Null statistics for a nullable column.
#[derive(Clone, Copy, Debug)]
pub struct ColumnNullStats {
    /// Number of null values in the column.
    pub count: usize,
}
impl RustType<ProtoOptionStats> for ColumnNullStats {
    /// Stores the null count in the proto's `none` field.
    fn into_proto(&self) -> ProtoOptionStats {
        let none = u64::cast_from(self.count);
        ProtoOptionStats { none }
    }

    /// Reads the null count back from the proto's `none` field.
    fn from_proto(proto: ProtoOptionStats) -> Result<Self, TryFromProtoError> {
        let count = usize::cast_from(proto.none);
        Ok(Self { count })
    }
}
/// The statistics tracked for a column's values, by kind.
#[derive(Clone, Debug)]
pub enum ColumnStatKinds {
    /// Stats for a primitive-typed column.
    Primitive(PrimitiveStatsVariants),
    /// Stats for a nested struct column.
    Struct(StructStats),
    /// Stats for a bytes-like column.
    Bytes(BytesStats),
    /// No stats are tracked (see [`NoneStats`]).
    None,
}
impl DynStats for ColumnStatKinds {
    /// Delegates to the concrete stats type wrapped by this variant.
    fn debug_json(&self) -> serde_json::Value {
        match self {
            Self::Primitive(primitive) => primitive.debug_json(),
            Self::Struct(nested) => nested.debug_json(),
            Self::Bytes(bytes) => bytes.debug_json(),
            Self::None => NoneStats.debug_json(),
        }
    }

    /// Bare value stats carry no null information, so the result is
    /// non-nullable.
    fn into_columnar_stats(self) -> ColumnarStats {
        ColumnarStats {
            values: self,
            nulls: None,
        }
    }
}
impl RustType<proto_dyn_stats::Kind> for ColumnStatKinds {
    /// Maps each variant onto the proto `Kind` variant of the same name.
    fn into_proto(&self) -> proto_dyn_stats::Kind {
        match self {
            Self::Primitive(primitive) => {
                proto_dyn_stats::Kind::Primitive(RustType::into_proto(primitive))
            }
            Self::Struct(nested) => proto_dyn_stats::Kind::Struct(RustType::into_proto(nested)),
            Self::Bytes(bytes) => proto_dyn_stats::Kind::Bytes(RustType::into_proto(bytes)),
            Self::None => proto_dyn_stats::Kind::None(()),
        }
    }

    /// Inverse of `into_proto`; propagates any nested decoding error.
    fn from_proto(proto: proto_dyn_stats::Kind) -> Result<Self, TryFromProtoError> {
        Ok(match proto {
            proto_dyn_stats::Kind::Primitive(primitive) => {
                Self::Primitive(primitive.into_rust()?)
            }
            proto_dyn_stats::Kind::Struct(nested) => Self::Struct(nested.into_rust()?),
            proto_dyn_stats::Kind::Bytes(bytes) => Self::Bytes(bytes.into_rust()?),
            proto_dyn_stats::Kind::None(_) => Self::None,
        })
    }
}
/// Anything convertible to [`PrimitiveStatsVariants`] becomes primitive stats.
impl<T: Into<PrimitiveStatsVariants>> From<T> for ColumnStatKinds {
    fn from(stats: T) -> Self {
        Self::Primitive(stats.into())
    }
}

/// Raw byte min/max stats are classified as bytes stats, not primitive stats.
impl From<PrimitiveStats<Vec<u8>>> for ColumnStatKinds {
    fn from(stats: PrimitiveStats<Vec<u8>>) -> Self {
        Self::Bytes(BytesStats::Primitive(stats))
    }
}

impl From<StructStats> for ColumnStatKinds {
    fn from(stats: StructStats) -> Self {
        Self::Struct(stats)
    }
}

impl From<BytesStats> for ColumnStatKinds {
    fn from(stats: BytesStats) -> Self {
        Self::Bytes(stats)
    }
}
/// Metrics about part stats encountered while reading.
#[derive(Debug)]
pub struct PartStatsMetrics {
    /// Counter registered as `mz_persist_pushdown_parts_mismatched_stats_count`.
    pub mismatched_count: IntCounter,
}
impl PartStatsMetrics {
pub fn new(registry: &MetricsRegistry) -> Self {
PartStatsMetrics {
mismatched_count: registry.register(metric!(
name: "mz_persist_pushdown_parts_mismatched_stats_count",
help: "number of parts read with unexpectedly the incorrect type of stats",
)),
}
}
}
/// Typed access to the bounds and null count of a column's stats.
pub trait ColumnStats: DynStats {
    /// Borrowed form of a bound, as returned by [`ColumnStats::lower`] and
    /// [`ColumnStats::upper`].
    type Ref<'a>
    where
        Self: 'a;

    /// Returns the lower bound, or `None` if no bound is available.
    fn lower<'a>(&'a self) -> Option<Self::Ref<'a>>;

    /// Returns the upper bound, or `None` if no bound is available.
    fn upper<'a>(&'a self) -> Option<Self::Ref<'a>>;

    /// Returns the number of null values.
    fn none_count(&self) -> usize;
}
/// Computes stats from an Arrow column whose elements correspond to `T`
/// (e.g. `&str` for a `StringArray`).
pub trait ColumnarStatsBuilder<T>: Debug + DynStats {
    /// The concrete Arrow array type this builder reads.
    type ArrowColumn: arrow::array::Array + 'static;

    /// Computes stats summarizing `col`.
    fn from_column(col: &Self::ArrowColumn) -> Self
    where
        Self: Sized;
}
/// Type-erased interface shared by all stats types.
pub trait DynStats: Debug + Send + Sync + 'static {
    /// Returns the Rust type name of `Self`, via [`std::any::type_name`].
    fn type_name(&self) -> &'static str {
        std::any::type_name::<Self>()
    }

    /// Renders the stats as JSON for debugging output.
    fn debug_json(&self) -> serde_json::Value;

    /// Converts these stats into the common [`ColumnarStats`] representation.
    fn into_columnar_stats(self) -> ColumnarStats;
}
/// A protobuf stats message that can shrink itself in place.
///
/// [`trim_to_budget`] calls [`TrimStats::trim`] as a first pass and then
/// re-measures the encoded size before resorting to dropping columns.
pub trait TrimStats: Message {
    /// Reduces the message in place; what gets removed is up to the
    /// implementation.
    fn trim(&mut self);
}
/// Statistics for a persist part.
#[derive(Debug, Arbitrary)]
pub struct PartStats {
    /// Stats computed from the part's key data (see [`PartStats::new`]).
    pub key: StructStats,
}
impl serde::Serialize for PartStats {
fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
let PartStats { key } = self;
key.serialize(s)
}
}
impl PartStats {
    /// Computes stats for `part` by decoding its key with the schema `desc`.
    ///
    /// # Errors
    ///
    /// Returns an error (with context `"decoder_any"`) if a decoder cannot be
    /// built for the part's key.
    pub fn new<T, K>(part: &Part, desc: &K) -> Result<Self, anyhow::Error>
    where
        K: Schema<T, Statistics = StructStats>,
    {
        let key = K::decoder_any(desc, &part.key)
            .context("decoder_any")?
            .stats();
        Ok(Self { key })
    }
}
/// Stats for a nullable column: the stats of its non-null values plus a
/// count of nulls.
pub struct OptionStats<T> {
    /// Stats for the non-null values.
    pub some: T,
    /// Number of null values.
    pub none: usize,
}
/// Debug-formats via the JSON rendering from [`DynStats::debug_json`].
impl<T: DynStats> Debug for OptionStats<T> {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = self.debug_json();
        // Delegate to `Debug::fmt` (not `write!`) so flags such as `{:#?}`
        // are forwarded.
        Debug::fmt(&rendered, formatter)
    }
}
impl<T: DynStats> DynStats for OptionStats<T> {
fn debug_json(&self) -> serde_json::Value {
match self.some.debug_json() {
serde_json::Value::Object(mut x) => {
if self.none > 0 {
x.insert("nulls".to_owned(), self.none.into());
}
serde_json::Value::Object(x)
}
s => {
serde_json::json!({"nulls": self.none, "not nulls": s})
}
}
}
fn into_columnar_stats(self) -> ColumnarStats {
let inner = self.some.into_columnar_stats();
assert_none!(inner.nulls, "we don't support nested OptionStats");
ColumnarStats {
nulls: Some(ColumnNullStats { count: self.none }),
values: inner.values,
}
}
}
/// Placeholder stats for a column that tracks no statistics.
///
/// `Clone` is derived only in test builds.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct NoneStats;
impl DynStats for NoneStats {
    /// Renders the `Debug` form as a JSON string, since there is no data to
    /// report.
    fn debug_json(&self) -> serde_json::Value {
        let rendered = format!("{self:?}");
        serde_json::Value::String(rendered)
    }

    /// Produces non-nullable stats of kind [`ColumnStatKinds::None`].
    fn into_columnar_stats(self) -> ColumnarStats {
        ColumnarStats {
            values: ColumnStatKinds::None,
            nulls: None,
        }
    }
}
/// `NoneStats` tracks nothing: no bounds and no nulls.
impl ColumnStats for NoneStats {
    type Ref<'a> = ();

    fn lower<'a>(&'a self) -> Option<Self::Ref<'a>> {
        Option::None
    }

    fn upper<'a>(&'a self) -> Option<Self::Ref<'a>> {
        Option::None
    }

    fn none_count(&self) -> usize {
        0
    }
}
impl ColumnStats for OptionStats<NoneStats> {
type Ref<'a> = Option<()>;
fn lower<'a>(&'a self) -> Option<Self::Ref<'a>> {
None
}
fn upper<'a>(&'a self) -> Option<Self::Ref<'a>> {
None
}
fn none_count(&self) -> usize {
self.none
}
}
/// `NoneStats` carries no data, so its proto form is the unit type.
impl RustType<()> for NoneStats {
    fn into_proto(&self) -> () {}

    fn from_proto(_: ()) -> Result<Self, TryFromProtoError> {
        Ok(Self)
    }
}
/// Implements [`ColumnarStatsBuilder`] for `PrimitiveStats<$native>` over the
/// Arrow array `$arrow_col`, using `$min_fn` / `$max_fn` for the bounds.
///
/// When a kernel yields `None`, the bound falls back to `$native`'s default.
macro_rules! primitive_stats {
    ($native:ty, $arrow_col:ty, $min_fn:path, $max_fn:path) => {
        impl ColumnarStatsBuilder<$native> for PrimitiveStats<$native> {
            type ArrowColumn = $arrow_col;

            fn from_column(col: &Self::ArrowColumn) -> Self
            where
                Self: Sized,
            {
                PrimitiveStats {
                    lower: $min_fn(col).unwrap_or_default(),
                    upper: $max_fn(col).unwrap_or_default(),
                }
            }
        }
    };
}

primitive_stats!(
    bool,
    BooleanArray,
    arrow::compute::min_boolean,
    arrow::compute::max_boolean
);
primitive_stats!(u8, UInt8Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(u16, UInt16Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(u32, UInt32Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(u64, UInt64Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(i8, Int8Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(i16, Int16Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(i32, Int32Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(i64, Int64Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(f32, Float32Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(f64, Float64Array, arrow::compute::min, arrow::compute::max);
impl ColumnarStatsBuilder<&str> for PrimitiveStats<String> {
    type ArrowColumn = StringArray;

    /// Builds string bounds from the column's min and max, truncated to
    /// [`TRUNCATE_LEN`].
    ///
    /// A missing min/max (e.g. an empty array) falls back to `""`. If the
    /// upper bound cannot be truncated, the full max string is kept as is.
    ///
    /// # Panics
    ///
    /// Panics if truncating the lower bound fails.
    fn from_column(col: &Self::ArrowColumn) -> Self
    where
        Self: Sized,
    {
        let smallest = arrow::compute::min_string(col).unwrap_or_default();
        let lower = truncate_string(smallest, TRUNCATE_LEN, TruncateBound::Lower)
            .expect("lower bound should always truncate");

        let largest = arrow::compute::max_string(col).unwrap_or_default();
        let upper = match truncate_string(largest, TRUNCATE_LEN, TruncateBound::Upper) {
            Some(truncated) => truncated,
            None => largest.to_owned(),
        };

        Self { lower, upper }
    }
}
impl ColumnarStatsBuilder<&[u8]> for PrimitiveStats<Vec<u8>> {
    type ArrowColumn = BinaryArray;

    /// Builds byte-string bounds from the column's min and max, truncated to
    /// [`TRUNCATE_LEN`].
    ///
    /// A missing min/max (e.g. an empty array) falls back to an empty slice.
    /// If the upper bound cannot be truncated, the full max value is kept as
    /// is.
    ///
    /// # Panics
    ///
    /// Panics if truncating the lower bound fails.
    fn from_column(col: &Self::ArrowColumn) -> Self
    where
        Self: Sized,
    {
        let smallest = arrow::compute::min_binary(col).unwrap_or_default();
        let lower = truncate_bytes(smallest, TRUNCATE_LEN, TruncateBound::Lower)
            .expect("lower bound should always truncate");

        let largest = arrow::compute::max_binary(col).unwrap_or_default();
        let upper = match truncate_bytes(largest, TRUNCATE_LEN, TruncateBound::Upper) {
            Some(truncated) => truncated,
            None => largest.to_owned(),
        };

        Self { lower, upper }
    }
}
/// Shrinks `stats` toward an encoded size of at most `budget` bytes and
/// returns how many bytes were removed (original size minus final size,
/// saturating at zero).
///
/// Stats that already fit are untouched and `0` is returned. Otherwise
/// [`TrimStats::trim`] runs first. If that is not enough, columns are dropped,
/// largest first, skipping any for which `force_keep_col(name)` is `true`.
/// Because force-kept columns are never dropped, the result may still exceed
/// `budget`.
pub fn trim_to_budget(
    stats: &mut ProtoStructStats,
    budget: usize,
    force_keep_col: impl Fn(&str) -> bool,
) -> usize {
    let original_cost = stats.encoded_len();
    if original_cost <= budget {
        return 0;
    }

    stats.trim();
    let trimmed_cost = stats.encoded_len();
    if trimmed_cost > budget {
        // Trimming alone was not enough; drop whole columns.
        let mut budget_shortfall = trimmed_cost - budget;
        trim_to_budget_struct(stats, &mut budget_shortfall, &force_keep_col);
        return original_cost.saturating_sub(stats.encoded_len());
    }
    original_cost.saturating_sub(trimmed_cost)
}
/// Drops columns from `stats`, recursing into nested structs and JSON maps,
/// until `budget_shortfall` reaches zero or nothing more can be dropped.
///
/// `budget_shortfall` is the number of bytes still to remove. It is shared
/// with recursive calls and decremented by an estimate of the bytes saved by
/// each removal. Columns for which `force_keep_col(name)` is `true` are never
/// removed. The same holds for a nested struct or JSON map that still has
/// entries after recursing into it.
///
/// NOTE(review): the credited amounts use only the value's `encoded_len()`
/// (the column name is not counted) plus 1. This looks like an under-estimate
/// of the real savings, so it errs toward trimming more than strictly
/// necessary.
fn trim_to_budget_struct(
    stats: &mut ProtoStructStats,
    budget_shortfall: &mut usize,
    force_keep_col: &impl Fn(&str) -> bool,
) {
    // Snapshot each column's encoded size, sorted ascending. `pop()` below
    // takes from the back, so the most expensive columns are considered first.
    let mut col_costs: Vec<_> = stats
        .cols
        .iter()
        .map(|(name, stats)| (name.to_owned(), stats.encoded_len()))
        .collect();
    col_costs.sort_unstable_by_key(|(_, c)| *c);
    while *budget_shortfall > 0 {
        let Some((name, cost)) = col_costs.pop() else {
            break;
        };
        if force_keep_col(&name) {
            continue;
        }
        // `col_costs` was built from `stats.cols` and each name is popped at
        // most once, so the column is still present.
        let col_stats = stats.cols.get_mut(&name).expect("col exists");
        match &mut col_stats.kind {
            Some(proto_dyn_stats::Kind::Struct(col_struct)) => {
                // Try trimming inside the nested struct before dropping it.
                trim_to_budget_struct(col_struct, budget_shortfall, force_keep_col);
                if *budget_shortfall == 0 {
                    break;
                }
                // Still over budget, yet entries remain: the recursion ran out
                // of candidates, so every remaining entry is kept because of
                // `force_keep_col` (directly or further down). Keep this
                // struct too.
                if !col_struct.cols.is_empty() {
                    continue;
                }
                // Recompute the now-empty struct's size rather than using
                // `cost`, because the recursion already credited what it
                // removed.
                *budget_shortfall = budget_shortfall.saturating_sub(col_struct.encoded_len() + 1);
                stats.cols.remove(&name);
            }
            Some(proto_dyn_stats::Kind::Bytes(ProtoBytesStats {
                kind:
                    Some(proto_bytes_stats::Kind::Json(ProtoJsonStats {
                        kind: Some(proto_json_stats::Kind::Maps(col_jsonb)),
                    })),
            })) => {
                // JSON map column: same strategy as nested structs.
                trim_to_budget_jsonb(col_jsonb, budget_shortfall, force_keep_col);
                if *budget_shortfall == 0 {
                    break;
                }
                if !col_jsonb.elements.is_empty() {
                    continue;
                }
                *budget_shortfall = budget_shortfall.saturating_sub(col_jsonb.encoded_len() + 1);
                stats.cols.remove(&name);
            }
            _ => {
                // Leaf column: drop it outright and credit its cost plus 1
                // (an allowance for the field's framing).
                stats.cols.remove(&name);
                *budget_shortfall = budget_shortfall.saturating_sub(cost + 1);
            }
        }
    }
}
/// Drops entries from a JSON map's stats, recursing into nested maps, until
/// `budget_shortfall` reaches zero or the map has no more entries to consider.
///
/// Entries whose key satisfies `force_keep_col` are kept. So are nested maps
/// that still have entries after recursing into them. Kept entries are stashed
/// and re-appended at the end, so the order of `stats.elements` is not
/// preserved. The elements are also re-sorted by size even if nothing ends up
/// being removed.
///
/// NOTE(review): when a nested map is emptied by recursion, the entry is
/// dropped without crediting its own remaining bytes against
/// `budget_shortfall`. The struct path in `trim_to_budget_struct` does credit
/// them. This only causes extra trimming, never too little.
fn trim_to_budget_jsonb(
    stats: &mut ProtoJsonMapStats,
    budget_shortfall: &mut usize,
    force_keep_col: &impl Fn(&str) -> bool,
) {
    // Sort ascending by encoded size; `pop()` takes the largest first.
    stats
        .elements
        .sort_unstable_by_key(|element| element.encoded_len());
    // Entries we decided to keep, re-appended after the loop.
    let mut stats_to_keep = Vec::with_capacity(stats.elements.len());
    while *budget_shortfall > 0 {
        let Some(mut column) = stats.elements.pop() else {
            break;
        };
        if force_keep_col(&column.name) {
            stats_to_keep.push(column);
            continue;
        }
        if let Some(ProtoJsonStats {
            kind: Some(proto_json_stats::Kind::Maps(ref mut col_jsonb)),
        }) = column.stats
        {
            // Nested map: trim inside it, and keep it only if anything is left.
            trim_to_budget_jsonb(col_jsonb, budget_shortfall, force_keep_col);
            if !col_jsonb.elements.is_empty() {
                stats_to_keep.push(column);
            }
            if *budget_shortfall == 0 {
                break;
            }
        } else {
            // Leaf entry: dropped (not re-added); credit its size plus 1.
            *budget_shortfall = budget_shortfall.saturating_sub(column.encoded_len() + 1);
        }
    }
    stats.elements.extend(stats_to_keep);
}
impl RustType<ProtoDynStats> for ColumnarStats {
fn into_proto(&self) -> ProtoDynStats {
let option = self.nulls.as_ref().map(|n| n.into_proto());
let kind = RustType::into_proto(&self.values);
ProtoDynStats {
option,
kind: Some(kind),
}
}
fn from_proto(proto: ProtoDynStats) -> Result<Self, TryFromProtoError> {
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoDynStats::kind"))?;
Ok(ColumnarStats {
nulls: proto.option.into_rust()?,
values: kind.into_rust()?,
})
}
}
pub(crate) fn any_columnar_stats() -> impl Strategy<Value = ColumnarStats> {
let leaf = Union::new(vec![
any_primitive_stats::<bool>()
.prop_map(|s| ColumnStatKinds::Primitive(s.into()))
.boxed(),
any_primitive_stats::<i64>()
.prop_map(|s| ColumnStatKinds::Primitive(s.into()))
.boxed(),
any_primitive_stats::<String>()
.prop_map(|s| ColumnStatKinds::Primitive(s.into()))
.boxed(),
any_bytes_stats().prop_map(ColumnStatKinds::Bytes).boxed(),
])
.prop_map(|values| ColumnarStats {
nulls: None,
values,
});
leaf.prop_recursive(2, 10, 3, |inner| {
(
any::<usize>(),
proptest::collection::btree_map(any::<String>(), inner, 0..3),
)
.prop_map(|(len, cols)| {
let values = ColumnStatKinds::Struct(StructStats { len, cols });
ColumnarStats {
nulls: None,
values,
}
})
})
}