1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::marker::PhantomData;
15
16use arrow::array::{
17 Array, ArrayBuilder, BinaryArray, BinaryBuilder, NullArray, StringArray, StringBuilder,
18 StructArray,
19};
20use bytes::{BufMut, Bytes};
21
22use crate::arrow::ArrayOrd;
23use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema};
24use crate::stats::{ColumnStatKinds, ColumnarStats, NoneStats, StructStats};
25use crate::{Codec, Codec64, ShardId};
26
/// Codec name returned by `codec_name()` for both the `Vec<u8>` and the
/// `Bytes` implementations of `Codec` in this file.
const BYTES_CODEC_NAME: &str = "Vec<u8>";
31
32#[derive(Debug, Default, PartialEq)]
34pub struct UnitSchema;
35
36impl Codec for () {
37 type Storage = ();
38 type Schema = UnitSchema;
39
40 fn codec_name() -> String {
41 "()".into()
42 }
43
44 fn encode<B>(&self, _buf: &mut B)
45 where
46 B: BufMut,
47 {
48 }
50
51 fn decode<'a>(buf: &'a [u8], _schema: &UnitSchema) -> Result<Self, String> {
52 if !buf.is_empty() {
53 return Err(format!("decode expected empty buf got {} bytes", buf.len()));
54 }
55 Ok(())
56 }
57
58 fn encode_schema(_schema: &Self::Schema) -> Bytes {
59 Bytes::new()
60 }
61
62 fn decode_schema(buf: &Bytes) -> Self::Schema {
63 assert_eq!(*buf, Bytes::new());
64 UnitSchema
65 }
66}
67
/// Columnar container for `()` values. Nothing but the entry count is stored.
#[derive(Debug)]
pub struct UnitColumnar {
    /// Number of entries in the column.
    len: usize,
}

impl UnitColumnar {
    /// Builds a column that reports `len` entries.
    pub fn new(len: usize) -> Self {
        Self { len }
    }
}
81
82impl ColumnDecoder<()> for UnitColumnar {
83 fn decode(&self, idx: usize, _val: &mut ()) {
84 if idx >= self.len {
85 panic!("index out of bounds, idx: {idx}, len: {}", self.len);
86 }
87 }
88
89 fn is_null(&self, idx: usize) -> bool {
90 if idx < self.len {
91 true
92 } else {
93 panic!("index out of bounds, idx: {idx}, len: {}", self.len);
94 }
95 }
96
97 fn goodbytes(&self) -> usize {
98 0
99 }
100
101 fn stats(&self) -> StructStats {
102 StructStats {
103 len: self.len,
104 cols: BTreeMap::new(),
105 }
106 }
107}
108
109impl ColumnEncoder<()> for UnitColumnar {
110 type FinishedColumn = NullArray;
111
112 fn goodbytes(&self) -> usize {
113 0
114 }
115
116 fn append(&mut self, _val: &()) {
117 self.len += 1;
118 }
119
120 fn append_null(&mut self) {
121 self.len += 1;
122 }
123
124 fn finish(self) -> Self::FinishedColumn {
125 NullArray::new(self.len)
126 }
127}
128
129impl Schema<()> for UnitSchema {
130 type ArrowColumn = NullArray;
131 type Statistics = NoneStats;
132
133 type Decoder = UnitColumnar;
134 type Encoder = UnitColumnar;
135
136 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
137 Ok(UnitColumnar::new(col.len()))
138 }
139
140 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
141 Ok(UnitColumnar::new(0))
142 }
143}
144
145pub trait SimpleColumnarData {
147 type ArrowBuilder: arrow::array::ArrayBuilder + Default;
149 type ArrowColumn: arrow::array::Array + Clone + 'static;
151
152 fn goodbytes(builder: &Self::ArrowBuilder) -> usize;
154
155 fn push(&self, builder: &mut Self::ArrowBuilder);
157 fn push_null(builder: &mut Self::ArrowBuilder);
159
160 fn read(&mut self, idx: usize, column: &Self::ArrowColumn);
162}
163
164impl SimpleColumnarData for String {
165 type ArrowBuilder = StringBuilder;
166 type ArrowColumn = StringArray;
167
168 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
169 builder.values_slice().len()
170 }
171
172 fn push(&self, builder: &mut Self::ArrowBuilder) {
173 builder.append_value(self.as_str())
174 }
175 fn push_null(builder: &mut Self::ArrowBuilder) {
176 builder.append_null()
177 }
178 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
179 self.clear();
180 self.push_str(column.value(idx));
181 }
182}
183
184impl SimpleColumnarData for Vec<u8> {
185 type ArrowBuilder = BinaryBuilder;
186 type ArrowColumn = BinaryArray;
187
188 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
189 builder.values_slice().len()
190 }
191
192 fn push(&self, builder: &mut Self::ArrowBuilder) {
193 builder.append_value(self.as_slice())
194 }
195 fn push_null(builder: &mut Self::ArrowBuilder) {
196 builder.append_null()
197 }
198 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
199 self.clear();
200 self.extend(column.value(idx));
201 }
202}
203
204impl SimpleColumnarData for Bytes {
205 type ArrowBuilder = BinaryBuilder;
206 type ArrowColumn = BinaryArray;
207
208 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
209 builder.values_slice().len()
210 }
211
212 fn push(&self, builder: &mut Self::ArrowBuilder) {
213 builder.append_value(&self)
214 }
215 fn push_null(builder: &mut Self::ArrowBuilder) {
216 builder.append_null()
217 }
218 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
219 *self = Bytes::copy_from_slice(column.value(idx));
220 }
221}
222
223impl SimpleColumnarData for ShardId {
224 type ArrowBuilder = StringBuilder;
225 type ArrowColumn = StringArray;
226
227 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
228 builder.values_slice().len()
229 }
230
231 fn push(&self, builder: &mut Self::ArrowBuilder) {
232 builder.append_value(&self.to_string());
233 }
234 fn push_null(builder: &mut Self::ArrowBuilder) {
235 builder.append_null();
236 }
237 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
238 *self = column.value(idx).parse().expect("should be valid ShardId");
239 }
240}
241
242#[derive(Debug, Default)]
244pub struct SimpleColumnarEncoder<T: SimpleColumnarData>(T::ArrowBuilder);
245
246impl<T: SimpleColumnarData> ColumnEncoder<T> for SimpleColumnarEncoder<T> {
247 type FinishedColumn = T::ArrowColumn;
248
249 fn goodbytes(&self) -> usize {
250 T::goodbytes(&self.0)
251 }
252
253 fn append(&mut self, val: &T) {
254 T::push(val, &mut self.0);
255 }
256 fn append_null(&mut self) {
257 T::push_null(&mut self.0)
258 }
259 fn finish(mut self) -> Self::FinishedColumn {
260 let array = ArrayBuilder::finish(&mut self.0);
261 let array = array
262 .as_any()
263 .downcast_ref::<T::ArrowColumn>()
264 .expect("created using StringBuilder")
265 .clone();
266
267 array
268 }
269}
270
271#[derive(Debug)]
273pub struct SimpleColumnarDecoder<T: SimpleColumnarData>(T::ArrowColumn);
274
275impl<T: SimpleColumnarData> SimpleColumnarDecoder<T> {
276 pub fn new(col: T::ArrowColumn) -> Self {
278 SimpleColumnarDecoder(col)
279 }
280}
281
282impl<T: SimpleColumnarData> ColumnDecoder<T> for SimpleColumnarDecoder<T> {
283 fn decode(&self, idx: usize, val: &mut T) {
284 T::read(val, idx, &self.0)
285 }
286 fn is_null(&self, idx: usize) -> bool {
287 self.0.is_null(idx)
288 }
289 fn goodbytes(&self) -> usize {
290 ArrayOrd::new(&self.0).goodbytes()
291 }
292
293 fn stats(&self) -> StructStats {
294 ColumnarStats::one_column_struct(self.0.len(), ColumnStatKinds::None)
295 }
296}
297
298#[derive(Debug, Clone, Default, PartialEq)]
300pub struct StringSchema;
301
302impl Schema<String> for StringSchema {
303 type ArrowColumn = StringArray;
304 type Statistics = NoneStats;
305
306 type Decoder = SimpleColumnarDecoder<String>;
307 type Encoder = SimpleColumnarEncoder<String>;
308
309 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
310 Ok(SimpleColumnarEncoder::default())
311 }
312
313 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
314 Ok(SimpleColumnarDecoder::new(col))
315 }
316}
317
318impl Codec for String {
319 type Storage = ();
320 type Schema = StringSchema;
321
322 fn codec_name() -> String {
323 "String".into()
324 }
325
326 fn encode<B>(&self, buf: &mut B)
327 where
328 B: BufMut,
329 {
330 buf.put(self.as_bytes())
331 }
332
333 fn decode<'a>(buf: &'a [u8], _schema: &StringSchema) -> Result<Self, String> {
334 String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())
335 }
336
337 fn encode_schema(_schema: &Self::Schema) -> Bytes {
338 Bytes::new()
339 }
340
341 fn decode_schema(buf: &Bytes) -> Self::Schema {
342 assert_eq!(*buf, Bytes::new());
343 StringSchema
344 }
345}
346
347#[derive(Debug, Clone, Default, PartialEq)]
349pub struct VecU8Schema;
350
351impl Schema<Vec<u8>> for VecU8Schema {
352 type ArrowColumn = BinaryArray;
353 type Statistics = NoneStats;
354
355 type Decoder = SimpleColumnarDecoder<Vec<u8>>;
356 type Encoder = SimpleColumnarEncoder<Vec<u8>>;
357
358 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
359 Ok(SimpleColumnarEncoder::default())
360 }
361
362 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
363 Ok(SimpleColumnarDecoder::new(col))
364 }
365}
366
367impl Schema<Bytes> for VecU8Schema {
368 type ArrowColumn = BinaryArray;
369 type Statistics = NoneStats;
370
371 type Decoder = SimpleColumnarDecoder<Bytes>;
372 type Encoder = SimpleColumnarEncoder<Bytes>;
373
374 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
375 Ok(SimpleColumnarEncoder::default())
376 }
377
378 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
379 Ok(SimpleColumnarDecoder::new(col))
380 }
381}
382
383impl Codec for Vec<u8> {
384 type Storage = ();
385 type Schema = VecU8Schema;
386
387 fn codec_name() -> String {
388 BYTES_CODEC_NAME.into()
389 }
390
391 fn encode<B>(&self, buf: &mut B)
392 where
393 B: BufMut,
394 {
395 buf.put(self.as_slice())
396 }
397
398 fn decode<'a>(buf: &'a [u8], _schema: &VecU8Schema) -> Result<Self, String> {
399 Ok(buf.to_owned())
400 }
401
402 fn encode_schema(_schema: &Self::Schema) -> Bytes {
403 Bytes::new()
404 }
405
406 fn decode_schema(buf: &Bytes) -> Self::Schema {
407 assert_eq!(*buf, Bytes::new());
408 VecU8Schema
409 }
410}
411
412impl Codec for Bytes {
413 type Storage = ();
414 type Schema = VecU8Schema;
415
416 fn codec_name() -> String {
417 BYTES_CODEC_NAME.into()
418 }
419
420 fn encode<B>(&self, buf: &mut B)
421 where
422 B: BufMut,
423 {
424 buf.put(self.into_iter().as_slice())
425 }
426
427 fn decode<'a>(buf: &'a [u8], _schema: &VecU8Schema) -> Result<Self, String> {
428 Ok(Bytes::copy_from_slice(buf))
429 }
430
431 fn encode_schema(_schema: &Self::Schema) -> Bytes {
432 Bytes::new()
433 }
434
435 fn decode_schema(buf: &Bytes) -> Self::Schema {
436 assert_eq!(*buf, Bytes::new());
437 VecU8Schema
438 }
439}
440
441impl Codec for ShardId {
442 type Storage = ();
443 type Schema = ShardIdSchema;
444 fn codec_name() -> String {
445 "ShardId".into()
446 }
447 fn encode<B: BufMut>(&self, buf: &mut B) {
448 buf.put(self.to_string().as_bytes())
449 }
450 fn decode<'a>(buf: &'a [u8], _schema: &ShardIdSchema) -> Result<Self, String> {
451 let shard_id = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
452 shard_id.parse()
453 }
454 fn encode_schema(_schema: &Self::Schema) -> Bytes {
455 Bytes::new()
456 }
457 fn decode_schema(buf: &Bytes) -> Self::Schema {
458 assert_eq!(*buf, Bytes::new());
459 ShardIdSchema
460 }
461}
462
463#[derive(Debug, PartialEq)]
465pub struct ShardIdSchema;
466
467impl Schema<ShardId> for ShardIdSchema {
468 type ArrowColumn = StringArray;
469 type Statistics = NoneStats;
470
471 type Decoder = SimpleColumnarDecoder<ShardId>;
472 type Encoder = SimpleColumnarEncoder<ShardId>;
473
474 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
475 Ok(SimpleColumnarEncoder::default())
476 }
477
478 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
479 Ok(SimpleColumnarDecoder::new(col))
480 }
481}
482
483impl Codec64 for i64 {
484 fn codec_name() -> String {
485 "i64".to_owned()
486 }
487
488 fn encode(&self) -> [u8; 8] {
489 self.to_le_bytes()
490 }
491
492 fn decode(buf: [u8; 8]) -> Self {
493 i64::from_le_bytes(buf)
494 }
495}
496
497impl Codec64 for u64 {
498 fn codec_name() -> String {
499 "u64".to_owned()
500 }
501
502 fn encode(&self) -> [u8; 8] {
503 self.to_le_bytes()
504 }
505
506 fn decode(buf: [u8; 8]) -> Self {
507 u64::from_le_bytes(buf)
508 }
509}
510
/// Placeholder schema for a type `T`; it stores only a `PhantomData<T>`
/// marker.
#[derive(Debug)]
pub struct TodoSchema<T>(PhantomData<T>);

// Written by hand so that no `T: Default` bound is required.
impl<T> Default for TodoSchema<T> {
    fn default() -> Self {
        Self(PhantomData)
    }
}

// Written by hand so that no `T: PartialEq` bound is required; the comparison
// is between the two `PhantomData` markers.
impl<T> PartialEq for TodoSchema<T> {
    fn eq(&self, other: &Self) -> bool {
        let (Self(mine), Self(theirs)) = (self, other);
        mine == theirs
    }
}
526
527impl<T: Debug + Send + Sync> Schema<T> for TodoSchema<T> {
528 type ArrowColumn = StructArray;
529 type Statistics = NoneStats;
530
531 type Decoder = TodoColumnarDecoder<T>;
532 type Encoder = TodoColumnarEncoder<T>;
533
534 fn decoder(&self, _col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
535 panic!("TODO")
536 }
537
538 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
539 panic!("TODO")
540 }
541}
542
543#[derive(Debug)]
545pub struct TodoColumnarEncoder<T>(PhantomData<T>);
546
547impl<T> ColumnEncoder<T> for TodoColumnarEncoder<T> {
548 type FinishedColumn = StructArray;
549
550 fn goodbytes(&self) -> usize {
551 panic!("TODO")
552 }
553
554 fn append(&mut self, _val: &T) {
555 panic!("TODO")
556 }
557
558 fn append_null(&mut self) {
559 panic!("TODO")
560 }
561
562 fn finish(self) -> Self::FinishedColumn {
563 panic!("TODO")
564 }
565}
566
567#[derive(Debug)]
569pub struct TodoColumnarDecoder<T>(PhantomData<T>);
570
571impl<T> ColumnDecoder<T> for TodoColumnarDecoder<T> {
572 fn decode(&self, _idx: usize, _val: &mut T) {
573 panic!("TODO")
574 }
575
576 fn is_null(&self, _idx: usize) -> bool {
577 panic!("TODO")
578 }
579
580 fn goodbytes(&self) -> usize {
581 panic!("TODO")
582 }
583
584 fn stats(&self) -> StructStats {
585 panic!("TODO")
586 }
587}
588
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use serde::{Deserialize, Serialize};
    use serde_json::json;

    use super::*;

    /// Checks the `Display`, `Debug`, and `FromStr` behavior of `ShardId`,
    /// including the error text for malformed input.
    #[mz_ore::test]
    fn fmt_ids() {
        let zero = ShardId([0u8; 16]);
        assert_eq!(
            format!("{}", zero),
            "s00000000-0000-0000-0000-000000000000"
        );
        assert_eq!(
            format!("{:?}", zero),
            "ShardId(00000000-0000-0000-0000-000000000000)"
        );

        // A well-formed id parses back to the same value.
        assert_eq!(
            ShardId::from_str("s00000000-0000-0000-0000-000000000000"),
            Ok(ShardId([0u8; 16]))
        );

        // Malformed ids, each paired with the exact error expected for it.
        let rejected = [
            (
                "x00000000-0000-0000-0000-000000000000",
                "invalid ShardId x00000000-0000-0000-0000-000000000000: incorrect prefix",
            ),
            (
                "s0",
                "invalid ShardId s0: invalid length: expected length 32 for simple format, found 1",
            ),
            (
                "s00000000-0000-0000-0000-000000000000FOO",
                "invalid ShardId s00000000-0000-0000-0000-000000000000FOO: invalid character: expected an optional prefix of `urn:uuid:` followed by [0-9a-fA-F-], found `O` at 38",
            ),
        ];
        for &(input, message) in &rejected {
            assert_eq!(ShardId::from_str(input), Err(message.to_string()));
        }
    }

    /// Checks that `ShardId` serializes to, and deserializes from, its
    /// string form in JSON.
    #[mz_ore::test]
    fn shard_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct ShardIdContainer {
            shard_id: ShardId,
        }

        let id =
            ShardId::from_str("s00000000-1234-5678-0000-000000000000").expect("valid shard id");

        // Round trip through a `serde_json::Value`.
        let as_value = serde_json::to_value(id).expect("serializable");
        let round_tripped: ShardId = serde_json::from_value(as_value).expect("deserializable");
        assert_eq!(id, round_tripped);

        // Deserialize directly from the quoted string form.
        let from_text: ShardId = serde_json::from_str("\"s00000000-1234-5678-0000-000000000000\"")
            .expect("deserializable");
        assert_eq!(id, from_text);

        // The id appears as a plain string when nested inside an object.
        let json = json!({ "shard_id": id });
        let rendered = json.to_string();
        assert_eq!(
            "{\"shard_id\":\"s00000000-1234-5678-0000-000000000000\"}",
            &rendered
        );
        let container: ShardIdContainer = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.shard_id, id);
    }
}