1use mz_expr::{ColumnSpecs, Interpreter, MapFilterProject, ResultSpec, UnmaterializableFunc};
13use mz_persist_types::stats::{
14 BytesStats, ColumnStatKinds, JsonStats, PartStats, PartStatsMetrics,
15};
16use mz_repr::{ColumnIndex, ColumnType, Datum, RelationDesc, RowArena, ScalarType};
17
18#[derive(Debug)]
21pub struct RelationPartStats<'a> {
22 pub(crate) name: &'a str,
23 pub(crate) metrics: &'a PartStatsMetrics,
24 pub(crate) desc: &'a RelationDesc,
25 pub(crate) stats: &'a PartStats,
26}
27
28impl<'a> RelationPartStats<'a> {
29 pub fn new(
30 name: &'a str,
31 metrics: &'a PartStatsMetrics,
32 desc: &'a RelationDesc,
33 stats: &'a PartStats,
34 ) -> Self {
35 Self {
36 name,
37 metrics,
38 desc,
39 stats,
40 }
41 }
42}
43
44impl RelationPartStats<'_> {
45 pub fn may_match_mfp<'a>(&'a self, time_range: ResultSpec<'a>, mfp: &MapFilterProject) -> bool {
46 let arena = RowArena::new();
47 let mut ranges = ColumnSpecs::new(self.desc.typ(), &arena);
48 ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
49
50 if self.err_count().into_iter().any(|count| count > 0) {
51 return true;
53 }
54
55 for (pos, (idx, _name, _typ)) in self.desc.iter_all().enumerate() {
56 let result_spec = self.col_stats(idx, &arena);
57 ranges.push_column(pos, result_spec);
58 }
59 let result = ranges.mfp_filter(mfp).range;
60 result.may_contain(Datum::True) || result.may_fail()
61 }
62
63 fn json_spec<'a>(len: usize, stats: &'a JsonStats, arena: &'a RowArena) -> ResultSpec<'a> {
64 match stats {
65 JsonStats::JsonNulls => ResultSpec::value(Datum::JsonNull),
66 JsonStats::Bools(bools) => {
67 ResultSpec::value_between(bools.lower.into(), bools.upper.into())
68 }
69 JsonStats::Strings(strings) => ResultSpec::value_between(
70 Datum::String(strings.lower.as_str()),
71 Datum::String(strings.upper.as_str()),
72 ),
73 JsonStats::Numerics(numerics) => {
74 match mz_repr::stats::decode_numeric(numerics, arena) {
75 Ok((lower, upper)) => ResultSpec::value_between(lower, upper),
76 Err(err) => {
77 tracing::error!(%err, "failed to decode Json Numeric stats!");
78 ResultSpec::anything()
79 }
80 }
81 }
82 JsonStats::Maps(maps) => {
83 ResultSpec::map_spec(
84 maps.into_iter()
85 .map(|(k, v)| {
86 let mut v_spec = Self::json_spec(v.len, &v.stats, arena);
87 if v.len != len {
88 v_spec = v_spec.union(ResultSpec::null());
91 }
92 let key = arena.make_datum(|r| r.push(Datum::String(k.as_str())));
93 (key, v_spec)
94 })
95 .collect(),
96 )
97 }
98 JsonStats::None => ResultSpec::nothing(),
99 JsonStats::Lists | JsonStats::Mixed => ResultSpec::anything(),
100 }
101 }
102
103 pub fn col_stats<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> ResultSpec<'a> {
104 let value_range = match self.col_values(idx, arena) {
105 Some(spec) => spec,
106 None => ResultSpec::anything(),
107 };
108 let json_range = self.col_json(idx, arena).unwrap_or(ResultSpec::anything());
109
110 value_range.intersect(json_range)
113 }
114
115 fn col_json<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
116 let name = self.desc.get_name_idx(idx);
117 let typ = &self.desc.get_type(idx);
118
119 let ok_stats = self.stats.key.col("ok")?;
120 let ok_stats = ok_stats
121 .try_as_optional_struct()
122 .expect("ok column should be nullable struct");
123 let col_stats = ok_stats.some.cols.get(name.as_str())?;
124
125 if let ColumnType {
126 scalar_type: ScalarType::Jsonb,
127 nullable,
128 } = typ
129 {
130 let value_range = match &col_stats.values {
131 ColumnStatKinds::Bytes(BytesStats::Json(json_stats)) => {
132 Self::json_spec(ok_stats.some.len, json_stats, arena)
133 }
134 ColumnStatKinds::Bytes(
135 BytesStats::Primitive(_) | BytesStats::Atomic(_) | BytesStats::FixedSize(_),
136 ) => ResultSpec::anything(),
137 other => {
138 self.metrics.mismatched_count.inc();
139 tracing::error!(
140 "expected BytesStats for JSON column {}, found {other:?}",
141 self.name
142 );
143 return None;
144 }
145 };
146 let null_range = match (nullable, col_stats.nulls) {
147 (false, None) => ResultSpec::nothing(),
148 (true, Some(nulls)) if nulls.count == 0 => ResultSpec::nothing(),
149 (true, Some(_)) => ResultSpec::null(),
150 (col_null, stats_null) => {
151 self.metrics.mismatched_count.inc();
152 tracing::error!(
153 "JSON column nullability mismatch, col {} null: {col_null}, stats: {stats_null:?}",
154 self.name
155 );
156 return None;
157 }
158 };
159
160 Some(null_range.union(value_range))
161 } else {
162 None
163 }
164 }
165
166 pub fn len(&self) -> Option<usize> {
167 Some(self.stats.key.len)
168 }
169
170 pub fn ok_count(&self) -> Option<usize> {
171 let stats = self
173 .stats
174 .key
175 .col("err")?
176 .try_as_optional_bytes()
177 .expect("err column should be a Option<Vec<u8>>");
178 Some(stats.none)
179 }
180
181 pub fn err_count(&self) -> Option<usize> {
182 let num_results = self.stats.key.len;
187 let num_oks = self.ok_count();
188 num_oks.map(|num_oks| num_results - num_oks)
189 }
190
191 fn col_values<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
192 let name = self.desc.get_name_idx(idx);
193 let typ = self.desc.get_type(idx);
194
195 let ok_stats = self.stats.key.cols.get("ok")?;
196 let ColumnStatKinds::Struct(ok_stats) = &ok_stats.values else {
197 panic!("'ok' column stats should be a struct")
198 };
199 let col_stats = ok_stats.cols.get(name.as_str())?;
200
201 let min_max = mz_repr::stats::col_values(&typ.scalar_type, &col_stats.values, arena);
202 let null_count = col_stats.nulls.as_ref().map_or(0, |nulls| nulls.count);
203 let total_count = self.len();
204
205 let values = match (total_count, min_max) {
206 (Some(total_count), _) if total_count == null_count => ResultSpec::nothing(),
207 (_, Some((min, max))) => ResultSpec::value_between(min, max),
208 _ => ResultSpec::value_all(),
209 };
210 let nulls = if null_count > 0 {
211 ResultSpec::null()
212 } else {
213 ResultSpec::nothing()
214 };
215
216 Some(values.union(nulls))
217 }
218}
219
#[cfg(test)]
mod tests {
    use arrow::array::AsArray;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_persist_types::columnar::{ColumnDecoder, Schema};
    use mz_persist_types::part::PartBuilder;
    use mz_persist_types::stats::PartStats;
    use mz_repr::{ColumnType, Datum, RelationDesc, Row, RowArena, ScalarType};
    use mz_repr::{RelationType, arb_datum_for_column};
    use proptest::prelude::*;
    use proptest::strategy::ValueTree;

    use super::*;
    use crate::sources::SourceData;

    /// Builds a part with a single column `"col"` of `column_type`, pushing one
    /// row per datum, and asserts that the spec reported by `col_stats` for
    /// that column may contain every one of the datums.
    ///
    /// Failures surface as panics; the returned value is always `Ok(())`.
    fn validate_stats(column_type: &ColumnType, datums: &[Datum<'_>]) -> Result<(), String> {
        let schema = RelationDesc::builder()
            .with_column("col", column_type.clone())
            .finish();

        // One pushed row per datum, reusing a single `SourceData` buffer.
        let mut builder = PartBuilder::new(&schema, &UnitSchema);
        let mut source_data = SourceData(Ok(Row::default()));
        for datum in datums {
            source_data.as_mut().unwrap().packer().push(datum);
            builder.push(&source_data, &(), 1u64, 1i64);
        }
        let part = builder.finish();

        let decoder =
            <RelationDesc as Schema<SourceData>>::decoder(&schema, part.key.as_struct().clone())
                .expect("success");
        let part_stats = PartStats {
            key: decoder.stats(),
        };

        let metrics = PartStatsMetrics::new(&MetricsRegistry::new());
        let stats = RelationPartStats::new("test", &metrics, &schema, &part_stats);
        let arena = RowArena::default();
        let col_idx = ColumnIndex::from_raw(0);

        for datum in datums {
            let spec = stats.col_stats(&col_idx, &arena);
            assert!(spec.may_contain(*datum));
        }

        Ok(())
    }

    /// Validates every interesting datum of `scalar_type`, first in a
    /// non-nullable column and then in a nullable one; the nullable column is
    /// additionally checked with `Datum::Null`.
    fn scalar_type_stats_roundtrip(scalar_type: ScalarType) {
        for nullable in [false, true] {
            let column_type = scalar_type.clone().nullable(nullable);
            for datum in scalar_type.interesting_datums() {
                assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
            }
            if nullable {
                assert_eq!(validate_stats(&column_type, &[Datum::Null]), Ok(()));
            }
        }
    }

    /// Runs the per-type roundtrip check over arbitrary scalar types.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn all_scalar_types_stats_roundtrip() {
        proptest!(|(scalar_type in any::<ScalarType>())| {
            scalar_type_stats_roundtrip(scalar_type)
        });
    }

    /// Checks that stats built from arbitrary datums of an arbitrary column
    /// type always admit those datums.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn all_datums_produce_valid_stats() {
        // An arbitrary column type paired with up to 127 datums of that type.
        let typed_datums = any::<ColumnType>().prop_flat_map(|ty| {
            prop::collection::vec(arb_datum_for_column(ty.clone()), 0..128)
                .prop_map(move |datums| (ty.clone(), datums))
        });

        proptest!(
            ProptestConfig::with_cases(80),
            |((ty, datums) in typed_datums)| {
                let datums: Vec<_> = datums.iter().map(Datum::from).collect();
                prop_assert_eq!(validate_stats(&ty, datums.as_slice()), Ok(()));
            }
        )
    }

    /// Snapshots the stats computed for a fixed, seeded stream of generated
    /// relations so that changes to the produced stats show up as a diff.
    #[mz_ore::test]
    #[ignore]
    fn statistics_stability() {
        use proptest::test_runner::{Config, RngAlgorithm, TestRng, TestRunner};

        // Hard-coded seed so the generated inputs are reproducible.
        const RNG_SEED: [u8; 32] = [
            0xf4, 0x16, 0x16, 0x48, 0xc3, 0xac, 0x77, 0xac, 0x72, 0x20, 0x0b, 0xea, 0x99, 0x67,
            0x2d, 0x6d, 0xca, 0x9f, 0x76, 0xaf, 0x1b, 0x09, 0x73, 0xa0, 0x59, 0x22, 0x6d, 0xc5,
            0x46, 0x39, 0x1c, 0x4a,
        ];
        const MAX_COLS: usize = 4;
        const MAX_ROWS: usize = 8;
        const TEST_CASES: usize = 1000;

        let rng = TestRng::from_seed(RngAlgorithm::ChaCha, &RNG_SEED);
        let config = Config {
            // Values are drawn straight from the strategy further down, so
            // the case limit is presumably maxed out just to stay out of the
            // way.
            cases: u32::MAX,
            rng_algorithm: RngAlgorithm::ChaCha,
            ..Default::default()
        };
        let mut runner = TestRunner::new_with_rng(config, rng);

        // Relations with numerically named columns, each paired with a batch
        // of rows that fit the relation.
        let strat = proptest::collection::vec(any::<ColumnType>(), 1..MAX_COLS)
            .prop_map(|cols| {
                let col_names = (0..cols.len()).map(|i| i.to_string());
                RelationDesc::new(RelationType::new(cols), col_names)
            })
            .prop_flat_map(|desc| {
                let row_strategy = desc
                    .typ()
                    .columns()
                    .iter()
                    .cloned()
                    .map(arb_datum_for_column)
                    .collect::<Vec<_>>()
                    .prop_map(|datums| Row::pack(datums.iter().map(Datum::from)));
                proptest::collection::vec(row_strategy, 1..MAX_ROWS)
                    .prop_map(move |rows| (desc.clone(), rows))
            });

        let all_stats: Vec<_> = (0..TEST_CASES)
            .map(|_| {
                let (desc, rows) = strat.new_tree(&mut runner).unwrap().current();

                let mut builder = PartBuilder::new(&desc, &UnitSchema);
                for row in rows {
                    builder.push(&SourceData(Ok(row)), &(), 1u64, 1i64);
                }
                let part = builder.finish();

                <RelationDesc as Schema<SourceData>>::decoder(&desc, part.key.as_struct().clone())
                    .expect("success")
                    .stats()
            })
            .collect();

        insta::assert_json_snapshot!(all_stats);
    }
}