mz_interchange/
json.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12
13use itertools::Itertools;
14use mz_repr::adt::array::ArrayDimension;
15use mz_repr::adt::char;
16use mz_repr::adt::jsonb::JsonbRef;
17use mz_repr::adt::numeric::{NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION};
18use mz_repr::{CatalogItemId, ColumnName, ColumnType, Datum, RelationDesc, ScalarType};
19use serde_json::{Map, json};
20
21use crate::avro::DocTarget;
22use crate::encode::{Encode, TypedDatum, column_names_and_types};
23use crate::envelopes;
24
/// Namespace prefixed to the named Avro types generated in this file
/// (intervals, unsigned integers and anonymous records).
const AVRO_NAMESPACE: &str = "com.materialize.sink";
/// Number of microseconds in one millisecond.
const MICROS_PER_MILLIS: u32 = 1_000;
27
28// Manages encoding of JSON-encoded bytes
29pub struct JsonEncoder {
30    columns: Vec<(ColumnName, ColumnType)>,
31}
32
33impl JsonEncoder {
34    pub fn new(desc: RelationDesc, debezium: bool) -> Self {
35        let mut columns = column_names_and_types(desc);
36        if debezium {
37            columns = envelopes::dbz_envelope(columns);
38        };
39        JsonEncoder { columns }
40    }
41}
42
43impl Encode for JsonEncoder {
44    fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
45        let value = encode_datums_as_json(row.iter(), self.columns.as_ref());
46        value.to_string().into_bytes()
47    }
48}
49
50impl fmt::Debug for JsonEncoder {
51    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
52        f.debug_struct("JsonEncoder")
53            .field(
54                "schema",
55                &format!(
56                    "{:?}",
57                    build_row_schema_json(
58                        &self.columns,
59                        "schema",
60                        &BTreeMap::new(),
61                        None,
62                        &Default::default(),
63                    )
64                ),
65            )
66            .finish()
67    }
68}
69
70/// Encodes a sequence of `Datum` as JSON, using supplied column names and types.
71pub fn encode_datums_as_json<'a, I>(
72    datums: I,
73    names_types: &[(ColumnName, ColumnType)],
74) -> serde_json::Value
75where
76    I: IntoIterator<Item = Datum<'a>>,
77{
78    let value_fields = datums
79        .into_iter()
80        .zip_eq(names_types)
81        .map(|(datum, (name, typ))| {
82            (
83                name.to_string(),
84                TypedDatum::new(datum, typ).json(&JsonNumberPolicy::KeepAsNumber),
85            )
86        })
87        .collect();
88    serde_json::Value::Object(value_fields)
89}
90
/// Policies for how to handle Numbers in JSON.
///
/// Passed to [`ToJson::json`] to select how JSON numbers in the result are
/// represented.
#[derive(Debug)]
pub enum JsonNumberPolicy {
    /// Do not change Numbers.
    KeepAsNumber,
    /// Convert Numbers to their String representation. Useful for JavaScript consumers that may
    /// interpret some numbers incorrectly.
    ConvertNumberToString,
}
100
/// Conversion of a value into a [`serde_json::Value`].
pub trait ToJson {
    /// Transforms this value to a JSON value.
    ///
    /// `number_policy` selects how JSON numbers in the result are represented.
    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value;
}
105
106impl ToJson for TypedDatum<'_> {
107    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value {
108        let TypedDatum { datum, typ } = self;
109        if typ.nullable && datum.is_null() {
110            return serde_json::Value::Null;
111        }
112        let value = match &typ.scalar_type {
113            ScalarType::AclItem => json!(datum.unwrap_acl_item().to_string()),
114            ScalarType::Bool => json!(datum.unwrap_bool()),
115            ScalarType::PgLegacyChar => json!(datum.unwrap_uint8()),
116            ScalarType::Int16 => json!(datum.unwrap_int16()),
117            ScalarType::Int32 => json!(datum.unwrap_int32()),
118            ScalarType::Int64 => json!(datum.unwrap_int64()),
119            ScalarType::UInt16 => json!(datum.unwrap_uint16()),
120            ScalarType::UInt32
121            | ScalarType::Oid
122            | ScalarType::RegClass
123            | ScalarType::RegProc
124            | ScalarType::RegType => {
125                json!(datum.unwrap_uint32())
126            }
127            ScalarType::UInt64 => json!(datum.unwrap_uint64()),
128            ScalarType::Float32 => json!(datum.unwrap_float32()),
129            ScalarType::Float64 => json!(datum.unwrap_float64()),
130            ScalarType::Numeric { .. } => {
131                json!(datum.unwrap_numeric().0.to_standard_notation_string())
132            }
133            // https://stackoverflow.com/questions/10286204/what-is-the-right-json-date-format
134            ScalarType::Date => serde_json::Value::String(format!("{}", datum.unwrap_date())),
135            ScalarType::Time => serde_json::Value::String(format!("{:?}", datum.unwrap_time())),
136            ScalarType::Timestamp { .. } => {
137                let dt = datum.unwrap_timestamp().to_naive().and_utc();
138                let millis = dt.timestamp_millis();
139                let micros = dt.timestamp_subsec_micros()
140                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
141                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
142            }
143            ScalarType::TimestampTz { .. } => {
144                let dt = datum.unwrap_timestamptz().to_utc();
145                let millis = dt.timestamp_millis();
146                let micros = dt.timestamp_subsec_micros()
147                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
148                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
149            }
150            ScalarType::Interval => {
151                serde_json::Value::String(format!("{}", datum.unwrap_interval()))
152            }
153            ScalarType::Bytes => json!(datum.unwrap_bytes()),
154            ScalarType::String | ScalarType::VarChar { .. } | ScalarType::PgLegacyName => {
155                json!(datum.unwrap_str())
156            }
157            ScalarType::Char { length } => {
158                let s = char::format_str_pad(datum.unwrap_str(), *length);
159                serde_json::Value::String(s)
160            }
161            ScalarType::Jsonb => JsonbRef::from_datum(datum).to_serde_json(),
162            ScalarType::Uuid => json!(datum.unwrap_uuid()),
163            ty @ (ScalarType::Array(..) | ScalarType::Int2Vector) => {
164                let array = datum.unwrap_array();
165                let dims = array.dims().into_iter().collect::<Vec<_>>();
166                let mut datums = array.elements().iter();
167                encode_array(&mut datums, &dims, &mut |datum| {
168                    TypedDatum::new(
169                        datum,
170                        &ColumnType {
171                            nullable: true,
172                            scalar_type: ty.unwrap_collection_element_type().clone(),
173                        },
174                    )
175                    .json(number_policy)
176                })
177            }
178            ScalarType::List { element_type, .. } => {
179                let values = datum
180                    .unwrap_list()
181                    .into_iter()
182                    .map(|datum| {
183                        TypedDatum::new(
184                            datum,
185                            &ColumnType {
186                                nullable: true,
187                                scalar_type: (**element_type).clone(),
188                            },
189                        )
190                        .json(number_policy)
191                    })
192                    .collect();
193                serde_json::Value::Array(values)
194            }
195            ScalarType::Record { fields, .. } => {
196                let list = datum.unwrap_list();
197                let fields: Map<String, serde_json::Value> = fields
198                    .iter()
199                    .zip_eq(&list)
200                    .map(|((name, typ), datum)| {
201                        let name = name.to_string();
202                        let datum = TypedDatum::new(datum, typ);
203                        let value = datum.json(number_policy);
204                        (name, value)
205                    })
206                    .collect();
207                fields.into()
208            }
209            ScalarType::Map { value_type, .. } => {
210                let map = datum.unwrap_map();
211                let elements = map
212                    .into_iter()
213                    .map(|(key, datum)| {
214                        let value = TypedDatum::new(
215                            datum,
216                            &ColumnType {
217                                nullable: true,
218                                scalar_type: (**value_type).clone(),
219                            },
220                        )
221                        .json(number_policy);
222                        (key.to_string(), value)
223                    })
224                    .collect();
225                serde_json::Value::Object(elements)
226            }
227            ScalarType::MzTimestamp => json!(datum.unwrap_mz_timestamp().to_string()),
228            ScalarType::Range { .. } => {
229                // Ranges' interiors are not expected to be types whose
230                // string representations are misleading/wrong, e.g.
231                // records.
232                json!(datum.unwrap_range().to_string())
233            }
234            ScalarType::MzAclItem => json!(datum.unwrap_mz_acl_item().to_string()),
235        };
236        // We don't need to recurse into map or object here because those already recursively call
237        // .json() with the number policy to generate the member Values.
238        match (number_policy, value) {
239            (JsonNumberPolicy::KeepAsNumber, value) => value,
240            (JsonNumberPolicy::ConvertNumberToString, serde_json::Value::Number(n)) => {
241                serde_json::Value::String(n.to_string())
242            }
243            (JsonNumberPolicy::ConvertNumberToString, value) => value,
244        }
245    }
246}
247
248fn encode_array<'a>(
249    elems: &mut impl Iterator<Item = Datum<'a>>,
250    dims: &[ArrayDimension],
251    elem_encoder: &mut impl FnMut(Datum<'_>) -> serde_json::Value,
252) -> serde_json::Value {
253    serde_json::Value::Array(match dims {
254        [] => vec![],
255        [dim] => elems.take(dim.length).map(elem_encoder).collect(),
256        [dim, rest @ ..] => (0..dim.length)
257            .map(|_| encode_array(elems, rest, elem_encoder))
258            .collect(),
259    })
260}
261
262fn build_row_schema_field_type(
263    type_namer: &mut Namer,
264    custom_names: &BTreeMap<CatalogItemId, String>,
265    typ: &ColumnType,
266    item_id: Option<CatalogItemId>,
267    options: &SchemaOptions,
268) -> serde_json::Value {
269    let mut field_type = match &typ.scalar_type {
270        ScalarType::AclItem => json!("string"),
271        ScalarType::Bool => json!("boolean"),
272        ScalarType::PgLegacyChar => json!({
273            "type": "fixed",
274            "size": 1,
275        }),
276        ScalarType::Int16 | ScalarType::Int32 => {
277            json!("int")
278        }
279        ScalarType::Int64 => json!("long"),
280        ScalarType::UInt16 => type_namer.unsigned_type(2),
281        ScalarType::UInt32
282        | ScalarType::Oid
283        | ScalarType::RegClass
284        | ScalarType::RegProc
285        | ScalarType::RegType => type_namer.unsigned_type(4),
286        ScalarType::UInt64 => type_namer.unsigned_type(8),
287        ScalarType::Float32 => json!("float"),
288        ScalarType::Float64 => json!("double"),
289        ScalarType::Date => json!({
290            "type": "int",
291            "logicalType": "date",
292        }),
293        ScalarType::Time => json!({
294            "type": "long",
295            "logicalType": "time-micros",
296        }),
297        ScalarType::Timestamp { precision } | ScalarType::TimestampTz { precision } => json!({
298            "type": "long",
299            "logicalType": match precision {
300                Some(precision) if precision.into_u8() <= 3 => "timestamp-millis",
301                _ => "timestamp-micros",
302            },
303        }),
304        ScalarType::Interval => type_namer.interval_type(),
305        ScalarType::Bytes => json!("bytes"),
306        ScalarType::String
307        | ScalarType::Char { .. }
308        | ScalarType::VarChar { .. }
309        | ScalarType::PgLegacyName => {
310            json!("string")
311        }
312        ScalarType::Jsonb => json!({
313            "type": "string",
314            "connect.name": "io.debezium.data.Json",
315        }),
316        ScalarType::Uuid => json!({
317            "type": "string",
318            "logicalType": "uuid",
319        }),
320        ty @ (ScalarType::Array(..) | ScalarType::Int2Vector | ScalarType::List { .. }) => {
321            let inner = build_row_schema_field_type(
322                type_namer,
323                custom_names,
324                &ColumnType {
325                    nullable: true,
326                    scalar_type: ty.unwrap_collection_element_type().clone(),
327                },
328                item_id,
329                options,
330            );
331            json!({
332                "type": "array",
333                "items": inner
334            })
335        }
336        ScalarType::Map { value_type, .. } => {
337            let inner = build_row_schema_field_type(
338                type_namer,
339                custom_names,
340                &ColumnType {
341                    nullable: true,
342                    scalar_type: (**value_type).clone(),
343                },
344                item_id,
345                options,
346            );
347            json!({
348                "type": "map",
349                "values": inner
350            })
351        }
352        ScalarType::Record {
353            fields, custom_id, ..
354        } => {
355            let (name, name_seen) = match custom_id.as_ref().and_then(|id| custom_names.get(id)) {
356                Some(name) => type_namer.valid_name(name),
357                None => (type_namer.anonymous_record_name(), false),
358            };
359            if name_seen {
360                json!(name)
361            } else {
362                let fields = fields.to_vec();
363                let json_fields =
364                    build_row_schema_fields(&fields, type_namer, custom_names, *custom_id, options);
365                if let Some(comment) =
366                    custom_id.and_then(|id| options.doc_comments.get(&DocTarget::Type(id)))
367                {
368                    json!({
369                        "type": "record",
370                        "name": name,
371                        "doc": comment,
372                        "fields": json_fields
373                    })
374                } else {
375                    json!({
376                        "type": "record",
377                        "name": name,
378                        "fields": json_fields
379                    })
380                }
381            }
382        }
383        ScalarType::Numeric { max_scale } => {
384            let (p, s) = match max_scale {
385                Some(max_scale) => (NUMERIC_DATUM_MAX_PRECISION, max_scale.into_u8()),
386                None => (NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION),
387            };
388            json!({
389                "type": "bytes",
390                "logicalType": "decimal",
391                "precision": p,
392                "scale": s,
393            })
394        }
395        ScalarType::MzTimestamp => json!("string"),
396        // https://debezium.io/documentation/reference/stable/connectors/postgresql.html
397        ScalarType::Range { .. } => json!("string"),
398        ScalarType::MzAclItem => json!("string"),
399    };
400    if typ.nullable {
401        // Should be revisited if we ever support a different kind of union scheme.
402        // Currently adding the "null" at the beginning means we can set the default
403        // value to "null" if such a preference is set.
404        field_type = json!(["null", field_type]);
405    }
406    field_type
407}
408
409fn build_row_schema_fields(
410    columns: &[(ColumnName, ColumnType)],
411    type_namer: &mut Namer,
412    custom_names: &BTreeMap<CatalogItemId, String>,
413    item_id: Option<CatalogItemId>,
414    options: &SchemaOptions,
415) -> Vec<serde_json::Value> {
416    let mut fields = Vec::new();
417    let mut field_namer = Namer::default();
418    for (name, typ) in columns.iter() {
419        let (name, _seen) = field_namer.valid_name(name);
420        let field_type =
421            build_row_schema_field_type(type_namer, custom_names, typ, item_id, options);
422
423        let mut field = json!({
424            "name": name,
425            "type": field_type,
426        });
427
428        // It's a nullable union if the type is an array and the first option is "null"
429        let is_nullable_union = field_type
430            .as_array()
431            .is_some_and(|array| array.first().is_some_and(|first| first == &json!("null")));
432
433        if options.set_null_defaults && is_nullable_union {
434            field
435                .as_object_mut()
436                .expect("`field` initialized to JSON object above")
437                .insert("default".to_string(), json!(null));
438        }
439
440        if let Some(comment) = item_id.and_then(|item_id| {
441            options.doc_comments.get(&DocTarget::Field {
442                object_id: item_id,
443                column_name: name.into(),
444            })
445        }) {
446            field
447                .as_object_mut()
448                .expect("`field` initialized to JSON object above")
449                .insert("doc".to_string(), json!(comment));
450        }
451
452        fields.push(field);
453    }
454    fields
455}
456
/// Struct to pass around options to create the json schema
#[derive(Default, Clone, Debug)]
pub struct SchemaOptions {
    /// Boolean flag to enable null defaults: when set, fields whose type is a
    /// union starting with `"null"` get an explicit `null` default.
    pub set_null_defaults: bool,
    /// Map containing comments for an item or field, used to populate
    /// documentation in the generated avro schema
    pub doc_comments: BTreeMap<DocTarget, String>,
}
466
467/// Builds the JSON for the row schema, which can be independently useful.
468pub fn build_row_schema_json(
469    columns: &[(ColumnName, ColumnType)],
470    name: &str,
471    custom_names: &BTreeMap<CatalogItemId, String>,
472    item_id: Option<CatalogItemId>,
473    options: &SchemaOptions,
474) -> Result<serde_json::Value, anyhow::Error> {
475    let fields = build_row_schema_fields(
476        columns,
477        &mut Namer::default(),
478        custom_names,
479        item_id,
480        options,
481    );
482
483    let _ = mz_avro::schema::Name::parse_simple(name)?;
484    if let Some(comment) =
485        item_id.and_then(|item_id| options.doc_comments.get(&DocTarget::Type(item_id)))
486    {
487        Ok(json!({
488            "type": "record",
489            "doc": comment,
490            "fields": fields,
491            "name": name
492        }))
493    } else {
494        Ok(json!({
495            "type": "record",
496            "fields": fields,
497            "name": name
498        }))
499    }
500}
501
502/// Naming helper for use when constructing an Avro schema.
503#[derive(Default)]
504struct Namer {
505    record_index: usize,
506    seen_interval: bool,
507    seen_unsigneds: BTreeSet<usize>,
508    seen_names: BTreeMap<String, String>,
509    valid_names_count: BTreeMap<String, usize>,
510}
511
512impl Namer {
513    /// Returns the schema for an interval type.
514    fn interval_type(&mut self) -> serde_json::Value {
515        let name = format!("{AVRO_NAMESPACE}.interval");
516        if self.seen_interval {
517            json!(name)
518        } else {
519            self.seen_interval = true;
520            json!({
521            "type": "fixed",
522            "size": 16,
523            "name": name,
524            })
525        }
526    }
527
528    /// Returns the schema for an unsigned integer with the given width.
529    fn unsigned_type(&mut self, width: usize) -> serde_json::Value {
530        let name = format!("{AVRO_NAMESPACE}.uint{width}");
531        if self.seen_unsigneds.contains(&width) {
532            json!(name)
533        } else {
534            self.seen_unsigneds.insert(width);
535            json!({
536                "type": "fixed",
537                "size": width,
538                "name": name,
539            })
540        }
541    }
542
543    /// Returns a name to use for a new anonymous record.
544    fn anonymous_record_name(&mut self) -> String {
545        let out = format!("{AVRO_NAMESPACE}.record{}", self.record_index);
546        self.record_index += 1;
547        out
548    }
549
550    /// Turns `name` into a valid, unique name for use in the Avro schema.
551    ///
552    /// Returns the valid name and whether `name` has been seen before.
553    fn valid_name(&mut self, name: &str) -> (String, bool) {
554        if let Some(valid_name) = self.seen_names.get(name) {
555            (valid_name.into(), true)
556        } else {
557            let mut valid_name = mz_avro::schema::Name::make_valid(name);
558            let valid_name_count = self
559                .valid_names_count
560                .entry(valid_name.clone())
561                .or_default();
562            if *valid_name_count != 0 {
563                valid_name += &valid_name_count.to_string();
564            }
565            *valid_name_count += 1;
566            self.seen_names.insert(name.into(), valid_name.clone());
567            (valid_name, false)
568        }
569    }
570}