1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::marker::PhantomData;
15
16use arrow::array::{
17 Array, ArrayBuilder, BinaryArray, BinaryBuilder, NullArray, StringArray, StringBuilder,
18 StructArray,
19};
20use bytes::{BufMut, Bytes};
21
22use crate::arrow::ArrayOrd;
23use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema};
24use crate::stats::{ColumnStatKinds, ColumnarStats, NoneStats, StructStats};
25use crate::{Codec, Codec64, Opaque, ShardId};
26
/// Codec name returned by `codec_name()` for both the `Vec<u8>` and the
/// `Bytes` [Codec] impls in this file, so the two report the same name.
const BYTES_CODEC_NAME: &str = "Vec<u8>";
31
/// Field-less [Schema] used for the unit type `()`.
///
/// It carries no configuration: the `()` [Codec] impl serializes it as an
/// empty byte buffer.
#[derive(Debug, Default, PartialEq)]
pub struct UnitSchema;
35
36impl Codec for () {
37 type Storage = ();
38 type Schema = UnitSchema;
39
40 fn codec_name() -> String {
41 "()".into()
42 }
43
44 fn encode<B>(&self, _buf: &mut B)
45 where
46 B: BufMut,
47 {
48 }
50
51 fn decode<'a>(buf: &'a [u8], _schema: &UnitSchema) -> Result<Self, String> {
52 if !buf.is_empty() {
53 return Err(format!("decode expected empty buf got {} bytes", buf.len()));
54 }
55 Ok(())
56 }
57
58 fn encode_schema(_schema: &Self::Schema) -> Bytes {
59 Bytes::new()
60 }
61
62 fn decode_schema(buf: &Bytes) -> Self::Schema {
63 assert_eq!(*buf, Bytes::new());
64 UnitSchema
65 }
66}
67
/// Columnar representation of a run of `()` values.
///
/// A unit value carries no data, so the only state kept is the row count.
#[derive(Debug)]
pub struct UnitColumnar {
    /// Number of rows in the column.
    len: usize,
}

impl UnitColumnar {
    /// Creates a column that holds `len` rows.
    pub fn new(len: usize) -> Self {
        Self { len }
    }
}
81
82impl ColumnDecoder<()> for UnitColumnar {
83 fn decode(&self, idx: usize, _val: &mut ()) {
84 if idx >= self.len {
85 panic!("index out of bounds, idx: {idx}, len: {}", self.len);
86 }
87 }
88
89 fn is_null(&self, idx: usize) -> bool {
90 if idx < self.len {
91 true
92 } else {
93 panic!("index out of bounds, idx: {idx}, len: {}", self.len);
94 }
95 }
96
97 fn goodbytes(&self) -> usize {
98 0
99 }
100
101 fn stats(&self) -> StructStats {
102 StructStats {
103 len: self.len,
104 cols: BTreeMap::new(),
105 }
106 }
107}
108
109impl ColumnEncoder<()> for UnitColumnar {
110 type FinishedColumn = NullArray;
111
112 fn goodbytes(&self) -> usize {
113 0
114 }
115
116 fn append(&mut self, _val: &()) {
117 self.len += 1;
118 }
119
120 fn append_null(&mut self) {
121 self.len += 1;
122 }
123
124 fn finish(self) -> Self::FinishedColumn {
125 NullArray::new(self.len)
126 }
127}
128
129impl Schema<()> for UnitSchema {
130 type ArrowColumn = NullArray;
131 type Statistics = NoneStats;
132
133 type Decoder = UnitColumnar;
134 type Encoder = UnitColumnar;
135
136 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
137 Ok(UnitColumnar::new(col.len()))
138 }
139
140 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
141 Ok(UnitColumnar::new(0))
142 }
143}
144
145pub trait SimpleColumnarData {
147 type ArrowBuilder: arrow::array::ArrayBuilder + Default;
149 type ArrowColumn: arrow::array::Array + Clone + 'static;
151
152 fn goodbytes(builder: &Self::ArrowBuilder) -> usize;
154
155 fn push(&self, builder: &mut Self::ArrowBuilder);
157 fn push_null(builder: &mut Self::ArrowBuilder);
159
160 fn read(&mut self, idx: usize, column: &Self::ArrowColumn);
162}
163
164impl SimpleColumnarData for String {
165 type ArrowBuilder = StringBuilder;
166 type ArrowColumn = StringArray;
167
168 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
169 builder.values_slice().len()
170 }
171
172 fn push(&self, builder: &mut Self::ArrowBuilder) {
173 builder.append_value(self.as_str())
174 }
175 fn push_null(builder: &mut Self::ArrowBuilder) {
176 builder.append_null()
177 }
178 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
179 self.clear();
180 self.push_str(column.value(idx));
181 }
182}
183
184impl SimpleColumnarData for Vec<u8> {
185 type ArrowBuilder = BinaryBuilder;
186 type ArrowColumn = BinaryArray;
187
188 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
189 builder.values_slice().len()
190 }
191
192 fn push(&self, builder: &mut Self::ArrowBuilder) {
193 builder.append_value(self.as_slice())
194 }
195 fn push_null(builder: &mut Self::ArrowBuilder) {
196 builder.append_null()
197 }
198 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
199 self.clear();
200 self.extend(column.value(idx));
201 }
202}
203
204impl SimpleColumnarData for Bytes {
205 type ArrowBuilder = BinaryBuilder;
206 type ArrowColumn = BinaryArray;
207
208 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
209 builder.values_slice().len()
210 }
211
212 fn push(&self, builder: &mut Self::ArrowBuilder) {
213 builder.append_value(&self)
214 }
215 fn push_null(builder: &mut Self::ArrowBuilder) {
216 builder.append_null()
217 }
218 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
219 *self = Bytes::copy_from_slice(column.value(idx));
220 }
221}
222
223impl SimpleColumnarData for ShardId {
224 type ArrowBuilder = StringBuilder;
225 type ArrowColumn = StringArray;
226
227 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
228 builder.values_slice().len()
229 }
230
231 fn push(&self, builder: &mut Self::ArrowBuilder) {
232 builder.append_value(&self.to_string());
233 }
234 fn push_null(builder: &mut Self::ArrowBuilder) {
235 builder.append_null();
236 }
237 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
238 *self = column.value(idx).parse().expect("should be valid ShardId");
239 }
240}
241
/// [ColumnEncoder] for any [SimpleColumnarData] type.
///
/// Wraps the type's arrow builder and forwards every operation to the
/// corresponding [SimpleColumnarData] function.
#[derive(Debug, Default)]
pub struct SimpleColumnarEncoder<T: SimpleColumnarData>(T::ArrowBuilder);
245
246impl<T: SimpleColumnarData> ColumnEncoder<T> for SimpleColumnarEncoder<T> {
247 type FinishedColumn = T::ArrowColumn;
248
249 fn goodbytes(&self) -> usize {
250 T::goodbytes(&self.0)
251 }
252
253 fn append(&mut self, val: &T) {
254 T::push(val, &mut self.0);
255 }
256 fn append_null(&mut self) {
257 T::push_null(&mut self.0)
258 }
259 fn finish(mut self) -> Self::FinishedColumn {
260 let array = ArrayBuilder::finish(&mut self.0);
261 let array = array
262 .as_any()
263 .downcast_ref::<T::ArrowColumn>()
264 .expect("created using StringBuilder")
265 .clone();
266
267 array
268 }
269}
270
/// [ColumnDecoder] for any [SimpleColumnarData] type.
///
/// Wraps the type's arrow column and reads values out of it through the
/// corresponding [SimpleColumnarData] function.
#[derive(Debug)]
pub struct SimpleColumnarDecoder<T: SimpleColumnarData>(T::ArrowColumn);
274
275impl<T: SimpleColumnarData> SimpleColumnarDecoder<T> {
276 pub fn new(col: T::ArrowColumn) -> Self {
278 SimpleColumnarDecoder(col)
279 }
280}
281
282impl<T: SimpleColumnarData> ColumnDecoder<T> for SimpleColumnarDecoder<T> {
283 fn decode(&self, idx: usize, val: &mut T) {
284 T::read(val, idx, &self.0)
285 }
286 fn is_null(&self, idx: usize) -> bool {
287 self.0.is_null(idx)
288 }
289 fn goodbytes(&self) -> usize {
290 ArrayOrd::new(&self.0).goodbytes()
291 }
292
293 fn stats(&self) -> StructStats {
294 ColumnarStats::one_column_struct(self.0.len(), ColumnStatKinds::None)
295 }
296}
297
/// Field-less [Schema] for `String`; it is the `Schema` associated type of
/// the `String` [Codec] impl and serializes as an empty byte buffer.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct StringSchema;
301
302impl Schema<String> for StringSchema {
303 type ArrowColumn = StringArray;
304 type Statistics = NoneStats;
305
306 type Decoder = SimpleColumnarDecoder<String>;
307 type Encoder = SimpleColumnarEncoder<String>;
308
309 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
310 Ok(SimpleColumnarEncoder::default())
311 }
312
313 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
314 Ok(SimpleColumnarDecoder::new(col))
315 }
316}
317
318impl Codec for String {
319 type Storage = ();
320 type Schema = StringSchema;
321
322 fn codec_name() -> String {
323 "String".into()
324 }
325
326 fn encode<B>(&self, buf: &mut B)
327 where
328 B: BufMut,
329 {
330 buf.put(self.as_bytes())
331 }
332
333 fn decode<'a>(buf: &'a [u8], _schema: &StringSchema) -> Result<Self, String> {
334 String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())
335 }
336
337 fn encode_schema(_schema: &Self::Schema) -> Bytes {
338 Bytes::new()
339 }
340
341 fn decode_schema(buf: &Bytes) -> Self::Schema {
342 assert_eq!(*buf, Bytes::new());
343 StringSchema
344 }
345}
346
/// Field-less [Schema] shared by the `Vec<u8>` and `Bytes` [Codec] impls; it
/// serializes as an empty byte buffer.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct VecU8Schema;
350
351impl Schema<Vec<u8>> for VecU8Schema {
352 type ArrowColumn = BinaryArray;
353 type Statistics = NoneStats;
354
355 type Decoder = SimpleColumnarDecoder<Vec<u8>>;
356 type Encoder = SimpleColumnarEncoder<Vec<u8>>;
357
358 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
359 Ok(SimpleColumnarEncoder::default())
360 }
361
362 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
363 Ok(SimpleColumnarDecoder::new(col))
364 }
365}
366
367impl Schema<Bytes> for VecU8Schema {
368 type ArrowColumn = BinaryArray;
369 type Statistics = NoneStats;
370
371 type Decoder = SimpleColumnarDecoder<Bytes>;
372 type Encoder = SimpleColumnarEncoder<Bytes>;
373
374 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
375 Ok(SimpleColumnarEncoder::default())
376 }
377
378 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
379 Ok(SimpleColumnarDecoder::new(col))
380 }
381}
382
383impl Codec for Vec<u8> {
384 type Storage = ();
385 type Schema = VecU8Schema;
386
387 fn codec_name() -> String {
388 BYTES_CODEC_NAME.into()
389 }
390
391 fn encode<B>(&self, buf: &mut B)
392 where
393 B: BufMut,
394 {
395 buf.put(self.as_slice())
396 }
397
398 fn decode<'a>(buf: &'a [u8], _schema: &VecU8Schema) -> Result<Self, String> {
399 Ok(buf.to_owned())
400 }
401
402 fn encode_schema(_schema: &Self::Schema) -> Bytes {
403 Bytes::new()
404 }
405
406 fn decode_schema(buf: &Bytes) -> Self::Schema {
407 assert_eq!(*buf, Bytes::new());
408 VecU8Schema
409 }
410}
411
412impl Codec for Bytes {
413 type Storage = ();
414 type Schema = VecU8Schema;
415
416 fn codec_name() -> String {
417 BYTES_CODEC_NAME.into()
418 }
419
420 fn encode<B>(&self, buf: &mut B)
421 where
422 B: BufMut,
423 {
424 buf.put(self.into_iter().as_slice())
425 }
426
427 fn decode<'a>(buf: &'a [u8], _schema: &VecU8Schema) -> Result<Self, String> {
428 Ok(Bytes::copy_from_slice(buf))
429 }
430
431 fn encode_schema(_schema: &Self::Schema) -> Bytes {
432 Bytes::new()
433 }
434
435 fn decode_schema(buf: &Bytes) -> Self::Schema {
436 assert_eq!(*buf, Bytes::new());
437 VecU8Schema
438 }
439}
440
441impl Codec for ShardId {
442 type Storage = ();
443 type Schema = ShardIdSchema;
444 fn codec_name() -> String {
445 "ShardId".into()
446 }
447 fn encode<B: BufMut>(&self, buf: &mut B) {
448 buf.put(self.to_string().as_bytes())
449 }
450 fn decode<'a>(buf: &'a [u8], _schema: &ShardIdSchema) -> Result<Self, String> {
451 let shard_id = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
452 shard_id.parse()
453 }
454 fn encode_schema(_schema: &Self::Schema) -> Bytes {
455 Bytes::new()
456 }
457 fn decode_schema(buf: &Bytes) -> Self::Schema {
458 assert_eq!(*buf, Bytes::new());
459 ShardIdSchema
460 }
461}
462
/// Field-less [Schema] for `ShardId`; it is the `Schema` associated type of
/// the `ShardId` [Codec] impl and serializes as an empty byte buffer.
#[derive(Debug, PartialEq)]
pub struct ShardIdSchema;
466
467impl Schema<ShardId> for ShardIdSchema {
468 type ArrowColumn = StringArray;
469 type Statistics = NoneStats;
470
471 type Decoder = SimpleColumnarDecoder<ShardId>;
472 type Encoder = SimpleColumnarEncoder<ShardId>;
473
474 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
475 Ok(SimpleColumnarEncoder::default())
476 }
477
478 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
479 Ok(SimpleColumnarDecoder::new(col))
480 }
481}
482
483impl Codec64 for i64 {
484 fn codec_name() -> String {
485 "i64".to_owned()
486 }
487
488 fn encode(&self) -> [u8; 8] {
489 self.to_le_bytes()
490 }
491
492 fn decode(buf: [u8; 8]) -> Self {
493 i64::from_le_bytes(buf)
494 }
495}
496
497impl Codec64 for u64 {
498 fn codec_name() -> String {
499 "u64".to_owned()
500 }
501
502 fn encode(&self) -> [u8; 8] {
503 self.to_le_bytes()
504 }
505
506 fn decode(buf: [u8; 8]) -> Self {
507 u64::from_le_bytes(buf)
508 }
509}
510
511impl Opaque for u64 {
512 fn initial() -> Self {
513 u64::MIN
514 }
515}
516
517impl Opaque for i64 {
520 fn initial() -> Self {
521 i64::MIN
522 }
523}
524
/// Placeholder schema for a type `T` that has no real columnar support yet.
///
/// It stores nothing but a marker for `T`. `Default` and `PartialEq` are
/// written by hand so that they place no bounds on `T`.
#[derive(Debug)]
pub struct TodoSchema<T>(PhantomData<T>);

impl<T> Default for TodoSchema<T> {
    /// Builds the (only) value of this marker type.
    fn default() -> Self {
        TodoSchema(PhantomData)
    }
}

impl<T> PartialEq for TodoSchema<T> {
    /// Compares the wrapped markers.
    fn eq(&self, other: &Self) -> bool {
        let (TodoSchema(lhs), TodoSchema(rhs)) = (self, other);
        lhs == rhs
    }
}
540
541impl<T: Debug + Send + Sync> Schema<T> for TodoSchema<T> {
542 type ArrowColumn = StructArray;
543 type Statistics = NoneStats;
544
545 type Decoder = TodoColumnarDecoder<T>;
546 type Encoder = TodoColumnarEncoder<T>;
547
548 fn decoder(&self, _col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
549 panic!("TODO")
550 }
551
552 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
553 panic!("TODO")
554 }
555}
556
/// Placeholder [ColumnEncoder] named by [TodoSchema]; every method of its
/// [ColumnEncoder] impl panics with `TODO`.
#[derive(Debug)]
pub struct TodoColumnarEncoder<T>(PhantomData<T>);
560
561impl<T> ColumnEncoder<T> for TodoColumnarEncoder<T> {
562 type FinishedColumn = StructArray;
563
564 fn goodbytes(&self) -> usize {
565 panic!("TODO")
566 }
567
568 fn append(&mut self, _val: &T) {
569 panic!("TODO")
570 }
571
572 fn append_null(&mut self) {
573 panic!("TODO")
574 }
575
576 fn finish(self) -> Self::FinishedColumn {
577 panic!("TODO")
578 }
579}
580
/// Placeholder [ColumnDecoder] named by [TodoSchema]; every method of its
/// [ColumnDecoder] impl panics with `TODO`.
#[derive(Debug)]
pub struct TodoColumnarDecoder<T>(PhantomData<T>);
584
585impl<T> ColumnDecoder<T> for TodoColumnarDecoder<T> {
586 fn decode(&self, _idx: usize, _val: &mut T) {
587 panic!("TODO")
588 }
589
590 fn is_null(&self, _idx: usize) -> bool {
591 panic!("TODO")
592 }
593
594 fn goodbytes(&self) -> usize {
595 panic!("TODO")
596 }
597
598 fn stats(&self) -> StructStats {
599 panic!("TODO")
600 }
601}
602
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use serde::{Deserialize, Serialize};
    use serde_json::json;

    use super::*;

    /// Pins the `Display` and `Debug` renderings of a `ShardId`, and the
    /// result of parsing both well-formed and malformed id strings.
    #[mz_ore::test]
    fn fmt_ids() {
        let zero = ShardId([0u8; 16]);
        assert_eq!(zero.to_string(), "s00000000-0000-0000-0000-000000000000");
        assert_eq!(
            format!("{:?}", zero),
            "ShardId(00000000-0000-0000-0000-000000000000)"
        );

        // A well-formed id parses back to the same value.
        assert_eq!(
            ShardId::from_str("s00000000-0000-0000-0000-000000000000"),
            Ok(zero)
        );

        // Malformed inputs, each paired with the exact error it must produce.
        let rejected = [
            (
                "x00000000-0000-0000-0000-000000000000",
                "invalid ShardId x00000000-0000-0000-0000-000000000000: incorrect prefix",
            ),
            (
                "s0",
                "invalid ShardId s0: invalid length: expected length 32 for simple format, found 1",
            ),
            (
                "s00000000-0000-0000-0000-000000000000FOO",
                "invalid ShardId s00000000-0000-0000-0000-000000000000FOO: invalid character: expected an optional prefix of `urn:uuid:` followed by [0-9a-fA-F-], found `O` at 38",
            ),
        ];
        for &(input, expected) in &rejected {
            assert_eq!(ShardId::from_str(input), Err(expected.to_string()));
        }
    }

    /// Checks that a `ShardId` serializes to JSON as its string form and
    /// deserializes back, both on its own and as a struct field.
    #[mz_ore::test]
    fn shard_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct ShardIdContainer {
            shard_id: ShardId,
        }

        let id =
            ShardId::from_str("s00000000-1234-5678-0000-000000000000").expect("valid shard id");

        // Round trip through a `serde_json::Value`.
        let as_value = serde_json::to_value(id).expect("serializable");
        let round_tripped: ShardId = serde_json::from_value(as_value).expect("deserializable");
        assert_eq!(id, round_tripped);

        // Deserialize directly from a JSON string literal.
        let from_text: ShardId =
            serde_json::from_str("\"s00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable");
        assert_eq!(id, from_text);

        // As a struct field, the id appears as a plain JSON string.
        let json = json!({ "shard_id": id });
        assert_eq!(
            "{\"shard_id\":\"s00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: ShardIdContainer = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.shard_id, id);
    }
}