1use std::borrow::Cow;
15use std::collections::BTreeMap;
16use std::fmt::{Debug, Formatter};
17
18use anyhow::Context;
19use arrow::array::{BinaryArray, FixedSizeBinaryArray};
20use chrono::{NaiveDateTime, NaiveTime};
21use dec::OrderedDecimal;
22use mz_ore::soft_panic_or_log;
23use mz_persist_types::columnar::FixedSizeCodec;
24use mz_persist_types::stats::bytes::{BytesStats, FixedSizeBytesStats, FixedSizeBytesStatsKind};
25use mz_persist_types::stats::json::{JsonMapElementStats, JsonStats};
26use mz_persist_types::stats::primitive::PrimitiveStats;
27use mz_persist_types::stats::{
28 AtomicBytesStats, ColumnNullStats, ColumnStatKinds, ColumnStats, ColumnarStats,
29 PrimitiveStatsVariants,
30};
31use ordered_float::OrderedFloat;
32use prost::Message;
33use serde::Deserializer;
34use serde::de::{DeserializeSeed, Error, MapAccess, SeqAccess, Visitor};
35use uuid::Uuid;
36
37use crate::adt::date::Date;
38use crate::adt::datetime::PackedNaiveTime;
39use crate::adt::interval::{Interval, PackedInterval};
40use crate::adt::jsonb::{KeyClass, KeyClassifier, NumberParser};
41use crate::adt::numeric::{Numeric, PackedNumeric};
42use crate::adt::timestamp::{CheckedTimestamp, PackedNaiveDateTime};
43use crate::row::ProtoDatum;
44use crate::{Datum, RowArena, SqlScalarType};
45
46fn soft_expect_or_log<A, B: Debug>(result: Result<A, B>) -> Option<A> {
47 match result {
48 Ok(a) => Some(a),
49 Err(e) => {
50 soft_panic_or_log!("failed to decode stats: {e:?}");
51 None
52 }
53 }
54}
55
56pub fn fixed_stats_from_column(
59 col: &FixedSizeBinaryArray,
60 kind: FixedSizeBytesStatsKind,
61) -> ColumnStatKinds {
62 let lower = col.into_iter().filter_map(|x| x).min();
69 let upper = col.into_iter().filter_map(|x| x).max();
70
71 let default = || match kind {
74 FixedSizeBytesStatsKind::PackedTime => PackedNaiveTime::from_value(NaiveTime::default())
75 .as_bytes()
76 .to_vec(),
77 FixedSizeBytesStatsKind::PackedDateTime => {
78 PackedNaiveDateTime::from_value(NaiveDateTime::default())
79 .as_bytes()
80 .to_vec()
81 }
82 FixedSizeBytesStatsKind::PackedInterval => PackedInterval::from_value(Interval::default())
83 .as_bytes()
84 .to_vec(),
85 FixedSizeBytesStatsKind::PackedNumeric => {
86 unreachable!("Numeric is not stored in a fixed size byte array")
87 }
88 FixedSizeBytesStatsKind::Uuid => Uuid::default().as_bytes().to_vec(),
89 };
90
91 BytesStats::FixedSize(FixedSizeBytesStats {
92 lower: lower.map_or_else(default, Vec::from),
93 upper: upper.map_or_else(default, Vec::from),
94 kind,
95 })
96 .into()
97}
98
99pub fn col_values<'a>(
101 typ: &SqlScalarType,
102 stats: &'a ColumnStatKinds,
103 arena: &'a RowArena,
104) -> Option<(Datum<'a>, Datum<'a>)> {
105 use PrimitiveStatsVariants::*;
106
107 fn map_stats<'a, T, F>(stats: &'a T, f: F) -> Option<(Datum<'a>, Datum<'a>)>
109 where
110 T: ColumnStats,
111 F: Fn(T::Ref<'a>) -> Datum<'a>,
112 {
113 Some((f(stats.lower()?), f(stats.upper()?)))
114 }
115
116 match (typ, stats) {
117 (SqlScalarType::Bool, ColumnStatKinds::Primitive(Bool(stats))) => {
118 let map_datum = |val| if val { Datum::True } else { Datum::False };
119 map_stats(stats, map_datum)
120 }
121 (SqlScalarType::PgLegacyChar, ColumnStatKinds::Primitive(U8(stats))) => {
122 map_stats(stats, Datum::UInt8)
123 }
124 (SqlScalarType::UInt16, ColumnStatKinds::Primitive(U16(stats))) => {
125 map_stats(stats, Datum::UInt16)
126 }
127 (
128 SqlScalarType::UInt32
129 | SqlScalarType::Oid
130 | SqlScalarType::RegClass
131 | SqlScalarType::RegProc
132 | SqlScalarType::RegType,
133 ColumnStatKinds::Primitive(U32(stats)),
134 ) => map_stats(stats, Datum::UInt32),
135 (SqlScalarType::UInt64, ColumnStatKinds::Primitive(U64(stats))) => {
136 map_stats(stats, Datum::UInt64)
137 }
138 (SqlScalarType::Int16, ColumnStatKinds::Primitive(I16(stats))) => {
139 map_stats(stats, Datum::Int16)
140 }
141 (SqlScalarType::Int32, ColumnStatKinds::Primitive(I32(stats))) => {
142 map_stats(stats, Datum::Int32)
143 }
144 (SqlScalarType::Int64, ColumnStatKinds::Primitive(I64(stats))) => {
145 map_stats(stats, Datum::Int64)
146 }
147 (SqlScalarType::Float32, ColumnStatKinds::Primitive(F32(stats))) => {
148 map_stats(stats, |x| Datum::Float32(OrderedFloat(x)))
149 }
150 (SqlScalarType::Float64, ColumnStatKinds::Primitive(F64(stats))) => {
151 map_stats(stats, |x| Datum::Float64(OrderedFloat(x)))
152 }
153 (
154 SqlScalarType::Numeric { .. },
155 ColumnStatKinds::Bytes(BytesStats::FixedSize(FixedSizeBytesStats {
156 lower,
157 upper,
158 kind: FixedSizeBytesStatsKind::PackedNumeric,
159 })),
160 ) => {
161 let lower = soft_expect_or_log(PackedNumeric::from_bytes(lower))?.into_value();
162 let upper = soft_expect_or_log(PackedNumeric::from_bytes(upper))?.into_value();
163 Some((
164 Datum::Numeric(OrderedDecimal(lower)),
165 Datum::Numeric(OrderedDecimal(upper)),
166 ))
167 }
168 (
169 SqlScalarType::String
170 | SqlScalarType::PgLegacyName
171 | SqlScalarType::Char { .. }
172 | SqlScalarType::VarChar { .. },
173 ColumnStatKinds::Primitive(String(stats)),
174 ) => map_stats(stats, Datum::String),
175 (SqlScalarType::Bytes, ColumnStatKinds::Bytes(BytesStats::Primitive(stats))) => {
176 Some((Datum::Bytes(&stats.lower), Datum::Bytes(&stats.upper)))
177 }
178 (SqlScalarType::Date, ColumnStatKinds::Primitive(I32(stats))) => {
179 let lower = soft_expect_or_log(Date::from_pg_epoch(stats.lower))?;
180 let upper = soft_expect_or_log(Date::from_pg_epoch(stats.upper))?;
181 Some((Datum::Date(lower), Datum::Date(upper)))
182 }
183 (SqlScalarType::Time, ColumnStatKinds::Bytes(BytesStats::FixedSize(stats))) => {
184 let lower = soft_expect_or_log(PackedNaiveTime::from_bytes(&stats.lower))?.into_value();
185 let upper = soft_expect_or_log(PackedNaiveTime::from_bytes(&stats.upper))?.into_value();
186 Some((Datum::Time(lower), Datum::Time(upper)))
187 }
188 (SqlScalarType::Timestamp { .. }, ColumnStatKinds::Bytes(BytesStats::FixedSize(stats))) => {
189 let lower =
190 soft_expect_or_log(PackedNaiveDateTime::from_bytes(&stats.lower))?.into_value();
191 let lower =
192 CheckedTimestamp::from_timestamplike(lower).expect("failed to roundtrip timestamp");
193 let upper =
194 soft_expect_or_log(PackedNaiveDateTime::from_bytes(&stats.upper))?.into_value();
195 let upper =
196 CheckedTimestamp::from_timestamplike(upper).expect("failed to roundtrip timestamp");
197
198 Some((Datum::Timestamp(lower), Datum::Timestamp(upper)))
199 }
200 (
201 SqlScalarType::TimestampTz { .. },
202 ColumnStatKinds::Bytes(BytesStats::FixedSize(stats)),
203 ) => {
204 let lower = soft_expect_or_log(PackedNaiveDateTime::from_bytes(&stats.lower))?
205 .into_value()
206 .and_utc();
207 let lower = soft_expect_or_log(CheckedTimestamp::from_timestamplike(lower))?;
208 let upper = soft_expect_or_log(PackedNaiveDateTime::from_bytes(&stats.upper))?
209 .into_value()
210 .and_utc();
211 let upper = soft_expect_or_log(CheckedTimestamp::from_timestamplike(upper))?;
212
213 Some((Datum::TimestampTz(lower), Datum::TimestampTz(upper)))
214 }
215 (SqlScalarType::MzTimestamp, ColumnStatKinds::Primitive(U64(stats))) => {
216 map_stats(stats, |x| Datum::MzTimestamp(crate::Timestamp::from(x)))
217 }
218 (SqlScalarType::Interval, ColumnStatKinds::Bytes(BytesStats::FixedSize(stats))) => {
219 let lower = soft_expect_or_log(PackedInterval::from_bytes(&stats.lower))?.into_value();
220 let upper = soft_expect_or_log(PackedInterval::from_bytes(&stats.upper))?.into_value();
221 Some((Datum::Interval(lower), Datum::Interval(upper)))
222 }
223 (SqlScalarType::Uuid, ColumnStatKinds::Bytes(BytesStats::FixedSize(stats))) => {
224 let lower = soft_expect_or_log(Uuid::from_slice(&stats.lower))?;
225 let upper = soft_expect_or_log(Uuid::from_slice(&stats.upper))?;
226 Some((Datum::Uuid(lower), Datum::Uuid(upper)))
227 }
228 (SqlScalarType::Jsonb, ColumnStatKinds::Bytes(BytesStats::Json(_))) => None,
230 (
232 SqlScalarType::AclItem
233 | SqlScalarType::MzAclItem
234 | SqlScalarType::Range { .. }
235 | SqlScalarType::Array(_)
236 | SqlScalarType::Map { .. }
237 | SqlScalarType::List { .. }
238 | SqlScalarType::Record { .. }
239 | SqlScalarType::Int2Vector,
240 ColumnStatKinds::None,
241 ) => None,
242 (
244 SqlScalarType::Numeric { .. }
245 | SqlScalarType::Time
246 | SqlScalarType::Timestamp { .. }
247 | SqlScalarType::TimestampTz { .. }
248 | SqlScalarType::Interval
249 | SqlScalarType::Uuid,
250 ColumnStatKinds::Bytes(BytesStats::Atomic(AtomicBytesStats { lower, upper })),
251 ) => {
252 let lower = ProtoDatum::decode(lower.as_slice()).expect("should be a valid ProtoDatum");
253 let lower = arena.make_datum(|p| {
254 p.try_push_proto(&lower)
255 .expect("ProtoDatum should be valid Datum")
256 });
257 let upper = ProtoDatum::decode(upper.as_slice()).expect("should be a valid ProtoDatum");
258 let upper = arena.make_datum(|p| {
259 p.try_push_proto(&upper)
260 .expect("ProtoDatum should be valid Datum")
261 });
262
263 Some((lower, upper))
264 }
265 (typ, stats) => {
266 mz_ore::soft_panic_or_log!("found unexpected {stats:?} for column {typ:?}");
267 None
268 }
269 }
270}
271
272pub fn decode_numeric<'a>(
274 stats: &PrimitiveStats<Vec<u8>>,
275 arena: &'a RowArena,
276) -> Result<(Datum<'a>, Datum<'a>), anyhow::Error> {
277 fn decode<'a>(bytes: &[u8], arena: &'a RowArena) -> Result<Datum<'a>, anyhow::Error> {
278 let proto = ProtoDatum::decode(bytes)?;
279 let datum = arena.make_datum(|r| {
280 r.try_push_proto(&proto)
281 .expect("ProtoDatum should be valid Datum")
282 });
283 let Datum::Numeric(_) = &datum else {
284 anyhow::bail!("expected Numeric found {datum:?}");
285 };
286 Ok(datum)
287 }
288 let lower = decode(&stats.lower, arena).context("lower")?;
289 let upper = decode(&stats.upper, arena).context("upper")?;
290
291 Ok((lower, upper))
292}
293
294pub fn numeric_stats_from_column(col: &BinaryArray) -> ColumnStatKinds {
297 let mut lower = OrderedDecimal(Numeric::nan());
298 let mut upper = OrderedDecimal(-Numeric::infinity());
299
300 for val in col.iter() {
301 let Some(val) = val else {
302 continue;
303 };
304 let val = OrderedDecimal(
305 PackedNumeric::from_bytes(val)
306 .expect("failed to roundtrip Numeric")
307 .into_value(),
308 );
309 lower = val.min(lower);
310 upper = val.max(upper);
311 }
312
313 BytesStats::FixedSize(FixedSizeBytesStats {
314 lower: PackedNumeric::from_value(lower.0).as_bytes().to_vec(),
315 upper: PackedNumeric::from_value(upper.0).as_bytes().to_vec(),
316 kind: FixedSizeBytesStatsKind::PackedNumeric,
317 })
318 .into()
319}
320
/// Accumulates stats for all JSON values seen at one position in a document.
///
/// The same visitor is reused across documents. Object members get their own
/// child visitor in `fields`, keyed by member name.
#[derive(Default)]
struct JsonVisitor<'de> {
    // Number of values visited at this position (every `visit_*` increments it).
    count: usize,
    // Whether any JSON `null` was seen.
    nulls: bool,
    // Min/max of the booleans seen, if any.
    bools: Option<(bool, bool)>,
    // Min/max of the strings seen (by `str` ordering); borrowed from the input
    // when the deserializer lends strings, otherwise owned copies.
    strings: Option<(Cow<'de, str>, Cow<'de, str>)>,
    // Min/max of values delivered as `i64`.
    ints: Option<(i64, i64)>,
    // Min/max of values delivered as `u64`.
    uints: Option<(u64, u64)>,
    // Min/max of values delivered as `f64`.
    floats: Option<(f64, f64)>,
    // Min/max of numbers parsed from objects whose key `KeyClassifier`
    // classifies as `KeyClass::Number`.
    numerics: Option<(Numeric, Numeric)>,
    // Whether any array was seen (elements are not inspected).
    lists: bool,
    // Whether any object made only of ordinary (non-number) keys was seen.
    maps: bool,
    // Per-member child visitors for ordinary object keys.
    fields: BTreeMap<Cow<'de, str>, JsonVisitor<'de>>,
}
335
336impl<'de> JsonVisitor<'de> {
337 pub fn to_stats(self) -> JsonMapElementStats {
338 let mut context: dec::Context<Numeric> = Default::default();
339 let Self {
340 count,
341 nulls,
342 bools,
343 strings,
344 ints,
345 uints,
346 floats,
347 numerics,
348 lists,
349 maps,
350 fields,
351 } = self;
352 let min_numeric = [
353 numerics.map(|(n, _)| n),
354 ints.map(|(n, _)| n.into()),
355 uints.map(|(n, _)| n.into()),
356 floats.map(|(n, _)| n.into()),
357 ]
358 .into_iter()
359 .flatten()
360 .min_by(|a, b| context.total_cmp(a, b));
361 let max_numeric = [
362 numerics.map(|(_, n)| n),
363 ints.map(|(_, n)| n.into()),
364 uints.map(|(_, n)| n.into()),
365 floats.map(|(_, n)| n.into()),
366 ]
367 .into_iter()
368 .flatten()
369 .max_by(|a, b| context.total_cmp(a, b));
370
371 let stats = match (nulls, min_numeric, max_numeric, bools, strings, lists, maps) {
372 (false, None, None, None, None, false, false) => JsonStats::None,
373 (true, None, None, None, None, false, false) => JsonStats::JsonNulls,
374 (false, Some(min), Some(max), None, None, false, false) => {
375 JsonStats::Numerics(PrimitiveStats {
376 lower: ProtoDatum::from(Datum::Numeric(OrderedDecimal(min))).encode_to_vec(),
377 upper: ProtoDatum::from(Datum::Numeric(OrderedDecimal(max))).encode_to_vec(),
378 })
379 }
380 (false, None, None, Some((min, max)), None, false, false) => {
381 JsonStats::Bools(PrimitiveStats {
382 lower: min,
383 upper: max,
384 })
385 }
386 (false, None, None, None, Some((min, max)), false, false) => {
387 JsonStats::Strings(PrimitiveStats {
388 lower: min.into_owned(),
389 upper: max.into_owned(),
390 })
391 }
392 (false, None, None, None, None, true, false) => JsonStats::Lists,
393 (false, None, None, None, None, false, true) => JsonStats::Maps(
394 fields
395 .into_iter()
396 .map(|(k, v)| (k.into_owned(), v.to_stats()))
397 .collect(),
398 ),
399 _ => JsonStats::Mixed,
400 };
401
402 JsonMapElementStats { len: count, stats }
403 }
404}
405
406impl<'a, 'de> Visitor<'de> for &'a mut JsonVisitor<'de> {
407 type Value = ();
408
409 fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
410 write!(formatter, "json value")
411 }
412
413 fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
414 where
415 E: Error,
416 {
417 self.count += 1;
418 let (min, max) = self.bools.get_or_insert((v, v));
419 *min = v.min(*min);
420 *max = v.max(*max);
421 Ok(())
422 }
423
424 fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
425 where
426 E: Error,
427 {
428 self.count += 1;
429 let (min, max) = self.ints.get_or_insert((v, v));
430 *min = v.min(*min);
431 *max = v.max(*max);
432 Ok(())
433 }
434
435 fn visit_u64<E>(self, v: u64) -> Result<(), E>
436 where
437 E: Error,
438 {
439 self.count += 1;
440 let (min, max) = self.uints.get_or_insert((v, v));
441 *min = v.min(*min);
442 *max = v.max(*max);
443 Ok(())
444 }
445
446 fn visit_f64<E>(self, v: f64) -> Result<(), E> {
447 self.count += 1;
448 let (min, max) = self.floats.get_or_insert((v, v));
449 *min = v.min(*min);
450 *max = v.max(*max);
451 Ok(())
452 }
453
454 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
455 where
456 E: Error,
457 {
458 self.count += 1;
459 match &mut self.strings {
460 None => {
461 self.strings = Some((v.to_owned().into(), v.to_owned().into()));
462 }
463 Some((min, max)) => {
464 if v < &**min {
465 *min = v.to_owned().into();
466 } else if v > &**max {
467 *max = v.to_owned().into();
468 }
469 }
470 }
471 Ok(())
472 }
473
474 fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E>
475 where
476 E: Error,
477 {
478 self.count += 1;
479 match &mut self.strings {
480 None => {
481 self.strings = Some((v.into(), v.into()));
482 }
483 Some((min, max)) => {
484 if v < &**min {
485 *min = v.into();
486 } else if v > &**max {
487 *max = v.into();
488 }
489 }
490 }
491 Ok(())
492 }
493
494 fn visit_unit<E>(self) -> Result<Self::Value, E>
495 where
496 E: Error,
497 {
498 self.count += 1;
499 self.nulls = true;
500 Ok(())
501 }
502
503 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
504 where
505 A: SeqAccess<'de>,
506 {
507 self.count += 1;
508 self.lists = true;
509 while let Some(_) = seq.next_element::<serde::de::IgnoredAny>()? {}
510 Ok(())
511 }
512
513 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
514 where
515 A: MapAccess<'de>,
516 {
517 self.count += 1;
518 let mut normal_only = true;
521 while let Some(key) = map.next_key_seed(KeyClassifier)? {
522 match key {
523 KeyClass::Number => {
524 let v = map.next_value_seed(NumberParser)?.0;
525 let (min, max) = self.numerics.get_or_insert((v, v));
526 if v < *min {
527 *min = v;
528 }
529 if v > *max {
530 *max = v;
531 }
532 normal_only = false;
533 }
534 KeyClass::MapKey(key) => {
535 let field = self.fields.entry(key).or_default();
536 map.next_value_seed(field)?;
537 }
538 }
539 }
540 if normal_only {
541 self.maps = true;
542 }
543
544 Ok(())
545 }
546}
547
548impl<'a, 'de> DeserializeSeed<'de> for &'a mut JsonVisitor<'de> {
549 type Value = ();
550
551 fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
552 where
553 D: Deserializer<'de>,
554 {
555 deserializer.deserialize_any(self)
556 }
557}
558
559pub fn stats_for_json<'a>(jsons: impl IntoIterator<Item = Option<&'a str>>) -> ColumnarStats {
560 let mut visitor = JsonVisitor::default();
561 let mut nulls = 0;
562 for json in jsons {
563 match json {
564 None => {
565 nulls += 1;
566 }
567 Some(json) => {
568 let () = serde_json::Deserializer::from_str(json)
569 .deserialize_any(&mut visitor)
570 .unwrap_or_else(|e| panic!("error {e:?} on json: {json}"));
571 }
572 }
573 }
574
575 ColumnarStats {
576 nulls: Some(ColumnNullStats { count: nulls }),
577 values: ColumnStatKinds::Bytes(BytesStats::Json(visitor.to_stats().stats)),
578 }
579}
580
#[cfg(test)]
mod tests {
    use arrow::array::AsArray;
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_persist_types::columnar::{ColumnDecoder, Schema};
    use mz_persist_types::part::PartBuilder;
    use mz_persist_types::stats::{ProtoStructStats, StructStats, TrimStats};
    use mz_proto::RustType;
    use proptest::prelude::*;
    use uuid::Uuid;

    use crate::{Datum, RelationDesc, Row, RowArena, SqlScalarType};

    /// Encodes `datums` into a part with `schema` and computes its key stats.
    /// It then converts the stats to proto, trims them, converts them back, and
    /// decodes every column's bounds with `col_values`. The decoded values are
    /// discarded, so this only checks that none of these steps panic.
    fn datum_stats_roundtrip_trim<'a>(
        schema: &RelationDesc,
        datums: impl IntoIterator<Item = &'a Row>,
    ) {
        let mut builder = PartBuilder::new(schema, &UnitSchema);
        for datum in datums {
            builder.push(datum, &(), 1u64, 1i64);
        }
        let part = builder.finish();

        let key_col = part.key.as_struct();
        let decoder =
            <RelationDesc as Schema<Row>>::decoder(schema, key_col.clone()).expect("success");
        let mut actual: ProtoStructStats = RustType::into_proto(&decoder.stats());

        actual.trim();
        let actual: StructStats = RustType::from_proto(actual).unwrap();
        let arena = RowArena::default();
        for (name, typ) in schema.iter() {
            let col_stats = actual.col(name).unwrap();
            crate::stats::col_values(&typ.scalar_type, &col_stats.values, &arena);
        }
    }

    /// Runs the stats roundtrip for each interesting datum of `scalar_type`,
    /// both one row at a time and all rows together. It does this first in a
    /// non-nullable column, then in a nullable column with an extra NULL row.
    fn scalar_type_stats_roundtrip_trim(scalar_type: SqlScalarType) {
        let mut rows = Vec::new();
        for datum in scalar_type.interesting_datums() {
            rows.push(Row::pack(std::iter::once(datum)));
        }

        let schema = RelationDesc::builder()
            .with_column("col", scalar_type.clone().nullable(false))
            .finish();
        for row in rows.iter() {
            datum_stats_roundtrip_trim(&schema, [row]);
        }
        datum_stats_roundtrip_trim(&schema, &rows[..]);

        let schema = RelationDesc::builder()
            .with_column("col", scalar_type.nullable(true))
            .finish();
        rows.push(Row::pack(std::iter::once(Datum::Null)));
        for row in rows.iter() {
            datum_stats_roundtrip_trim(&schema, [row]);
        }
        datum_stats_roundtrip_trim(&schema, &rows[..]);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn all_scalar_types_stats_roundtrip_trim() {
        proptest!(|(scalar_type in any::<SqlScalarType>())| {
            scalar_type_stats_roundtrip_trim(scalar_type)
        });
    }

    /// Checks that sorting UUIDs matches sorting their raw bytes. The byte-wise
    /// min/max in `fixed_stats_from_column` relies on this for UUID columns.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_uuid_sort_order() {
        fn test(mut og: Vec<Uuid>) {
            // `as_bytes` returns `&[u8; 16]`, which is `Copy`; dereference
            // instead of cloning through the reference.
            let mut as_bytes: Vec<_> = og.iter().map(|u| *u.as_bytes()).collect();

            og.sort();
            as_bytes.sort();

            let rnd: Vec<_> = as_bytes.into_iter().map(Uuid::from_bytes).collect();

            assert_eq!(og, rnd);
        }

        let arb_uuid = any::<[u8; 16]>().prop_map(Uuid::from_bytes);
        proptest!(|(uuids in proptest::collection::vec(arb_uuid, 0..128))| {
            test(uuids);
        });
    }
}