mz_persist_types/
stats.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![allow(missing_docs)] // For generated protos.
11
12//! Aggregate statistics about data stored in persist.
13
14use std::fmt::Debug;
15
16use anyhow::Context;
17use arrow::array::{
18    BinaryArray, BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array,
19    Int64Array, StringArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
20};
21use mz_ore::cast::CastFrom;
22use mz_ore::metrics::{IntCounter, MetricsRegistry};
23use mz_ore::{assert_none, metric};
24use mz_proto::{ProtoType, RustType, TryFromProtoError};
25use proptest::prelude::*;
26use proptest::strategy::{Strategy, Union};
27use proptest_derive::Arbitrary;
28use prost::Message;
29
30use crate::columnar::{ColumnDecoder, Schema};
31use crate::part::Part;
32use crate::stats::bytes::any_bytes_stats;
33use crate::stats::primitive::any_primitive_stats;
34
35pub mod bytes;
36pub mod json;
37pub mod primitive;
38pub mod structured;
39
40pub use bytes::{AtomicBytesStats, BytesStats, FixedSizeBytesStats, FixedSizeBytesStatsKind};
41pub use json::{JsonMapElementStats, JsonStats};
42pub use primitive::{
43    PrimitiveStats, PrimitiveStatsVariants, TRUNCATE_LEN, TruncateBound, truncate_bytes,
44    truncate_string,
45};
46pub use structured::StructStats;
47
48include!(concat!(env!("OUT_DIR"), "/mz_persist_types.stats.rs"));
49
50/// Statistics for a single column of data.
51#[derive(Debug, Clone)]
52pub struct ColumnarStats {
53    /// Expected to be `None` if the associated column is non-nullable.
54    pub nulls: Option<ColumnNullStats>,
55    /// Statistics on the values of the column.
56    pub values: ColumnStatKinds,
57}
58
59impl ColumnarStats {
60    /// Returns a `[StructStats]` with a single non-nullable column.
61    pub fn one_column_struct(len: usize, col: ColumnStatKinds) -> StructStats {
62        let col = ColumnarStats {
63            nulls: None,
64            values: col,
65        };
66        StructStats {
67            len,
68            cols: [("".to_owned(), col)].into_iter().collect(),
69        }
70    }
71
72    /// Returns the inner [`ColumnStatKinds`] if `nulls` is [`None`].
73    pub fn as_non_null_values(&self) -> Option<&ColumnStatKinds> {
74        match self.nulls {
75            None => Some(&self.values),
76            Some(_) => None,
77        }
78    }
79
80    /// Returns the inner [`ColumnStatKinds`] if `nulls` is [`None`].
81    pub fn into_non_null_values(self) -> Option<ColumnStatKinds> {
82        match self.nulls {
83            None => Some(self.values),
84            Some(_) => None,
85        }
86    }
87
88    /// Returns the inner [`StructStats`] if `nulls` is [`None`] and `values`
89    /// is [`ColumnStatKinds::Struct`].
90    pub fn into_struct_stats(self) -> Option<StructStats> {
91        match self.into_non_null_values()? {
92            ColumnStatKinds::Struct(stats) => Some(stats),
93            _ => None,
94        }
95    }
96
97    /// Helper method to "downcast" to stats of type `T`.
98    fn try_as_stats<'a, T, F>(&'a self, map: F) -> Result<T, anyhow::Error>
99    where
100        F: FnOnce(&'a ColumnStatKinds) -> Result<T, anyhow::Error>,
101    {
102        let inner = map(&self.values)?;
103        match self.nulls {
104            Some(nulls) => Err(anyhow::anyhow!(
105                "expected non-nullable stats, found nullable {nulls:?}"
106            )),
107            None => Ok(inner),
108        }
109    }
110
111    /// Helper method to "downcast" [`OptionStats<T>`].
112    fn try_as_option_stats<'a, T, F>(&'a self, map: F) -> Result<OptionStats<T>, anyhow::Error>
113    where
114        F: FnOnce(&'a ColumnStatKinds) -> Result<T, anyhow::Error>,
115    {
116        let inner = map(&self.values)?;
117        match self.nulls {
118            Some(nulls) => Ok(OptionStats {
119                none: nulls.count,
120                some: inner,
121            }),
122            None => Err(anyhow::anyhow!(
123                "expected nullable stats, found non-nullable"
124            )),
125        }
126    }
127
128    /// Tries to "downcast" this instance of [`ColumnarStats`] to an [`OptionStats<StructStats>`]
129    /// if the inner statistics are nullable and for a structured column.
130    pub fn try_as_optional_struct(&self) -> Result<OptionStats<&StructStats>, anyhow::Error> {
131        self.try_as_option_stats(|values| match values {
132            ColumnStatKinds::Struct(inner) => Ok(inner),
133            other => anyhow::bail!("expected StructStats found {other:?}"),
134        })
135    }
136
137    /// Tries to "downcast" this instance of [`ColumnarStats`] to an [`OptionStats<BytesStats>`]
138    /// if the inner statistics are nullable and for a column of bytes.
139    pub fn try_as_optional_bytes(&self) -> Result<OptionStats<&BytesStats>, anyhow::Error> {
140        self.try_as_option_stats(|values| match values {
141            ColumnStatKinds::Bytes(inner) => Ok(inner),
142            other => anyhow::bail!("expected BytesStats found {other:?}"),
143        })
144    }
145
146    /// Tries to "downcast" this instance of [`ColumnarStats`] to a [`PrimitiveStats<String>`]
147    /// if the inner statistics are nullable and for a structured column.
148    pub fn try_as_string(&self) -> Result<&PrimitiveStats<String>, anyhow::Error> {
149        self.try_as_stats(|values| match values {
150            ColumnStatKinds::Primitive(PrimitiveStatsVariants::String(inner)) => Ok(inner),
151            other => anyhow::bail!("expected PrimitiveStats<String> found {other:?}"),
152        })
153    }
154}
155
156impl DynStats for ColumnarStats {
157    fn debug_json(&self) -> serde_json::Value {
158        let value_json = self.values.debug_json();
159
160        match (&self.nulls, value_json) {
161            (Some(nulls), serde_json::Value::Object(mut x)) => {
162                if nulls.count > 0 {
163                    x.insert("nulls".to_owned(), nulls.count.into());
164                }
165                serde_json::Value::Object(x)
166            }
167            (Some(nulls), x) => {
168                serde_json::json!({"nulls": nulls.count, "not nulls": x})
169            }
170            (None, x) => x,
171        }
172    }
173
174    fn into_columnar_stats(self) -> ColumnarStats {
175        self
176    }
177}
178
179/// Statistics about the null values in a column.
180#[derive(Debug, Copy, Clone)]
181pub struct ColumnNullStats {
182    /// Number of nulls in the column.
183    pub count: usize,
184}
185
186impl RustType<ProtoOptionStats> for ColumnNullStats {
187    fn into_proto(&self) -> ProtoOptionStats {
188        ProtoOptionStats {
189            none: u64::cast_from(self.count),
190        }
191    }
192
193    fn from_proto(proto: ProtoOptionStats) -> Result<Self, TryFromProtoError> {
194        Ok(ColumnNullStats {
195            count: usize::cast_from(proto.none),
196        })
197    }
198}
199
200/// All of the kinds of statistics that we support.
201#[derive(Debug, Clone)]
202pub enum ColumnStatKinds {
203    /// Primitive stats that maintin just an upper and lower bound.
204    Primitive(PrimitiveStatsVariants),
205    /// Statistics for objects with multiple fields.
206    Struct(StructStats),
207    /// Statistics about a column of binary data.
208    Bytes(BytesStats),
209    /// Maintain no statistics for a given column.
210    None,
211}
212
213impl DynStats for ColumnStatKinds {
214    fn debug_json(&self) -> serde_json::Value {
215        match self {
216            ColumnStatKinds::Primitive(prim) => prim.debug_json(),
217            ColumnStatKinds::Struct(x) => x.debug_json(),
218            ColumnStatKinds::Bytes(bytes) => bytes.debug_json(),
219            ColumnStatKinds::None => NoneStats.debug_json(),
220        }
221    }
222
223    fn into_columnar_stats(self) -> ColumnarStats {
224        ColumnarStats {
225            nulls: None,
226            values: self,
227        }
228    }
229}
230
231impl RustType<proto_dyn_stats::Kind> for ColumnStatKinds {
232    fn into_proto(&self) -> proto_dyn_stats::Kind {
233        match self {
234            ColumnStatKinds::Primitive(prim) => {
235                proto_dyn_stats::Kind::Primitive(RustType::into_proto(prim))
236            }
237            ColumnStatKinds::Struct(x) => proto_dyn_stats::Kind::Struct(RustType::into_proto(x)),
238            ColumnStatKinds::Bytes(bytes) => {
239                proto_dyn_stats::Kind::Bytes(RustType::into_proto(bytes))
240            }
241            ColumnStatKinds::None => proto_dyn_stats::Kind::None(()),
242        }
243    }
244
245    fn from_proto(proto: proto_dyn_stats::Kind) -> Result<Self, TryFromProtoError> {
246        let stats = match proto {
247            proto_dyn_stats::Kind::Primitive(prim) => ColumnStatKinds::Primitive(prim.into_rust()?),
248            proto_dyn_stats::Kind::Struct(x) => ColumnStatKinds::Struct(x.into_rust()?),
249            proto_dyn_stats::Kind::Bytes(bytes) => ColumnStatKinds::Bytes(bytes.into_rust()?),
250            proto_dyn_stats::Kind::None(_) => ColumnStatKinds::None,
251        };
252        Ok(stats)
253    }
254}
255
256impl<T: Into<PrimitiveStatsVariants>> From<T> for ColumnStatKinds {
257    fn from(value: T) -> Self {
258        ColumnStatKinds::Primitive(value.into())
259    }
260}
261
262impl From<PrimitiveStats<Vec<u8>>> for ColumnStatKinds {
263    fn from(value: PrimitiveStats<Vec<u8>>) -> Self {
264        ColumnStatKinds::Bytes(BytesStats::Primitive(value))
265    }
266}
267
268impl From<StructStats> for ColumnStatKinds {
269    fn from(value: StructStats) -> Self {
270        ColumnStatKinds::Struct(value)
271    }
272}
273
274impl From<BytesStats> for ColumnStatKinds {
275    fn from(value: BytesStats) -> Self {
276        ColumnStatKinds::Bytes(value)
277    }
278}
279
280/// Metrics for [PartStats].
281#[derive(Debug)]
282pub struct PartStatsMetrics {
283    pub mismatched_count: IntCounter,
284}
285
286impl PartStatsMetrics {
287    pub fn new(registry: &MetricsRegistry) -> Self {
288        PartStatsMetrics {
289            mismatched_count: registry.register(metric!(
290                name: "mz_persist_pushdown_parts_mismatched_stats_count",
291                help: "number of parts read with unexpectedly the incorrect type of stats",
292            )),
293        }
294    }
295}
296
297/// Aggregate statistics about a column of type `T`.
298pub trait ColumnStats: DynStats {
299    /// Type returned as the stat bounds.
300    type Ref<'a>
301    where
302        Self: 'a;
303
304    /// An inclusive lower bound on the data contained in the column, if known.
305    ///
306    /// This will often be a tight bound, but it's not guaranteed. Persist
307    /// reserves the right to (for example) invent smaller bounds for long byte
308    /// strings. SUBTLE: This means that this exact value may not be present in
309    /// the column.
310    ///
311    /// Similarly, if the column is empty, this will contain `T: Default`.
312    /// Emptiness will be indicated in statistics higher up (i.e.
313    /// [StructStats]).
314    fn lower<'a>(&'a self) -> Option<Self::Ref<'a>>;
315    /// Same as [Self::lower] but an (also inclusive) upper bound.
316    fn upper<'a>(&'a self) -> Option<Self::Ref<'a>>;
317    /// The number of `None`s if this column is optional or 0 if it isn't.
318    fn none_count(&self) -> usize;
319}
320
321/// A type that can incrementally collect stats from a sequence of values.
322pub trait ColumnarStatsBuilder<T>: Debug + DynStats {
323    /// Type of [`arrow`] column these statistics can be derived from.
324    type ArrowColumn: arrow::array::Array + 'static;
325
326    /// Derive statistics from a column of data.
327    fn from_column(col: &Self::ArrowColumn) -> Self
328    where
329        Self: Sized;
330}
331
332/// Type that can be used to represent some [`ColumnStats`].
333///
334/// This is a separate trait than [`ColumnStats`] because its implementations
335/// generally don't care about what kind of stats they contain, whereas
336/// [`ColumnStats`] is generic over the inner type of statistics.
337pub trait DynStats: Debug + Send + Sync + 'static {
338    /// Returns the name of the erased type for use in error messages.
339    fn type_name(&self) -> &'static str {
340        std::any::type_name::<Self>()
341    }
342
343    /// Formats these statistics for use in `INSPECT SHARD` and debugging.
344    fn debug_json(&self) -> serde_json::Value;
345
346    /// Return `self` as [`ColumnarStats`].
347    fn into_columnar_stats(self) -> ColumnarStats;
348}
349
350/// Trim, possibly in a lossy way, statistics to reduce the serialization costs.
351pub trait TrimStats: Message {
352    /// Attempts to reduce the serialization costs of these stats.
353    ///
354    /// This is lossy (might increase the false positive rate) and so should
355    /// be avoided if the full fidelity stats are within an acceptable cost
356    /// threshold.
357    fn trim(&mut self);
358}
359
360/// Aggregate statistics about data contained in a [Part].
361#[derive(Arbitrary, Debug)]
362pub struct PartStats {
363    /// Aggregate statistics about key data contained in a [Part].
364    pub key: StructStats,
365}
366
367impl serde::Serialize for PartStats {
368    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
369        let PartStats { key } = self;
370        key.serialize(s)
371    }
372}
373
374impl PartStats {
375    /// Calculates and returns stats for the given [`Part`].
376    pub fn new<T, K>(part: &Part, desc: &K) -> Result<Self, anyhow::Error>
377    where
378        K: Schema<T, Statistics = StructStats>,
379    {
380        let decoder = K::decoder_any(desc, &part.key).context("decoder_any")?;
381        let stats = decoder.stats();
382        Ok(PartStats { key: stats })
383    }
384}
385
386/// Statistics about a column of some optional type.
387pub struct OptionStats<T> {
388    /// Statistics about the `Some` values in the column.
389    pub some: T,
390    /// The count of `None` values in the column.
391    pub none: usize,
392}
393
394impl<T: DynStats> Debug for OptionStats<T> {
395    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396        Debug::fmt(&self.debug_json(), f)
397    }
398}
399
400impl<T: DynStats> DynStats for OptionStats<T> {
401    fn debug_json(&self) -> serde_json::Value {
402        match self.some.debug_json() {
403            serde_json::Value::Object(mut x) => {
404                if self.none > 0 {
405                    x.insert("nulls".to_owned(), self.none.into());
406                }
407                serde_json::Value::Object(x)
408            }
409            s => {
410                serde_json::json!({"nulls": self.none, "not nulls": s})
411            }
412        }
413    }
414
415    fn into_columnar_stats(self) -> ColumnarStats {
416        let inner = self.some.into_columnar_stats();
417        assert_none!(inner.nulls, "we don't support nested OptionStats");
418
419        ColumnarStats {
420            nulls: Some(ColumnNullStats { count: self.none }),
421            values: inner.values,
422        }
423    }
424}
425
426/// Empty set of statistics.
427#[derive(Debug)]
428#[cfg_attr(any(test), derive(Clone))]
429pub struct NoneStats;
430
431impl DynStats for NoneStats {
432    fn debug_json(&self) -> serde_json::Value {
433        serde_json::Value::String(format!("{self:?}"))
434    }
435
436    fn into_columnar_stats(self) -> ColumnarStats {
437        ColumnarStats {
438            nulls: None,
439            values: ColumnStatKinds::None,
440        }
441    }
442}
443
444impl ColumnStats for NoneStats {
445    type Ref<'a> = ();
446
447    fn lower<'a>(&'a self) -> Option<Self::Ref<'a>> {
448        None
449    }
450
451    fn upper<'a>(&'a self) -> Option<Self::Ref<'a>> {
452        None
453    }
454
455    fn none_count(&self) -> usize {
456        0
457    }
458}
459
460impl ColumnStats for OptionStats<NoneStats> {
461    type Ref<'a> = Option<()>;
462
463    fn lower<'a>(&'a self) -> Option<Self::Ref<'a>> {
464        None
465    }
466
467    fn upper<'a>(&'a self) -> Option<Self::Ref<'a>> {
468        None
469    }
470
471    fn none_count(&self) -> usize {
472        self.none
473    }
474}
475
476impl RustType<()> for NoneStats {
477    fn into_proto(&self) -> () {
478        ()
479    }
480
481    fn from_proto(_proto: ()) -> Result<Self, TryFromProtoError> {
482        Ok(NoneStats)
483    }
484}
485
486/// We collect stats for all primitive types in exactly the same way. This
487/// macro de-duplicates some of that logic.
488///
489/// Note: If at any point someone finds this macro too complex, they should
490/// feel free to refactor it away!
491macro_rules! primitive_stats {
492    ($native:ty, $arrow_col:ty, $min_fn:path, $max_fn:path) => {
493        impl ColumnarStatsBuilder<$native> for PrimitiveStats<$native> {
494            type ArrowColumn = $arrow_col;
495
496            fn from_column(col: &Self::ArrowColumn) -> Self
497            where
498                Self: Sized,
499            {
500                let lower = $min_fn(col).unwrap_or_default();
501                let upper = $max_fn(col).unwrap_or_default();
502
503                PrimitiveStats { lower, upper }
504            }
505        }
506    };
507}
508
509primitive_stats!(
510    bool,
511    BooleanArray,
512    arrow::compute::min_boolean,
513    arrow::compute::max_boolean
514);
515primitive_stats!(u8, UInt8Array, arrow::compute::min, arrow::compute::max);
516primitive_stats!(u16, UInt16Array, arrow::compute::min, arrow::compute::max);
517primitive_stats!(u32, UInt32Array, arrow::compute::min, arrow::compute::max);
518primitive_stats!(u64, UInt64Array, arrow::compute::min, arrow::compute::max);
519primitive_stats!(i8, Int8Array, arrow::compute::min, arrow::compute::max);
520primitive_stats!(i16, Int16Array, arrow::compute::min, arrow::compute::max);
521primitive_stats!(i32, Int32Array, arrow::compute::min, arrow::compute::max);
522primitive_stats!(i64, Int64Array, arrow::compute::min, arrow::compute::max);
523primitive_stats!(f32, Float32Array, arrow::compute::min, arrow::compute::max);
524primitive_stats!(f64, Float64Array, arrow::compute::min, arrow::compute::max);
525
526impl ColumnarStatsBuilder<&str> for PrimitiveStats<String> {
527    type ArrowColumn = StringArray;
528    fn from_column(col: &Self::ArrowColumn) -> Self
529    where
530        Self: Sized,
531    {
532        let lower = arrow::compute::min_string(col).unwrap_or_default();
533        let lower = truncate_string(lower, TRUNCATE_LEN, TruncateBound::Lower)
534            .expect("lower bound should always truncate");
535        let upper = arrow::compute::max_string(col).unwrap_or_default();
536        let upper = truncate_string(upper, TRUNCATE_LEN, TruncateBound::Upper)
537            // NB: The cost+trim stuff will remove the column entirely if
538            // it's still too big (also this should be extremely rare in
539            // practice).
540            .unwrap_or_else(|| upper.to_owned());
541
542        PrimitiveStats { lower, upper }
543    }
544}
545
546impl ColumnarStatsBuilder<&[u8]> for PrimitiveStats<Vec<u8>> {
547    type ArrowColumn = BinaryArray;
548    fn from_column(col: &Self::ArrowColumn) -> Self
549    where
550        Self: Sized,
551    {
552        let lower = arrow::compute::min_binary(col).unwrap_or_default();
553        let lower = truncate_bytes(lower, TRUNCATE_LEN, TruncateBound::Lower)
554            .expect("lower bound should always truncate");
555        let upper = arrow::compute::max_binary(col).unwrap_or_default();
556        let upper = truncate_bytes(upper, TRUNCATE_LEN, TruncateBound::Upper)
557            // NB: The cost+trim stuff will remove the column entirely if
558            // it's still too big (also this should be extremely rare in
559            // practice).
560            .unwrap_or_else(|| upper.to_owned());
561
562        PrimitiveStats { lower, upper }
563    }
564}
565
566/// Trims the included column status until they fit within a budget.
567///
568/// This might remove stats for a column entirely, unless `force_keep_col`
569/// returns true for that column. The resulting StructStats object is
570/// guaranteed to fit within the passed budget, except when the columns that
571/// are force-kept are collectively larger than the budget.
572///
573/// The number of bytes trimmed is returned.
574pub fn trim_to_budget(
575    stats: &mut ProtoStructStats,
576    budget: usize,
577    force_keep_col: impl Fn(&str) -> bool,
578) -> usize {
579    // No trimming necessary should be the overwhelming common case in practice.
580    let original_cost = stats.encoded_len();
581    if original_cost <= budget {
582        return 0;
583    }
584
585    // First try any lossy trimming that doesn't lose an entire column.
586    stats.trim();
587    let new_cost = stats.encoded_len();
588    if new_cost <= budget {
589        return original_cost.saturating_sub(new_cost);
590    }
591
592    // That wasn't enough. Try recursively trimming out entire cols.
593    //
594    // TODO: There are certainly more elegant things we could do here. One idea
595    // would be to call `trim_to_budget_struct` but with a closure for
596    // force_keep_col that always returns false. That would potentially at least
597    // keep _something_. Another possibility would be to replace this whole bit
598    // with some sort of recursive max-cost search with force_keep_col things
599    // weighted after everything else.
600    let mut budget_shortfall = new_cost.saturating_sub(budget);
601    trim_to_budget_struct(stats, &mut budget_shortfall, &force_keep_col);
602    original_cost.saturating_sub(stats.encoded_len())
603}
604
605/// Recursively trims cols in the struct, greatest-size first, keeping force
606/// kept cols and ancestors of force kept cols.
607fn trim_to_budget_struct(
608    stats: &mut ProtoStructStats,
609    budget_shortfall: &mut usize,
610    force_keep_col: &impl Fn(&str) -> bool,
611) {
612    // Sort the columns in order of ascending size and keep however many fit
613    // within the budget. This strategy both keeps the largest total number of
614    // columns and also optimizes for the sort of columns we expect to need
615    // stats in practice (timestamps are numbers or small strings).
616    //
617    // Note: even though we sort in ascending order, we use `.pop()` to iterate
618    // over the elements, which takes from the back of the Vec.
619    let mut col_costs: Vec<_> = stats
620        .cols
621        .iter()
622        .map(|(name, stats)| (name.to_owned(), stats.encoded_len()))
623        .collect();
624    col_costs.sort_unstable_by_key(|(_, c)| *c);
625
626    while *budget_shortfall > 0 {
627        let Some((name, cost)) = col_costs.pop() else {
628            break;
629        };
630
631        if force_keep_col(&name) {
632            continue;
633        }
634
635        // Otherwise, if the col is a struct, recurse into it.
636        //
637        // TODO: Do this same recursion for json stats.
638        let col_stats = stats.cols.get_mut(&name).expect("col exists");
639        match &mut col_stats.kind {
640            Some(proto_dyn_stats::Kind::Struct(col_struct)) => {
641                trim_to_budget_struct(col_struct, budget_shortfall, force_keep_col);
642                // This recursion might have gotten us under budget.
643                if *budget_shortfall == 0 {
644                    break;
645                }
646                // Otherwise, if any columns are left, they must have been force
647                // kept, which means we need to force keep this struct as well.
648                if !col_struct.cols.is_empty() {
649                    continue;
650                }
651                // We have to recompute the cost because trim_to_budget_struct might
652                // have already accounted for some of the shortfall.
653                *budget_shortfall = budget_shortfall.saturating_sub(col_struct.encoded_len() + 1);
654                stats.cols.remove(&name);
655            }
656            Some(proto_dyn_stats::Kind::Bytes(ProtoBytesStats {
657                kind:
658                    Some(proto_bytes_stats::Kind::Json(ProtoJsonStats {
659                        kind: Some(proto_json_stats::Kind::Maps(col_jsonb)),
660                    })),
661            })) => {
662                trim_to_budget_jsonb(col_jsonb, budget_shortfall, force_keep_col);
663                // This recursion might have gotten us under budget.
664                if *budget_shortfall == 0 {
665                    break;
666                }
667                // Otherwise, if any columns are left, they must have been force
668                // kept, which means we need to force keep this struct as well.
669                if !col_jsonb.elements.is_empty() {
670                    continue;
671                }
672                // We have to recompute the cost because trim_to_budget_jsonb might
673                // have already accounted for some of the shortfall.
674                *budget_shortfall = budget_shortfall.saturating_sub(col_jsonb.encoded_len() + 1);
675                stats.cols.remove(&name);
676            }
677            _ => {
678                stats.cols.remove(&name);
679                // Each field costs at least the cost of serializing the value
680                // and a byte for the tag. (Though a tag may be more than one
681                // byte in extreme cases.)
682                *budget_shortfall = budget_shortfall.saturating_sub(cost + 1);
683            }
684        }
685    }
686}
687
688fn trim_to_budget_jsonb(
689    stats: &mut ProtoJsonMapStats,
690    budget_shortfall: &mut usize,
691    force_keep_col: &impl Fn(&str) -> bool,
692) {
693    // Sort the columns in order of ascending size and keep however many fit
694    // within the budget. This strategy both keeps the largest total number of
695    // columns and also optimizes for the sort of columns we expect to need
696    // stats in practice (timestamps are numbers or small strings).
697    //
698    // Note: even though we sort in ascending order, we use `.pop()` to iterate
699    // over the elements, which takes from the back of the Vec.
700    stats
701        .elements
702        .sort_unstable_by_key(|element| element.encoded_len());
703
704    // Our strategy is to pop of stats until there are no more, or we're under
705    // budget. As we trim anything we want to keep, e.g. with force_keep_col,
706    // we stash it here, and later re-append.
707    let mut stats_to_keep = Vec::with_capacity(stats.elements.len());
708
709    while *budget_shortfall > 0 {
710        let Some(mut column) = stats.elements.pop() else {
711            break;
712        };
713
714        // We're force keeping this column.
715        if force_keep_col(&column.name) {
716            stats_to_keep.push(column);
717            continue;
718        }
719
720        // If the col is another JSON map, recurse into it and trim its stats.
721        if let Some(ProtoJsonStats {
722            kind: Some(proto_json_stats::Kind::Maps(ref mut col_jsonb)),
723        }) = column.stats
724        {
725            trim_to_budget_jsonb(col_jsonb, budget_shortfall, force_keep_col);
726
727            // We still have some columns left after trimming, so we want to keep these stats.
728            if !col_jsonb.elements.is_empty() {
729                stats_to_keep.push(column);
730            }
731
732            // We've trimmed enough, so we can stop recursing!
733            if *budget_shortfall == 0 {
734                break;
735            }
736        } else {
737            // Each field costs at least the cost of serializing the value
738            // and a byte for the tag. (Though a tag may be more than one
739            // byte in extreme cases.)
740            *budget_shortfall = budget_shortfall.saturating_sub(column.encoded_len() + 1);
741        }
742    }
743
744    // Re-add all of the stats we want to keep.
745    stats.elements.extend(stats_to_keep);
746}
747
748impl RustType<ProtoDynStats> for ColumnarStats {
749    fn into_proto(&self) -> ProtoDynStats {
750        let option = self.nulls.as_ref().map(|n| n.into_proto());
751        let kind = RustType::into_proto(&self.values);
752
753        ProtoDynStats {
754            option,
755            kind: Some(kind),
756        }
757    }
758
759    fn from_proto(proto: ProtoDynStats) -> Result<Self, TryFromProtoError> {
760        let kind = proto
761            .kind
762            .ok_or_else(|| TryFromProtoError::missing_field("ProtoDynStats::kind"))?;
763
764        Ok(ColumnarStats {
765            nulls: proto.option.into_rust()?,
766            values: kind.into_rust()?,
767        })
768    }
769}
770
771/// Returns a [`Strategy`] that generates arbitrary [`ColumnarStats`].
772pub(crate) fn any_columnar_stats() -> impl Strategy<Value = ColumnarStats> {
773    let leaf = Union::new(vec![
774        any_primitive_stats::<bool>()
775            .prop_map(|s| ColumnStatKinds::Primitive(s.into()))
776            .boxed(),
777        any_primitive_stats::<i64>()
778            .prop_map(|s| ColumnStatKinds::Primitive(s.into()))
779            .boxed(),
780        any_primitive_stats::<String>()
781            .prop_map(|s| ColumnStatKinds::Primitive(s.into()))
782            .boxed(),
783        any_bytes_stats().prop_map(ColumnStatKinds::Bytes).boxed(),
784    ])
785    .prop_map(|values| ColumnarStats {
786        nulls: None,
787        values,
788    });
789
790    leaf.prop_recursive(2, 10, 3, |inner| {
791        (
792            any::<usize>(),
793            proptest::collection::btree_map(any::<String>(), inner, 0..3),
794        )
795            .prop_map(|(len, cols)| {
796                let values = ColumnStatKinds::Struct(StructStats { len, cols });
797                ColumnarStats {
798                    nulls: None,
799                    values,
800                }
801            })
802    })
803}