mz_storage_types/
stats.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits that connect up our mz-repr types with the stats that persist maintains.
11
12use mz_expr::{ColumnSpecs, Interpreter, MapFilterProject, ResultSpec, UnmaterializableFunc};
13use mz_persist_types::stats::{
14    BytesStats, ColumnStatKinds, JsonStats, PartStats, PartStatsMetrics,
15};
16use mz_repr::{ColumnIndex, ColumnType, Datum, RelationDesc, RowArena, ScalarType};
17
/// Bundles together a relation desc with the stats for a specific part, and translates between
/// Persist's stats representation and the `ResultSpec`s that are used for eg. filter pushdown.
#[derive(Debug)]
pub struct RelationPartStats<'a> {
    /// Name included in the error logs emitted when the stats do not match the column type.
    pub(crate) name: &'a str,
    /// Metrics handle; its `mismatched_count` is incremented when stats and column type disagree.
    pub(crate) metrics: &'a PartStatsMetrics,
    /// Description of the relation, used to look up column names and types by index.
    pub(crate) desc: &'a RelationDesc,
    /// The stats of the part being inspected.
    pub(crate) stats: &'a PartStats,
}
27
28impl<'a> RelationPartStats<'a> {
29    pub fn new(
30        name: &'a str,
31        metrics: &'a PartStatsMetrics,
32        desc: &'a RelationDesc,
33        stats: &'a PartStats,
34    ) -> Self {
35        Self {
36            name,
37            metrics,
38            desc,
39            stats,
40        }
41    }
42}
43
44impl RelationPartStats<'_> {
45    pub fn may_match_mfp<'a>(&'a self, time_range: ResultSpec<'a>, mfp: &MapFilterProject) -> bool {
46        let arena = RowArena::new();
47        let mut ranges = ColumnSpecs::new(self.desc.typ(), &arena);
48        ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
49
50        if self.err_count().into_iter().any(|count| count > 0) {
51            // If the error collection is nonempty, we always keep the part.
52            return true;
53        }
54
55        for (pos, (idx, _name, _typ)) in self.desc.iter_all().enumerate() {
56            let result_spec = self.col_stats(idx, &arena);
57            ranges.push_column(pos, result_spec);
58        }
59        let result = ranges.mfp_filter(mfp).range;
60        result.may_contain(Datum::True) || result.may_fail()
61    }
62
63    fn json_spec<'a>(len: usize, stats: &'a JsonStats, arena: &'a RowArena) -> ResultSpec<'a> {
64        match stats {
65            JsonStats::JsonNulls => ResultSpec::value(Datum::JsonNull),
66            JsonStats::Bools(bools) => {
67                ResultSpec::value_between(bools.lower.into(), bools.upper.into())
68            }
69            JsonStats::Strings(strings) => ResultSpec::value_between(
70                Datum::String(strings.lower.as_str()),
71                Datum::String(strings.upper.as_str()),
72            ),
73            JsonStats::Numerics(numerics) => {
74                match mz_repr::stats::decode_numeric(numerics, arena) {
75                    Ok((lower, upper)) => ResultSpec::value_between(lower, upper),
76                    Err(err) => {
77                        tracing::error!(%err, "failed to decode Json Numeric stats!");
78                        ResultSpec::anything()
79                    }
80                }
81            }
82            JsonStats::Maps(maps) => {
83                ResultSpec::map_spec(
84                    maps.into_iter()
85                        .map(|(k, v)| {
86                            let mut v_spec = Self::json_spec(v.len, &v.stats, arena);
87                            if v.len != len {
88                                // This field is not always present, so assume
89                                // that accessing it might be null.
90                                v_spec = v_spec.union(ResultSpec::null());
91                            }
92                            let key = arena.make_datum(|r| r.push(Datum::String(k.as_str())));
93                            (key, v_spec)
94                        })
95                        .collect(),
96                )
97            }
98            JsonStats::None => ResultSpec::nothing(),
99            JsonStats::Lists | JsonStats::Mixed => ResultSpec::anything(),
100        }
101    }
102
103    pub fn col_stats<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> ResultSpec<'a> {
104        let value_range = match self.col_values(idx, arena) {
105            Some(spec) => spec,
106            None => ResultSpec::anything(),
107        };
108        let json_range = self
109            .col_json(idx, arena)
110            .unwrap_or_else(ResultSpec::anything);
111
112        // If this is not a JSON column or we don't have JSON stats, json_range is
113        // [ResultSpec::anything] and this is a noop.
114        value_range.intersect(json_range)
115    }
116
117    fn col_json<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
118        let name = self.desc.get_name_idx(idx);
119        let typ = &self.desc.get_type(idx);
120
121        let ok_stats = self.stats.key.col("ok")?;
122        let ok_stats = ok_stats
123            .try_as_optional_struct()
124            .expect("ok column should be nullable struct");
125        let col_stats = ok_stats.some.cols.get(name.as_str())?;
126
127        if let ColumnType {
128            scalar_type: ScalarType::Jsonb,
129            nullable,
130        } = typ
131        {
132            let value_range = match &col_stats.values {
133                ColumnStatKinds::Bytes(BytesStats::Json(json_stats)) => {
134                    Self::json_spec(ok_stats.some.len, json_stats, arena)
135                }
136                ColumnStatKinds::Bytes(
137                    BytesStats::Primitive(_) | BytesStats::Atomic(_) | BytesStats::FixedSize(_),
138                ) => ResultSpec::anything(),
139                other => {
140                    self.metrics.mismatched_count.inc();
141                    tracing::error!(
142                        "expected BytesStats for JSON column {}, found {other:?}",
143                        self.name
144                    );
145                    return None;
146                }
147            };
148            let null_range = match (nullable, col_stats.nulls) {
149                (false, None) => ResultSpec::nothing(),
150                (true, Some(nulls)) if nulls.count == 0 => ResultSpec::nothing(),
151                (true, Some(_)) => ResultSpec::null(),
152                (col_null, stats_null) => {
153                    self.metrics.mismatched_count.inc();
154                    tracing::error!(
155                        "JSON column nullability mismatch, col {} null: {col_null}, stats: {stats_null:?}",
156                        self.name
157                    );
158                    return None;
159                }
160            };
161
162            Some(null_range.union(value_range))
163        } else {
164            None
165        }
166    }
167
168    pub fn len(&self) -> Option<usize> {
169        Some(self.stats.key.len)
170    }
171
172    pub fn ok_count(&self) -> Option<usize> {
173        // The number of OKs is the number of rows whose error is None.
174        let stats = self
175            .stats
176            .key
177            .col("err")?
178            .try_as_optional_bytes()
179            .expect("err column should be a Option<Vec<u8>>");
180        Some(stats.none)
181    }
182
183    pub fn err_count(&self) -> Option<usize> {
184        // Counter-intuitive: We can easily calculate the number of errors that
185        // were None from the column stats, but not how many were Some. So, what
186        // we do is count the number of Nones, which is the number of Oks, and
187        // then subtract that from the total.
188        let num_results = self.stats.key.len;
189        let num_oks = self.ok_count();
190        num_oks.map(|num_oks| num_results - num_oks)
191    }
192
193    fn col_values<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
194        let name = self.desc.get_name_idx(idx);
195        let typ = self.desc.get_type(idx);
196
197        let ok_stats = self.stats.key.cols.get("ok")?;
198        let ColumnStatKinds::Struct(ok_stats) = &ok_stats.values else {
199            panic!("'ok' column stats should be a struct")
200        };
201        let col_stats = ok_stats.cols.get(name.as_str())?;
202
203        let min_max = mz_repr::stats::col_values(&typ.scalar_type, &col_stats.values, arena);
204        let null_count = col_stats.nulls.as_ref().map_or(0, |nulls| nulls.count);
205        let total_count = self.len();
206
207        let values = match (total_count, min_max) {
208            (Some(total_count), _) if total_count == null_count => ResultSpec::nothing(),
209            (_, Some((min, max))) => ResultSpec::value_between(min, max),
210            _ => ResultSpec::value_all(),
211        };
212        let nulls = if null_count > 0 {
213            ResultSpec::null()
214        } else {
215            ResultSpec::nothing()
216        };
217
218        Some(values.union(nulls))
219    }
220}
221
222#[cfg(test)]
223mod tests {
224    use arrow::array::AsArray;
225    use mz_ore::metrics::MetricsRegistry;
226    use mz_persist_types::codec_impls::UnitSchema;
227    use mz_persist_types::columnar::{ColumnDecoder, Schema};
228    use mz_persist_types::part::PartBuilder;
229    use mz_persist_types::stats::PartStats;
230    use mz_repr::{ColumnType, Datum, RelationDesc, Row, RowArena, ScalarType};
231    use mz_repr::{RelationType, arb_datum_for_column};
232    use proptest::prelude::*;
233    use proptest::strategy::ValueTree;
234
235    use super::*;
236    use crate::sources::SourceData;
237
238    fn validate_stats(column_type: &ColumnType, datums: &[Datum<'_>]) -> Result<(), String> {
239        let schema = RelationDesc::builder()
240            .with_column("col", column_type.clone())
241            .finish();
242
243        let mut builder = PartBuilder::new(&schema, &UnitSchema);
244        let mut row = SourceData(Ok(Row::default()));
245        for datum in datums {
246            row.as_mut().unwrap().packer().push(datum);
247            builder.push(&row, &(), 1u64, 1i64);
248        }
249        let part = builder.finish();
250
251        let key_col = part.key.as_struct();
252        let decoder = <RelationDesc as Schema<SourceData>>::decoder(&schema, key_col.clone())
253            .expect("success");
254        let key_stats = decoder.stats();
255
256        let metrics = PartStatsMetrics::new(&MetricsRegistry::new());
257        let stats = RelationPartStats {
258            name: "test",
259            metrics: &metrics,
260            stats: &PartStats { key: key_stats },
261            desc: &schema,
262        };
263        let arena = RowArena::default();
264
265        // Validate that the stats would include all of the provided datums.
266        for datum in datums {
267            let spec = stats.col_stats(&ColumnIndex::from_raw(0), &arena);
268            assert!(spec.may_contain(*datum));
269        }
270
271        Ok(())
272    }
273
274    fn scalar_type_stats_roundtrip(scalar_type: ScalarType) {
275        // Non-nullable version of the column.
276        let column_type = scalar_type.clone().nullable(false);
277        for datum in scalar_type.interesting_datums() {
278            assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
279        }
280
281        // Nullable version of the column.
282        let column_type = scalar_type.clone().nullable(true);
283        for datum in scalar_type.interesting_datums() {
284            assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
285        }
286        assert_eq!(validate_stats(&column_type, &[Datum::Null]), Ok(()));
287    }
288
289    #[mz_ore::test]
290    #[cfg_attr(miri, ignore)] // too slow
291    fn all_scalar_types_stats_roundtrip() {
292        proptest!(|(scalar_type in any::<ScalarType>())| {
293            // The proptest! macro interferes with rustfmt.
294            scalar_type_stats_roundtrip(scalar_type)
295        });
296    }
297
298    #[mz_ore::test]
299    #[cfg_attr(miri, ignore)] // too slow
300    fn all_datums_produce_valid_stats() {
301        // A strategy that will return a Vec of Datums for an arbitrary ColumnType.
302        let datums = any::<ColumnType>().prop_flat_map(|ty| {
303            prop::collection::vec(arb_datum_for_column(ty.clone()), 0..128)
304                .prop_map(move |datums| (ty.clone(), datums))
305        });
306
307        proptest!(
308            ProptestConfig::with_cases(80),
309            |((ty, datums) in datums)| {
310                // The proptest! macro interferes with rustfmt.
311                let datums: Vec<_> = datums.iter().map(Datum::from).collect();
312                prop_assert_eq!(validate_stats(&ty, &datums[..]), Ok(()));
313            }
314        )
315    }
316
    /// Generates parts from a fixed RNG seed, computes the key stats of each, and compares the
    /// collected stats against a stored JSON snapshot.
    ///
    /// The generated data depends on the order in which the seeded runner is consumed, so the
    /// statements below should not be reordered without expecting the snapshot to change.
    #[mz_ore::test]
    #[ignore] // TODO(parkmycar): Re-enable this test with a smaller sample size.
    fn statistics_stability() {
        /// This is the seed [`proptest`] uses for their deterministic RNG. We
        /// copy it here to prevent breaking this test if [`proptest`] changes.
        const RNG_SEED: [u8; 32] = [
            0xf4, 0x16, 0x16, 0x48, 0xc3, 0xac, 0x77, 0xac, 0x72, 0x20, 0x0b, 0xea, 0x99, 0x67,
            0x2d, 0x6d, 0xca, 0x9f, 0x76, 0xaf, 0x1b, 0x09, 0x73, 0xa0, 0x59, 0x22, 0x6d, 0xc5,
            0x46, 0x39, 0x1c, 0x4a,
        ];

        // Seed a ChaCha RNG explicitly so every run generates the same data.
        let rng = proptest::test_runner::TestRng::from_seed(
            proptest::test_runner::RngAlgorithm::ChaCha,
            &RNG_SEED,
        );
        // Generate a collection of Rows.
        let config = proptest::test_runner::Config {
            // We let the loop below drive how much data we generate.
            cases: u32::MAX,
            rng_algorithm: proptest::test_runner::RngAlgorithm::ChaCha,
            ..Default::default()
        };
        let mut runner = proptest::test_runner::TestRunner::new_with_rng(config, rng);

        // `max_cols` and `max_rows` are used as the exclusive upper ends of the ranges below.
        let max_cols = 4;
        let max_rows = 8;
        let test_cases = 1000;

        // Note: We don't use the `Arbitrary` impl for `RelationDesc` because
        // it generates large column names which is not interesting to us.
        let strat = proptest::collection::vec(any::<ColumnType>(), 1..max_cols)
            .prop_map(|cols| {
                // Name each column after its position.
                let col_names = (0..cols.len()).map(|i| i.to_string());
                RelationDesc::new(RelationType::new(cols), col_names)
            })
            .prop_flat_map(|desc| {
                // One datum strategy per column, packed together into a Row.
                let rows = desc
                    .typ()
                    .columns()
                    .iter()
                    .cloned()
                    .map(arb_datum_for_column)
                    .collect::<Vec<_>>()
                    .prop_map(|datums| Row::pack(datums.iter().map(Datum::from)));
                proptest::collection::vec(rows, 1..max_rows)
                    .prop_map(move |rows| (desc.clone(), rows))
            });

        let mut all_stats = Vec::new();
        for _ in 0..test_cases {
            // Draw the next (desc, rows) pair from the seeded runner.
            let value_tree = strat.new_tree(&mut runner).unwrap();
            let (desc, rows) = value_tree.current();

            let mut builder = PartBuilder::new(&desc, &UnitSchema);
            for row in &rows {
                builder.push(&SourceData(Ok(row.clone())), &(), 1u64, 1i64);
            }
            let part = builder.finish();

            let key_col = part.key.as_struct();
            let decoder = <RelationDesc as Schema<SourceData>>::decoder(&desc, key_col.clone())
                .expect("success");
            let key_stats = decoder.stats();

            all_stats.push(key_stats);
        }

        insta::assert_json_snapshot!(all_stats);
    }
386}