1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::marker::PhantomData;
15
16use arrow::array::{
17 Array, ArrayBuilder, BinaryArray, BinaryBuilder, NullArray, StringArray, StringBuilder,
18 StructArray,
19};
20use bytes::{BufMut, Bytes};
21use timely::order::Product;
22
23use crate::arrow::ArrayOrd;
24use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema};
25use crate::stats::{ColumnStatKinds, ColumnarStats, NoneStats, StructStats};
26use crate::{Codec, Codec64, Opaque, ShardId};
27
/// Codec name reported by both the `Vec<u8>` and the `Bytes` [`Codec`] impls.
/// Both of those impls write the raw bytes verbatim and share `VecU8Schema`.
const BYTES_CODEC_NAME: &str = "Vec<u8>";
/// The [`Schema`] used for the unit type `()`.
///
/// It holds no state: the encoded schema is empty bytes and the columnar form
/// is an Arrow `NullArray`.
#[derive(Default, Debug, PartialEq)]
pub struct UnitSchema;
36
37impl Codec for () {
38 type Storage = ();
39 type Schema = UnitSchema;
40
41 fn codec_name() -> String {
42 "()".into()
43 }
44
45 fn encode<B>(&self, _buf: &mut B)
46 where
47 B: BufMut,
48 {
49 }
51
52 fn decode<'a>(buf: &'a [u8], _schema: &UnitSchema) -> Result<Self, String> {
53 if !buf.is_empty() {
54 return Err(format!("decode expected empty buf got {} bytes", buf.len()));
55 }
56 Ok(())
57 }
58
59 fn encode_schema(_schema: &Self::Schema) -> Bytes {
60 Bytes::new()
61 }
62
63 fn decode_schema(buf: &Bytes) -> Self::Schema {
64 assert_eq!(*buf, Bytes::new());
65 UnitSchema
66 }
67}
68
/// Columnar encoder and decoder for `()`.
///
/// There is no per-element data, so only the element count is tracked.
#[derive(Debug)]
pub struct UnitColumnar {
    len: usize,
}

impl UnitColumnar {
    /// Creates a column holding `len` unit values.
    pub fn new(len: usize) -> Self {
        Self { len }
    }
}
81}
82
83impl ColumnDecoder<()> for UnitColumnar {
84 fn decode(&self, idx: usize, _val: &mut ()) {
85 if idx >= self.len {
86 panic!("index out of bounds, idx: {idx}, len: {}", self.len);
87 }
88 }
89
90 fn is_null(&self, idx: usize) -> bool {
91 if idx < self.len {
92 true
93 } else {
94 panic!("index out of bounds, idx: {idx}, len: {}", self.len);
95 }
96 }
97
98 fn goodbytes(&self) -> usize {
99 0
100 }
101
102 fn stats(&self) -> StructStats {
103 StructStats {
104 len: self.len,
105 cols: BTreeMap::new(),
106 }
107 }
108}
109
110impl ColumnEncoder<()> for UnitColumnar {
111 type FinishedColumn = NullArray;
112
113 fn goodbytes(&self) -> usize {
114 0
115 }
116
117 fn append(&mut self, _val: &()) {
118 self.len += 1;
119 }
120
121 fn append_null(&mut self) {
122 self.len += 1;
123 }
124
125 fn finish(self) -> Self::FinishedColumn {
126 NullArray::new(self.len)
127 }
128}
129
130impl Schema<()> for UnitSchema {
131 type ArrowColumn = NullArray;
132 type Statistics = NoneStats;
133
134 type Decoder = UnitColumnar;
135 type Encoder = UnitColumnar;
136
137 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
138 Ok(UnitColumnar::new(col.len()))
139 }
140
141 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
142 Ok(UnitColumnar::new(0))
143 }
144}
145
146pub trait SimpleColumnarData {
148 type ArrowBuilder: arrow::array::ArrayBuilder + Default;
150 type ArrowColumn: arrow::array::Array + Clone + 'static;
152
153 fn goodbytes(builder: &Self::ArrowBuilder) -> usize;
155
156 fn push(&self, builder: &mut Self::ArrowBuilder);
158 fn push_null(builder: &mut Self::ArrowBuilder);
160
161 fn read(&mut self, idx: usize, column: &Self::ArrowColumn);
163}
164
165impl SimpleColumnarData for String {
166 type ArrowBuilder = StringBuilder;
167 type ArrowColumn = StringArray;
168
169 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
170 builder.values_slice().len()
171 }
172
173 fn push(&self, builder: &mut Self::ArrowBuilder) {
174 builder.append_value(self.as_str())
175 }
176 fn push_null(builder: &mut Self::ArrowBuilder) {
177 builder.append_null()
178 }
179 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
180 self.clear();
181 self.push_str(column.value(idx));
182 }
183}
184
185impl SimpleColumnarData for Vec<u8> {
186 type ArrowBuilder = BinaryBuilder;
187 type ArrowColumn = BinaryArray;
188
189 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
190 builder.values_slice().len()
191 }
192
193 fn push(&self, builder: &mut Self::ArrowBuilder) {
194 builder.append_value(self.as_slice())
195 }
196 fn push_null(builder: &mut Self::ArrowBuilder) {
197 builder.append_null()
198 }
199 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
200 self.clear();
201 self.extend(column.value(idx));
202 }
203}
204
205impl SimpleColumnarData for Bytes {
206 type ArrowBuilder = BinaryBuilder;
207 type ArrowColumn = BinaryArray;
208
209 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
210 builder.values_slice().len()
211 }
212
213 fn push(&self, builder: &mut Self::ArrowBuilder) {
214 builder.append_value(&self)
215 }
216 fn push_null(builder: &mut Self::ArrowBuilder) {
217 builder.append_null()
218 }
219 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
220 *self = Bytes::copy_from_slice(column.value(idx));
221 }
222}
223
224impl SimpleColumnarData for ShardId {
225 type ArrowBuilder = StringBuilder;
226 type ArrowColumn = StringArray;
227
228 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
229 builder.values_slice().len()
230 }
231
232 fn push(&self, builder: &mut Self::ArrowBuilder) {
233 builder.append_value(&self.to_string());
234 }
235 fn push_null(builder: &mut Self::ArrowBuilder) {
236 builder.append_null();
237 }
238 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
239 *self = column.value(idx).parse().expect("should be valid ShardId");
240 }
241}
242
243#[derive(Debug, Default)]
245pub struct SimpleColumnarEncoder<T: SimpleColumnarData>(T::ArrowBuilder);
246
247impl<T: SimpleColumnarData> ColumnEncoder<T> for SimpleColumnarEncoder<T> {
248 type FinishedColumn = T::ArrowColumn;
249
250 fn goodbytes(&self) -> usize {
251 T::goodbytes(&self.0)
252 }
253
254 fn append(&mut self, val: &T) {
255 T::push(val, &mut self.0);
256 }
257 fn append_null(&mut self) {
258 T::push_null(&mut self.0)
259 }
260 fn finish(mut self) -> Self::FinishedColumn {
261 let array = ArrayBuilder::finish(&mut self.0);
262 let array = array
263 .as_any()
264 .downcast_ref::<T::ArrowColumn>()
265 .expect("created using StringBuilder")
266 .clone();
267
268 array
269 }
270}
271
272#[derive(Debug)]
274pub struct SimpleColumnarDecoder<T: SimpleColumnarData>(T::ArrowColumn);
275
276impl<T: SimpleColumnarData> SimpleColumnarDecoder<T> {
277 pub fn new(col: T::ArrowColumn) -> Self {
279 SimpleColumnarDecoder(col)
280 }
281}
282
283impl<T: SimpleColumnarData> ColumnDecoder<T> for SimpleColumnarDecoder<T> {
284 fn decode(&self, idx: usize, val: &mut T) {
285 T::read(val, idx, &self.0)
286 }
287 fn is_null(&self, idx: usize) -> bool {
288 self.0.is_null(idx)
289 }
290 fn goodbytes(&self) -> usize {
291 ArrayOrd::new(&self.0).goodbytes()
292 }
293
294 fn stats(&self) -> StructStats {
295 ColumnarStats::one_column_struct(self.0.len(), ColumnStatKinds::None)
296 }
297}
298
299#[derive(Debug, Clone, Default, PartialEq)]
301pub struct StringSchema;
302
303impl Schema<String> for StringSchema {
304 type ArrowColumn = StringArray;
305 type Statistics = NoneStats;
306
307 type Decoder = SimpleColumnarDecoder<String>;
308 type Encoder = SimpleColumnarEncoder<String>;
309
310 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
311 Ok(SimpleColumnarEncoder::default())
312 }
313
314 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
315 Ok(SimpleColumnarDecoder::new(col))
316 }
317}
318
319impl Codec for String {
320 type Storage = ();
321 type Schema = StringSchema;
322
323 fn codec_name() -> String {
324 "String".into()
325 }
326
327 fn encode<B>(&self, buf: &mut B)
328 where
329 B: BufMut,
330 {
331 buf.put(self.as_bytes())
332 }
333
334 fn decode<'a>(buf: &'a [u8], _schema: &StringSchema) -> Result<Self, String> {
335 String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())
336 }
337
338 fn encode_schema(_schema: &Self::Schema) -> Bytes {
339 Bytes::new()
340 }
341
342 fn decode_schema(buf: &Bytes) -> Self::Schema {
343 assert_eq!(*buf, Bytes::new());
344 StringSchema
345 }
346}
347
348#[derive(Debug, Clone, Default, PartialEq)]
350pub struct VecU8Schema;
351
352impl Schema<Vec<u8>> for VecU8Schema {
353 type ArrowColumn = BinaryArray;
354 type Statistics = NoneStats;
355
356 type Decoder = SimpleColumnarDecoder<Vec<u8>>;
357 type Encoder = SimpleColumnarEncoder<Vec<u8>>;
358
359 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
360 Ok(SimpleColumnarEncoder::default())
361 }
362
363 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
364 Ok(SimpleColumnarDecoder::new(col))
365 }
366}
367
368impl Schema<Bytes> for VecU8Schema {
369 type ArrowColumn = BinaryArray;
370 type Statistics = NoneStats;
371
372 type Decoder = SimpleColumnarDecoder<Bytes>;
373 type Encoder = SimpleColumnarEncoder<Bytes>;
374
375 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
376 Ok(SimpleColumnarEncoder::default())
377 }
378
379 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
380 Ok(SimpleColumnarDecoder::new(col))
381 }
382}
383
384impl Codec for Vec<u8> {
385 type Storage = ();
386 type Schema = VecU8Schema;
387
388 fn codec_name() -> String {
389 BYTES_CODEC_NAME.into()
390 }
391
392 fn encode<B>(&self, buf: &mut B)
393 where
394 B: BufMut,
395 {
396 buf.put(self.as_slice())
397 }
398
399 fn decode<'a>(buf: &'a [u8], _schema: &VecU8Schema) -> Result<Self, String> {
400 Ok(buf.to_owned())
401 }
402
403 fn encode_schema(_schema: &Self::Schema) -> Bytes {
404 Bytes::new()
405 }
406
407 fn decode_schema(buf: &Bytes) -> Self::Schema {
408 assert_eq!(*buf, Bytes::new());
409 VecU8Schema
410 }
411}
412
413impl Codec for Bytes {
414 type Storage = ();
415 type Schema = VecU8Schema;
416
417 fn codec_name() -> String {
418 BYTES_CODEC_NAME.into()
419 }
420
421 fn encode<B>(&self, buf: &mut B)
422 where
423 B: BufMut,
424 {
425 buf.put(self.into_iter().as_slice())
426 }
427
428 fn decode<'a>(buf: &'a [u8], _schema: &VecU8Schema) -> Result<Self, String> {
429 Ok(Bytes::copy_from_slice(buf))
430 }
431
432 fn encode_schema(_schema: &Self::Schema) -> Bytes {
433 Bytes::new()
434 }
435
436 fn decode_schema(buf: &Bytes) -> Self::Schema {
437 assert_eq!(*buf, Bytes::new());
438 VecU8Schema
439 }
440}
441
442impl Codec for ShardId {
443 type Storage = ();
444 type Schema = ShardIdSchema;
445 fn codec_name() -> String {
446 "ShardId".into()
447 }
448 fn encode<B: BufMut>(&self, buf: &mut B) {
449 buf.put(self.to_string().as_bytes())
450 }
451 fn decode<'a>(buf: &'a [u8], _schema: &ShardIdSchema) -> Result<Self, String> {
452 let shard_id = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
453 shard_id.parse()
454 }
455 fn encode_schema(_schema: &Self::Schema) -> Bytes {
456 Bytes::new()
457 }
458 fn decode_schema(buf: &Bytes) -> Self::Schema {
459 assert_eq!(*buf, Bytes::new());
460 ShardIdSchema
461 }
462}
463
464#[derive(Debug, PartialEq)]
466pub struct ShardIdSchema;
467
468impl Schema<ShardId> for ShardIdSchema {
469 type ArrowColumn = StringArray;
470 type Statistics = NoneStats;
471
472 type Decoder = SimpleColumnarDecoder<ShardId>;
473 type Encoder = SimpleColumnarEncoder<ShardId>;
474
475 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
476 Ok(SimpleColumnarEncoder::default())
477 }
478
479 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
480 Ok(SimpleColumnarDecoder::new(col))
481 }
482}
483
484impl Codec64 for i64 {
485 fn codec_name() -> String {
486 "i64".to_owned()
487 }
488
489 fn encode(&self) -> [u8; 8] {
490 self.to_le_bytes()
491 }
492
493 fn decode(buf: [u8; 8]) -> Self {
494 i64::from_le_bytes(buf)
495 }
496}
497
498impl Codec64 for u64 {
499 fn codec_name() -> String {
500 "u64".to_owned()
501 }
502
503 fn encode(&self) -> [u8; 8] {
504 self.to_le_bytes()
505 }
506
507 fn decode(buf: [u8; 8]) -> Self {
508 u64::from_le_bytes(buf)
509 }
510}
511
512impl Opaque for u64 {
513 fn initial() -> Self {
514 u64::MIN
515 }
516}
517
518impl Codec64 for Product<u32, u32> {
519 fn codec_name() -> String {
520 "Product<u32, u32>".to_owned()
521 }
522
523 fn encode(&self) -> [u8; 8] {
524 let o = self.outer.to_le_bytes();
525 let i = self.inner.to_le_bytes();
526 [o[0], o[1], o[2], o[3], i[0], i[1], i[2], i[3]]
527 }
528
529 fn decode(buf: [u8; 8]) -> Self {
530 let outer = [buf[0], buf[1], buf[2], buf[3]];
531 let inner = [buf[4], buf[5], buf[6], buf[7]];
532 Product::new(u32::from_le_bytes(outer), u32::from_le_bytes(inner))
533 }
534}
535
536impl Opaque for i64 {
539 fn initial() -> Self {
540 i64::MIN
541 }
542}
543
/// A [`Schema`] stand-in whose columnar encoder and decoder are not
/// implemented (constructing either one panics).
#[derive(Debug)]
pub struct TodoSchema<T>(PhantomData<T>);

// Implemented by hand so that no `T: Default` bound is required.
impl<T> Default for TodoSchema<T> {
    fn default() -> Self {
        TodoSchema(PhantomData)
    }
}

// Implemented by hand so that no `T: PartialEq` bound is required. There is no
// state to compare, so every instance is equal to every other.
impl<T> PartialEq for TodoSchema<T> {
    fn eq(&self, _other: &Self) -> bool {
        true
    }
}
559
560impl<T: Debug + Send + Sync> Schema<T> for TodoSchema<T> {
561 type ArrowColumn = StructArray;
562 type Statistics = NoneStats;
563
564 type Decoder = TodoColumnarDecoder<T>;
565 type Encoder = TodoColumnarEncoder<T>;
566
567 fn decoder(&self, _col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
568 panic!("TODO")
569 }
570
571 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
572 panic!("TODO")
573 }
574}
575
576#[derive(Debug)]
578pub struct TodoColumnarEncoder<T>(PhantomData<T>);
579
580impl<T> ColumnEncoder<T> for TodoColumnarEncoder<T> {
581 type FinishedColumn = StructArray;
582
583 fn goodbytes(&self) -> usize {
584 panic!("TODO")
585 }
586
587 fn append(&mut self, _val: &T) {
588 panic!("TODO")
589 }
590
591 fn append_null(&mut self) {
592 panic!("TODO")
593 }
594
595 fn finish(self) -> Self::FinishedColumn {
596 panic!("TODO")
597 }
598}
599
600#[derive(Debug)]
602pub struct TodoColumnarDecoder<T>(PhantomData<T>);
603
604impl<T> ColumnDecoder<T> for TodoColumnarDecoder<T> {
605 fn decode(&self, _idx: usize, _val: &mut T) {
606 panic!("TODO")
607 }
608
609 fn is_null(&self, _idx: usize) -> bool {
610 panic!("TODO")
611 }
612
613 fn goodbytes(&self) -> usize {
614 panic!("TODO")
615 }
616
617 fn stats(&self) -> StructStats {
618 panic!("TODO")
619 }
620}
621
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use serde::{Deserialize, Serialize};
    use serde_json::json;

    use super::*;

    #[mz_ore::test]
    fn fmt_ids() {
        let zero = ShardId([0u8; 16]);

        // `Display` adds an `s` prefix; `Debug` wraps the bare UUID.
        assert_eq!(zero.to_string(), "s00000000-0000-0000-0000-000000000000");
        assert_eq!(
            format!("{zero:?}"),
            "ShardId(00000000-0000-0000-0000-000000000000)"
        );

        // The `Display` form parses back to the same id.
        assert_eq!(
            ShardId::from_str("s00000000-0000-0000-0000-000000000000"),
            Ok(zero)
        );

        // Malformed inputs are rejected with a descriptive error.
        let rejected = [
            (
                "x00000000-0000-0000-0000-000000000000",
                "invalid ShardId x00000000-0000-0000-0000-000000000000: incorrect prefix",
            ),
            (
                "s0",
                "invalid ShardId s0: invalid length: expected length 32 for simple format, found 1",
            ),
            (
                "s00000000-0000-0000-0000-000000000000FOO",
                "invalid ShardId s00000000-0000-0000-0000-000000000000FOO: invalid character: expected an optional prefix of `urn:uuid:` followed by [0-9a-fA-F-], found `O` at 38",
            ),
        ];
        for &(input, expected) in rejected.iter() {
            assert_eq!(ShardId::from_str(input), Err(expected.to_string()));
        }
    }

    #[mz_ore::test]
    fn shard_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct ShardIdContainer {
            shard_id: ShardId,
        }

        const ID_STR: &str = "s00000000-1234-5678-0000-000000000000";
        let id = ShardId::from_str(ID_STR).expect("valid shard id");

        // Round-trips through a `serde_json::Value`.
        let value = serde_json::to_value(id).expect("serializable");
        assert_eq!(id, serde_json::from_value(value).expect("deserializable"));

        // Deserializes from its plain JSON string form.
        assert_eq!(
            id,
            serde_json::from_str(&format!("\"{ID_STR}\"")).expect("deserializable")
        );

        // As a struct field it serializes to a JSON string and reads back.
        let json = json!({ "shard_id": id });
        assert_eq!(json.to_string(), format!("{{\"shard_id\":\"{ID_STR}\"}}"));
        let container: ShardIdContainer = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.shard_id, id);
    }
}