1use mz_expr::{ColumnSpecs, Interpreter, MapFilterProject, ResultSpec, UnmaterializableFunc};
13use mz_persist_types::stats::{
14 BytesStats, ColumnStatKinds, JsonStats, PartStats, PartStatsMetrics,
15};
16use mz_repr::{ColumnIndex, ColumnType, Datum, RelationDesc, RowArena, ScalarType};
17
/// A borrowed bundle of part statistics together with the relation
/// description needed to interpret them.
///
/// All fields are references; this type owns nothing.
#[derive(Debug)]
pub struct RelationPartStats<'a> {
    /// Name interpolated into the error logs emitted when stats do not have
    /// the shape the column type calls for.
    pub(crate) name: &'a str,
    /// Metrics handle; its `mismatched_count` is incremented whenever such a
    /// mismatch is logged.
    pub(crate) metrics: &'a PartStatsMetrics,
    /// Relation description used to resolve column names and types.
    pub(crate) desc: &'a RelationDesc,
    /// The statistics themselves. The code reads the `"ok"` and `"err"`
    /// columns of `stats.key`.
    pub(crate) stats: &'a PartStats,
}
27
28impl<'a> RelationPartStats<'a> {
29 pub fn new(
30 name: &'a str,
31 metrics: &'a PartStatsMetrics,
32 desc: &'a RelationDesc,
33 stats: &'a PartStats,
34 ) -> Self {
35 Self {
36 name,
37 metrics,
38 desc,
39 stats,
40 }
41 }
42}
43
44impl RelationPartStats<'_> {
45 pub fn may_match_mfp<'a>(&'a self, time_range: ResultSpec<'a>, mfp: &MapFilterProject) -> bool {
46 let arena = RowArena::new();
47 let mut ranges = ColumnSpecs::new(self.desc.typ(), &arena);
48 ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
49
50 if self.err_count().into_iter().any(|count| count > 0) {
51 return true;
53 }
54
55 for (pos, (idx, _name, _typ)) in self.desc.iter_all().enumerate() {
56 let result_spec = self.col_stats(idx, &arena);
57 ranges.push_column(pos, result_spec);
58 }
59 let result = ranges.mfp_filter(mfp).range;
60 result.may_contain(Datum::True) || result.may_fail()
61 }
62
63 fn json_spec<'a>(len: usize, stats: &'a JsonStats, arena: &'a RowArena) -> ResultSpec<'a> {
64 match stats {
65 JsonStats::JsonNulls => ResultSpec::value(Datum::JsonNull),
66 JsonStats::Bools(bools) => {
67 ResultSpec::value_between(bools.lower.into(), bools.upper.into())
68 }
69 JsonStats::Strings(strings) => ResultSpec::value_between(
70 Datum::String(strings.lower.as_str()),
71 Datum::String(strings.upper.as_str()),
72 ),
73 JsonStats::Numerics(numerics) => {
74 match mz_repr::stats::decode_numeric(numerics, arena) {
75 Ok((lower, upper)) => ResultSpec::value_between(lower, upper),
76 Err(err) => {
77 tracing::error!(%err, "failed to decode Json Numeric stats!");
78 ResultSpec::anything()
79 }
80 }
81 }
82 JsonStats::Maps(maps) => {
83 ResultSpec::map_spec(
84 maps.into_iter()
85 .map(|(k, v)| {
86 let mut v_spec = Self::json_spec(v.len, &v.stats, arena);
87 if v.len != len {
88 v_spec = v_spec.union(ResultSpec::null());
91 }
92 let key = arena.make_datum(|r| r.push(Datum::String(k.as_str())));
93 (key, v_spec)
94 })
95 .collect(),
96 )
97 }
98 JsonStats::None => ResultSpec::nothing(),
99 JsonStats::Lists | JsonStats::Mixed => ResultSpec::anything(),
100 }
101 }
102
103 pub fn col_stats<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> ResultSpec<'a> {
104 let value_range = match self.col_values(idx, arena) {
105 Some(spec) => spec,
106 None => ResultSpec::anything(),
107 };
108 let json_range = self
109 .col_json(idx, arena)
110 .unwrap_or_else(ResultSpec::anything);
111
112 value_range.intersect(json_range)
115 }
116
117 fn col_json<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
118 let name = self.desc.get_name_idx(idx);
119 let typ = &self.desc.get_type(idx);
120
121 let ok_stats = self.stats.key.col("ok")?;
122 let ok_stats = ok_stats
123 .try_as_optional_struct()
124 .expect("ok column should be nullable struct");
125 let col_stats = ok_stats.some.cols.get(name.as_str())?;
126
127 if let ColumnType {
128 scalar_type: ScalarType::Jsonb,
129 nullable,
130 } = typ
131 {
132 let value_range = match &col_stats.values {
133 ColumnStatKinds::Bytes(BytesStats::Json(json_stats)) => {
134 Self::json_spec(ok_stats.some.len, json_stats, arena)
135 }
136 ColumnStatKinds::Bytes(
137 BytesStats::Primitive(_) | BytesStats::Atomic(_) | BytesStats::FixedSize(_),
138 ) => ResultSpec::anything(),
139 other => {
140 self.metrics.mismatched_count.inc();
141 tracing::error!(
142 "expected BytesStats for JSON column {}, found {other:?}",
143 self.name
144 );
145 return None;
146 }
147 };
148 let null_range = match (nullable, col_stats.nulls) {
149 (false, None) => ResultSpec::nothing(),
150 (true, Some(nulls)) if nulls.count == 0 => ResultSpec::nothing(),
151 (true, Some(_)) => ResultSpec::null(),
152 (col_null, stats_null) => {
153 self.metrics.mismatched_count.inc();
154 tracing::error!(
155 "JSON column nullability mismatch, col {} null: {col_null}, stats: {stats_null:?}",
156 self.name
157 );
158 return None;
159 }
160 };
161
162 Some(null_range.union(value_range))
163 } else {
164 None
165 }
166 }
167
168 pub fn len(&self) -> Option<usize> {
169 Some(self.stats.key.len)
170 }
171
172 pub fn ok_count(&self) -> Option<usize> {
173 let stats = self
175 .stats
176 .key
177 .col("err")?
178 .try_as_optional_bytes()
179 .expect("err column should be a Option<Vec<u8>>");
180 Some(stats.none)
181 }
182
183 pub fn err_count(&self) -> Option<usize> {
184 let num_results = self.stats.key.len;
189 let num_oks = self.ok_count();
190 num_oks.map(|num_oks| num_results - num_oks)
191 }
192
193 fn col_values<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option<ResultSpec<'a>> {
194 let name = self.desc.get_name_idx(idx);
195 let typ = self.desc.get_type(idx);
196
197 let ok_stats = self.stats.key.cols.get("ok")?;
198 let ColumnStatKinds::Struct(ok_stats) = &ok_stats.values else {
199 panic!("'ok' column stats should be a struct")
200 };
201 let col_stats = ok_stats.cols.get(name.as_str())?;
202
203 let min_max = mz_repr::stats::col_values(&typ.scalar_type, &col_stats.values, arena);
204 let null_count = col_stats.nulls.as_ref().map_or(0, |nulls| nulls.count);
205 let total_count = self.len();
206
207 let values = match (total_count, min_max) {
208 (Some(total_count), _) if total_count == null_count => ResultSpec::nothing(),
209 (_, Some((min, max))) => ResultSpec::value_between(min, max),
210 _ => ResultSpec::value_all(),
211 };
212 let nulls = if null_count > 0 {
213 ResultSpec::null()
214 } else {
215 ResultSpec::nothing()
216 };
217
218 Some(values.union(nulls))
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use arrow::array::AsArray;
225 use mz_ore::metrics::MetricsRegistry;
226 use mz_persist_types::codec_impls::UnitSchema;
227 use mz_persist_types::columnar::{ColumnDecoder, Schema};
228 use mz_persist_types::part::PartBuilder;
229 use mz_persist_types::stats::PartStats;
230 use mz_repr::{ColumnType, Datum, RelationDesc, Row, RowArena, ScalarType};
231 use mz_repr::{RelationType, arb_datum_for_column};
232 use proptest::prelude::*;
233 use proptest::strategy::ValueTree;
234
235 use super::*;
236 use crate::sources::SourceData;
237
238 fn validate_stats(column_type: &ColumnType, datums: &[Datum<'_>]) -> Result<(), String> {
239 let schema = RelationDesc::builder()
240 .with_column("col", column_type.clone())
241 .finish();
242
243 let mut builder = PartBuilder::new(&schema, &UnitSchema);
244 let mut row = SourceData(Ok(Row::default()));
245 for datum in datums {
246 row.as_mut().unwrap().packer().push(datum);
247 builder.push(&row, &(), 1u64, 1i64);
248 }
249 let part = builder.finish();
250
251 let key_col = part.key.as_struct();
252 let decoder = <RelationDesc as Schema<SourceData>>::decoder(&schema, key_col.clone())
253 .expect("success");
254 let key_stats = decoder.stats();
255
256 let metrics = PartStatsMetrics::new(&MetricsRegistry::new());
257 let stats = RelationPartStats {
258 name: "test",
259 metrics: &metrics,
260 stats: &PartStats { key: key_stats },
261 desc: &schema,
262 };
263 let arena = RowArena::default();
264
265 for datum in datums {
267 let spec = stats.col_stats(&ColumnIndex::from_raw(0), &arena);
268 assert!(spec.may_contain(*datum));
269 }
270
271 Ok(())
272 }
273
274 fn scalar_type_stats_roundtrip(scalar_type: ScalarType) {
275 let column_type = scalar_type.clone().nullable(false);
277 for datum in scalar_type.interesting_datums() {
278 assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
279 }
280
281 let column_type = scalar_type.clone().nullable(true);
283 for datum in scalar_type.interesting_datums() {
284 assert_eq!(validate_stats(&column_type, &[datum]), Ok(()));
285 }
286 assert_eq!(validate_stats(&column_type, &[Datum::Null]), Ok(()));
287 }
288
    /// Property test: runs `scalar_type_stats_roundtrip` for arbitrary
    /// generated `ScalarType`s.
    #[mz_ore::test]
    // Marked `ignore` when compiled under Miri.
    #[cfg_attr(miri, ignore)]
    fn all_scalar_types_stats_roundtrip() {
        proptest!(|(scalar_type in any::<ScalarType>())| {
            scalar_type_stats_roundtrip(scalar_type)
        });
    }
297
    /// Property test: for an arbitrary `ColumnType` and a vector of datums
    /// generated for that type, `validate_stats` must succeed.
    #[mz_ore::test]
    // Marked `ignore` when compiled under Miri.
    #[cfg_attr(miri, ignore)]
    fn all_datums_produce_valid_stats() {
        // Pick a column type first, then generate datums for that type and
        // carry the type along with them.
        let datums = any::<ColumnType>().prop_flat_map(|ty| {
            prop::collection::vec(arb_datum_for_column(ty.clone()), 0..128)
                .prop_map(move |datums| (ty.clone(), datums))
        });

        proptest!(
            // Explicit case count for this test.
            ProptestConfig::with_cases(80),
            |((ty, datums) in datums)| {
                // Convert the generated values into `Datum`s before validating.
                let datums: Vec<_> = datums.iter().map(Datum::from).collect();
                prop_assert_eq!(validate_stats(&ty, &datums[..]), Ok(()));
            }
        )
    }
316
    /// Snapshot test: generates relations and rows from a fixed RNG seed,
    /// computes key stats for each generated part, and compares the collected
    /// stats against a stored JSON snapshot.
    ///
    /// Currently marked `#[ignore]`, so it does not run by default.
    #[mz_ore::test]
    #[ignore]
    fn statistics_stability() {
        /// Fixed seed for the RNG below, so the generated data is the same on
        /// every run.
        const RNG_SEED: [u8; 32] = [
            0xf4, 0x16, 0x16, 0x48, 0xc3, 0xac, 0x77, 0xac, 0x72, 0x20, 0x0b, 0xea, 0x99, 0x67,
            0x2d, 0x6d, 0xca, 0x9f, 0x76, 0xaf, 0x1b, 0x09, 0x73, 0xa0, 0x59, 0x22, 0x6d, 0xc5,
            0x46, 0x39, 0x1c, 0x4a,
        ];

        let rng = proptest::test_runner::TestRng::from_seed(
            proptest::test_runner::RngAlgorithm::ChaCha,
            &RNG_SEED,
        );
        let config = proptest::test_runner::Config {
            // NOTE(review): presumably set to the maximum so the runner never
            // limits generation; the explicit loop below draws `test_cases`
            // values itself.
            cases: u32::MAX,
            // Same algorithm as the seeded RNG above.
            rng_algorithm: proptest::test_runner::RngAlgorithm::ChaCha,
            ..Default::default()
        };
        let mut runner = proptest::test_runner::TestRunner::new_with_rng(config, rng);

        // Bounds passed as the exclusive end of the size ranges below, and
        // the number of values drawn from the strategy.
        let max_cols = 4;
        let max_rows = 8;
        let test_cases = 1000;

        // Strategy: a vector of column types becomes a `RelationDesc` whose
        // column names are the stringified column positions; rows are then
        // generated column-by-column for that description.
        let strat = proptest::collection::vec(any::<ColumnType>(), 1..max_cols)
            .prop_map(|cols| {
                let col_names = (0..cols.len()).map(|i| i.to_string());
                RelationDesc::new(RelationType::new(cols), col_names)
            })
            .prop_flat_map(|desc| {
                let rows = desc
                    .typ()
                    .columns()
                    .iter()
                    .cloned()
                    .map(arb_datum_for_column)
                    .collect::<Vec<_>>()
                    .prop_map(|datums| Row::pack(datums.iter().map(Datum::from)));
                proptest::collection::vec(rows, 1..max_rows)
                    .prop_map(move |rows| (desc.clone(), rows))
            });

        let mut all_stats = Vec::new();
        for _ in 0..test_cases {
            // Draw one (description, rows) pair from the seeded runner.
            let value_tree = strat.new_tree(&mut runner).unwrap();
            let (desc, rows) = value_tree.current();

            // Build a part out of the generated rows.
            let mut builder = PartBuilder::new(&desc, &UnitSchema);
            for row in &rows {
                builder.push(&SourceData(Ok(row.clone())), &(), 1u64, 1i64);
            }
            let part = builder.finish();

            // Decode the key column and collect its stats.
            let key_col = part.key.as_struct();
            let decoder = <RelationDesc as Schema<SourceData>>::decoder(&desc, key_col.clone())
                .expect("success");
            let key_stats = decoder.stats();

            all_stats.push(key_stats);
        }

        // Any change in the collected stats shows up as a snapshot diff.
        insta::assert_json_snapshot!(all_stats);
    }
386}