1use std::borrow::Cow;
15use std::collections::BTreeMap;
16use std::fmt::{Debug, Formatter};
17
18use anyhow::Context;
19use arrow::array::{BinaryArray, FixedSizeBinaryArray};
20use chrono::{NaiveDateTime, NaiveTime};
21use dec::OrderedDecimal;
22use mz_ore::soft_panic_or_log;
23use mz_persist_types::columnar::FixedSizeCodec;
24use mz_persist_types::stats::bytes::{BytesStats, FixedSizeBytesStats, FixedSizeBytesStatsKind};
25use mz_persist_types::stats::json::{JsonMapElementStats, JsonStats};
26use mz_persist_types::stats::primitive::PrimitiveStats;
27use mz_persist_types::stats::{
28 AtomicBytesStats, ColumnNullStats, ColumnStatKinds, ColumnStats, ColumnarStats,
29 PrimitiveStatsVariants,
30};
31use ordered_float::OrderedFloat;
32use prost::Message;
33use serde::Deserializer;
34use serde::de::{DeserializeSeed, Error, MapAccess, SeqAccess, Visitor};
35use uuid::Uuid;
36
37use crate::adt::date::Date;
38use crate::adt::datetime::PackedNaiveTime;
39use crate::adt::interval::{Interval, PackedInterval};
40use crate::adt::jsonb::{KeyClass, KeyClassifier, NumberParser};
41use crate::adt::numeric::{Numeric, PackedNumeric};
42use crate::adt::timestamp::{CheckedTimestamp, PackedNaiveDateTime};
43use crate::row::ProtoDatum;
44use crate::{Datum, RowArena, ScalarType};
45
46fn soft_expect_or_log<A, B: Debug>(result: Result<A, B>) -> Option<A> {
47 match result {
48 Ok(a) => Some(a),
49 Err(e) => {
50 soft_panic_or_log!("failed to decode stats: {e:?}");
51 None
52 }
53 }
54}
55
56pub fn fixed_stats_from_column(
59 col: &FixedSizeBinaryArray,
60 kind: FixedSizeBytesStatsKind,
61) -> ColumnStatKinds {
62 let lower = col.into_iter().filter_map(|x| x).min();
69 let upper = col.into_iter().filter_map(|x| x).max();
70
71 let default = || match kind {
74 FixedSizeBytesStatsKind::PackedTime => PackedNaiveTime::from_value(NaiveTime::default())
75 .as_bytes()
76 .to_vec(),
77 FixedSizeBytesStatsKind::PackedDateTime => {
78 PackedNaiveDateTime::from_value(NaiveDateTime::default())
79 .as_bytes()
80 .to_vec()
81 }
82 FixedSizeBytesStatsKind::PackedInterval => PackedInterval::from_value(Interval::default())
83 .as_bytes()
84 .to_vec(),
85 FixedSizeBytesStatsKind::PackedNumeric => {
86 unreachable!("Numeric is not stored in a fixed size byte array")
87 }
88 FixedSizeBytesStatsKind::Uuid => Uuid::default().as_bytes().to_vec(),
89 };
90
91 BytesStats::FixedSize(FixedSizeBytesStats {
92 lower: lower.map_or_else(default, Vec::from),
93 upper: upper.map_or_else(default, Vec::from),
94 kind,
95 })
96 .into()
97}
98
99pub fn col_values<'a>(
101 typ: &ScalarType,
102 stats: &'a ColumnStatKinds,
103 arena: &'a RowArena,
104) -> Option<(Datum<'a>, Datum<'a>)> {
105 use PrimitiveStatsVariants::*;
106
107 fn map_stats<'a, T, F>(stats: &'a T, f: F) -> Option<(Datum<'a>, Datum<'a>)>
109 where
110 T: ColumnStats,
111 F: Fn(T::Ref<'a>) -> Datum<'a>,
112 {
113 Some((f(stats.lower()?), f(stats.upper()?)))
114 }
115
116 match (typ, stats) {
117 (ScalarType::Bool, ColumnStatKinds::Primitive(Bool(stats))) => {
118 let map_datum = |val| if val { Datum::True } else { Datum::False };
119 map_stats(stats, map_datum)
120 }
121 (ScalarType::PgLegacyChar, ColumnStatKinds::Primitive(U8(stats))) => {
122 map_stats(stats, Datum::UInt8)
123 }
124 (ScalarType::UInt16, ColumnStatKinds::Primitive(U16(stats))) => {
125 map_stats(stats, Datum::UInt16)
126 }
127 (
128 ScalarType::UInt32
129 | ScalarType::Oid
130 | ScalarType::RegClass
131 | ScalarType::RegProc
132 | ScalarType::RegType,
133 ColumnStatKinds::Primitive(U32(stats)),
134 ) => map_stats(stats, Datum::UInt32),
135 (ScalarType::UInt64, ColumnStatKinds::Primitive(U64(stats))) => {
136 map_stats(stats, Datum::UInt64)
137 }
138 (ScalarType::Int16, ColumnStatKinds::Primitive(I16(stats))) => {
139 map_stats(stats, Datum::Int16)
140 }
141 (ScalarType::Int32, ColumnStatKinds::Primitive(I32(stats))) => {
142 map_stats(stats, Datum::Int32)
143 }
144 (ScalarType::Int64, ColumnStatKinds::Primitive(I64(stats))) => {
145 map_stats(stats, Datum::Int64)
146 }
147 (ScalarType::Float32, ColumnStatKinds::Primitive(F32(stats))) => {
148 map_stats(stats, |x| Datum::Float32(OrderedFloat(x)))
149 }
150 (ScalarType::Float64, ColumnStatKinds::Primitive(F64(stats))) => {
151 map_stats(stats, |x| Datum::Float64(OrderedFloat(x)))
152 }
153 (
154 ScalarType::Numeric { .. },
155 ColumnStatKinds::Bytes(BytesStats::FixedSize(FixedSizeBytesStats {
156 lower,
157 upper,
158 kind: FixedSizeBytesStatsKind::PackedNumeric,
159 })),
160 ) => {
161 let lower = soft_expect_or_log(PackedNumeric::from_bytes(lower))?.into_value();
162 let upper = soft_expect_or_log(PackedNumeric::from_bytes(upper))?.into_value();
163 Some((
164 Datum::Numeric(OrderedDecimal(lower)),
165 Datum::Numeric(OrderedDecimal(upper)),
166 ))
167 }
168 (
169 ScalarType::String
170 | ScalarType::PgLegacyName
171 | ScalarType::Char { .. }
172 | ScalarType::VarChar { .. },
173 ColumnStatKinds::Primitive(String(stats)),
174 ) => map_stats(stats, Datum::String),
175 (ScalarType::Bytes, ColumnStatKinds::Bytes(BytesStats::Primitive(stats))) => {
176 Some((Datum::Bytes(&stats.lower), Datum::Bytes(&stats.upper)))
177 }
178 (ScalarType::Date, ColumnStatKinds::Primitive(I32(stats))) => {
179 let lower = soft_expect_or_log(Date::from_pg_epoch(stats.lower))?;
180 let upper = soft_expect_or_log(Date::from_pg_epoch(stats.upper))?;
181 Some((Datum::Date(lower), Datum::Date(upper)))
182 }
183 (ScalarType::Time, ColumnStatKinds::Bytes(BytesStats::FixedSize(stats))) => {
184 let lower = soft_expect_or_log(PackedNaiveTime::from_bytes(&stats.lower))?.into_value();
185 let upper = soft_expect_or_log(PackedNaiveTime::from_bytes(&stats.upper))?.into_value();
186 Some((Datum::Time(lower), Datum::Time(upper)))
187 }
188 (ScalarType::Timestamp { .. }, ColumnStatKinds::Bytes(BytesStats::FixedSize(stats))) => {
189 let lower =
190 soft_expect_or_log(PackedNaiveDateTime::from_bytes(&stats.lower))?.into_value();
191 let lower =
192 CheckedTimestamp::from_timestamplike(lower).expect("failed to roundtrip timestamp");
193 let upper =
194 soft_expect_or_log(PackedNaiveDateTime::from_bytes(&stats.upper))?.into_value();
195 let upper =
196 CheckedTimestamp::from_timestamplike(upper).expect("failed to roundtrip timestamp");
197
198 Some((Datum::Timestamp(lower), Datum::Timestamp(upper)))
199 }
200 (ScalarType::TimestampTz { .. }, ColumnStatKinds::Bytes(BytesStats::FixedSize(stats))) => {
201 let lower = soft_expect_or_log(PackedNaiveDateTime::from_bytes(&stats.lower))?
202 .into_value()
203 .and_utc();
204 let lower = soft_expect_or_log(CheckedTimestamp::from_timestamplike(lower))?;
205 let upper = soft_expect_or_log(PackedNaiveDateTime::from_bytes(&stats.upper))?
206 .into_value()
207 .and_utc();
208 let upper = soft_expect_or_log(CheckedTimestamp::from_timestamplike(upper))?;
209
210 Some((Datum::TimestampTz(lower), Datum::TimestampTz(upper)))
211 }
212 (ScalarType::MzTimestamp, ColumnStatKinds::Primitive(U64(stats))) => {
213 map_stats(stats, |x| Datum::MzTimestamp(crate::Timestamp::from(x)))
214 }
215 (ScalarType::Interval, ColumnStatKinds::Bytes(BytesStats::FixedSize(stats))) => {
216 let lower = soft_expect_or_log(PackedInterval::from_bytes(&stats.lower))?.into_value();
217 let upper = soft_expect_or_log(PackedInterval::from_bytes(&stats.upper))?.into_value();
218 Some((Datum::Interval(lower), Datum::Interval(upper)))
219 }
220 (ScalarType::Uuid, ColumnStatKinds::Bytes(BytesStats::FixedSize(stats))) => {
221 let lower = soft_expect_or_log(Uuid::from_slice(&stats.lower))?;
222 let upper = soft_expect_or_log(Uuid::from_slice(&stats.upper))?;
223 Some((Datum::Uuid(lower), Datum::Uuid(upper)))
224 }
225 (ScalarType::Jsonb, ColumnStatKinds::Bytes(BytesStats::Json(_))) => None,
227 (
229 ScalarType::AclItem
230 | ScalarType::MzAclItem
231 | ScalarType::Range { .. }
232 | ScalarType::Array(_)
233 | ScalarType::Map { .. }
234 | ScalarType::List { .. }
235 | ScalarType::Record { .. }
236 | ScalarType::Int2Vector,
237 ColumnStatKinds::None,
238 ) => None,
239 (
241 ScalarType::Numeric { .. }
242 | ScalarType::Time
243 | ScalarType::Timestamp { .. }
244 | ScalarType::TimestampTz { .. }
245 | ScalarType::Interval
246 | ScalarType::Uuid,
247 ColumnStatKinds::Bytes(BytesStats::Atomic(AtomicBytesStats { lower, upper })),
248 ) => {
249 let lower = ProtoDatum::decode(lower.as_slice()).expect("should be a valid ProtoDatum");
250 let lower = arena.make_datum(|p| {
251 p.try_push_proto(&lower)
252 .expect("ProtoDatum should be valid Datum")
253 });
254 let upper = ProtoDatum::decode(upper.as_slice()).expect("should be a valid ProtoDatum");
255 let upper = arena.make_datum(|p| {
256 p.try_push_proto(&upper)
257 .expect("ProtoDatum should be valid Datum")
258 });
259
260 Some((lower, upper))
261 }
262 (typ, stats) => {
263 mz_ore::soft_panic_or_log!("found unexpected {stats:?} for column {typ:?}");
264 None
265 }
266 }
267}
268
269pub fn decode_numeric<'a>(
271 stats: &PrimitiveStats<Vec<u8>>,
272 arena: &'a RowArena,
273) -> Result<(Datum<'a>, Datum<'a>), anyhow::Error> {
274 fn decode<'a>(bytes: &[u8], arena: &'a RowArena) -> Result<Datum<'a>, anyhow::Error> {
275 let proto = ProtoDatum::decode(bytes)?;
276 let datum = arena.make_datum(|r| {
277 r.try_push_proto(&proto)
278 .expect("ProtoDatum should be valid Datum")
279 });
280 let Datum::Numeric(_) = &datum else {
281 anyhow::bail!("expected Numeric found {datum:?}");
282 };
283 Ok(datum)
284 }
285 let lower = decode(&stats.lower, arena).context("lower")?;
286 let upper = decode(&stats.upper, arena).context("upper")?;
287
288 Ok((lower, upper))
289}
290
291pub fn numeric_stats_from_column(col: &BinaryArray) -> ColumnStatKinds {
294 let mut lower = OrderedDecimal(Numeric::nan());
295 let mut upper = OrderedDecimal(-Numeric::infinity());
296
297 for val in col.iter() {
298 let Some(val) = val else {
299 continue;
300 };
301 let val = OrderedDecimal(
302 PackedNumeric::from_bytes(val)
303 .expect("failed to roundtrip Numeric")
304 .into_value(),
305 );
306 lower = val.min(lower);
307 upper = val.max(upper);
308 }
309
310 BytesStats::FixedSize(FixedSizeBytesStats {
311 lower: PackedNumeric::from_value(lower.0).as_bytes().to_vec(),
312 upper: PackedNumeric::from_value(upper.0).as_bytes().to_vec(),
313 kind: FixedSizeBytesStatsKind::PackedNumeric,
314 })
315 .into()
316}
317
/// Accumulates summary information about every JSON value it is asked to visit.
///
/// An instance is driven through its `serde` `Visitor` implementation and is later
/// turned into stats with `to_stats`.
#[derive(Default)]
struct JsonVisitor<'de> {
    /// Number of values visited; every `visit_*` call increments it once.
    count: usize,
    /// Whether a JSON null (serde "unit") was seen.
    nulls: bool,
    /// `(min, max)` of the booleans seen, if any.
    bools: Option<(bool, bool)>,
    /// `(min, max)` of the strings seen, if any; borrowed from the input when possible.
    strings: Option<(Cow<'de, str>, Cow<'de, str>)>,
    /// `(min, max)` of the values received through `visit_i64`, if any.
    ints: Option<(i64, i64)>,
    /// `(min, max)` of the values received through `visit_u64`, if any.
    uints: Option<(u64, u64)>,
    /// `(min, max)` of the values received through `visit_f64`, if any.
    floats: Option<(f64, f64)>,
    /// `(min, max)` of the numbers that arrive as a map with a `KeyClass::Number` key,
    /// if any.
    numerics: Option<(Numeric, Numeric)>,
    /// Whether a sequence (JSON array) was seen.
    lists: bool,
    /// Whether a map made up only of ordinary keys (JSON object) was seen.
    maps: bool,
    /// Nested visitors for the values found under each ordinary map key.
    fields: BTreeMap<Cow<'de, str>, JsonVisitor<'de>>,
}
332
333impl<'de> JsonVisitor<'de> {
334 pub fn to_stats(self) -> JsonMapElementStats {
335 let mut context: dec::Context<Numeric> = Default::default();
336 let Self {
337 count,
338 nulls,
339 bools,
340 strings,
341 ints,
342 uints,
343 floats,
344 numerics,
345 lists,
346 maps,
347 fields,
348 } = self;
349 let min_numeric = [
350 numerics.map(|(n, _)| n),
351 ints.map(|(n, _)| n.into()),
352 uints.map(|(n, _)| n.into()),
353 floats.map(|(n, _)| n.into()),
354 ]
355 .into_iter()
356 .flatten()
357 .min_by(|a, b| context.total_cmp(a, b));
358 let max_numeric = [
359 numerics.map(|(_, n)| n),
360 ints.map(|(_, n)| n.into()),
361 uints.map(|(_, n)| n.into()),
362 floats.map(|(_, n)| n.into()),
363 ]
364 .into_iter()
365 .flatten()
366 .max_by(|a, b| context.total_cmp(a, b));
367
368 let stats = match (nulls, min_numeric, max_numeric, bools, strings, lists, maps) {
369 (false, None, None, None, None, false, false) => JsonStats::None,
370 (true, None, None, None, None, false, false) => JsonStats::JsonNulls,
371 (false, Some(min), Some(max), None, None, false, false) => {
372 JsonStats::Numerics(PrimitiveStats {
373 lower: ProtoDatum::from(Datum::Numeric(OrderedDecimal(min))).encode_to_vec(),
374 upper: ProtoDatum::from(Datum::Numeric(OrderedDecimal(max))).encode_to_vec(),
375 })
376 }
377 (false, None, None, Some((min, max)), None, false, false) => {
378 JsonStats::Bools(PrimitiveStats {
379 lower: min,
380 upper: max,
381 })
382 }
383 (false, None, None, None, Some((min, max)), false, false) => {
384 JsonStats::Strings(PrimitiveStats {
385 lower: min.into_owned(),
386 upper: max.into_owned(),
387 })
388 }
389 (false, None, None, None, None, true, false) => JsonStats::Lists,
390 (false, None, None, None, None, false, true) => JsonStats::Maps(
391 fields
392 .into_iter()
393 .map(|(k, v)| (k.into_owned(), v.to_stats()))
394 .collect(),
395 ),
396 _ => JsonStats::Mixed,
397 };
398
399 JsonMapElementStats { len: count, stats }
400 }
401}
402
403impl<'a, 'de> Visitor<'de> for &'a mut JsonVisitor<'de> {
404 type Value = ();
405
406 fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
407 write!(formatter, "json value")
408 }
409
410 fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
411 where
412 E: Error,
413 {
414 self.count += 1;
415 let (min, max) = self.bools.get_or_insert((v, v));
416 *min = v.min(*min);
417 *max = v.max(*max);
418 Ok(())
419 }
420
421 fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
422 where
423 E: Error,
424 {
425 self.count += 1;
426 let (min, max) = self.ints.get_or_insert((v, v));
427 *min = v.min(*min);
428 *max = v.max(*max);
429 Ok(())
430 }
431
432 fn visit_u64<E>(self, v: u64) -> Result<(), E>
433 where
434 E: Error,
435 {
436 self.count += 1;
437 let (min, max) = self.uints.get_or_insert((v, v));
438 *min = v.min(*min);
439 *max = v.max(*max);
440 Ok(())
441 }
442
443 fn visit_f64<E>(self, v: f64) -> Result<(), E> {
444 self.count += 1;
445 let (min, max) = self.floats.get_or_insert((v, v));
446 *min = v.min(*min);
447 *max = v.max(*max);
448 Ok(())
449 }
450
451 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
452 where
453 E: Error,
454 {
455 self.count += 1;
456 match &mut self.strings {
457 None => {
458 self.strings = Some((v.to_owned().into(), v.to_owned().into()));
459 }
460 Some((min, max)) => {
461 if v < &**min {
462 *min = v.to_owned().into();
463 } else if v > &**max {
464 *max = v.to_owned().into();
465 }
466 }
467 }
468 Ok(())
469 }
470
471 fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E>
472 where
473 E: Error,
474 {
475 self.count += 1;
476 match &mut self.strings {
477 None => {
478 self.strings = Some((v.into(), v.into()));
479 }
480 Some((min, max)) => {
481 if v < &**min {
482 *min = v.into();
483 } else if v > &**max {
484 *max = v.into();
485 }
486 }
487 }
488 Ok(())
489 }
490
491 fn visit_unit<E>(self) -> Result<Self::Value, E>
492 where
493 E: Error,
494 {
495 self.count += 1;
496 self.nulls = true;
497 Ok(())
498 }
499
500 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
501 where
502 A: SeqAccess<'de>,
503 {
504 self.count += 1;
505 self.lists = true;
506 while let Some(_) = seq.next_element::<serde::de::IgnoredAny>()? {}
507 Ok(())
508 }
509
510 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
511 where
512 A: MapAccess<'de>,
513 {
514 self.count += 1;
515 let mut normal_only = true;
518 while let Some(key) = map.next_key_seed(KeyClassifier)? {
519 match key {
520 KeyClass::Number => {
521 let v = map.next_value_seed(NumberParser)?.0;
522 let (min, max) = self.numerics.get_or_insert((v, v));
523 if v < *min {
524 *min = v;
525 }
526 if v > *max {
527 *max = v;
528 }
529 normal_only = false;
530 }
531 KeyClass::MapKey(key) => {
532 let field = self.fields.entry(key).or_default();
533 map.next_value_seed(field)?;
534 }
535 }
536 }
537 if normal_only {
538 self.maps = true;
539 }
540
541 Ok(())
542 }
543}
544
545impl<'a, 'de> DeserializeSeed<'de> for &'a mut JsonVisitor<'de> {
546 type Value = ();
547
548 fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
549 where
550 D: Deserializer<'de>,
551 {
552 deserializer.deserialize_any(self)
553 }
554}
555
556pub fn stats_for_json<'a>(jsons: impl IntoIterator<Item = Option<&'a str>>) -> ColumnarStats {
557 let mut visitor = JsonVisitor::default();
558 let mut nulls = 0;
559 for json in jsons {
560 match json {
561 None => {
562 nulls += 1;
563 }
564 Some(json) => {
565 let () = serde_json::Deserializer::from_str(json)
566 .deserialize_any(&mut visitor)
567 .unwrap_or_else(|e| panic!("error {e:?} on json: {json}"));
568 }
569 }
570 }
571
572 ColumnarStats {
573 nulls: Some(ColumnNullStats { count: nulls }),
574 values: ColumnStatKinds::Bytes(BytesStats::Json(visitor.to_stats().stats)),
575 }
576}
577
#[cfg(test)]
mod tests {
    use arrow::array::AsArray;
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_persist_types::columnar::{ColumnDecoder, Schema};
    use mz_persist_types::part::PartBuilder;
    use mz_persist_types::stats::{ProtoStructStats, StructStats, TrimStats};
    use mz_proto::RustType;
    use proptest::prelude::*;
    use uuid::Uuid;

    use crate::{Datum, RelationDesc, Row, RowArena, ScalarType};

    /// Encodes `datums` into a part, computes its stats, trims them, and checks that
    /// the bounds of every column can still be decoded without panicking.
    fn datum_stats_roundtrip_trim<'a>(
        schema: &RelationDesc,
        datums: impl IntoIterator<Item = &'a Row>,
    ) {
        let mut builder = PartBuilder::new(schema, &UnitSchema);
        datums.into_iter().for_each(|row| {
            builder.push(row, &(), 1u64, 1i64);
        });
        let part = builder.finish();

        let decoder = <RelationDesc as Schema<Row>>::decoder(schema, part.key.as_struct().clone())
            .expect("success");
        let mut proto_stats: ProtoStructStats = RustType::into_proto(&decoder.stats());

        proto_stats.trim();
        let trimmed: StructStats = RustType::from_proto(proto_stats).unwrap();

        // Only the absence of a panic is checked; the decoded bounds are discarded.
        let arena = RowArena::default();
        for (name, typ) in schema.iter() {
            let col_stats = trimmed.col(name.as_str()).unwrap();
            crate::stats::col_values(&typ.scalar_type, &col_stats.values, &arena);
        }
    }

    /// Runs the roundtrip check over the interesting datums of `scalar_type`, first
    /// with a non-nullable column and then with a nullable one that also holds a null.
    fn scalar_type_stats_roundtrip_trim(scalar_type: ScalarType) {
        /// Checks every row on its own, then all rows together.
        fn check(schema: &RelationDesc, rows: &[Row]) {
            for row in rows {
                datum_stats_roundtrip_trim(schema, std::iter::once(row));
            }
            datum_stats_roundtrip_trim(schema, rows);
        }

        let mut rows = Vec::new();
        for datum in scalar_type.interesting_datums() {
            rows.push(Row::pack(std::iter::once(datum)));
        }

        let non_nullable = RelationDesc::builder()
            .with_column("col", scalar_type.clone().nullable(false))
            .finish();
        check(&non_nullable, &rows);

        let nullable = RelationDesc::builder()
            .with_column("col", scalar_type.nullable(true))
            .finish();
        rows.push(Row::pack(std::iter::once(Datum::Null)));
        check(&nullable, &rows);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn all_scalar_types_stats_roundtrip_trim() {
        proptest!(|(scalar_type in any::<ScalarType>())| {
            scalar_type_stats_roundtrip_trim(scalar_type)
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_uuid_sort_order() {
        /// Sorting UUIDs directly must match sorting their raw bytes.
        fn test(mut og: Vec<Uuid>) {
            let mut raw: Vec<[u8; 16]> = og.iter().map(|uuid| *uuid.as_bytes()).collect();

            og.sort();
            raw.sort();

            let roundtripped: Vec<Uuid> = raw.into_iter().map(Uuid::from_bytes).collect();

            assert_eq!(og, roundtripped);
        }

        let arb_uuid = any::<[u8; 16]>().prop_map(Uuid::from_bytes);
        proptest!(|(uuids in proptest::collection::vec(arb_uuid, 0..128))| {
            test(uuids);
        });
    }
}