mz_storage_types/
stats.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits that connect up our mz-repr types with the stats that persist maintains.
11
12use mz_expr::{ColumnSpecs, Interpreter, MapFilterProject, ResultSpec, UnmaterializableFunc};
13use mz_persist_types::stats::{
14    BytesStats, ColumnStatKinds, JsonStats, PartStats, PartStatsMetrics,
15};
16use mz_repr::{ColumnIndex, ColumnType, Datum, RelationDesc, RowArena, ScalarType};
17
/// Bundles together a relation desc with the stats for a specific part, and translates between
/// Persist's stats representation and the `ResultSpec`s that are used for e.g. filter pushdown.
#[derive(Debug)]
pub struct RelationPartStats<'a> {
    /// Human-readable name of the relation; only used in error log messages.
    pub(crate) name: &'a str,
    /// Metrics bumped (via `mismatched_count`) when the stats don't have the shape the
    /// relation's schema implies.
    pub(crate) metrics: &'a PartStatsMetrics,
    /// Schema of the relation, used to look up column names and types by index.
    pub(crate) desc: &'a RelationDesc,
    /// Persist stats for a single part; the relation data lives under the key's
    /// `ok` / `err` columns.
    pub(crate) stats: &'a PartStats,
}
27
28impl<'a> RelationPartStats<'a> {
29    pub fn new(
30        name: &'a str,
31        metrics: &'a PartStatsMetrics,
32        desc: &'a RelationDesc,
33        stats: &'a PartStats,
34    ) -> Self {
35        Self {
36            name,
37            metrics,
38            desc,
39            stats,
40        }
41    }
42}
43
44impl RelationPartStats<'_> {
45    pub fn may_match_mfp<'a>(&'a self, time_range: ResultSpec<'a>, mfp: &MapFilterProject) -> bool {
46        let arena = RowArena::new();
47        let mut ranges = ColumnSpecs::new(self.desc.typ(), &arena);
48        ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
49
50        if self.err_count().into_iter().any(|count| count > 0) {
51            // If the error collection is nonempty, we always keep the part.
52            return true;
53        }
54
55        for (pos, (idx, _name, _typ)) in self.desc.iter_all().enumerate() {
56            let result_spec = self.col_stats(idx, &arena);
57            ranges.push_column(pos, result_spec);
58        }
59        let result = ranges.mfp_filter(mfp).range;
60        result.may_contain(Datum::True) || result.may_fail()
61    }
62
63    fn json_spec<'a>(len: usize, stats: &'a JsonStats, arena: &'a RowArena) -> ResultSpec<'a> {
64        match stats {
65            JsonStats::JsonNulls => ResultSpec::value(Datum::JsonNull),
66            JsonStats::Bools(bools) => {
67                ResultSpec::value_between(bools.lower.into(), bools.upper.into())
68            }
69            JsonStats::Strings(strings) => ResultSpec::value_between(
70                Datum::String(strings.lower.as_str()),
71                Datum::String(strings.upper.as_str()),
72            ),
73            JsonStats::Numerics(numerics) => {
74                match mz_repr::stats::decode_numeric(numerics, arena) {
75                    Ok((lower, upper)) => ResultSpec::value_between(lower, upper),
76                    Err(err) => {
77                        tracing::error!(%err, "failed to decode Json Numeric stats!");
78                        ResultSpec::anything()
79                    }
80                }
81            }
82            JsonStats::Maps(maps) => {
83                ResultSpec::map_spec(
84                    maps.into_iter()
85                        .map(|(k, v)| {
86                            let mut v_spec = Self::json_spec(v.len, &v.stats, arena);
87                            if v.len != len {
88                                // This field is not always present, so assume
89                                // that accessing it might be null.
90                                v_spec = v_spec.union(ResultSpec::null());
91                            }
92                            let key = arena.make_datum(|r| r.push(Datum::String(k.as_str())));
93                            (key, v_spec)
94                        })
95                        .collect(),
96                )
97            }
98            JsonStats::None => ResultSpec::nothing(),
99            JsonStats::Lists | JsonStats::Mixed => ResultSpec::anything(),
100        }
101    }
102
103    pub fn col_stats<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> ResultSpec<'a> {
104        let value_range = match self.col_values(idx, arena) {
105            Some(spec) => spec,
106            None => ResultSpec::anything(),
107        };
108        let json_range = self.col_json(idx, arena).unwrap_or(ResultSpec::anything());
109
110        // If this is not a JSON column or we don't have JSON stats, json_range is
111        // [ResultSpec::anything] and this is a noop.
112        value_range.intersect(json_range)
113    }
114
115    fn col_json<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
116        let name = self.desc.get_name_idx(idx);
117        let typ = &self.desc.get_type(idx);
118
119        let ok_stats = self.stats.key.col("ok")?;
120        let ok_stats = ok_stats
121            .try_as_optional_struct()
122            .expect("ok column should be nullable struct");
123        let col_stats = ok_stats.some.cols.get(name.as_str())?;
124
125        if let ColumnType {
126            scalar_type: ScalarType::Jsonb,
127            nullable,
128        } = typ
129        {
130            let value_range = match &col_stats.values {
131                ColumnStatKinds::Bytes(BytesStats::Json(json_stats)) => {
132                    Self::json_spec(ok_stats.some.len, json_stats, arena)
133                }
134                ColumnStatKinds::Bytes(
135                    BytesStats::Primitive(_) | BytesStats::Atomic(_) | BytesStats::FixedSize(_),
136                ) => ResultSpec::anything(),
137                other => {
138                    self.metrics.mismatched_count.inc();
139                    tracing::error!(
140                        "expected BytesStats for JSON column {}, found {other:?}",
141                        self.name
142                    );
143                    return None;
144                }
145            };
146            let null_range = match (nullable, col_stats.nulls) {
147                (false, None) => ResultSpec::nothing(),
148                (true, Some(nulls)) if nulls.count == 0 => ResultSpec::nothing(),
149                (true, Some(_)) => ResultSpec::null(),
150                (col_null, stats_null) => {
151                    self.metrics.mismatched_count.inc();
152                    tracing::error!(
153                        "JSON column nullability mismatch, col {} null: {col_null}, stats: {stats_null:?}",
154                        self.name
155                    );
156                    return None;
157                }
158            };
159
160            Some(null_range.union(value_range))
161        } else {
162            None
163        }
164    }
165
166    pub fn len(&self) -> Option<usize> {
167        Some(self.stats.key.len)
168    }
169
170    pub fn ok_count(&self) -> Option<usize> {
171        // The number of OKs is the number of rows whose error is None.
172        let stats = self
173            .stats
174            .key
175            .col("err")?
176            .try_as_optional_bytes()
177            .expect("err column should be a Option<Vec<u8>>");
178        Some(stats.none)
179    }
180
181    pub fn err_count(&self) -> Option<usize> {
182        // Counter-intuitive: We can easily calculate the number of errors that
183        // were None from the column stats, but not how many were Some. So, what
184        // we do is count the number of Nones, which is the number of Oks, and
185        // then subtract that from the total.
186        let num_results = self.stats.key.len;
187        let num_oks = self.ok_count();
188        num_oks.map(|num_oks| num_results - num_oks)
189    }
190
191    fn col_values<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
192        let name = self.desc.get_name_idx(idx);
193        let typ = self.desc.get_type(idx);
194
195        let ok_stats = self.stats.key.cols.get("ok")?;
196        let ColumnStatKinds::Struct(ok_stats) = &ok_stats.values else {
197            panic!("'ok' column stats should be a struct")
198        };
199        let col_stats = ok_stats.cols.get(name.as_str())?;
200
201        let min_max = mz_repr::stats::col_values(&typ.scalar_type, &col_stats.values, arena);
202        let null_count = col_stats.nulls.as_ref().map_or(0, |nulls| nulls.count);
203        let total_count = self.len();
204
205        let values = match (total_count, min_max) {
206            (Some(total_count), _) if total_count == null_count => ResultSpec::nothing(),
207            (_, Some((min, max))) => ResultSpec::value_between(min, max),
208            _ => ResultSpec::value_all(),
209        };
210        let nulls = if null_count > 0 {
211            ResultSpec::null()
212        } else {
213            ResultSpec::nothing()
214        };
215
216        Some(values.union(nulls))
217    }
218}
219
#[cfg(test)]
mod tests {
    use arrow::array::AsArray;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_persist_types::columnar::{ColumnDecoder, Schema};
    use mz_persist_types::part::PartBuilder;
    use mz_persist_types::stats::PartStats;
    use mz_repr::{ColumnType, Datum, RelationDesc, Row, RowArena, ScalarType};
    use mz_repr::{RelationType, arb_datum_for_column};
    use proptest::prelude::*;
    use proptest::strategy::ValueTree;

    use super::*;
    use crate::sources::SourceData;

    /// Encodes `datums` into a part with a single column of type `column_type`, computes the
    /// persist stats for it, and asserts that the resulting column [`ResultSpec`] admits every
    /// input datum.
    ///
    /// Always returns `Ok(())`; a failed check panics via `assert!` instead.
    fn validate_stats(column_type: &ColumnType, datums: &[Datum<'_>]) -> Result<(), String> {
        let schema = RelationDesc::builder()
            .with_column("col", column_type.clone())
            .finish();

        // Push one `Ok` row per datum. NOTE(review): this relies on `packer()` resetting the
        // row, so that each pushed row holds exactly one datum — confirm.
        let mut builder = PartBuilder::new(&schema, &UnitSchema);
        let mut row = SourceData(Ok(Row::default()));
        for datum in datums {
            row.as_mut().unwrap().packer().push(datum);
            builder.push(&row, &(), 1u64, 1i64);
        }
        let part = builder.finish();

        // Build a decoder over the encoded key column and ask it for the column stats.
        let key_col = part.key.as_struct();
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(&schema, key_col.clone())
            .expect("success");
        let key_stats = decoder.stats();

        let metrics = PartStatsMetrics::new(&MetricsRegistry::new());
        let stats = RelationPartStats {
            name: "test",
            metrics: &metrics,
            stats: &PartStats { key: key_stats },
            desc: &schema,
        };
        let arena = RowArena::default();

        // Validate that the stats would include all of the provided datums.
        // (The spec does not depend on `datum`; it is recomputed per iteration.)
        for datum in datums {
            let spec = stats.col_stats(&ColumnIndex::from_raw(0), &arena);
            assert!(spec.may_contain(*datum));
        }

        Ok(())
    }

    /// Checks that the stats cover each of `scalar_type`'s interesting datums, once for a
    /// non-nullable column and once for a nullable column. The nullable case also checks
    /// a lone `NULL`.
    fn scalar_type_stats_roundtrip(scalar_type: ScalarType) {
        // Non-nullable version of the column.
        let column_type = scalar_type.clone().nullable(false);
        for datum in scalar_type.interesting_datums() {
            assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
        }

        // Nullable version of the column.
        let column_type = scalar_type.clone().nullable(true);
        for datum in scalar_type.interesting_datums() {
            assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
        }
        assert_eq!(validate_stats(&column_type, &[Datum::Null]), Ok(()));
    }

    /// Property test: stats round-trip for arbitrary scalar types.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn all_scalar_types_stats_roundtrip() {
        proptest!(|(scalar_type in any::<ScalarType>())| {
            // The proptest! macro interferes with rustfmt.
            scalar_type_stats_roundtrip(scalar_type)
        });
    }

    /// Property test: for an arbitrary column type and up to 127 arbitrary datums of that
    /// type, the computed stats admit every datum.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn all_datums_produce_valid_stats() {
        // A strategy that will return a Vec of Datums for an arbitrary ColumnType.
        let datums = any::<ColumnType>().prop_flat_map(|ty| {
            prop::collection::vec(arb_datum_for_column(ty.clone()), 0..128)
                .prop_map(move |datums| (ty.clone(), datums))
        });

        proptest!(
            ProptestConfig::with_cases(80),
            |((ty, datums) in datums)| {
                // The proptest! macro interferes with rustfmt.
                let datums: Vec<_> = datums.iter().map(Datum::from).collect();
                prop_assert_eq!(validate_stats(&ty, &datums[..]), Ok(()));
            }
        )
    }

    /// Snapshot test guarding against unintended changes in the computed stats.
    ///
    /// It generates `test_cases` random relations and rows from a fixed RNG seed, computes
    /// their key stats, and compares the whole collection against an `insta` JSON snapshot.
    /// The generated data depends on the exact order in which the strategies draw from the
    /// RNG, so any change to the strategy construction below will change the snapshot.
    #[mz_ore::test]
    #[ignore] // TODO(parkmycar): Re-enable this test with a smaller sample size.
    fn statistics_stability() {
        /// This is the seed [`proptest`] uses for their deterministic RNG. We
        /// copy it here to prevent breaking this test if [`proptest`] changes.
        const RNG_SEED: [u8; 32] = [
            0xf4, 0x16, 0x16, 0x48, 0xc3, 0xac, 0x77, 0xac, 0x72, 0x20, 0x0b, 0xea, 0x99, 0x67,
            0x2d, 0x6d, 0xca, 0x9f, 0x76, 0xaf, 0x1b, 0x09, 0x73, 0xa0, 0x59, 0x22, 0x6d, 0xc5,
            0x46, 0x39, 0x1c, 0x4a,
        ];

        let rng = proptest::test_runner::TestRng::from_seed(
            proptest::test_runner::RngAlgorithm::ChaCha,
            &RNG_SEED,
        );
        // Generate a collection of Rows.
        let config = proptest::test_runner::Config {
            // We let the loop below drive how much data we generate.
            cases: u32::MAX,
            rng_algorithm: proptest::test_runner::RngAlgorithm::ChaCha,
            ..Default::default()
        };
        let mut runner = proptest::test_runner::TestRunner::new_with_rng(config, rng);

        // Exclusive upper bounds for the generated column and row counts.
        let max_cols = 4;
        let max_rows = 8;
        let test_cases = 1000;

        // Note: We don't use the `Arbitrary` impl for `RelationDesc` because
        // it generates large column names which is not interesting to us.
        let strat = proptest::collection::vec(any::<ColumnType>(), 1..max_cols)
            .prop_map(|cols| {
                let col_names = (0..cols.len()).map(|i| i.to_string());
                RelationDesc::new(RelationType::new(cols), col_names)
            })
            .prop_flat_map(|desc| {
                // One datum strategy per column, packed into a single Row.
                let rows = desc
                    .typ()
                    .columns()
                    .iter()
                    .cloned()
                    .map(arb_datum_for_column)
                    .collect::<Vec<_>>()
                    .prop_map(|datums| Row::pack(datums.iter().map(Datum::from)));
                proptest::collection::vec(rows, 1..max_rows)
                    .prop_map(move |rows| (desc.clone(), rows))
            });

        let mut all_stats = Vec::new();
        for _ in 0..test_cases {
            // Draw one (desc, rows) sample without shrinking.
            let value_tree = strat.new_tree(&mut runner).unwrap();
            let (desc, rows) = value_tree.current();

            let mut builder = PartBuilder::new(&desc, &UnitSchema);
            for row in &rows {
                builder.push(&SourceData(Ok(row.clone())), &(), 1u64, 1i64);
            }
            let part = builder.finish();

            let key_col = part.key.as_struct();
            let decoder = <RelationDesc as Schema<SourceData>>::decoder(&desc, key_col.clone())
                .expect("success");
            let key_stats = decoder.stats();

            all_stats.push(key_stats);
        }

        insta::assert_json_snapshot!(all_stats);
    }
}