Skip to main content

mz_storage_types/
stats.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits that connect up our mz-repr types with the stats that persist maintains.
11
12use mz_expr::{ColumnSpecs, Interpreter, MapFilterProject, ResultSpec, UnmaterializableFunc};
13use mz_persist_types::stats::{
14    BytesStats, ColumnStatKinds, JsonStats, PartStats, PartStatsMetrics,
15};
16use mz_repr::{
17    ColumnIndex, Datum, RelationDesc, ReprRelationType, RowArena, SqlColumnType, SqlScalarType,
18};
19
20/// Bundles together a relation desc with the stats for a specific part, and translates between
21/// Persist's stats representation and the `ResultSpec`s that are used for eg. filter pushdown.
22#[derive(Debug)]
23pub struct RelationPartStats<'a> {
24    pub(crate) name: &'a str,
25    pub(crate) metrics: &'a PartStatsMetrics,
26    pub(crate) desc: &'a RelationDesc,
27    pub(crate) stats: &'a PartStats,
28}
29
30impl<'a> RelationPartStats<'a> {
31    pub fn new(
32        name: &'a str,
33        metrics: &'a PartStatsMetrics,
34        desc: &'a RelationDesc,
35        stats: &'a PartStats,
36    ) -> Self {
37        Self {
38            name,
39            metrics,
40            desc,
41            stats,
42        }
43    }
44}
45
46impl RelationPartStats<'_> {
47    pub fn may_match_mfp<'a>(&'a self, time_range: ResultSpec<'a>, mfp: &MapFilterProject) -> bool {
48        let arena = RowArena::new();
49        let relation = ReprRelationType::from(self.desc.typ());
50        let mut ranges = ColumnSpecs::new(&relation, &arena);
51        ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
52
53        if self.err_count().into_iter().any(|count| count > 0) {
54            // If the error collection is nonempty, we always keep the part.
55            return true;
56        }
57
58        for (pos, (idx, _name, _typ)) in self.desc.iter_all().enumerate() {
59            let result_spec = self.col_stats(idx, &arena);
60            ranges.push_column(pos, result_spec);
61        }
62        let result = ranges.mfp_filter(mfp).range;
63        result.may_contain(Datum::True) || result.may_fail()
64    }
65
66    fn json_spec<'a>(len: usize, stats: &'a JsonStats, arena: &'a RowArena) -> ResultSpec<'a> {
67        match stats {
68            JsonStats::JsonNulls => ResultSpec::value(Datum::JsonNull),
69            JsonStats::Bools(bools) => {
70                ResultSpec::value_between(bools.lower.into(), bools.upper.into())
71            }
72            JsonStats::Strings(strings) => ResultSpec::value_between(
73                Datum::String(strings.lower.as_str()),
74                Datum::String(strings.upper.as_str()),
75            ),
76            JsonStats::Numerics(numerics) => {
77                match mz_repr::stats::decode_numeric(numerics, arena) {
78                    Ok((lower, upper)) => ResultSpec::value_between(lower, upper),
79                    Err(err) => {
80                        tracing::error!(%err, "failed to decode Json Numeric stats!");
81                        ResultSpec::anything()
82                    }
83                }
84            }
85            JsonStats::Maps(maps) => {
86                ResultSpec::map_spec(
87                    maps.into_iter()
88                        .map(|(k, v)| {
89                            let mut v_spec = Self::json_spec(v.len, &v.stats, arena);
90                            if v.len != len {
91                                // This field is not always present, so assume
92                                // that accessing it might be null.
93                                v_spec = v_spec.union(ResultSpec::null());
94                            }
95                            let key = arena.make_datum(|r| r.push(Datum::String(k.as_str())));
96                            (key, v_spec)
97                        })
98                        .collect(),
99                )
100            }
101            JsonStats::None => ResultSpec::nothing(),
102            JsonStats::Lists | JsonStats::Mixed => ResultSpec::anything(),
103        }
104    }
105
106    pub fn col_stats<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> ResultSpec<'a> {
107        let value_range = match self.col_values(idx, arena) {
108            Some(spec) => spec,
109            None => ResultSpec::anything(),
110        };
111        let json_range = self
112            .col_json(idx, arena)
113            .unwrap_or_else(ResultSpec::anything);
114
115        // If this is not a JSON column or we don't have JSON stats, json_range is
116        // [ResultSpec::anything] and this is a noop.
117        value_range.intersect(json_range)
118    }
119
120    fn col_json<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
121        let name = self.desc.get_name_idx(idx);
122        let typ = &self.desc.get_type(idx);
123
124        let ok_stats = self.stats.key.col("ok")?;
125        let ok_stats = ok_stats
126            .try_as_optional_struct()
127            .expect("ok column should be nullable struct");
128        let col_stats = ok_stats.some.cols.get(name.as_str())?;
129
130        if let SqlColumnType {
131            scalar_type: SqlScalarType::Jsonb,
132            nullable,
133        } = typ
134        {
135            let value_range = match &col_stats.values {
136                ColumnStatKinds::Bytes(BytesStats::Json(json_stats)) => {
137                    Self::json_spec(ok_stats.some.len, json_stats, arena)
138                }
139                ColumnStatKinds::Bytes(
140                    BytesStats::Primitive(_) | BytesStats::Atomic(_) | BytesStats::FixedSize(_),
141                ) => ResultSpec::anything(),
142                other => {
143                    self.metrics.mismatched_count.inc();
144                    tracing::error!(
145                        "expected BytesStats for JSON column {}, found {other:?}",
146                        self.name
147                    );
148                    return None;
149                }
150            };
151            let null_range = match (nullable, col_stats.nulls) {
152                (false, None) => ResultSpec::nothing(),
153                (true, Some(nulls)) if nulls.count == 0 => ResultSpec::nothing(),
154                (true, Some(_)) => ResultSpec::null(),
155                (col_null, stats_null) => {
156                    self.metrics.mismatched_count.inc();
157                    tracing::error!(
158                        "JSON column nullability mismatch, col {} null: {col_null}, stats: {stats_null:?}",
159                        self.name
160                    );
161                    return None;
162                }
163            };
164
165            Some(null_range.union(value_range))
166        } else {
167            None
168        }
169    }
170
171    pub fn len(&self) -> Option<usize> {
172        Some(self.stats.key.len)
173    }
174
175    pub fn ok_count(&self) -> Option<usize> {
176        // The number of OKs is the number of rows whose error is None.
177        let stats = self
178            .stats
179            .key
180            .col("err")?
181            .try_as_optional_bytes()
182            .expect("err column should be a Option<Vec<u8>>");
183        Some(stats.none)
184    }
185
186    pub fn err_count(&self) -> Option<usize> {
187        // Counter-intuitive: We can easily calculate the number of errors that
188        // were None from the column stats, but not how many were Some. So, what
189        // we do is count the number of Nones, which is the number of Oks, and
190        // then subtract that from the total.
191        let num_results = self.stats.key.len;
192        let num_oks = self.ok_count();
193        num_oks.map(|num_oks| num_results - num_oks)
194    }
195
196    fn col_values<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
197        let name = self.desc.get_name_idx(idx);
198        let typ = self.desc.get_type(idx);
199
200        let ok_stats = self.stats.key.cols.get("ok")?;
201        let ColumnStatKinds::Struct(ok_stats) = &ok_stats.values else {
202            panic!("'ok' column stats should be a struct")
203        };
204        let col_stats = ok_stats.cols.get(name.as_str())?;
205
206        let min_max = mz_repr::stats::col_values(&typ.scalar_type, &col_stats.values, arena);
207        let null_count = col_stats.nulls.as_ref().map_or(0, |nulls| nulls.count);
208        let total_count = self.len();
209
210        let values = match (total_count, min_max) {
211            (Some(total_count), _) if total_count == null_count => ResultSpec::nothing(),
212            (_, Some((min, max))) => ResultSpec::value_between(min, max),
213            _ => ResultSpec::value_all(),
214        };
215        let nulls = if null_count > 0 {
216            ResultSpec::null()
217        } else {
218            ResultSpec::nothing()
219        };
220
221        Some(values.union(nulls))
222    }
223}
224
#[cfg(test)]
mod tests {
    use arrow::array::AsArray;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_persist_types::columnar::{ColumnDecoder, Schema};
    use mz_persist_types::part::PartBuilder;
    use mz_persist_types::stats::PartStats;
    use mz_repr::{Datum, RelationDesc, Row, RowArena, SqlColumnType, SqlScalarType};
    use mz_repr::{SqlRelationType, arb_datum_for_column};
    use proptest::prelude::*;
    use proptest::strategy::ValueTree;

    use super::*;
    use crate::sources::SourceData;

    /// Builds a single-column part of type `column_type` from `datums`, decodes its key
    /// stats, and asserts that the column's [`ResultSpec`] admits every datum.
    fn validate_stats(column_type: &SqlColumnType, datums: &[Datum<'_>]) -> Result<(), String> {
        let schema = RelationDesc::builder()
            .with_column("col", column_type.clone())
            .finish();

        let mut builder = PartBuilder::new(&schema, &UnitSchema);
        let mut row = SourceData(Ok(Row::default()));
        for datum in datums {
            row.as_mut().unwrap().packer().push(datum);
            builder.push(&row, &(), 1u64, 1i64);
        }
        let part = builder.finish();

        let key_col = part.key.as_struct();
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(&schema, key_col.clone())
            .expect("success");
        let key_stats = decoder.stats();

        let metrics = PartStatsMetrics::new(&MetricsRegistry::new());
        let stats = RelationPartStats {
            name: "test",
            metrics: &metrics,
            stats: &PartStats { key: key_stats },
            desc: &schema,
        };
        let arena = RowArena::default();

        // The column's spec doesn't depend on the datum being checked, so compute it once
        // and validate that the stats would include all of the provided datums.
        let spec = stats.col_stats(&ColumnIndex::from_raw(0), &arena);
        for datum in datums {
            assert!(spec.may_contain(*datum));
        }

        Ok(())
    }

    /// Checks every "interesting" datum of `scalar_type`, in both a non-nullable and a
    /// nullable column, plus `NULL` in the nullable column.
    fn scalar_type_stats_roundtrip(scalar_type: SqlScalarType) {
        // Non-nullable version of the column.
        let column_type = scalar_type.clone().nullable(false);
        for datum in scalar_type.interesting_datums() {
            assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
        }

        // Nullable version of the column.
        let column_type = scalar_type.clone().nullable(true);
        for datum in scalar_type.interesting_datums() {
            assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
        }
        assert_eq!(validate_stats(&column_type, &[Datum::Null]), Ok(()));
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn all_scalar_types_stats_roundtrip() {
        proptest!(|(scalar_type in any::<SqlScalarType>())| {
            // The proptest! macro interferes with rustfmt.
            scalar_type_stats_roundtrip(scalar_type)
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn all_datums_produce_valid_stats() {
        // A strategy that will return a Vec of Datums for an arbitrary SqlColumnType.
        let datums = any::<SqlColumnType>().prop_flat_map(|ty| {
            prop::collection::vec(arb_datum_for_column(ty.clone()), 0..128)
                .prop_map(move |datums| (ty.clone(), datums))
        });

        proptest!(
            ProptestConfig::with_cases(80),
            |((ty, datums) in datums)| {
                // The proptest! macro interferes with rustfmt.
                let datums: Vec<_> = datums.iter().map(Datum::from).collect();
                prop_assert_eq!(validate_stats(&ty, &datums[..]), Ok(()));
            }
        )
    }

    #[mz_ore::test]
    #[ignore] // TODO(parkmycar): Re-enable this test with a smaller sample size.
    fn statistics_stability() {
        /// This is the seed [`proptest`] uses for their deterministic RNG. We
        /// copy it here to prevent breaking this test if [`proptest`] changes.
        const RNG_SEED: [u8; 32] = [
            0xf4, 0x16, 0x16, 0x48, 0xc3, 0xac, 0x77, 0xac, 0x72, 0x20, 0x0b, 0xea, 0x99, 0x67,
            0x2d, 0x6d, 0xca, 0x9f, 0x76, 0xaf, 0x1b, 0x09, 0x73, 0xa0, 0x59, 0x22, 0x6d, 0xc5,
            0x46, 0x39, 0x1c, 0x4a,
        ];

        let rng = proptest::test_runner::TestRng::from_seed(
            proptest::test_runner::RngAlgorithm::ChaCha,
            &RNG_SEED,
        );
        // Generate a collection of Rows.
        let config = proptest::test_runner::Config {
            // We let the loop below drive how much data we generate.
            cases: u32::MAX,
            rng_algorithm: proptest::test_runner::RngAlgorithm::ChaCha,
            ..Default::default()
        };
        let mut runner = proptest::test_runner::TestRunner::new_with_rng(config, rng);

        let max_cols = 4;
        let max_rows = 8;
        let test_cases = 1000;

        // Note: We don't use the `Arbitrary` impl for `RelationDesc` because
        // it generates large column names which is not interesting to us.
        let strat = proptest::collection::vec(any::<SqlColumnType>(), 1..max_cols)
            .prop_map(|cols| {
                let col_names = (0..cols.len()).map(|i| i.to_string());
                RelationDesc::new(SqlRelationType::new(cols), col_names)
            })
            .prop_flat_map(|desc| {
                let rows = desc
                    .typ()
                    .columns()
                    .iter()
                    .cloned()
                    .map(arb_datum_for_column)
                    .collect::<Vec<_>>()
                    .prop_map(|datums| Row::pack(datums.iter().map(Datum::from)));
                proptest::collection::vec(rows, 1..max_rows)
                    .prop_map(move |rows| (desc.clone(), rows))
            });

        let mut all_stats = Vec::new();
        for _ in 0..test_cases {
            let value_tree = strat.new_tree(&mut runner).unwrap();
            let (desc, rows) = value_tree.current();

            let mut builder = PartBuilder::new(&desc, &UnitSchema);
            for row in &rows {
                builder.push(&SourceData(Ok(row.clone())), &(), 1u64, 1i64);
            }
            let part = builder.finish();

            let key_col = part.key.as_struct();
            let decoder = <RelationDesc as Schema<SourceData>>::decoder(&desc, key_col.clone())
                .expect("success");
            let key_stats = decoder.stats();

            all_stats.push(key_stats);
        }

        insta::assert_json_snapshot!(all_stats);
    }
}