// mz_interchange/json.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12
13use itertools::Itertools;
14use mz_repr::adt::array::ArrayDimension;
15use mz_repr::adt::char;
16use mz_repr::adt::jsonb::JsonbRef;
17use mz_repr::adt::numeric::{NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION};
18use mz_repr::{CatalogItemId, ColumnName, Datum, RelationDesc, SqlColumnType, SqlScalarType};
19use serde_json::{Map, json};
20
21use crate::avro::DocTarget;
22use crate::encode::{Encode, TypedDatum, column_names_and_types};
23use crate::envelopes;
24
/// Avro namespace under which named types (`fixed`, `record`) in generated
/// schemas are placed.
const AVRO_NAMESPACE: &str = "com.materialize.sink";
/// Microseconds per millisecond; used to split the sub-millisecond remainder
/// out of a timestamp's subsecond microseconds.
const MICROS_PER_MILLIS: u32 = 1_000;
27
/// Manages encoding of rows as JSON-encoded bytes.
pub struct JsonEncoder {
    // Column names and types, in row order; when the Debezium option is set
    // these are wrapped in a before/after envelope (see `JsonEncoder::new`).
    columns: Vec<(ColumnName, SqlColumnType)>,
}
32
33impl JsonEncoder {
34    pub fn new(desc: RelationDesc, debezium: bool) -> Self {
35        let mut columns = column_names_and_types(desc);
36        if debezium {
37            columns = envelopes::dbz_envelope(columns);
38        };
39        JsonEncoder { columns }
40    }
41}
42
43impl Encode for JsonEncoder {
44    fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
45        let value = encode_datums_as_json(row.iter(), self.columns.as_ref());
46        value.to_string().into_bytes()
47    }
48}
49
50impl fmt::Debug for JsonEncoder {
51    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
52        f.debug_struct("JsonEncoder")
53            .field(
54                "schema",
55                &format!(
56                    "{:?}",
57                    build_row_schema_json(
58                        &self.columns,
59                        "schema",
60                        &BTreeMap::new(),
61                        None,
62                        &Default::default(),
63                    )
64                ),
65            )
66            .finish()
67    }
68}
69
70/// Encodes a sequence of `Datum` as JSON, using supplied column names and types.
71pub fn encode_datums_as_json<'a, I>(
72    datums: I,
73    names_types: &[(ColumnName, SqlColumnType)],
74) -> serde_json::Value
75where
76    I: IntoIterator<Item = Datum<'a>>,
77{
78    let value_fields = datums
79        .into_iter()
80        .zip_eq(names_types)
81        .map(|(datum, (name, typ))| {
82            (
83                name.to_string(),
84                TypedDatum::new(datum, typ).json(&JsonNumberPolicy::KeepAsNumber),
85            )
86        })
87        .collect();
88    serde_json::Value::Object(value_fields)
89}
90
/// Policies for how to handle Numbers in JSON.
#[derive(Debug)]
pub enum JsonNumberPolicy {
    /// Do not change Numbers.
    KeepAsNumber,
    /// Convert Numbers to their String representation. Useful for JavaScript consumers that may
    /// interpret some numbers incorrectly (e.g. integers outside the safely
    /// representable double range).
    ConvertNumberToString,
}
100
/// Conversion of a value into a `serde_json::Value`.
pub trait ToJson {
    /// Transforms this value to a JSON value, applying `number_policy` to any
    /// JSON Number this conversion produces.
    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value;
}
105
impl ToJson for TypedDatum<'_> {
    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value {
        let TypedDatum { datum, typ } = self;
        // Null only appears for nullable columns, so the `unwrap_*` calls
        // below cannot observe `Datum::Null`.
        if typ.nullable && datum.is_null() {
            return serde_json::Value::Null;
        }
        let value = match &typ.scalar_type {
            SqlScalarType::AclItem => json!(datum.unwrap_acl_item().to_string()),
            SqlScalarType::Bool => json!(datum.unwrap_bool()),
            SqlScalarType::PgLegacyChar => json!(datum.unwrap_uint8()),
            SqlScalarType::Int16 => json!(datum.unwrap_int16()),
            SqlScalarType::Int32 => json!(datum.unwrap_int32()),
            SqlScalarType::Int64 => json!(datum.unwrap_int64()),
            SqlScalarType::UInt16 => json!(datum.unwrap_uint16()),
            SqlScalarType::UInt32
            | SqlScalarType::Oid
            | SqlScalarType::RegClass
            | SqlScalarType::RegProc
            | SqlScalarType::RegType => {
                json!(datum.unwrap_uint32())
            }
            SqlScalarType::UInt64 => json!(datum.unwrap_uint64()),
            SqlScalarType::Float32 => json!(datum.unwrap_float32()),
            SqlScalarType::Float64 => json!(datum.unwrap_float64()),
            // Numerics are rendered as strings to avoid losing precision in a
            // JSON Number.
            SqlScalarType::Numeric { .. } => {
                json!(datum.unwrap_numeric().0.to_standard_notation_string())
            }
            // https://stackoverflow.com/questions/10286204/what-is-the-right-json-date-format
            SqlScalarType::Date => serde_json::Value::String(format!("{}", datum.unwrap_date())),
            SqlScalarType::Time => serde_json::Value::String(format!("{:?}", datum.unwrap_time())),
            // Timestamps become "<epoch millis>.<sub-milli micros>", with the
            // microsecond remainder zero-padded to three digits.
            SqlScalarType::Timestamp { .. } => {
                let dt = datum.unwrap_timestamp().to_naive().and_utc();
                let millis = dt.timestamp_millis();
                let micros = dt.timestamp_subsec_micros()
                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
            }
            SqlScalarType::TimestampTz { .. } => {
                let dt = datum.unwrap_timestamptz().to_utc();
                let millis = dt.timestamp_millis();
                let micros = dt.timestamp_subsec_micros()
                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
            }
            SqlScalarType::Interval => {
                serde_json::Value::String(format!("{}", datum.unwrap_interval()))
            }
            SqlScalarType::Bytes => json!(datum.unwrap_bytes()),
            SqlScalarType::String | SqlScalarType::VarChar { .. } | SqlScalarType::PgLegacyName => {
                json!(datum.unwrap_str())
            }
            // bpchar semantics: pad the stored string out to the declared length.
            SqlScalarType::Char { length } => {
                let s = char::format_str_pad(datum.unwrap_str(), *length);
                serde_json::Value::String(s)
            }
            SqlScalarType::Jsonb => JsonbRef::from_datum(datum).to_serde_json(),
            SqlScalarType::Uuid => json!(datum.unwrap_uuid()),
            // Arrays encode as nested JSON arrays, one level per dimension;
            // elements are treated as nullable.
            ty @ (SqlScalarType::Array(..) | SqlScalarType::Int2Vector) => {
                let array = datum.unwrap_array();
                let dims = array.dims().into_iter().collect::<Vec<_>>();
                let mut datums = array.elements().iter();
                encode_array(&mut datums, &dims, &mut |datum| {
                    TypedDatum::new(
                        datum,
                        &SqlColumnType {
                            nullable: true,
                            scalar_type: ty.unwrap_collection_element_type().clone(),
                        },
                    )
                    .json(number_policy)
                })
            }
            SqlScalarType::List { element_type, .. } => {
                let values = datum
                    .unwrap_list()
                    .into_iter()
                    .map(|datum| {
                        TypedDatum::new(
                            datum,
                            &SqlColumnType {
                                nullable: true,
                                scalar_type: (**element_type).clone(),
                            },
                        )
                        .json(number_policy)
                    })
                    .collect();
                serde_json::Value::Array(values)
            }
            // Records pair each declared field with the corresponding list
            // element; `zip_eq` panics if the arities disagree.
            SqlScalarType::Record { fields, .. } => {
                let list = datum.unwrap_list();
                let fields: Map<String, serde_json::Value> = fields
                    .iter()
                    .zip_eq(&list)
                    .map(|((name, typ), datum)| {
                        let name = name.to_string();
                        let datum = TypedDatum::new(datum, typ);
                        let value = datum.json(number_policy);
                        (name, value)
                    })
                    .collect();
                fields.into()
            }
            // Map values are treated as nullable, like collection elements.
            SqlScalarType::Map { value_type, .. } => {
                let map = datum.unwrap_map();
                let elements = map
                    .into_iter()
                    .map(|(key, datum)| {
                        let value = TypedDatum::new(
                            datum,
                            &SqlColumnType {
                                nullable: true,
                                scalar_type: (**value_type).clone(),
                            },
                        )
                        .json(number_policy);
                        (key.to_string(), value)
                    })
                    .collect();
                serde_json::Value::Object(elements)
            }
            SqlScalarType::MzTimestamp => json!(datum.unwrap_mz_timestamp().to_string()),
            SqlScalarType::Range { .. } => {
                // Ranges' interiors are not expected to be types whose
                // string representations are misleading/wrong, e.g.
                // records.
                json!(datum.unwrap_range().to_string())
            }
            SqlScalarType::MzAclItem => json!(datum.unwrap_mz_acl_item().to_string()),
        };
        // We don't need to recurse into map or object here because those already recursively call
        // .json() with the number policy to generate the member Values.
        match (number_policy, value) {
            (JsonNumberPolicy::KeepAsNumber, value) => value,
            (JsonNumberPolicy::ConvertNumberToString, serde_json::Value::Number(n)) => {
                serde_json::Value::String(n.to_string())
            }
            (JsonNumberPolicy::ConvertNumberToString, value) => value,
        }
    }
}
247
248fn encode_array<'a>(
249    elems: &mut impl Iterator<Item = Datum<'a>>,
250    dims: &[ArrayDimension],
251    elem_encoder: &mut impl FnMut(Datum<'_>) -> serde_json::Value,
252) -> serde_json::Value {
253    serde_json::Value::Array(match dims {
254        [] => vec![],
255        [dim] => elems.take(dim.length).map(elem_encoder).collect(),
256        [dim, rest @ ..] => (0..dim.length)
257            .map(|_| encode_array(elems, rest, elem_encoder))
258            .collect(),
259    })
260}
261
/// Builds the Avro-schema JSON for a single column type.
///
/// `type_namer` deduplicates named types (fixed/record) across the whole
/// schema; `custom_names` maps catalog item ids to user-chosen record names;
/// `item_id` identifies the enclosing item for doc-comment lookup. Nullable
/// column types are wrapped in a `["null", T]` union.
fn build_row_schema_field_type(
    type_namer: &mut Namer,
    custom_names: &BTreeMap<CatalogItemId, String>,
    typ: &SqlColumnType,
    item_id: Option<CatalogItemId>,
    options: &SchemaOptions,
) -> serde_json::Value {
    let mut field_type = match &typ.scalar_type {
        SqlScalarType::AclItem => json!("string"),
        SqlScalarType::Bool => json!("boolean"),
        SqlScalarType::PgLegacyChar => json!({
            "type": "fixed",
            "size": 1,
        }),
        SqlScalarType::Int16 | SqlScalarType::Int32 => {
            json!("int")
        }
        SqlScalarType::Int64 => json!("long"),
        // Unsigned integers have no Avro primitive; they use named fixed
        // types (emitted once, then referenced by name).
        SqlScalarType::UInt16 => type_namer.unsigned_type(2),
        SqlScalarType::UInt32
        | SqlScalarType::Oid
        | SqlScalarType::RegClass
        | SqlScalarType::RegProc
        | SqlScalarType::RegType => type_namer.unsigned_type(4),
        SqlScalarType::UInt64 => type_namer.unsigned_type(8),
        SqlScalarType::Float32 => json!("float"),
        SqlScalarType::Float64 => json!("double"),
        SqlScalarType::Date => json!({
            "type": "int",
            "logicalType": "date",
        }),
        SqlScalarType::Time => json!({
            "type": "long",
            "logicalType": "time-micros",
        }),
        // Precision <= 3 fits in milliseconds; otherwise use microseconds.
        SqlScalarType::Timestamp { precision } | SqlScalarType::TimestampTz { precision } => {
            json!({
                "type": "long",
                "logicalType": match precision {
                    Some(precision) if precision.into_u8() <= 3 => "timestamp-millis",
                    _ => "timestamp-micros",
                },
            })
        }
        SqlScalarType::Interval => type_namer.interval_type(),
        SqlScalarType::Bytes => json!("bytes"),
        SqlScalarType::String
        | SqlScalarType::Char { .. }
        | SqlScalarType::VarChar { .. }
        | SqlScalarType::PgLegacyName => {
            json!("string")
        }
        SqlScalarType::Jsonb => json!({
            "type": "string",
            "connect.name": "io.debezium.data.Json",
        }),
        SqlScalarType::Uuid => json!({
            "type": "string",
            "logicalType": "uuid",
        }),
        // Collections become Avro arrays with nullable element types.
        ty
        @ (SqlScalarType::Array(..) | SqlScalarType::Int2Vector | SqlScalarType::List { .. }) => {
            let inner = build_row_schema_field_type(
                type_namer,
                custom_names,
                &SqlColumnType {
                    nullable: true,
                    scalar_type: ty.unwrap_collection_element_type().clone(),
                },
                item_id,
                options,
            );
            json!({
                "type": "array",
                "items": inner
            })
        }
        SqlScalarType::Map { value_type, .. } => {
            let inner = build_row_schema_field_type(
                type_namer,
                custom_names,
                &SqlColumnType {
                    nullable: true,
                    scalar_type: (**value_type).clone(),
                },
                item_id,
                options,
            );
            json!({
                "type": "map",
                "values": inner
            })
        }
        SqlScalarType::Record {
            fields, custom_id, ..
        } => {
            // Prefer a user-supplied name; otherwise invent an anonymous one.
            let (name, name_seen) = match custom_id.as_ref().and_then(|id| custom_names.get(id)) {
                Some(name) => type_namer.valid_name(name),
                None => (type_namer.anonymous_record_name(), false),
            };
            if name_seen {
                // A named type may only be defined once in an Avro schema;
                // later occurrences reference it by name.
                json!(name)
            } else {
                let fields = fields.to_vec();
                let json_fields =
                    build_row_schema_fields(&fields, type_namer, custom_names, *custom_id, options);
                if let Some(comment) =
                    custom_id.and_then(|id| options.doc_comments.get(&DocTarget::Type(id)))
                {
                    json!({
                        "type": "record",
                        "name": name,
                        "doc": comment,
                        "fields": json_fields
                    })
                } else {
                    json!({
                        "type": "record",
                        "name": name,
                        "fields": json_fields
                    })
                }
            }
        }
        SqlScalarType::Numeric { max_scale } => {
            let (p, s) = match max_scale {
                Some(max_scale) => (NUMERIC_DATUM_MAX_PRECISION, max_scale.into_u8()),
                None => (NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION),
            };
            json!({
                "type": "bytes",
                "logicalType": "decimal",
                "precision": p,
                "scale": s,
            })
        }
        SqlScalarType::MzTimestamp => json!("string"),
        // https://debezium.io/documentation/reference/stable/connectors/postgresql.html
        SqlScalarType::Range { .. } => json!("string"),
        SqlScalarType::MzAclItem => json!("string"),
    };
    if typ.nullable {
        // Should be revisited if we ever support a different kind of union scheme.
        // Currently adding the "null" at the beginning means we can set the default
        // value to "null" if such a preference is set.
        field_type = json!(["null", field_type]);
    }
    field_type
}
411
412fn build_row_schema_fields(
413    columns: &[(ColumnName, SqlColumnType)],
414    type_namer: &mut Namer,
415    custom_names: &BTreeMap<CatalogItemId, String>,
416    item_id: Option<CatalogItemId>,
417    options: &SchemaOptions,
418) -> Vec<serde_json::Value> {
419    let mut fields = Vec::new();
420    let mut field_namer = Namer::default();
421    for (name, typ) in columns.iter() {
422        let (name, _seen) = field_namer.valid_name(name);
423        let field_type =
424            build_row_schema_field_type(type_namer, custom_names, typ, item_id, options);
425
426        let mut field = json!({
427            "name": name,
428            "type": field_type,
429        });
430
431        // It's a nullable union if the type is an array and the first option is "null"
432        let is_nullable_union = field_type
433            .as_array()
434            .is_some_and(|array| array.first().is_some_and(|first| first == &json!("null")));
435
436        if options.set_null_defaults && is_nullable_union {
437            field
438                .as_object_mut()
439                .expect("`field` initialized to JSON object above")
440                .insert("default".to_string(), json!(null));
441        }
442
443        if let Some(comment) = item_id.and_then(|item_id| {
444            options.doc_comments.get(&DocTarget::Field {
445                object_id: item_id,
446                column_name: name.into(),
447            })
448        }) {
449            field
450                .as_object_mut()
451                .expect("`field` initialized to JSON object above")
452                .insert("doc".to_string(), json!(comment));
453        }
454
455        fields.push(field);
456    }
457    fields
458}
459
#[derive(Default, Clone, Debug)]
/// Struct to pass around options to create the json schema
pub struct SchemaOptions {
    /// Boolean flag to enable null defaults (emit `"default": null` for
    /// nullable-union fields).
    pub set_null_defaults: bool,
    /// Map containing comments for an item or field, used to populate
    /// documentation in the generated avro schema
    pub doc_comments: BTreeMap<DocTarget, String>,
}
469
470/// Builds the JSON for the row schema, which can be independently useful.
471pub fn build_row_schema_json(
472    columns: &[(ColumnName, SqlColumnType)],
473    name: &str,
474    custom_names: &BTreeMap<CatalogItemId, String>,
475    item_id: Option<CatalogItemId>,
476    options: &SchemaOptions,
477) -> Result<serde_json::Value, anyhow::Error> {
478    let fields = build_row_schema_fields(
479        columns,
480        &mut Namer::default(),
481        custom_names,
482        item_id,
483        options,
484    );
485
486    let _ = mz_avro::schema::Name::parse_simple(name)?;
487    if let Some(comment) =
488        item_id.and_then(|item_id| options.doc_comments.get(&DocTarget::Type(item_id)))
489    {
490        Ok(json!({
491            "type": "record",
492            "doc": comment,
493            "fields": fields,
494            "name": name
495        }))
496    } else {
497        Ok(json!({
498            "type": "record",
499            "fields": fields,
500            "name": name
501        }))
502    }
503}
504
/// Naming helper for use when constructing an Avro schema.
#[derive(Default)]
struct Namer {
    // Counter for generating unique anonymous record names.
    record_index: usize,
    // Whether the named interval fixed type has already been emitted.
    seen_interval: bool,
    // Widths of unsigned fixed types already emitted.
    seen_unsigneds: BTreeSet<usize>,
    // Maps an original name to the valid, unique name chosen for it.
    seen_names: BTreeMap<String, String>,
    // Occurrence count per sanitized name, used to disambiguate collisions.
    valid_names_count: BTreeMap<String, usize>,
}
514
515impl Namer {
516    /// Returns the schema for an interval type.
517    fn interval_type(&mut self) -> serde_json::Value {
518        let name = format!("{AVRO_NAMESPACE}.interval");
519        if self.seen_interval {
520            json!(name)
521        } else {
522            self.seen_interval = true;
523            json!({
524            "type": "fixed",
525            "size": 16,
526            "name": name,
527            })
528        }
529    }
530
531    /// Returns the schema for an unsigned integer with the given width.
532    fn unsigned_type(&mut self, width: usize) -> serde_json::Value {
533        let name = format!("{AVRO_NAMESPACE}.uint{width}");
534        if self.seen_unsigneds.contains(&width) {
535            json!(name)
536        } else {
537            self.seen_unsigneds.insert(width);
538            json!({
539                "type": "fixed",
540                "size": width,
541                "name": name,
542            })
543        }
544    }
545
546    /// Returns a name to use for a new anonymous record.
547    fn anonymous_record_name(&mut self) -> String {
548        let out = format!("{AVRO_NAMESPACE}.record{}", self.record_index);
549        self.record_index += 1;
550        out
551    }
552
553    /// Turns `name` into a valid, unique name for use in the Avro schema.
554    ///
555    /// Returns the valid name and whether `name` has been seen before.
556    fn valid_name(&mut self, name: &str) -> (String, bool) {
557        if let Some(valid_name) = self.seen_names.get(name) {
558            (valid_name.into(), true)
559        } else {
560            let mut valid_name = mz_avro::schema::Name::make_valid(name);
561            let valid_name_count = self
562                .valid_names_count
563                .entry(valid_name.clone())
564                .or_default();
565            if *valid_name_count != 0 {
566                valid_name += &valid_name_count.to_string();
567            }
568            *valid_name_count += 1;
569            self.seen_names.insert(name.into(), valid_name.clone());
570            (valid_name, false)
571        }
572    }
573}