1use mz_expr::{ColumnSpecs, Interpreter, MapFilterProject, ResultSpec, UnmaterializableFunc};
13use mz_persist_types::stats::{
14    BytesStats, ColumnStatKinds, JsonStats, PartStats, PartStatsMetrics,
15};
16use mz_repr::{ColumnIndex, Datum, RelationDesc, RowArena, SqlColumnType, SqlScalarType};
17
/// Borrowed bundle of a relation description and the statistics of one part.
///
/// The methods on this type read the part statistics through the description
/// and turn them into [`ResultSpec`]s.
#[derive(Debug)]
pub struct RelationPartStats<'a> {
    /// Label interpolated into the error logs emitted when stats look wrong.
    pub(crate) name: &'a str,
    /// Metrics handle; `mismatched_count` is bumped when stats do not have
    /// the expected shape.
    pub(crate) metrics: &'a PartStatsMetrics,
    /// Description used to resolve column names and types.
    pub(crate) desc: &'a RelationDesc,
    /// Statistics of the part being inspected.
    pub(crate) stats: &'a PartStats,
}
27
28impl<'a> RelationPartStats<'a> {
29    pub fn new(
30        name: &'a str,
31        metrics: &'a PartStatsMetrics,
32        desc: &'a RelationDesc,
33        stats: &'a PartStats,
34    ) -> Self {
35        Self {
36            name,
37            metrics,
38            desc,
39            stats,
40        }
41    }
42}
43
44impl RelationPartStats<'_> {
45    pub fn may_match_mfp<'a>(&'a self, time_range: ResultSpec<'a>, mfp: &MapFilterProject) -> bool {
46        let arena = RowArena::new();
47        let mut ranges = ColumnSpecs::new(self.desc.typ(), &arena);
48        ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
49
50        if self.err_count().into_iter().any(|count| count > 0) {
51            return true;
53        }
54
55        for (pos, (idx, _name, _typ)) in self.desc.iter_all().enumerate() {
56            let result_spec = self.col_stats(idx, &arena);
57            ranges.push_column(pos, result_spec);
58        }
59        let result = ranges.mfp_filter(mfp).range;
60        result.may_contain(Datum::True) || result.may_fail()
61    }
62
63    fn json_spec<'a>(len: usize, stats: &'a JsonStats, arena: &'a RowArena) -> ResultSpec<'a> {
64        match stats {
65            JsonStats::JsonNulls => ResultSpec::value(Datum::JsonNull),
66            JsonStats::Bools(bools) => {
67                ResultSpec::value_between(bools.lower.into(), bools.upper.into())
68            }
69            JsonStats::Strings(strings) => ResultSpec::value_between(
70                Datum::String(strings.lower.as_str()),
71                Datum::String(strings.upper.as_str()),
72            ),
73            JsonStats::Numerics(numerics) => {
74                match mz_repr::stats::decode_numeric(numerics, arena) {
75                    Ok((lower, upper)) => ResultSpec::value_between(lower, upper),
76                    Err(err) => {
77                        tracing::error!(%err, "failed to decode Json Numeric stats!");
78                        ResultSpec::anything()
79                    }
80                }
81            }
82            JsonStats::Maps(maps) => {
83                ResultSpec::map_spec(
84                    maps.into_iter()
85                        .map(|(k, v)| {
86                            let mut v_spec = Self::json_spec(v.len, &v.stats, arena);
87                            if v.len != len {
88                                v_spec = v_spec.union(ResultSpec::null());
91                            }
92                            let key = arena.make_datum(|r| r.push(Datum::String(k.as_str())));
93                            (key, v_spec)
94                        })
95                        .collect(),
96                )
97            }
98            JsonStats::None => ResultSpec::nothing(),
99            JsonStats::Lists | JsonStats::Mixed => ResultSpec::anything(),
100        }
101    }
102
103    pub fn col_stats<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> ResultSpec<'a> {
104        let value_range = match self.col_values(idx, arena) {
105            Some(spec) => spec,
106            None => ResultSpec::anything(),
107        };
108        let json_range = self
109            .col_json(idx, arena)
110            .unwrap_or_else(ResultSpec::anything);
111
112        value_range.intersect(json_range)
115    }
116
117    fn col_json<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
118        let name = self.desc.get_name_idx(idx);
119        let typ = &self.desc.get_type(idx);
120
121        let ok_stats = self.stats.key.col("ok")?;
122        let ok_stats = ok_stats
123            .try_as_optional_struct()
124            .expect("ok column should be nullable struct");
125        let col_stats = ok_stats.some.cols.get(name.as_str())?;
126
127        if let SqlColumnType {
128            scalar_type: SqlScalarType::Jsonb,
129            nullable,
130        } = typ
131        {
132            let value_range = match &col_stats.values {
133                ColumnStatKinds::Bytes(BytesStats::Json(json_stats)) => {
134                    Self::json_spec(ok_stats.some.len, json_stats, arena)
135                }
136                ColumnStatKinds::Bytes(
137                    BytesStats::Primitive(_) | BytesStats::Atomic(_) | BytesStats::FixedSize(_),
138                ) => ResultSpec::anything(),
139                other => {
140                    self.metrics.mismatched_count.inc();
141                    tracing::error!(
142                        "expected BytesStats for JSON column {}, found {other:?}",
143                        self.name
144                    );
145                    return None;
146                }
147            };
148            let null_range = match (nullable, col_stats.nulls) {
149                (false, None) => ResultSpec::nothing(),
150                (true, Some(nulls)) if nulls.count == 0 => ResultSpec::nothing(),
151                (true, Some(_)) => ResultSpec::null(),
152                (col_null, stats_null) => {
153                    self.metrics.mismatched_count.inc();
154                    tracing::error!(
155                        "JSON column nullability mismatch, col {} null: {col_null}, stats: {stats_null:?}",
156                        self.name
157                    );
158                    return None;
159                }
160            };
161
162            Some(null_range.union(value_range))
163        } else {
164            None
165        }
166    }
167
168    pub fn len(&self) -> Option<usize> {
169        Some(self.stats.key.len)
170    }
171
172    pub fn ok_count(&self) -> Option<usize> {
173        let stats = self
175            .stats
176            .key
177            .col("err")?
178            .try_as_optional_bytes()
179            .expect("err column should be a Option<Vec<u8>>");
180        Some(stats.none)
181    }
182
183    pub fn err_count(&self) -> Option<usize> {
184        let num_results = self.stats.key.len;
189        let num_oks = self.ok_count();
190        num_oks.map(|num_oks| num_results - num_oks)
191    }
192
193    fn col_values<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
194        let name = self.desc.get_name_idx(idx);
195        let typ = self.desc.get_type(idx);
196
197        let ok_stats = self.stats.key.cols.get("ok")?;
198        let ColumnStatKinds::Struct(ok_stats) = &ok_stats.values else {
199            panic!("'ok' column stats should be a struct")
200        };
201        let col_stats = ok_stats.cols.get(name.as_str())?;
202
203        let min_max = mz_repr::stats::col_values(&typ.scalar_type, &col_stats.values, arena);
204        let null_count = col_stats.nulls.as_ref().map_or(0, |nulls| nulls.count);
205        let total_count = self.len();
206
207        let values = match (total_count, min_max) {
208            (Some(total_count), _) if total_count == null_count => ResultSpec::nothing(),
209            (_, Some((min, max))) => ResultSpec::value_between(min, max),
210            _ => ResultSpec::value_all(),
211        };
212        let nulls = if null_count > 0 {
213            ResultSpec::null()
214        } else {
215            ResultSpec::nothing()
216        };
217
218        Some(values.union(nulls))
219    }
220}
221
222#[cfg(test)]
223mod tests {
224    use arrow::array::AsArray;
225    use mz_ore::metrics::MetricsRegistry;
226    use mz_persist_types::codec_impls::UnitSchema;
227    use mz_persist_types::columnar::{ColumnDecoder, Schema};
228    use mz_persist_types::part::PartBuilder;
229    use mz_persist_types::stats::PartStats;
230    use mz_repr::{Datum, RelationDesc, Row, RowArena, SqlColumnType, SqlScalarType};
231    use mz_repr::{SqlRelationType, arb_datum_for_column};
232    use proptest::prelude::*;
233    use proptest::strategy::ValueTree;
234
235    use super::*;
236    use crate::sources::SourceData;
237
238    fn validate_stats(column_type: &SqlColumnType, datums: &[Datum<'_>]) -> Result<(), String> {
239        let schema = RelationDesc::builder()
240            .with_column("col", column_type.clone())
241            .finish();
242
243        let mut builder = PartBuilder::new(&schema, &UnitSchema);
244        let mut row = SourceData(Ok(Row::default()));
245        for datum in datums {
246            row.as_mut().unwrap().packer().push(datum);
247            builder.push(&row, &(), 1u64, 1i64);
248        }
249        let part = builder.finish();
250
251        let key_col = part.key.as_struct();
252        let decoder = <RelationDesc as Schema<SourceData>>::decoder(&schema, key_col.clone())
253            .expect("success");
254        let key_stats = decoder.stats();
255
256        let metrics = PartStatsMetrics::new(&MetricsRegistry::new());
257        let stats = RelationPartStats {
258            name: "test",
259            metrics: &metrics,
260            stats: &PartStats { key: key_stats },
261            desc: &schema,
262        };
263        let arena = RowArena::default();
264
265        for datum in datums {
267            let spec = stats.col_stats(&ColumnIndex::from_raw(0), &arena);
268            assert!(spec.may_contain(*datum));
269        }
270
271        Ok(())
272    }
273
274    fn scalar_type_stats_roundtrip(scalar_type: SqlScalarType) {
275        let column_type = scalar_type.clone().nullable(false);
277        for datum in scalar_type.interesting_datums() {
278            assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
279        }
280
281        let column_type = scalar_type.clone().nullable(true);
283        for datum in scalar_type.interesting_datums() {
284            assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
285        }
286        assert_eq!(validate_stats(&column_type, &[Datum::Null]), Ok(()));
287    }
288
289    #[mz_ore::test]
290    #[cfg_attr(miri, ignore)] fn all_scalar_types_stats_roundtrip() {
292        proptest!(|(scalar_type in any::<SqlScalarType>())| {
293            scalar_type_stats_roundtrip(scalar_type)
295        });
296    }
297
298    #[mz_ore::test]
299    #[cfg_attr(miri, ignore)] fn all_datums_produce_valid_stats() {
301        let datums = any::<SqlColumnType>().prop_flat_map(|ty| {
303            prop::collection::vec(arb_datum_for_column(ty.clone()), 0..128)
304                .prop_map(move |datums| (ty.clone(), datums))
305        });
306
307        proptest!(
308            ProptestConfig::with_cases(80),
309            |((ty, datums) in datums)| {
310                let datums: Vec<_> = datums.iter().map(Datum::from).collect();
312                prop_assert_eq!(validate_stats(&ty, &datums[..]), Ok(()));
313            }
314        )
315    }
316
317    #[mz_ore::test]
318    #[ignore] fn statistics_stability() {
320        const RNG_SEED: [u8; 32] = [
323            0xf4, 0x16, 0x16, 0x48, 0xc3, 0xac, 0x77, 0xac, 0x72, 0x20, 0x0b, 0xea, 0x99, 0x67,
324            0x2d, 0x6d, 0xca, 0x9f, 0x76, 0xaf, 0x1b, 0x09, 0x73, 0xa0, 0x59, 0x22, 0x6d, 0xc5,
325            0x46, 0x39, 0x1c, 0x4a,
326        ];
327
328        let rng = proptest::test_runner::TestRng::from_seed(
329            proptest::test_runner::RngAlgorithm::ChaCha,
330            &RNG_SEED,
331        );
332        let config = proptest::test_runner::Config {
334            cases: u32::MAX,
336            rng_algorithm: proptest::test_runner::RngAlgorithm::ChaCha,
337            ..Default::default()
338        };
339        let mut runner = proptest::test_runner::TestRunner::new_with_rng(config, rng);
340
341        let max_cols = 4;
342        let max_rows = 8;
343        let test_cases = 1000;
344
345        let strat = proptest::collection::vec(any::<SqlColumnType>(), 1..max_cols)
348            .prop_map(|cols| {
349                let col_names = (0..cols.len()).map(|i| i.to_string());
350                RelationDesc::new(SqlRelationType::new(cols), col_names)
351            })
352            .prop_flat_map(|desc| {
353                let rows = desc
354                    .typ()
355                    .columns()
356                    .iter()
357                    .cloned()
358                    .map(arb_datum_for_column)
359                    .collect::<Vec<_>>()
360                    .prop_map(|datums| Row::pack(datums.iter().map(Datum::from)));
361                proptest::collection::vec(rows, 1..max_rows)
362                    .prop_map(move |rows| (desc.clone(), rows))
363            });
364
365        let mut all_stats = Vec::new();
366        for _ in 0..test_cases {
367            let value_tree = strat.new_tree(&mut runner).unwrap();
368            let (desc, rows) = value_tree.current();
369
370            let mut builder = PartBuilder::new(&desc, &UnitSchema);
371            for row in &rows {
372                builder.push(&SourceData(Ok(row.clone())), &(), 1u64, 1i64);
373            }
374            let part = builder.finish();
375
376            let key_col = part.key.as_struct();
377            let decoder = <RelationDesc as Schema<SourceData>>::decoder(&desc, key_col.clone())
378                .expect("success");
379            let key_stats = decoder.stats();
380
381            all_stats.push(key_stats);
382        }
383
384        insta::assert_json_snapshot!(all_stats);
385    }
386}