mz_persist_types/
codec_impls.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementations of [Codec] for stdlib types.
11
12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::marker::PhantomData;
15
16use arrow::array::{
17    Array, ArrayBuilder, BinaryArray, BinaryBuilder, NullArray, StringArray, StringBuilder,
18    StructArray,
19};
20use bytes::{BufMut, Bytes};
21
22use crate::arrow::ArrayOrd;
23use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema};
24use crate::stats::{ColumnStatKinds, ColumnarStats, NoneStats, StructStats};
25use crate::{Codec, Codec64, Opaque, ShardId};
26
/// Codec name shared by every codec that is compatible with a plain blob of bytes.
///
/// Using one common name allows us to switch between those codecs without any
/// incompatibility errors. The name itself is chosen for historical reasons.
const BYTES_CODEC_NAME: &str = "Vec<u8>";
31
/// An implementation of [Schema] for [()].
///
/// This is a field-less marker type; it carries no configuration of its own.
#[derive(Debug, Default, PartialEq)]
pub struct UnitSchema;
35
36impl Codec for () {
37    type Storage = ();
38    type Schema = UnitSchema;
39
40    fn codec_name() -> String {
41        "()".into()
42    }
43
44    fn encode<B>(&self, _buf: &mut B)
45    where
46        B: BufMut,
47    {
48        // No-op.
49    }
50
51    fn decode<'a>(buf: &'a [u8], _schema: &UnitSchema) -> Result<Self, String> {
52        if !buf.is_empty() {
53            return Err(format!("decode expected empty buf got {} bytes", buf.len()));
54        }
55        Ok(())
56    }
57
58    fn encode_schema(_schema: &Self::Schema) -> Bytes {
59        Bytes::new()
60    }
61
62    fn decode_schema(buf: &Bytes) -> Self::Schema {
63        assert_eq!(*buf, Bytes::new());
64        UnitSchema
65    }
66}
67
/// An encoder and decoder for [`UnitSchema`].
///
/// The only state it tracks is how many entries the column holds.
#[derive(Debug)]
pub struct UnitColumnar {
    /// Number of entries in this column.
    len: usize,
}

impl UnitColumnar {
    /// Returns a new [`UnitColumnar`] holding `len` entries.
    pub fn new(len: usize) -> Self {
        Self { len }
    }
}
81
82impl ColumnDecoder<()> for UnitColumnar {
83    fn decode(&self, idx: usize, _val: &mut ()) {
84        if idx >= self.len {
85            panic!("index out of bounds, idx: {idx}, len: {}", self.len);
86        }
87    }
88
89    fn is_null(&self, idx: usize) -> bool {
90        if idx < self.len {
91            true
92        } else {
93            panic!("index out of bounds, idx: {idx}, len: {}", self.len);
94        }
95    }
96
97    fn goodbytes(&self) -> usize {
98        0
99    }
100
101    fn stats(&self) -> StructStats {
102        StructStats {
103            len: self.len,
104            cols: BTreeMap::new(),
105        }
106    }
107}
108
109impl ColumnEncoder<()> for UnitColumnar {
110    type FinishedColumn = NullArray;
111
112    fn goodbytes(&self) -> usize {
113        0
114    }
115
116    fn append(&mut self, _val: &()) {
117        self.len += 1;
118    }
119
120    fn append_null(&mut self) {
121        self.len += 1;
122    }
123
124    fn finish(self) -> Self::FinishedColumn {
125        NullArray::new(self.len)
126    }
127}
128
129impl Schema<()> for UnitSchema {
130    type ArrowColumn = NullArray;
131    type Statistics = NoneStats;
132
133    type Decoder = UnitColumnar;
134    type Encoder = UnitColumnar;
135
136    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
137        Ok(UnitColumnar::new(col.len()))
138    }
139
140    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
141        Ok(UnitColumnar::new(0))
142    }
143}
144
/// Simple type of data that can be columnar encoded.
///
/// Implementors pair an [`arrow`] builder (used for encoding) with an [`arrow`] array
/// (used for decoding); [`SimpleColumnarEncoder`] and [`SimpleColumnarDecoder`] are
/// generic over this trait.
pub trait SimpleColumnarData {
    /// Type of [`arrow`] builder that we encode data into.
    type ArrowBuilder: arrow::array::ArrayBuilder + Default;
    /// Type of [`arrow`] array that we decode data from.
    type ArrowColumn: arrow::array::Array + Clone + 'static;

    /// The number of actual data bytes currently held by `builder`.
    fn goodbytes(builder: &Self::ArrowBuilder) -> usize;

    /// Encode `self` into `builder`.
    fn push(&self, builder: &mut Self::ArrowBuilder);
    /// Encode a null value into `builder`.
    fn push_null(builder: &mut Self::ArrowBuilder);

    /// Decode the value at position `idx` of `column` into `self`, overwriting its
    /// previous contents.
    fn read(&mut self, idx: usize, column: &Self::ArrowColumn);
}
163
164impl SimpleColumnarData for String {
165    type ArrowBuilder = StringBuilder;
166    type ArrowColumn = StringArray;
167
168    fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
169        builder.values_slice().len()
170    }
171
172    fn push(&self, builder: &mut Self::ArrowBuilder) {
173        builder.append_value(self.as_str())
174    }
175    fn push_null(builder: &mut Self::ArrowBuilder) {
176        builder.append_null()
177    }
178    fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
179        self.clear();
180        self.push_str(column.value(idx));
181    }
182}
183
184impl SimpleColumnarData for Vec<u8> {
185    type ArrowBuilder = BinaryBuilder;
186    type ArrowColumn = BinaryArray;
187
188    fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
189        builder.values_slice().len()
190    }
191
192    fn push(&self, builder: &mut Self::ArrowBuilder) {
193        builder.append_value(self.as_slice())
194    }
195    fn push_null(builder: &mut Self::ArrowBuilder) {
196        builder.append_null()
197    }
198    fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
199        self.clear();
200        self.extend(column.value(idx));
201    }
202}
203
204impl SimpleColumnarData for Bytes {
205    type ArrowBuilder = BinaryBuilder;
206    type ArrowColumn = BinaryArray;
207
208    fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
209        builder.values_slice().len()
210    }
211
212    fn push(&self, builder: &mut Self::ArrowBuilder) {
213        builder.append_value(&self)
214    }
215    fn push_null(builder: &mut Self::ArrowBuilder) {
216        builder.append_null()
217    }
218    fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
219        *self = Bytes::copy_from_slice(column.value(idx));
220    }
221}
222
223impl SimpleColumnarData for ShardId {
224    type ArrowBuilder = StringBuilder;
225    type ArrowColumn = StringArray;
226
227    fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
228        builder.values_slice().len()
229    }
230
231    fn push(&self, builder: &mut Self::ArrowBuilder) {
232        builder.append_value(&self.to_string());
233    }
234    fn push_null(builder: &mut Self::ArrowBuilder) {
235        builder.append_null();
236    }
237    fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
238        *self = column.value(idx).parse().expect("should be valid ShardId");
239    }
240}
241
242/// A type that implements [`ColumnEncoder`] for [`SimpleColumnarData`].
243#[derive(Debug, Default)]
244pub struct SimpleColumnarEncoder<T: SimpleColumnarData>(T::ArrowBuilder);
245
246impl<T: SimpleColumnarData> ColumnEncoder<T> for SimpleColumnarEncoder<T> {
247    type FinishedColumn = T::ArrowColumn;
248
249    fn goodbytes(&self) -> usize {
250        T::goodbytes(&self.0)
251    }
252
253    fn append(&mut self, val: &T) {
254        T::push(val, &mut self.0);
255    }
256    fn append_null(&mut self) {
257        T::push_null(&mut self.0)
258    }
259    fn finish(mut self) -> Self::FinishedColumn {
260        let array = ArrayBuilder::finish(&mut self.0);
261        let array = array
262            .as_any()
263            .downcast_ref::<T::ArrowColumn>()
264            .expect("created using StringBuilder")
265            .clone();
266
267        array
268    }
269}
270
271/// A type that implements [`ColumnDecoder`] for [`SimpleColumnarData`].
272#[derive(Debug)]
273pub struct SimpleColumnarDecoder<T: SimpleColumnarData>(T::ArrowColumn);
274
275impl<T: SimpleColumnarData> SimpleColumnarDecoder<T> {
276    /// Returns a new [`SimpleColumnarDecoder`] with the provided column.
277    pub fn new(col: T::ArrowColumn) -> Self {
278        SimpleColumnarDecoder(col)
279    }
280}
281
282impl<T: SimpleColumnarData> ColumnDecoder<T> for SimpleColumnarDecoder<T> {
283    fn decode(&self, idx: usize, val: &mut T) {
284        T::read(val, idx, &self.0)
285    }
286    fn is_null(&self, idx: usize) -> bool {
287        self.0.is_null(idx)
288    }
289    fn goodbytes(&self) -> usize {
290        ArrayOrd::new(&self.0).goodbytes()
291    }
292
293    fn stats(&self) -> StructStats {
294        ColumnarStats::one_column_struct(self.0.len(), ColumnStatKinds::None)
295    }
296}
297
298/// An implementation of [Schema] for [String].
299#[derive(Debug, Clone, Default, PartialEq)]
300pub struct StringSchema;
301
302impl Schema<String> for StringSchema {
303    type ArrowColumn = StringArray;
304    type Statistics = NoneStats;
305
306    type Decoder = SimpleColumnarDecoder<String>;
307    type Encoder = SimpleColumnarEncoder<String>;
308
309    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
310        Ok(SimpleColumnarEncoder::default())
311    }
312
313    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
314        Ok(SimpleColumnarDecoder::new(col))
315    }
316}
317
318impl Codec for String {
319    type Storage = ();
320    type Schema = StringSchema;
321
322    fn codec_name() -> String {
323        "String".into()
324    }
325
326    fn encode<B>(&self, buf: &mut B)
327    where
328        B: BufMut,
329    {
330        buf.put(self.as_bytes())
331    }
332
333    fn decode<'a>(buf: &'a [u8], _schema: &StringSchema) -> Result<Self, String> {
334        String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())
335    }
336
337    fn encode_schema(_schema: &Self::Schema) -> Bytes {
338        Bytes::new()
339    }
340
341    fn decode_schema(buf: &Bytes) -> Self::Schema {
342        assert_eq!(*buf, Bytes::new());
343        StringSchema
344    }
345}
346
347/// An implementation of [Schema] for [`Vec<u8>`].
348#[derive(Debug, Clone, Default, PartialEq)]
349pub struct VecU8Schema;
350
351impl Schema<Vec<u8>> for VecU8Schema {
352    type ArrowColumn = BinaryArray;
353    type Statistics = NoneStats;
354
355    type Decoder = SimpleColumnarDecoder<Vec<u8>>;
356    type Encoder = SimpleColumnarEncoder<Vec<u8>>;
357
358    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
359        Ok(SimpleColumnarEncoder::default())
360    }
361
362    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
363        Ok(SimpleColumnarDecoder::new(col))
364    }
365}
366
367impl Schema<Bytes> for VecU8Schema {
368    type ArrowColumn = BinaryArray;
369    type Statistics = NoneStats;
370
371    type Decoder = SimpleColumnarDecoder<Bytes>;
372    type Encoder = SimpleColumnarEncoder<Bytes>;
373
374    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
375        Ok(SimpleColumnarEncoder::default())
376    }
377
378    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
379        Ok(SimpleColumnarDecoder::new(col))
380    }
381}
382
383impl Codec for Vec<u8> {
384    type Storage = ();
385    type Schema = VecU8Schema;
386
387    fn codec_name() -> String {
388        BYTES_CODEC_NAME.into()
389    }
390
391    fn encode<B>(&self, buf: &mut B)
392    where
393        B: BufMut,
394    {
395        buf.put(self.as_slice())
396    }
397
398    fn decode<'a>(buf: &'a [u8], _schema: &VecU8Schema) -> Result<Self, String> {
399        Ok(buf.to_owned())
400    }
401
402    fn encode_schema(_schema: &Self::Schema) -> Bytes {
403        Bytes::new()
404    }
405
406    fn decode_schema(buf: &Bytes) -> Self::Schema {
407        assert_eq!(*buf, Bytes::new());
408        VecU8Schema
409    }
410}
411
412impl Codec for Bytes {
413    type Storage = ();
414    type Schema = VecU8Schema;
415
416    fn codec_name() -> String {
417        BYTES_CODEC_NAME.into()
418    }
419
420    fn encode<B>(&self, buf: &mut B)
421    where
422        B: BufMut,
423    {
424        buf.put(self.into_iter().as_slice())
425    }
426
427    fn decode<'a>(buf: &'a [u8], _schema: &VecU8Schema) -> Result<Self, String> {
428        Ok(Bytes::copy_from_slice(buf))
429    }
430
431    fn encode_schema(_schema: &Self::Schema) -> Bytes {
432        Bytes::new()
433    }
434
435    fn decode_schema(buf: &Bytes) -> Self::Schema {
436        assert_eq!(*buf, Bytes::new());
437        VecU8Schema
438    }
439}
440
441impl Codec for ShardId {
442    type Storage = ();
443    type Schema = ShardIdSchema;
444    fn codec_name() -> String {
445        "ShardId".into()
446    }
447    fn encode<B: BufMut>(&self, buf: &mut B) {
448        buf.put(self.to_string().as_bytes())
449    }
450    fn decode<'a>(buf: &'a [u8], _schema: &ShardIdSchema) -> Result<Self, String> {
451        let shard_id = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
452        shard_id.parse()
453    }
454    fn encode_schema(_schema: &Self::Schema) -> Bytes {
455        Bytes::new()
456    }
457    fn decode_schema(buf: &Bytes) -> Self::Schema {
458        assert_eq!(*buf, Bytes::new());
459        ShardIdSchema
460    }
461}
462
463/// An implementation of [Schema] for [ShardId].
464#[derive(Debug, PartialEq)]
465pub struct ShardIdSchema;
466
467impl Schema<ShardId> for ShardIdSchema {
468    type ArrowColumn = StringArray;
469    type Statistics = NoneStats;
470
471    type Decoder = SimpleColumnarDecoder<ShardId>;
472    type Encoder = SimpleColumnarEncoder<ShardId>;
473
474    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
475        Ok(SimpleColumnarEncoder::default())
476    }
477
478    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
479        Ok(SimpleColumnarDecoder::new(col))
480    }
481}
482
483impl Codec64 for i64 {
484    fn codec_name() -> String {
485        "i64".to_owned()
486    }
487
488    fn encode(&self) -> [u8; 8] {
489        self.to_le_bytes()
490    }
491
492    fn decode(buf: [u8; 8]) -> Self {
493        i64::from_le_bytes(buf)
494    }
495}
496
497impl Codec64 for u64 {
498    fn codec_name() -> String {
499        "u64".to_owned()
500    }
501
502    fn encode(&self) -> [u8; 8] {
503        self.to_le_bytes()
504    }
505
506    fn decode(buf: [u8; 8]) -> Self {
507        u64::from_le_bytes(buf)
508    }
509}
510
511impl Opaque for u64 {
512    fn initial() -> Self {
513        u64::MIN
514    }
515}
516
517// TODO: Remove this once we wrap coord epochs in an `Epoch` struct and impl
518// Opaque on `Epoch` instead.
519impl Opaque for i64 {
520    fn initial() -> Self {
521        i64::MIN
522    }
523}
524
/// A placeholder for a [Codec] impl that hasn't yet gotten a real [Schema].
#[derive(Debug)]
pub struct TodoSchema<T>(PhantomData<T>);

// `Default` and `PartialEq` are implemented by hand so that they are available for
// every `T`; deriving them would add `T: Default` / `T: PartialEq` bounds.
impl<T> Default for TodoSchema<T> {
    fn default() -> Self {
        TodoSchema(PhantomData)
    }
}

impl<T> PartialEq for TodoSchema<T> {
    /// Compares the two marker fields.
    fn eq(&self, other: &Self) -> bool {
        let (TodoSchema(lhs), TodoSchema(rhs)) = (self, other);
        lhs == rhs
    }
}
540
541impl<T: Debug + Send + Sync> Schema<T> for TodoSchema<T> {
542    type ArrowColumn = StructArray;
543    type Statistics = NoneStats;
544
545    type Decoder = TodoColumnarDecoder<T>;
546    type Encoder = TodoColumnarEncoder<T>;
547
548    fn decoder(&self, _col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
549        panic!("TODO")
550    }
551
552    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
553        panic!("TODO")
554    }
555}
556
557/// A [`ColumnEncoder`] that has no implementation.
558#[derive(Debug)]
559pub struct TodoColumnarEncoder<T>(PhantomData<T>);
560
561impl<T> ColumnEncoder<T> for TodoColumnarEncoder<T> {
562    type FinishedColumn = StructArray;
563
564    fn goodbytes(&self) -> usize {
565        panic!("TODO")
566    }
567
568    fn append(&mut self, _val: &T) {
569        panic!("TODO")
570    }
571
572    fn append_null(&mut self) {
573        panic!("TODO")
574    }
575
576    fn finish(self) -> Self::FinishedColumn {
577        panic!("TODO")
578    }
579}
580
581/// A [`ColumnDecoder`] that has no implementation.
582#[derive(Debug)]
583pub struct TodoColumnarDecoder<T>(PhantomData<T>);
584
585impl<T> ColumnDecoder<T> for TodoColumnarDecoder<T> {
586    fn decode(&self, _idx: usize, _val: &mut T) {
587        panic!("TODO")
588    }
589
590    fn is_null(&self, _idx: usize) -> bool {
591        panic!("TODO")
592    }
593
594    fn goodbytes(&self) -> usize {
595        panic!("TODO")
596    }
597
598    fn stats(&self) -> StructStats {
599        panic!("TODO")
600    }
601}
602
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use serde::{Deserialize, Serialize};
    use serde_json::json;

    use super::*;

    /// Pins the `Display`, `Debug`, and `FromStr` formats of [`ShardId`].
    #[mz_ore::test]
    fn fmt_ids() {
        let zero = ShardId([0u8; 16]);
        assert_eq!(zero.to_string(), "s00000000-0000-0000-0000-000000000000");
        assert_eq!(
            format!("{:?}", zero),
            "ShardId(00000000-0000-0000-0000-000000000000)"
        );

        // ShardId can be parsed back from its Display/to_string format.
        assert_eq!(
            ShardId::from_str("s00000000-0000-0000-0000-000000000000"),
            Ok(zero)
        );

        // Malformed inputs, each paired with the exact error it must produce.
        let rejected = [
            (
                "x00000000-0000-0000-0000-000000000000",
                "invalid ShardId x00000000-0000-0000-0000-000000000000: incorrect prefix",
            ),
            (
                "s0",
                "invalid ShardId s0: invalid length: expected length 32 for simple format, found 1",
            ),
            (
                "s00000000-0000-0000-0000-000000000000FOO",
                "invalid ShardId s00000000-0000-0000-0000-000000000000FOO: invalid character: expected an optional prefix of `urn:uuid:` followed by [0-9a-fA-F-], found `O` at 38",
            ),
        ];
        for (input, expected) in rejected {
            assert_eq!(ShardId::from_str(input), Err(expected.to_string()));
        }
    }

    /// Checks that [`ShardId`] serializes to, and deserializes from, its string form
    /// in JSON.
    #[mz_ore::test]
    fn shard_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct ShardIdContainer {
            shard_id: ShardId,
        }

        let id =
            ShardId::from_str("s00000000-1234-5678-0000-000000000000").expect("valid shard id");

        // roundtrip id through json
        let as_value = serde_json::to_value(id).expect("serializable");
        let roundtripped: ShardId = serde_json::from_value(as_value).expect("deserializable");
        assert_eq!(id, roundtripped);

        // deserialize a serialized string directly
        let parsed: ShardId = serde_json::from_str("\"s00000000-1234-5678-0000-000000000000\"")
            .expect("deserializable");
        assert_eq!(id, parsed);

        // roundtrip shard id through a container type
        let json = json!({ "shard_id": id });
        let rendered = json.to_string();
        assert_eq!(
            "{\"shard_id\":\"s00000000-1234-5678-0000-000000000000\"}",
            &rendered
        );
        let container: ShardIdContainer = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.shard_id, id);
    }
}