Skip to main content

mz_avro/
encode.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24use crate::schema::{Schema, SchemaNode, SchemaPiece};
25use crate::types::{DecimalValue, Value};
26use crate::util::{zig_i32, zig_i64};
27
28/// Encode a `Value` into avro format.
29///
30/// **NOTE** This will not perform schema validation. The value is assumed to
31/// be valid with regards to the schema. Schema are needed only to guide the
32/// encoding for complex type values.
33pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
34    encode_ref(value, schema.top_node(), buffer)
35}
36
37fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
38    let bytes = s.as_ref();
39    encode(
40        &Value::Long(bytes.len() as i64),
41        &Schema {
42            named: vec![],
43            indices: Default::default(),
44            top: SchemaPiece::Long.into(),
45        },
46        buffer,
47    );
48    buffer.extend_from_slice(bytes);
49}
50
51fn encode_long(i: i64, buffer: &mut Vec<u8>) {
52    zig_i64(i, buffer)
53}
54
55fn encode_int(i: i32, buffer: &mut Vec<u8>) {
56    zig_i32(i, buffer)
57}
58
59/// Encode a `Value` into avro format.
60///
61/// **NOTE** This will not perform schema validation. The value is assumed to
62/// be valid with regards to the schema. Schema are needed only to guide the
63/// encoding for complex type values.
64pub fn encode_ref(value: &Value, schema: SchemaNode, buffer: &mut Vec<u8>) {
65    match value {
66        Value::Null => (),
67        Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
68        Value::Int(i) => encode_int(*i, buffer),
69        Value::Long(i) => encode_long(*i, buffer),
70        Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
71        Value::Date(d) => encode_int(*d, buffer),
72        Value::Timestamp(d) => {
73            let mult = match schema.inner {
74                SchemaPiece::TimestampMilli => 1_000,
75                SchemaPiece::TimestampMicro => 1_000_000,
76                other => panic!("Invalid schema for timestamp: {:?}", other),
77            };
78            let ts_seconds = d
79                .and_utc()
80                .timestamp()
81                .checked_mul(mult)
82                .expect("All chrono dates can be converted to timestamps");
83            let sub_part: i64 = if mult == 1_000 {
84                d.and_utc().timestamp_subsec_millis().into()
85            } else {
86                d.and_utc().timestamp_subsec_micros().into()
87            };
88            let ts = ts_seconds + sub_part;
89            encode_long(ts, buffer)
90        }
91        Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
92        Value::Decimal(DecimalValue { unscaled, .. }) => match schema.name {
93            None => encode_bytes(unscaled, buffer),
94            Some(_) => {
95                // Fixed-size decimal: left-pad to exact size with two's-complement
96                // sign extension (0xFF for negative, 0x00 for non-negative).
97                if let SchemaPiece::Decimal {
98                    fixed_size: Some(size),
99                    ..
100                } = schema.inner
101                {
102                    let is_negative = unscaled.first().map_or(false, |b| b & 0x80 != 0);
103                    let pad = if is_negative { 0xFFu8 } else { 0x00u8 };
104                    let start = buffer.len();
105                    buffer.resize(start + size.saturating_sub(unscaled.len()), pad);
106                }
107                buffer.extend_from_slice(unscaled);
108            }
109        },
110        Value::Bytes(bytes) => encode_bytes(bytes, buffer),
111        Value::String(s) => match schema.inner {
112            SchemaPiece::String => {
113                encode_bytes(s, buffer);
114            }
115            SchemaPiece::Enum { symbols, .. } => {
116                if let Some(index) = symbols.iter().position(|item| item == s) {
117                    encode_int(index as i32, buffer);
118                }
119            }
120            _ => (),
121        },
122        Value::Fixed(_, bytes) => buffer.extend(bytes),
123        Value::Enum(i, _) => encode_int(*i as i32, buffer),
124        Value::Union { index, inner, .. } => {
125            if let SchemaPiece::Union(schema_inner) = schema.inner {
126                let schema_inner = &schema_inner.variants()[*index];
127                encode_long(*index as i64, buffer);
128                encode_ref(&*inner, schema.step(schema_inner), buffer);
129            }
130        }
131        Value::Array(items) => {
132            if let SchemaPiece::Array(inner) = schema.inner {
133                if !items.is_empty() {
134                    encode_long(items.len() as i64, buffer);
135                    for item in items.iter() {
136                        encode_ref(item, schema.step(&**inner), buffer);
137                    }
138                }
139                buffer.push(0u8);
140            }
141        }
142        Value::Map(items) => {
143            if let SchemaPiece::Map(inner) = schema.inner {
144                if !items.is_empty() {
145                    encode_long(items.len() as i64, buffer);
146                    for (key, value) in items {
147                        encode_bytes(key, buffer);
148                        encode_ref(value, schema.step(&**inner), buffer);
149                    }
150                }
151                buffer.push(0u8);
152            }
153        }
154        Value::Record(fields) => {
155            if let SchemaPiece::Record {
156                fields: inner_fields,
157                ..
158            } = schema.inner
159            {
160                for (i, &(_, ref value)) in fields.iter().enumerate() {
161                    encode_ref(value, schema.step(&inner_fields[i].schema), buffer);
162                }
163            }
164        }
165        Value::Json(j) => {
166            encode_bytes(&j.to_string(), buffer);
167        }
168        Value::Uuid(u) => {
169            let u_str = u.to_string();
170            encode_bytes(&u_str, buffer);
171        }
172    }
173}
174
175pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
176    let mut buffer = Vec::new();
177    encode(value, schema, &mut buffer);
178    buffer
179}
180
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::*;

    /// An empty array encodes to just the terminating zero byte.
    #[mz_ore::test]
    fn test_encode_empty_array() {
        let schema: Schema = r#"{"type": "array", "items": "int"}"#.parse().unwrap();
        let mut encoded = Vec::new();
        encode(&Value::Array(Vec::new()), &schema, &mut encoded);
        assert_eq!(encoded, vec![0u8]);
    }

    /// An empty map encodes to just the terminating zero byte.
    #[mz_ore::test]
    fn test_encode_empty_map() {
        let schema: Schema = r#"{"type": "map", "values": "int"}"#.parse().unwrap();
        let mut encoded = Vec::new();
        encode(&Value::Map(BTreeMap::new()), &schema, &mut encoded);
        assert_eq!(encoded, vec![0u8]);
    }
}