mz_persist_types/
codec_impls.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementations of [Codec] for stdlib types.
11
12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::marker::PhantomData;
15
16use arrow::array::{
17    Array, ArrayBuilder, BinaryArray, BinaryBuilder, NullArray, StringArray, StringBuilder,
18    StructArray,
19};
20use bytes::{BufMut, Bytes};
21use timely::order::Product;
22
23use crate::arrow::ArrayOrd;
24use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema};
25use crate::stats::{ColumnStatKinds, ColumnarStats, NoneStats, StructStats};
26use crate::{Codec, Codec64, Opaque, ShardId};
27
/// Codec name shared by every [Codec] impl whose encoding is a plain blob of bytes. Because all
/// of them report this same name, we can switch between those codecs without any incompatibility
/// errors. The literal value is kept for historical reasons.
const BYTES_CODEC_NAME: &str = "Vec<u8>";
32
/// An implementation of [Schema] for [()].
///
/// This is a field-less marker type: it carries no configuration of its own.
#[derive(Debug, Default, PartialEq)]
pub struct UnitSchema;
36
37impl Codec for () {
38    type Storage = ();
39    type Schema = UnitSchema;
40
41    fn codec_name() -> String {
42        "()".into()
43    }
44
45    fn encode<B>(&self, _buf: &mut B)
46    where
47        B: BufMut,
48    {
49        // No-op.
50    }
51
52    fn decode<'a>(buf: &'a [u8], _schema: &UnitSchema) -> Result<Self, String> {
53        if !buf.is_empty() {
54            return Err(format!("decode expected empty buf got {} bytes", buf.len()));
55        }
56        Ok(())
57    }
58
59    fn encode_schema(_schema: &Self::Schema) -> Bytes {
60        Bytes::new()
61    }
62
63    fn decode_schema(buf: &Bytes) -> Self::Schema {
64        assert_eq!(*buf, Bytes::new());
65        UnitSchema
66    }
67}
68
/// Columnar encoder and decoder for [`UnitSchema`].
///
/// The only state kept is how many entries the column contains.
#[derive(Debug)]
pub struct UnitColumnar {
    /// How many entries this column holds.
    len: usize,
}

impl UnitColumnar {
    /// Creates a [`UnitColumnar`] that holds `len` entries.
    pub fn new(len: usize) -> Self {
        Self { len }
    }
}
82
83impl ColumnDecoder<()> for UnitColumnar {
84    fn decode(&self, idx: usize, _val: &mut ()) {
85        if idx >= self.len {
86            panic!("index out of bounds, idx: {idx}, len: {}", self.len);
87        }
88    }
89
90    fn is_null(&self, idx: usize) -> bool {
91        if idx < self.len {
92            true
93        } else {
94            panic!("index out of bounds, idx: {idx}, len: {}", self.len);
95        }
96    }
97
98    fn goodbytes(&self) -> usize {
99        0
100    }
101
102    fn stats(&self) -> StructStats {
103        StructStats {
104            len: self.len,
105            cols: BTreeMap::new(),
106        }
107    }
108}
109
110impl ColumnEncoder<()> for UnitColumnar {
111    type FinishedColumn = NullArray;
112
113    fn goodbytes(&self) -> usize {
114        0
115    }
116
117    fn append(&mut self, _val: &()) {
118        self.len += 1;
119    }
120
121    fn append_null(&mut self) {
122        self.len += 1;
123    }
124
125    fn finish(self) -> Self::FinishedColumn {
126        NullArray::new(self.len)
127    }
128}
129
130impl Schema<()> for UnitSchema {
131    type ArrowColumn = NullArray;
132    type Statistics = NoneStats;
133
134    type Decoder = UnitColumnar;
135    type Encoder = UnitColumnar;
136
137    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
138        Ok(UnitColumnar::new(col.len()))
139    }
140
141    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
142        Ok(UnitColumnar::new(0))
143    }
144}
145
146/// Simple type of data that can be columnar encoded.
147pub trait SimpleColumnarData {
148    /// Type of [`arrow`] builder that we encode data into.
149    type ArrowBuilder: arrow::array::ArrayBuilder + Default;
150    /// Type of [`arrow`] array the we decode data from.
151    type ArrowColumn: arrow::array::Array + Clone + 'static;
152
153    /// The number of actual data bytes this item represents.
154    fn goodbytes(builder: &Self::ArrowBuilder) -> usize;
155
156    /// Encode `self` into `builder`.
157    fn push(&self, builder: &mut Self::ArrowBuilder);
158    /// Encode a null value into `builder`.
159    fn push_null(builder: &mut Self::ArrowBuilder);
160
161    /// Decode an instance of `self` from `column`.
162    fn read(&mut self, idx: usize, column: &Self::ArrowColumn);
163}
164
165impl SimpleColumnarData for String {
166    type ArrowBuilder = StringBuilder;
167    type ArrowColumn = StringArray;
168
169    fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
170        builder.values_slice().len()
171    }
172
173    fn push(&self, builder: &mut Self::ArrowBuilder) {
174        builder.append_value(self.as_str())
175    }
176    fn push_null(builder: &mut Self::ArrowBuilder) {
177        builder.append_null()
178    }
179    fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
180        self.clear();
181        self.push_str(column.value(idx));
182    }
183}
184
185impl SimpleColumnarData for Vec<u8> {
186    type ArrowBuilder = BinaryBuilder;
187    type ArrowColumn = BinaryArray;
188
189    fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
190        builder.values_slice().len()
191    }
192
193    fn push(&self, builder: &mut Self::ArrowBuilder) {
194        builder.append_value(self.as_slice())
195    }
196    fn push_null(builder: &mut Self::ArrowBuilder) {
197        builder.append_null()
198    }
199    fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
200        self.clear();
201        self.extend(column.value(idx));
202    }
203}
204
205impl SimpleColumnarData for Bytes {
206    type ArrowBuilder = BinaryBuilder;
207    type ArrowColumn = BinaryArray;
208
209    fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
210        builder.values_slice().len()
211    }
212
213    fn push(&self, builder: &mut Self::ArrowBuilder) {
214        builder.append_value(&self)
215    }
216    fn push_null(builder: &mut Self::ArrowBuilder) {
217        builder.append_null()
218    }
219    fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
220        *self = Bytes::copy_from_slice(column.value(idx));
221    }
222}
223
224impl SimpleColumnarData for ShardId {
225    type ArrowBuilder = StringBuilder;
226    type ArrowColumn = StringArray;
227
228    fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
229        builder.values_slice().len()
230    }
231
232    fn push(&self, builder: &mut Self::ArrowBuilder) {
233        builder.append_value(&self.to_string());
234    }
235    fn push_null(builder: &mut Self::ArrowBuilder) {
236        builder.append_null();
237    }
238    fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
239        *self = column.value(idx).parse().expect("should be valid ShardId");
240    }
241}
242
/// A type that implements [`ColumnEncoder`] for [`SimpleColumnarData`].
///
/// It is a thin wrapper around the `T::ArrowBuilder` that values get appended to.
#[derive(Debug, Default)]
pub struct SimpleColumnarEncoder<T: SimpleColumnarData>(T::ArrowBuilder);
246
247impl<T: SimpleColumnarData> ColumnEncoder<T> for SimpleColumnarEncoder<T> {
248    type FinishedColumn = T::ArrowColumn;
249
250    fn goodbytes(&self) -> usize {
251        T::goodbytes(&self.0)
252    }
253
254    fn append(&mut self, val: &T) {
255        T::push(val, &mut self.0);
256    }
257    fn append_null(&mut self) {
258        T::push_null(&mut self.0)
259    }
260    fn finish(mut self) -> Self::FinishedColumn {
261        let array = ArrayBuilder::finish(&mut self.0);
262        let array = array
263            .as_any()
264            .downcast_ref::<T::ArrowColumn>()
265            .expect("created using StringBuilder")
266            .clone();
267
268        array
269    }
270}
271
/// A type that implements [`ColumnDecoder`] for [`SimpleColumnarData`].
///
/// It is a thin wrapper around the `T::ArrowColumn` that values get read from.
#[derive(Debug)]
pub struct SimpleColumnarDecoder<T: SimpleColumnarData>(T::ArrowColumn);
275
276impl<T: SimpleColumnarData> SimpleColumnarDecoder<T> {
277    /// Returns a new [`SimpleColumnarDecoder`] with the provided column.
278    pub fn new(col: T::ArrowColumn) -> Self {
279        SimpleColumnarDecoder(col)
280    }
281}
282
283impl<T: SimpleColumnarData> ColumnDecoder<T> for SimpleColumnarDecoder<T> {
284    fn decode(&self, idx: usize, val: &mut T) {
285        T::read(val, idx, &self.0)
286    }
287    fn is_null(&self, idx: usize) -> bool {
288        self.0.is_null(idx)
289    }
290    fn goodbytes(&self) -> usize {
291        ArrayOrd::new(&self.0).goodbytes()
292    }
293
294    fn stats(&self) -> StructStats {
295        ColumnarStats::one_column_struct(self.0.len(), ColumnStatKinds::None)
296    }
297}
298
/// An implementation of [Schema] for [String].
///
/// This is a field-less marker type: it carries no configuration of its own.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct StringSchema;
302
303impl Schema<String> for StringSchema {
304    type ArrowColumn = StringArray;
305    type Statistics = NoneStats;
306
307    type Decoder = SimpleColumnarDecoder<String>;
308    type Encoder = SimpleColumnarEncoder<String>;
309
310    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
311        Ok(SimpleColumnarEncoder::default())
312    }
313
314    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
315        Ok(SimpleColumnarDecoder::new(col))
316    }
317}
318
319impl Codec for String {
320    type Storage = ();
321    type Schema = StringSchema;
322
323    fn codec_name() -> String {
324        "String".into()
325    }
326
327    fn encode<B>(&self, buf: &mut B)
328    where
329        B: BufMut,
330    {
331        buf.put(self.as_bytes())
332    }
333
334    fn decode<'a>(buf: &'a [u8], _schema: &StringSchema) -> Result<Self, String> {
335        String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())
336    }
337
338    fn encode_schema(_schema: &Self::Schema) -> Bytes {
339        Bytes::new()
340    }
341
342    fn decode_schema(buf: &Bytes) -> Self::Schema {
343        assert_eq!(*buf, Bytes::new());
344        StringSchema
345    }
346}
347
/// An implementation of [Schema] for [`Vec<u8>`].
///
/// This is a field-less marker type: it carries no configuration of its own.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct VecU8Schema;
351
352impl Schema<Vec<u8>> for VecU8Schema {
353    type ArrowColumn = BinaryArray;
354    type Statistics = NoneStats;
355
356    type Decoder = SimpleColumnarDecoder<Vec<u8>>;
357    type Encoder = SimpleColumnarEncoder<Vec<u8>>;
358
359    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
360        Ok(SimpleColumnarEncoder::default())
361    }
362
363    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
364        Ok(SimpleColumnarDecoder::new(col))
365    }
366}
367
368impl Schema<Bytes> for VecU8Schema {
369    type ArrowColumn = BinaryArray;
370    type Statistics = NoneStats;
371
372    type Decoder = SimpleColumnarDecoder<Bytes>;
373    type Encoder = SimpleColumnarEncoder<Bytes>;
374
375    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
376        Ok(SimpleColumnarEncoder::default())
377    }
378
379    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
380        Ok(SimpleColumnarDecoder::new(col))
381    }
382}
383
384impl Codec for Vec<u8> {
385    type Storage = ();
386    type Schema = VecU8Schema;
387
388    fn codec_name() -> String {
389        BYTES_CODEC_NAME.into()
390    }
391
392    fn encode<B>(&self, buf: &mut B)
393    where
394        B: BufMut,
395    {
396        buf.put(self.as_slice())
397    }
398
399    fn decode<'a>(buf: &'a [u8], _schema: &VecU8Schema) -> Result<Self, String> {
400        Ok(buf.to_owned())
401    }
402
403    fn encode_schema(_schema: &Self::Schema) -> Bytes {
404        Bytes::new()
405    }
406
407    fn decode_schema(buf: &Bytes) -> Self::Schema {
408        assert_eq!(*buf, Bytes::new());
409        VecU8Schema
410    }
411}
412
413impl Codec for Bytes {
414    type Storage = ();
415    type Schema = VecU8Schema;
416
417    fn codec_name() -> String {
418        BYTES_CODEC_NAME.into()
419    }
420
421    fn encode<B>(&self, buf: &mut B)
422    where
423        B: BufMut,
424    {
425        buf.put(self.into_iter().as_slice())
426    }
427
428    fn decode<'a>(buf: &'a [u8], _schema: &VecU8Schema) -> Result<Self, String> {
429        Ok(Bytes::copy_from_slice(buf))
430    }
431
432    fn encode_schema(_schema: &Self::Schema) -> Bytes {
433        Bytes::new()
434    }
435
436    fn decode_schema(buf: &Bytes) -> Self::Schema {
437        assert_eq!(*buf, Bytes::new());
438        VecU8Schema
439    }
440}
441
442impl Codec for ShardId {
443    type Storage = ();
444    type Schema = ShardIdSchema;
445    fn codec_name() -> String {
446        "ShardId".into()
447    }
448    fn encode<B: BufMut>(&self, buf: &mut B) {
449        buf.put(self.to_string().as_bytes())
450    }
451    fn decode<'a>(buf: &'a [u8], _schema: &ShardIdSchema) -> Result<Self, String> {
452        let shard_id = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
453        shard_id.parse()
454    }
455    fn encode_schema(_schema: &Self::Schema) -> Bytes {
456        Bytes::new()
457    }
458    fn decode_schema(buf: &Bytes) -> Self::Schema {
459        assert_eq!(*buf, Bytes::new());
460        ShardIdSchema
461    }
462}
463
/// An implementation of [Schema] for [ShardId].
///
/// This is a field-less marker type: it carries no configuration of its own.
#[derive(Debug, PartialEq)]
pub struct ShardIdSchema;
467
468impl Schema<ShardId> for ShardIdSchema {
469    type ArrowColumn = StringArray;
470    type Statistics = NoneStats;
471
472    type Decoder = SimpleColumnarDecoder<ShardId>;
473    type Encoder = SimpleColumnarEncoder<ShardId>;
474
475    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
476        Ok(SimpleColumnarEncoder::default())
477    }
478
479    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
480        Ok(SimpleColumnarDecoder::new(col))
481    }
482}
483
484impl Codec64 for i64 {
485    fn codec_name() -> String {
486        "i64".to_owned()
487    }
488
489    fn encode(&self) -> [u8; 8] {
490        self.to_le_bytes()
491    }
492
493    fn decode(buf: [u8; 8]) -> Self {
494        i64::from_le_bytes(buf)
495    }
496}
497
498impl Codec64 for u64 {
499    fn codec_name() -> String {
500        "u64".to_owned()
501    }
502
503    fn encode(&self) -> [u8; 8] {
504        self.to_le_bytes()
505    }
506
507    fn decode(buf: [u8; 8]) -> Self {
508        u64::from_le_bytes(buf)
509    }
510}
511
512impl Opaque for u64 {
513    fn initial() -> Self {
514        u64::MIN
515    }
516}
517
518impl Codec64 for Product<u32, u32> {
519    fn codec_name() -> String {
520        "Product<u32, u32>".to_owned()
521    }
522
523    fn encode(&self) -> [u8; 8] {
524        let o = self.outer.to_le_bytes();
525        let i = self.inner.to_le_bytes();
526        [o[0], o[1], o[2], o[3], i[0], i[1], i[2], i[3]]
527    }
528
529    fn decode(buf: [u8; 8]) -> Self {
530        let outer = [buf[0], buf[1], buf[2], buf[3]];
531        let inner = [buf[4], buf[5], buf[6], buf[7]];
532        Product::new(u32::from_le_bytes(outer), u32::from_le_bytes(inner))
533    }
534}
535
536// TODO: Remove this once we wrap coord epochs in an `Epoch` struct and impl
537// Opaque on `Epoch` instead.
538impl Opaque for i64 {
539    fn initial() -> Self {
540        i64::MIN
541    }
542}
543
/// A placeholder for a [Codec] impl that hasn't yet gotten a real [Schema].
///
/// It stores no data; `T` only appears through [`PhantomData`].
#[derive(Debug)]
pub struct TodoSchema<T>(PhantomData<T>);
547
548impl<T> Default for TodoSchema<T> {
549    fn default() -> Self {
550        Self(Default::default())
551    }
552}
553
554impl<T> PartialEq for TodoSchema<T> {
555    fn eq(&self, other: &Self) -> bool {
556        self.0 == other.0
557    }
558}
559
impl<T: Debug + Send + Sync> Schema<T> for TodoSchema<T> {
    type ArrowColumn = StructArray;
    type Statistics = NoneStats;

    type Decoder = TodoColumnarDecoder<T>;
    type Encoder = TodoColumnarEncoder<T>;

    /// Not implemented for the placeholder schema.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn decoder(&self, _col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
        panic!("TODO")
    }

    /// Not implemented for the placeholder schema.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
        panic!("TODO")
    }
}
575
/// A [`ColumnEncoder`] that has no implementation.
///
/// Every method of its [`ColumnEncoder`] impl panics.
#[derive(Debug)]
pub struct TodoColumnarEncoder<T>(PhantomData<T>);
579
impl<T> ColumnEncoder<T> for TodoColumnarEncoder<T> {
    type FinishedColumn = StructArray;

    /// Not implemented; always panics.
    fn goodbytes(&self) -> usize {
        panic!("TODO")
    }

    /// Not implemented; always panics.
    fn append(&mut self, _val: &T) {
        panic!("TODO")
    }

    /// Not implemented; always panics.
    fn append_null(&mut self) {
        panic!("TODO")
    }

    /// Not implemented; always panics.
    fn finish(self) -> Self::FinishedColumn {
        panic!("TODO")
    }
}
599
/// A [`ColumnDecoder`] that has no implementation.
///
/// Every method of its [`ColumnDecoder`] impl panics.
#[derive(Debug)]
pub struct TodoColumnarDecoder<T>(PhantomData<T>);
603
impl<T> ColumnDecoder<T> for TodoColumnarDecoder<T> {
    /// Not implemented; always panics.
    fn decode(&self, _idx: usize, _val: &mut T) {
        panic!("TODO")
    }

    /// Not implemented; always panics.
    fn is_null(&self, _idx: usize) -> bool {
        panic!("TODO")
    }

    /// Not implemented; always panics.
    fn goodbytes(&self) -> usize {
        panic!("TODO")
    }

    /// Not implemented; always panics.
    fn stats(&self) -> StructStats {
        panic!("TODO")
    }
}
621
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use serde::{Deserialize, Serialize};
    use serde_json::json;

    use super::*;

    /// Checks the `Display` and `Debug` renderings of [`ShardId`], and that parsing accepts the
    /// `Display` form while rejecting malformed ids with the expected messages.
    #[mz_ore::test]
    fn fmt_ids() {
        let zero = ShardId([0u8; 16]);
        assert_eq!(zero.to_string(), "s00000000-0000-0000-0000-000000000000");
        assert_eq!(
            format!("{zero:?}"),
            "ShardId(00000000-0000-0000-0000-000000000000)"
        );

        // ShardId can be parsed back from its Display/to_string format.
        let parsed = ShardId::from_str("s00000000-0000-0000-0000-000000000000");
        assert_eq!(parsed, Ok(zero));

        // Each malformed input is paired with the exact error it must produce.
        let rejected = [
            (
                "x00000000-0000-0000-0000-000000000000",
                "invalid ShardId x00000000-0000-0000-0000-000000000000: incorrect prefix",
            ),
            (
                "s0",
                "invalid ShardId s0: invalid length: expected length 32 for simple format, found 1",
            ),
            (
                "s00000000-0000-0000-0000-000000000000FOO",
                "invalid ShardId s00000000-0000-0000-0000-000000000000FOO: invalid character: expected an optional prefix of `urn:uuid:` followed by [0-9a-fA-F-], found `O` at 38",
            ),
        ];
        for &(input, message) in &rejected {
            assert_eq!(ShardId::from_str(input), Err(message.to_string()));
        }
    }

    /// Checks that [`ShardId`] round-trips through JSON as its human-readable string form, both
    /// on its own and as a field of a containing struct.
    #[mz_ore::test]
    fn shard_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct ShardIdContainer {
            shard_id: ShardId,
        }

        let id =
            ShardId::from_str("s00000000-1234-5678-0000-000000000000").expect("valid shard id");

        // roundtrip id through json
        let as_value = serde_json::to_value(id).expect("serializable");
        let from_value: ShardId = serde_json::from_value(as_value).expect("deserializable");
        assert_eq!(id, from_value);

        // deserialize a serialized string directly
        let from_text: ShardId =
            serde_json::from_str("\"s00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable");
        assert_eq!(id, from_text);

        // roundtrip shard id through a container type
        let json = json!({ "shard_id": id });
        assert_eq!(
            "{\"shard_id\":\"s00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: ShardIdContainer = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.shard_id, id);
    }
}