1use std::fmt::Debug;
15use std::ops::AddAssign;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use arrow::array::{
20 Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBufferBuilder,
21 BooleanBuilder, FixedSizeBinaryArray, FixedSizeBinaryBuilder, Float32Array, Float32Builder,
22 Float64Array, Float64Builder, Int16Array, Int16Builder, Int32Array, Int32Builder, Int64Array,
23 Int64Builder, ListArray, ListBuilder, MapArray, StringArray, StringBuilder, StructArray,
24 UInt8Array, UInt8Builder, UInt16Array, UInt16Builder, UInt32Array, UInt32Builder, UInt64Array,
25 UInt64Builder, make_array,
26};
27use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
28use arrow::datatypes::{DataType, Field, Fields, ToByteSlice};
29use bytes::{BufMut, Bytes};
30use chrono::Timelike;
31use dec::{Context, Decimal, OrderedDecimal};
32use itertools::{EitherOrBoth, Itertools};
33use mz_ore::assert_none;
34use mz_ore::cast::CastFrom;
35use mz_persist_types::Codec;
36use mz_persist_types::arrow::ArrayOrd;
37use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, FixedSizeCodec, Schema};
38use mz_persist_types::stats::{
39 ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, FixedSizeBytesStatsKind,
40 OptionStats, PrimitiveStats, StructStats,
41};
42use mz_proto::chrono::ProtoNaiveTime;
43use mz_proto::{ProtoType, RustType, TryFromProtoError};
44use prost::Message;
45use timely::Container;
46use uuid::Uuid;
47
48use crate::adt::array::{ArrayDimension, PackedArrayDimension};
49use crate::adt::date::Date;
50use crate::adt::datetime::PackedNaiveTime;
51use crate::adt::interval::PackedInterval;
52use crate::adt::jsonb::{JsonbPacker, JsonbRef};
53use crate::adt::mz_acl_item::{PackedAclItem, PackedMzAclItem};
54use crate::adt::numeric::{Numeric, PackedNumeric};
55use crate::adt::range::{Range, RangeInner, RangeLowerBound, RangeUpperBound};
56use crate::adt::timestamp::{CheckedTimestamp, PackedNaiveDateTime};
57use crate::row::proto_datum::DatumType;
58use crate::row::{
59 ProtoArray, ProtoArrayDimension, ProtoDatum, ProtoDatumOther, ProtoDict, ProtoDictElement,
60 ProtoNumeric, ProtoRange, ProtoRangeInner, ProtoRow,
61};
62use crate::stats::{fixed_stats_from_column, numeric_stats_from_column, stats_for_json};
63use crate::{Datum, ProtoRelationDesc, RelationDesc, Row, RowPacker, ScalarType, Timestamp};
64
65#[allow(clippy::as_conversions)]
70mod fixed_binary_sizes {
71 use super::*;
72
73 pub const TIME_FIXED_BYTES: i32 = PackedNaiveTime::SIZE as i32;
74 pub const TIMESTAMP_FIXED_BYTES: i32 = PackedNaiveDateTime::SIZE as i32;
75 pub const INTERVAL_FIXED_BYTES: i32 = PackedInterval::SIZE as i32;
76 pub const ACL_ITEM_FIXED_BYTES: i32 = PackedAclItem::SIZE as i32;
77 pub const _MZ_ACL_ITEM_FIXED_BYTES: i32 = PackedMzAclItem::SIZE as i32;
78 pub const ARRAY_DIMENSION_FIXED_BYTES: i32 = PackedArrayDimension::SIZE as i32;
79
80 pub const UUID_FIXED_BYTES: i32 = 16;
81 static_assertions::const_assert_eq!(UUID_FIXED_BYTES as usize, std::mem::size_of::<Uuid>());
82}
83use fixed_binary_sizes::*;
84
85pub fn preserves_order(scalar_type: &ScalarType) -> bool {
89 match scalar_type {
90 ScalarType::Bool
92 | ScalarType::Int16
93 | ScalarType::Int32
94 | ScalarType::Int64
95 | ScalarType::UInt16
96 | ScalarType::UInt32
97 | ScalarType::UInt64
98 | ScalarType::Date
99 | ScalarType::Time
100 | ScalarType::Timestamp { .. }
101 | ScalarType::TimestampTz { .. }
102 | ScalarType::Interval
103 | ScalarType::Bytes
104 | ScalarType::String
105 | ScalarType::Uuid
106 | ScalarType::MzTimestamp
107 | ScalarType::MzAclItem
108 | ScalarType::AclItem => true,
109 ScalarType::Record { fields, .. } => fields
111 .iter()
112 .all(|(_, field_type)| preserves_order(&field_type.scalar_type)),
113 ScalarType::Float32 | ScalarType::Float64 => false,
116 ScalarType::Numeric { .. } => false,
119 ScalarType::PgLegacyChar
122 | ScalarType::PgLegacyName
123 | ScalarType::Char { .. }
124 | ScalarType::VarChar { .. }
125 | ScalarType::Jsonb
126 | ScalarType::Array(_)
127 | ScalarType::List { .. }
128 | ScalarType::Oid
129 | ScalarType::Map { .. }
130 | ScalarType::RegProc
131 | ScalarType::RegType
132 | ScalarType::RegClass
133 | ScalarType::Int2Vector
134 | ScalarType::Range { .. } => false,
135 }
136}
137
/// A [`DatumColumnEncoder`] paired with the nullability of the column it encodes.
///
/// Every operation is forwarded to the wrapped encoder; the only extra behavior
/// is the null check performed by `push`.
#[derive(Debug)]
struct DatumEncoder {
    /// Whether a null datum may be pushed. `push` panics when a null datum is
    /// pushed while this is `false`.
    nullable: bool,
    /// The encoder that buffers the column's values.
    encoder: DatumColumnEncoder,
}
144
145impl DatumEncoder {
146 fn goodbytes(&self) -> usize {
147 self.encoder.goodbytes()
148 }
149
150 fn push(&mut self, datum: Datum) {
151 assert!(
152 !datum.is_null() || self.nullable,
153 "tried pushing Null into non-nullable column"
154 );
155 self.encoder.push(datum);
156 }
157
158 fn push_invalid(&mut self) {
159 self.encoder.push_invalid();
160 }
161
162 fn finish(self) -> ArrayRef {
163 self.encoder.finish()
164 }
165}
166
/// An in-progress Arrow column for one kind of datum.
///
/// Each variant owns the Arrow builder(s) (or raw buffers) that `push`
/// and `push_invalid` append to and that `finish` turns into an `ArrayRef`.
#[derive(Debug)]
enum DatumColumnEncoder {
    /// Accepts `Datum::True` / `Datum::False`.
    Bool(BooleanBuilder),
    /// Accepts `Datum::UInt8`.
    U8(UInt8Builder),
    /// Accepts `Datum::UInt16`.
    U16(UInt16Builder),
    /// Accepts `Datum::UInt32`.
    U32(UInt32Builder),
    /// Accepts `Datum::UInt64`.
    U64(UInt64Builder),
    /// Accepts `Datum::Int16`.
    I16(Int16Builder),
    /// Accepts `Datum::Int32`.
    I32(Int32Builder),
    /// Accepts `Datum::Int64`.
    I64(Int64Builder),
    /// Accepts `Datum::Float32`.
    F32(Float32Builder),
    /// Accepts `Datum::Float64`.
    F64(Float64Builder),
    /// Accepts `Datum::Numeric`; finished as a struct of two parallel columns,
    /// `"approx"` and `"binary"`.
    Numeric {
        /// The `PackedNumeric` bytes of each value.
        binary_values: BinaryBuilder,
        /// An `f64` approximation of each value; positive or negative infinity is
        /// stored when the conversion to `f64` fails.
        approx_values: Float64Builder,
        /// Context used to convert values to `f64`; its status is cleared after a
        /// failed conversion.
        numeric_context: Context<Numeric>,
    },
    /// Accepts `Datum::Bytes`.
    Bytes(BinaryBuilder),
    /// Accepts `Datum::String`.
    String(StringBuilder),
    /// Accepts `Datum::Date`, stored as the value returned by `pg_epoch_days()`.
    Date(Int32Builder),
    /// Accepts `Datum::Time`, stored as `PackedNaiveTime` bytes.
    Time(FixedSizeBinaryBuilder),
    /// Accepts `Datum::Timestamp`, stored as `PackedNaiveDateTime` bytes.
    Timestamp(FixedSizeBinaryBuilder),
    /// Accepts `Datum::TimestampTz`, stored as `PackedNaiveDateTime` bytes of its
    /// naive form.
    TimestampTz(FixedSizeBinaryBuilder),
    /// Accepts `Datum::MzTimestamp`, converted to `u64`.
    MzTimestamp(UInt64Builder),
    /// Accepts `Datum::Interval`, stored as `PackedInterval` bytes.
    Interval(FixedSizeBinaryBuilder),
    /// Accepts `Datum::Uuid`, stored as the UUID's bytes.
    Uuid(FixedSizeBinaryBuilder),
    /// Accepts `Datum::AclItem`, stored as `PackedAclItem` bytes.
    AclItem(FixedSizeBinaryBuilder),
    /// Accepts `Datum::MzAclItem`, stored as `PackedMzAclItem` bytes.
    MzAclItem(BinaryBuilder),
    /// Accepts `Datum::Range`, stored as the encoded bytes of its `ProtoDatum`.
    Range(BinaryBuilder),
    /// Accepts JSON-shaped datums (`JsonNull`, `True`, `False`, `Numeric`,
    /// `String`, `List`, `Map`); finished as a `StringArray` assembled from the
    /// raw parts below.
    Jsonb {
        /// End offset into `buf` recorded after each entry. `push_invalid` relies
        /// on this holding one more element than the number of entries.
        offsets: Vec<i32>,
        /// The serialized JSON of all entries, back to back.
        buf: Vec<u8>,
        /// Validity bits; created by `push_invalid` the first time an invalid
        /// entry is pushed.
        nulls: Option<BooleanBufferBuilder>,
    },
    /// Accepts `Datum::Array`; finished as a struct with fields `"dims"` and
    /// `"vals"`.
    Array {
        /// One list per array, holding its `PackedArrayDimension`s.
        dims: ListBuilder<FixedSizeBinaryBuilder>,
        /// Number of elements pushed into `vals` for each array (0 for invalid
        /// entries).
        val_lengths: Vec<usize>,
        /// Encoder receiving the elements of every array, in order.
        vals: Box<DatumColumnEncoder>,
        /// Validity bits; created by `push_invalid` the first time an invalid
        /// entry is pushed.
        nulls: Option<BooleanBufferBuilder>,
    },
    /// Accepts `Datum::List`; finished as a `ListArray`.
    List {
        /// Number of elements in each list (0 for invalid entries).
        lengths: Vec<usize>,
        /// Encoder receiving the elements of every list, in order.
        values: Box<DatumColumnEncoder>,
        /// Validity bits; created by `push_invalid` the first time an invalid
        /// entry is pushed.
        nulls: Option<BooleanBufferBuilder>,
    },
    /// Accepts `Datum::Map`; finished as a `ListArray` of `"key"`/`"val"` structs.
    Map {
        /// Number of entries in each map (0 for invalid entries).
        lengths: Vec<usize>,
        /// The keys of every map, in order; never null.
        keys: StringBuilder,
        /// Encoder receiving the values of every map, in order.
        vals: Box<DatumColumnEncoder>,
        /// Validity bits; created by `push_invalid` the first time an invalid
        /// entry is pushed.
        nulls: Option<BooleanBufferBuilder>,
    },
    /// Accepts a `Datum::List` holding exactly one datum per field; finished as a
    /// `StructArray` whose fields are named by their position.
    Record {
        /// One encoder per record field.
        fields: Vec<DatumEncoder>,
        /// Validity bits; created by `push_invalid` the first time an invalid
        /// entry is pushed.
        nulls: Option<BooleanBufferBuilder>,
        /// Number of records pushed so far, valid or not.
        length: usize,
    },
    /// Accepts an empty `Datum::List` (a record with no fields); `true` is
    /// appended for each one.
    RecordEmpty(BooleanBuilder),
}
260
261impl DatumColumnEncoder {
262 fn goodbytes(&self) -> usize {
263 match self {
264 DatumColumnEncoder::Bool(a) => a.len(),
265 DatumColumnEncoder::U8(a) => a.values_slice().to_byte_slice().len(),
266 DatumColumnEncoder::U16(a) => a.values_slice().to_byte_slice().len(),
267 DatumColumnEncoder::U32(a) => a.values_slice().to_byte_slice().len(),
268 DatumColumnEncoder::U64(a) => a.values_slice().to_byte_slice().len(),
269 DatumColumnEncoder::I16(a) => a.values_slice().to_byte_slice().len(),
270 DatumColumnEncoder::I32(a) => a.values_slice().to_byte_slice().len(),
271 DatumColumnEncoder::I64(a) => a.values_slice().to_byte_slice().len(),
272 DatumColumnEncoder::F32(a) => a.values_slice().to_byte_slice().len(),
273 DatumColumnEncoder::F64(a) => a.values_slice().to_byte_slice().len(),
274 DatumColumnEncoder::Numeric {
275 binary_values,
276 approx_values,
277 ..
278 } => {
279 binary_values.values_slice().len()
280 + approx_values.values_slice().to_byte_slice().len()
281 }
282 DatumColumnEncoder::Bytes(a) => a.values_slice().len(),
283 DatumColumnEncoder::String(a) => a.values_slice().len(),
284 DatumColumnEncoder::Date(a) => a.values_slice().to_byte_slice().len(),
285 DatumColumnEncoder::Time(a) => a.len() * PackedNaiveTime::SIZE,
286 DatumColumnEncoder::Timestamp(a) => a.len() * PackedNaiveDateTime::SIZE,
287 DatumColumnEncoder::TimestampTz(a) => a.len() * PackedNaiveDateTime::SIZE,
288 DatumColumnEncoder::MzTimestamp(a) => a.values_slice().to_byte_slice().len(),
289 DatumColumnEncoder::Interval(a) => a.len() * PackedInterval::SIZE,
290 DatumColumnEncoder::Uuid(a) => a.len() * size_of::<Uuid>(),
291 DatumColumnEncoder::AclItem(a) => a.len() * PackedAclItem::SIZE,
292 DatumColumnEncoder::MzAclItem(a) => a.values_slice().len(),
293 DatumColumnEncoder::Range(a) => a.values_slice().len(),
294 DatumColumnEncoder::Jsonb { buf, .. } => buf.len(),
295 DatumColumnEncoder::Array { dims, vals, .. } => {
296 dims.len() * PackedArrayDimension::SIZE + vals.goodbytes()
297 }
298 DatumColumnEncoder::List { values, .. } => values.goodbytes(),
299 DatumColumnEncoder::Map { keys, vals, .. } => {
300 keys.values_slice().len() + vals.goodbytes()
301 }
302 DatumColumnEncoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(),
303 DatumColumnEncoder::RecordEmpty(a) => a.len(),
304 }
305 }
306
307 fn push<'e, 'd>(&'e mut self, datum: Datum<'d>) {
308 match (self, datum) {
309 (DatumColumnEncoder::Bool(bool_builder), Datum::True) => {
310 bool_builder.append_value(true)
311 }
312 (DatumColumnEncoder::Bool(bool_builder), Datum::False) => {
313 bool_builder.append_value(false)
314 }
315 (DatumColumnEncoder::U8(builder), Datum::UInt8(val)) => builder.append_value(val),
316 (DatumColumnEncoder::U16(builder), Datum::UInt16(val)) => builder.append_value(val),
317 (DatumColumnEncoder::U32(builder), Datum::UInt32(val)) => builder.append_value(val),
318 (DatumColumnEncoder::U64(builder), Datum::UInt64(val)) => builder.append_value(val),
319 (DatumColumnEncoder::I16(builder), Datum::Int16(val)) => builder.append_value(val),
320 (DatumColumnEncoder::I32(builder), Datum::Int32(val)) => builder.append_value(val),
321 (DatumColumnEncoder::I64(builder), Datum::Int64(val)) => builder.append_value(val),
322 (DatumColumnEncoder::F32(builder), Datum::Float32(val)) => builder.append_value(*val),
323 (DatumColumnEncoder::F64(builder), Datum::Float64(val)) => builder.append_value(*val),
324 (
325 DatumColumnEncoder::Numeric {
326 approx_values,
327 binary_values,
328 numeric_context,
329 },
330 Datum::Numeric(val),
331 ) => {
332 let float_approx = numeric_context.try_into_f64(val.0).unwrap_or_else(|_| {
333 numeric_context.clear_status();
334 if val.0.is_negative() {
335 f64::NEG_INFINITY
336 } else {
337 f64::INFINITY
338 }
339 });
340 let packed = PackedNumeric::from_value(val.0);
341
342 approx_values.append_value(float_approx);
343 binary_values.append_value(packed.as_bytes());
344 }
345 (DatumColumnEncoder::String(builder), Datum::String(val)) => builder.append_value(val),
346 (DatumColumnEncoder::Bytes(builder), Datum::Bytes(val)) => builder.append_value(val),
347 (DatumColumnEncoder::Date(builder), Datum::Date(val)) => {
348 builder.append_value(val.pg_epoch_days())
349 }
350 (DatumColumnEncoder::Time(builder), Datum::Time(val)) => {
351 let packed = PackedNaiveTime::from_value(val);
352 builder
353 .append_value(packed.as_bytes())
354 .expect("known correct size");
355 }
356 (DatumColumnEncoder::Timestamp(builder), Datum::Timestamp(val)) => {
357 let packed = PackedNaiveDateTime::from_value(val.to_naive());
358 builder
359 .append_value(packed.as_bytes())
360 .expect("known correct size");
361 }
362 (DatumColumnEncoder::TimestampTz(builder), Datum::TimestampTz(val)) => {
363 let packed = PackedNaiveDateTime::from_value(val.to_naive());
364 builder
365 .append_value(packed.as_bytes())
366 .expect("known correct size");
367 }
368 (DatumColumnEncoder::MzTimestamp(builder), Datum::MzTimestamp(val)) => {
369 builder.append_value(val.into());
370 }
371 (DatumColumnEncoder::Interval(builder), Datum::Interval(val)) => {
372 let packed = PackedInterval::from_value(val);
373 builder
374 .append_value(packed.as_bytes())
375 .expect("known correct size");
376 }
377 (DatumColumnEncoder::Uuid(builder), Datum::Uuid(val)) => builder
378 .append_value(val.as_bytes())
379 .expect("known correct size"),
380 (DatumColumnEncoder::AclItem(builder), Datum::AclItem(val)) => {
381 let packed = PackedAclItem::from_value(val);
382 builder
383 .append_value(packed.as_bytes())
384 .expect("known correct size");
385 }
386 (DatumColumnEncoder::MzAclItem(builder), Datum::MzAclItem(val)) => {
387 let packed = PackedMzAclItem::from_value(val);
388 builder.append_value(packed.as_bytes());
389 }
390 (DatumColumnEncoder::Range(builder), d @ Datum::Range(_)) => {
391 let proto = ProtoDatum::from(d);
392 let bytes = proto.encode_to_vec();
393 builder.append_value(&bytes);
394 }
395 (
396 DatumColumnEncoder::Jsonb {
397 offsets,
398 buf,
399 nulls,
400 },
401 d @ Datum::JsonNull
402 | d @ Datum::True
403 | d @ Datum::False
404 | d @ Datum::Numeric(_)
405 | d @ Datum::String(_)
406 | d @ Datum::List(_)
407 | d @ Datum::Map(_),
408 ) => {
409 let mut buf = buf;
411 let json = JsonbRef::from_datum(d);
412
413 json.to_writer(&mut buf)
415 .expect("failed to serialize Datum to jsonb");
416 let offset: i32 = buf.len().try_into().expect("wrote more than 4GB of JSON");
417 offsets.push(offset);
418
419 if let Some(nulls) = nulls {
420 nulls.append(true);
421 }
422 }
423 (
424 DatumColumnEncoder::Array {
425 dims,
426 val_lengths,
427 vals,
428 nulls,
429 },
430 Datum::Array(array),
431 ) => {
432 for dimension in array.dims() {
434 let packed = PackedArrayDimension::from_value(dimension);
435 dims.values()
436 .append_value(packed.as_bytes())
437 .expect("known correct size");
438 }
439 dims.append(true);
440
441 let mut count = 0;
443 for datum in &array.elements() {
444 count += 1;
445 vals.push(datum);
446 }
447 val_lengths.push(count);
448
449 if let Some(nulls) = nulls {
450 nulls.append(true);
451 }
452 }
453 (
454 DatumColumnEncoder::List {
455 lengths,
456 values,
457 nulls,
458 },
459 Datum::List(list),
460 ) => {
461 let mut count = 0;
462 for datum in &list {
463 count += 1;
464 values.push(datum);
465 }
466 lengths.push(count);
467
468 if let Some(nulls) = nulls {
469 nulls.append(true);
470 }
471 }
472 (
473 DatumColumnEncoder::Map {
474 lengths,
475 keys,
476 vals,
477 nulls,
478 },
479 Datum::Map(map),
480 ) => {
481 let mut count = 0;
482 for (key, datum) in &map {
483 count += 1;
484 keys.append_value(key);
485 vals.push(datum);
486 }
487 lengths.push(count);
488
489 if let Some(nulls) = nulls {
490 nulls.append(true);
491 }
492 }
493 (
494 DatumColumnEncoder::Record {
495 fields,
496 nulls,
497 length,
498 },
499 Datum::List(records),
500 ) => {
501 let mut count = 0;
502 for (datum, encoder) in records.into_iter().zip_eq(fields.iter_mut()) {
504 count += 1;
505 encoder.push(datum);
506 }
507 assert_eq!(count, fields.len());
508
509 length.add_assign(1);
510 if let Some(nulls) = nulls.as_mut() {
511 nulls.append(true);
512 }
513 }
514 (DatumColumnEncoder::RecordEmpty(builder), Datum::List(records)) => {
515 assert_none!(records.into_iter().next());
516 builder.append_value(true);
517 }
518 (encoder, Datum::Null) => encoder.push_invalid(),
519 (encoder, datum) => panic!("can't encode {datum:?} into {encoder:?}"),
520 }
521 }
522
523 fn push_invalid(&mut self) {
524 match self {
525 DatumColumnEncoder::Bool(builder) => builder.append_null(),
526 DatumColumnEncoder::U8(builder) => builder.append_null(),
527 DatumColumnEncoder::U16(builder) => builder.append_null(),
528 DatumColumnEncoder::U32(builder) => builder.append_null(),
529 DatumColumnEncoder::U64(builder) => builder.append_null(),
530 DatumColumnEncoder::I16(builder) => builder.append_null(),
531 DatumColumnEncoder::I32(builder) => builder.append_null(),
532 DatumColumnEncoder::I64(builder) => builder.append_null(),
533 DatumColumnEncoder::F32(builder) => builder.append_null(),
534 DatumColumnEncoder::F64(builder) => builder.append_null(),
535 DatumColumnEncoder::Numeric {
536 approx_values,
537 binary_values,
538 numeric_context: _,
539 } => {
540 approx_values.append_null();
541 binary_values.append_null();
542 }
543 DatumColumnEncoder::String(builder) => builder.append_null(),
544 DatumColumnEncoder::Bytes(builder) => builder.append_null(),
545 DatumColumnEncoder::Date(builder) => builder.append_null(),
546 DatumColumnEncoder::Time(builder) => builder.append_null(),
547 DatumColumnEncoder::Timestamp(builder) => builder.append_null(),
548 DatumColumnEncoder::TimestampTz(builder) => builder.append_null(),
549 DatumColumnEncoder::MzTimestamp(builder) => builder.append_null(),
550 DatumColumnEncoder::Interval(builder) => builder.append_null(),
551 DatumColumnEncoder::Uuid(builder) => builder.append_null(),
552 DatumColumnEncoder::AclItem(builder) => builder.append_null(),
553 DatumColumnEncoder::MzAclItem(builder) => builder.append_null(),
554 DatumColumnEncoder::Range(builder) => builder.append_null(),
555 DatumColumnEncoder::Jsonb {
556 offsets,
557 buf: _,
558 nulls,
559 } => {
560 let nulls = nulls.get_or_insert_with(|| {
561 let mut buf = BooleanBufferBuilder::new(offsets.len());
562 buf.append_n(offsets.len() - 1, true);
564 buf
565 });
566
567 offsets.push(offsets.last().copied().unwrap_or(0));
568 nulls.append(false);
569 }
570 DatumColumnEncoder::Array {
571 dims,
572 val_lengths,
573 vals: _,
574 nulls,
575 } => {
576 let nulls = nulls.get_or_insert_with(|| {
577 let mut buf = BooleanBufferBuilder::new(dims.len() + 1);
578 buf.append_n(dims.len(), true);
579 buf
580 });
581 dims.append_null();
582
583 val_lengths.push(0);
584 nulls.append(false);
585 }
586 DatumColumnEncoder::List {
587 lengths,
588 values: _,
589 nulls,
590 } => {
591 let nulls = nulls.get_or_insert_with(|| {
592 let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
593 buf.append_n(lengths.len(), true);
594 buf
595 });
596
597 lengths.push(0);
598 nulls.append(false);
599 }
600 DatumColumnEncoder::Map {
601 lengths,
602 keys: _,
603 vals: _,
604 nulls,
605 } => {
606 let nulls = nulls.get_or_insert_with(|| {
607 let mut buf = BooleanBufferBuilder::new(lengths.len() + 1);
608 buf.append_n(lengths.len(), true);
609 buf
610 });
611
612 lengths.push(0);
613 nulls.append(false);
614 }
615 DatumColumnEncoder::Record {
616 fields,
617 nulls,
618 length,
619 } => {
620 let nulls = nulls.get_or_insert_with(|| {
621 let mut buf = BooleanBufferBuilder::new(*length + 1);
622 buf.append_n(*length, true);
623 buf
624 });
625 nulls.append(false);
626 length.add_assign(1);
627
628 for field in fields {
629 field.push_invalid();
630 }
631 }
632 DatumColumnEncoder::RecordEmpty(builder) => builder.append_null(),
633 }
634 }
635
636 fn finish(self) -> ArrayRef {
637 match self {
638 DatumColumnEncoder::Bool(mut builder) => {
639 let array = builder.finish();
640 Arc::new(array)
641 }
642 DatumColumnEncoder::U8(mut builder) => {
643 let array = builder.finish();
644 Arc::new(array)
645 }
646 DatumColumnEncoder::U16(mut builder) => {
647 let array = builder.finish();
648 Arc::new(array)
649 }
650 DatumColumnEncoder::U32(mut builder) => {
651 let array = builder.finish();
652 Arc::new(array)
653 }
654 DatumColumnEncoder::U64(mut builder) => {
655 let array = builder.finish();
656 Arc::new(array)
657 }
658 DatumColumnEncoder::I16(mut builder) => {
659 let array = builder.finish();
660 Arc::new(array)
661 }
662 DatumColumnEncoder::I32(mut builder) => {
663 let array = builder.finish();
664 Arc::new(array)
665 }
666 DatumColumnEncoder::I64(mut builder) => {
667 let array = builder.finish();
668 Arc::new(array)
669 }
670 DatumColumnEncoder::F32(mut builder) => {
671 let array = builder.finish();
672 Arc::new(array)
673 }
674 DatumColumnEncoder::F64(mut builder) => {
675 let array = builder.finish();
676 Arc::new(array)
677 }
678 DatumColumnEncoder::Numeric {
679 mut approx_values,
680 mut binary_values,
681 numeric_context: _,
682 } => {
683 let approx_array = approx_values.finish();
684 let binary_array = binary_values.finish();
685
686 assert_eq!(approx_array.len(), binary_array.len());
687 debug_assert_eq!(approx_array.logical_nulls(), binary_array.logical_nulls());
689
690 let fields = Fields::from(vec![
691 Field::new("approx", approx_array.data_type().clone(), true),
692 Field::new("binary", binary_array.data_type().clone(), true),
693 ]);
694 let nulls = approx_array.logical_nulls();
695 let array = StructArray::new(
696 fields,
697 vec![Arc::new(approx_array), Arc::new(binary_array)],
698 nulls,
699 );
700 Arc::new(array)
701 }
702 DatumColumnEncoder::String(mut builder) => {
703 let array = builder.finish();
704 Arc::new(array)
705 }
706 DatumColumnEncoder::Bytes(mut builder) => {
707 let array = builder.finish();
708 Arc::new(array)
709 }
710 DatumColumnEncoder::Date(mut builder) => {
711 let array = builder.finish();
712 Arc::new(array)
713 }
714 DatumColumnEncoder::Time(mut builder) => {
715 let array = builder.finish();
716 Arc::new(array)
717 }
718 DatumColumnEncoder::Timestamp(mut builder) => {
719 let array = builder.finish();
720 Arc::new(array)
721 }
722 DatumColumnEncoder::TimestampTz(mut builder) => {
723 let array = builder.finish();
724 Arc::new(array)
725 }
726 DatumColumnEncoder::MzTimestamp(mut builder) => {
727 let array = builder.finish();
728 Arc::new(array)
729 }
730 DatumColumnEncoder::Interval(mut builder) => {
731 let array = builder.finish();
732 Arc::new(array)
733 }
734 DatumColumnEncoder::Uuid(mut builder) => {
735 let array = builder.finish();
736 Arc::new(array)
737 }
738 DatumColumnEncoder::AclItem(mut builder) => Arc::new(builder.finish()),
739 DatumColumnEncoder::MzAclItem(mut builder) => Arc::new(builder.finish()),
740 DatumColumnEncoder::Range(mut builder) => Arc::new(builder.finish()),
741 DatumColumnEncoder::Jsonb {
742 offsets,
743 buf,
744 mut nulls,
745 } => {
746 let values = Buffer::from_vec(buf);
747 let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
748 let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
749 let array = StringArray::new(offsets, values, nulls);
750 Arc::new(array)
751 }
752 DatumColumnEncoder::Array {
753 mut dims,
754 val_lengths,
755 vals,
756 mut nulls,
757 } => {
758 let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
759 let vals = vals.finish();
760
761 let field = Field::new_list_field(vals.data_type().clone(), true);
764 let val_offsets = OffsetBuffer::from_lengths(val_lengths);
765 let values =
766 ListArray::new(Arc::new(field), val_offsets, Arc::new(vals), nulls.clone());
767
768 let dims = dims.finish();
769 assert_eq!(values.len(), dims.len());
770
771 let fields = Fields::from(vec![
774 Field::new("dims", dims.data_type().clone(), true),
775 Field::new("vals", values.data_type().clone(), true),
776 ]);
777 let array = StructArray::new(fields, vec![Arc::new(dims), Arc::new(values)], nulls);
778
779 Arc::new(array)
780 }
781 DatumColumnEncoder::List {
782 lengths,
783 values,
784 mut nulls,
785 } => {
786 let values = values.finish();
787
788 let field = Field::new_list_field(values.data_type().clone(), true);
791 let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
792 let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
793
794 let array = ListArray::new(Arc::new(field), offsets, values, nulls);
795 Arc::new(array)
796 }
797 DatumColumnEncoder::Map {
798 lengths,
799 mut keys,
800 vals,
801 mut nulls,
802 } => {
803 let keys = keys.finish();
804 let vals = vals.finish();
805
806 let offsets = OffsetBuffer::<i32>::from_lengths(lengths.iter().copied());
807 let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
808
809 assert_none!(keys.logical_nulls());
812 let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false));
813 let val_field = Arc::new(Field::new("val", vals.data_type().clone(), true));
814 let fields = Fields::from(vec![Arc::clone(&key_field), Arc::clone(&val_field)]);
815 let entries = StructArray::new(fields, vec![Arc::new(keys), vals], None);
816
817 let field = Field::new("map_entries", entries.data_type().clone(), false);
820 let array = ListArray::new(Arc::new(field), offsets, Arc::new(entries), nulls);
821 Arc::new(array)
822 }
823 DatumColumnEncoder::Record {
824 fields,
825 mut nulls,
826 length: _,
827 } => {
828 let (fields, arrays): (Vec<_>, Vec<_>) = fields
829 .into_iter()
830 .enumerate()
831 .map(|(tag, encoder)| {
832 let nullable = true;
838 let array = encoder.finish();
839 let field =
840 Field::new(tag.to_string(), array.data_type().clone(), nullable);
841 (field, array)
842 })
843 .unzip();
844 let nulls = nulls.as_mut().map(|n| NullBuffer::from(n.finish()));
845
846 let array = StructArray::new(Fields::from(fields), arrays, nulls);
847 Arc::new(array)
848 }
849 DatumColumnEncoder::RecordEmpty(mut builder) => Arc::new(builder.finish()),
850 }
851 }
852}
853
/// A finished Arrow column from which individual datums can be read back.
///
/// Each variant holds the Arrow array(s) that `get` reads to reconstruct the
/// datum at a given row index.
#[derive(Debug)]
enum DatumColumnDecoder {
    /// Decoded as `Datum::True` / `Datum::False`.
    Bool(BooleanArray),
    /// Decoded as `Datum::UInt8`.
    U8(UInt8Array),
    /// Decoded as `Datum::UInt16`.
    U16(UInt16Array),
    /// Decoded as `Datum::UInt32`.
    U32(UInt32Array),
    /// Decoded as `Datum::UInt64`.
    U64(UInt64Array),
    /// Decoded as `Datum::Int16`.
    I16(Int16Array),
    /// Decoded as `Datum::Int32`.
    I32(Int32Array),
    /// Decoded as `Datum::Int64`.
    I64(Int64Array),
    /// Decoded as `Datum::Float32`.
    F32(Float32Array),
    /// Decoded as `Datum::Float64`.
    F64(Float64Array),
    /// `PackedNumeric` bytes, decoded as `Datum::Numeric`.
    Numeric(BinaryArray),
    /// Decoded as `Datum::Bytes`.
    Bytes(BinaryArray),
    /// Decoded as `Datum::String`.
    String(StringArray),
    /// Values passed to `Date::from_pg_epoch`, decoded as `Datum::Date`.
    Date(Int32Array),
    /// `PackedNaiveTime` bytes, decoded as `Datum::Time`.
    Time(FixedSizeBinaryArray),
    /// `PackedNaiveDateTime` bytes, decoded as `Datum::Timestamp`.
    Timestamp(FixedSizeBinaryArray),
    /// `PackedNaiveDateTime` bytes, interpreted as UTC and decoded as
    /// `Datum::TimestampTz`.
    TimestampTz(FixedSizeBinaryArray),
    /// Decoded as `Datum::MzTimestamp`.
    MzTimestamp(UInt64Array),
    /// `PackedInterval` bytes, decoded as `Datum::Interval`.
    Interval(FixedSizeBinaryArray),
    /// UUID bytes, decoded as `Datum::Uuid`.
    Uuid(FixedSizeBinaryArray),
    /// JSON text, packed into the row via `JsonbPacker::pack_str`.
    Json(StringArray),
    /// Decoded via `RowPacker::push_array_with_row_major`.
    Array {
        /// Offsets delimiting each array's entries within `dims`.
        dim_offsets: OffsetBuffer<i32>,
        /// `PackedArrayDimension` bytes for all arrays.
        dims: FixedSizeBinaryArray,
        /// Offsets delimiting each array's elements within `vals`.
        val_offsets: OffsetBuffer<i32>,
        /// Decoder for the elements of all arrays.
        vals: Box<DatumColumnDecoder>,
        /// Validity of each array; `None` means every entry is valid.
        nulls: Option<NullBuffer>,
    },
    /// Decoded via `RowPacker::push_list_with`.
    List {
        /// Offsets delimiting each list's elements within `values`.
        offsets: OffsetBuffer<i32>,
        /// Decoder for the elements of all lists.
        values: Box<DatumColumnDecoder>,
        /// Validity of each list; `None` means every entry is valid.
        nulls: Option<NullBuffer>,
    },
    /// Decoded via `RowPacker::push_dict_with`.
    Map {
        /// Offsets delimiting each map's entries within `keys` and `vals`.
        offsets: OffsetBuffer<i32>,
        /// The keys of all maps.
        keys: StringArray,
        /// Decoder for the values of all maps.
        vals: Box<DatumColumnDecoder>,
        /// Validity of each map; `None` means every entry is valid.
        nulls: Option<NullBuffer>,
    },
    /// A record with no fields; valid entries decode to `Datum::empty_list()`.
    RecordEmpty(BooleanArray),
    /// Decoded as a list holding one datum per field.
    Record {
        /// One decoder per record field, read at the same row index.
        fields: Vec<Box<DatumColumnDecoder>>,
        /// Validity of each record; `None` means every entry is valid.
        nulls: Option<NullBuffer>,
    },
    /// Encoded `ProtoDatum` bytes, pushed via `RowPacker::try_push_proto`.
    Range(BinaryArray),
    /// `PackedMzAclItem` bytes, decoded as `Datum::MzAclItem`.
    MzAclItem(BinaryArray),
    /// `PackedAclItem` bytes, decoded as `Datum::AclItem`.
    AclItem(FixedSizeBinaryArray),
}
908
909impl DatumColumnDecoder {
910 fn get<'a>(&'a self, idx: usize, packer: &'a mut RowPacker) {
911 let datum = match self {
912 DatumColumnDecoder::Bool(array) => array
913 .is_valid(idx)
914 .then(|| array.value(idx))
915 .map(|x| if x { Datum::True } else { Datum::False }),
916 DatumColumnDecoder::U8(array) => array
917 .is_valid(idx)
918 .then(|| array.value(idx))
919 .map(Datum::UInt8),
920 DatumColumnDecoder::U16(array) => array
921 .is_valid(idx)
922 .then(|| array.value(idx))
923 .map(Datum::UInt16),
924 DatumColumnDecoder::U32(array) => array
925 .is_valid(idx)
926 .then(|| array.value(idx))
927 .map(Datum::UInt32),
928 DatumColumnDecoder::U64(array) => array
929 .is_valid(idx)
930 .then(|| array.value(idx))
931 .map(Datum::UInt64),
932 DatumColumnDecoder::I16(array) => array
933 .is_valid(idx)
934 .then(|| array.value(idx))
935 .map(Datum::Int16),
936 DatumColumnDecoder::I32(array) => array
937 .is_valid(idx)
938 .then(|| array.value(idx))
939 .map(Datum::Int32),
940 DatumColumnDecoder::I64(array) => array
941 .is_valid(idx)
942 .then(|| array.value(idx))
943 .map(Datum::Int64),
944 DatumColumnDecoder::F32(array) => array
945 .is_valid(idx)
946 .then(|| array.value(idx))
947 .map(|x| Datum::Float32(ordered_float::OrderedFloat(x))),
948 DatumColumnDecoder::F64(array) => array
949 .is_valid(idx)
950 .then(|| array.value(idx))
951 .map(|x| Datum::Float64(ordered_float::OrderedFloat(x))),
952 DatumColumnDecoder::Numeric(array) => array.is_valid(idx).then(|| {
953 let val = array.value(idx);
954 let val = PackedNumeric::from_bytes(val)
955 .expect("failed to roundtrip Numeric")
956 .into_value();
957 Datum::Numeric(OrderedDecimal(val))
958 }),
959 DatumColumnDecoder::String(array) => array
960 .is_valid(idx)
961 .then(|| array.value(idx))
962 .map(Datum::String),
963 DatumColumnDecoder::Bytes(array) => array
964 .is_valid(idx)
965 .then(|| array.value(idx))
966 .map(Datum::Bytes),
967 DatumColumnDecoder::Date(array) => {
968 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
969 let date = Date::from_pg_epoch(x).expect("failed to roundtrip");
970 Datum::Date(date)
971 })
972 }
973 DatumColumnDecoder::Time(array) => {
974 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
975 let packed = PackedNaiveTime::from_bytes(x).expect("failed to roundtrip time");
976 Datum::Time(packed.into_value())
977 })
978 }
979 DatumColumnDecoder::Timestamp(array) => {
980 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
981 let packed = PackedNaiveDateTime::from_bytes(x)
982 .expect("failed to roundtrip PackedNaiveDateTime");
983 let timestamp = CheckedTimestamp::from_timestamplike(packed.into_value())
984 .expect("failed to roundtrip timestamp");
985 Datum::Timestamp(timestamp)
986 })
987 }
988 DatumColumnDecoder::TimestampTz(array) => {
989 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
990 let packed = PackedNaiveDateTime::from_bytes(x)
991 .expect("failed to roundtrip PackedNaiveDateTime");
992 let timestamp =
993 CheckedTimestamp::from_timestamplike(packed.into_value().and_utc())
994 .expect("failed to roundtrip timestamp");
995 Datum::TimestampTz(timestamp)
996 })
997 }
998 DatumColumnDecoder::MzTimestamp(array) => array
999 .is_valid(idx)
1000 .then(|| array.value(idx))
1001 .map(|x| Datum::MzTimestamp(Timestamp::from(x))),
1002 DatumColumnDecoder::Interval(array) => {
1003 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1004 let packed =
1005 PackedInterval::from_bytes(x).expect("failed to roundtrip interval");
1006 Datum::Interval(packed.into_value())
1007 })
1008 }
1009 DatumColumnDecoder::Uuid(array) => {
1010 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1011 let uuid = Uuid::from_slice(x).expect("failed to roundtrip uuid");
1012 Datum::Uuid(uuid)
1013 })
1014 }
1015 DatumColumnDecoder::AclItem(array) => {
1016 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1017 let packed =
1018 PackedAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
1019 Datum::AclItem(packed.into_value())
1020 })
1021 }
1022 DatumColumnDecoder::MzAclItem(array) => {
1023 array.is_valid(idx).then(|| array.value(idx)).map(|x| {
1024 let packed =
1025 PackedMzAclItem::from_bytes(x).expect("failed to roundtrip MzAclItem");
1026 Datum::MzAclItem(packed.into_value())
1027 })
1028 }
1029 DatumColumnDecoder::Range(array) => {
1030 let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
1031 packer.push(Datum::Null);
1032 return;
1033 };
1034
1035 let proto = ProtoDatum::decode(val).expect("failed to roundtrip Range");
1036 packer
1037 .try_push_proto(&proto)
1038 .expect("failed to pack ProtoRange");
1039
1040 return;
1042 }
1043 DatumColumnDecoder::Json(array) => {
1044 let Some(val) = array.is_valid(idx).then(|| array.value(idx)) else {
1045 packer.push(Datum::Null);
1046 return;
1047 };
1048 JsonbPacker::new(packer)
1049 .pack_str(val)
1050 .expect("failed to roundtrip JSON");
1051
1052 return;
1054 }
1055 DatumColumnDecoder::Array {
1056 dim_offsets,
1057 dims,
1058 val_offsets,
1059 vals,
1060 nulls,
1061 } => {
1062 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1063 if !is_valid {
1064 packer.push(Datum::Null);
1065 return;
1066 }
1067
1068 let start: usize = dim_offsets[idx]
1069 .try_into()
1070 .expect("unexpected negative offset");
1071 let end: usize = dim_offsets[idx + 1]
1072 .try_into()
1073 .expect("unexpected negative offset");
1074 let dimensions = (start..end).map(|idx| {
1075 PackedArrayDimension::from_bytes(dims.value(idx))
1076 .expect("failed to roundtrip ArrayDimension")
1077 .into_value()
1078 });
1079
1080 let start: usize = val_offsets[idx]
1081 .try_into()
1082 .expect("unexpected negative offset");
1083 let end: usize = val_offsets[idx + 1]
1084 .try_into()
1085 .expect("unexpected negative offset");
1086 packer
1087 .push_array_with_row_major(dimensions, |packer| {
1088 for x in start..end {
1089 vals.get(x, packer);
1090 }
1091 end - start
1093 })
1094 .expect("failed to pack Array");
1095
1096 return;
1098 }
1099 DatumColumnDecoder::List {
1100 offsets,
1101 values,
1102 nulls,
1103 } => {
1104 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1105 if !is_valid {
1106 packer.push(Datum::Null);
1107 return;
1108 }
1109
1110 let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
1111 let end: usize = offsets[idx + 1]
1112 .try_into()
1113 .expect("unexpected negative offset");
1114
1115 packer.push_list_with(|packer| {
1116 for idx in start..end {
1117 values.get(idx, packer)
1118 }
1119 });
1120
1121 return;
1123 }
1124 DatumColumnDecoder::Map {
1125 offsets,
1126 keys,
1127 vals,
1128 nulls,
1129 } => {
1130 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1131 if !is_valid {
1132 packer.push(Datum::Null);
1133 return;
1134 }
1135
1136 let start: usize = offsets[idx].try_into().expect("unexpected negative offset");
1137 let end: usize = offsets[idx + 1]
1138 .try_into()
1139 .expect("unexpected negative offset");
1140
1141 packer.push_dict_with(|packer| {
1142 for idx in start..end {
1143 packer.push(Datum::String(keys.value(idx)));
1144 vals.get(idx, packer);
1145 }
1146 });
1147
1148 return;
1150 }
1151 DatumColumnDecoder::RecordEmpty(array) => array.is_valid(idx).then(Datum::empty_list),
1152 DatumColumnDecoder::Record { fields, nulls } => {
1153 let is_valid = nulls.as_ref().map(|n| n.is_valid(idx)).unwrap_or(true);
1154 if !is_valid {
1155 packer.push(Datum::Null);
1156 return;
1157 }
1158
1159 packer.push_list_with(|packer| {
1161 for field in fields {
1162 field.get(idx, packer);
1163 }
1164 });
1165
1166 return;
1168 }
1169 };
1170
1171 match datum {
1172 Some(d) => packer.push(d),
1173 None => packer.push(Datum::Null),
1174 }
1175 }
1176
1177 fn stats(&self) -> ColumnStatKinds {
1178 match self {
1179 DatumColumnDecoder::Bool(a) => PrimitiveStats::<bool>::from_column(a).into(),
1180 DatumColumnDecoder::U8(a) => PrimitiveStats::<u8>::from_column(a).into(),
1181 DatumColumnDecoder::U16(a) => PrimitiveStats::<u16>::from_column(a).into(),
1182 DatumColumnDecoder::U32(a) => PrimitiveStats::<u32>::from_column(a).into(),
1183 DatumColumnDecoder::U64(a) => PrimitiveStats::<u64>::from_column(a).into(),
1184 DatumColumnDecoder::I16(a) => PrimitiveStats::<i16>::from_column(a).into(),
1185 DatumColumnDecoder::I32(a) => PrimitiveStats::<i32>::from_column(a).into(),
1186 DatumColumnDecoder::I64(a) => PrimitiveStats::<i64>::from_column(a).into(),
1187 DatumColumnDecoder::F32(a) => PrimitiveStats::<f32>::from_column(a).into(),
1188 DatumColumnDecoder::F64(a) => PrimitiveStats::<f64>::from_column(a).into(),
1189 DatumColumnDecoder::Numeric(a) => numeric_stats_from_column(a),
1190 DatumColumnDecoder::String(a) => PrimitiveStats::<String>::from_column(a).into(),
1191 DatumColumnDecoder::Bytes(a) => PrimitiveStats::<Vec<u8>>::from_column(a).into(),
1192 DatumColumnDecoder::Date(a) => PrimitiveStats::<i32>::from_column(a).into(),
1193 DatumColumnDecoder::Time(a) => {
1194 fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedTime)
1195 }
1196 DatumColumnDecoder::Timestamp(a) => {
1197 fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
1198 }
1199 DatumColumnDecoder::TimestampTz(a) => {
1200 fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedDateTime)
1201 }
1202 DatumColumnDecoder::MzTimestamp(a) => PrimitiveStats::<u64>::from_column(a).into(),
1203 DatumColumnDecoder::Interval(a) => {
1204 fixed_stats_from_column(a, FixedSizeBytesStatsKind::PackedInterval)
1205 }
1206 DatumColumnDecoder::Uuid(a) => {
1207 fixed_stats_from_column(a, FixedSizeBytesStatsKind::Uuid)
1208 }
1209 DatumColumnDecoder::AclItem(_)
1210 | DatumColumnDecoder::MzAclItem(_)
1211 | DatumColumnDecoder::Range(_) => ColumnStatKinds::None,
1212 DatumColumnDecoder::Json(a) => stats_for_json(a.iter()).values,
1213 DatumColumnDecoder::Array { .. }
1214 | DatumColumnDecoder::List { .. }
1215 | DatumColumnDecoder::Map { .. }
1216 | DatumColumnDecoder::Record { .. }
1217 | DatumColumnDecoder::RecordEmpty(_) => ColumnStatKinds::None,
1218 }
1219 }
1220
1221 fn goodbytes(&self) -> usize {
1222 match self {
1223 DatumColumnDecoder::Bool(a) => ArrayOrd::Bool(a.clone()).goodbytes(),
1224 DatumColumnDecoder::U8(a) => ArrayOrd::UInt8(a.clone()).goodbytes(),
1225 DatumColumnDecoder::U16(a) => ArrayOrd::UInt16(a.clone()).goodbytes(),
1226 DatumColumnDecoder::U32(a) => ArrayOrd::UInt32(a.clone()).goodbytes(),
1227 DatumColumnDecoder::U64(a) => ArrayOrd::UInt64(a.clone()).goodbytes(),
1228 DatumColumnDecoder::I16(a) => ArrayOrd::Int16(a.clone()).goodbytes(),
1229 DatumColumnDecoder::I32(a) => ArrayOrd::Int32(a.clone()).goodbytes(),
1230 DatumColumnDecoder::I64(a) => ArrayOrd::Int64(a.clone()).goodbytes(),
1231 DatumColumnDecoder::F32(a) => ArrayOrd::Float32(a.clone()).goodbytes(),
1232 DatumColumnDecoder::F64(a) => ArrayOrd::Float64(a.clone()).goodbytes(),
1233 DatumColumnDecoder::Numeric(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1234 DatumColumnDecoder::String(a) => ArrayOrd::String(a.clone()).goodbytes(),
1235 DatumColumnDecoder::Bytes(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1236 DatumColumnDecoder::Date(a) => ArrayOrd::Int32(a.clone()).goodbytes(),
1237 DatumColumnDecoder::Time(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1238 DatumColumnDecoder::Timestamp(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1239 DatumColumnDecoder::TimestampTz(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1240 DatumColumnDecoder::MzTimestamp(a) => ArrayOrd::UInt64(a.clone()).goodbytes(),
1241 DatumColumnDecoder::Interval(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1242 DatumColumnDecoder::Uuid(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1243 DatumColumnDecoder::AclItem(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(),
1244 DatumColumnDecoder::MzAclItem(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1245 DatumColumnDecoder::Range(a) => ArrayOrd::Binary(a.clone()).goodbytes(),
1246 DatumColumnDecoder::Json(a) => ArrayOrd::String(a.clone()).goodbytes(),
1247 DatumColumnDecoder::Array { dims, vals, .. } => {
1248 (dims.len() * PackedArrayDimension::SIZE) + vals.goodbytes()
1249 }
1250 DatumColumnDecoder::List { values, .. } => values.goodbytes(),
1251 DatumColumnDecoder::Map { keys, vals, .. } => {
1252 ArrayOrd::String(keys.clone()).goodbytes() + vals.goodbytes()
1253 }
1254 DatumColumnDecoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(),
1255 DatumColumnDecoder::RecordEmpty(a) => ArrayOrd::Bool(a.clone()).goodbytes(),
1256 }
1257 }
1258}
1259
1260impl Schema<Row> for RelationDesc {
1261 type ArrowColumn = arrow::array::StructArray;
1262 type Statistics = OptionStats<StructStats>;
1263
1264 type Decoder = RowColumnarDecoder;
1265 type Encoder = RowColumnarEncoder;
1266
1267 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1268 RowColumnarDecoder::new(col, self)
1269 }
1270
1271 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1272 RowColumnarEncoder::new(self)
1273 .ok_or_else(|| anyhow::anyhow!("Cannot encode a RelationDesc with no columns"))
1274 }
1275}
1276
/// A [`ColumnDecoder`] that reads [`Row`]s out of an Arrow [`StructArray`].
#[derive(Debug)]
pub struct RowColumnarDecoder {
    /// Length of the [`StructArray`] this decoder was built from.
    len: usize,
    /// One entry per column: the column's name, its null count (recorded only when the
    /// column type is nullable), and the decoder for its values.
    decoders: Vec<(Arc<str>, Option<usize>, DatumColumnDecoder)>,
    /// Logical nulls of the top-level [`StructArray`], if it has any.
    nullability: Option<NullBuffer>,
}
1290
1291fn mask_nulls(column: &ArrayRef, null_mask: Option<&NullBuffer>) -> ArrayRef {
1293 if null_mask.is_none() {
1294 Arc::clone(column)
1295 } else {
1296 let nulls = NullBuffer::union(null_mask, column.nulls());
1299 let data = column
1300 .to_data()
1301 .into_builder()
1302 .nulls(nulls)
1303 .build()
1304 .expect("changed only null mask");
1305 make_array(data)
1306 }
1307}
1308
1309impl RowColumnarDecoder {
1310 pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1315 let inner_columns = col.columns();
1316 let desc_columns = desc.typ().columns();
1317
1318 if desc_columns.len() > inner_columns.len() {
1319 anyhow::bail!(
1320 "provided array has too few columns! {desc_columns:?} > {inner_columns:?}"
1321 );
1322 }
1323
1324 let mut decoders = Vec::with_capacity(desc_columns.len());
1326
1327 let null_mask = col.nulls();
1328
1329 for (col_idx, col_name, col_type) in desc.iter_all() {
1331 let field_name = col_idx.to_stable_name();
1332 let column = col.column_by_name(&field_name).ok_or_else(|| {
1333 anyhow::anyhow!(
1334 "StructArray did not contain column name {field_name}, found {:?}",
1335 col.column_names()
1336 )
1337 })?;
1338 let column = mask_nulls(column, null_mask);
1339 let null_count = col_type.nullable.then(|| column.null_count());
1340 let decoder = array_to_decoder(&column, &col_type.scalar_type)?;
1341 decoders.push((col_name.as_str().into(), null_count, decoder));
1342 }
1343
1344 Ok(RowColumnarDecoder {
1345 len: col.len(),
1346 decoders,
1347 nullability: col.logical_nulls(),
1348 })
1349 }
1350
1351 pub fn null_count(&self) -> usize {
1355 self.nullability.as_ref().map_or(0, |n| n.null_count())
1356 }
1357}
1358
1359impl ColumnDecoder<Row> for RowColumnarDecoder {
1360 fn decode(&self, idx: usize, val: &mut Row) {
1361 let mut packer = val.packer();
1362
1363 for (_, _, decoder) in &self.decoders {
1364 decoder.get(idx, &mut packer);
1365 }
1366 }
1367
1368 fn is_null(&self, idx: usize) -> bool {
1369 let Some(nullability) = self.nullability.as_ref() else {
1370 return false;
1371 };
1372 nullability.is_null(idx)
1373 }
1374
1375 fn goodbytes(&self) -> usize {
1376 let decoders_size: usize = self
1377 .decoders
1378 .iter()
1379 .map(|(_name, _null_count, decoder)| decoder.goodbytes())
1380 .sum();
1381
1382 decoders_size
1383 + self
1384 .nullability
1385 .as_ref()
1386 .map(|nulls| nulls.inner().inner().len())
1387 .unwrap_or(0)
1388 }
1389
1390 fn stats(&self) -> StructStats {
1391 StructStats {
1392 len: self.len,
1393 cols: self
1394 .decoders
1395 .iter()
1396 .map(|(name, null_count, decoder)| {
1397 let name = name.to_string();
1398 let stats = ColumnarStats {
1399 nulls: null_count.map(|count| ColumnNullStats { count }),
1400 values: decoder.stats(),
1401 };
1402 (name, stats)
1403 })
1404 .collect(),
1405 }
1406 }
1407}
1408
/// A [`ColumnEncoder`] that writes [`Row`]s into an Arrow [`StructArray`].
#[derive(Debug)]
pub struct RowColumnarEncoder {
    /// One encoder per column, in the order the columns are iterated.
    encoders: Vec<DatumEncoder>,
    /// Raw column index and column name for each entry of `encoders`.
    col_names: Vec<(usize, Arc<str>)>,
    /// Validity bit per appended entry: `true` for an appended row, `false` for an
    /// appended null.
    nullability: BooleanBufferBuilder,
}
1418
1419impl RowColumnarEncoder {
1420 pub fn new(desc: &RelationDesc) -> Option<Self> {
1430 if desc.typ().columns().is_empty() {
1431 return None;
1432 }
1433
1434 let (col_names, encoders): (Vec<_>, Vec<_>) = desc
1435 .iter_all()
1436 .map(|(col_idx, col_name, col_type)| {
1437 let encoder = scalar_type_to_encoder(&col_type.scalar_type)
1438 .expect("failed to create encoder");
1439 let encoder = DatumEncoder {
1440 nullable: col_type.nullable,
1441 encoder,
1442 };
1443
1444 let name = (col_idx.to_raw(), col_name.as_str().into());
1447
1448 (name, encoder)
1449 })
1450 .unzip();
1451
1452 Some(RowColumnarEncoder {
1453 encoders,
1454 col_names,
1455 nullability: BooleanBufferBuilder::new(100),
1456 })
1457 }
1458}
1459
1460impl ColumnEncoder<Row> for RowColumnarEncoder {
1461 type FinishedColumn = StructArray;
1462
1463 fn goodbytes(&self) -> usize {
1464 self.encoders.iter().map(|e| e.goodbytes()).sum()
1465 }
1466
1467 fn append(&mut self, val: &Row) {
1468 let mut num_datums = 0;
1469 for (datum, encoder) in val.iter().zip(self.encoders.iter_mut()) {
1470 encoder.push(datum);
1471 num_datums += 1;
1472 }
1473 assert_eq!(
1474 num_datums,
1475 self.encoders.len(),
1476 "tried to encode {val:?}, but only have {:?}",
1477 self.encoders
1478 );
1479
1480 self.nullability.append(true);
1481 }
1482
1483 fn append_null(&mut self) {
1484 for encoder in self.encoders.iter_mut() {
1485 encoder.push_invalid();
1486 }
1487 self.nullability.append(false);
1488 }
1489
1490 fn finish(self) -> Self::FinishedColumn {
1491 let RowColumnarEncoder {
1492 encoders,
1493 col_names,
1494 nullability,
1495 ..
1496 } = self;
1497
1498 let (arrays, fields): (Vec<_>, Vec<_>) = col_names
1499 .iter()
1500 .zip_eq(encoders)
1501 .map(|((col_idx, _col_name), encoder)| {
1502 let nullable = true;
1508 let array = encoder.finish();
1509 let field = Field::new(col_idx.to_string(), array.data_type().clone(), nullable);
1510
1511 (array, field)
1512 })
1513 .multiunzip();
1514
1515 let null_buffer = NullBuffer::from(BooleanBuffer::from(nullability));
1516
1517 let array = StructArray::new(Fields::from(fields), arrays, Some(null_buffer));
1518
1519 array
1520 }
1521}
1522
1523#[inline]
1528fn downcast_array<T: 'static>(array: &Arc<dyn Array>) -> Result<&T, anyhow::Error> {
1529 array
1530 .as_any()
1531 .downcast_ref::<T>()
1532 .ok_or_else(|| anyhow!("expected {}, found {array:?}", std::any::type_name::<T>()))
1533}
1534
1535fn array_to_decoder(
1540 array: &Arc<dyn Array>,
1541 col_ty: &ScalarType,
1542) -> Result<DatumColumnDecoder, anyhow::Error> {
1543 let decoder = match (array.data_type(), col_ty) {
1544 (DataType::Boolean, ScalarType::Bool) => {
1545 let array = downcast_array::<BooleanArray>(array)?;
1546 DatumColumnDecoder::Bool(array.clone())
1547 }
1548 (DataType::UInt8, ScalarType::PgLegacyChar) => {
1549 let array = downcast_array::<UInt8Array>(array)?;
1550 DatumColumnDecoder::U8(array.clone())
1551 }
1552 (DataType::UInt16, ScalarType::UInt16) => {
1553 let array = downcast_array::<UInt16Array>(array)?;
1554 DatumColumnDecoder::U16(array.clone())
1555 }
1556 (
1557 DataType::UInt32,
1558 ScalarType::UInt32
1559 | ScalarType::Oid
1560 | ScalarType::RegClass
1561 | ScalarType::RegProc
1562 | ScalarType::RegType,
1563 ) => {
1564 let array = downcast_array::<UInt32Array>(array)?;
1565 DatumColumnDecoder::U32(array.clone())
1566 }
1567 (DataType::UInt64, ScalarType::UInt64) => {
1568 let array = downcast_array::<UInt64Array>(array)?;
1569 DatumColumnDecoder::U64(array.clone())
1570 }
1571 (DataType::Int16, ScalarType::Int16) => {
1572 let array = downcast_array::<Int16Array>(array)?;
1573 DatumColumnDecoder::I16(array.clone())
1574 }
1575 (DataType::Int32, ScalarType::Int32) => {
1576 let array = downcast_array::<Int32Array>(array)?;
1577 DatumColumnDecoder::I32(array.clone())
1578 }
1579 (DataType::Int64, ScalarType::Int64) => {
1580 let array = downcast_array::<Int64Array>(array)?;
1581 DatumColumnDecoder::I64(array.clone())
1582 }
1583 (DataType::Float32, ScalarType::Float32) => {
1584 let array = downcast_array::<Float32Array>(array)?;
1585 DatumColumnDecoder::F32(array.clone())
1586 }
1587 (DataType::Float64, ScalarType::Float64) => {
1588 let array = downcast_array::<Float64Array>(array)?;
1589 DatumColumnDecoder::F64(array.clone())
1590 }
1591 (DataType::Struct(_), ScalarType::Numeric { .. }) => {
1592 let array = downcast_array::<StructArray>(array)?;
1593 let binary_values = array
1596 .column_by_name("binary")
1597 .expect("missing binary column");
1598
1599 let array = downcast_array::<BinaryArray>(binary_values)?;
1600 DatumColumnDecoder::Numeric(array.clone())
1601 }
1602 (
1603 DataType::Utf8,
1604 ScalarType::String
1605 | ScalarType::PgLegacyName
1606 | ScalarType::Char { .. }
1607 | ScalarType::VarChar { .. },
1608 ) => {
1609 let array = downcast_array::<StringArray>(array)?;
1610 DatumColumnDecoder::String(array.clone())
1611 }
1612 (DataType::Binary, ScalarType::Bytes) => {
1613 let array = downcast_array::<BinaryArray>(array)?;
1614 DatumColumnDecoder::Bytes(array.clone())
1615 }
1616 (DataType::Int32, ScalarType::Date) => {
1617 let array = downcast_array::<Int32Array>(array)?;
1618 DatumColumnDecoder::Date(array.clone())
1619 }
1620 (DataType::FixedSizeBinary(TIME_FIXED_BYTES), ScalarType::Time) => {
1621 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1622 DatumColumnDecoder::Time(array.clone())
1623 }
1624 (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), ScalarType::Timestamp { .. }) => {
1625 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1626 DatumColumnDecoder::Timestamp(array.clone())
1627 }
1628 (DataType::FixedSizeBinary(TIMESTAMP_FIXED_BYTES), ScalarType::TimestampTz { .. }) => {
1629 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1630 DatumColumnDecoder::TimestampTz(array.clone())
1631 }
1632 (DataType::UInt64, ScalarType::MzTimestamp) => {
1633 let array = downcast_array::<UInt64Array>(array)?;
1634 DatumColumnDecoder::MzTimestamp(array.clone())
1635 }
1636 (DataType::FixedSizeBinary(INTERVAL_FIXED_BYTES), ScalarType::Interval) => {
1637 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1638 DatumColumnDecoder::Interval(array.clone())
1639 }
1640 (DataType::FixedSizeBinary(UUID_FIXED_BYTES), ScalarType::Uuid) => {
1641 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1642 DatumColumnDecoder::Uuid(array.clone())
1643 }
1644 (DataType::FixedSizeBinary(ACL_ITEM_FIXED_BYTES), ScalarType::AclItem) => {
1645 let array = downcast_array::<FixedSizeBinaryArray>(array)?;
1646 DatumColumnDecoder::AclItem(array.clone())
1647 }
1648 (DataType::Binary, ScalarType::MzAclItem) => {
1649 let array = downcast_array::<BinaryArray>(array)?;
1650 DatumColumnDecoder::MzAclItem(array.clone())
1651 }
1652 (DataType::Binary, ScalarType::Range { .. }) => {
1653 let array = downcast_array::<BinaryArray>(array)?;
1654 DatumColumnDecoder::Range(array.clone())
1655 }
1656 (DataType::Utf8, ScalarType::Jsonb) => {
1657 let array = downcast_array::<StringArray>(array)?;
1658 DatumColumnDecoder::Json(array.clone())
1659 }
1660 (DataType::Struct(_), s @ ScalarType::Array(_) | s @ ScalarType::Int2Vector) => {
1661 let element_type = match s {
1662 ScalarType::Array(inner) => inner,
1663 ScalarType::Int2Vector => &ScalarType::Int16,
1664 _ => unreachable!("checked above"),
1665 };
1666
1667 let array = downcast_array::<StructArray>(array)?;
1668 let nulls = array.nulls().cloned();
1669
1670 let dims = array
1671 .column_by_name("dims")
1672 .expect("missing dimensions column");
1673 let dims = downcast_array::<ListArray>(dims).cloned()?;
1674 let dim_offsets = dims.offsets().clone();
1675 let dims = downcast_array::<FixedSizeBinaryArray>(dims.values()).cloned()?;
1676
1677 let vals = array.column_by_name("vals").expect("missing values column");
1678 let vals = downcast_array::<ListArray>(vals)?;
1679 let val_offsets = vals.offsets().clone();
1680 let vals = array_to_decoder(vals.values(), element_type)?;
1681
1682 DatumColumnDecoder::Array {
1683 dim_offsets,
1684 dims,
1685 val_offsets,
1686 vals: Box::new(vals),
1687 nulls,
1688 }
1689 }
1690 (DataType::List(_), ScalarType::List { element_type, .. }) => {
1691 let array = downcast_array::<ListArray>(array)?;
1692 let inner_decoder = array_to_decoder(array.values(), &*element_type)?;
1693 DatumColumnDecoder::List {
1694 offsets: array.offsets().clone(),
1695 values: Box::new(inner_decoder),
1696 nulls: array.nulls().cloned(),
1697 }
1698 }
1699 (DataType::Map(_, true), ScalarType::Map { value_type, .. }) => {
1700 let array = downcast_array::<MapArray>(array)?;
1701 let keys = downcast_array::<StringArray>(array.keys())?;
1702 let vals = array_to_decoder(array.values(), value_type)?;
1703 DatumColumnDecoder::Map {
1704 offsets: array.offsets().clone(),
1705 keys: keys.clone(),
1706 vals: Box::new(vals),
1707 nulls: array.nulls().cloned(),
1708 }
1709 }
1710 (DataType::List(_), ScalarType::Map { value_type, .. }) => {
1711 let array: &ListArray = downcast_array(array)?;
1712 let entries: &StructArray = downcast_array(array.values())?;
1713 let [keys, values]: &[ArrayRef; 2] = entries.columns().try_into()?;
1714 let keys: &StringArray = downcast_array(keys)?;
1715 let vals: DatumColumnDecoder = array_to_decoder(values, value_type)?;
1716 DatumColumnDecoder::Map {
1717 offsets: array.offsets().clone(),
1718 keys: keys.clone(),
1719 vals: Box::new(vals),
1720 nulls: array.nulls().cloned(),
1721 }
1722 }
1723 (DataType::Boolean, ScalarType::Record { fields, .. }) if fields.is_empty() => {
1724 let empty_record_array = downcast_array::<BooleanArray>(array)?;
1725 DatumColumnDecoder::RecordEmpty(empty_record_array.clone())
1726 }
1727 (DataType::Struct(_), ScalarType::Record { fields, .. }) => {
1728 let record_array = downcast_array::<StructArray>(array)?;
1729 let null_mask = record_array.nulls();
1730 let mut decoders = Vec::with_capacity(fields.len());
1731 for (tag, (_name, col_type)) in fields.iter().enumerate() {
1732 let inner_array = record_array
1733 .column_by_name(&tag.to_string())
1734 .ok_or_else(|| anyhow::anyhow!("no column named '{tag}'"))?;
1735 let inner_array = mask_nulls(inner_array, null_mask);
1736 let inner_decoder = array_to_decoder(&inner_array, &col_type.scalar_type)?;
1737
1738 decoders.push(Box::new(inner_decoder));
1739 }
1740
1741 DatumColumnDecoder::Record {
1742 fields: decoders,
1743 nulls: record_array.nulls().cloned(),
1744 }
1745 }
1746 (x, y) => {
1747 let msg = format!("can't decode column of {x:?} for scalar type {y:?}");
1748 mz_ore::soft_panic_or_log!("{msg}");
1749 anyhow::bail!("{msg}");
1750 }
1751 };
1752
1753 Ok(decoder)
1754}
1755
1756fn scalar_type_to_encoder(col_ty: &ScalarType) -> Result<DatumColumnEncoder, anyhow::Error> {
1758 let encoder = match &col_ty {
1759 ScalarType::Bool => DatumColumnEncoder::Bool(BooleanBuilder::new()),
1760 ScalarType::PgLegacyChar => DatumColumnEncoder::U8(UInt8Builder::new()),
1761 ScalarType::UInt16 => DatumColumnEncoder::U16(UInt16Builder::new()),
1762 ScalarType::UInt32
1763 | ScalarType::Oid
1764 | ScalarType::RegClass
1765 | ScalarType::RegProc
1766 | ScalarType::RegType => DatumColumnEncoder::U32(UInt32Builder::new()),
1767 ScalarType::UInt64 => DatumColumnEncoder::U64(UInt64Builder::new()),
1768 ScalarType::Int16 => DatumColumnEncoder::I16(Int16Builder::new()),
1769 ScalarType::Int32 => DatumColumnEncoder::I32(Int32Builder::new()),
1770 ScalarType::Int64 => DatumColumnEncoder::I64(Int64Builder::new()),
1771 ScalarType::Float32 => DatumColumnEncoder::F32(Float32Builder::new()),
1772 ScalarType::Float64 => DatumColumnEncoder::F64(Float64Builder::new()),
1773 ScalarType::Numeric { .. } => DatumColumnEncoder::Numeric {
1774 approx_values: Float64Builder::new(),
1775 binary_values: BinaryBuilder::new(),
1776 numeric_context: crate::adt::numeric::cx_datum().clone(),
1777 },
1778 ScalarType::String
1779 | ScalarType::PgLegacyName
1780 | ScalarType::Char { .. }
1781 | ScalarType::VarChar { .. } => DatumColumnEncoder::String(StringBuilder::new()),
1782 ScalarType::Bytes => DatumColumnEncoder::Bytes(BinaryBuilder::new()),
1783 ScalarType::Date => DatumColumnEncoder::Date(Int32Builder::new()),
1784 ScalarType::Time => DatumColumnEncoder::Time(FixedSizeBinaryBuilder::new(TIME_FIXED_BYTES)),
1785 ScalarType::Timestamp { .. } => {
1786 DatumColumnEncoder::Timestamp(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
1787 }
1788 ScalarType::TimestampTz { .. } => {
1789 DatumColumnEncoder::TimestampTz(FixedSizeBinaryBuilder::new(TIMESTAMP_FIXED_BYTES))
1790 }
1791 ScalarType::MzTimestamp => DatumColumnEncoder::MzTimestamp(UInt64Builder::new()),
1792 ScalarType::Interval => {
1793 DatumColumnEncoder::Interval(FixedSizeBinaryBuilder::new(INTERVAL_FIXED_BYTES))
1794 }
1795 ScalarType::Uuid => DatumColumnEncoder::Uuid(FixedSizeBinaryBuilder::new(UUID_FIXED_BYTES)),
1796 ScalarType::AclItem => {
1797 DatumColumnEncoder::AclItem(FixedSizeBinaryBuilder::new(ACL_ITEM_FIXED_BYTES))
1798 }
1799 ScalarType::MzAclItem => DatumColumnEncoder::MzAclItem(BinaryBuilder::new()),
1800 ScalarType::Range { .. } => DatumColumnEncoder::Range(BinaryBuilder::new()),
1801 ScalarType::Jsonb => DatumColumnEncoder::Jsonb {
1802 offsets: vec![0],
1803 buf: Vec::new(),
1804 nulls: None,
1805 },
1806 s @ ScalarType::Array(_) | s @ ScalarType::Int2Vector => {
1807 let element_type = match s {
1808 ScalarType::Array(inner) => inner,
1809 ScalarType::Int2Vector => &ScalarType::Int16,
1810 _ => unreachable!("checked above"),
1811 };
1812 let inner = scalar_type_to_encoder(element_type)?;
1813 DatumColumnEncoder::Array {
1814 dims: ListBuilder::new(FixedSizeBinaryBuilder::new(ARRAY_DIMENSION_FIXED_BYTES)),
1815 val_lengths: Vec::new(),
1816 vals: Box::new(inner),
1817 nulls: None,
1818 }
1819 }
1820 ScalarType::List { element_type, .. } => {
1821 let inner = scalar_type_to_encoder(&*element_type)?;
1822 DatumColumnEncoder::List {
1823 lengths: Vec::new(),
1824 values: Box::new(inner),
1825 nulls: None,
1826 }
1827 }
1828 ScalarType::Map { value_type, .. } => {
1829 let inner = scalar_type_to_encoder(&*value_type)?;
1830 DatumColumnEncoder::Map {
1831 lengths: Vec::new(),
1832 keys: StringBuilder::new(),
1833 vals: Box::new(inner),
1834 nulls: None,
1835 }
1836 }
1837 ScalarType::Record { fields, .. } if fields.is_empty() => {
1838 DatumColumnEncoder::RecordEmpty(BooleanBuilder::new())
1839 }
1840 ScalarType::Record { fields, .. } => {
1841 let encoders = fields
1842 .iter()
1843 .map(|(_name, ty)| {
1844 scalar_type_to_encoder(&ty.scalar_type).map(|e| DatumEncoder {
1845 nullable: ty.nullable,
1846 encoder: e,
1847 })
1848 })
1849 .collect::<Result<_, _>>()?;
1850
1851 DatumColumnEncoder::Record {
1852 fields: encoders,
1853 nulls: None,
1854 length: 0,
1855 }
1856 }
1857 };
1858 Ok(encoder)
1859}
1860
1861impl Codec for Row {
1862 type Storage = ProtoRow;
1863 type Schema = RelationDesc;
1864
1865 fn codec_name() -> String {
1866 "protobuf[Row]".into()
1867 }
1868
1869 fn encode<B>(&self, buf: &mut B)
1875 where
1876 B: BufMut,
1877 {
1878 self.into_proto()
1879 .encode(buf)
1880 .expect("no required fields means no initialization errors");
1881 }
1882
1883 fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Row, String> {
1889 let mut row = Row::with_capacity(buf.len());
1897 <Self as Codec>::decode_from(&mut row, buf, &mut None, schema)?;
1898 Ok(row)
1899 }
1900
1901 fn decode_from<'a>(
1902 &mut self,
1903 buf: &'a [u8],
1904 storage: &mut Option<ProtoRow>,
1905 schema: &RelationDesc,
1906 ) -> Result<(), String> {
1907 let mut proto = storage.take().unwrap_or_default();
1908 proto.clear();
1909 proto.merge(buf).map_err(|err| err.to_string())?;
1910 let ret = self.decode_from_proto(&proto, schema);
1911 storage.replace(proto);
1912 ret
1913 }
1914
1915 fn validate(row: &Self, desc: &Self::Schema) -> Result<(), String> {
1916 for x in Itertools::zip_longest(desc.iter_types(), row.iter()) {
1917 match x {
1918 EitherOrBoth::Both(typ, datum) if datum.is_instance_of(typ) => continue,
1919 _ => return Err(format!("row {:?} did not match desc {:?}", row, desc)),
1920 };
1921 }
1922 Ok(())
1923 }
1924
1925 fn encode_schema(schema: &Self::Schema) -> Bytes {
1926 schema.into_proto().encode_to_vec().into()
1927 }
1928
1929 fn decode_schema(buf: &Bytes) -> Self::Schema {
1930 let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
1931 proto.into_rust().expect("valid schema")
1932 }
1933}
1934
1935impl<'a> From<Datum<'a>> for ProtoDatum {
1936 fn from(x: Datum<'a>) -> Self {
1937 let datum_type = match x {
1938 Datum::False => DatumType::Other(ProtoDatumOther::False.into()),
1939 Datum::True => DatumType::Other(ProtoDatumOther::True.into()),
1940 Datum::Int16(x) => DatumType::Int16(x.into()),
1941 Datum::Int32(x) => DatumType::Int32(x),
1942 Datum::UInt8(x) => DatumType::Uint8(x.into()),
1943 Datum::UInt16(x) => DatumType::Uint16(x.into()),
1944 Datum::UInt32(x) => DatumType::Uint32(x),
1945 Datum::UInt64(x) => DatumType::Uint64(x),
1946 Datum::Int64(x) => DatumType::Int64(x),
1947 Datum::Float32(x) => DatumType::Float32(x.into_inner()),
1948 Datum::Float64(x) => DatumType::Float64(x.into_inner()),
1949 Datum::Date(x) => DatumType::Date(x.into_proto()),
1950 Datum::Time(x) => DatumType::Time(ProtoNaiveTime {
1951 secs: x.num_seconds_from_midnight(),
1952 frac: x.nanosecond(),
1953 }),
1954 Datum::Timestamp(x) => DatumType::Timestamp(x.into_proto()),
1955 Datum::TimestampTz(x) => DatumType::TimestampTz(x.into_proto()),
1956 Datum::Interval(x) => DatumType::Interval(x.into_proto()),
1957 Datum::Bytes(x) => DatumType::Bytes(Bytes::copy_from_slice(x)),
1958 Datum::String(x) => DatumType::String(x.to_owned()),
1959 Datum::Array(x) => DatumType::Array(ProtoArray {
1960 elements: Some(ProtoRow {
1961 datums: x.elements().iter().map(|x| x.into()).collect(),
1962 }),
1963 dims: x
1964 .dims()
1965 .into_iter()
1966 .map(|x| ProtoArrayDimension {
1967 lower_bound: i64::cast_from(x.lower_bound),
1968 length: u64::cast_from(x.length),
1969 })
1970 .collect(),
1971 }),
1972 Datum::List(x) => DatumType::List(ProtoRow {
1973 datums: x.iter().map(|x| x.into()).collect(),
1974 }),
1975 Datum::Map(x) => DatumType::Dict(ProtoDict {
1976 elements: x
1977 .iter()
1978 .map(|(k, v)| ProtoDictElement {
1979 key: k.to_owned(),
1980 val: Some(v.into()),
1981 })
1982 .collect(),
1983 }),
1984 Datum::Numeric(x) => {
1985 let mut x = x.0.clone();
1987 if let Some((bcd, scale)) = x.to_packed_bcd() {
1988 DatumType::Numeric(ProtoNumeric { bcd, scale })
1989 } else if x.is_nan() {
1990 DatumType::Other(ProtoDatumOther::NumericNaN.into())
1991 } else if x.is_infinite() {
1992 if x.is_negative() {
1993 DatumType::Other(ProtoDatumOther::NumericNegInf.into())
1994 } else {
1995 DatumType::Other(ProtoDatumOther::NumericPosInf.into())
1996 }
1997 } else if x.is_special() {
1998 panic!("internal error: unhandled special numeric value: {}", x);
1999 } else {
2000 panic!(
2001 "internal error: to_packed_bcd returned None for non-special value: {}",
2002 x
2003 )
2004 }
2005 }
2006 Datum::JsonNull => DatumType::Other(ProtoDatumOther::JsonNull.into()),
2007 Datum::Uuid(x) => DatumType::Uuid(x.as_bytes().to_vec()),
2008 Datum::MzTimestamp(x) => DatumType::MzTimestamp(x.into()),
2009 Datum::Dummy => DatumType::Other(ProtoDatumOther::Dummy.into()),
2010 Datum::Null => DatumType::Other(ProtoDatumOther::Null.into()),
2011 Datum::Range(super::Range { inner }) => DatumType::Range(Box::new(ProtoRange {
2012 inner: inner.map(|RangeInner { lower, upper }| {
2013 Box::new(ProtoRangeInner {
2014 lower_inclusive: lower.inclusive,
2015 lower: lower.bound.map(|bound| Box::new(bound.datum().into())),
2016 upper_inclusive: upper.inclusive,
2017 upper: upper.bound.map(|bound| Box::new(bound.datum().into())),
2018 })
2019 }),
2020 })),
2021 Datum::MzAclItem(x) => DatumType::MzAclItem(x.into_proto()),
2022 Datum::AclItem(x) => DatumType::AclItem(x.into_proto()),
2023 };
2024 ProtoDatum {
2025 datum_type: Some(datum_type),
2026 }
2027 }
2028}
2029
2030impl RowPacker<'_> {
2031 pub(crate) fn try_push_proto(&mut self, x: &ProtoDatum) -> Result<(), String> {
2032 match &x.datum_type {
2033 Some(DatumType::Other(o)) => match ProtoDatumOther::try_from(*o) {
2034 Ok(ProtoDatumOther::Unknown) => return Err("unknown datum type".into()),
2035 Ok(ProtoDatumOther::Null) => self.push(Datum::Null),
2036 Ok(ProtoDatumOther::False) => self.push(Datum::False),
2037 Ok(ProtoDatumOther::True) => self.push(Datum::True),
2038 Ok(ProtoDatumOther::JsonNull) => self.push(Datum::JsonNull),
2039 Ok(ProtoDatumOther::Dummy) => {
2040 #[cfg(feature = "tracing")]
2044 tracing::error!("protobuf decoding found Dummy datum");
2045 self.push(Datum::Dummy);
2046 }
2047 Ok(ProtoDatumOther::NumericPosInf) => self.push(Datum::from(Numeric::infinity())),
2048 Ok(ProtoDatumOther::NumericNegInf) => self.push(Datum::from(-Numeric::infinity())),
2049 Ok(ProtoDatumOther::NumericNaN) => self.push(Datum::from(Numeric::nan())),
2050 Err(_) => return Err(format!("unknown datum type: {}", o)),
2051 },
2052 Some(DatumType::Int16(x)) => {
2053 let x = i16::try_from(*x)
2054 .map_err(|_| format!("int16 field stored with out of range value: {}", *x))?;
2055 self.push(Datum::Int16(x))
2056 }
2057 Some(DatumType::Int32(x)) => self.push(Datum::Int32(*x)),
2058 Some(DatumType::Int64(x)) => self.push(Datum::Int64(*x)),
2059 Some(DatumType::Uint8(x)) => {
2060 let x = u8::try_from(*x)
2061 .map_err(|_| format!("uint8 field stored with out of range value: {}", *x))?;
2062 self.push(Datum::UInt8(x))
2063 }
2064 Some(DatumType::Uint16(x)) => {
2065 let x = u16::try_from(*x)
2066 .map_err(|_| format!("uint16 field stored with out of range value: {}", *x))?;
2067 self.push(Datum::UInt16(x))
2068 }
2069 Some(DatumType::Uint32(x)) => self.push(Datum::UInt32(*x)),
2070 Some(DatumType::Uint64(x)) => self.push(Datum::UInt64(*x)),
2071 Some(DatumType::Float32(x)) => self.push(Datum::Float32((*x).into())),
2072 Some(DatumType::Float64(x)) => self.push(Datum::Float64((*x).into())),
2073 Some(DatumType::Bytes(x)) => self.push(Datum::Bytes(x)),
2074 Some(DatumType::String(x)) => self.push(Datum::String(x)),
2075 Some(DatumType::Uuid(x)) => {
2076 let u = Uuid::from_slice(x).map_err(|err| err.to_string())?;
2081 self.push(Datum::Uuid(u));
2082 }
2083 Some(DatumType::Date(x)) => self.push(Datum::Date(x.clone().into_rust()?)),
2084 Some(DatumType::Time(x)) => self.push(Datum::Time(x.clone().into_rust()?)),
2085 Some(DatumType::Timestamp(x)) => self.push(Datum::Timestamp(x.clone().into_rust()?)),
2086 Some(DatumType::TimestampTz(x)) => {
2087 self.push(Datum::TimestampTz(x.clone().into_rust()?))
2088 }
2089 Some(DatumType::Interval(x)) => self.push(Datum::Interval(
2090 x.clone()
2091 .into_rust()
2092 .map_err(|e: TryFromProtoError| e.to_string())?,
2093 )),
2094 Some(DatumType::List(x)) => self.push_list_with(|row| -> Result<(), String> {
2095 for d in x.datums.iter() {
2096 row.try_push_proto(d)?;
2097 }
2098 Ok(())
2099 })?,
2100 Some(DatumType::Array(x)) => {
2101 let dims = x
2102 .dims
2103 .iter()
2104 .map(|x| ArrayDimension {
2105 lower_bound: isize::cast_from(x.lower_bound),
2106 length: usize::cast_from(x.length),
2107 })
2108 .collect::<Vec<_>>();
2109 match x.elements.as_ref() {
2110 None => self.try_push_array(&dims, [].iter()),
2111 Some(elements) => {
2112 let elements_row = Row::try_from(elements)?;
2115 self.try_push_array(&dims, elements_row.iter())
2116 }
2117 }
2118 .map_err(|err| err.to_string())?
2119 }
2120 Some(DatumType::Dict(x)) => self.push_dict_with(|row| -> Result<(), String> {
2121 for e in x.elements.iter() {
2122 row.push(Datum::from(e.key.as_str()));
2123 let val = e
2124 .val
2125 .as_ref()
2126 .ok_or_else(|| format!("missing val for key: {}", e.key))?;
2127 row.try_push_proto(val)?;
2128 }
2129 Ok(())
2130 })?,
2131 Some(DatumType::Numeric(x)) => {
2132 let n = Decimal::from_packed_bcd(&x.bcd, x.scale).map_err(|err| err.to_string())?;
2135 self.push(Datum::from(n))
2136 }
2137 Some(DatumType::MzTimestamp(x)) => self.push(Datum::MzTimestamp((*x).into())),
2138 Some(DatumType::Range(inner)) => {
2139 let ProtoRange { inner } = &**inner;
2140 match inner {
2141 None => self.push_range(Range { inner: None }).unwrap(),
2142 Some(inner) => {
2143 let ProtoRangeInner {
2144 lower_inclusive,
2145 lower,
2146 upper_inclusive,
2147 upper,
2148 } = &**inner;
2149
2150 self.push_range_with(
2151 RangeLowerBound {
2152 inclusive: *lower_inclusive,
2153 bound: lower
2154 .as_ref()
2155 .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
2156 },
2157 RangeUpperBound {
2158 inclusive: *upper_inclusive,
2159 bound: upper
2160 .as_ref()
2161 .map(|d| |row: &mut RowPacker| row.try_push_proto(&*d)),
2162 },
2163 )
2164 .expect("decoding ProtoRow must succeed");
2165 }
2166 }
2167 }
2168 Some(DatumType::MzAclItem(x)) => self.push(Datum::MzAclItem(x.clone().into_rust()?)),
2169 Some(DatumType::AclItem(x)) => self.push(Datum::AclItem(x.clone().into_rust()?)),
2170 None => return Err("unknown datum type".into()),
2171 };
2172 Ok(())
2173 }
2174}
2175
2176impl TryFrom<&ProtoRow> for Row {
2178 type Error = String;
2179
2180 fn try_from(x: &ProtoRow) -> Result<Self, Self::Error> {
2181 let mut row = Row::default();
2184 let mut packer = row.packer();
2185 for d in x.datums.iter() {
2186 packer.try_push_proto(d)?;
2187 }
2188 Ok(row)
2189 }
2190}
2191
2192impl RustType<ProtoRow> for Row {
2193 fn into_proto(&self) -> ProtoRow {
2194 let datums = self.iter().map(|x| x.into()).collect();
2195 ProtoRow { datums }
2196 }
2197
2198 fn from_proto(proto: ProtoRow) -> Result<Self, TryFromProtoError> {
2199 let mut row = Row::default();
2202 let mut packer = row.packer();
2203 for d in proto.datums.iter() {
2204 packer
2205 .try_push_proto(d)
2206 .map_err(TryFromProtoError::RowConversionError)?;
2207 }
2208 Ok(row)
2209 }
2210}
2211
2212#[cfg(test)]
2213mod tests {
2214 use std::collections::BTreeSet;
2215
2216 use arrow::array::{ArrayData, make_array};
2217 use arrow::compute::SortOptions;
2218 use arrow::datatypes::ArrowNativeType;
2219 use arrow::row::SortField;
2220 use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
2221 use mz_ore::assert_err;
2222 use mz_ore::collections::CollectionExt;
2223 use mz_persist::indexed::columnar::arrow::realloc_array;
2224 use mz_persist::metrics::ColumnarMetrics;
2225 use mz_persist_types::Codec;
2226 use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
2227 use mz_persist_types::columnar::{codec_to_schema, schema_to_codec};
2228 use mz_proto::{ProtoType, RustType};
2229 use proptest::prelude::*;
2230 use proptest::strategy::Strategy;
2231 use uuid::Uuid;
2232
2233 use super::*;
2234 use crate::adt::array::ArrayDimension;
2235 use crate::adt::interval::Interval;
2236 use crate::adt::numeric::Numeric;
2237 use crate::adt::timestamp::CheckedTimestamp;
2238 use crate::fixed_length::ToDatumIter;
2239 use crate::relation::arb_relation_desc;
2240 use crate::{ColumnName, ColumnType, RowArena, arb_datum_for_column, arb_row_for_relation};
2241 use crate::{Datum, RelationDesc, Row, ScalarType};
2242
2243 #[track_caller]
2244 fn roundtrip_datum<'a>(
2245 ty: ColumnType,
2246 datum: impl Iterator<Item = Datum<'a>>,
2247 metrics: &ColumnarMetrics,
2248 ) {
2249 let desc = RelationDesc::builder().with_column("a", ty).finish();
2250 let rows = datum.map(|d| Row::pack_slice(&[d])).collect();
2251 roundtrip_rows(&desc, rows, metrics)
2252 }
2253
2254 #[track_caller]
2255 fn roundtrip_rows(desc: &RelationDesc, rows: Vec<Row>, metrics: &ColumnarMetrics) {
2256 let mut encoder = <RelationDesc as Schema<Row>>::encoder(desc).unwrap();
2257 for row in &rows {
2258 encoder.append(row);
2259 }
2260 let col = encoder.finish();
2261
2262 let col = realloc_array(&col, metrics);
2264 {
2266 let proto = col.to_data().into_proto();
2267 let bytes = proto.encode_to_vec();
2268 let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
2269 let array_data: ArrayData = proto.into_rust().unwrap();
2270
2271 let col_rnd = StructArray::from(array_data.clone());
2272 assert_eq!(col, col_rnd);
2273
2274 let col_dyn = arrow::array::make_array(array_data);
2275 let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
2276 assert_eq!(&col, col_dyn);
2277 }
2278
2279 let decoder = <RelationDesc as Schema<Row>>::decoder(desc, col.clone()).unwrap();
2280 let stats = decoder.stats();
2281
2282 let arena = RowArena::new();
2284 let (stats, stat_nulls): (Vec<_>, Vec<_>) = desc
2285 .iter()
2286 .map(|(name, ty)| {
2287 let col_stats = stats.cols.get(name.as_str()).unwrap();
2288 let lower_upper =
2289 crate::stats::col_values(&ty.scalar_type, &col_stats.values, &arena);
2290 let null_count = col_stats.nulls.map_or(0, |n| n.count);
2291
2292 (lower_upper, null_count)
2293 })
2294 .unzip();
2295 let mut actual_nulls = vec![0usize; stats.len()];
2297
2298 let mut rnd_row = Row::default();
2299 for (idx, og_row) in rows.iter().enumerate() {
2300 decoder.decode(idx, &mut rnd_row);
2301 assert_eq!(og_row, &rnd_row);
2302
2303 for (c_idx, (rnd_datum, ty)) in rnd_row.iter().zip_eq(desc.typ().columns()).enumerate()
2305 {
2306 let lower_upper = stats[c_idx];
2307
2308 if rnd_datum.is_null() {
2310 actual_nulls[c_idx] += 1;
2311 } else if let Some((lower, upper)) = lower_upper {
2312 assert!(rnd_datum >= lower, "{rnd_datum:?} is not >= {lower:?}");
2313 assert!(rnd_datum <= upper, "{rnd_datum:?} is not <= {upper:?}");
2314 } else {
2315 match &ty.scalar_type {
2316 ScalarType::Jsonb => (),
2318 ScalarType::AclItem
2320 | ScalarType::MzAclItem
2321 | ScalarType::Range { .. }
2322 | ScalarType::Array(_)
2323 | ScalarType::Map { .. }
2324 | ScalarType::List { .. }
2325 | ScalarType::Record { .. }
2326 | ScalarType::Int2Vector => (),
2327 other => panic!("should have collected stats for {other:?}"),
2328 }
2329 }
2330 }
2331 }
2332
2333 for (col_idx, (stats_count, actual_count)) in
2335 stat_nulls.iter().zip_eq(actual_nulls.iter()).enumerate()
2336 {
2337 assert_eq!(
2338 stats_count, actual_count,
2339 "column {col_idx} has incorrect number of nulls!"
2340 );
2341 }
2342
2343 let codec = schema_to_codec::<Row>(desc, &col).unwrap();
2345 let col2 = codec_to_schema::<Row>(desc, &codec).unwrap();
2346 assert_eq!(col2.as_ref(), &col);
2347
2348 let converter = arrow::row::RowConverter::new(vec![SortField::new_with_options(
2350 col.data_type().clone(),
2351 SortOptions {
2352 descending: false,
2353 nulls_first: false,
2354 },
2355 )])
2356 .expect("sortable");
2357 let rows = converter
2358 .convert_columns(&[Arc::new(col.clone())])
2359 .expect("convertible");
2360 let mut row_vec = rows.iter().collect::<Vec<_>>();
2361 row_vec.sort();
2362 let row_col = converter
2363 .convert_rows(row_vec)
2364 .expect("convertible")
2365 .into_element();
2366 assert_eq!(row_col.len(), col.len());
2367
2368 let ord = ArrayOrd::new(&col);
2369 let mut indices = (0..u64::usize_as(col.len())).collect::<Vec<_>>();
2370 indices.sort_by_key(|i| ord.at(i.as_usize()));
2371 let indices = UInt64Array::from(indices);
2372 let ord_col = ::arrow::compute::take(&col, &indices, None).expect("takeable");
2373 assert_eq!(row_col.as_ref(), ord_col.as_ref());
2374
2375 let ordered_prefix_len = desc
2377 .iter()
2378 .take_while(|(_, c)| preserves_order(&c.scalar_type))
2379 .count();
2380 let decoder = <RelationDesc as Schema<Row>>::decoder_any(desc, ord_col.as_ref()).unwrap();
2381 let rows = (0..ord_col.len()).map(|i| {
2382 let mut row = Row::default();
2383 decoder.decode(i, &mut row);
2384 row
2385 });
2386 for (a, b) in rows.tuple_windows() {
2387 let a_prefix = a.iter().take(ordered_prefix_len);
2388 let b_prefix = b.iter().take(ordered_prefix_len);
2389 assert!(
2390 a_prefix.cmp(b_prefix).is_le(),
2391 "ordering should be consistent on preserves_order columns: {:#?}\n{:?}\n{:?}",
2392 desc.iter().take(ordered_prefix_len).collect_vec(),
2393 a.to_datum_iter().take(ordered_prefix_len).collect_vec(),
2394 b.to_datum_iter().take(ordered_prefix_len).collect_vec()
2395 );
2396 }
2397
2398 assert_eq!(
2400 ord.goodbytes(),
2401 (0..col.len()).map(|i| ord.at(i).goodbytes()).sum::<usize>(),
2402 "total size should match the sum of the sizes at each index"
2403 );
2404
2405 if !ord_col.is_empty() {
2407 let min_idx = indices.values()[0].as_usize();
2408 let lower_bound = ArrayBound::new(ord_col, min_idx);
2409 let max_encoded_len = 1000;
2410 if let Some(proto) = lower_bound.to_proto_lower(max_encoded_len) {
2411 assert!(
2412 proto.encoded_len() <= max_encoded_len,
2413 "should respect the max len"
2414 );
2415 let array_data = proto.into_rust().expect("valid array");
2416 let new_lower_bound = ArrayBound::new(make_array(array_data), 0);
2417 assert!(
2418 new_lower_bound.get() <= lower_bound.get(),
2419 "proto-roundtripped bound should be <= the original"
2420 );
2421 }
2422 }
2423 }
2424
2425 #[mz_ore::test]
2426 #[cfg_attr(miri, ignore)] fn proptest_datums() {
2428 let strat = any::<ColumnType>().prop_flat_map(|ty| {
2429 proptest::collection::vec(arb_datum_for_column(ty.clone()), 0..16)
2430 .prop_map(move |d| (ty.clone(), d))
2431 });
2432 let metrics = ColumnarMetrics::disconnected();
2433
2434 proptest!(|((ty, datums) in strat)| {
2435 roundtrip_datum(ty.clone(), datums.iter().map(Datum::from), &metrics);
2436 })
2437 }
2438
2439 #[mz_ore::test]
2440 #[cfg_attr(miri, ignore)] fn proptest_non_empty_relation_descs() {
2442 let strat = arb_relation_desc(1..8).prop_flat_map(|desc| {
2443 proptest::collection::vec(arb_row_for_relation(&desc), 0..12)
2444 .prop_map(move |rows| (desc.clone(), rows))
2445 });
2446 let metrics = ColumnarMetrics::disconnected();
2447
2448 proptest!(|((desc, rows) in strat)| {
2449 roundtrip_rows(&desc, rows, &metrics)
2450 })
2451 }
2452
2453 #[mz_ore::test]
2454 fn empty_relation_desc_returns_error() {
2455 let empty_desc = RelationDesc::empty();
2456 let result = <RelationDesc as Schema<Row>>::encoder(&empty_desc);
2457 assert_err!(result);
2458 }
2459
2460 #[mz_ore::test]
2461 fn smoketest_collections() {
2462 let mut row = Row::default();
2463 let mut packer = row.packer();
2464 let metrics = ColumnarMetrics::disconnected();
2465
2466 packer
2467 .try_push_array(
2468 &[ArrayDimension {
2469 lower_bound: 0,
2470 length: 3,
2471 }],
2472 [Datum::UInt32(4), Datum::UInt32(5), Datum::UInt32(6)],
2473 )
2474 .unwrap();
2475
2476 let array = row.unpack_first();
2477 roundtrip_datum(
2478 ScalarType::Array(Box::new(ScalarType::UInt32)).nullable(true),
2479 [array].into_iter(),
2480 &metrics,
2481 );
2482 }
2483
2484 #[mz_ore::test]
2485 fn smoketest_row() {
2486 let desc = RelationDesc::builder()
2487 .with_column("a", ScalarType::Int64.nullable(true))
2488 .with_column("b", ScalarType::String.nullable(true))
2489 .with_column("c", ScalarType::Bool.nullable(true))
2490 .with_column(
2491 "d",
2492 ScalarType::List {
2493 element_type: Box::new(ScalarType::UInt32),
2494 custom_id: None,
2495 }
2496 .nullable(true),
2497 )
2498 .with_column(
2499 "e",
2500 ScalarType::Map {
2501 value_type: Box::new(ScalarType::Int16),
2502 custom_id: None,
2503 }
2504 .nullable(true),
2505 )
2506 .finish();
2507 let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();
2508
2509 let mut og_row = Row::default();
2510 {
2511 let mut packer = og_row.packer();
2512 packer.push(Datum::Int64(100));
2513 packer.push(Datum::String("hello world"));
2514 packer.push(Datum::True);
2515 packer.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
2516 packer.push_dict([("bar", Datum::Int16(9)), ("foo", Datum::Int16(3))]);
2517 }
2518 let mut og_row_2 = Row::default();
2519 {
2520 let mut packer = og_row_2.packer();
2521 packer.push(Datum::Null);
2522 packer.push(Datum::Null);
2523 packer.push(Datum::Null);
2524 packer.push(Datum::Null);
2525 packer.push(Datum::Null);
2526 }
2527
2528 encoder.append(&og_row);
2529 encoder.append(&og_row_2);
2530 let col = encoder.finish();
2531
2532 let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
2533
2534 let mut rnd_row = Row::default();
2535 decoder.decode(0, &mut rnd_row);
2536 assert_eq!(og_row, rnd_row);
2537
2538 let mut rnd_row = Row::default();
2539 decoder.decode(1, &mut rnd_row);
2540 assert_eq!(og_row_2, rnd_row);
2541 }
2542
2543 #[mz_ore::test]
2544 fn test_nested_list() {
2545 let desc = RelationDesc::builder()
2546 .with_column(
2547 "a",
2548 ScalarType::List {
2549 element_type: Box::new(ScalarType::List {
2550 element_type: Box::new(ScalarType::Int64),
2551 custom_id: None,
2552 }),
2553 custom_id: None,
2554 }
2555 .nullable(false),
2556 )
2557 .finish();
2558 let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();
2559
2560 let mut og_row = Row::default();
2561 {
2562 let mut packer = og_row.packer();
2563 packer.push_list_with(|inner| {
2564 inner.push_list([Datum::Int64(1), Datum::Int64(2)]);
2565 inner.push_list([Datum::Int64(5)]);
2566 inner.push_list([Datum::Int64(9), Datum::Int64(99), Datum::Int64(999)]);
2567 });
2568 }
2569
2570 encoder.append(&og_row);
2571 let col = encoder.finish();
2572
2573 let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
2574 let mut rnd_row = Row::default();
2575 decoder.decode(0, &mut rnd_row);
2576
2577 assert_eq!(og_row, rnd_row);
2578 }
2579
2580 #[mz_ore::test]
2581 fn test_record() {
2582 let desc = RelationDesc::builder()
2583 .with_column(
2584 "a",
2585 ScalarType::Record {
2586 fields: [
2587 (ColumnName::from("foo"), ScalarType::Int64.nullable(false)),
2588 (ColumnName::from("bar"), ScalarType::String.nullable(true)),
2589 (
2590 ColumnName::from("baz"),
2591 ScalarType::List {
2592 element_type: Box::new(ScalarType::UInt32),
2593 custom_id: None,
2594 }
2595 .nullable(false),
2596 ),
2597 ]
2598 .into(),
2599 custom_id: None,
2600 }
2601 .nullable(true),
2602 )
2603 .finish();
2604 let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();
2605
2606 let mut og_row = Row::default();
2607 {
2608 let mut packer = og_row.packer();
2609 packer.push_list_with(|inner| {
2610 inner.push(Datum::Int64(42));
2611 inner.push(Datum::Null);
2612 inner.push_list([Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)]);
2613 });
2614 }
2615 let null_row = Row::pack_slice(&[Datum::Null]);
2616
2617 encoder.append(&og_row);
2618 encoder.append(&null_row);
2619 let col = encoder.finish();
2620
2621 let decoder = <RelationDesc as Schema<Row>>::decoder(&desc, col).unwrap();
2622 let mut rnd_row = Row::default();
2623
2624 decoder.decode(0, &mut rnd_row);
2625 assert_eq!(og_row, rnd_row);
2626
2627 rnd_row.packer();
2628 decoder.decode(1, &mut rnd_row);
2629 assert_eq!(null_row, rnd_row);
2630 }
2631
2632 #[mz_ore::test]
2633 #[cfg_attr(miri, ignore)] fn roundtrip() {
2635 let mut row = Row::default();
2636 let mut packer = row.packer();
2637 packer.extend([
2638 Datum::False,
2639 Datum::True,
2640 Datum::Int16(1),
2641 Datum::Int32(2),
2642 Datum::Int64(3),
2643 Datum::Float32(4f32.into()),
2644 Datum::Float64(5f64.into()),
2645 Datum::Date(
2646 NaiveDate::from_ymd_opt(6, 7, 8)
2647 .unwrap()
2648 .try_into()
2649 .unwrap(),
2650 ),
2651 Datum::Time(NaiveTime::from_hms_opt(9, 10, 11).unwrap()),
2652 Datum::Timestamp(
2653 CheckedTimestamp::from_timestamplike(
2654 NaiveDate::from_ymd_opt(12, 13 % 12, 14)
2655 .unwrap()
2656 .and_time(NaiveTime::from_hms_opt(15, 16, 17).unwrap()),
2657 )
2658 .unwrap(),
2659 ),
2660 Datum::TimestampTz(
2661 CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
2662 NaiveDate::from_ymd_opt(18, 19 % 12, 20)
2663 .unwrap()
2664 .and_time(NaiveTime::from_hms_opt(21, 22, 23).unwrap()),
2665 Utc,
2666 ))
2667 .unwrap(),
2668 ),
2669 Datum::Interval(Interval {
2670 months: 24,
2671 days: 42,
2672 micros: 25,
2673 }),
2674 Datum::Bytes(&[26, 27]),
2675 Datum::String("28"),
2676 Datum::from(Numeric::from(29)),
2677 Datum::from(Numeric::infinity()),
2678 Datum::from(-Numeric::infinity()),
2679 Datum::from(Numeric::nan()),
2680 Datum::JsonNull,
2681 Datum::Uuid(Uuid::from_u128(30)),
2682 Datum::Dummy,
2683 Datum::Null,
2684 ]);
2685 packer
2686 .try_push_array(
2687 &[ArrayDimension {
2688 lower_bound: 2,
2689 length: 2,
2690 }],
2691 vec![Datum::Int32(31), Datum::Int32(32)],
2692 )
2693 .expect("valid array");
2694 packer.push_list_with(|packer| {
2695 packer.push(Datum::String("33"));
2696 packer.push_list_with(|packer| {
2697 packer.push(Datum::String("34"));
2698 packer.push(Datum::String("35"));
2699 });
2700 packer.push(Datum::String("36"));
2701 packer.push(Datum::String("37"));
2702 });
2703 packer.push_dict_with(|row| {
2704 let mut i = 38;
2707 for _ in 0..20 {
2708 row.push(Datum::String(&i.to_string()));
2709 row.push(Datum::Int32(i + 1));
2710 i += 2;
2711 }
2712 });
2713
2714 let mut desc = RelationDesc::builder();
2715 for (idx, _) in row.iter().enumerate() {
2716 desc = desc.with_column(idx.to_string(), ScalarType::Int32.nullable(true));
2719 }
2720 let desc = desc.finish();
2721
2722 let encoded = row.encode_to_vec();
2723 assert_eq!(Row::decode(&encoded, &desc), Ok(row));
2724 }
2725
2726 #[mz_ore::test]
2727 fn smoketest_projection() {
2728 let desc = RelationDesc::builder()
2729 .with_column("a", ScalarType::Int64.nullable(true))
2730 .with_column("b", ScalarType::String.nullable(true))
2731 .with_column("c", ScalarType::Bool.nullable(true))
2732 .finish();
2733 let mut encoder = <RelationDesc as Schema<Row>>::encoder(&desc).unwrap();
2734
2735 let mut og_row = Row::default();
2736 {
2737 let mut packer = og_row.packer();
2738 packer.push(Datum::Int64(100));
2739 packer.push(Datum::String("hello world"));
2740 packer.push(Datum::True);
2741 }
2742 let mut og_row_2 = Row::default();
2743 {
2744 let mut packer = og_row_2.packer();
2745 packer.push(Datum::Null);
2746 packer.push(Datum::Null);
2747 packer.push(Datum::Null);
2748 }
2749
2750 encoder.append(&og_row);
2751 encoder.append(&og_row_2);
2752 let col = encoder.finish();
2753
2754 let projected_desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2755
2756 let decoder = <RelationDesc as Schema<Row>>::decoder(&projected_desc, col).unwrap();
2757
2758 let mut rnd_row = Row::default();
2759 decoder.decode(0, &mut rnd_row);
2760 let expected_row = Row::pack_slice(&[Datum::Int64(100), Datum::True]);
2761 assert_eq!(expected_row, rnd_row);
2762
2763 let mut rnd_row = Row::default();
2764 decoder.decode(1, &mut rnd_row);
2765 let expected_row = Row::pack_slice(&[Datum::Null, Datum::Null]);
2766 assert_eq!(expected_row, rnd_row);
2767 }
2768}