1use mz_expr::{ColumnSpecs, Interpreter, MapFilterProject, ResultSpec, UnmaterializableFunc};
13use mz_persist_types::stats::{
14 BytesStats, ColumnStatKinds, JsonStats, PartStats, PartStatsMetrics,
15};
16use mz_repr::{
17 ColumnIndex, Datum, RelationDesc, ReprRelationType, RowArena, SqlColumnType, SqlScalarType,
18};
19
/// Borrowed bundle of a [`PartStats`] together with the relation description,
/// metrics handle, and name needed to interpret those stats.
#[derive(Debug)]
pub struct RelationPartStats<'a> {
    /// Name interpolated into the error logs emitted while reading the stats.
    pub(crate) name: &'a str,
    /// Metrics handle; its `mismatched_count` is incremented when the stats
    /// disagree with the column's declared type or nullability.
    pub(crate) metrics: &'a PartStatsMetrics,
    /// Relation description used to look up column names and types.
    pub(crate) desc: &'a RelationDesc,
    /// The statistics being interpreted.
    pub(crate) stats: &'a PartStats,
}
29
30impl<'a> RelationPartStats<'a> {
31 pub fn new(
32 name: &'a str,
33 metrics: &'a PartStatsMetrics,
34 desc: &'a RelationDesc,
35 stats: &'a PartStats,
36 ) -> Self {
37 Self {
38 name,
39 metrics,
40 desc,
41 stats,
42 }
43 }
44}
45
46impl RelationPartStats<'_> {
47 pub fn may_match_mfp<'a>(&'a self, time_range: ResultSpec<'a>, mfp: &MapFilterProject) -> bool {
48 let arena = RowArena::new();
49 let relation = ReprRelationType::from(self.desc.typ());
50 let mut ranges = ColumnSpecs::new(&relation, &arena);
51 ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
52
53 if self.err_count().into_iter().any(|count| count > 0) {
54 return true;
56 }
57
58 for (pos, (idx, _name, _typ)) in self.desc.iter_all().enumerate() {
59 let result_spec = self.col_stats(idx, &arena);
60 ranges.push_column(pos, result_spec);
61 }
62 let result = ranges.mfp_filter(mfp).range;
63 result.may_contain(Datum::True) || result.may_fail()
64 }
65
66 fn json_spec<'a>(len: usize, stats: &'a JsonStats, arena: &'a RowArena) -> ResultSpec<'a> {
67 match stats {
68 JsonStats::JsonNulls => ResultSpec::value(Datum::JsonNull),
69 JsonStats::Bools(bools) => {
70 ResultSpec::value_between(bools.lower.into(), bools.upper.into())
71 }
72 JsonStats::Strings(strings) => ResultSpec::value_between(
73 Datum::String(strings.lower.as_str()),
74 Datum::String(strings.upper.as_str()),
75 ),
76 JsonStats::Numerics(numerics) => {
77 match mz_repr::stats::decode_numeric(numerics, arena) {
78 Ok((lower, upper)) => ResultSpec::value_between(lower, upper),
79 Err(err) => {
80 tracing::error!(%err, "failed to decode Json Numeric stats!");
81 ResultSpec::anything()
82 }
83 }
84 }
85 JsonStats::Maps(maps) => {
86 ResultSpec::map_spec(
87 maps.into_iter()
88 .map(|(k, v)| {
89 let mut v_spec = Self::json_spec(v.len, &v.stats, arena);
90 if v.len != len {
91 v_spec = v_spec.union(ResultSpec::null());
94 }
95 let key = arena.make_datum(|r| r.push(Datum::String(k.as_str())));
96 (key, v_spec)
97 })
98 .collect(),
99 )
100 }
101 JsonStats::None => ResultSpec::nothing(),
102 JsonStats::Lists | JsonStats::Mixed => ResultSpec::anything(),
103 }
104 }
105
106 pub fn col_stats<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> ResultSpec<'a> {
107 let value_range = match self.col_values(idx, arena) {
108 Some(spec) => spec,
109 None => ResultSpec::anything(),
110 };
111 let json_range = self
112 .col_json(idx, arena)
113 .unwrap_or_else(ResultSpec::anything);
114
115 value_range.intersect(json_range)
118 }
119
120 fn col_json<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
121 let name = self.desc.get_name_idx(idx);
122 let typ = &self.desc.get_type(idx);
123
124 let ok_stats = self.stats.key.col("ok")?;
125 let ok_stats = ok_stats
126 .try_as_optional_struct()
127 .expect("ok column should be nullable struct");
128 let col_stats = ok_stats.some.cols.get(name.as_str())?;
129
130 if let SqlColumnType {
131 scalar_type: SqlScalarType::Jsonb,
132 nullable,
133 } = typ
134 {
135 let value_range = match &col_stats.values {
136 ColumnStatKinds::Bytes(BytesStats::Json(json_stats)) => {
137 Self::json_spec(ok_stats.some.len, json_stats, arena)
138 }
139 ColumnStatKinds::Bytes(
140 BytesStats::Primitive(_) | BytesStats::Atomic(_) | BytesStats::FixedSize(_),
141 ) => ResultSpec::anything(),
142 other => {
143 self.metrics.mismatched_count.inc();
144 tracing::error!(
145 "expected BytesStats for JSON column {}, found {other:?}",
146 self.name
147 );
148 return None;
149 }
150 };
151 let null_range = match (nullable, col_stats.nulls) {
152 (false, None) => ResultSpec::nothing(),
153 (true, Some(nulls)) if nulls.count == 0 => ResultSpec::nothing(),
154 (true, Some(_)) => ResultSpec::null(),
155 (col_null, stats_null) => {
156 self.metrics.mismatched_count.inc();
157 tracing::error!(
158 "JSON column nullability mismatch, col {} null: {col_null}, stats: {stats_null:?}",
159 self.name
160 );
161 return None;
162 }
163 };
164
165 Some(null_range.union(value_range))
166 } else {
167 None
168 }
169 }
170
171 pub fn len(&self) -> Option<usize> {
172 Some(self.stats.key.len)
173 }
174
175 pub fn ok_count(&self) -> Option<usize> {
176 let stats = self
178 .stats
179 .key
180 .col("err")?
181 .try_as_optional_bytes()
182 .expect("err column should be a Option<Vec<u8>>");
183 Some(stats.none)
184 }
185
186 pub fn err_count(&self) -> Option<usize> {
187 let num_results = self.stats.key.len;
192 let num_oks = self.ok_count();
193 num_oks.map(|num_oks| num_results - num_oks)
194 }
195
196 fn col_values<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
197 let name = self.desc.get_name_idx(idx);
198 let typ = self.desc.get_type(idx);
199
200 let ok_stats = self.stats.key.cols.get("ok")?;
201 let ColumnStatKinds::Struct(ok_stats) = &ok_stats.values else {
202 panic!("'ok' column stats should be a struct")
203 };
204 let col_stats = ok_stats.cols.get(name.as_str())?;
205
206 let min_max = mz_repr::stats::col_values(&typ.scalar_type, &col_stats.values, arena);
207 let null_count = col_stats.nulls.as_ref().map_or(0, |nulls| nulls.count);
208 let total_count = self.len();
209
210 let values = match (total_count, min_max) {
211 (Some(total_count), _) if total_count == null_count => ResultSpec::nothing(),
212 (_, Some((min, max))) => ResultSpec::value_between(min, max),
213 _ => ResultSpec::value_all(),
214 };
215 let nulls = if null_count > 0 {
216 ResultSpec::null()
217 } else {
218 ResultSpec::nothing()
219 };
220
221 Some(values.union(nulls))
222 }
223}
224
225#[cfg(test)]
226mod tests {
227 use arrow::array::AsArray;
228 use mz_ore::metrics::MetricsRegistry;
229 use mz_persist_types::codec_impls::UnitSchema;
230 use mz_persist_types::columnar::{ColumnDecoder, Schema};
231 use mz_persist_types::part::PartBuilder;
232 use mz_persist_types::stats::PartStats;
233 use mz_repr::{Datum, RelationDesc, Row, RowArena, SqlColumnType, SqlScalarType};
234 use mz_repr::{SqlRelationType, arb_datum_for_column};
235 use proptest::prelude::*;
236 use proptest::strategy::ValueTree;
237
238 use super::*;
239 use crate::sources::SourceData;
240
241 fn validate_stats(column_type: &SqlColumnType, datums: &[Datum<'_>]) -> Result<(), String> {
242 let schema = RelationDesc::builder()
243 .with_column("col", column_type.clone())
244 .finish();
245
246 let mut builder = PartBuilder::new(&schema, &UnitSchema);
247 let mut row = SourceData(Ok(Row::default()));
248 for datum in datums {
249 row.as_mut().unwrap().packer().push(datum);
250 builder.push(&row, &(), 1u64, 1i64);
251 }
252 let part = builder.finish();
253
254 let key_col = part.key.as_struct();
255 let decoder = <RelationDesc as Schema<SourceData>>::decoder(&schema, key_col.clone())
256 .expect("success");
257 let key_stats = decoder.stats();
258
259 let metrics = PartStatsMetrics::new(&MetricsRegistry::new());
260 let stats = RelationPartStats {
261 name: "test",
262 metrics: &metrics,
263 stats: &PartStats { key: key_stats },
264 desc: &schema,
265 };
266 let arena = RowArena::default();
267
268 for datum in datums {
270 let spec = stats.col_stats(&ColumnIndex::from_raw(0), &arena);
271 assert!(spec.may_contain(*datum));
272 }
273
274 Ok(())
275 }
276
277 fn scalar_type_stats_roundtrip(scalar_type: SqlScalarType) {
278 let column_type = scalar_type.clone().nullable(false);
280 for datum in scalar_type.interesting_datums() {
281 assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
282 }
283
284 let column_type = scalar_type.clone().nullable(true);
286 for datum in scalar_type.interesting_datums() {
287 assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
288 }
289 assert_eq!(validate_stats(&column_type, &[Datum::Null]), Ok(()));
290 }
291
292 #[mz_ore::test]
293 #[cfg_attr(miri, ignore)] fn all_scalar_types_stats_roundtrip() {
295 proptest!(|(scalar_type in any::<SqlScalarType>())| {
296 scalar_type_stats_roundtrip(scalar_type)
298 });
299 }
300
301 #[mz_ore::test]
302 #[cfg_attr(miri, ignore)] fn all_datums_produce_valid_stats() {
304 let datums = any::<SqlColumnType>().prop_flat_map(|ty| {
306 prop::collection::vec(arb_datum_for_column(ty.clone()), 0..128)
307 .prop_map(move |datums| (ty.clone(), datums))
308 });
309
310 proptest!(
311 ProptestConfig::with_cases(80),
312 |((ty, datums) in datums)| {
313 let datums: Vec<_> = datums.iter().map(Datum::from).collect();
315 prop_assert_eq!(validate_stats(&ty, &datums[..]), Ok(()));
316 }
317 )
318 }
319
320 #[mz_ore::test]
321 #[ignore] fn statistics_stability() {
323 const RNG_SEED: [u8; 32] = [
326 0xf4, 0x16, 0x16, 0x48, 0xc3, 0xac, 0x77, 0xac, 0x72, 0x20, 0x0b, 0xea, 0x99, 0x67,
327 0x2d, 0x6d, 0xca, 0x9f, 0x76, 0xaf, 0x1b, 0x09, 0x73, 0xa0, 0x59, 0x22, 0x6d, 0xc5,
328 0x46, 0x39, 0x1c, 0x4a,
329 ];
330
331 let rng = proptest::test_runner::TestRng::from_seed(
332 proptest::test_runner::RngAlgorithm::ChaCha,
333 &RNG_SEED,
334 );
335 let config = proptest::test_runner::Config {
337 cases: u32::MAX,
339 rng_algorithm: proptest::test_runner::RngAlgorithm::ChaCha,
340 ..Default::default()
341 };
342 let mut runner = proptest::test_runner::TestRunner::new_with_rng(config, rng);
343
344 let max_cols = 4;
345 let max_rows = 8;
346 let test_cases = 1000;
347
348 let strat = proptest::collection::vec(any::<SqlColumnType>(), 1..max_cols)
351 .prop_map(|cols| {
352 let col_names = (0..cols.len()).map(|i| i.to_string());
353 RelationDesc::new(SqlRelationType::new(cols), col_names)
354 })
355 .prop_flat_map(|desc| {
356 let rows = desc
357 .typ()
358 .columns()
359 .iter()
360 .cloned()
361 .map(arb_datum_for_column)
362 .collect::<Vec<_>>()
363 .prop_map(|datums| Row::pack(datums.iter().map(Datum::from)));
364 proptest::collection::vec(rows, 1..max_rows)
365 .prop_map(move |rows| (desc.clone(), rows))
366 });
367
368 let mut all_stats = Vec::new();
369 for _ in 0..test_cases {
370 let value_tree = strat.new_tree(&mut runner).unwrap();
371 let (desc, rows) = value_tree.current();
372
373 let mut builder = PartBuilder::new(&desc, &UnitSchema);
374 for row in &rows {
375 builder.push(&SourceData(Ok(row.clone())), &(), 1u64, 1i64);
376 }
377 let part = builder.finish();
378
379 let key_col = part.key.as_struct();
380 let decoder = <RelationDesc as Schema<SourceData>>::decoder(&desc, key_col.clone())
381 .expect("success");
382 let key_stats = decoder.stats();
383
384 all_stats.push(key_stats);
385 }
386
387 insta::assert_json_snapshot!(all_stats);
388 }
389}