// mz_interchange/json.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12
13use mz_repr::adt::array::ArrayDimension;
14use mz_repr::adt::char;
15use mz_repr::adt::jsonb::JsonbRef;
16use mz_repr::adt::numeric::{NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION};
17use mz_repr::{CatalogItemId, ColumnName, Datum, RelationDesc, SqlColumnType, SqlScalarType};
18use serde_json::{Map, json};
19
20use crate::avro::DocTarget;
21use crate::encode::{Encode, TypedDatum, column_names_and_types};
22use crate::envelopes;
23
/// Avro namespace under which all Materialize-defined named types
/// (records, fixed types) are registered.
const AVRO_NAMESPACE: &str = "com.materialize.sink";
/// Number of microseconds in one millisecond.
const MICROS_PER_MILLIS: u32 = 1_000;
26
/// Manages encoding of JSON-encoded bytes.
pub struct JsonEncoder {
    /// The name and type of each column to encode, in order.
    columns: Vec<(ColumnName, SqlColumnType)>,
}
31
32impl JsonEncoder {
33    pub fn new(desc: RelationDesc, debezium: bool) -> Self {
34        let mut columns = column_names_and_types(desc);
35        if debezium {
36            columns = envelopes::dbz_envelope(columns);
37        };
38        JsonEncoder { columns }
39    }
40}
41
42impl Encode for JsonEncoder {
43    fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
44        let value = encode_datums_as_json(row.iter(), self.columns.as_ref());
45        value.to_string().into_bytes()
46    }
47}
48
49impl fmt::Debug for JsonEncoder {
50    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
51        f.debug_struct("JsonEncoder")
52            .field(
53                "schema",
54                &format!(
55                    "{:?}",
56                    build_row_schema_json(
57                        &self.columns,
58                        "schema",
59                        &BTreeMap::new(),
60                        None,
61                        &Default::default(),
62                    )
63                ),
64            )
65            .finish()
66    }
67}
68
69/// Encodes a sequence of `Datum` as JSON, using supplied column names and types.
70pub fn encode_datums_as_json<'a, I>(
71    datums: I,
72    names_types: &[(ColumnName, SqlColumnType)],
73) -> serde_json::Value
74where
75    I: IntoIterator<Item = Datum<'a>>,
76{
77    let value_fields = datums
78        .into_iter()
79        .zip(names_types)
80        .map(|(datum, (name, typ))| {
81            (
82                name.to_string(),
83                TypedDatum::new(datum, typ).json(&JsonNumberPolicy::KeepAsNumber),
84            )
85        })
86        .collect();
87    serde_json::Value::Object(value_fields)
88}
89
/// Policies for how to handle Numbers in JSON.
///
/// Applied by `ToJson::json` after each scalar value has been rendered.
#[derive(Debug)]
pub enum JsonNumberPolicy {
    /// Do not change Numbers.
    KeepAsNumber,
    /// Convert Numbers to their String representation. Useful for JavaScript consumers that may
    /// interpret some numbers incorrectly.
    ConvertNumberToString,
}
99
/// Conversion of a value into its JSON representation.
pub trait ToJson {
    /// Transforms this value to a JSON value.
    ///
    /// `number_policy` controls whether JSON numbers are emitted as numbers
    /// or converted to their string representation.
    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value;
}
104
impl ToJson for TypedDatum<'_> {
    /// Renders a single typed datum as a JSON value according to its SQL type.
    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value {
        let TypedDatum { datum, typ } = self;
        // Null is only legal for nullable columns; for non-nullable columns a
        // null datum would panic in the `unwrap_*` accessors below.
        if typ.nullable && datum.is_null() {
            return serde_json::Value::Null;
        }
        let value = match &typ.scalar_type {
            SqlScalarType::AclItem => json!(datum.unwrap_acl_item().to_string()),
            SqlScalarType::Bool => json!(datum.unwrap_bool()),
            SqlScalarType::PgLegacyChar => json!(datum.unwrap_uint8()),
            SqlScalarType::Int16 => json!(datum.unwrap_int16()),
            SqlScalarType::Int32 => json!(datum.unwrap_int32()),
            SqlScalarType::Int64 => json!(datum.unwrap_int64()),
            SqlScalarType::UInt16 => json!(datum.unwrap_uint16()),
            SqlScalarType::UInt32
            | SqlScalarType::Oid
            | SqlScalarType::RegClass
            | SqlScalarType::RegProc
            | SqlScalarType::RegType => {
                json!(datum.unwrap_uint32())
            }
            SqlScalarType::UInt64 => json!(datum.unwrap_uint64()),
            SqlScalarType::Float32 => json!(datum.unwrap_float32()),
            SqlScalarType::Float64 => json!(datum.unwrap_float64()),
            // Numerics are rendered as strings to avoid the precision limits
            // of JSON numbers.
            SqlScalarType::Numeric { .. } => {
                json!(datum.unwrap_numeric().0.to_standard_notation_string())
            }
            // https://stackoverflow.com/questions/10286204/what-is-the-right-json-date-format
            SqlScalarType::Date => serde_json::Value::String(format!("{}", datum.unwrap_date())),
            SqlScalarType::Time => serde_json::Value::String(format!("{:?}", datum.unwrap_time())),
            // Timestamps become "<epoch millis>.<micros>", where the
            // fractional part is the sub-millisecond microsecond remainder
            // (0..=999), zero-padded to three digits.
            SqlScalarType::Timestamp { .. } => {
                let dt = datum.unwrap_timestamp().to_naive().and_utc();
                let millis = dt.timestamp_millis();
                let micros = dt.timestamp_subsec_micros()
                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
            }
            SqlScalarType::TimestampTz { .. } => {
                let dt = datum.unwrap_timestamptz().to_utc();
                let millis = dt.timestamp_millis();
                let micros = dt.timestamp_subsec_micros()
                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
            }
            SqlScalarType::Interval => {
                serde_json::Value::String(format!("{}", datum.unwrap_interval()))
            }
            SqlScalarType::Bytes => json!(datum.unwrap_bytes()),
            SqlScalarType::String | SqlScalarType::VarChar { .. } | SqlScalarType::PgLegacyName => {
                json!(datum.unwrap_str())
            }
            // `char(n)` values are blank-padded to their declared length.
            SqlScalarType::Char { length } => {
                let s = char::format_str_pad(datum.unwrap_str(), *length);
                serde_json::Value::String(s)
            }
            SqlScalarType::Jsonb => JsonbRef::from_datum(datum).to_serde_json(),
            SqlScalarType::Uuid => json!(datum.unwrap_uuid()),
            // Arrays are stored as a flat element sequence plus dimension
            // metadata; rebuild the nested JSON arrays from the dimensions.
            // Elements are encoded as nullable since arrays may hold nulls.
            ty @ (SqlScalarType::Array(..) | SqlScalarType::Int2Vector) => {
                let array = datum.unwrap_array();
                let dims = array.dims().into_iter().collect::<Vec<_>>();
                let mut datums = array.elements().iter();
                encode_array(&mut datums, &dims, &mut |datum| {
                    TypedDatum::new(
                        datum,
                        &SqlColumnType {
                            nullable: true,
                            scalar_type: ty.unwrap_collection_element_type().clone(),
                        },
                    )
                    .json(number_policy)
                })
            }
            // Lists map directly to a JSON array; elements are nullable.
            SqlScalarType::List { element_type, .. } => {
                let values = datum
                    .unwrap_list()
                    .into_iter()
                    .map(|datum| {
                        TypedDatum::new(
                            datum,
                            &SqlColumnType {
                                nullable: true,
                                scalar_type: (**element_type).clone(),
                            },
                        )
                        .json(number_policy)
                    })
                    .collect();
                serde_json::Value::Array(values)
            }
            // Records are stored as a list of field values; pair them
            // positionally with the declared field names and types to build a
            // JSON object.
            SqlScalarType::Record { fields, .. } => {
                let list = datum.unwrap_list();
                let fields: Map<String, serde_json::Value> = fields
                    .iter()
                    .zip(&list)
                    .map(|((name, typ), datum)| {
                        let name = name.to_string();
                        let datum = TypedDatum::new(datum, typ);
                        let value = datum.json(number_policy);
                        (name, value)
                    })
                    .collect();
                fields.into()
            }
            // Maps become JSON objects; values are nullable.
            SqlScalarType::Map { value_type, .. } => {
                let map = datum.unwrap_map();
                let elements = map
                    .into_iter()
                    .map(|(key, datum)| {
                        let value = TypedDatum::new(
                            datum,
                            &SqlColumnType {
                                nullable: true,
                                scalar_type: (**value_type).clone(),
                            },
                        )
                        .json(number_policy);
                        (key.to_string(), value)
                    })
                    .collect();
                serde_json::Value::Object(elements)
            }
            SqlScalarType::MzTimestamp => json!(datum.unwrap_mz_timestamp().to_string()),
            SqlScalarType::Range { .. } => {
                // Ranges' interiors are not expected to be types whose
                // string representations are misleading/wrong, e.g.
                // records.
                json!(datum.unwrap_range().to_string())
            }
            SqlScalarType::MzAclItem => json!(datum.unwrap_mz_acl_item().to_string()),
        };
        // We don't need to recurse into map or object here because those already recursively call
        // .json() with the number policy to generate the member Values.
        match (number_policy, value) {
            (JsonNumberPolicy::KeepAsNumber, value) => value,
            (JsonNumberPolicy::ConvertNumberToString, serde_json::Value::Number(n)) => {
                serde_json::Value::String(n.to_string())
            }
            (JsonNumberPolicy::ConvertNumberToString, value) => value,
        }
    }
}
246
247fn encode_array<'a>(
248    elems: &mut impl Iterator<Item = Datum<'a>>,
249    dims: &[ArrayDimension],
250    elem_encoder: &mut impl FnMut(Datum<'_>) -> serde_json::Value,
251) -> serde_json::Value {
252    serde_json::Value::Array(match dims {
253        [] => vec![],
254        [dim] => elems.take(dim.length).map(elem_encoder).collect(),
255        [dim, rest @ ..] => (0..dim.length)
256            .map(|_| encode_array(elems, rest, elem_encoder))
257            .collect(),
258    })
259}
260
/// Builds the Avro schema JSON corresponding to a single column type.
///
/// Named Avro types (records, plus the `fixed` types used for intervals and
/// unsigned integers) are emitted in full on first use via `type_namer` and
/// referenced by name thereafter, as Avro requires. `custom_names` supplies
/// preferred names for record types by catalog id, and `item_id` together
/// with `options.doc_comments` attaches doc comments to record types. If
/// `typ` is nullable, the result is wrapped in a `["null", ...]` union.
fn build_row_schema_field_type(
    type_namer: &mut Namer,
    custom_names: &BTreeMap<CatalogItemId, String>,
    typ: &SqlColumnType,
    item_id: Option<CatalogItemId>,
    options: &SchemaOptions,
) -> serde_json::Value {
    let mut field_type = match &typ.scalar_type {
        SqlScalarType::AclItem => json!("string"),
        SqlScalarType::Bool => json!("boolean"),
        SqlScalarType::PgLegacyChar => json!({
            "type": "fixed",
            "size": 1,
        }),
        SqlScalarType::Int16 | SqlScalarType::Int32 => {
            json!("int")
        }
        SqlScalarType::Int64 => json!("long"),
        SqlScalarType::UInt16 => type_namer.unsigned_type(2),
        SqlScalarType::UInt32
        | SqlScalarType::Oid
        | SqlScalarType::RegClass
        | SqlScalarType::RegProc
        | SqlScalarType::RegType => type_namer.unsigned_type(4),
        SqlScalarType::UInt64 => type_namer.unsigned_type(8),
        SqlScalarType::Float32 => json!("float"),
        SqlScalarType::Float64 => json!("double"),
        SqlScalarType::Date => json!({
            "type": "int",
            "logicalType": "date",
        }),
        SqlScalarType::Time => json!({
            "type": "long",
            "logicalType": "time-micros",
        }),
        // Millisecond logical type suffices when the declared precision is at
        // most 3 fractional digits; otherwise (or when unspecified) use
        // microseconds.
        SqlScalarType::Timestamp { precision } | SqlScalarType::TimestampTz { precision } => {
            json!({
                "type": "long",
                "logicalType": match precision {
                    Some(precision) if precision.into_u8() <= 3 => "timestamp-millis",
                    _ => "timestamp-micros",
                },
            })
        }
        SqlScalarType::Interval => type_namer.interval_type(),
        SqlScalarType::Bytes => json!("bytes"),
        SqlScalarType::String
        | SqlScalarType::Char { .. }
        | SqlScalarType::VarChar { .. }
        | SqlScalarType::PgLegacyName => {
            json!("string")
        }
        SqlScalarType::Jsonb => json!({
            "type": "string",
            "connect.name": "io.debezium.data.Json",
        }),
        SqlScalarType::Uuid => json!({
            "type": "string",
            "logicalType": "uuid",
        }),
        // Collection types become Avro arrays; elements are nullable.
        ty
        @ (SqlScalarType::Array(..) | SqlScalarType::Int2Vector | SqlScalarType::List { .. }) => {
            let inner = build_row_schema_field_type(
                type_namer,
                custom_names,
                &SqlColumnType {
                    nullable: true,
                    scalar_type: ty.unwrap_collection_element_type().clone(),
                },
                item_id,
                options,
            );
            json!({
                "type": "array",
                "items": inner
            })
        }
        SqlScalarType::Map { value_type, .. } => {
            let inner = build_row_schema_field_type(
                type_namer,
                custom_names,
                &SqlColumnType {
                    nullable: true,
                    scalar_type: (**value_type).clone(),
                },
                item_id,
                options,
            );
            json!({
                "type": "map",
                "values": inner
            })
        }
        SqlScalarType::Record {
            fields, custom_id, ..
        } => {
            // Prefer a caller-supplied custom name; otherwise generate an
            // anonymous record name.
            let (name, name_seen) = match custom_id.as_ref().and_then(|id| custom_names.get(id)) {
                Some(name) => type_namer.valid_name(name),
                None => (type_namer.anonymous_record_name(), false),
            };
            if name_seen {
                // Avro only permits defining a named type once; later
                // occurrences refer to it by name.
                json!(name)
            } else {
                let fields = fields.to_vec();
                let json_fields =
                    build_row_schema_fields(&fields, type_namer, custom_names, *custom_id, options);
                if let Some(comment) =
                    custom_id.and_then(|id| options.doc_comments.get(&DocTarget::Type(id)))
                {
                    json!({
                        "type": "record",
                        "name": name,
                        "doc": comment,
                        "fields": json_fields
                    })
                } else {
                    json!({
                        "type": "record",
                        "name": name,
                        "fields": json_fields
                    })
                }
            }
        }
        // Without a declared max scale, fall back to the aggregate
        // precision/scale bounds.
        SqlScalarType::Numeric { max_scale } => {
            let (p, s) = match max_scale {
                Some(max_scale) => (NUMERIC_DATUM_MAX_PRECISION, max_scale.into_u8()),
                None => (NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION),
            };
            json!({
                "type": "bytes",
                "logicalType": "decimal",
                "precision": p,
                "scale": s,
            })
        }
        SqlScalarType::MzTimestamp => json!("string"),
        // https://debezium.io/documentation/reference/stable/connectors/postgresql.html
        SqlScalarType::Range { .. } => json!("string"),
        SqlScalarType::MzAclItem => json!("string"),
    };
    if typ.nullable {
        // Should be revisited if we ever support a different kind of union scheme.
        // Currently adding the "null" at the beginning means we can set the default
        // value to "null" if such a preference is set.
        field_type = json!(["null", field_type]);
    }
    field_type
}
410
411fn build_row_schema_fields(
412    columns: &[(ColumnName, SqlColumnType)],
413    type_namer: &mut Namer,
414    custom_names: &BTreeMap<CatalogItemId, String>,
415    item_id: Option<CatalogItemId>,
416    options: &SchemaOptions,
417) -> Vec<serde_json::Value> {
418    let mut fields = Vec::new();
419    let mut field_namer = Namer::default();
420    for (name, typ) in columns.iter() {
421        let (name, _seen) = field_namer.valid_name(name);
422        let field_type =
423            build_row_schema_field_type(type_namer, custom_names, typ, item_id, options);
424
425        let mut field = json!({
426            "name": name,
427            "type": field_type,
428        });
429
430        // It's a nullable union if the type is an array and the first option is "null"
431        let is_nullable_union = field_type
432            .as_array()
433            .is_some_and(|array| array.first().is_some_and(|first| first == &json!("null")));
434
435        if options.set_null_defaults && is_nullable_union {
436            field
437                .as_object_mut()
438                .expect("`field` initialized to JSON object above")
439                .insert("default".to_string(), json!(null));
440        }
441
442        if let Some(comment) = item_id.and_then(|item_id| {
443            options.doc_comments.get(&DocTarget::Field {
444                object_id: item_id,
445                column_name: name.into(),
446            })
447        }) {
448            field
449                .as_object_mut()
450                .expect("`field` initialized to JSON object above")
451                .insert("doc".to_string(), json!(comment));
452        }
453
454        fields.push(field);
455    }
456    fields
457}
458
/// Struct to pass around options to create the json schema.
#[derive(Default, Clone, Debug)]
pub struct SchemaOptions {
    /// Boolean flag to enable null defaults.
    pub set_null_defaults: bool,
    /// Map containing comments for an item or field, used to populate
    /// documentation in the generated avro schema
    pub doc_comments: BTreeMap<DocTarget, String>,
}
468
469/// Builds the JSON for the row schema, which can be independently useful.
470pub fn build_row_schema_json(
471    columns: &[(ColumnName, SqlColumnType)],
472    name: &str,
473    custom_names: &BTreeMap<CatalogItemId, String>,
474    item_id: Option<CatalogItemId>,
475    options: &SchemaOptions,
476) -> Result<serde_json::Value, anyhow::Error> {
477    let fields = build_row_schema_fields(
478        columns,
479        &mut Namer::default(),
480        custom_names,
481        item_id,
482        options,
483    );
484
485    let _ = mz_avro::schema::Name::parse_simple(name)?;
486    if let Some(comment) =
487        item_id.and_then(|item_id| options.doc_comments.get(&DocTarget::Type(item_id)))
488    {
489        Ok(json!({
490            "type": "record",
491            "doc": comment,
492            "fields": fields,
493            "name": name
494        }))
495    } else {
496        Ok(json!({
497            "type": "record",
498            "fields": fields,
499            "name": name
500        }))
501    }
502}
503
/// Naming helper for use when constructing an Avro schema.
#[derive(Default)]
struct Namer {
    /// Counter used to generate unique names for anonymous record types.
    record_index: usize,
    /// Whether the interval fixed type has already been defined.
    seen_interval: bool,
    /// Widths of the unsigned-integer fixed types already defined.
    seen_unsigneds: BTreeSet<usize>,
    /// Maps each original name to the valid Avro name assigned to it.
    seen_names: BTreeMap<String, String>,
    /// Counts how many times each valid base name has been handed out, used
    /// to disambiguate collisions with a numeric suffix.
    valid_names_count: BTreeMap<String, usize>,
}
513
514impl Namer {
515    /// Returns the schema for an interval type.
516    fn interval_type(&mut self) -> serde_json::Value {
517        let name = format!("{AVRO_NAMESPACE}.interval");
518        if self.seen_interval {
519            json!(name)
520        } else {
521            self.seen_interval = true;
522            json!({
523            "type": "fixed",
524            "size": 16,
525            "name": name,
526            })
527        }
528    }
529
530    /// Returns the schema for an unsigned integer with the given width.
531    fn unsigned_type(&mut self, width: usize) -> serde_json::Value {
532        let name = format!("{AVRO_NAMESPACE}.uint{width}");
533        if self.seen_unsigneds.contains(&width) {
534            json!(name)
535        } else {
536            self.seen_unsigneds.insert(width);
537            json!({
538                "type": "fixed",
539                "size": width,
540                "name": name,
541            })
542        }
543    }
544
545    /// Returns a name to use for a new anonymous record.
546    fn anonymous_record_name(&mut self) -> String {
547        let out = format!("{AVRO_NAMESPACE}.record{}", self.record_index);
548        self.record_index += 1;
549        out
550    }
551
552    /// Turns `name` into a valid, unique name for use in the Avro schema.
553    ///
554    /// Returns the valid name and whether `name` has been seen before.
555    fn valid_name(&mut self, name: &str) -> (String, bool) {
556        if let Some(valid_name) = self.seen_names.get(name) {
557            (valid_name.into(), true)
558        } else {
559            let mut valid_name = mz_avro::schema::Name::make_valid(name);
560            let valid_name_count = self
561                .valid_names_count
562                .entry(valid_name.clone())
563                .or_default();
564            if *valid_name_count != 0 {
565                valid_name += &valid_name_count.to_string();
566            }
567            *valid_name_count += 1;
568            self.seen_names.insert(name.into(), valid_name.clone());
569            (valid_name, false)
570        }
571    }
572}