1#![allow(missing_docs)] use std::fmt::Debug;
15
16use anyhow::Context;
17use arrow::array::{
18 BinaryArray, BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array,
19 Int64Array, StringArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
20};
21use mz_ore::cast::CastFrom;
22use mz_ore::metrics::{IntCounter, MetricsRegistry};
23use mz_ore::{assert_none, metric};
24use mz_proto::{ProtoType, RustType, TryFromProtoError};
25use proptest::prelude::*;
26use proptest::strategy::{Strategy, Union};
27use proptest_derive::Arbitrary;
28use prost::Message;
29
30use crate::columnar::{ColumnDecoder, Schema};
31use crate::part::Part;
32use crate::stats::bytes::any_bytes_stats;
33use crate::stats::primitive::any_primitive_stats;
34
35pub mod bytes;
36pub mod json;
37pub mod primitive;
38pub mod structured;
39
40pub use bytes::{AtomicBytesStats, BytesStats, FixedSizeBytesStats, FixedSizeBytesStatsKind};
41pub use json::{JsonMapElementStats, JsonStats};
42pub use primitive::{
43 PrimitiveStats, PrimitiveStatsVariants, TRUNCATE_LEN, TruncateBound, truncate_bytes,
44 truncate_string,
45};
46pub use structured::StructStats;
47
// Pull in this module's protobuf types (e.g. `ProtoDynStats`, `ProtoStructStats`), which are
// generated at build time into `OUT_DIR` (presumably by prost, given the `prost::Message` usage
// below — confirm against the build script).
include!(concat!(env!("OUT_DIR"), "/mz_persist_types.stats.rs"));
49
/// Statistics for a single column: an optional null count plus kind-specific value stats.
#[derive(Debug, Clone)]
pub struct ColumnarStats {
    /// Null statistics. `Some` marks the column as nullable and `None` as non-nullable; the
    /// `try_as_*` accessors reject stats whose nullability doesn't match what they expect.
    pub nulls: Option<ColumnNullStats>,
    /// Statistics over the column's (non-null) values.
    pub values: ColumnStatKinds,
}
58
59impl ColumnarStats {
60 pub fn one_column_struct(len: usize, col: ColumnStatKinds) -> StructStats {
62 let col = ColumnarStats {
63 nulls: None,
64 values: col,
65 };
66 StructStats {
67 len,
68 cols: [("".to_owned(), col)].into_iter().collect(),
69 }
70 }
71
72 pub fn as_non_null_values(&self) -> Option<&ColumnStatKinds> {
74 match self.nulls {
75 None => Some(&self.values),
76 Some(_) => None,
77 }
78 }
79
80 pub fn into_non_null_values(self) -> Option<ColumnStatKinds> {
82 match self.nulls {
83 None => Some(self.values),
84 Some(_) => None,
85 }
86 }
87
88 pub fn into_struct_stats(self) -> Option<StructStats> {
91 match self.into_non_null_values()? {
92 ColumnStatKinds::Struct(stats) => Some(stats),
93 _ => None,
94 }
95 }
96
97 fn try_as_stats<'a, T, F>(&'a self, map: F) -> Result<T, anyhow::Error>
99 where
100 F: FnOnce(&'a ColumnStatKinds) -> Result<T, anyhow::Error>,
101 {
102 let inner = map(&self.values)?;
103 match self.nulls {
104 Some(nulls) => Err(anyhow::anyhow!(
105 "expected non-nullable stats, found nullable {nulls:?}"
106 )),
107 None => Ok(inner),
108 }
109 }
110
111 fn try_as_option_stats<'a, T, F>(&'a self, map: F) -> Result<OptionStats<T>, anyhow::Error>
113 where
114 F: FnOnce(&'a ColumnStatKinds) -> Result<T, anyhow::Error>,
115 {
116 let inner = map(&self.values)?;
117 match self.nulls {
118 Some(nulls) => Ok(OptionStats {
119 none: nulls.count,
120 some: inner,
121 }),
122 None => Err(anyhow::anyhow!(
123 "expected nullable stats, found non-nullable"
124 )),
125 }
126 }
127
128 pub fn try_as_optional_struct(&self) -> Result<OptionStats<&StructStats>, anyhow::Error> {
131 self.try_as_option_stats(|values| match values {
132 ColumnStatKinds::Struct(inner) => Ok(inner),
133 other => anyhow::bail!("expected StructStats found {other:?}"),
134 })
135 }
136
137 pub fn try_as_optional_bytes(&self) -> Result<OptionStats<&BytesStats>, anyhow::Error> {
140 self.try_as_option_stats(|values| match values {
141 ColumnStatKinds::Bytes(inner) => Ok(inner),
142 other => anyhow::bail!("expected BytesStats found {other:?}"),
143 })
144 }
145
146 pub fn try_as_string(&self) -> Result<&PrimitiveStats<String>, anyhow::Error> {
149 self.try_as_stats(|values| match values {
150 ColumnStatKinds::Primitive(PrimitiveStatsVariants::String(inner)) => Ok(inner),
151 other => anyhow::bail!("expected PrimitiveStats<String> found {other:?}"),
152 })
153 }
154}
155
156impl DynStats for ColumnarStats {
157 fn debug_json(&self) -> serde_json::Value {
158 let value_json = self.values.debug_json();
159
160 match (&self.nulls, value_json) {
161 (Some(nulls), serde_json::Value::Object(mut x)) => {
162 if nulls.count > 0 {
163 x.insert("nulls".to_owned(), nulls.count.into());
164 }
165 serde_json::Value::Object(x)
166 }
167 (Some(nulls), x) => {
168 serde_json::json!({"nulls": nulls.count, "not nulls": x})
169 }
170 (None, x) => x,
171 }
172 }
173
174 fn into_columnar_stats(self) -> ColumnarStats {
175 self
176 }
177}
178
/// Null statistics for a nullable column.
#[derive(Debug, Copy, Clone)]
pub struct ColumnNullStats {
    /// Number of null values in the column.
    pub count: usize,
}
185
186impl RustType<ProtoOptionStats> for ColumnNullStats {
187 fn into_proto(&self) -> ProtoOptionStats {
188 ProtoOptionStats {
189 none: u64::cast_from(self.count),
190 }
191 }
192
193 fn from_proto(proto: ProtoOptionStats) -> Result<Self, TryFromProtoError> {
194 Ok(ColumnNullStats {
195 count: usize::cast_from(proto.none),
196 })
197 }
198}
199
/// The kind-specific part of a column's statistics.
#[derive(Debug, Clone)]
pub enum ColumnStatKinds {
    /// Stats for a primitive-typed column.
    Primitive(PrimitiveStatsVariants),
    /// Stats for a struct column, with per-field stats nested inside.
    Struct(StructStats),
    /// Stats for a bytes-like column (e.g. `PrimitiveStats<Vec<u8>>` or JSON stats).
    Bytes(BytesStats),
    /// No stats are available for this column.
    None,
}
212
213impl DynStats for ColumnStatKinds {
214 fn debug_json(&self) -> serde_json::Value {
215 match self {
216 ColumnStatKinds::Primitive(prim) => prim.debug_json(),
217 ColumnStatKinds::Struct(x) => x.debug_json(),
218 ColumnStatKinds::Bytes(bytes) => bytes.debug_json(),
219 ColumnStatKinds::None => NoneStats.debug_json(),
220 }
221 }
222
223 fn into_columnar_stats(self) -> ColumnarStats {
224 ColumnarStats {
225 nulls: None,
226 values: self,
227 }
228 }
229}
230
231impl RustType<proto_dyn_stats::Kind> for ColumnStatKinds {
232 fn into_proto(&self) -> proto_dyn_stats::Kind {
233 match self {
234 ColumnStatKinds::Primitive(prim) => {
235 proto_dyn_stats::Kind::Primitive(RustType::into_proto(prim))
236 }
237 ColumnStatKinds::Struct(x) => proto_dyn_stats::Kind::Struct(RustType::into_proto(x)),
238 ColumnStatKinds::Bytes(bytes) => {
239 proto_dyn_stats::Kind::Bytes(RustType::into_proto(bytes))
240 }
241 ColumnStatKinds::None => proto_dyn_stats::Kind::None(()),
242 }
243 }
244
245 fn from_proto(proto: proto_dyn_stats::Kind) -> Result<Self, TryFromProtoError> {
246 let stats = match proto {
247 proto_dyn_stats::Kind::Primitive(prim) => ColumnStatKinds::Primitive(prim.into_rust()?),
248 proto_dyn_stats::Kind::Struct(x) => ColumnStatKinds::Struct(x.into_rust()?),
249 proto_dyn_stats::Kind::Bytes(bytes) => ColumnStatKinds::Bytes(bytes.into_rust()?),
250 proto_dyn_stats::Kind::None(_) => ColumnStatKinds::None,
251 };
252 Ok(stats)
253 }
254}
255
256impl<T: Into<PrimitiveStatsVariants>> From<T> for ColumnStatKinds {
257 fn from(value: T) -> Self {
258 ColumnStatKinds::Primitive(value.into())
259 }
260}
261
262impl From<PrimitiveStats<Vec<u8>>> for ColumnStatKinds {
263 fn from(value: PrimitiveStats<Vec<u8>>) -> Self {
264 ColumnStatKinds::Bytes(BytesStats::Primitive(value))
265 }
266}
267
268impl From<StructStats> for ColumnStatKinds {
269 fn from(value: StructStats) -> Self {
270 ColumnStatKinds::Struct(value)
271 }
272}
273
274impl From<BytesStats> for ColumnStatKinds {
275 fn from(value: BytesStats) -> Self {
276 ColumnStatKinds::Bytes(value)
277 }
278}
279
/// Metrics related to part statistics.
#[derive(Debug)]
pub struct PartStatsMetrics {
    /// Counts parts read whose stats had an unexpected type (exported as
    /// `mz_persist_pushdown_parts_mismatched_stats_count`).
    pub mismatched_count: IntCounter,
}
285
286impl PartStatsMetrics {
287 pub fn new(registry: &MetricsRegistry) -> Self {
288 PartStatsMetrics {
289 mismatched_count: registry.register(metric!(
290 name: "mz_persist_pushdown_parts_mismatched_stats_count",
291 help: "number of parts read with unexpectedly the incorrect type of stats",
292 )),
293 }
294 }
295}
296
/// Typed access to the bounds and null count of a column's statistics.
pub trait ColumnStats: DynStats {
    /// The (possibly borrowed) form in which bounds are returned by [`ColumnStats::lower`] and
    /// [`ColumnStats::upper`].
    type Ref<'a>
    where
        Self: 'a;

    /// Returns the lower bound of the column's values, or `None` if no bound is available
    /// (e.g. [`NoneStats`] always returns `None`).
    fn lower<'a>(&'a self) -> Option<Self::Ref<'a>>;
    /// Returns the upper bound of the column's values, or `None` if no bound is available.
    fn upper<'a>(&'a self) -> Option<Self::Ref<'a>>;
    /// Returns the number of null values in the column.
    fn none_count(&self) -> usize;
}
320
/// Computes stats of type `Self` from an Arrow column whose values are of type `T`.
pub trait ColumnarStatsBuilder<T>: Debug + DynStats {
    /// The Arrow array type these stats are computed from.
    type ArrowColumn: arrow::array::Array + 'static;

    /// Computes stats over the values in `col`.
    fn from_column(col: &Self::ArrowColumn) -> Self
    where
        Self: Sized;
}
331
/// Operations shared by every stats type.
pub trait DynStats: Debug + Send + Sync + 'static {
    /// Returns the name of the concrete stats type, as reported by [`std::any::type_name`].
    fn type_name(&self) -> &'static str {
        std::any::type_name::<Self>()
    }

    /// Returns a JSON rendering of these stats, used for debug output.
    fn debug_json(&self) -> serde_json::Value;

    /// Converts these stats into the type-erased [`ColumnarStats`] representation.
    fn into_columnar_stats(self) -> ColumnarStats;
}
349
/// Protobuf stats messages that can be shrunk in place.
pub trait TrimStats: Message {
    /// Removes data from `self` in place to reduce its encoded size.
    ///
    /// `trim_to_budget` applies this as a first pass before it starts dropping whole columns.
    fn trim(&mut self);
}
359
/// Statistics about the data in a [`Part`].
#[derive(Arbitrary, Debug)]
pub struct PartStats {
    /// Statistics computed over the part's key column (see [`PartStats::new`]).
    pub key: StructStats,
}
366
367impl serde::Serialize for PartStats {
368 fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
369 let PartStats { key } = self;
370 key.serialize(s)
371 }
372}
373
374impl PartStats {
375 pub fn new<T, K>(part: &Part, desc: &K) -> Result<Self, anyhow::Error>
377 where
378 K: Schema<T, Statistics = StructStats>,
379 {
380 let decoder = K::decoder_any(desc, &part.key).context("decoder_any")?;
381 let stats = decoder.stats();
382 Ok(PartStats { key: stats })
383 }
384}
385
/// Stats for a nullable column: stats over the non-null values plus a null count.
pub struct OptionStats<T> {
    /// Stats over the non-null values.
    pub some: T,
    /// Number of null values.
    pub none: usize,
}
393
394impl<T: DynStats> Debug for OptionStats<T> {
395 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396 Debug::fmt(&self.debug_json(), f)
397 }
398}
399
400impl<T: DynStats> DynStats for OptionStats<T> {
401 fn debug_json(&self) -> serde_json::Value {
402 match self.some.debug_json() {
403 serde_json::Value::Object(mut x) => {
404 if self.none > 0 {
405 x.insert("nulls".to_owned(), self.none.into());
406 }
407 serde_json::Value::Object(x)
408 }
409 s => {
410 serde_json::json!({"nulls": self.none, "not nulls": s})
411 }
412 }
413 }
414
415 fn into_columnar_stats(self) -> ColumnarStats {
416 let inner = self.some.into_columnar_stats();
417 assert_none!(inner.nulls, "we don't support nested OptionStats");
418
419 ColumnarStats {
420 nulls: Some(ColumnNullStats { count: self.none }),
421 values: inner.values,
422 }
423 }
424}
425
/// Stats that carry no information.
///
/// Its [`ColumnStats`] implementation reports no lower or upper bound and a null count of zero.
#[derive(Debug)]
// `Clone` is only needed by tests. A single predicate needs no `any(...)` wrapper
// (clippy::non_minimal_cfg).
#[cfg_attr(test, derive(Clone))]
pub struct NoneStats;
430
431impl DynStats for NoneStats {
432 fn debug_json(&self) -> serde_json::Value {
433 serde_json::Value::String(format!("{self:?}"))
434 }
435
436 fn into_columnar_stats(self) -> ColumnarStats {
437 ColumnarStats {
438 nulls: None,
439 values: ColumnStatKinds::None,
440 }
441 }
442}
443
444impl ColumnStats for NoneStats {
445 type Ref<'a> = ();
446
447 fn lower<'a>(&'a self) -> Option<Self::Ref<'a>> {
448 None
449 }
450
451 fn upper<'a>(&'a self) -> Option<Self::Ref<'a>> {
452 None
453 }
454
455 fn none_count(&self) -> usize {
456 0
457 }
458}
459
460impl ColumnStats for OptionStats<NoneStats> {
461 type Ref<'a> = Option<()>;
462
463 fn lower<'a>(&'a self) -> Option<Self::Ref<'a>> {
464 None
465 }
466
467 fn upper<'a>(&'a self) -> Option<Self::Ref<'a>> {
468 None
469 }
470
471 fn none_count(&self) -> usize {
472 self.none
473 }
474}
475
476impl RustType<()> for NoneStats {
477 fn into_proto(&self) -> () {
478 ()
479 }
480
481 fn from_proto(_proto: ()) -> Result<Self, TryFromProtoError> {
482 Ok(NoneStats)
483 }
484}
485
// Implements `ColumnarStatsBuilder<$native>` for `PrimitiveStats<$native>` over the Arrow array
// type `$arrow_col`, using the `$min_fn`/`$max_fn` kernels for the bounds.
//
// The kernels return `None` when a column has no non-null values; in that case both bounds
// fall back to `Default::default()`.
macro_rules! primitive_stats {
    ($native:ty, $arrow_col:ty, $min_fn:path, $max_fn:path) => {
        impl ColumnarStatsBuilder<$native> for PrimitiveStats<$native> {
            type ArrowColumn = $arrow_col;

            fn from_column(col: &Self::ArrowColumn) -> Self
            where
                Self: Sized,
            {
                PrimitiveStats {
                    lower: $min_fn(col).unwrap_or_default(),
                    upper: $max_fn(col).unwrap_or_default(),
                }
            }
        }
    };
}
508
// `ColumnarStatsBuilder` impls for the fixed-width primitive types. `bool` uses Arrow's
// dedicated boolean kernels; the numeric types use the generic `min`/`max` kernels.
primitive_stats!(
    bool,
    BooleanArray,
    arrow::compute::min_boolean,
    arrow::compute::max_boolean
);
primitive_stats!(u8, UInt8Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(u16, UInt16Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(u32, UInt32Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(u64, UInt64Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(i8, Int8Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(i16, Int16Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(i32, Int32Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(i64, Int64Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(f32, Float32Array, arrow::compute::min, arrow::compute::max);
primitive_stats!(f64, Float64Array, arrow::compute::min, arrow::compute::max);
525
526impl ColumnarStatsBuilder<&str> for PrimitiveStats<String> {
527 type ArrowColumn = StringArray;
528 fn from_column(col: &Self::ArrowColumn) -> Self
529 where
530 Self: Sized,
531 {
532 let lower = arrow::compute::min_string(col).unwrap_or_default();
533 let lower = truncate_string(lower, TRUNCATE_LEN, TruncateBound::Lower)
534 .expect("lower bound should always truncate");
535 let upper = arrow::compute::max_string(col).unwrap_or_default();
536 let upper = truncate_string(upper, TRUNCATE_LEN, TruncateBound::Upper)
537 .unwrap_or_else(|| upper.to_owned());
541
542 PrimitiveStats { lower, upper }
543 }
544}
545
546impl ColumnarStatsBuilder<&[u8]> for PrimitiveStats<Vec<u8>> {
547 type ArrowColumn = BinaryArray;
548 fn from_column(col: &Self::ArrowColumn) -> Self
549 where
550 Self: Sized,
551 {
552 let lower = arrow::compute::min_binary(col).unwrap_or_default();
553 let lower = truncate_bytes(lower, TRUNCATE_LEN, TruncateBound::Lower)
554 .expect("lower bound should always truncate");
555 let upper = arrow::compute::max_binary(col).unwrap_or_default();
556 let upper = truncate_bytes(upper, TRUNCATE_LEN, TruncateBound::Upper)
557 .unwrap_or_else(|| upper.to_owned());
561
562 PrimitiveStats { lower, upper }
563 }
564}
565
566pub fn trim_to_budget(
575 stats: &mut ProtoStructStats,
576 budget: usize,
577 force_keep_col: impl Fn(&str) -> bool,
578) -> usize {
579 let original_cost = stats.encoded_len();
581 if original_cost <= budget {
582 return 0;
583 }
584
585 stats.trim();
587 let new_cost = stats.encoded_len();
588 if new_cost <= budget {
589 return original_cost.saturating_sub(new_cost);
590 }
591
592 let mut budget_shortfall = new_cost.saturating_sub(budget);
601 trim_to_budget_struct(stats, &mut budget_shortfall, &force_keep_col);
602 original_cost.saturating_sub(stats.encoded_len())
603}
604
/// Greedily drops the largest columns of `stats` until `budget_shortfall` reaches zero or no
/// columns are left to consider.
///
/// Columns are visited in descending order of encoded size. Columns whose name satisfies
/// `force_keep_col` are skipped. Nested struct columns and JSON-map bytes columns are first
/// trimmed recursively, and are removed only if that leaves them empty. Every removal subtracts
/// an estimate of the bytes saved from `budget_shortfall`.
fn trim_to_budget_struct(
    stats: &mut ProtoStructStats,
    budget_shortfall: &mut usize,
    force_keep_col: &impl Fn(&str) -> bool,
) {
    // Snapshot each column's encoded size (of its value only, not its name). Sorting ascending
    // lets `pop` yield the largest column first.
    let mut col_costs: Vec<_> = stats
        .cols
        .iter()
        .map(|(name, stats)| (name.to_owned(), stats.encoded_len()))
        .collect();
    col_costs.sort_unstable_by_key(|(_, c)| *c);

    while *budget_shortfall > 0 {
        let Some((name, cost)) = col_costs.pop() else {
            // Every column has been considered; we may still be over budget.
            break;
        };

        if force_keep_col(&name) {
            continue;
        }

        // `col_costs` was built from `stats.cols` and each name is popped at most once.
        let col_stats = stats.cols.get_mut(&name).expect("col exists");
        match &mut col_stats.kind {
            Some(proto_dyn_stats::Kind::Struct(col_struct)) => {
                // Try to make up the shortfall inside the nested struct first.
                trim_to_budget_struct(col_struct, budget_shortfall, force_keep_col);
                if *budget_shortfall == 0 {
                    break;
                }
                if !col_struct.cols.is_empty() {
                    // Still over budget, yet some nested columns survived, which means they
                    // (or their descendants) were force-kept. Keep this column.
                    continue;
                }
                // The nested struct is now empty, so drop it. `+ 1` is a rough allowance for
                // the map entry's framing (presumably the key tag); the estimate is approximate.
                *budget_shortfall = budget_shortfall.saturating_sub(col_struct.encoded_len() + 1);
                stats.cols.remove(&name);
            }
            Some(proto_dyn_stats::Kind::Bytes(ProtoBytesStats {
                kind:
                    Some(proto_bytes_stats::Kind::Json(ProtoJsonStats {
                        kind: Some(proto_json_stats::Kind::Maps(col_jsonb)),
                    })),
            })) => {
                // JSON map stats: trim individual map keys before considering the whole column.
                trim_to_budget_jsonb(col_jsonb, budget_shortfall, force_keep_col);
                if *budget_shortfall == 0 {
                    break;
                }
                if !col_jsonb.elements.is_empty() {
                    // Remaining elements were force-kept (directly or via nested maps).
                    continue;
                }
                *budget_shortfall = budget_shortfall.saturating_sub(col_jsonb.encoded_len() + 1);
                stats.cols.remove(&name);
            }
            _ => {
                // Any other kind can't be partially trimmed; drop the whole column.
                stats.cols.remove(&name);
                *budget_shortfall = budget_shortfall.saturating_sub(cost + 1);
            }
        }
    }
}
687
688fn trim_to_budget_jsonb(
689 stats: &mut ProtoJsonMapStats,
690 budget_shortfall: &mut usize,
691 force_keep_col: &impl Fn(&str) -> bool,
692) {
693 stats
701 .elements
702 .sort_unstable_by_key(|element| element.encoded_len());
703
704 let mut stats_to_keep = Vec::with_capacity(stats.elements.len());
708
709 while *budget_shortfall > 0 {
710 let Some(mut column) = stats.elements.pop() else {
711 break;
712 };
713
714 if force_keep_col(&column.name) {
716 stats_to_keep.push(column);
717 continue;
718 }
719
720 if let Some(ProtoJsonStats {
722 kind: Some(proto_json_stats::Kind::Maps(ref mut col_jsonb)),
723 }) = column.stats
724 {
725 trim_to_budget_jsonb(col_jsonb, budget_shortfall, force_keep_col);
726
727 if !col_jsonb.elements.is_empty() {
729 stats_to_keep.push(column);
730 }
731
732 if *budget_shortfall == 0 {
734 break;
735 }
736 } else {
737 *budget_shortfall = budget_shortfall.saturating_sub(column.encoded_len() + 1);
741 }
742 }
743
744 stats.elements.extend(stats_to_keep);
746}
747
748impl RustType<ProtoDynStats> for ColumnarStats {
749 fn into_proto(&self) -> ProtoDynStats {
750 let option = self.nulls.as_ref().map(|n| n.into_proto());
751 let kind = RustType::into_proto(&self.values);
752
753 ProtoDynStats {
754 option,
755 kind: Some(kind),
756 }
757 }
758
759 fn from_proto(proto: ProtoDynStats) -> Result<Self, TryFromProtoError> {
760 let kind = proto
761 .kind
762 .ok_or_else(|| TryFromProtoError::missing_field("ProtoDynStats::kind"))?;
763
764 Ok(ColumnarStats {
765 nulls: proto.option.into_rust()?,
766 values: kind.into_rust()?,
767 })
768 }
769}
770
771pub(crate) fn any_columnar_stats() -> impl Strategy<Value = ColumnarStats> {
773 let leaf = Union::new(vec![
774 any_primitive_stats::<bool>()
775 .prop_map(|s| ColumnStatKinds::Primitive(s.into()))
776 .boxed(),
777 any_primitive_stats::<i64>()
778 .prop_map(|s| ColumnStatKinds::Primitive(s.into()))
779 .boxed(),
780 any_primitive_stats::<String>()
781 .prop_map(|s| ColumnStatKinds::Primitive(s.into()))
782 .boxed(),
783 any_bytes_stats().prop_map(ColumnStatKinds::Bytes).boxed(),
784 ])
785 .prop_map(|values| ColumnarStats {
786 nulls: None,
787 values,
788 });
789
790 leaf.prop_recursive(2, 10, 3, |inner| {
791 (
792 any::<usize>(),
793 proptest::collection::btree_map(any::<String>(), inner, 0..3),
794 )
795 .prop_map(|(len, cols)| {
796 let values = ColumnStatKinds::Struct(StructStats { len, cols });
797 ColumnarStats {
798 nulls: None,
799 values,
800 }
801 })
802 })
803}