1use std::fmt::Debug;
15use std::ops::AddAssign;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use arrow::array::{
20 Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBufferBuilder,
21 BooleanBuilder, FixedSizeBinaryArray, FixedSizeBinaryBuilder, Float32Array, Float32Builder,
22 Float64Array, Float64Builder, Int16Array, Int16Builder, Int32Array, Int32Builder, Int64Array,
23 Int64Builder, ListArray, ListBuilder, MapArray, StringArray, StringBuilder, StructArray,
24 UInt8Array, UInt8Builder, UInt16Array, UInt16Builder, UInt32Array, UInt32Builder, UInt64Array,
25 UInt64Builder, make_array,
26};
27use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
28use arrow::datatypes::{DataType, Field, Fields, ToByteSlice};
29use bytes::{BufMut, Bytes};
30use chrono::Timelike;
31use dec::{Context, Decimal, OrderedDecimal};
32use itertools::{EitherOrBoth, Itertools};
33use mz_ore::assert_none;
34use mz_ore::cast::CastFrom;
35use mz_persist_types::Codec;
36use mz_persist_types::arrow::ArrayOrd;
37use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, FixedSizeCodec, Schema};
38use mz_persist_types::stats::{
39 ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, FixedSizeBytesStatsKind,
40 OptionStats, PrimitiveStats, StructStats,
41};
42use mz_proto::chrono::ProtoNaiveTime;
43use mz_proto::{ProtoType, RustType, TryFromProtoError};
44use prost::Message;
45use uuid::Uuid;
46
47use crate::adt::array::{ArrayDimension, PackedArrayDimension};
48use crate::adt::date::Date;
49use crate::adt::datetime::PackedNaiveTime;
50use crate::adt::interval::PackedInterval;
51use crate::adt::jsonb::{JsonbPacker, JsonbRef};
52use crate::adt::mz_acl_item::{PackedAclItem, PackedMzAclItem};
53use crate::adt::numeric::{Numeric, PackedNumeric};
54use crate::adt::range::{Range, RangeInner, RangeLowerBound, RangeUpperBound};
55use crate::adt::timestamp::{CheckedTimestamp, PackedNaiveDateTime};
56use crate::row::proto_datum::DatumType;
57use crate::row::{
58 ProtoArray, ProtoArrayDimension, ProtoDatum, ProtoDatumOther, ProtoDict, ProtoDictElement,
59 ProtoNumeric, ProtoRange, ProtoRangeInner, ProtoRow,
60};
61use crate::stats::{fixed_stats_from_column, numeric_stats_from_column, stats_for_json};
62use crate::{Datum, ProtoRelationDesc, RelationDesc, Row, RowPacker, SqlScalarType, Timestamp};
63
64#[allow(clippy::as_conversions)]
69mod fixed_binary_sizes {
70 use super::*;
71
72 pub const TIME_FIXED_BYTES: i32 = PackedNaiveTime::SIZE as i32;
73 pub const TIMESTAMP_FIXED_BYTES: i32 = PackedNaiveDateTime::SIZE as i32;
74 pub const INTERVAL_FIXED_BYTES: i32 = PackedInterval::SIZE as i32;
75 pub const ACL_ITEM_FIXED_BYTES: i32 = PackedAclItem::SIZE as i32;
76 pub const _MZ_ACL_ITEM_FIXED_BYTES: i32 = PackedMzAclItem::SIZE as i32;
77 pub const ARRAY_DIMENSION_FIXED_BYTES: i32 = PackedArrayDimension::SIZE as i32;
78
79 pub const UUID_FIXED_BYTES: i32 = 16;
80 static_assertions::const_assert_eq!(UUID_FIXED_BYTES as usize, std::mem::size_of::<Uuid>());
81}
82use fixed_binary_sizes::*;
83
84pub fn preserves_order(scalar_type: &SqlScalarType) -> bool {
88 match scalar_type {
89 SqlScalarType::Bool
91 | SqlScalarType::Int16
92 | SqlScalarType::Int32
93 | SqlScalarType::Int64
94 | SqlScalarType::UInt16
95 | SqlScalarType::UInt32
96 | SqlScalarType::UInt64
97 | SqlScalarType::Date
98 | SqlScalarType::Time
99 | SqlScalarType::Timestamp { .. }
100 | SqlScalarType::TimestampTz { .. }
101 | SqlScalarType::Interval
102 | SqlScalarType::Bytes
103 | SqlScalarType::String
104 | SqlScalarType::Uuid
105 | SqlScalarType::MzTimestamp
106 | SqlScalarType::MzAclItem
107 | SqlScalarType::AclItem => true,
108 SqlScalarType::Record { fields, .. } => fields
110 .iter()
111 .all(|(_, field_type)| preserves_order(&field_type.scalar_type)),
112 SqlScalarType::Float32 | SqlScalarType::Float64 => false,
115 SqlScalarType::Numeric { .. } => false,
118 SqlScalarType::PgLegacyChar
121 | SqlScalarType::PgLegacyName
122 | SqlScalarType::Char { .. }
123 | SqlScalarType::VarChar { .. }
124 | SqlScalarType::Jsonb
125 | SqlScalarType::Array(_)
126 | SqlScalarType::List { .. }
127 | SqlScalarType::Oid
128 | SqlScalarType::Map { .. }
129 | SqlScalarType::RegProc
130 | SqlScalarType::RegType
131 | SqlScalarType::RegClass
132 | SqlScalarType::Int2Vector
133 | SqlScalarType::Range { .. } => false,
134 }
135}
136
137#[derive(Debug)]
139struct DatumEncoder {
140 nullable: bool,
141 encoder: DatumColumnEncoder,
142}
143
144impl DatumEncoder {
145 fn goodbytes(&self) -> usize {
146 self.encoder.goodbytes()
147 }
148
149 fn push(&mut self, datum: Datum) {
150 assert!(
151 !datum.is_null() || self.nullable,
152 "tried pushing Null into non-nullable column"
153 );
154 self.encoder.push(datum);
155 }
156
157 fn push_invalid(&mut self) {
158 self.encoder.push_invalid();
159 }
160
161 fn finish(self) -> ArrayRef {
162 self.encoder.finish()
163 }
164}
165
/// Type-specific columnar encoder for [`Datum`]s.
///
/// Each variant accumulates one kind of value into Arrow builders or raw
/// buffers. `finish` turns the accumulated state into an Arrow array.
/// Container variants own child encoders for their elements.
#[derive(Debug)]
enum DatumColumnEncoder {
    /// `Datum::True` / `Datum::False`.
    Bool(BooleanBuilder),
    U8(UInt8Builder),
    U16(UInt16Builder),
    U32(UInt32Builder),
    U64(UInt64Builder),
    I16(Int16Builder),
    I32(Int32Builder),
    I64(Int64Builder),
    F32(Float32Builder),
    F64(Float64Builder),
    /// Numerics are written twice: as exact packed bytes and as an `f64`
    /// approximation.
    Numeric {
        /// `PackedNumeric` bytes of each value.
        binary_values: BinaryBuilder,
        /// `f64` approximation of each value. Values that fail to convert are
        /// stored as positive or negative infinity, matching their sign.
        approx_values: Float64Builder,
        /// Decimal context used for the `f64` conversion. Its status is
        /// cleared after a failed conversion.
        numeric_context: Context<Numeric>,
    },
    Bytes(BinaryBuilder),
    String(StringBuilder),
    /// Days since the Postgres epoch (`Date::pg_epoch_days`).
    Date(Int32Builder),
    /// `PackedNaiveTime` bytes.
    Time(FixedSizeBinaryBuilder),
    /// `PackedNaiveDateTime` bytes of the timestamp's naive value.
    Timestamp(FixedSizeBinaryBuilder),
    /// `PackedNaiveDateTime` bytes of the timestamp's naive value. The decoder
    /// reinterprets it as UTC.
    TimestampTz(FixedSizeBinaryBuilder),
    /// `mz_repr::Timestamp`, converted to `u64`.
    MzTimestamp(UInt64Builder),
    /// `PackedInterval` bytes.
    Interval(FixedSizeBinaryBuilder),
    /// The UUID's raw bytes.
    Uuid(FixedSizeBinaryBuilder),
    /// `PackedAclItem` bytes.
    AclItem(FixedSizeBinaryBuilder),
    /// `PackedMzAclItem` bytes (variable width).
    MzAclItem(BinaryBuilder),
    /// Protobuf-encoded `ProtoDatum` for each range.
    Range(BinaryBuilder),
    /// JSON values serialized as text into one contiguous buffer.
    Jsonb {
        /// End offset into `buf` for each value; a null repeats the previous
        /// offset. Presumably initialized to `[0]` by the constructor (not
        /// visible here), since `push_invalid` relies on it being non-empty.
        offsets: Vec<i32>,
        /// Concatenated JSON text of every value.
        buf: Vec<u8>,
        /// Validity bitmap, created lazily when the first null is pushed.
        nulls: Option<BooleanBufferBuilder>,
    },
    Array {
        /// For each array, the list of its `PackedArrayDimension`s.
        dims: ListBuilder<FixedSizeBinaryBuilder>,
        /// Number of elements in each array (0 for a null array).
        val_lengths: Vec<usize>,
        /// Encoder for the flattened elements of all arrays.
        vals: Box<DatumColumnEncoder>,
        /// Validity bitmap, created lazily when the first null is pushed.
        nulls: Option<BooleanBufferBuilder>,
    },
    List {
        /// Number of elements in each list (0 for a null list).
        lengths: Vec<usize>,
        /// Encoder for the flattened elements of all lists.
        values: Box<DatumColumnEncoder>,
        /// Validity bitmap, created lazily when the first null is pushed.
        nulls: Option<BooleanBufferBuilder>,
    },
    Map {
        /// Number of entries in each map (0 for a null map).
        lengths: Vec<usize>,
        /// Keys of all entries, flattened; never null (`finish` asserts this).
        keys: StringBuilder,
        /// Encoder for the values of all entries, flattened.
        vals: Box<DatumColumnEncoder>,
        /// Validity bitmap, created lazily when the first null is pushed.
        nulls: Option<BooleanBufferBuilder>,
    },
    Record {
        /// One encoder per record field, in field order.
        fields: Vec<DatumEncoder>,
        /// Validity bitmap, created lazily when the first null is pushed.
        nulls: Option<BooleanBufferBuilder>,
        /// Number of records pushed so far, valid or null. Used to back-fill
        /// the validity bitmap when it is created.
        length: usize,
    },
    /// Records with no fields. Only validity is tracked, and each valid record
    /// is stored as `true`.
    RecordEmpty(BooleanBuilder),
}
259
260impl DatumColumnEncoder {
261 fn goodbytes(&self) -> usize {
262 match self {
263 DatumColumnEncoder::Bool(a) => a.len(),
264 DatumColumnEncoder::U8(a) => a.values_slice().to_byte_slice().len(),
265 DatumColumnEncoder::U16(a) => a.values_slice().to_byte_slice().len(),
266 DatumColumnEncoder::U32(a) => a.values_slice().to_byte_slice().len(),
267 DatumColumnEncoder::U64(a) => a.values_slice().to_byte_slice().len(),
268 DatumColumnEncoder::I16(a) => a.values_slice().to_byte_slice().len(),
269 DatumColumnEncoder::I32(a) => a.values_slice().to_byte_slice().len(),
270 DatumColumnEncoder::I64(a) => a.values_slice().to_byte_slice().len(),
271 DatumColumnEncoder::F32(a) => a.values_slice().to_byte_slice().len(),
272 DatumColumnEncoder::F64(a) => a.values_slice().to_byte_slice().len(),
273 DatumColumnEncoder::Numeric {
274 binary_values,
275 approx_values,
276 ..
277 } => {
278 binary_values.values_slice().len()
279 + approx_values.values_slice().to_byte_slice().len()
280 }
281 DatumColumnEncoder::Bytes(a) => a.values_slice().len(),
282 DatumColumnEncoder::String(a) => a.values_slice().len(),
283 DatumColumnEncoder::Date(a) => a.values_slice().to_byte_slice().len(),
284 DatumColumnEncoder::Time(a) => a.len() * PackedNaiveTime::SIZE,
285 DatumColumnEncoder::Timestamp(a) => a.len() * PackedNaiveDateTime::SIZE,
286 DatumColumnEncoder::TimestampTz(a) => a.len() * PackedNaiveDateTime::SIZE,
287 DatumColumnEncoder::MzTimestamp(a) => a.values_slice().to_byte_slice().len(),
288 DatumColumnEncoder::Interval(a) => a.len() * PackedInterval::SIZE,
289 DatumColumnEncoder::Uuid(a) => a.len() * size_of::<Uuid>(),
290 DatumColumnEncoder::AclItem(a) => a.len() * PackedAclItem::SIZE,
291 DatumColumnEncoder::MzAclItem(a) => a.values_slice().len(),
292 DatumColumnEncoder::Range(a) => a.values_slice().len(),
293 DatumColumnEncoder::Jsonb { buf, .. } => buf.len(),
294 DatumColumnEncoder::Array { dims, vals, .. } => {
295 dims.len() * PackedArrayDimension::SIZE + vals.goodbytes()
296 }
297 DatumColumnEncoder::List { values, .. } => values.goodbytes(),
298 DatumColumnEncoder::Map { keys, vals, .. } => {
299 keys.values_slice().len() + vals.goodbytes()
300 }
301 DatumColumnEncoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(),
302 DatumColumnEncoder::RecordEmpty(a) => a.len(),
303 }
304 }
305
306 fn push<'e, 'd>(&'e mut self, datum: Datum<'d>) {
307 match (self, datum) {
308 (DatumColumnEncoder::Bool(bool_builder), Datum::True) => {
309 bool_builder.append_value(true)
310 }
311 (DatumColumnEncoder::Bool(bool_builder), Datum::False) => {
312 bool_builder.append_value(false)
313 }
314 (DatumColumnEncoder::U8(builder), Datum::UInt8(val)) => builder.append_value(val),
315 (DatumColumnEncoder::U16(builder), Datum::UInt16(val)) => builder.append_value(val),
316 (DatumColumnEncoder::U32(builder), Datum::UInt32(val)) => builder.append_value(val),
317 (DatumColumnEncoder::U64(builder), Datum::UInt64(val)) => builder.append_value(val),
318 (DatumColumnEncoder::I16(builder), Datum::Int16(val)) => builder.append_value(val),
319 (DatumColumnEncoder::I32(builder), Datum::Int32(val)) => builder.append_value(val),
320 (DatumColumnEncoder::I64(builder), Datum::Int64(val)) => builder.append_value(val),
321 (DatumColumnEncoder::F32(builder), Datum::Float32(val)) => builder.append_value(*val),
322 (DatumColumnEncoder::F64(builder), Datum::Float64(val)) => builder.append_value(*val),
323 (
324 DatumColumnEncoder::Numeric {
325 approx_values,
326 binary_values,
327 numeric_context,
328 },
329 Datum::Numeric(val),
330 ) => {
331 let float_approx = numeric_context.try_into_f64(val.0).unwrap_or_else(|_| {
332 numeric_context.clear_status();
333 if val.0.is_negative() {
334 f64::NEG_INFINITY
335 } else {
336 f64::INFINITY
337 }
338 });
339 let packed = PackedNumeric::from_value(val.0);
340
341 approx_values.append_value(float_approx);
342 binary_values.append_value(packed.as_bytes());
343 }
344 (DatumColumnEncoder::String(builder), Datum::String(val)) => builder.append_value(val),
345 (DatumColumnEncoder::Bytes(builder), Datum::Bytes(val)) => builder.append_value(val),
346 (DatumColumnEncoder::Date(builder), Datum::Date(val)) => {
347 builder.append_value(val.pg_epoch_days())
348 }
349 (DatumColumnEncoder::Time(builder), Datum::Time(val)) => {
350 let packed = PackedNaiveTime::from_value(val);
351 builder
352 .append_value(packed.as_bytes())
353 .expect("known correct size");
354 }
355 (DatumColumnEncoder::Timestamp(builder), Datum::Timestamp(val)) => {
356 let packed = PackedNaiveDateTime::from_value(val.to_naive());
357 builder
358 .append_value(packed.as_bytes())
359 .expect("known correct size");
360 }
361 (DatumColumnEncoder::TimestampTz(builder), Datum::TimestampTz(val)) => {
362 let packed = PackedNaiveDateTime::from_value(val.to_naive());
363 builder
364 .append_value(packed.as_bytes())
365 .expect("known correct size");
366 }
367 (DatumColumnEncoder::MzTimestamp(builder), Datum::MzTimestamp(val)) => {
368 builder.append_value(val.into());
369 }
370 (DatumColumnEncoder::Interval(builder), Datum::Interval(val)) => {
371 let packed = PackedInterval::from_value(val);
372 builder
373 .append_value(packed.as_bytes())
374 .expect("known correct size");
375 }
376 (DatumColumnEncoder::Uuid(builder), Datum::Uuid(val)) => builder
377 .append_value(val.as_bytes())
378 .expect("known correct size"),
379 (DatumColumnEncoder::AclItem(builder), Datum::AclItem(val)) => {
380 let packed = PackedAclItem::from_value(val);
381 builder
382 .append_value(packed.as_bytes())
383 .expect("known correct size");
384 }
385 (DatumColumnEncoder::MzAclItem(builder), Datum::MzAclItem(val)) => {
386 let packed = PackedMzAclItem::from_value(val);
387 builder.append_value(packed.as_bytes());
388 }
389 (DatumColumnEncoder::Range(builder), d @ Datum::Range(_)) => {
390 let proto = ProtoDatum::from(d);
391 let bytes = proto.encode_to_vec();
392 builder.append_value(&bytes);
393 }
394 (
395 DatumColumnEncoder::Jsonb {
396 offsets,
397 buf,
398 nulls,
399 },
400 d @ Datum::JsonNull
401 | d @ Datum::True
402 | d @ Datum::False
403 | d @ Datum::Numeric(_)
404 | d @ Datum::String(_)
405 | d @ Datum::List(_)
406 | d @ Datum::Map(_),
407 ) => {
408 let mut buf = buf;
410 let json = JsonbRef::from_datum(d);
411
412 json.to_writer(&mut buf)
414 .expect("failed to serialize Datum to jsonb");
415 let offset: i32 = buf.len().try_into().expect("wrote more than 4GB of JSON");
416 offsets.push(offset);
417
418 if let Some(nulls) = nulls {
419 nulls.append(true);
420 }
421 }
422 (
423 DatumColumnEncoder::Array {
424 dims,
425 val_lengths,
426 vals,
427 nulls,
428 },
429 Datum::Array(array),
430 ) => {
431 for dimension in array.dims() {
433 let packed = PackedArrayDimension::from_value(dimension);
434 dims.values()
435 .append_value(packed.as_bytes())
436 .expect("known correct size");
437 }
438 dims.append(true);
439
440 let mut count = 0;
442 for datum in &array.elements() {
443 count += 1;
444 vals.push(datum);
445 }
446 val_lengths.push(count);
447
448 if let Some(nulls) = nulls {
449 nulls.append(true);
450 }
451 }
452 (
453 DatumColumnEncoder::List {
454 lengths,
455 values,
456 nulls,
457 },
458 Datum::List(list),
459 ) => {
460 let mut count = 0;
461 for datum in &list {
462 count += 1;
463 values.push(datum);
464 }
465 lengths.push(count);
466
467 if let Some(nulls) = nulls {
468 nulls.append(true);
469 }
470 }
471 (
472 DatumColumnEncoder::Map {
473 lengths,
474 keys,
475 vals,
476 nulls,
477 },
478 Datum::Map(map),
479 ) => {
480 let mut count = 0;
481 for (key, datum) in &map {
482 count += 1;
483 keys.append_value(key);
484 vals.push(datum);
485 }
486 lengths.push(count);
487
488 if let Some(nulls) = nulls {
489 nulls.append(true);
490 }
491 }
492 (
493 DatumColumnEncoder::Record {
494 fields,
495 nulls,
496 length,
497 },
498 Datum::List(records),
499 ) => {
500 let mut count = 0;
501 for (datum, encoder) in records.into_iter().zip_eq(fields.iter_mut()) {
503 count += 1;
504 encoder.push(datum);
505 }
506 assert_eq!(count, fields.len());
507
508 length.add_assign(1);
509 if let Some(nulls) = nulls.as_mut() {
510 nulls.append(true);
511 }
512 }
513 (DatumColumnEncoder::RecordEmpty(builder), Datum::List(records)) => {
514 assert_none!(records.into_iter().next());
515 builder.append_value(true);
516 }
517 (encoder, Datum::Null) => encoder.push_invalid(),
518 (encoder, datum) => panic!("can't encode {datum:?} into {encoder:?}"),
519 }
520 }
521
522 fn push_invalid(&mut self) {
523 match self {
524 DatumColumnEncoder::Bool(builder) => builder.append_null(),
525 DatumColumnEncoder::U8(builder) => builder.append_null(),
526 DatumColumnEncoder::U16(builder) => builder.append_null(),
527 DatumColumnEncoder::U32(builder) => builder.append_null(),
528 DatumColumnEncoder::U64(builder) => builder.append_null(),
529 DatumColumnEncoder::I16(builder) => builder.append_null(),
530 DatumColumnEncoder::I32(builder) => builder.append_null(),
531 DatumColumnEncoder::I64(builder) => builder.append_null(),
532 DatumColumnEncoder::F32(builder) => builder.append_null(),
533 DatumColumnEncoder::F64(builder) => builder.append_null(),
534 DatumColumnEncoder::Numeric {
535 approx_values,
536 binary_values,
537 numeric_context: _,
538 } => {
539 approx_values.append_null();
540 binary_values.append_null();
541 }
542 DatumColumnEncoder::String(builder) => builder.append_null(),
543 DatumColumnEncoder::Bytes(builder) => builder.append_null(),
544 DatumColumnEncoder::Date(builder) => builder.append_null(),
545 DatumColumnEncoder::Time(builder) => builder.append_null(),
546 DatumColumnEncoder::Timestamp(builder) => builder.append_null(),
547 DatumColumnEncoder::TimestampTz(builder) => builder.append_null(),
548 DatumColumnEncoder::MzTimestamp(builder) => builder.append_null(),
549 DatumColumnEncoder::Interval(builder) => builder.append_null(),
550 DatumColumnEncoder::Uuid(builder) => builder.append_null(),
551 DatumColumnEncoder::AclItem(builder) => builder.append_null(),
552 DatumColumnEncoder::MzAclItem(builder) => builder.append_null(),
553 DatumColumnEncoder::Range(builder) => builder.append_null(),
554 DatumColumnEncoder::Jsonb {
555 offsets,
556 buf: _,
557 nulls,
558 } => {
559 let nulls = nulls.get_or_insert_with(|| {
560 let mut buf = BooleanBufferBuilder::new(offsets.len());
561 buf.append_n(offsets.len() - 1, true);
563 buf
564 });
565
566 offsets.push(offsets.last().copied().unwrap_or(0));
567 nulls.append(false);
568 }
569 DatumColumnEncoder::Array {
570 dims,
571 val_lengths,
572 vals: _,
573 nulls,
574 } => {
575 let nulls = nulls.get_or_insert_with(|| {
576 let mut buf = BooleanBufferBuilder::new(dims.len() + 1);
577 buf.append_n(dims.len(), true);
578 buf
579 });
580 dims.append_null();
581
582 val_lengths.push(0);
583 nulls.append(false);
584 }
585 DatumColumnEncoder::List {
586 lengths,
587 values: _,
588 nulls,
589 } => {
590 let nulls = nulls.get_or_insert_with(|| {
591 let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
592 buf.append_n(lengths.len(), true);
593 buf
594 });
595
596 lengths.push(0);
597 nulls.append(false);
598 }
599 DatumColumnEncoder::Map {
600 lengths,
601 keys: _,
602 vals: _,
603 nulls,
604 } => {
605 let nulls = nulls.get_or_insert_with(|| {
606 let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
607 buf.append_n(lengths.len(), true);
608 buf
609 });
610
611 lengths.push(0);
612 nulls.append(false);
613 }
614 DatumColumnEncoder::Record {
615 fields,
616 nulls,
617 length,
618 } => {
619 let nulls = nulls.get_or_insert_with(|| {
620 let mut buf = BooleanBufferBuilder::new(*length + 1);
621 buf.append_n(*length, true);
622 buf
623 });
624 nulls.append(false);
625 length.add_assign(1);
626
627 for field in fields {
628 field.push_invalid();
629 }
630 }
631 DatumColumnEncoder::RecordEmpty(builder) => builder.append_null(),
632 }
633 }
634
635 fn finish(self) -> ArrayRef {
636 match self {
637 DatumColumnEncoder::Bool(mut builder) => {
638 let array = builder.finish();
639 Arc::new(array)
640 }
641 DatumColumnEncoder::U8(mut builder) => {
642 let array = builder.finish();
643 Arc::new(array)
644 }
645 DatumColumnEncoder::U16(mut builder) => {
646 let array = builder.finish();
647 Arc::new(array)
648 }
649 DatumColumnEncoder::U32(mut builder) => {
650 let array = builder.finish();
651 Arc::new(array)
652 }
653 DatumColumnEncoder::U64(mut builder) => {
654 let array = builder.finish();
655 Arc::new(array)
656 }
657 DatumColumnEncoder::I16(mut builder) => {
658 let array = builder.finish();
659 Arc::new(array)
660 }
661 DatumColumnEncoder::I32(mut builder) => {
662 let array = builder.finish();
663 Arc::new(array)
664 }
665 DatumColumnEncoder::I64(mut builder) => {
666 let array = builder.finish();
667 Arc::new(array)
668 }
669 DatumColumnEncoder::F32(mut builder) => {
670 let array = builder.finish();
671 Arc::new(array)
672 }
673 DatumColumnEncoder::F64(mut builder) => {
674 let array = builder.finish();
675 Arc::new(array)
676 }
677 DatumColumnEncoder::Numeric {
678 mut approx_values,
679 mut binary_values,
680 numeric_context: _,
681 } => {
682 let approx_array = approx_values.finish();
683 let binary_array = binary_values.finish();
684
685 assert_eq!(approx_array.len(), binary_array.len());
686 debug_assert_eq!(approx_array.logical_nulls(), binary_array.logical_nulls());
688
689 let fields = Fields::from(vec![
690 Field::new("approx", approx_array.data_type().clone(), true),
691 Field::new("binary", binary_array.data_type().clone(), true),
692 ]);
693 let nulls = approx_array.logical_nulls();
694 let array = StructArray::new(
695 fields,
696 vec![Arc::new(approx_array), Arc::new(binary_array)],
697 nulls,
698 );
699 Arc::new(array)
700 }
701 DatumColumnEncoder::String(mut builder) => {
702 let array = builder.finish();
703 Arc::new(array)
704 }
705 DatumColumnEncoder::Bytes(mut builder) => {
706 let array = builder.finish();
707 Arc::new(array)
708 }
709 DatumColumnEncoder::Date(mut builder) => {
710 let array = builder.finish();
711 Arc::new(array)
712 }
713 DatumColumnEncoder::Time(mut builder) => {
714 let array = builder.finish();
715 Arc::new(array)
716 }
717 DatumColumnEncoder::Timestamp(mut builder) => {
718 let array = builder.finish();
719 Arc::new(array)
720 }
721 DatumColumnEncoder::TimestampTz(mut builder) => {
722 let array = builder.finish();
723 Arc::new(array)
724 }
725 DatumColumnEncoder::MzTimestamp(mut builder) => {
726 let array = builder.finish();
727 Arc::new(array)
728 }
729 DatumColumnEncoder::Interval(mut builder) => {
730 let array = builder.finish();
731 Arc::new(array)
732 }
733 DatumColumnEncoder::Uuid(mut builder) => {
734 let array = builder.finish();
735 Arc::new(array)
736 }
737 DatumColumnEncoder::AclItem(mut builder) => Arc::new(builder.finish()),
738 DatumColumnEncoder::MzAclItem(mut builder) => Arc::new(builder.finish()),
739 DatumColumnEncoder::Range(mut builder) => Arc::new(builder.finish()),
740 DatumColumnEncoder::Jsonb {
741 offsets,
742 buf,
743 mut nulls,
744 } => {
745 let values = Buffer::from_vec(buf);
746 let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
747 let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
748 let array = StringArray::new(offsets, values, nulls);
749 Arc::new(array)
750 }
751 DatumColumnEncoder::Array {
752 mut dims,
753 val_lengths,
754 vals,
755 mut nulls,
756 } => {
757 let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
758 let vals = vals.finish();
759
760 let field = Field::new_list_field(vals.data_type().clone(), true);
763 let val_offsets = OffsetBuffer::from_lengths(val_lengths);
764 let values =
765 ListArray::new(Arc::new(field), val_offsets, Arc::new(vals), nulls.clone());
766
767 let dims = dims.finish();
768 assert_eq!(values.len(), dims.len());
769
770 let fields = Fields::from(vec![
773 Field::new("dims", dims.data_type().clone(), true),
774 Field::new("vals", values.data_type().clone(), true),
775 ]);
776 let array = StructArray::new(fields, vec![Arc::new(dims), Arc::new(values)], nulls);
777
778 Arc::new(array)
779 }
780 DatumColumnEncoder::List {
781 lengths,
782 values,
783 mut nulls,
784 } => {
785 let values = values.finish();
786
787 let field = Field::new_list_field(values.data_type().clone(), true);
790 let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
791 let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
792
793 let array = ListArray::new(Arc::new(field), offsets, values, nulls);
794 Arc::new(array)
795 }
796 DatumColumnEncoder::Map {
797 lengths,
798 mut keys,
799 vals,
800 mut nulls,
801 } => {
802 let keys = keys.finish();
803 let vals = vals.finish();
804
805 let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
806 let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
807
808 assert_none!(keys.logical_nulls());
811 let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false));
812 let val_field = Arc::new(Field::new("val", vals.data_type().clone(), true));
813 let fields = Fields::from(vec![Arc::clone(&key_field), Arc::clone(&val_field)]);
814 let entries = StructArray::new(fields, vec![Arc::new(keys), vals], None);
815
816 let field = Field::new("map_entries", entries.data_type().clone(), false);
819 let array = ListArray::new(Arc::new(field), offsets, Arc::new(entries), nulls);
820 Arc::new(array)
821 }
822 DatumColumnEncoder::Record {
823 fields,
824 mut nulls,
825 length: _,
826 } => {
827 let (fields, arrays): (Vec<_>, Vec<_>) = fields
828 .into_iter()
829 .enumerate()
830 .map(|(tag, encoder)| {
831 let nullable = true;
837 let array = encoder.finish();
838 let field =
839 Field::new(tag.to_string(), array.data_type().clone(), nullable);
840 (field, array)
841 })
842 .unzip();
843 let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
844
845 let array = StructArray::new(Fields::from(fields), arrays, nulls);
846 Arc::new(array)
847 }
848 DatumColumnEncoder::RecordEmpty(mut builder) => Arc::new(builder.finish()),
849 }
850 }
851}
852
/// Type-specific columnar decoder that reads [`Datum`]s back out of Arrow
/// arrays, one row at a time, into a `RowPacker`.
#[derive(Debug)]
enum DatumColumnDecoder {
    Bool(BooleanArray),
    U8(UInt8Array),
    U16(UInt16Array),
    U32(UInt32Array),
    U64(UInt64Array),
    I16(Int16Array),
    I32(Int32Array),
    I64(Int64Array),
    F32(Float32Array),
    F64(Float64Array),
    /// `PackedNumeric` bytes. Only the exact binary encoding is read back
    /// (presumably the `binary` child of the encoder's numeric struct;
    /// construction is not visible here).
    Numeric(BinaryArray),
    Bytes(BinaryArray),
    String(StringArray),
    /// Days since the Postgres epoch.
    Date(Int32Array),
    /// `PackedNaiveTime` bytes.
    Time(FixedSizeBinaryArray),
    /// `PackedNaiveDateTime` bytes.
    Timestamp(FixedSizeBinaryArray),
    /// `PackedNaiveDateTime` bytes, interpreted as UTC when decoded.
    TimestampTz(FixedSizeBinaryArray),
    /// `mz_repr::Timestamp` values as `u64`.
    MzTimestamp(UInt64Array),
    /// `PackedInterval` bytes.
    Interval(FixedSizeBinaryArray),
    /// The UUID's raw bytes.
    Uuid(FixedSizeBinaryArray),
    /// JSON text, re-packed with `JsonbPacker`.
    Json(StringArray),
    Array {
        /// Offsets into `dims` delimiting each array's dimensions.
        dim_offsets: OffsetBuffer<i32>,
        /// `PackedArrayDimension` bytes for every dimension of every array.
        dims: FixedSizeBinaryArray,
        /// Offsets into `vals` delimiting each array's elements.
        val_offsets: OffsetBuffer<i32>,
        /// Decoder for the flattened elements of all arrays.
        vals: Box<DatumColumnDecoder>,
        /// Validity of each array; `None` means every array is valid.
        nulls: Option<NullBuffer>,
    },
    List {
        /// Offsets into `values` delimiting each list's elements.
        offsets: OffsetBuffer<i32>,
        /// Decoder for the flattened elements of all lists.
        values: Box<DatumColumnDecoder>,
        /// Validity of each list; `None` means every list is valid.
        nulls: Option<NullBuffer>,
    },
    Map {
        /// Offsets into `keys`/`vals` delimiting each map's entries.
        offsets: OffsetBuffer<i32>,
        /// Keys of all entries, flattened.
        keys: StringArray,
        /// Decoder for the values of all entries, flattened.
        vals: Box<DatumColumnDecoder>,
        /// Validity of each map; `None` means every map is valid.
        nulls: Option<NullBuffer>,
    },
    /// Records with no fields; a valid slot decodes to an empty list.
    RecordEmpty(BooleanArray),
    Record {
        /// One decoder per field, in field order. Each is read at the same row
        /// index as the record.
        fields: Vec<Box<DatumColumnDecoder>>,
        /// Validity of each record; `None` means every record is valid.
        nulls: Option<NullBuffer>,
    },
    /// Protobuf-encoded `ProtoDatum` for each range.
    Range(BinaryArray),
    /// `PackedMzAclItem` bytes.
    MzAclItem(BinaryArray),
    /// `PackedAclItem` bytes.
    AclItem(FixedSizeBinaryArray),
}
907
908impl DatumColumnDecoder {
909 fn get<'a>(&'a self, idx: usize, packer: &'a mut RowPacker) {
910 let datum = match self {
911 DatumColumnDecoder::Bool(array) => array
912 .is_valid(idx)
913 .then(|| array.value(idx))
914 .map(|x| if x { Datum::True } else { Datum::False }),
915 DatumColumnDecoder::U8(array) => array
916 .is_valid(idx)
917 .then(|| array.value(idx))
918 .map(Datum::UInt8),
919 DatumColumnDecoder::U16(array) => array
920 .is_valid(idx)
921 .then(|| array.value(idx))
922 .map(Datum::UInt16),
923 DatumColumnDecoder::U32(array) => array
924 .is_valid(idx)
925 .then(|| array.value(idx))
926 .map(Datum::UInt32),
927 DatumColumnDecoder::U64(array) => array
928 .is_valid(idx)
929 .then(|| array.value(idx))
930 .map(Datum::UInt64),
931 DatumColumnDecoder::I16(array) => array
932 .is_valid(idx)
933 .then(|| array.value(idx))
934 .map(Datum::Int16),
935 DatumColumnDecoder::I32(array) => array
936 .is_valid(idx)
937 .then(|| array.value(idx))
938 .map(Datum::Int32),
939 DatumColumnDecoder::I64(array) => array
940 .is_valid(idx)
941 .then(|| array.value(idx))
942 .map(Datum::Int64),
943 DatumColumnDecoder::F32(array) => array
944 .is_valid(idx)
945 .then(|| array.value(idx))
946 .map(|x| Datum::Float32(ordered_float::OrderedFloat(x))),
947 DatumColumnDecoder::F64(array) => array
948 .is_valid(idx)
949 .then(|| array.value(idx))
950 .map(|x| Datum::Float64(ordered_float::OrderedFloat(x))),
951 DatumColumnDecoder::Numeric(array) => array.is_valid(idx).then(|| {
952 let val = array.value(idx);
953 let val = PackedNumeric::from_bytes(val)
954 .expect("failed to roundtrip Numeric")
955 .into_value();
956 Datum::Numeric(OrderedDecimal(val))
957 }),
958 DatumColumnDecoder::String(array) => array
959 .is_valid(idx)
960 .then(|| array.value(idx))
961 .map(Datum::String),
962 DatumColumnDecoder::Bytes(array) => array
963 .is_valid(idx)
964 .then(|| array.value(idx))
965 .map(Datum::Bytes),
966 DatumColumnDecoder::Date(array) => {
967 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
968 let date = Date::from_pg_epoch(x).expect("failed to roundtrip");
969 Datum::Date(date)
970 })
971 }
972 DatumColumnDecoder::Time(array) => {
973 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
974 let packed = PackedNaiveTime::from_bytes(x).expect("failed to roundtrip time");
975 Datum::Time(packed.into_value())
976 })
977 }
978 DatumColumnDecoder::Timestamp(array) => {
979 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
980 let packed = PackedNaiveDateTime::from_bytes(x)
981 .expect("failed to roundtrip PackedNaiveDateTime");
982 let timestamp = CheckedTimestamp::from_timestamplike(packed.into_value())
983 .expect("failed to roundtrip timestamp");
984 Datum::Timestamp(timestamp)
985 })
986 }
987 DatumColumnDecoder::TimestampTz(array) => {
988 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
989 let packed = PackedNaiveDateTime::from_bytes(x)
990 .expect("failed to roundtrip PackedNaiveDateTime");
991 let timestamp =
992 CheckedTimestamp::from_timestamplike(packed.into_value().and_utc())
993 .expect("failed to roundtrip timestamp");
994 Datum::TimestampTz(timestamp)
995 })
996 }
997 DatumColumnDecoder::MzTimestamp(array) => array
998 .is_valid(idx)
999 .then(|| array.value(idx))
1000 .map(|x| Datum::MzTimestamp(Timestamp::from(x))),
1001 DatumColumnDecoder::Interval(array) => {
1002 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1003 let packed =
1004 PackedInterval::from_bytes(x).expect("failed to roundtrip interval");
1005 Datum::Interval(packed.into_value())
1006 })
1007 }
1008 DatumColumnDecoder::Uuid(array) => {
1009 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1010 let uuid = Uuid::from_slice(x).expect("failed to roundtrip uuid");
1011 Datum::Uuid(uuid)
1012 })
1013 }
1014 DatumColumnDecoder::AclItem(array) => {
1015 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1016 let packed =
1017 PackedAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
1018 Datum::AclItem(packed.into_value())
1019 })
1020 }
1021 DatumColumnDecoder::MzAclItem(array) => {
1022 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1023 let packed =
1024 PackedMzAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
1025 Datum::MzAclItem(packed.into_value())
1026 })
1027 }
1028 DatumColumnDecoder::Range(array) => {
1029 let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
1030 packer.push(Datum::Null);
1031 return;
1032 };
1033
1034 let proto = ProtoDatum::decode(val).expect("failed to roundtrip Range");
1035 packer
1036 .try_push_proto(&proto)
1037 .expect("failed to pack ProtoRange");
1038
1039 return;
1041 }
1042 DatumColumnDecoder::Json(array) => {
1043 let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
1044 packer.push(Datum::Null);
1045 return;
1046 };
1047 JsonbPacker::new(packer)
1048 .pack_str(val)
1049 .expect("failed to roundtrip JSON");
1050
1051 return;
1053 }
1054 DatumColumnDecoder::Array {
1055 dim_offsets,
1056 dims,
1057 val_offsets,
1058 vals,
1059 nulls,
1060 } => {
1061 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1062 if !is_valid {
1063 packer.push(Datum::Null);
1064 return;
1065 }
1066
1067 let start: usize = dim_offsets[idx]
1068 .try_into()
1069 .expect("unexpected negative offset");
1070 let end: usize = dim_offsets[idx + 1]
1071 .try_into()
1072 .expect("unexpected negative offset");
1073 let dimensions = (start..end).map(|idx| {
1074 PackedArrayDimension::from_bytes(dims.value(idx))
1075 .expect("failed to roundtrip ArrayDimension")
1076 .into_value()
1077 });
1078
1079 let start: usize = val_offsets[idx]
1080 .try_into()
1081 .expect("unexpected negative offset");
1082 let end: usize = val_offsets[idx + 1]
1083 .try_into()
1084 .expect("unexpected negative offset");
1085 packer
1086 .push_array_with_row_major(dimensions, |packer| {
1087 for x in start..end {
1088 vals.get(x, packer);
1089 }
1090 end - start
1092 })
1093 .expect("failed to pack Array");
1094
1095 return;
1097 }
1098 DatumColumnDecoder::List {
1099 offsets,
1100 values,
1101 nulls,
1102 } => {
1103 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1104 if !is_valid {
1105 packer.push(Datum::Null);
1106 return;
1107 }
1108
1109 let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
1110 let end: usize = offsets[idx + 1]
1111 .try_into()
1112 .expect("unexpected negative offset");
1113
1114 packer.push_list_with(|packer| {
1115 for idx in start..end {
1116 values.get(idx, packer)
1117 }
1118 });
1119
1120 return;
1122 }
1123 DatumColumnDecoder::Map {
1124 offsets,
1125 keys,
1126 vals,
1127 nulls,
1128 } => {
1129 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1130 if !is_valid {
1131 packer.push(Datum::Null);
1132 return;
1133 }
1134
1135 let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
1136 let end: usize = offsets[idx + 1]
1137 .try_into()
1138 .expect("unexpected negative offset");
1139
1140 packer.push_dict_with(|packer| {
1141 for idx in start..end {
1142 packer.push(Datum::String(keys.value(idx)));
1143 vals.get(idx, packer);
1144 }
1145 });
1146
1147 return;
1149 }
1150 DatumColumnDecoder::RecordEmpty(array) => array.is_valid(idx).then(Datum::empty_list),
1151 DatumColumnDecoder::Record { fields, nulls } => {
1152 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1153 if !is_valid {
1154 packer.push(Datum::Null);
1155 return;
1156 }
1157
1158 packer.push_list_with(|packer| {
1160 for field in fields {
1161 field.get(idx, packer);
1162 }
1163 });
1164
1165 return;
1167 }
1168 };
1169
1170 match datum {
1171 Some(d) => packer.push(d),
1172 None => packer.push(Datum::Null),
1173 }
1174 }
1175
1176 fn stats(&self) -> ColumnStatKinds {
1177 match self {
1178 DatumColumnDecoder::Bool(a) => PrimitiveStats::<bool>::from_column(a).into(),
1179 DatumColumnDecoder::U8(a) => PrimitiveStats::<u8>::from_column(a).into(),
1180 DatumColumnDecoder::U16(a) => PrimitiveStats::<u16>::from_column(a).into(),
1181 DatumColumnDecoder::U32(a) => PrimitiveStats::<u32>::from_column(a).into(),
1182 DatumColumnDecoder::U64(a) => PrimitiveStats::<u64>::from_column(a).into(),
1183 DatumColumnDecoder::I16(a) => PrimitiveStats::<i16>::from_column(a).into(),
1184 DatumColumnDecoder::I32(a) => PrimitiveStats::<i32>::from_column(a).into(),
1185 DatumColumnDecoder::I64(a) => PrimitiveStats::<i64>::from_column(a).into(),
1186 DatumColumnDecoder::F32(a) => PrimitiveStats::<f32>::from_column(a).into(),
1187 DatumColumnDecoder::F64(a) => PrimitiveStats::<f64>::from_column(a).into(),
1188 DatumColumnDecoder::Numeric(a) => numeric_stats_from_column(a),
1189 DatumColumnDecoder::String(a) => PrimitiveStats::<String>::from_column(a).into(),
1190 DatumColumnDecoder::Bytes(a) => PrimitiveStats::<Vec<u8>>::from_column(a).into(),
1191 DatumColumnDecoder::Date(a) => PrimitiveStats::<i32>::from_column(a).into(),
1192 DatumColumnDecoder::Time(a) => {
1193 fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedTime)
1194 }
1195 DatumColumnDecoder::Timestamp(a) => {
1196 fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
1197 }
1198 DatumColumnDecoder::TimestampTz(a) => {
1199 fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
1200 }
1201 DatumColumnDecoder::MzTimestamp(a) => PrimitiveStats::<u64>::from_column(a).into(),
1202 DatumColumnDecoder::Interval(a) => {
1203 fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedInterval)
1204 }
1205 DatumColumnDecoder::Uuid(a) => {
1206 fixed_stats_from_column(a, FixedSizeBytesStatsKind::Uuid)
1207 }
1208 DatumColumnDecoder::AclItem(_)
1209 | DatumColumnDecoder::MzAclItem(_)
1210 | DatumColumnDecoder::Range(_) => ColumnStatKinds::None,
1211 DatumColumnDecoder::Json(a) => stats_for_json(a.iter()).values,
1212 DatumColumnDecoder::Array { .. }
1213 | DatumColumnDecoder::List { .. }
1214 | DatumColumnDecoder::Map { .. }
1215 | DatumColumnDecoder::Record { .. }
1216 | DatumColumnDecoder::RecordEmpty(_) => ColumnStatKinds::None,
1217 }
1218 }
1219
1220 fn goodbytes(&self) -> usize {
1221 match self {
1222 DatumColumnDecoder::Bool(a) => ArrayOrd::Bool(a.clone()).goodbytes(),
1223 DatumColumnDecoder::U8(a) => ArrayOrd::UInt8(a.clone()).goodbytes(),
1224 DatumColumnDecoder::U16(a) => ArrayOrd::UInt16(a.clone()).goodbytes(),
1225 DatumColumnDecoder::U32(a) => ArrayOrd::UInt32(a.clone()).goodbytes(),
1226 DatumColumnDecoder::U64(a) => ArrayOrd::UInt64(a.clone()).goodbytes(),
1227 DatumColumnDecoder::I16(a) => ArrayOrd::Int16(a.clone()).goodbytes(),
1228 DatumColumnDecoder::I32(a) => ArrayOrd::Int32(a.clone()).goodbytes(),
1229 DatumColumnDecoder::I64(a) => ArrayOrd::Int64(a.clone()).goodbytes(),
1230 DatumColumnDecoder::F32(a) => ArrayOrd::Float32(a.clone()).goodbytes(),
1231 DatumColumnDecoder::F64(a) => ArrayOrd::Float64(a.clone()).goodbytes(),
1232 DatumColumnDecoder::Numeric(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1233 DatumColumnDecoder::String(a) => ArrayOrd::String(a.clone()).goodbytes(),
1234 DatumColumnDecoder::Bytes(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1235 DatumColumnDecoder::Date(a) => ArrayOrd::Int32(a.clone()).goodbytes(),
1236 DatumColumnDecoder::Time(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1237 DatumColumnDecoder::Timestamp(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1238 DatumColumnDecoder::TimestampTz(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1239 DatumColumnDecoder::MzTimestamp(a) => ArrayOrd::UInt64(a.clone()).goodbytes(),
1240 DatumColumnDecoder::Interval(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1241 DatumColumnDecoder::Uuid(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1242 DatumColumnDecoder::AclItem(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1243 DatumColumnDecoder::MzAclItem(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1244 DatumColumnDecoder::Range(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1245 DatumColumnDecoder::Json(a) => ArrayOrd::String(a.clone()).goodbytes(),
1246 DatumColumnDecoder::Array { dims, vals, .. } => {
1247 (dims.len() * PackedArrayDimension::SIZE) + vals.goodbytes()
1248 }
1249 DatumColumnDecoder::List { values, .. } => values.goodbytes(),
1250 DatumColumnDecoder::Map { keys, vals, .. } => {
1251 ArrayOrd::String(keys.clone()).goodbytes() + vals.goodbytes()
1252 }
1253 DatumColumnDecoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(),
1254 DatumColumnDecoder::RecordEmpty(a) => ArrayOrd::Bool(a.clone()).goodbytes(),
1255 }
1256 }
1257}
1258
1259impl Schema<Row> for RelationDesc {
1260 type ArrowColumn = arrow::array::StructArray;
1261 type Statistics = OptionStats<StructStats>;
1262
1263 type Decoder = RowColumnarDecoder;
1264 type Encoder = RowColumnarEncoder;
1265
1266 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1267 RowColumnarDecoder::new(col, self)
1268 }
1269
1270 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1271 RowColumnarEncoder::new(self)
1272 .ok_or_else(|| anyhow::anyhow!("Cannot encode a RelationDesc with no columns"))
1273 }
1274}
1275
/// Decodes [`Row`]s out of a [`StructArray`] that has one child column per relation column.
#[derive(Debug)]
pub struct RowColumnarDecoder {
    /// Number of rows, i.e. the length of the source `StructArray`.
    len: usize,
    /// One entry per relation column, in relation-description order: the column name, its
    /// null count (`Some` only when the column's type is nullable), and the value decoder.
    decoders: Vec<(Arc<str>, Option<usize>, DatumColumnDecoder)>,
    /// Row-level validity of the source `StructArray`; `None` means every row is valid.
    nullability: Option<NullBuffer>,
}
1289
1290fn mask_nulls(column: &ArrayRef, null_mask: Option<&NullBuffer>) -> ArrayRef {
1292 if null_mask.is_none() {
1293 Arc::clone(column)
1294 } else {
1295 let nulls = NullBuffer::union(null_mask, column.nulls());
1298 let data = column
1299 .to_data()
1300 .into_builder()
1301 .nulls(nulls)
1302 .build()
1303 .expect("changed only null mask");
1304 make_array(data)
1305 }
1306}
1307
1308impl RowColumnarDecoder {
1309 pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1314 let inner_columns = col.columns();
1315 let desc_columns = desc.typ().columns();
1316
1317 if desc_columns.len() > inner_columns.len() {
1318 anyhow::bail!(
1319 "provided array has too few columns! {desc_columns:?} > {inner_columns:?}"
1320 );
1321 }
1322
1323 let mut decoders = Vec::with_capacity(desc_columns.len());
1325
1326 let null_mask = col.nulls();
1327
1328 for (col_idx, col_name, col_type) in desc.iter_all() {
1330 let field_name = col_idx.to_stable_name();
1331 let column = col.column_by_name(&field_name).ok_or_else(|| {
1332 anyhow::anyhow!(
1333 "StructArray did not contain column name {field_name}, found {:?}",
1334 col.column_names()
1335 )
1336 })?;
1337 let column = mask_nulls(column, null_mask);
1338 let null_count = col_type.nullable.then(|| column.null_count());
1339 let decoder = array_to_decoder(&column, &col_type.scalar_type)?;
1340 decoders.push((col_name.as_str().into(), null_count, decoder));
1341 }
1342
1343 Ok(RowColumnarDecoder {
1344 len: col.len(),
1345 decoders,
1346 nullability: col.logical_nulls(),
1347 })
1348 }
1349
1350 pub fn null_count(&self) -> usize {
1354 self.nullability.as_ref().map_or(0, |n| n.null_count())
1355 }
1356}
1357
1358impl ColumnDecoder<Row> for RowColumnarDecoder {
1359 fn decode(&self, idx: usize, val: &mut Row) {
1360 let mut packer = val.packer();
1361
1362 for (_, _, decoder) in &self.decoders {
1363 decoder.get(idx, &mut packer);
1364 }
1365 }
1366
1367 fn is_null(&self, idx: usize) -> bool {
1368 let Some(nullability) = self.nullability.as_ref() else {
1369 return false;
1370 };
1371 nullability.is_null(idx)
1372 }
1373
1374 fn goodbytes(&self) -> usize {
1375 let decoders_size: usize = self
1376 .decoders
1377 .iter()
1378 .map(|(_name, _null_count, decoder)| decoder.goodbytes())
1379 .sum();
1380
1381 decoders_size
1382 + self
1383 .nullability
1384 .as_ref()
1385 .map(|nulls| nulls.inner().inner().len())
1386 .unwrap_or(0)
1387 }
1388
1389 fn stats(&self) -> StructStats {
1390 StructStats {
1391 len: self.len,
1392 cols: self
1393 .decoders
1394 .iter()
1395 .map(|(name, null_count, decoder)| {
1396 let name = name.to_string();
1397 let stats = ColumnarStats {
1398 nulls: null_count.map(|count| ColumnNullStats { count }),
1399 values: decoder.stats(),
1400 };
1401 (name, stats)
1402 })
1403 .collect(),
1404 }
1405 }
1406}
1407
/// Encodes [`Row`]s into a [`StructArray`] with one child column per relation column.
#[derive(Debug)]
pub struct RowColumnarEncoder {
    /// One encoder per relation column, in column order.
    encoders: Vec<DatumEncoder>,
    /// Per column: its raw index (used as the Arrow field name in `finish`) and its name.
    col_names: Vec<(usize, Arc<str>)>,
    /// Row-level validity: `true` for rows added via `append`, `false` via `append_null`.
    nullability: BooleanBufferBuilder,
}
1417
1418impl RowColumnarEncoder {
1419 pub fn new(desc: &RelationDesc) -> Option<Self> {
1429 if desc.typ().columns().is_empty() {
1430 return None;
1431 }
1432
1433 let (col_names, encoders): (Vec<_>, Vec<_>) = desc
1434 .iter_all()
1435 .map(|(col_idx, col_name, col_type)| {
1436 let encoder = scalar_type_to_encoder(&col_type.scalar_type)
1437 .expect("failed to create encoder");
1438 let encoder = DatumEncoder {
1439 nullable: col_type.nullable,
1440 encoder,
1441 };
1442
1443 let name = (col_idx.to_raw(), col_name.as_str().into());
1446
1447 (name, encoder)
1448 })
1449 .unzip();
1450
1451 Some(RowColumnarEncoder {
1452 encoders,
1453 col_names,
1454 nullability: BooleanBufferBuilder::new(100),
1455 })
1456 }
1457}
1458
1459impl ColumnEncoder<Row> for RowColumnarEncoder {
1460 type FinishedColumn = StructArray;
1461
1462 fn goodbytes(&self) -> usize {
1463 self.encoders.iter().map(|e| e.goodbytes()).sum()
1464 }
1465
1466 fn append(&mut self, val: &Row) {
1467 let mut num_datums = 0;
1468 for (datum, encoder) in val.iter().zip_eq(self.encoders.iter_mut()) {
1469 encoder.push(datum);
1470 num_datums += 1;
1471 }
1472 assert_eq!(
1473 num_datums,
1474 self.encoders.len(),
1475 "tried to encode {val:?}, but only have {:?}",
1476 self.encoders
1477 );
1478
1479 self.nullability.append(true);
1480 }
1481
1482 fn append_null(&mut self) {
1483 for encoder in self.encoders.iter_mut() {
1484 encoder.push_invalid();
1485 }
1486 self.nullability.append(false);
1487 }
1488
1489 fn finish(self) -> Self::FinishedColumn {
1490 let RowColumnarEncoder {
1491 encoders,
1492 col_names,
1493 nullability,
1494 ..
1495 } = self;
1496
1497 let (arrays, fields): (Vec<_>, Vec<_>) = col_names
1498 .iter()
1499 .zip_eq(encoders)
1500 .map(|((col_idx, _col_name), encoder)| {
1501 let nullable = true;
1507 let array = encoder.finish();
1508 let field = Field::new(col_idx.to_string(), array.data_type().clone(), nullable);
1509
1510 (array, field)
1511 })
1512 .multiunzip();
1513
1514 let null_buffer = NullBuffer::from(BooleanBuffer::from(nullability));
1515
1516 let array = StructArray::new(Fields::from(fields), arrays, Some(null_buffer));
1517
1518 array
1519 }
1520}
1521
1522#[inline]
1527fn downcast_array<T: 'static>(array: &Arc<dyn Array>) -> Result<&T, anyhow::Error> {
1528 array
1529 .as_any()
1530 .downcast_ref::<T>()
1531 .ok_or_else(|| anyhow!("expected {}, found {array:?}", std::any::type_name::<T>()))
1532}
1533
1534fn array_to_decoder(
1539 array: &Arc<dyn Array>,
1540 col_ty: &SqlScalarType,
1541) -> Result<DatumColumnDecoder, anyhow::Error> {
1542 let decoder = match (array.data_type(), col_ty) {
1543 (DataType::Boolean, SqlScalarType::Bool) => {
1544 let array = downcast_array::<BooleanArray>(array)?;
1545 DatumColumnDecoder::Bool(array.clone())
1546 }
1547 (DataType::UInt8, SqlScalarType::PgLegacyChar) => {
1548 let array = downcast_array::<UInt8Array>(array)?;
1549 DatumColumnDecoder::U8(array.clone())
1550 }
1551 (DataType::UInt16, SqlScalarType::UInt16) => {
1552 let array = downcast_array::<UInt16Array>(array)?;
1553 DatumColumnDecoder::U16(array.clone())
1554 }
1555 (
1556 DataType::UInt32,
1557 SqlScalarType::UInt32
1558 | SqlScalarType::Oid
1559 | SqlScalarType::RegClass
1560 | SqlScalarType::RegProc
1561 | SqlScalarType::RegType,
1562 ) => {
1563 let array = downcast_array::<UInt32Array>(array)?;
1564 DatumColumnDecoder::U32(array.clone())
1565 }
1566 (DataType::UInt64, SqlScalarType::UInt64) => {
1567 let array = downcast_array::<UInt64Array>(array)?;
1568 DatumColumnDecoder::U64(array.clone())
1569 }
1570 (DataType::Int16, SqlScalarType::Int16) => {
1571 let array = downcast_array::<Int16Array>(array)?;
1572 DatumColumnDecoder::I16(array.clone())
1573 }
1574 (DataType::Int32, SqlScalarType::Int32) => {
1575 let array = downcast_array::<Int32Array>(array)?;
1576 DatumColumnDecoder::I32(array.clone())
1577 }
1578 (DataType::Int64, SqlScalarType::Int64) => {
1579 let array = downcast_array::<Int64Array>(array)?;
1580 DatumColumnDecoder::I64(array.clone())
1581 }
1582 (DataType::Float32, SqlScalarType::Float32) => {
1583 let array = downcast_array::<Float32Array>(array)?;
1584 DatumColumnDecoder::F32(array.clone())
1585 }
1586 (DataType::Float64, SqlScalarType::Float64) => {
1587 let array = downcast_array::<Float64Array>(array)?;
1588 DatumColumnDecoder::F64(array.clone())
1589 }
1590 (DataType::Struct(_), SqlScalarType::Numeric { .. }) => {
1591 let array = downcast_array::<StructArray>(array)?;
1592 let binary_values = array
1595 .column_by_name("binary")
1596 .expect("missing binary column");
1597
1598 let array = downcast_array::<BinaryArray>(binary_values)?;
1599 DatumColumnDecoder::Numeric(array.clone())
1600 }
1601 (
1602 DataType::Utf8,
1603 SqlScalarType::String
1604 | SqlScalarType::PgLegacyName
1605 | SqlScalarType::Char { .. }
1606 | SqlScalarType::VarChar { .. },
1607 ) => {
1608 let array = downcast_array::<StringArray>(array)?;
1609 DatumColumnDecoder::String(array.clone())
1610 }
1611 (DataType::Binary, SqlScalarType::Bytes) => {
1612 let array = downcast_array::<BinaryArray>(array)?;
1613 DatumColumnDecoder::Bytes(array.clone())
1614 }
1615 (DataType::Int32, SqlScalarType::Date) => {
1616 let array = downcast_array::<Int32Array>(array)?;
1617 DatumColumnDecoder::Date(array.clone())
1618 }
1619 (DataType::FixedSizeBinary(TIME_FIXED_BYTES), SqlScalarType::Time) => {
1620 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1621 DatumColumnDecoder::Time(array.clone())
1622 }
1623 (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), SqlScalarType::Timestamp { .. }) => {
1624 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1625 DatumColumnDecoder::Timestamp(array.clone())
1626 }
1627 (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), SqlScalarType::TimestampTz { .. }) => {
1628 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1629 DatumColumnDecoder::TimestampTz(array.clone())
1630 }
1631 (DataType::UInt64, SqlScalarType::MzTimestamp) => {
1632 let array = downcast_array::<UInt64Array>(array)?;
1633 DatumColumnDecoder::MzTimestamp(array.clone())
1634 }
1635 (DataType::FixedSizeBinary(INTERVAL_FIXED_BYTES), SqlScalarType::Interval) => {
1636 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1637 DatumColumnDecoder::Interval(array.clone())
1638 }
1639 (DataType::FixedSizeBinary(UUID_FIXED_BYTES), SqlScalarType::Uuid) => {
1640 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1641 DatumColumnDecoder::Uuid(array.clone())
1642 }
1643 (DataType::FixedSizeBinary(ACL_ITEM_FIXED_BYTES), SqlScalarType::AclItem) => {
1644 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1645 DatumColumnDecoder::AclItem(array.clone())
1646 }
1647 (DataType::Binary, SqlScalarType::MzAclItem) => {
1648 let array = downcast_array::<BinaryArray>(array)?;
1649 DatumColumnDecoder::MzAclItem(array.clone())
1650 }
1651 (DataType::Binary, SqlScalarType::Range { .. }) => {
1652 let array = downcast_array::<BinaryArray>(array)?;
1653 DatumColumnDecoder::Range(array.clone())
1654 }
1655 (DataType::Utf8, SqlScalarType::Jsonb) => {
1656 let array = downcast_array::<StringArray>(array)?;
1657 DatumColumnDecoder::Json(array.clone())
1658 }
1659 (DataType::Struct(_), s @ SqlScalarType::Array(_) | s @ SqlScalarType::Int2Vector) => {
1660 let element_type = match s {
1661 SqlScalarType::Array(inner) => inner,
1662 SqlScalarType::Int2Vector => &SqlScalarType::Int16,
1663 _ => unreachable!("checked above"),
1664 };
1665
1666 let array = downcast_array::<StructArray>(array)?;
1667 let nulls = array.nulls().cloned();
1668
1669 let dims = array
1670 .column_by_name("dims")
1671 .expect("missing dimensions column");
1672 let dims = downcast_array::<ListArray>(dims).cloned()?;
1673 let dim_offsets = dims.offsets().clone();
1674 let dims = downcast_array::<FixedSizeBinaryArray>(dims.values()).cloned()?;
1675
1676 let vals = array.column_by_name("vals").expect("missing values column");
1677 let vals = downcast_array::<ListArray>(vals)?;
1678 let val_offsets = vals.offsets().clone();
1679 let vals = array_to_decoder(vals.values(), element_type)?;
1680
1681 DatumColumnDecoder::Array {
1682 dim_offsets,
1683 dims,
1684 val_offsets,
1685 vals: Box::new(vals),
1686 nulls,
1687 }
1688 }
1689 (DataType::List(_), SqlScalarType::List { element_type, .. }) => {
1690 let array = downcast_array::<ListArray>(array)?;
1691 let inner_decoder = array_to_decoder(array.values(), &*element_type)?;
1692 DatumColumnDecoder::List {
1693 offsets: array.offsets().clone(),
1694 values: Box::new(inner_decoder),
1695 nulls: array.nulls().cloned(),
1696 }
1697 }
1698 (DataType::Map(_, true), SqlScalarType::Map { value_type, .. }) => {
1699 let array = downcast_array::<MapArray>(array)?;
1700 let keys = downcast_array::<StringArray>(array.keys())?;
1701 let vals = array_to_decoder(array.values(), value_type)?;
1702 DatumColumnDecoder::Map {
1703 offsets: array.offsets().clone(),
1704 keys: keys.clone(),
1705 vals: Box::new(vals),
1706 nulls: array.nulls().cloned(),
1707 }
1708 }
1709 (DataType::List(_), SqlScalarType::Map { value_type, .. }) => {
1710 let array: &ListArray = downcast_array(array)?;
1711 let entries: &StructArray = downcast_array(array.values())?;
1712 let [keys, values]: &[ArrayRef; 2] = entries.columns().try_into()?;
1713 let keys: &StringArray = downcast_array(keys)?;
1714 let vals: DatumColumnDecoder = array_to_decoder(values, value_type)?;
1715 DatumColumnDecoder::Map {
1716 offsets: array.offsets().clone(),
1717 keys: keys.clone(),
1718 vals: Box::new(vals),
1719 nulls: array.nulls().cloned(),
1720 }
1721 }
1722 (DataType::Boolean, SqlScalarType::Record { fields, .. }) if fields.is_empty() => {
1723 let empty_record_array = downcast_array::<BooleanArray>(array)?;
1724 DatumColumnDecoder::RecordEmpty(empty_record_array.clone())
1725 }
1726 (DataType::Struct(_), SqlScalarType::Record { fields, .. }) => {
1727 let record_array = downcast_array::<StructArray>(array)?;
1728 let null_mask = record_array.nulls();
1729 let mut decoders = Vec::with_capacity(fields.len());
1730 for (tag, (_name, col_type)) in fields.iter().enumerate() {
1731 let inner_array = record_array
1732 .column_by_name(&tag.to_string())
1733 .ok_or_else(|| anyhow::anyhow!("no column named '{tag}'"))?;
1734 let inner_array = mask_nulls(inner_array, null_mask);
1735 let inner_decoder = array_to_decoder(&inner_array, &col_type.scalar_type)?;
1736
1737 decoders.push(Box::new(inner_decoder));
1738 }
1739
1740 DatumColumnDecoder::Record {
1741 fields: decoders,
1742 nulls: record_array.nulls().cloned(),
1743 }
1744 }
1745 (x, y) => {
1746 let msg = format!("can't decode column of {x:?} for scalar type {y:?}");
1747 mz_ore::soft_panic_or_log!("{msg}");
1748 anyhow::bail!("{msg}");
1749 }
1750 };
1751
1752 Ok(decoder)
1753}
1754
1755fn scalar_type_to_encoder(col_ty: &SqlScalarType) -> Result<DatumColumnEncoder, anyhow::Error> {
1757 let encoder = match &col_ty {
1758 SqlScalarType::Bool => DatumColumnEncoder::Bool(BooleanBuilder::new()),
1759 SqlScalarType::PgLegacyChar => DatumColumnEncoder::U8(UInt8Builder::new()),
1760 SqlScalarType::UInt16 => DatumColumnEncoder::U16(UInt16Builder::new()),
1761 SqlScalarType::UInt32
1762 | SqlScalarType::Oid
1763 | SqlScalarType::RegClass
1764 | SqlScalarType::RegProc
1765 | SqlScalarType::RegType => DatumColumnEncoder::U32(UInt32Builder::new()),
1766 SqlScalarType::UInt64 => DatumColumnEncoder::U64(UInt64Builder::new()),
1767 SqlScalarType::Int16 => DatumColumnEncoder::I16(Int16Builder::new()),
1768 SqlScalarType::Int32 => DatumColumnEncoder::I32(Int32Builder::new()),
1769 SqlScalarType::Int64 => DatumColumnEncoder::I64(Int64Builder::new()),
1770 SqlScalarType::Float32 => DatumColumnEncoder::F32(Float32Builder::new()),
1771 SqlScalarType::Float64 => DatumColumnEncoder::F64(Float64Builder::new()),
1772 SqlScalarType::Numeric { .. } => DatumColumnEncoder::Numeric {
1773 approx_values: Float64Builder::new(),
1774 binary_values: BinaryBuilder::new(),
1775 numeric_context: crate::adt::numeric::cx_datum().clone(),
1776 },
1777 SqlScalarType::String
1778 | SqlScalarType::PgLegacyName
1779 | SqlScalarType::Char { .. }
1780 | SqlScalarType::VarChar { .. } => DatumColumnEncoder::String(StringBuilder::new()),
1781 SqlScalarType::Bytes => DatumColumnEncoder::Bytes(BinaryBuilder::new()),
1782 SqlScalarType::Date => DatumColumnEncoder::Date(Int32Builder::new()),
1783 SqlScalarType::Time => {
1784 DatumColumnEncoder::Time(FixedSizeBinaryBuilder::new(TIME_FIXED_BYTES))
1785 }
1786 SqlScalarType::Timestamp { .. } => {
1787 DatumColumnEncoder::Timestamp(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
1788 }
1789 SqlScalarType::TimestampTz { .. } => {
1790 DatumColumnEncoder::TimestampTz(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
1791 }
1792 SqlScalarType::MzTimestamp => DatumColumnEncoder::MzTimestamp(UInt64Builder::new()),
1793 SqlScalarType::Interval => {
1794 DatumColumnEncoder::Interval(FixedSizeBinaryBuilder::new(INTERVAL_FIXED_BYTES))
1795 }
1796 SqlScalarType::Uuid => {
1797 DatumColumnEncoder::Uuid(FixedSizeBinaryBuilder::new(UUID_FIXED_BYTES))
1798 }
1799 SqlScalarType::AclItem => {
1800 DatumColumnEncoder::AclItem(FixedSizeBinaryBuilder::new(ACL_ITEM_FIXED_BYTES))
1801 }
1802 SqlScalarType::MzAclItem => DatumColumnEncoder::MzAclItem(BinaryBuilder::new()),
1803 SqlScalarType::Range { .. } => DatumColumnEncoder::Range(BinaryBuilder::new()),
1804 SqlScalarType::Jsonb => DatumColumnEncoder::Jsonb {
1805 offsets: vec![0],
1806 buf: Vec::new(),
1807 nulls: None,
1808 },
1809 s @ SqlScalarType::Array(_) | s @ SqlScalarType::Int2Vector => {
1810 let element_type = match s {
1811 SqlScalarType::Array(inner) => inner,
1812 SqlScalarType::Int2Vector => &SqlScalarType::Int16,
1813 _ => unreachable!("checked above"),
1814 };
1815 let inner = scalar_type_to_encoder(element_type)?;
1816 DatumColumnEncoder::Array {
1817 dims: ListBuilder::new(FixedSizeBinaryBuilder::new(ARRAY_DIMENSION_FIXED_BYTES)),
1818 val_lengths: Vec::new(),
1819 vals: Box::new(inner),
1820 nulls: None,
1821 }
1822 }
1823 SqlScalarType::List { element_type, .. } => {
1824 let inner = scalar_type_to_encoder(&*element_type)?;
1825 DatumColumnEncoder::List {
1826 lengths: Vec::new(),
1827 values: Box::new(inner),
1828 nulls: None,
1829 }
1830 }
1831 SqlScalarType::Map { value_type, .. } => {
1832 let inner = scalar_type_to_encoder(&*value_type)?;
1833 DatumColumnEncoder::Map {
1834 lengths: Vec::new(),
1835 keys: StringBuilder::new(),
1836 vals: Box::new(inner),
1837 nulls: None,
1838 }
1839 }
1840 SqlScalarType::Record { fields, .. } if fields.is_empty() => {
1841 DatumColumnEncoder::RecordEmpty(BooleanBuilder::new())
1842 }
1843 SqlScalarType::Record { fields, .. } => {
1844 let encoders = fields
1845 .iter()
1846 .map(|(_name, ty)| {
1847 scalar_type_to_encoder(&ty.scalar_type).map(|e| DatumEncoder {
1848 nullable: ty.nullable,
1849 encoder: e,
1850 })
1851 })
1852 .collect::<Result<_, _>>()?;
1853
1854 DatumColumnEncoder::Record {
1855 fields: encoders,
1856 nulls: None,
1857 length: 0,
1858 }
1859 }
1860 };
1861 Ok(encoder)
1862}
1863
1864impl Codec for Row {
1865 type Storage = ProtoRow;
1866 type Schema = RelationDesc;
1867
1868 fn codec_name() -> String {
1869 "protobuf[Row]".into()
1870 }
1871
1872 fn encode<B>(&self, buf: &mut B)
1878 where
1879 B: BufMut,
1880 {
1881 self.into_proto()
1882 .encode(buf)
1883 .expect("no required fields means no initialization errors");
1884 }
1885
1886 fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Row, String> {
1892 let mut row = Row::with_capacity(buf.len());
1900 <Self as Codec>::decode_from(&mut row, buf, &mut None, schema)?;
1901 Ok(row)
1902 }
1903
1904 fn decode_from<'a>(
1905 &mut self,
1906 buf: &'a [u8],
1907 storage: &mut Option<ProtoRow>,
1908 schema: &RelationDesc,
1909 ) -> Result<(), String> {
1910 let mut proto = storage.take().unwrap_or_default();
1911 proto.clear();
1912 proto.merge(buf).map_err(|err| err.to_string())?;
1913 let ret = self.decode_from_proto(&proto, schema);
1914 storage.replace(proto);
1915 ret
1916 }
1917
1918 fn validate(row: &Self, desc: &Self::Schema) -> Result<(), String> {
1919 for x in Itertools::zip_longest(desc.iter_types(), row.iter()) {
1920 match x {
1921 EitherOrBoth::Both(typ, datum) if datum.is_instance_of_sql(typ) => continue,
1922 _ => return Err(format!("row {:?} did not match desc {:?}", row, desc)),
1923 };
1924 }
1925 Ok(())
1926 }
1927
1928 fn encode_schema(schema: &Self::Schema) -> Bytes {
1929 schema.into_proto().encode_to_vec().into()
1930 }
1931
1932 fn decode_schema(buf: &Bytes) -> Self::Schema {
1933 let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
1934 proto.into_rust().expect("valid schema")
1935 }
1936}
1937
/// Converts a [`Datum`] into its protobuf representation.
///
/// Borrowed payloads (strings, bytes, nested elements) are copied into owned protobuf
/// fields. Nested datums (array elements, list items, map values, range bounds) are
/// converted recursively.
impl<'a> From<Datum<'a>> for ProtoDatum {
    fn from(x: Datum<'a>) -> Self {
        let datum_type = match x {
            // Payload-free datums map to variants of `ProtoDatumOther`.
            Datum::False => DatumType::Other(ProtoDatumOther::False.into()),
            Datum::True => DatumType::Other(ProtoDatumOther::True.into()),
            // Narrow integers are widened with `into`; decoding narrows them back with a
            // range check (see `try_push_proto`).
            Datum::Int16(x) => DatumType::Int16(x.into()),
            Datum::Int32(x) => DatumType::Int32(x),
            Datum::UInt8(x) => DatumType::Uint8(x.into()),
            Datum::UInt16(x) => DatumType::Uint16(x.into()),
            Datum::UInt32(x) => DatumType::Uint32(x),
            Datum::UInt64(x) => DatumType::Uint64(x),
            Datum::Int64(x) => DatumType::Int64(x),
            Datum::Float32(x) => DatumType::Float32(x.into_inner()),
            Datum::Float64(x) => DatumType::Float64(x.into_inner()),
            Datum::Date(x) => DatumType::Date(x.into_proto()),
            // Stored as whole seconds since midnight plus the nanosecond component.
            Datum::Time(x) => DatumType::Time(ProtoNaiveTime {
                secs: x.num_seconds_from_midnight(),
                frac: x.nanosecond(),
            }),
            Datum::Timestamp(x) => DatumType::Timestamp(x.into_proto()),
            Datum::TimestampTz(x) => DatumType::TimestampTz(x.into_proto()),
            Datum::Interval(x) => DatumType::Interval(x.into_proto()),
            Datum::Bytes(x) => DatumType::Bytes(Bytes::copy_from_slice(x)),
            Datum::String(x) => DatumType::String(x.to_owned()),
            // Arrays carry their flattened elements plus one entry per dimension.
            Datum::Array(x) => DatumType::Array(ProtoArray {
                elements: Some(ProtoRow {
                    datums: x.elements().iter().map(|x| x.into()).collect(),
                }),
                dims: x
                    .dims()
                    .into_iter()
                    .map(|x| ProtoArrayDimension {
                        lower_bound: i64::cast_from(x.lower_bound),
                        length: u64::cast_from(x.length),
                    })
                    .collect(),
            }),
            Datum::List(x) => DatumType::List(ProtoRow {
                datums: x.iter().map(|x| x.into()).collect(),
            }),
            Datum::Map(x) => DatumType::Dict(ProtoDict {
                elements: x
                    .iter()
                    .map(|(k, v)| ProtoDictElement {
                        key: k.to_owned(),
                        val: Some(v.into()),
                    })
                    .collect(),
            }),
            Datum::Numeric(x) => {
                // `to_packed_bcd` takes `&mut self`, so operate on a clone. The special-value
                // checks below inspect this same (possibly modified) clone, so their order
                // relative to `to_packed_bcd` matters.
                let mut x = x.0.clone();
                // Finite values become packed BCD; NaN and +/-infinity map to dedicated
                // `ProtoDatumOther` variants; anything else is an internal error.
                if let Some((bcd, scale)) = x.to_packed_bcd() {
                    DatumType::Numeric(ProtoNumeric { bcd, scale })
                } else if x.is_nan() {
                    DatumType::Other(ProtoDatumOther::NumericNaN.into())
                } else if x.is_infinite() {
                    if x.is_negative() {
                        DatumType::Other(ProtoDatumOther::NumericNegInf.into())
                    } else {
                        DatumType::Other(ProtoDatumOther::NumericPosInf.into())
                    }
                } else if x.is_special() {
                    panic!("internal error: unhandled special numeric value: {}", x);
                } else {
                    panic!(
                        "internal error: to_packed_bcd returned None for non-special value: {}",
                        x
                    )
                }
            }
            Datum::JsonNull => DatumType::Other(ProtoDatumOther::JsonNull.into()),
            Datum::Uuid(x) => DatumType::Uuid(x.as_bytes().to_vec()),
            Datum::MzTimestamp(x) => DatumType::MzTimestamp(x.into()),
            Datum::Dummy => DatumType::Other(ProtoDatumOther::Dummy.into()),
            Datum::Null => DatumType::Other(ProtoDatumOther::Null.into()),
            // An absent `inner` or an absent bound is carried through as `None`.
            Datum::Range(super::Range { inner }) => DatumType::Range(Box::new(ProtoRange {
                inner: inner.map(|RangeInner { lower, upper }| {
                    Box::new(ProtoRangeInner {
                        lower_inclusive: lower.inclusive,
                        lower: lower.bound.map(|bound| Box::new(bound.datum().into())),
                        upper_inclusive: upper.inclusive,
                        upper: upper.bound.map(|bound| Box::new(bound.datum().into())),
                    })
                }),
            })),
            Datum::MzAclItem(x) => DatumType::MzAclItem(x.into_proto()),
            Datum::AclItem(x) => DatumType::AclItem(x.into_proto()),
        };
        ProtoDatum {
            datum_type: Some(datum_type),
        }
    }
}
2032
2033impl RowPacker<'_> {
2034 pub(crate) fn try_push_proto(&mut self, x: &ProtoDatum) -> Result<(), String> {
2035 match &x.datum_type {
2036 Some(DatumType::Other(o)) => match ProtoDatumOther::try_from(*o) {
2037 Ok(ProtoDatumOther::Unknown) => return Err("unknown datum type".into()),
2038 Ok(ProtoDatumOther::Null) => self.push(Datum::Null),
2039 Ok(ProtoDatumOther::False) => self.push(Datum::False),
2040 Ok(ProtoDatumOther::True) => self.push(Datum::True),
2041 Ok(ProtoDatumOther::JsonNull) => self.push(Datum::JsonNull),
2042 Ok(ProtoDatumOther::Dummy) => {
2043 #[cfg(feature = "tracing")]
2047 tracing::error!("protobuf decoding found Dummy datum");
2048 self.push(Datum::Dummy);
2049 }
2050 Ok(ProtoDatumOther::NumericPosInf) => self.push(Datum::from(Numeric::infinity())),
2051 Ok(ProtoDatumOther::NumericNegInf) => self.push(Datum::from(-Numeric::infinity())),
2052 Ok(ProtoDatumOther::NumericNaN) => self.push(Datum::from(Numeric::nan())),
2053 Err(_) => return Err(format!("unknown datum type: {}", o)),
2054 },
2055 Some(DatumType::Int16(x)) => {
2056 let x = i16::try_from(*x)
2057 .map_err(|_| format!("int16 field stored with out of range value: {}", *x))?;
2058 self.push(Datum::Int16(x))
2059 }
2060 Some(DatumType::Int32(x)) => self.push(Datum::Int32(*x)),
2061 Some(DatumType::Int64(x)) => self.push(Datum::Int64(*x)),
2062 Some(DatumType::Uint8(x)) => {
2063 let x = u8::try_from(*x)
2064 .map_err(|_| format!("uint8 field stored with out of range value: {}", *x))?;
2065 self.push(Datum::UInt8(x))
2066 }
2067 Some(DatumType::Uint16(x)) => {
2068 let x = u16::try_from(*x)
2069 .map_err(|_| format!("uint16 field stored with out of range value: {}", *x))?;
2070 self.push(Datum::UInt16(x))
2071 }
2072 Some(DatumType::Uint32(x)) => self.push(Datum::UInt32(*x)),
2073 Some(DatumType::Uint64(x)) => self.push(Datum::UInt64(*x)),
2074 Some(DatumType::Float32(x)) => self.push(Datum::Float32((*x).into())),
2075 Some(DatumType::Float64(x)) => self.push(Datum::Float64((*x).into())),
2076 Some(DatumType::Bytes(x)) => self.push(Datum::Bytes(x)),
2077 Some(DatumType::String(x)) => self.push(Datum::String(x)),
2078 Some(DatumType::Uuid(x)) => {
2079 let u = Uuid::from_slice(x).map_err(|err| err.to_string())?;
2084 self.push(Datum::Uuid(u));
2085 }
2086 Some(DatumType::Date(x)) => self.push(Datum::Date(x.clone().into_rust()?)),
2087 Some(DatumType::Time(x)) => self.push(Datum::Time(x.clone().into_rust()?)),
2088 Some(DatumType::Timestamp(x)) => self.push(Datum::Timestamp(x.clone().into_rust()?)),
2089 Some(DatumType::TimestampTz(x)) => {
2090 self.push(Datum::TimestampTz(x.clone().into_rust()?))
2091 }
2092 Some(DatumType::Interval(x)) => self.push(Datum::Interval(
2093 x.clone()
2094 .into_rust()
2095 .map_err(|e: TryFromProtoError| e.to_string())?,
2096 )),
2097 Some(DatumType::List(x)) => self.push_list_with(|row| -> Result<(), String> {
2098 for d in x.datums.iter() {
2099 row.try_push_proto(d)?;
2100 }
2101 Ok(())
2102 })?,
2103 Some(DatumType::Array(x)) => {
2104 let dims = x
2105 .dims
2106 .iter()
2107 .map(|x| ArrayDimension {
2108 lower_bound: isize::cast_from(x.lower_bound),
2109 length: usize::cast_from(x.length),
2110 })
2111 .collect::<Vec<_>>();
2112 match x.elements.as_ref() {
2113 None => self.try_push_array(&dims, [].iter()),
2114 Some(elements) => {
2115 let elements_row = Row::try_from(elements)?;
2118 self.try_push_array(&dims, elements_row.iter())
2119 }
2120 }
2121 .map_err(|err| err.to_string())?
2122 }
2123 Some(DatumType::Dict(x)) => self.push_dict_with(|row| -> Result<(), String> {
2124 for e in x.elements.iter() {
2125 row.push(Datum::from(e.key.as_str()));
2126 let val = e
2127 .val
2128 .as_ref()
2129 .ok_or_else(|| format!("missing val for key: {}", e.key))?;
2130 row.try_push_proto(val)?;
2131 }
2132 Ok(())
2133 })?,
2134 Some(DatumType::Numeric(x)) => {
2135 let n = Decimal::from_packed_bcd(&x.bcd, x.scale).map_err(|err| err.to_string())?;
2138 self.push(Datum::from(n))
2139 }
2140 Some(DatumType::MzTimestamp(x)) => self.push(Datum::MzTimestamp((*x).into())),
2141 Some(DatumType::Range(inner)) => {
2142 let ProtoRange { inner } = &**inner;
2143 match inner {
2144 None => self.push_range(Range { inner: None }).unwrap(),
2145 Some(inner) => {
2146 let ProtoRangeInner {
2147 lower_inclusive,
2148 lower,
2149 upper_inclusive,
2150 upper,
2151 } = &**inner;
2152
2153 self.push_range_with(
2154 RangeLowerBound {
2155 inclusive: *lower_inclusive,
2156 bound: lower
2157 .as_ref()
2158 .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
2159 },
2160 RangeUpperBound {
2161 inclusive: *upper_inclusive,
2162 bound: upper
2163 .as_ref()
2164 .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
2165 },
2166 )
2167 .expect("decoding ProtoRow must succeed");
2168 }
2169 }
2170 }
2171 Some(DatumType::MzAclItem(x)) => self.push(Datum::MzAclItem(x.clone().into_rust()?)),
2172 Some(DatumType::AclItem(x)) => self.push(Datum::AclItem(x.clone().into_rust()?)),
2173 None => return Err("unknown datum type".into()),
2174 };
2175 Ok(())
2176 }
2177}
2178
2179impl TryFrom<&ProtoRow> for Row {
2181 type Error = String;
2182
2183 fn try_from(x: &ProtoRow) -> Result<Self, Self::Error> {
2184 let mut row = Row::default();
2187 let mut packer = row.packer();
2188 for d in x.datums.iter() {
2189 packer.try_push_proto(d)?;
2190 }
2191 Ok(row)
2192 }
2193}
2194
2195impl RustType<ProtoRow> for Row {
2196 fn into_proto(&self) -> ProtoRow {
2197 let datums = self.iter().map(|x| x.into()).collect();
2198 ProtoRow { datums }
2199 }
2200
2201 fn from_proto(proto: ProtoRow) -> Result<Self, TryFromProtoError> {
2202 let mut row = Row::default();
2205 let mut packer = row.packer();
2206 for d in proto.datums.iter() {
2207 packer
2208 .try_push_proto(d)
2209 .map_err(TryFromProtoError::RowConversionError)?;
2210 }
2211 Ok(row)
2212 }
2213}
2214
// Tests for the columnar (Arrow) encoding of `Row`s and for the protobuf
// encoding of `Row`/`Datum`.
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use arrow::array::{ArrayData, make_array};
    use arrow::compute::SortOptions;
    use arrow::datatypes::ArrowNativeType;
    use arrow::row::SortField;
    use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
    use mz_ore::assert_err;
    use mz_ore::collections::CollectionExt;
    use mz_persist::indexed::columnar::arrow::realloc_array;
    use mz_persist::metrics::ColumnarMetrics;
    use mz_persist_types::Codec;
    use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
    use mz_persist_types::columnar::{codec_to_schema, schema_to_codec};
    use mz_proto::{ProtoType, RustType};
    use proptest::prelude::*;
    use proptest::strategy::Strategy;
    use uuid::Uuid;

    use super::*;
    use crate::adt::array::ArrayDimension;
    use crate::adt::interval::Interval;
    use crate::adt::numeric::Numeric;
    use crate::adt::timestamp::CheckedTimestamp;
    use crate::fixed_length::ToDatumIter;
    use crate::relation::arb_relation_desc;
    use crate::{ColumnName, RowArena, SqlColumnType, arb_datum_for_column, arb_row_for_relation};
    use crate::{Datum, RelationDesc, Row, SqlScalarType};

    /// Packs each datum into its own single-column row (column `"a"` of type
    /// `ty`) and runs [`roundtrip_rows`] over the result.
    #[track_caller]
    fn roundtrip_datum<'a>(
        ty: SqlColumnType,
        datum: impl Iterator<Item = Datum<'a>>,
        metrics: &ColumnarMetrics,
    ) {
        let desc = RelationDesc::builder().with_column("a", ty).finish();
        let rows = datum.map(|d| Row::pack_slice(&[d])).collect();
        roundtrip_rows(&desc, rows, metrics)
    }

    /// Encodes `rows` into an Arrow column using `desc`'s schema, then checks:
    ///
    /// - the column survives `realloc_array` and a protobuf round trip of its
    ///   `ArrayData`, via both `StructArray::from` and `make_array`;
    /// - decoding each index yields the original row, and every non-null datum
    ///   lies within the column's stats bounds (types without bounds must be in
    ///   an explicit allowlist);
    /// - the null counts recorded in the stats match the decoded nulls;
    /// - `schema_to_codec` followed by `codec_to_schema` reproduces the column;
    /// - sorting with Arrow's row format agrees with sorting by [`ArrayOrd`];
    /// - the `ArrayOrd` order is consistent with `Row` order on the leading
    ///   columns for which `preserves_order` holds;
    /// - `ArrayOrd::goodbytes` for the column equals the per-index sum;
    /// - a proto-encoded lower bound respects its size limit and is `<=` the
    ///   minimum value.
    #[track_caller]
    fn roundtrip_rows(desc: &RelationDesc, rows: Vec<Row>, metrics: &ColumnarMetrics) {
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(desc).unwrap();
        for row in &rows {
            encoder.append(row);
        }
        let col = encoder.finish();

        // All remaining checks run on the output of `realloc_array`.
        let col = realloc_array(&col, metrics);
        // The column's `ArrayData` must survive a protobuf encode/decode.
        {
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        let decoder = <RelationDesc as Schema<Row>>::decoder(desc, col.clone()).unwrap();
        let stats = decoder.stats();

        let arena = RowArena::new();
        // Per column: the optional (lower, upper) bounds from the stats, and the
        // null count the stats recorded (0 when absent).
        let (stats, stat_nulls): (Vec<_>, Vec<_>) = desc
            .iter()
            .map(|(name, ty)| {
                let col_stats = stats.cols.get(name.as_str()).unwrap();
                let lower_upper =
                    crate::stats::col_values(&ty.scalar_type, &col_stats.values, &arena);
                let null_count = col_stats.nulls.map_or(0, |n| n.count);

                (lower_upper, null_count)
            })
            .unzip();
        let mut actual_nulls = vec![0usize; stats.len()];

        let mut rnd_row = Row::default();
        for (idx, og_row) in rows.iter().enumerate() {
            decoder.decode(idx, &mut rnd_row);
            assert_eq!(og_row, &rnd_row);

            for (c_idx, (rnd_datum, ty)) in rnd_row.iter().zip_eq(desc.typ().columns()).enumerate()
            {
                let lower_upper = stats[c_idx];

                if rnd_datum.is_null() {
                    actual_nulls[c_idx] += 1;
                } else if let Some((lower, upper)) = lower_upper {
                    assert!(rnd_datum >= lower, "{rnd_datum:?} is not >= {lower:?}");
                    assert!(rnd_datum <= upper, "{rnd_datum:?} is not <= {upper:?}");
                } else {
                    // Only these types are allowed to have no lower/upper stats.
                    match &ty.scalar_type {
                        SqlScalarType::Jsonb => (),
                        SqlScalarType::AclItem
                        | SqlScalarType::MzAclItem
                        | SqlScalarType::Range { .. }
                        | SqlScalarType::Array(_)
                        | SqlScalarType::Map { .. }
                        | SqlScalarType::List { .. }
                        | SqlScalarType::Record { .. }
                        | SqlScalarType::Int2Vector => (),
                        other => panic!("should have collected stats for {other:?}"),
                    }
                }
            }
        }

        for (col_idx, (stats_count, actual_count)) in
            stat_nulls.iter().zip_eq(actual_nulls.iter()).enumerate()
        {
            assert_eq!(
                stats_count, actual_count,
                "column {col_idx} has incorrect number of nulls!"
            );
        }

        // Going through the codec representation and back must be lossless.
        let codec = schema_to_codec::<Row>(desc, &col).unwrap();
        let col2 = codec_to_schema::<Row>(desc, &codec).unwrap();
        assert_eq!(col2.as_ref(), &col);

        // Sort using Arrow's row format (ascending, nulls last)...
        let converter = arrow::row::RowConverter::new(vec![SortField::new_with_options(
            col.data_type().clone(),
            SortOptions {
                descending: false,
                nulls_first: false,
            },
        )])
        .expect("sortable");
        let rows = converter
            .convert_columns(&[Arc::new(col.clone())])
            .expect("convertible");
        let mut row_vec = rows.iter().collect::<Vec<_>>();
        row_vec.sort();
        let row_col = converter
            .convert_rows(row_vec)
            .expect("convertible")
            .into_element();
        assert_eq!(row_col.len(), col.len());

        // ...and check it agrees with sorting by `ArrayOrd`. After the sort,
        // `indices[j]` is the position in `col` of the j-th smallest value.
        let ord = ArrayOrd::new(&col);
        let mut indices = (0..u64::usize_as(col.len())).collect::<Vec<_>>();
        indices.sort_by_key(|i| ord.at(i.as_usize()));
        let indices = UInt64Array::from(indices);
        let ord_col = ::arrow::compute::take(&col, &indices, None).expect("takeable");
        assert_eq!(row_col.as_ref(), ord_col.as_ref());

        // On the leading run of columns whose types preserve order, the
        // `ArrayOrd`-sorted rows must also be sorted as `Row`s.
        let ordered_prefix_len = desc
            .iter()
            .take_while(|(_, c)| preserves_order(&c.scalar_type))
            .count();
        let decoder = <RelationDesc as Schema<Row>>::decoder_any(desc, ord_col.as_ref()).unwrap();
        let rows = (0..ord_col.len()).map(|i| {
            let mut row = Row::default();
            decoder.decode(i, &mut row);
            row
        });
        for (a, b) in rows.tuple_windows() {
            let a_prefix = a.iter().take(ordered_prefix_len);
            let b_prefix = b.iter().take(ordered_prefix_len);
            assert!(
                a_prefix.cmp(b_prefix).is_le(),
                "ordering should be consistent on preserves_order columns: {:#?}\n{:?}\n{:?}",
                desc.iter().take(ordered_prefix_len).collect_vec(),
                a.to_datum_iter().take(ordered_prefix_len).collect_vec(),
                b.to_datum_iter().take(ordered_prefix_len).collect_vec()
            );
        }

        assert_eq!(
            ord.goodbytes(),
            (0..col.len()).map(|i| ord.at(i).goodbytes()).sum::<usize>(),
            "total size should match the sum of the sizes at each index"
        );

        if !ord_col.is_empty() {
            // `ord_col` is sorted, so its minimum value is at position 0.
            // (`indices.values()[0]` is the minimum's position in the *unsorted*
            // `col`, so it must not be used to index `ord_col`.)
            let lower_bound = ArrayBound::new(ord_col, 0);
            let max_encoded_len = 1000;
            if let Some(proto) = lower_bound.to_proto_lower(max_encoded_len) {
                assert!(
                    proto.encoded_len() <= max_encoded_len,
                    "should respect the max len"
                );
                let array_data = proto.into_rust().expect("valid array");
                let new_lower_bound = ArrayBound::new(make_array(array_data), 0);
                assert!(
                    new_lower_bound.get() <= lower_bound.get(),
                    "proto-roundtripped bound should be <= the original"
                );
            }
        }
    }

    /// Property test: up to 15 arbitrary datums of an arbitrary column type
    /// round-trip through [`roundtrip_datum`].
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // NOTE(review): presumably too slow or unsupported under Miri — confirm.
    fn proptest_datums() {
        let strat = any::<SqlColumnType>().prop_flat_map(|ty| {
            proptest::collection::vec(arb_datum_for_column(ty.clone()), 0..16)
                .prop_map(move |d| (ty.clone(), d))
        });
        let metrics = ColumnarMetrics::disconnected();

        proptest!(|((ty, datums) in strat)| {
            roundtrip_datum(ty.clone(), datums.iter().map(Datum::from), &metrics);
        })
    }

    /// Property test: up to 11 arbitrary rows of an arbitrary relation with 1 to 7
    /// columns round-trip through [`roundtrip_rows`].
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // NOTE(review): presumably too slow or unsupported under Miri — confirm.
    fn proptest_non_empty_relation_descs() {
        let strat = arb_relation_desc(1..8).prop_flat_map(|desc| {
            proptest::collection::vec(arb_row_for_relation(&desc), 0..12)
                .prop_map(move |rows| (desc.clone(), rows))
        });
        let metrics = ColumnarMetrics::disconnected();

        proptest!(|((desc, rows) in strat)| {
            roundtrip_rows(&desc, rows, &metrics)
        })
    }

    /// Building a columnar encoder for a relation with no columns is an error.
    #[mz_ore::test]
    fn empty_relation_desc_returns_error() {
        let empty_desc = RelationDesc::empty();
        let result = <RelationDesc as Schema<Row>>::encoder(&empty_desc);
        assert_err!(result);
    }

    /// A one-dimensional `uint4` array round-trips through the columnar encoding.
    #[mz_ore::test]
    fn smoketest_collections() {
        let mut row = Row::default();
        let mut packer = row.packer();
        let metrics = ColumnarMetrics::disconnected();

        packer
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 0,
                    length: 3,
                }],
                [Datum::UInt32(4), Datum::UInt32(5), Datum::UInt32(6)],
            )
            .unwrap();

        let array = row.unpack_first();
        roundtrip_datum(
            SqlScalarType::Array(Box::new(SqlScalarType::UInt32)).nullable(true),
            [array].into_iter(),
            &metrics,
        );
    }

    /// Two rows, one fully populated and one all-null, over scalar, list, and
    /// map columns decode back to the rows that were encoded.
    #[mz_ore::test]
    fn smoketest_row() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Int64.nullable(true))
            .with_column("b", SqlScalarType::String.nullable(true))
            .with_column("c", SqlScalarType::Bool.nullable(true))
            .with_column(
                "d",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt32),
                    custom_id: None,
                }
                .nullable(true),
            )
            .with_column(
                "e",
                SqlScalarType::Map {
                    value_type: Box::new(SqlScalarType::Int16),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push(Datum::Int64(100));
            packer.push(Datum::String("hello world"));
            packer.push(Datum::True);
            packer.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
            packer.push_dict([("bar", Datum::Int16(9)), ("foo", Datum::Int16(3))]);
        }
        let mut og_row_2 = Row::default();
        {
            let mut packer = og_row_2.packer();
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
        }

        encoder.append(&og_row);
        encoder.append(&og_row_2);
        let col = encoder.finish();

        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();

        let mut rnd_row = Row::default();
        decoder.decode(0, &mut rnd_row);
        assert_eq!(og_row, rnd_row);

        let mut rnd_row = Row::default();
        decoder.decode(1, &mut rnd_row);
        assert_eq!(og_row_2, rnd_row);
    }

    /// A non-nullable `list<list<int8>>` with inner lists of differing lengths
    /// round-trips.
    #[mz_ore::test]
    fn test_nested_list() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::List {
                        element_type: Box::new(SqlScalarType::Int64),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
                .nullable(false),
            )
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push_list_with(|inner| {
                inner.push_list([Datum::Int64(1), Datum::Int64(2)]);
                inner.push_list([Datum::Int64(5)]);
                inner.push_list([Datum::Int64(9), Datum::Int64(99), Datum::Int64(999)]);
            });
        }

        encoder.append(&og_row);
        let col = encoder.finish();

        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
        let mut rnd_row = Row::default();
        decoder.decode(0, &mut rnd_row);

        assert_eq!(og_row, rnd_row);
    }

    /// A nullable record column round-trips both a populated record (packed as a
    /// list of its fields, including a null field) and a null record.
    #[mz_ore::test]
    fn test_record() {
        let desc = RelationDesc::builder()
            .with_column(
                "a",
                SqlScalarType::Record {
                    fields: [
                        (
                            ColumnName::from("foo"),
                            SqlScalarType::Int64.nullable(false),
                        ),
                        (
                            ColumnName::from("bar"),
                            SqlScalarType::String.nullable(true),
                        ),
                        (
                            ColumnName::from("baz"),
                            SqlScalarType::List {
                                element_type: Box::new(SqlScalarType::UInt32),
                                custom_id: None,
                            }
                            .nullable(false),
                        ),
                    ]
                    .into(),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push_list_with(|inner| {
                inner.push(Datum::Int64(42));
                inner.push(Datum::Null);
                inner.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
            });
        }
        let null_row = Row::pack_slice(&[Datum::Null]);

        encoder.append(&og_row);
        encoder.append(&null_row);
        let col = encoder.finish();

        let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
        let mut rnd_row = Row::default();

        decoder.decode(0, &mut rnd_row);
        assert_eq!(og_row, rnd_row);

        // NOTE(review): presumably resets `rnd_row` before reuse (`packer()` appears
        // to clear the row) — confirm.
        rnd_row.packer();
        decoder.decode(1, &mut rnd_row);
        assert_eq!(null_row, rnd_row);
    }

    /// A row containing one datum of (nearly) every kind, plus an array, a
    /// nested list, and a dict, round-trips through `Codec::encode_to_vec` and
    /// `Row::decode`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // NOTE(review): presumably too slow or unsupported under Miri — confirm.
    fn roundtrip() {
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::False,
            Datum::True,
            Datum::Int16(1),
            Datum::Int32(2),
            Datum::Int64(3),
            Datum::Float32(4f32.into()),
            Datum::Float64(5f64.into()),
            Datum::Date(
                NaiveDate::from_ymd_opt(6, 7, 8)
                    .unwrap()
                    .try_into()
                    .unwrap(),
            ),
            Datum::Time(NaiveTime::from_hms_opt(9, 10, 11).unwrap()),
            // `% 12` keeps the month argument in range so the date is valid.
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(
                    NaiveDate::from_ymd_opt(12, 13 % 12, 14)
                        .unwrap()
                        .and_time(NaiveTime::from_hms_opt(15, 16, 17).unwrap()),
                )
                .unwrap(),
            ),
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                    NaiveDate::from_ymd_opt(18, 19 % 12, 20)
                        .unwrap()
                        .and_time(NaiveTime::from_hms_opt(21, 22, 23).unwrap()),
                    Utc,
                ))
                .unwrap(),
            ),
            Datum::Interval(Interval {
                months: 24,
                days: 42,
                micros: 25,
            }),
            Datum::Bytes(&[26, 27]),
            Datum::String("28"),
            Datum::from(Numeric::from(29)),
            Datum::from(Numeric::infinity()),
            Datum::from(-Numeric::infinity()),
            Datum::from(Numeric::nan()),
            Datum::JsonNull,
            Datum::Uuid(Uuid::from_u128(30)),
            Datum::Dummy,
            Datum::Null,
        ]);
        packer
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 2,
                    length: 2,
                }],
                vec![Datum::Int32(31), Datum::Int32(32)],
            )
            .expect("valid array");
        packer.push_list_with(|packer| {
            packer.push(Datum::String("33"));
            packer.push_list_with(|packer| {
                packer.push(Datum::String("34"));
                packer.push(Datum::String("35"));
            });
            packer.push(Datum::String("36"));
            packer.push(Datum::String("37"));
        });
        packer.push_dict_with(|row| {
            // Keys "38", "40", ..., "76" all have two digits, so their string order
            // matches the numeric order in which they are pushed (presumably dict
            // keys must be pushed sorted — confirm).
            let mut i = 38;
            for _ in 0..20 {
                row.push(Datum::String(&i.to_string()));
                row.push(Datum::Int32(i + 1));
                i += 2;
            }
        });

        // NOTE(review): every column is declared `Int32` regardless of the datum it
        // holds; this appears to rely on `Row::decode` not checking datums against
        // the column types — confirm.
        let mut desc = RelationDesc::builder();
        for (idx, _) in row.iter().enumerate() {
            desc = desc.with_column(idx.to_string(), SqlScalarType::Int32.nullable(true));
        }
        let desc = desc.finish();

        let encoded = row.encode_to_vec();
        assert_eq!(Row::decode(&encoded, &desc), Ok(row));
    }

    /// Decoding with a projected `RelationDesc` (columns 0 and 2 of 3) yields
    /// rows containing only the demanded columns.
    #[mz_ore::test]
    fn smoketest_projection() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Int64.nullable(true))
            .with_column("b", SqlScalarType::String.nullable(true))
            .with_column("c", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();

        let mut og_row = Row::default();
        {
            let mut packer = og_row.packer();
            packer.push(Datum::Int64(100));
            packer.push(Datum::String("hello world"));
            packer.push(Datum::True);
        }
        let mut og_row_2 = Row::default();
        {
            let mut packer = og_row_2.packer();
            packer.push(Datum::Null);
            packer.push(Datum::Null);
            packer.push(Datum::Null);
        }

        encoder.append(&og_row);
        encoder.append(&og_row_2);
        let col = encoder.finish();

        let projected_desc = desc.apply_demand(&BTreeSet::from([0, 2]));

        let decoder = <RelationDesc as Schema<Row>>::decoder(&projected_desc, col).unwrap();

        let mut rnd_row = Row::default();
        decoder.decode(0, &mut rnd_row);
        let expected_row = Row::pack_slice(&[Datum::Int64(100), Datum::True]);
        assert_eq!(expected_row, rnd_row);

        let mut rnd_row = Row::default();
        decoder.decode(1, &mut rnd_row);
        let expected_row = Row::pack_slice(&[Datum::Null, Datum::Null]);
        assert_eq!(expected_row, rnd_row);
    }
}