1use crate::schema::{Schema, SchemaNode, SchemaPiece};
25use crate::types::{DecimalValue, Value};
26use crate::util::{zig_i32, zig_i64};
27
28pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
34 encode_ref(value, schema.top_node(), buffer)
35}
36
37fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
38 let bytes = s.as_ref();
39 encode(
40 &Value::Long(bytes.len() as i64),
41 &Schema {
42 named: vec![],
43 indices: Default::default(),
44 top: SchemaPiece::Long.into(),
45 },
46 buffer,
47 );
48 buffer.extend_from_slice(bytes);
49}
50
51fn encode_long(i: i64, buffer: &mut Vec<u8>) {
52 zig_i64(i, buffer)
53}
54
55fn encode_int(i: i32, buffer: &mut Vec<u8>) {
56 zig_i32(i, buffer)
57}
58
59pub fn encode_ref(value: &Value, schema: SchemaNode, buffer: &mut Vec<u8>) {
65 match value {
66 Value::Null => (),
67 Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
68 Value::Int(i) => encode_int(*i, buffer),
69 Value::Long(i) => encode_long(*i, buffer),
70 Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
71 Value::Date(d) => encode_int(*d, buffer),
72 Value::Timestamp(d) => {
73 let mult = match schema.inner {
74 SchemaPiece::TimestampMilli => 1_000,
75 SchemaPiece::TimestampMicro => 1_000_000,
76 other => panic!("Invalid schema for timestamp: {:?}", other),
77 };
78 let ts_seconds = d
79 .and_utc()
80 .timestamp()
81 .checked_mul(mult)
82 .expect("All chrono dates can be converted to timestamps");
83 let sub_part: i64 = if mult == 1_000 {
84 d.and_utc().timestamp_subsec_millis().into()
85 } else {
86 d.and_utc().timestamp_subsec_micros().into()
87 };
88 let ts = ts_seconds + sub_part;
89 encode_long(ts, buffer)
90 }
91 Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
92 Value::Decimal(DecimalValue { unscaled, .. }) => match schema.name {
93 None => encode_bytes(unscaled, buffer),
94 Some(_) => {
95 if let SchemaPiece::Decimal {
98 fixed_size: Some(size),
99 ..
100 } = schema.inner
101 {
102 let is_negative = unscaled.first().map_or(false, |b| b & 0x80 != 0);
103 let pad = if is_negative { 0xFFu8 } else { 0x00u8 };
104 let start = buffer.len();
105 buffer.resize(start + size.saturating_sub(unscaled.len()), pad);
106 }
107 buffer.extend_from_slice(unscaled);
108 }
109 },
110 Value::Bytes(bytes) => encode_bytes(bytes, buffer),
111 Value::String(s) => match schema.inner {
112 SchemaPiece::String => {
113 encode_bytes(s, buffer);
114 }
115 SchemaPiece::Enum { symbols, .. } => {
116 if let Some(index) = symbols.iter().position(|item| item == s) {
117 encode_int(index as i32, buffer);
118 }
119 }
120 _ => (),
121 },
122 Value::Fixed(_, bytes) => buffer.extend(bytes),
123 Value::Enum(i, _) => encode_int(*i as i32, buffer),
124 Value::Union { index, inner, .. } => {
125 if let SchemaPiece::Union(schema_inner) = schema.inner {
126 let schema_inner = &schema_inner.variants()[*index];
127 encode_long(*index as i64, buffer);
128 encode_ref(&*inner, schema.step(schema_inner), buffer);
129 }
130 }
131 Value::Array(items) => {
132 if let SchemaPiece::Array(inner) = schema.inner {
133 if !items.is_empty() {
134 encode_long(items.len() as i64, buffer);
135 for item in items.iter() {
136 encode_ref(item, schema.step(&**inner), buffer);
137 }
138 }
139 buffer.push(0u8);
140 }
141 }
142 Value::Map(items) => {
143 if let SchemaPiece::Map(inner) = schema.inner {
144 if !items.is_empty() {
145 encode_long(items.len() as i64, buffer);
146 for (key, value) in items {
147 encode_bytes(key, buffer);
148 encode_ref(value, schema.step(&**inner), buffer);
149 }
150 }
151 buffer.push(0u8);
152 }
153 }
154 Value::Record(fields) => {
155 if let SchemaPiece::Record {
156 fields: inner_fields,
157 ..
158 } = schema.inner
159 {
160 for (i, &(_, ref value)) in fields.iter().enumerate() {
161 encode_ref(value, schema.step(&inner_fields[i].schema), buffer);
162 }
163 }
164 }
165 Value::Json(j) => {
166 encode_bytes(&j.to_string(), buffer);
167 }
168 Value::Uuid(u) => {
169 let u_str = u.to_string();
170 encode_bytes(&u_str, buffer);
171 }
172 }
173}
174
175pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
176 let mut buffer = Vec::new();
177 encode(value, schema, &mut buffer);
178 buffer
179}
180
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::*;

    /// Parses `schema_json`, encodes `value` against it, and returns the bytes.
    fn encode_with_schema(value: Value, schema_json: &str) -> Vec<u8> {
        let schema: Schema = schema_json.parse().unwrap();
        let mut out = Vec::new();
        encode(&value, &schema, &mut out);
        out
    }

    /// An empty array is written as just the zero-count terminator block.
    #[mz_ore::test]
    fn test_encode_empty_array() {
        let encoded = encode_with_schema(
            Value::Array(Vec::new()),
            r#"{"type": "array", "items": "int"}"#,
        );
        assert_eq!(vec![0u8], encoded);
    }

    /// An empty map is likewise just the zero-count terminator block.
    #[mz_ore::test]
    fn test_encode_empty_map() {
        let encoded = encode_with_schema(
            Value::Map(BTreeMap::new()),
            r#"{"type": "map", "values": "int"}"#,
        );
        assert_eq!(vec![0u8], encoded);
    }
}