#![allow(missing_docs)] use std::any::Any;
use std::collections::BTreeMap;
use std::fmt::Debug;
use mz_ore::metric;
use mz_ore::metrics::{IntCounter, MetricsRegistry};
use proptest_derive::Arbitrary;
use prost::Message;
use serde::ser::{SerializeMap, SerializeStruct};
use serde_json::json;
use crate::columnar::Data;
use crate::dyn_col::DynColumnRef;
use crate::dyn_struct::ValidityRef;
use crate::part::Part;
use crate::stats::impls::any_struct_stats_cols;
use crate::timestamp::try_parse_monotonic_iso8601_timestamp;
include!(concat!(env!("OUT_DIR"), "/mz_persist_types.stats.rs"));
/// Metrics related to the pushdown stats attached to persist parts.
#[derive(Debug)]
pub struct PartStatsMetrics {
    /// Counter registered as `mz_persist_pushdown_parts_mismatched_stats_count`.
    /// Per its help text, it counts parts read with stats of an unexpected type.
    pub mismatched_count: IntCounter,
}

impl PartStatsMetrics {
    /// Registers the stats metrics in `registry` and returns handles to them.
    pub fn new(registry: &MetricsRegistry) -> Self {
        let mismatched_count = registry.register(metric!(
            name: "mz_persist_pushdown_parts_mismatched_stats_count",
            help: "number of parts read with unexpectedly the incorrect type of stats",
        ));
        Self { mismatched_count }
    }
}
/// How stats are computed for a column.
pub enum StatsFn {
/// Use the default stats for the column's type (computed elsewhere).
Default,
/// Compute stats with this function; an `Err` describes why they couldn't be computed.
Custom(fn(&DynColumnRef, ValidityRef) -> Result<Box<dyn DynStats>, String>),
}
// Only compiled when debug assertions are enabled.
// NOTE(review): Rust does not guarantee that function pointer equality is
// reliable (the same function may have different addresses, and distinct
// functions may share one), so equality of `Custom` values is best-effort.
#[cfg(debug_assertions)]
impl PartialEq for StatsFn {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(StatsFn::Default, StatsFn::Default) => true,
(StatsFn::Custom(s), StatsFn::Custom(o)) => {
// Coerce both (higher-ranked) fn pointers to one concrete lifetime
// before comparing. Presumably this is because `==` wasn't
// implemented for the higher-ranked fn pointer type on the
// toolchain this was written against.
let s: fn(&'static DynColumnRef, ValidityRef) -> Result<Box<dyn DynStats>, String> =
*s;
let o: fn(&'static DynColumnRef, ValidityRef) -> Result<Box<dyn DynStats>, String> =
*o;
s == o
}
(StatsFn::Default, StatsFn::Custom(_)) | (StatsFn::Custom(_), StatsFn::Default) => {
false
}
}
}
}
impl std::fmt::Debug for StatsFn {
    /// Prints `Default`, or `Custom { .. }`; the function pointer itself is elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if let Self::Custom(_) = self {
            return f.debug_struct("Custom").finish_non_exhaustive();
        }
        f.write_str("Default")
    }
}
/// Typed access to the stats computed for a column whose values are of type `T`.
pub trait ColumnStats<T: Data>: DynStats {
/// A lower bound on the column's values, or `None` if these stats don't track one.
fn lower<'a>(&'a self) -> Option<T::Ref<'a>>;
/// An upper bound on the column's values, or `None` if these stats don't track one.
fn upper<'a>(&'a self) -> Option<T::Ref<'a>>;
/// The number of null (`None`) values in the column; 0 for non-nullable stats.
fn none_count(&self) -> usize;
}
/// Computes stats of type `Self` from a column `col`, whose nulls are
/// described by `validity`.
pub trait StatsFrom<T> {
fn stats_from(col: &T, validity: ValidityRef) -> Self;
}
/// Type-erased column stats, used as `Box<dyn DynStats>` in `StructStats`.
pub trait DynStats: Debug + Send + Sync + 'static {
/// Returns `self` as `&dyn Any` so callers can downcast to the concrete stats type.
fn as_any(&self) -> &dyn Any;
/// The concrete type's name (used in error messages when a downcast fails).
fn type_name(&self) -> &'static str {
std::any::type_name::<Self>()
}
/// Encodes these stats as a `ProtoDynStats`.
fn into_proto(&self) -> ProtoDynStats;
/// A human-readable JSON rendering of these stats, for debugging.
fn debug_json(&self) -> serde_json::Value;
}
/// Stats for a persist part; only the key's stats are kept.
#[derive(Arbitrary, Debug)]
pub struct PartStats {
    /// Stats for the part's key columns.
    pub key: StructStats,
}

impl serde::Serialize for PartStats {
    /// Serializes as exactly the key stats, with no wrapping `PartStats` object.
    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        // Exhaustive destructure so adding a field forces this impl to be revisited.
        let PartStats { key } = self;
        serde::Serialize::serialize(key, s)
    }
}

impl PartStats {
    /// Computes the stats for `part`, failing if its key stats can't be computed.
    pub fn new(part: &Part) -> Result<Self, String> {
        Ok(Self {
            key: part.key_stats()?,
        })
    }
}
/// Lower and upper bounds on a column's (non-null) values.
///
/// For bytes and strings the bounds may be truncated, so they need not be
/// values that actually occur in the column.
#[cfg_attr(any(test), derive(Clone))]
pub struct PrimitiveStats<T> {
/// A lower bound on the column's values.
pub lower: T,
/// An upper bound on the column's values.
pub upper: T,
}
/// Stats for a nullable column.
pub struct OptionStats<T> {
/// Stats over the column's non-null values.
pub some: T,
/// The number of null values in the column.
pub none: usize,
}
/// Stats for a struct column: a length plus per-field stats keyed by field name.
#[derive(Arbitrary, Default)]
pub struct StructStats {
/// NOTE(review): presumably the number of rows in the column; it is set
/// outside this file, so confirm.
pub len: usize,
/// Per-field stats, keyed by field name.
#[proptest(strategy = "any_struct_stats_cols()")]
pub cols: BTreeMap<String, Box<dyn DynStats>>,
}
impl std::fmt::Debug for StructStats {
    /// Formats via `debug_json`, using the JSON value's `Display` impl. The
    /// formatter (including its `#` flag) is passed through.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = self.debug_json();
        std::fmt::Display::fmt(&rendered, f)
    }
}

impl serde::Serialize for StructStats {
    /// Serializes as a `StructStats` struct with `len` and `cols` fields. Each
    /// column's stats are rendered through their `debug_json` form.
    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let StructStats { len, cols } = self;
        let mut st = s.serialize_struct("StructStats", 2)?;
        st.serialize_field("len", len)?;
        st.serialize_field("cols", &DynStatsCols(cols))?;
        st.end()
    }
}

/// Serializes a column-name to stats map, rendering each value with `debug_json`.
struct DynStatsCols<'a>(&'a BTreeMap<String, Box<dyn DynStats>>);

impl serde::Serialize for DynStatsCols<'_> {
    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let DynStatsCols(cols) = self;
        let mut map = s.serialize_map(Some(cols.len()))?;
        cols.iter()
            .try_for_each(|(name, stats)| map.serialize_entry(name, &stats.debug_json()))?;
        map.end()
    }
}
impl StructStats {
    /// Looks up the stats for column `name`, downcast to `T`'s stats type.
    ///
    /// Returns `Ok(None)` if there are no stats for `name`. Returns an `Err`
    /// naming the expected and actual types if the stored stats are not a
    /// `T::Stats`.
    pub fn col<T: Data>(&self, name: &str) -> Result<Option<&T::Stats>, String> {
        let stats = match self.cols.get(name) {
            Some(stats) => stats,
            None => return Ok(None),
        };
        stats
            .as_any()
            .downcast_ref::<T::Stats>()
            .map(Some)
            .ok_or_else(|| {
                format!(
                    "expected stats type {} got {}",
                    std::any::type_name::<T::Stats>(),
                    stats.type_name()
                )
            })
    }
}
/// Stats for a column of JSON values, tracked by the JSON type(s) observed.
///
/// `None` is the default variant.
#[derive(Default)]
#[cfg_attr(any(test), derive(Clone))]
pub enum JsonStats {
    /// The default; rendered as `{}` by `debug_json`.
    /// NOTE(review): presumably means no values were observed — confirm.
    #[default]
    None,
    /// Values of differing (or unknown) JSON types. A proto with a missing
    /// kind decodes to this, so it is presumably the most conservative variant.
    Mixed,
    /// Presumably: only JSON `null`s were observed.
    JsonNulls,
    /// Bounds over boolean values.
    Bools(PrimitiveStats<bool>),
    /// Bounds over string values.
    Strings(PrimitiveStats<String>),
    /// Bounds over numeric values, stored as bytes (encoding defined elsewhere).
    Numerics(PrimitiveStats<Vec<u8>>),
    /// Presumably: only JSON arrays were observed; no further stats are kept.
    Lists,
    /// Presumably: only JSON objects were observed; stats per object key.
    Maps(BTreeMap<String, JsonMapElementStats>),
}

/// Stats for the values found under one key of JSON objects.
#[derive(Default)]
#[cfg_attr(any(test), derive(Clone))]
pub struct JsonMapElementStats {
    /// NOTE(review): presumably the number of objects that had this key — confirm.
    pub len: usize,
    /// Stats over the values under this key.
    pub stats: JsonStats,
}
impl Debug for JsonStats {
    /// Formats via `JsonStats::debug_json`, passing the formatter through.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = self.debug_json();
        Debug::fmt(&rendered, f)
    }
}

impl JsonStats {
    /// Renders these stats as JSON for debugging.
    ///
    /// The rendering is:
    /// - `{}` for `None`;
    /// - a marker string for `Mixed`, `JsonNulls` and `Lists`;
    /// - the bounds object for the primitive variants;
    /// - an object keyed by map key for `Maps`.
    pub fn debug_json(&self) -> serde_json::Value {
        let marker = |name: &str| serde_json::Value::from(name);
        match self {
            JsonStats::None => json!({}),
            JsonStats::Mixed => marker("json_mixed"),
            JsonStats::JsonNulls => marker("json_nulls"),
            JsonStats::Bools(bounds) => bounds.debug_json(),
            JsonStats::Strings(bounds) => bounds.debug_json(),
            JsonStats::Numerics(bounds) => bounds.debug_json(),
            JsonStats::Lists => marker("json_lists"),
            JsonStats::Maps(elements) => {
                let mut by_key = serde_json::Map::new();
                for (key, element) in elements {
                    by_key.insert(key.clone(), element.debug_json());
                }
                serde_json::Value::Object(by_key)
            }
        }
    }
}

impl JsonMapElementStats {
    /// Renders as `{"len": .., "stats": ..}` for debugging.
    pub fn debug_json(&self) -> serde_json::Value {
        let stats = self.stats.debug_json();
        json!({ "len": self.len, "stats": stats })
    }
}
pub enum BytesStats {
Primitive(PrimitiveStats<Vec<u8>>),
Json(JsonStats),
Atomic(AtomicBytesStats),
}
#[derive(Debug)]
#[cfg_attr(any(test), derive(Clone))]
pub struct AtomicBytesStats {
pub lower: Vec<u8>,
pub upper: Vec<u8>,
}
impl AtomicBytesStats {
fn debug_json(&self) -> serde_json::Value {
serde_json::json!({
"lower": hex::encode(&self.lower),
"upper": hex::encode(&self.upper),
})
}
}
#[derive(Debug)]
#[cfg_attr(any(test), derive(Clone))]
pub struct NoneStats;
/// Maximum length, in bytes, that computed bytes/string bounds are truncated to.
pub const TRUNCATE_LEN: usize = 100;

/// Which kind of bound a truncated value must remain.
pub enum TruncateBound {
    /// The truncated value must sort at or before the original.
    Lower,
    /// The truncated value must sort at or after the original.
    Upper,
}

/// Shortens `x` to at most `max_len` bytes while keeping it a valid `bound` for `x`.
///
/// Values that already fit are returned unchanged.
/// - A `Lower` bound is the first `max_len` bytes; a prefix sorts before any
///   extension of it.
/// - An `Upper` bound takes the longest prefix (of at most `max_len` bytes)
///   whose last byte is not `0xFF`, and increments that byte. The result sorts
///   after every value that starts with the original prefix.
///
/// Returns `None` for `Upper` when all of the first `max_len` bytes are `0xFF`.
pub fn truncate_bytes(x: &[u8], max_len: usize, bound: TruncateBound) -> Option<Vec<u8>> {
    if x.len() <= max_len {
        return Some(x.to_vec());
    }
    let prefix = &x[..max_len];
    match bound {
        TruncateBound::Lower => Some(prefix.to_vec()),
        TruncateBound::Upper => {
            let last = prefix.iter().rposition(|&b| b != u8::MAX)?;
            let mut bumped = prefix[..=last].to_vec();
            bumped[last] += 1;
            Some(bumped)
        }
    }
}

/// Shortens `x` to at most `max_len` bytes, cut on a char boundary, while
/// keeping it a valid `bound` for `x`.
///
/// Values that already fit are returned unchanged.
/// - A `Lower` bound is the longest char-aligned prefix that fits.
/// - An `Upper` bound takes that prefix and replaces its last char that has a
///   successor `char` with that successor, dropping everything after it.
///   The new char may be wider in UTF-8, so an `Upper` result can exceed
///   `max_len` slightly; correctness of the bound takes priority.
///
/// Returns `None` for `Upper` when no char in the prefix can be incremented.
pub fn truncate_string(x: &str, max_len: usize, bound: TruncateBound) -> Option<String> {
    if x.len() <= max_len {
        return Some(x.to_owned());
    }
    // The largest char boundary at or below `max_len` (0 is always a boundary).
    let cut = (0..=max_len)
        .rev()
        .find(|&i| x.is_char_boundary(i))
        .unwrap_or(0);
    let truncated = &x[..cut];
    if let TruncateBound::Lower = bound {
        return Some(truncated.to_owned());
    }
    let (idx, next_char) = truncated
        .char_indices()
        .rev()
        .find_map(|(idx, c)| char::from_u32(u32::from(c) + 1).map(|next| (idx, next)))?;
    let mut out = String::with_capacity(idx + next_char.len_utf8());
    out.push_str(&truncated[..idx]);
    out.push(next_char);
    Some(out)
}
/// Lossy, in-place shrinking of encoded stats. After trimming, the bounds
/// remain valid (possibly looser) lower/upper bounds.
pub trait TrimStats: Message {
/// Trims `self` in place.
fn trim(&mut self);
}
impl TrimStats for ProtoPrimitiveStats {
/// Trims string bounds to their common prefix; other primitive types are left as-is.
fn trim(&mut self) {
use proto_primitive_stats::*;
match (&mut self.lower, &mut self.upper) {
(Some(Lower::LowerString(lower)), Some(Upper::UpperString(upper))) => {
// Bounds that both parse as monotonic ISO 8601 timestamps are kept
// intact (presumably because they are short and useful for filtering).
if try_parse_monotonic_iso8601_timestamp(lower).is_some()
&& try_parse_monotonic_iso8601_timestamp(upper).is_some()
{
return;
}
// The last char of the common prefix of `lower` and `upper`, with its
// byte offset in `lower`. `None` if the first chars already differ,
// in which case nothing is trimmed.
let common_prefix = lower
.char_indices()
.zip(upper.chars())
.take_while(|((_, x), y)| x == y)
.last();
if let Some(((o, x), y)) = common_prefix {
// `x == y` here (per the `take_while`), so this is the byte
// length of the common prefix.
let new_len = o + std::cmp::max(x.len_utf8(), y.len_utf8());
// A prefix of the lower bound is still a lower bound.
*lower = truncate_string(lower, new_len, TruncateBound::Lower)
.expect("lower bound should always truncate");
// If the upper bound can't be truncated, keep it as-is.
if let Some(new_upper) = truncate_string(upper, new_len, TruncateBound::Upper) {
*upper = new_upper;
}
}
}
_ => {}
}
}
}
impl TrimStats for ProtoPrimitiveBytesStats {
    /// Truncates both bounds to one byte past their longest common prefix.
    ///
    /// The lower bound always truncates. The upper bound is left unchanged when
    /// it can't be truncated (see `truncate_bytes`).
    fn trim(&mut self) {
        let shared_len = self
            .lower
            .iter()
            .zip(self.upper.iter())
            .position(|(l, u)| l != u)
            .unwrap_or_else(|| self.lower.len().min(self.upper.len()));
        let keep = shared_len + 1;
        self.lower = truncate_bytes(&self.lower, keep, TruncateBound::Lower)
            .expect("lower bound should always truncate");
        if let Some(upper) = truncate_bytes(&self.upper, keep, TruncateBound::Upper) {
            self.upper = upper;
        }
    }
}
impl TrimStats for ProtoJsonStats {
/// Trims string bounds and, recursively, the stats of every JSON map element.
/// All other JSON kinds are left as-is.
fn trim(&mut self) {
use proto_json_stats::*;
match &mut self.kind {
Some(Kind::Strings(stats)) => {
stats.trim();
}
Some(Kind::Maps(stats)) => {
for value in &mut stats.elements {
if let Some(stats) = &mut value.stats {
stats.trim();
}
}
}
Some(
Kind::None(_)
| Kind::Mixed(_)
| Kind::JsonNulls(_)
| Kind::Bools(_)
| Kind::Numerics(_)
| Kind::Lists(_),
) => {}
None => {}
}
}
}
impl TrimStats for ProtoBytesStats {
/// Trims primitive and JSON bytes stats; atomic bytes stats are never trimmed.
fn trim(&mut self) {
use proto_bytes_stats::*;
match &mut self.kind {
Some(Kind::Primitive(stats)) => stats.trim(),
Some(Kind::Json(stats)) => stats.trim(),
Some(Kind::Atomic(_)) => {}
None => {}
}
}
}
impl TrimStats for ProtoStructStats {
/// Trims each column's stats according to its kind, recursing into nested structs.
fn trim(&mut self) {
use proto_dyn_stats::*;
for value in self.cols.values_mut() {
match &mut value.kind {
Some(Kind::Primitive(stats)) => stats.trim(),
Some(Kind::Bytes(stats)) => stats.trim(),
Some(Kind::Struct(stats)) => stats.trim(),
Some(Kind::None(())) => (),
None => {}
}
}
}
}
/// Shrinks `stats` so its encoded size is at most `budget` bytes, where possible.
///
/// The first step applies the cheap, lossy `TrimStats::trim` to every column.
/// If that isn't enough, whole columns are dropped, largest first, recursing
/// into nested structs and JSON maps, until the budget is met. Columns for
/// which `force_keep_col` returns `true` are never dropped, so the result may
/// still exceed `budget`.
///
/// Returns the number of encoded bytes removed (0 if `stats` already fit).
pub fn trim_to_budget(
    stats: &mut ProtoStructStats,
    budget: usize,
    force_keep_col: impl Fn(&str) -> bool,
) -> usize {
    let original_cost = stats.encoded_len();
    if original_cost <= budget {
        return 0;
    }
    stats.trim();
    let trimmed_cost = stats.encoded_len();
    if let Some(mut shortfall) = trimmed_cost.checked_sub(budget).filter(|s| *s > 0) {
        trim_to_budget_struct(stats, &mut shortfall, &force_keep_col);
        return original_cost.saturating_sub(stats.encoded_len());
    }
    original_cost.saturating_sub(trimmed_cost)
}
/// Drops columns from `stats`, largest first, until `budget_shortfall` is 0 or
/// there is nothing left that may be dropped.
///
/// Nested structs and JSON-map columns are trimmed recursively before being
/// dropped. Columns named by `force_keep_col` are never dropped.
/// `budget_shortfall` is decremented by an estimate of the bytes removed.
fn trim_to_budget_struct(
stats: &mut ProtoStructStats,
budget_shortfall: &mut usize,
force_keep_col: &impl Fn(&str) -> bool,
) {
// (name, encoded size) per column, sorted ascending so `pop` yields the largest.
let mut col_costs: Vec<_> = stats
.cols
.iter()
.map(|(name, stats)| (name.to_owned(), stats.encoded_len()))
.collect();
col_costs.sort_unstable_by_key(|(_, c)| *c);
while *budget_shortfall > 0 {
let Some((name, cost)) = col_costs.pop() else {
break;
};
if force_keep_col(&name) {
continue;
}
let col_stats = stats.cols.get_mut(&name).expect("col exists");
match &mut col_stats.kind {
Some(proto_dyn_stats::Kind::Struct(col_struct)) => {
trim_to_budget_struct(col_struct, budget_shortfall, force_keep_col);
// The recursion closed the shortfall; keep the (trimmed) struct.
if *budget_shortfall == 0 {
break;
}
// Columns remain, so some were force-kept; keep the struct too.
if !col_struct.cols.is_empty() {
continue;
}
// Empty struct: drop it, crediting its remaining size. The recursion
// already credited the removed children, so `cost` would over-count.
// The `+ 1` presumably approximates per-entry overhead.
*budget_shortfall = budget_shortfall.saturating_sub(col_struct.encoded_len() + 1);
stats.cols.remove(&name);
}
Some(proto_dyn_stats::Kind::Bytes(ProtoBytesStats {
kind:
Some(proto_bytes_stats::Kind::Json(ProtoJsonStats {
kind: Some(proto_json_stats::Kind::Maps(col_jsonb)),
})),
})) => {
// Same strategy as for structs, applied to the JSON map's elements.
trim_to_budget_jsonb(col_jsonb, budget_shortfall, force_keep_col);
if *budget_shortfall == 0 {
break;
}
if !col_jsonb.elements.is_empty() {
continue;
}
*budget_shortfall = budget_shortfall.saturating_sub(col_jsonb.encoded_len() + 1);
stats.cols.remove(&name);
}
_ => {
// Leaf stats: drop the whole column, crediting its full size.
stats.cols.remove(&name);
*budget_shortfall = budget_shortfall.saturating_sub(cost + 1);
}
}
}
}
/// Drops JSON map element stats from `stats`, largest first, until
/// `budget_shortfall` reaches 0 or there is nothing left that may be dropped.
///
/// - Elements named by `force_keep_col` are always kept.
/// - Elements that are themselves JSON maps are trimmed recursively. Such an
///   element is kept if any entries survive (they must be force-kept).
///   Otherwise it is dropped and its remaining encoded size is credited
///   against the shortfall, as `trim_to_budget_struct` does for emptied
///   structs.
///
/// `budget_shortfall` is decremented by an estimate of the bytes removed.
fn trim_to_budget_jsonb(
    stats: &mut ProtoJsonMapStats,
    budget_shortfall: &mut usize,
    force_keep_col: &impl Fn(&str) -> bool,
) {
    // Sort ascending by size so `pop` below visits the largest element first.
    stats
        .elements
        .sort_unstable_by_key(|element| element.encoded_len());
    // Elements popped off but retained; re-added once we're done.
    let mut stats_to_keep = Vec::with_capacity(stats.elements.len());
    while *budget_shortfall > 0 {
        let Some(mut column) = stats.elements.pop() else {
            break;
        };
        if force_keep_col(&column.name) {
            stats_to_keep.push(column);
            continue;
        }
        if let Some(ProtoJsonStats {
            kind: Some(proto_json_stats::Kind::Maps(ref mut col_jsonb)),
        }) = column.stats
        {
            trim_to_budget_jsonb(col_jsonb, budget_shortfall, force_keep_col);
            // Entries survived (force-kept ones), so keep this element too.
            if !col_jsonb.elements.is_empty() {
                stats_to_keep.push(column);
                continue;
            }
            // Otherwise fall through and drop the now-empty map. The recursion
            // only credited the removed children, so its own remaining size
            // must still be credited below. Without this, siblings get over-trimmed.
        }
        // Drop this element, crediting its (remaining) encoded size; the `+ 1`
        // mirrors the per-entry overhead estimate used in `trim_to_budget_struct`.
        *budget_shortfall = budget_shortfall.saturating_sub(column.encoded_len() + 1);
    }
    stats.elements.extend(stats_to_keep);
}
mod impls {
use std::any::Any;
use std::collections::BTreeMap;
use std::fmt::Debug;
use arrow2::array::{BinaryArray, BooleanArray, PrimitiveArray, Utf8Array};
use arrow2::bitmap::Bitmap;
use arrow2::buffer::Buffer;
use arrow2::compute::aggregate::SimdOrd;
use arrow2::types::simd::Simd;
use arrow2::types::NativeType;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use proptest::strategy::Union;
use proptest::{collection, prelude::*};
use serde::Serialize;
use crate::columnar::Data;
use crate::dyn_struct::{DynStruct, DynStructCol, ValidityRef};
use crate::stats::{
proto_bytes_stats, proto_dyn_stats, proto_json_stats, proto_primitive_stats,
truncate_bytes, truncate_string, AtomicBytesStats, BytesStats, ColumnStats, DynStats,
JsonMapElementStats, JsonStats, NoneStats, OptionStats, PrimitiveStats,
ProtoAtomicBytesStats, ProtoBytesStats, ProtoDynStats, ProtoJsonMapElementStats,
ProtoJsonMapStats, ProtoJsonStats, ProtoOptionStats, ProtoPrimitiveBytesStats,
ProtoPrimitiveStats, ProtoStructStats, StatsFrom, StructStats, TruncateBound, TRUNCATE_LEN,
};
impl<T: Serialize> Debug for PrimitiveStats<T>
where
    PrimitiveStats<T>: RustType<ProtoPrimitiveStats>,
{
    /// Formats the bounds as a JSON object with `lower` and `upper` keys.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let to_json = |v: &T| serde_json::to_value(v).expect("valid json");
        let bounds = serde_json::json!({
            "lower": to_json(&self.lower),
            "upper": to_json(&self.upper),
        });
        Debug::fmt(&bounds, f)
    }
}

impl<T: Serialize> DynStats for PrimitiveStats<T>
where
    PrimitiveStats<T>: RustType<ProtoPrimitiveStats> + Send + Sync + 'static,
{
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Encodes as a `Primitive` dyn stats with no option (null count) info.
    fn into_proto(&self) -> ProtoDynStats {
        let primitive: ProtoPrimitiveStats = RustType::into_proto(self);
        ProtoDynStats {
            option: None,
            kind: Some(proto_dyn_stats::Kind::Primitive(primitive)),
        }
    }

    fn debug_json(&self) -> serde_json::Value {
        let to_json = |v: &T| serde_json::to_value(v).expect("valid json");
        serde_json::json!({
            "lower": to_json(&self.lower),
            "upper": to_json(&self.upper),
        })
    }
}
impl Debug for PrimitiveStats<Vec<u8>> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(&self.debug_json(), f)
}
}
impl DynStats for PrimitiveStats<Vec<u8>> {
fn as_any(&self) -> &dyn Any {
self
}
fn into_proto(&self) -> ProtoDynStats {
ProtoDynStats {
option: None,
kind: Some(proto_dyn_stats::Kind::Bytes(ProtoBytesStats {
kind: Some(proto_bytes_stats::Kind::Primitive(RustType::into_proto(
self,
))),
})),
}
}
fn debug_json(&self) -> serde_json::Value {
serde_json::json!({
"lower": hex::encode(&self.lower),
"upper": hex::encode(&self.upper),
})
}
}
impl<T: DynStats> Debug for OptionStats<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = self.debug_json();
        Debug::fmt(&rendered, f)
    }
}

impl<T: DynStats> DynStats for OptionStats<T> {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Encodes the inner stats and attaches the null count as option info.
    ///
    /// Panics if the inner stats already carry option info (nested options).
    fn into_proto(&self) -> ProtoDynStats {
        let mut proto = self.some.into_proto();
        assert!(proto.option.is_none());
        proto.option = Some(ProtoOptionStats {
            none: self.none.into_proto(),
        });
        proto
    }

    /// Adds a `nulls` count to the inner stats' JSON object (only when nonzero).
    /// A non-object inner rendering is wrapped as `{"nulls", "not nulls"}`.
    fn debug_json(&self) -> serde_json::Value {
        match self.some.debug_json() {
            serde_json::Value::Object(mut fields) if self.none > 0 => {
                fields.insert("nulls".to_owned(), self.none.into());
                serde_json::Value::Object(fields)
            }
            object @ serde_json::Value::Object(_) => object,
            not_object => serde_json::json!({"nulls": self.none, "not nulls": not_object}),
        }
    }
}
impl DynStats for StructStats {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Encodes as `Struct` dyn stats with no option (null count) info.
    fn into_proto(&self) -> ProtoDynStats {
        let inner = proto_dyn_stats::Kind::Struct(RustType::into_proto(self));
        ProtoDynStats {
            option: None,
            kind: Some(inner),
        }
    }

    /// Renders as an object with `len` plus one entry per column. A column
    /// literally named `len` overwrites the length entry.
    fn debug_json(&self) -> serde_json::Value {
        let mut fields = serde_json::Map::new();
        fields.insert("len".to_owned(), self.len.into());
        fields.extend(
            self.cols
                .iter()
                .map(|(name, stats)| (name.clone(), stats.debug_json())),
        );
        serde_json::Value::Object(fields)
    }
}
impl Debug for BytesStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = self.debug_json();
        Debug::fmt(&rendered, f)
    }
}

impl DynStats for BytesStats {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Encodes as `Bytes` dyn stats with no option (null count) info.
    fn into_proto(&self) -> ProtoDynStats {
        let bytes = proto_dyn_stats::Kind::Bytes(RustType::into_proto(self));
        ProtoDynStats {
            option: None,
            kind: Some(bytes),
        }
    }

    /// Delegates to the rendering of whichever representation is held.
    fn debug_json(&self) -> serde_json::Value {
        match self {
            BytesStats::Primitive(primitive) => primitive.debug_json(),
            BytesStats::Json(json) => json.debug_json(),
            BytesStats::Atomic(atomic) => atomic.debug_json(),
        }
    }
}

impl DynStats for NoneStats {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Encodes as `None` dyn stats with no option (null count) info.
    fn into_proto(&self) -> ProtoDynStats {
        let none = proto_dyn_stats::Kind::None(RustType::into_proto(self));
        ProtoDynStats {
            option: None,
            kind: Some(none),
        }
    }

    /// Renders as the `Debug` string of `NoneStats`.
    fn debug_json(&self) -> serde_json::Value {
        let text = format!("{:?}", self);
        serde_json::Value::String(text)
    }
}
// Implements `ColumnStats` for `PrimitiveStats<$data>` and for
// `OptionStats<PrimitiveStats<$data>>`.
// `$ref` is the method that converts a stored bound into `<$data as Data>::Ref`:
// `clone` for `Copy`-like scalars, `as_slice`/`as_str` for owned buffers.
// Both bounds are always present; only the option variant reports a null count.
macro_rules! stats_primitive {
($data:ty, $ref:ident) => {
impl ColumnStats<$data> for PrimitiveStats<$data> {
fn lower<'a>(&'a self) -> Option<<$data as Data>::Ref<'a>> {
Some(self.lower.$ref())
}
fn upper<'a>(&'a self) -> Option<<$data as Data>::Ref<'a>> {
Some(self.upper.$ref())
}
fn none_count(&self) -> usize {
0
}
}
impl ColumnStats<Option<$data>> for OptionStats<PrimitiveStats<$data>> {
fn lower<'a>(&'a self) -> Option<<Option<$data> as Data>::Ref<'a>> {
Some(self.some.lower())
}
fn upper<'a>(&'a self) -> Option<<Option<$data> as Data>::Ref<'a>> {
Some(self.some.upper())
}
fn none_count(&self) -> usize {
self.none
}
}
};
}
stats_primitive!(bool, clone);
stats_primitive!(u8, clone);
stats_primitive!(u16, clone);
stats_primitive!(u32, clone);
stats_primitive!(u64, clone);
stats_primitive!(i8, clone);
stats_primitive!(i16, clone);
stats_primitive!(i32, clone);
stats_primitive!(i64, clone);
stats_primitive!(f32, clone);
stats_primitive!(f64, clone);
stats_primitive!(Vec<u8>, as_slice);
stats_primitive!(String, as_str);
/// Bytes bounds come from primitive or atomic stats; JSON stats carry no bytes-level bound.
impl ColumnStats<Vec<u8>> for BytesStats {
    fn lower<'a>(&'a self) -> Option<<Vec<u8> as Data>::Ref<'a>> {
        match self {
            BytesStats::Atomic(atomic) => Some(&atomic.lower),
            BytesStats::Primitive(primitive) => primitive.lower(),
            BytesStats::Json(_) => None,
        }
    }
    fn upper<'a>(&'a self) -> Option<<Vec<u8> as Data>::Ref<'a>> {
        match self {
            BytesStats::Atomic(atomic) => Some(&atomic.upper),
            BytesStats::Primitive(primitive) => primitive.upper(),
            BytesStats::Json(_) => None,
        }
    }
    fn none_count(&self) -> usize {
        0
    }
}

/// Nullable bytes: the inner bounds, wrapped as non-null values.
impl ColumnStats<Option<Vec<u8>>> for OptionStats<BytesStats> {
    fn lower<'a>(&'a self) -> Option<<Option<Vec<u8>> as Data>::Ref<'a>> {
        let lower = self.some.lower()?;
        Some(Some(lower))
    }
    fn upper<'a>(&'a self) -> Option<<Option<Vec<u8>> as Data>::Ref<'a>> {
        let upper = self.some.upper()?;
        Some(Some(upper))
    }
    fn none_count(&self) -> usize {
        self.none
    }
}

/// Struct stats have no bounds of their own (only per-field stats).
impl ColumnStats<DynStruct> for StructStats {
    fn lower<'a>(&'a self) -> Option<<DynStruct as Data>::Ref<'a>> {
        None
    }
    fn upper<'a>(&'a self) -> Option<<DynStruct as Data>::Ref<'a>> {
        None
    }
    fn none_count(&self) -> usize {
        0
    }
}

/// Nullable struct: the inner (absent) bounds plus the null count.
impl ColumnStats<Option<DynStruct>> for OptionStats<StructStats> {
    fn lower<'a>(&'a self) -> Option<<Option<DynStruct> as Data>::Ref<'a>> {
        let lower = self.some.lower()?;
        Some(Some(lower))
    }
    fn upper<'a>(&'a self) -> Option<<Option<DynStruct> as Data>::Ref<'a>> {
        let upper = self.some.upper()?;
        Some(Some(upper))
    }
    fn none_count(&self) -> usize {
        self.none
    }
}

/// `NoneStats` knows nothing about any column type.
impl<T: Data> ColumnStats<T> for NoneStats {
    fn lower<'a>(&'a self) -> Option<<T as Data>::Ref<'a>> {
        None
    }
    fn upper<'a>(&'a self) -> Option<<T as Data>::Ref<'a>> {
        None
    }
    fn none_count(&self) -> usize {
        0
    }
}

/// Nullable column without stats: only the null count is known.
impl<T> ColumnStats<Option<T>> for OptionStats<NoneStats>
where
    Option<T>: Data,
{
    fn lower<'a>(&'a self) -> Option<<Option<T> as Data>::Ref<'a>> {
        None
    }
    fn upper<'a>(&'a self) -> Option<<Option<T> as Data>::Ref<'a>> {
        None
    }
    fn none_count(&self) -> usize {
        self.none
    }
}
impl StatsFrom<Bitmap> for PrimitiveStats<bool> {
fn stats_from(col: &Bitmap, validity: ValidityRef) -> Self {
let array = BooleanArray::new(
arrow2::datatypes::DataType::Boolean,
col.clone(),
validity.0.as_ref().cloned(),
);
let lower = arrow2::compute::aggregate::min_boolean(&array).unwrap_or_default();
let upper = arrow2::compute::aggregate::max_boolean(&array).unwrap_or_default();
PrimitiveStats { lower, upper }
}
}
impl StatsFrom<BooleanArray> for OptionStats<PrimitiveStats<bool>> {
fn stats_from(col: &BooleanArray, validity: ValidityRef) -> Self {
debug_assert!(validity.is_superset(col.validity()));
let lower = arrow2::compute::aggregate::min_boolean(col).unwrap_or_default();
let upper = arrow2::compute::aggregate::max_boolean(col).unwrap_or_default();
let none = col.validity().map_or(0, |x| x.unset_bits());
OptionStats {
none,
some: PrimitiveStats { lower, upper },
}
}
}
impl<T> StatsFrom<Buffer<T>> for PrimitiveStats<T>
where
T: NativeType + Simd,
T::Simd: SimdOrd<T>,
{
fn stats_from(col: &Buffer<T>, validity: ValidityRef) -> Self {
let array = PrimitiveArray::new(
T::PRIMITIVE.into(),
col.clone(),
validity.0.as_ref().cloned(),
);
let lower = arrow2::compute::aggregate::min_primitive::<T>(&array).unwrap_or_default();
let upper = arrow2::compute::aggregate::max_primitive::<T>(&array).unwrap_or_default();
PrimitiveStats { lower, upper }
}
}
impl<T> StatsFrom<PrimitiveArray<T>> for OptionStats<PrimitiveStats<T>>
where
T: Data + NativeType + Simd,
T::Simd: SimdOrd<T>,
{
fn stats_from(col: &PrimitiveArray<T>, validity: ValidityRef) -> Self {
debug_assert!(validity.is_superset(col.validity()));
let lower = arrow2::compute::aggregate::min_primitive::<T>(col).unwrap_or_default();
let upper = arrow2::compute::aggregate::max_primitive::<T>(col).unwrap_or_default();
let none = col.validity().map_or(0, |x| x.unset_bits());
OptionStats {
none,
some: PrimitiveStats { lower, upper },
}
}
}
impl StatsFrom<BinaryArray<i32>> for PrimitiveStats<Vec<u8>> {
fn stats_from(col: &BinaryArray<i32>, validity: ValidityRef) -> Self {
assert!(col.validity().is_none());
let mut array = col.clone();
array.set_validity(validity.0.as_ref().cloned());
let lower = arrow2::compute::aggregate::min_binary(&array).unwrap_or_default();
let lower = truncate_bytes(lower, TRUNCATE_LEN, TruncateBound::Lower)
.expect("lower bound should always truncate");
let upper = arrow2::compute::aggregate::max_binary(&array).unwrap_or_default();
let upper = truncate_bytes(upper, TRUNCATE_LEN, TruncateBound::Upper)
.unwrap_or_else(|| upper.to_owned());
PrimitiveStats { lower, upper }
}
}
impl StatsFrom<BinaryArray<i32>> for OptionStats<PrimitiveStats<Vec<u8>>> {
fn stats_from(col: &BinaryArray<i32>, validity: ValidityRef) -> Self {
debug_assert!(validity.is_superset(col.validity()));
let lower = arrow2::compute::aggregate::min_binary(col).unwrap_or_default();
let lower = truncate_bytes(lower, TRUNCATE_LEN, TruncateBound::Lower)
.expect("lower bound should always truncate");
let upper = arrow2::compute::aggregate::max_binary(col).unwrap_or_default();
let upper = truncate_bytes(upper, TRUNCATE_LEN, TruncateBound::Upper)
.unwrap_or_else(|| upper.to_owned());
let none = col.validity().map_or(0, |x| x.unset_bits());
OptionStats {
none,
some: PrimitiveStats { lower, upper },
}
}
}
impl StatsFrom<BinaryArray<i32>> for BytesStats {
fn stats_from(col: &BinaryArray<i32>, validity: ValidityRef) -> Self {
BytesStats::Primitive(<PrimitiveStats<Vec<u8>>>::stats_from(col, validity))
}
}
impl StatsFrom<BinaryArray<i32>> for OptionStats<BytesStats> {
fn stats_from(col: &BinaryArray<i32>, validity: ValidityRef) -> Self {
let stats = OptionStats::<PrimitiveStats<Vec<u8>>>::stats_from(col, validity);
OptionStats {
none: stats.none,
some: BytesStats::Primitive(stats.some),
}
}
}
/// Min and max of `array`'s non-null strings, truncated (on char boundaries) to
/// about `TRUNCATE_LEN` bytes as valid lower/upper bounds.
///
/// When `min_string`/`max_string` return `None`, the bounds are empty. An
/// upper bound that can't be truncated is kept in full.
fn utf8_array_bounds(array: &Utf8Array<i32>) -> (String, String) {
    let min = arrow2::compute::aggregate::min_string(array).unwrap_or_default();
    let lower = truncate_string(min, TRUNCATE_LEN, TruncateBound::Lower)
        .expect("lower bound should always truncate");
    let max = arrow2::compute::aggregate::max_string(array).unwrap_or_default();
    let upper = truncate_string(max, TRUNCATE_LEN, TruncateBound::Upper)
        .unwrap_or_else(|| max.to_owned());
    (lower, upper)
}

impl StatsFrom<Utf8Array<i32>> for PrimitiveStats<String> {
    fn stats_from(col: &Utf8Array<i32>, validity: ValidityRef) -> Self {
        // `col` must not carry its own validity; nulls come from `validity`.
        assert!(col.validity().is_none());
        let mut array = col.clone();
        array.set_validity(validity.0.as_ref().cloned());
        let (lower, upper) = utf8_array_bounds(&array);
        PrimitiveStats { lower, upper }
    }
}

impl StatsFrom<Utf8Array<i32>> for OptionStats<PrimitiveStats<String>> {
    fn stats_from(col: &Utf8Array<i32>, validity: ValidityRef) -> Self {
        debug_assert!(validity.is_superset(col.validity()));
        let (lower, upper) = utf8_array_bounds(col);
        let none = col.validity().map_or(0, |x| x.unset_bits());
        OptionStats {
            none,
            some: PrimitiveStats { lower, upper },
        }
    }
}
impl StatsFrom<DynStructCol> for StructStats {
fn stats_from(col: &DynStructCol, validity: ValidityRef) -> Self {
assert!(col.validity.is_none());
col.stats(validity).expect("valid stats").some
}
}
impl StatsFrom<DynStructCol> for OptionStats<StructStats> {
fn stats_from(col: &DynStructCol, validity: ValidityRef) -> Self {
debug_assert!(validity.is_superset(col.validity.as_ref()));
col.stats(validity).expect("valid stats")
}
}
impl<T: arrow2::array::Array> StatsFrom<T> for NoneStats {
fn stats_from(col: &T, _validity: ValidityRef) -> Self {
assert!(col.validity().is_none());
NoneStats
}
}
impl<T: arrow2::array::Array> StatsFrom<T> for OptionStats<NoneStats> {
fn stats_from(col: &T, validity: ValidityRef) -> Self {
debug_assert!(validity.is_superset(col.validity()));
let none = col.validity().map_or(0, |x| x.unset_bits());
OptionStats {
none,
some: NoneStats,
}
}
}
impl RustType<ProtoStructStats> for StructStats {
fn into_proto(&self) -> ProtoStructStats {
ProtoStructStats {
len: self.len.into_proto(),
cols: self
.cols
.iter()
.map(|(k, v)| (k.into_proto(), v.into_proto()))
.collect(),
}
}
fn from_proto(proto: ProtoStructStats) -> Result<Self, TryFromProtoError> {
let mut cols = BTreeMap::new();
for (k, v) in proto.cols {
cols.insert(k.into_rust()?, v.into_rust()?);
}
Ok(StructStats {
len: proto.len.into_rust()?,
cols,
})
}
}
impl RustType<ProtoJsonStats> for JsonStats {
fn into_proto(&self) -> ProtoJsonStats {
ProtoJsonStats {
kind: Some(match self {
JsonStats::None => proto_json_stats::Kind::None(()),
JsonStats::Mixed => proto_json_stats::Kind::Mixed(()),
JsonStats::JsonNulls => proto_json_stats::Kind::JsonNulls(()),
JsonStats::Bools(x) => proto_json_stats::Kind::Bools(RustType::into_proto(x)),
JsonStats::Strings(x) => {
proto_json_stats::Kind::Strings(RustType::into_proto(x))
}
JsonStats::Numerics(x) => {
proto_json_stats::Kind::Numerics(RustType::into_proto(x))
}
JsonStats::Lists => proto_json_stats::Kind::Lists(()),
JsonStats::Maps(x) => proto_json_stats::Kind::Maps(ProtoJsonMapStats {
elements: x
.iter()
.map(|(k, v)| ProtoJsonMapElementStats {
name: k.into_proto(),
len: v.len.into_proto(),
stats: Some(RustType::into_proto(&v.stats)),
})
.collect(),
}),
}),
}
}
fn from_proto(proto: ProtoJsonStats) -> Result<Self, TryFromProtoError> {
Ok(match proto.kind {
Some(proto_json_stats::Kind::None(())) => JsonStats::None,
Some(proto_json_stats::Kind::Mixed(())) => JsonStats::Mixed,
Some(proto_json_stats::Kind::JsonNulls(())) => JsonStats::JsonNulls,
Some(proto_json_stats::Kind::Bools(x)) => JsonStats::Bools(x.into_rust()?),
Some(proto_json_stats::Kind::Strings(x)) => JsonStats::Strings(x.into_rust()?),
Some(proto_json_stats::Kind::Numerics(x)) => JsonStats::Numerics(x.into_rust()?),
Some(proto_json_stats::Kind::Lists(())) => JsonStats::Lists,
Some(proto_json_stats::Kind::Maps(x)) => {
let mut elements = BTreeMap::new();
for x in x.elements {
let stats = JsonMapElementStats {
len: x.len.into_rust()?,
stats: x.stats.into_rust_if_some("JsonMapElementStats::stats")?,
};
elements.insert(x.name.into_rust()?, stats);
}
JsonStats::Maps(elements)
}
None => JsonStats::Mixed,
})
}
}
impl RustType<ProtoBytesStats> for BytesStats {
fn into_proto(&self) -> ProtoBytesStats {
let kind = match self {
BytesStats::Primitive(x) => {
proto_bytes_stats::Kind::Primitive(RustType::into_proto(x))
}
BytesStats::Json(x) => proto_bytes_stats::Kind::Json(RustType::into_proto(x)),
BytesStats::Atomic(x) => proto_bytes_stats::Kind::Atomic(RustType::into_proto(x)),
};
ProtoBytesStats { kind: Some(kind) }
}
fn from_proto(proto: ProtoBytesStats) -> Result<Self, TryFromProtoError> {
match proto.kind {
Some(proto_bytes_stats::Kind::Primitive(x)) => Ok(BytesStats::Primitive(
PrimitiveStats::<Vec<u8>>::from_proto(x)?,
)),
Some(proto_bytes_stats::Kind::Json(x)) => {
Ok(BytesStats::Json(JsonStats::from_proto(x)?))
}
Some(proto_bytes_stats::Kind::Atomic(x)) => {
Ok(BytesStats::Atomic(AtomicBytesStats::from_proto(x)?))
}
None => Err(TryFromProtoError::missing_field("ProtoBytesStats::kind")),
}
}
}
impl RustType<ProtoAtomicBytesStats> for AtomicBytesStats {
fn into_proto(&self) -> ProtoAtomicBytesStats {
ProtoAtomicBytesStats {
lower: self.lower.into_proto(),
upper: self.upper.into_proto(),
}
}
fn from_proto(proto: ProtoAtomicBytesStats) -> Result<Self, TryFromProtoError> {
Ok(AtomicBytesStats {
lower: proto.lower.into_rust()?,
upper: proto.upper.into_rust()?,
})
}
}
/// `NoneStats` carries no data, so it round-trips through the unit proto type.
impl RustType<()> for NoneStats {
    // Returns `()`; spelled without `-> ()` / a trailing `()` (clippy `unused_unit`).
    fn into_proto(&self) {}

    fn from_proto(_proto: ()) -> Result<Self, TryFromProtoError> {
        Ok(NoneStats)
    }
}
/// Encodes/decodes type-erased stats.
///
/// Decoding dispatches on the proto's kind (see `dyn_from_proto`). If the
/// proto carries option info, the result is wrapped in `OptionStats`.
impl RustType<ProtoDynStats> for Box<dyn DynStats> {
fn into_proto(&self) -> ProtoDynStats {
DynStats::into_proto(self.as_ref())
}
fn from_proto(mut proto: ProtoDynStats) -> Result<Self, TryFromProtoError> {
// Continuation that boxes the concretely-typed decoded stats.
struct BoxFn;
impl DynStatsFn<Box<dyn DynStats>> for BoxFn {
fn call<T: DynStats>(self, t: T) -> Result<Box<dyn DynStats>, TryFromProtoError> {
Ok(Box::new(t))
}
}
// Continuation that wraps the decoded stats in `OptionStats` with the
// given null count, then hands them to the inner continuation.
struct OptionStatsFn<F>(usize, F);
impl<R, F: DynStatsFn<R>> DynStatsFn<R> for OptionStatsFn<F> {
fn call<T: DynStats>(self, some: T) -> Result<R, TryFromProtoError> {
let OptionStatsFn(none, f) = self;
f.call(OptionStats { none, some })
}
}
// `take` clears `option`, which `dyn_from_proto` asserts is `None`.
match proto.option.take() {
Some(option) => {
let none = option.none.into_rust()?;
dyn_from_proto(proto, OptionStatsFn(none, BoxFn))
}
None => dyn_from_proto(proto, BoxFn),
}
}
}
/// A continuation invoked with the concretely-typed stats decoded by
/// `dyn_from_proto`. It lets decoding produce a statically typed `T` (e.g. to
/// wrap it in `OptionStats<T>`) before any boxing happens.
trait DynStatsFn<R> {
fn call<T: DynStats>(self, t: T) -> Result<R, TryFromProtoError>;
}
/// Decodes the concrete stats type identified by `proto.kind` and passes it to `f`.
///
/// For primitive stats, the concrete type is chosen from the variant of the
/// `lower` bound.
///
/// # Panics
/// If `proto.option` is set; callers must strip option info first.
///
/// # Errors
/// If `kind`, or a primitive `lower` bound, is missing, or if the chosen
/// concrete type fails to decode.
fn dyn_from_proto<R, F: DynStatsFn<R>>(
    proto: ProtoDynStats,
    f: F,
) -> Result<R, TryFromProtoError> {
    assert!(proto.option.is_none());
    let kind = proto
        .kind
        .ok_or_else(|| TryFromProtoError::missing_field("ProtoDynStats::kind"))?;
    match kind {
        proto_dyn_stats::Kind::Primitive(x) => match x.lower {
            Some(proto_primitive_stats::Lower::LowerBool(_)) => {
                f.call(PrimitiveStats::<bool>::from_proto(x)?)
            }
            Some(proto_primitive_stats::Lower::LowerU8(_)) => {
                f.call(PrimitiveStats::<u8>::from_proto(x)?)
            }
            Some(proto_primitive_stats::Lower::LowerU16(_)) => {
                f.call(PrimitiveStats::<u16>::from_proto(x)?)
            }
            Some(proto_primitive_stats::Lower::LowerU32(_)) => {
                f.call(PrimitiveStats::<u32>::from_proto(x)?)
            }
            Some(proto_primitive_stats::Lower::LowerU64(_)) => {
                f.call(PrimitiveStats::<u64>::from_proto(x)?)
            }
            Some(proto_primitive_stats::Lower::LowerI8(_)) => {
                f.call(PrimitiveStats::<i8>::from_proto(x)?)
            }
            Some(proto_primitive_stats::Lower::LowerI16(_)) => {
                f.call(PrimitiveStats::<i16>::from_proto(x)?)
            }
            Some(proto_primitive_stats::Lower::LowerI32(_)) => {
                f.call(PrimitiveStats::<i32>::from_proto(x)?)
            }
            Some(proto_primitive_stats::Lower::LowerI64(_)) => {
                f.call(PrimitiveStats::<i64>::from_proto(x)?)
            }
            Some(proto_primitive_stats::Lower::LowerF32(_)) => {
                f.call(PrimitiveStats::<f32>::from_proto(x)?)
            }
            Some(proto_primitive_stats::Lower::LowerF64(_)) => {
                f.call(PrimitiveStats::<f64>::from_proto(x)?)
            }
            Some(proto_primitive_stats::Lower::LowerString(_)) => {
                f.call(PrimitiveStats::<String>::from_proto(x)?)
            }
            // The oneof field is `lower` (there is no `min` field); name it
            // consistently with `primitive_stats_rust_type!`.
            None => Err(TryFromProtoError::missing_field("ProtoPrimitiveStats::lower")),
        },
        proto_dyn_stats::Kind::Struct(x) => f.call(StructStats::from_proto(x)?),
        proto_dyn_stats::Kind::Bytes(x) => f.call(BytesStats::from_proto(x)?),
        proto_dyn_stats::Kind::None(x) => f.call(NoneStats::from_proto(x)?),
    }
}
/// Implements `RustType<ProtoPrimitiveStats>` for `PrimitiveStats<$typ>`.
///
/// - `$typ`: the Rust element type stored in the stats' `lower`/`upper`.
/// - `$lower` / `$upper`: the `proto_primitive_stats::Lower` and
///   `proto_primitive_stats::Upper` variants that carry a `$typ` bound.
///
/// `from_proto` returns `TryFromProtoError::missing_field` when either bound
/// is absent, or when it is encoded with a variant other than `$lower` /
/// `$upper` (the error then names the variant that was expected).
macro_rules! primitive_stats_rust_type {
    ($typ:ty, $lower:ident, $upper:ident) => {
        impl RustType<ProtoPrimitiveStats> for PrimitiveStats<$typ> {
            fn into_proto(&self) -> ProtoPrimitiveStats {
                ProtoPrimitiveStats {
                    lower: Some(proto_primitive_stats::Lower::$lower(
                        self.lower.into_proto(),
                    )),
                    upper: Some(proto_primitive_stats::Upper::$upper(
                        self.upper.into_proto(),
                    )),
                }
            }

            fn from_proto(proto: ProtoPrimitiveStats) -> Result<Self, TryFromProtoError> {
                let lower = proto.lower.ok_or_else(|| {
                    TryFromProtoError::missing_field("ProtoPrimitiveStats::lower")
                })?;
                let lower = match lower {
                    proto_primitive_stats::Lower::$lower(x) => x.into_rust()?,
                    // `macro_rules!` does not substitute metavariables inside
                    // string literals, so the expected variant name is spliced
                    // in with `stringify!` rather than written as "$lower".
                    _ => {
                        return Err(TryFromProtoError::missing_field(concat!(
                            "proto_primitive_stats::Lower::",
                            stringify!($lower)
                        )))
                    }
                };
                let upper = proto.upper.ok_or_else(|| {
                    TryFromProtoError::missing_field("ProtoPrimitiveStats::upper")
                })?;
                let upper = match upper {
                    proto_primitive_stats::Upper::$upper(x) => x.into_rust()?,
                    _ => {
                        return Err(TryFromProtoError::missing_field(concat!(
                            "proto_primitive_stats::Upper::",
                            stringify!($upper)
                        )))
                    }
                };
                Ok(PrimitiveStats { lower, upper })
            }
        }
    };
}
// `RustType<ProtoPrimitiveStats>` impls for the primitive stats types below,
// each paired with its dedicated `Lower*`/`Upper*` proto variants.
primitive_stats_rust_type!(String, LowerString, UpperString);
primitive_stats_rust_type!(bool, LowerBool, UpperBool);
// Signed integers.
primitive_stats_rust_type!(i8, LowerI8, UpperI8);
primitive_stats_rust_type!(i16, LowerI16, UpperI16);
primitive_stats_rust_type!(i32, LowerI32, UpperI32);
primitive_stats_rust_type!(i64, LowerI64, UpperI64);
// Unsigned integers.
primitive_stats_rust_type!(u8, LowerU8, UpperU8);
primitive_stats_rust_type!(u16, LowerU16, UpperU16);
primitive_stats_rust_type!(u32, LowerU32, UpperU32);
primitive_stats_rust_type!(u64, LowerU64, UpperU64);
// Floating point.
primitive_stats_rust_type!(f32, LowerF32, UpperF32);
primitive_stats_rust_type!(f64, LowerF64, UpperF64);
/// Byte-string primitive stats use their own proto message
/// (`ProtoPrimitiveBytesStats`) whose `lower`/`upper` fields hold the bounds
/// directly, with no `Option`-wrapped variant enum.
impl RustType<ProtoPrimitiveBytesStats> for PrimitiveStats<Vec<u8>> {
    fn into_proto(&self) -> ProtoPrimitiveBytesStats {
        let lower = self.lower.into_proto();
        let upper = self.upper.into_proto();
        ProtoPrimitiveBytesStats { lower, upper }
    }

    /// Decodes both bounds, propagating the first conversion error.
    fn from_proto(proto: ProtoPrimitiveBytesStats) -> Result<Self, TryFromProtoError> {
        let ProtoPrimitiveBytesStats { lower, upper } = proto;
        Ok(PrimitiveStats {
            lower: lower.into_rust()?,
            upper: upper.into_rust()?,
        })
    }
}
/// Strategy producing a map from arbitrary column names to arbitrary boxed
/// stats, with a map size drawn from `1..5`.
pub(crate) fn any_struct_stats_cols(
) -> impl Strategy<Value = BTreeMap<String, Box<dyn DynStats>>> {
    let names = any::<String>();
    let stats = any_box_dyn_stats();
    collection::btree_map(names, stats, 1..5)
}
/// Strategy producing `PrimitiveStats<T>` with `lower <= upper`: two arbitrary
/// values are drawn and placed in ascending order (ties keep draw order).
fn any_primitive_stats<T>() -> impl Strategy<Value = PrimitiveStats<T>>
where
    T: Arbitrary + Ord + Serialize,
    PrimitiveStats<T>: RustType<ProtoPrimitiveStats>,
{
    any::<(T, T)>().prop_map(|(a, b)| {
        let (lower, upper) = if a <= b { (a, b) } else { (b, a) };
        PrimitiveStats { lower, upper }
    })
}
/// Strategy producing byte-string `PrimitiveStats` with `lower <= upper`,
/// ordering two arbitrary byte vectors lexicographically.
fn any_primitive_vec_u8_stats() -> impl Strategy<Value = PrimitiveStats<Vec<u8>>> {
    any::<(Vec<u8>, Vec<u8>)>().prop_map(|(first, second)| {
        if first > second {
            PrimitiveStats {
                lower: second,
                upper: first,
            }
        } else {
            PrimitiveStats {
                lower: first,
                upper: second,
            }
        }
    })
}
fn any_bytes_stats() -> impl Strategy<Value = BytesStats> {
Union::new(vec![
any_primitive_vec_u8_stats()
.prop_map(BytesStats::Primitive)
.boxed(),
any_json_stats().prop_map(BytesStats::Json).boxed(),
any_primitive_vec_u8_stats()
.prop_map(|x| {
BytesStats::Atomic(AtomicBytesStats {
lower: x.lower,
upper: x.upper,
})
})
.boxed(),
])
}
/// Strategy producing `JsonStats`: leaf variants (`None`, `Mixed`,
/// `JsonNulls`, `Bools`, `Strings`, `Lists`), optionally nested inside
/// `Maps` whose elements all have `len: 1`.
fn any_json_stats() -> impl Strategy<Value = JsonStats> {
    let none = any::<()>().prop_map(|_| JsonStats::None).boxed();
    let mixed = any::<()>().prop_map(|_| JsonStats::Mixed).boxed();
    let json_nulls = any::<()>().prop_map(|_| JsonStats::JsonNulls).boxed();
    let bools = any_primitive_stats::<bool>()
        .prop_map(JsonStats::Bools)
        .boxed();
    let strings = any_primitive_stats::<String>()
        .prop_map(JsonStats::Strings)
        .boxed();
    let lists = any::<()>().prop_map(|_| JsonStats::Lists).boxed();
    let leaf = Union::new(vec![none, mixed, json_nulls, bools, strings, lists]);

    // Recursion depth 2, desired size 5, expected branch size 3; each map has
    // a size drawn from `0..3`.
    leaf.prop_recursive(2, 5, 3, |inner| {
        collection::btree_map(any::<String>(), inner, 0..3).prop_map(|entries| {
            let elements = entries
                .into_iter()
                .map(|(name, stats)| (name, JsonMapElementStats { len: 1, stats }))
                .collect();
            JsonStats::Maps(elements)
        })
    })
}
/// Strategy producing type-erased stats: primitive `bool`/`i64`/`String`
/// stats and `BytesStats` as leaves, optionally nested in `StructStats`.
fn any_box_dyn_stats() -> impl Strategy<Value = Box<dyn DynStats>> {
    /// Erases a concrete stats type behind `Box<dyn DynStats>`.
    fn erase<T: DynStats>(stats: T) -> Box<dyn DynStats> {
        Box::new(stats)
    }

    let leaf = Union::new(vec![
        any_primitive_stats::<bool>().prop_map(erase).boxed(),
        any_primitive_stats::<i64>().prop_map(erase).boxed(),
        any_primitive_stats::<String>().prop_map(erase).boxed(),
        any_bytes_stats().prop_map(erase).boxed(),
    ]);

    // Recursion depth 2, desired size 10, expected branch size 3; struct
    // `len` is an arbitrary usize, unrelated to the column contents.
    leaf.prop_recursive(2, 10, 3, |inner| {
        let len = any::<usize>();
        let cols = collection::btree_map(any::<String>(), inner, 0..3);
        (len, cols).prop_map(|(len, cols)| erase(StructStats { len, cols }))
    })
}
}
#[cfg(test)]
mod tests {
    use arrow2::array::BinaryArray;
    use mz_proto::RustType;
    use proptest::prelude::*;

    use crate::columnar::sealed::ColumnMut;
    use crate::columnar::ColumnPush;
    use crate::dyn_struct::ValidityRef;

    use super::*;

    /// `truncate_bytes` on hand-picked inputs: the lower bound always exists,
    /// fits in `max_len`, and is `<= x`; the upper bound exists exactly when
    /// `upper_should_exist`, and then fits in `max_len` and is `>= x`.
    #[mz_ore::test]
    fn test_truncate_bytes() {
        #[track_caller]
        fn testcase(x: &[u8], max_len: usize, upper_should_exist: bool) {
            let lower = truncate_bytes(x, max_len, TruncateBound::Lower)
                .expect("lower should always exist");
            assert!(lower.len() <= max_len);
            assert!(lower.as_slice() <= x);
            let upper = truncate_bytes(x, max_len, TruncateBound::Upper);
            assert_eq!(upper_should_exist, upper.is_some());
            if let Some(upper) = upper {
                assert!(upper.len() <= max_len);
                assert!(upper.as_slice() >= x);
            }
        }

        testcase(&[], 0, true);
        testcase(&[], 1, true);
        testcase(&[1], 0, false);
        testcase(&[1], 1, true);
        testcase(&[1], 2, true);
        testcase(&[1, 2], 1, true);
        testcase(&[1, 255], 2, true);
        testcase(&[255, 255], 2, true);
        testcase(&[255, 255, 255], 2, false);
    }

    /// Property version of `test_truncate_bytes`, checked for every `max_len`
    /// from 0 up to the input length.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_truncate_bytes_proptest() {
        fn testcase(x: &[u8]) {
            for max_len in 0..=x.len() {
                let lower = truncate_bytes(x, max_len, TruncateBound::Lower)
                    .expect("lower should always exist");
                let upper = truncate_bytes(x, max_len, TruncateBound::Upper);
                assert!(lower.len() <= max_len);
                assert!(lower.as_slice() <= x);
                if let Some(upper) = upper {
                    assert!(upper.len() <= max_len);
                    assert!(upper.as_slice() >= x);
                }
            }
        }

        proptest!(|(x in any::<Vec<u8>>())| {
            testcase(x.as_slice())
        });
    }

    /// `truncate_string` on hand-picked inputs, including multi-byte UTF-8
    /// characters and `char::MAX` (`\u{10FFFF}`).
    #[mz_ore::test]
    fn test_truncate_string() {
        #[track_caller]
        fn testcase(x: &str, max_len: usize, upper_should_exist: bool) {
            let lower = truncate_string(x, max_len, TruncateBound::Lower)
                .expect("lower should always exist");
            let upper = truncate_string(x, max_len, TruncateBound::Upper);
            assert!(lower.len() <= max_len);
            assert!(lower.as_str() <= x);
            assert_eq!(upper_should_exist, upper.is_some());
            if let Some(upper) = upper {
                assert!(upper.len() <= max_len);
                assert!(upper.as_str() >= x);
            }
        }

        testcase("", 0, true);
        testcase("1", 0, false);
        testcase("1", 1, true);
        testcase("12", 1, true);
        testcase("⛄", 0, false);
        testcase("⛄", 1, false);
        testcase("⛄", 3, true);
        testcase("\u{10FFFF}", 3, false);
        testcase("\u{10FFFF}", 4, true);
        testcase("\u{10FFFF}", 5, true);
        testcase("⛄⛄", 3, true);
        testcase("⛄⛄", 4, true);
        testcase("⛄\u{10FFFF}", 6, true);
        testcase("⛄\u{10FFFF}", 7, true);
        testcase("\u{10FFFF}\u{10FFFF}", 7, false);
        testcase("\u{10FFFF}\u{10FFFF}", 8, true);

        // The upper bound of "⛄⛄" truncated to 3 bytes is the next char, "⛅".
        assert_eq!(
            truncate_string("⛄⛄", 3, TruncateBound::Upper),
            Some("⛅".to_string())
        );
    }

    /// Property version of `test_truncate_string`, checked for every `max_len`
    /// from 0 up to the input's byte length.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_truncate_string_proptest() {
        fn testcase(x: &str) {
            for max_len in 0..=x.len() {
                let lower = truncate_string(x, max_len, TruncateBound::Lower)
                    .expect("lower should always exist");
                let upper = truncate_string(x, max_len, TruncateBound::Upper);
                assert!(lower.len() <= max_len);
                assert!(lower.as_str() <= x);
                if let Some(upper) = upper {
                    // Looser than `test_truncate_string`: the upper bound may
                    // exceed `max_len` by up to one maximal UTF-8 char.
                    // NOTE(review): presumably `truncate_string` can overshoot
                    // when it bumps a char; confirm against the impl.
                    assert!(upper.len() <= max_len + char::MAX.len_utf8());
                    assert!(upper.as_str() >= x);
                }
            }
        }

        proptest!(|(x in any::<String>())| {
            testcase(x.as_str())
        });
    }

    /// For each primitive type, stats computed from a two-value column must
    /// bound both values, both before and after `trim()`. Trimming must not
    /// grow the encoded size.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn primitive_cost_trim_proptest() {
        /// Builds a column from `xs` (via `f`) and computes its stats.
        fn primitive_stats<'a, T: Data<Cfg = ()>, F>(xs: &'a [T], f: F) -> (&'a [T], T::Stats)
        where
            F: for<'b> Fn(&'b T) -> T::Ref<'b>,
        {
            let mut col = T::Mut::new(&());
            for x in xs {
                col.push(f(x));
            }
            let col = T::Col::from(col);
            let stats = T::Stats::stats_from(&col, ValidityRef(None));
            (xs, stats)
        }

        fn testcase<T: Data + PartialOrd + Clone + Debug, P>(xs_stats: (&[T], PrimitiveStats<T>))
        where
            PrimitiveStats<T>: RustType<P> + DynStats,
            P: TrimStats,
        {
            let (xs, stats) = xs_stats;
            for x in xs {
                assert!(&stats.lower <= x);
                assert!(&stats.upper >= x);
            }

            let mut proto_stats = RustType::into_proto(&stats);
            let cost_before = proto_stats.encoded_len();
            proto_stats.trim();
            assert!(proto_stats.encoded_len() <= cost_before);

            // The trimmed bounds must still cover every input value.
            let stats: PrimitiveStats<T> = RustType::from_proto(proto_stats).unwrap();
            for x in xs {
                assert!(&stats.lower <= x);
                assert!(&stats.upper >= x);
            }
        }

        proptest!(|(a in any::<bool>(), b in any::<bool>())| {
            testcase(primitive_stats(&[a, b], |x| *x))
        });
        proptest!(|(a in any::<u8>(), b in any::<u8>())| {
            testcase(primitive_stats(&[a, b], |x| *x))
        });
        proptest!(|(a in any::<u16>(), b in any::<u16>())| {
            testcase(primitive_stats(&[a, b], |x| *x))
        });
        proptest!(|(a in any::<u32>(), b in any::<u32>())| {
            testcase(primitive_stats(&[a, b], |x| *x))
        });
        proptest!(|(a in any::<u64>(), b in any::<u64>())| {
            testcase(primitive_stats(&[a, b], |x| *x))
        });
        proptest!(|(a in any::<i8>(), b in any::<i8>())| {
            testcase(primitive_stats(&[a, b], |x| *x))
        });
        proptest!(|(a in any::<i16>(), b in any::<i16>())| {
            testcase(primitive_stats(&[a, b], |x| *x))
        });
        proptest!(|(a in any::<i32>(), b in any::<i32>())| {
            testcase(primitive_stats(&[a, b], |x| *x))
        });
        proptest!(|(a in any::<i64>(), b in any::<i64>())| {
            testcase(primitive_stats(&[a, b], |x| *x))
        });
        proptest!(|(a in any::<f32>(), b in any::<f32>())| {
            testcase(primitive_stats(&[a, b], |x| *x))
        });
        proptest!(|(a in any::<f64>(), b in any::<f64>())| {
            testcase(primitive_stats(&[a, b], |x| *x))
        });
        // Strings and byte vectors share a common prefix so trimming has
        // something to work with.
        proptest!(|(prefix in any::<String>(), a in any::<String>(), b in any::<String>())| {
            let vals = &[format!("{}{}", prefix, a), format!("{}{}", prefix, b)];
            testcase(primitive_stats(vals, |x| x))
        });
        proptest!(|(prefix in any::<Vec<u8>>(), a in any::<Vec<u8>>(), b in any::<Vec<u8>>())| {
            let mut sa = prefix.clone();
            sa.extend(&a);
            let mut sb = prefix;
            sb.extend(&b);
            let vals = &[sa, sb];
            let stats = PrimitiveStats::<Vec<u8>>::stats_from(&BinaryArray::<i32>::from_slice(vals), ValidityRef(None));
            testcase((vals, stats));
        });
    }

    /// `trim_to_budget` on struct stats, with halving budgets down to zero.
    /// Each round checks four things:
    /// - trimming never grows the encoding;
    /// - the returned count equals the bytes removed;
    /// - a required column survives;
    /// - with nothing required, the result fits the budget.
    #[mz_ore::test]
    fn struct_trim_to_budget() {
        #[track_caller]
        fn testcase(cols: &[(&str, usize)], required: Option<&str>) {
            // One byte-string column per entry, with an upper bound `cost`
            // bytes long.
            let cols = cols
                .iter()
                .map(|(key, cost)| {
                    let stats: Box<dyn DynStats> = Box::new(PrimitiveStats {
                        lower: vec![],
                        upper: vec![0u8; *cost],
                    });
                    ((*key).to_owned(), stats)
                })
                .collect();
            let mut stats: ProtoStructStats = RustType::into_proto(&StructStats { len: 0, cols });

            let mut budget = stats.encoded_len().next_power_of_two();
            while budget > 0 {
                let cost_before = stats.encoded_len();
                let trimmed = trim_to_budget(&mut stats, budget, |col| Some(col) == required);
                let cost_after = stats.encoded_len();
                assert!(cost_before >= cost_after);
                assert_eq!(trimmed, cost_before - cost_after);
                if let Some(required) = required {
                    assert!(stats.cols.contains_key(required));
                } else {
                    assert!(cost_after <= budget);
                }
                budget /= 2;
            }
        }

        testcase(&[], None);
        testcase(&[("a", 100)], None);
        testcase(&[("a", 1), ("b", 2), ("c", 4)], None);
        testcase(&[("a", 1), ("b", 2), ("c", 4)], Some("b"));
    }

    /// `trim_to_budget_jsonb` on JSON map stats, with halving budgets down to
    /// zero. Each round checks that trimming never grows the encoding. It then
    /// checks either that a required element survives or, with nothing
    /// required, that the result fits the budget.
    #[mz_ore::test]
    fn jsonb_trim_to_budget() {
        #[track_caller]
        fn testcase(cols: &[(&str, usize)], required: Option<&str>) {
            let cols = cols
                .iter()
                .map(|(key, cost)| {
                    let stats = JsonStats::Numerics(PrimitiveStats {
                        lower: vec![],
                        upper: vec![0u8; *cost],
                    });
                    let len = stats.debug_json().to_string().len();
                    ((*key).to_owned(), JsonMapElementStats { len, stats })
                })
                .collect();

            // Serialize to proto and pull out the map payload.
            let stats: ProtoJsonStats = RustType::into_proto(&JsonStats::Maps(cols));
            let ProtoJsonStats {
                kind: Some(proto_json_stats::Kind::Maps(mut stats)),
            } = stats
            else {
                panic!("serialized produced wrong type!");
            };

            let mut budget = stats.encoded_len().next_power_of_two();
            while budget > 0 {
                let cost_before = stats.encoded_len();
                // NOTE(review): `budget` is passed `&mut`, and
                // `jsonb_trim_to_budget_smoke` calls this argument a
                // "shortfall". The call may therefore modify `budget` before
                // the `cost_after <= budget` check below. Confirm intended.
                trim_to_budget_jsonb(&mut stats, &mut budget, &|col| Some(col) == required);
                let cost_after = stats.encoded_len();
                assert!(cost_before >= cost_after);
                if let Some(required) = required {
                    assert!(stats
                        .elements
                        .iter()
                        .any(|element| element.name == required));
                } else {
                    assert!(cost_after <= budget);
                }
                budget /= 2;
            }
        }

        testcase(&[], None);
        testcase(&[("a", 100)], None);
        testcase(&[("a", 1), ("b", 2), ("c", 4)], None);
        testcase(&[("a", 1), ("b", 2), ("c", 4)], Some("b"));
    }

    /// Nested JSON map trimming with a shortfall of 50. With nothing required,
    /// the large `context.b` element is dropped while `a` and `context.c`
    /// remain. With `b` required, `context.b` is kept and `a` and `context.c`
    /// are dropped instead.
    #[mz_ore::test]
    fn jsonb_trim_to_budget_smoke() {
        let og_stats = JsonStats::Maps(
            [
                (
                    "a".to_string(),
                    JsonMapElementStats {
                        len: 1,
                        stats: JsonStats::Strings(PrimitiveStats {
                            lower: "foobar".to_string(),
                            upper: "foobaz".to_string(),
                        }),
                    },
                ),
                (
                    "context".to_string(),
                    JsonMapElementStats {
                        len: 100,
                        stats: JsonStats::Maps(
                            [
                                (
                                    "b".to_string(),
                                    JsonMapElementStats {
                                        len: 99,
                                        stats: JsonStats::Numerics(PrimitiveStats {
                                            lower: vec![],
                                            upper: vec![42u8; 99],
                                        }),
                                    },
                                ),
                                (
                                    "c".to_string(),
                                    JsonMapElementStats {
                                        len: 1,
                                        stats: JsonStats::Bools(PrimitiveStats {
                                            lower: false,
                                            upper: true,
                                        }),
                                    },
                                ),
                            ]
                            .into(),
                        ),
                    },
                ),
            ]
            .into(),
        );

        // Case 1: nothing is required.
        let stats: ProtoJsonStats = RustType::into_proto(&og_stats);
        let ProtoJsonStats {
            kind: Some(proto_json_stats::Kind::Maps(mut stats)),
        } = stats
        else {
            panic!("serialized produced wrong type!");
        };

        let mut budget_shortfall = 50;
        trim_to_budget_jsonb(&mut stats, &mut budget_shortfall, &|_name| false);

        let mut elements = stats
            .elements
            .into_iter()
            .map(|element| (element.name.clone(), element))
            .collect::<BTreeMap<String, _>>();
        assert!(elements.remove("a").is_some());

        let context = elements.remove("context").expect("trimmed too much");
        let Some(ProtoJsonStats {
            kind: Some(proto_json_stats::Kind::Maps(context)),
        }) = context.stats
        else {
            panic!("serialized produced wrong type!")
        };
        assert_eq!(context.elements.len(), 1);
        assert_eq!(context.elements[0].name, "c");

        // Case 2: `b` is required.
        let stats: ProtoJsonStats = RustType::into_proto(&og_stats);
        let ProtoJsonStats {
            kind: Some(proto_json_stats::Kind::Maps(mut stats)),
        } = stats
        else {
            panic!("serialized produced wrong type!");
        };

        let mut budget_shortfall = 50;
        trim_to_budget_jsonb(&mut stats, &mut budget_shortfall, &|name| name == "b");

        assert_eq!(stats.elements.len(), 1);
        assert_eq!(stats.elements[0].name, "context");

        let Some(ProtoJsonStats {
            kind: Some(proto_json_stats::Kind::Maps(context)),
        }) = &stats.elements[0].stats
        else {
            panic!("serialized produced wrong type!")
        };
        assert_eq!(context.elements.len(), 1);
        assert_eq!(context.elements[0].name, "b");
    }

    /// `trim()` must strictly shrink JSON string stats, both bare and nested
    /// inside a map.
    #[mz_ore::test]
    fn stats_trim_regression_json() {
        #[track_caller]
        fn testcase(stats: JsonStats) {
            let mut stats = stats.into_proto();
            let before = stats.encoded_len();
            stats.trim();
            let after = stats.encoded_len();
            assert!(after < before, "{} vs {}: {:?}", after, before, stats);
        }

        let col = JsonStats::Strings(PrimitiveStats {
            lower: "foobar".into(),
            upper: "foobaz".into(),
        });
        testcase(col.clone());
        let mut cols = BTreeMap::new();
        cols.insert("col".into(), JsonMapElementStats { len: 1, stats: col });
        testcase(JsonStats::Maps(cols));
    }

    /// With a 30-byte budget, the small column `foo` is kept and the larger
    /// column `bar` is trimmed away.
    #[mz_ore::test]
    fn trim_order_regression() {
        fn dyn_stats(lower: &'static str, upper: &'static str) -> Box<dyn DynStats> {
            Box::new(PrimitiveStats {
                lower: lower.to_owned(),
                upper: upper.to_owned(),
            })
        }

        let stats = StructStats {
            len: 2,
            cols: BTreeMap::from([
                ("foo".to_owned(), dyn_stats("a", "b")),
                (
                    "bar".to_owned(),
                    dyn_stats("aaaaaaaaaaaaaaaaaa", "aaaaaaaaaaaaaaaaab"),
                ),
            ]),
        };
        let mut proto_stats = RustType::into_proto(&stats);
        trim_to_budget(&mut proto_stats, 30, |_| false);
        assert!(proto_stats.cols.contains_key("foo"));
        assert!(!proto_stats.cols.contains_key("bar"));
    }

    /// Required columns nested one struct level deep (`ok.foo_timestamp`) and
    /// at the top level (`err`) survive trimming. The large sibling columns
    /// `a`..=`z` are what get trimmed.
    #[mz_ore::test]
    fn stats_trim_to_budget_regression_recursion() {
        fn str_stats(n: usize, l: &str, u: &str) -> Box<dyn DynStats> {
            let stats: Box<dyn DynStats> = Box::new(OptionStats {
                none: n,
                some: PrimitiveStats {
                    lower: l.to_owned(),
                    upper: u.to_owned(),
                },
            });
            stats
        }

        const BIG: usize = 100;

        let mut cols = BTreeMap::new();
        for col in 'a'..='z' {
            let col = col.to_string();
            let stats = str_stats(2, "", &col.repeat(BIG));
            cols.insert(col, stats);
        }
        cols.insert("foo_timestamp".to_string(), str_stats(2, "foo", "foo"));
        let source_data_stats = StructStats {
            len: 2,
            cols: BTreeMap::from([
                ("err".to_owned(), str_stats(2, "", "")),
                ("ok".to_owned(), Box::new(StructStats { len: 2, cols })),
            ]),
        };
        let mut proto_stats = RustType::into_proto(&source_data_stats);
        let trimmed = trim_to_budget(&mut proto_stats, BIG, |x| {
            x.ends_with("timestamp") || x == "err"
        });
        assert!(trimmed > 0);
        assert!(proto_stats.cols.contains_key("ok"));
        assert!(proto_stats.cols.contains_key("err"));
        let ok = proto_stats.cols.get("ok").unwrap();
        let proto_dyn_stats::Kind::Struct(ok_struct) = ok.kind.as_ref().unwrap() else {
            panic!("ok was of unexpected type {:?}", ok);
        };
        assert!(ok_struct.cols.contains_key("foo_timestamp"));
    }

    /// `trim()` leaves ISO-8601 timestamp string bounds unchanged.
    #[mz_ore::test]
    fn stats_trim_iso8601_recursion() {
        use proto_primitive_stats::*;

        let orig = PrimitiveStats {
            lower: "2023-08-19T12:00:00.000Z".to_owned(),
            upper: "2023-08-20T12:00:00.000Z".to_owned(),
        };
        let mut stats = RustType::into_proto(&orig);
        stats.trim();
        assert_eq!(stats.lower.unwrap(), Lower::LowerString(orig.lower));
        assert_eq!(stats.upper.unwrap(), Upper::UpperString(orig.upper));
    }
}