mz_interchange/
json.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12
13use mz_repr::adt::array::ArrayDimension;
14use mz_repr::adt::char;
15use mz_repr::adt::jsonb::JsonbRef;
16use mz_repr::adt::numeric::{NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION};
17use mz_repr::{CatalogItemId, ColumnName, ColumnType, Datum, RelationDesc, ScalarType};
18use serde_json::{Map, json};
19
20use crate::avro::DocTarget;
21use crate::encode::{Encode, TypedDatum, column_names_and_types};
22use crate::envelopes;
23
/// Namespace prefixed to the named types (interval, unsigned integers, anonymous records)
/// that this module generates for schemas.
const AVRO_NAMESPACE: &str = "com.materialize.sink";
/// Number of microseconds in one millisecond.
const MICROS_PER_MILLIS: u32 = 1000;
26
27// Manages encoding of JSON-encoded bytes
28pub struct JsonEncoder {
29    columns: Vec<(ColumnName, ColumnType)>,
30}
31
32impl JsonEncoder {
33    pub fn new(desc: RelationDesc, debezium: bool) -> Self {
34        let mut columns = column_names_and_types(desc);
35        if debezium {
36            columns = envelopes::dbz_envelope(columns);
37        };
38        JsonEncoder { columns }
39    }
40}
41
42impl Encode for JsonEncoder {
43    fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
44        let value = encode_datums_as_json(row.iter(), self.columns.as_ref());
45        value.to_string().into_bytes()
46    }
47}
48
49impl fmt::Debug for JsonEncoder {
50    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
51        f.debug_struct("JsonEncoder")
52            .field(
53                "schema",
54                &format!(
55                    "{:?}",
56                    build_row_schema_json(
57                        &self.columns,
58                        "schema",
59                        &BTreeMap::new(),
60                        None,
61                        &Default::default(),
62                    )
63                ),
64            )
65            .finish()
66    }
67}
68
69/// Encodes a sequence of `Datum` as JSON, using supplied column names and types.
70pub fn encode_datums_as_json<'a, I>(
71    datums: I,
72    names_types: &[(ColumnName, ColumnType)],
73) -> serde_json::Value
74where
75    I: IntoIterator<Item = Datum<'a>>,
76{
77    let value_fields = datums
78        .into_iter()
79        .zip(names_types)
80        .map(|(datum, (name, typ))| {
81            (
82                name.to_string(),
83                TypedDatum::new(datum, typ).json(&JsonNumberPolicy::KeepAsNumber),
84            )
85        })
86        .collect();
87    serde_json::Value::Object(value_fields)
88}
89
/// How JSON numbers produced during encoding should be represented.
#[derive(Debug)]
pub enum JsonNumberPolicy {
    /// Leave numbers as JSON numbers.
    KeepAsNumber,
    /// Emit each number as a string holding its textual form. Useful for JavaScript
    /// consumers that may interpret some numbers incorrectly.
    ConvertNumberToString,
}
99
100pub trait ToJson {
101    /// Transforms this value to a JSON value.
102    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value;
103}
104
105impl ToJson for TypedDatum<'_> {
106    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value {
107        let TypedDatum { datum, typ } = self;
108        if typ.nullable && datum.is_null() {
109            return serde_json::Value::Null;
110        }
111        let value = match &typ.scalar_type {
112            ScalarType::AclItem => json!(datum.unwrap_acl_item().to_string()),
113            ScalarType::Bool => json!(datum.unwrap_bool()),
114            ScalarType::PgLegacyChar => json!(datum.unwrap_uint8()),
115            ScalarType::Int16 => json!(datum.unwrap_int16()),
116            ScalarType::Int32 => json!(datum.unwrap_int32()),
117            ScalarType::Int64 => json!(datum.unwrap_int64()),
118            ScalarType::UInt16 => json!(datum.unwrap_uint16()),
119            ScalarType::UInt32
120            | ScalarType::Oid
121            | ScalarType::RegClass
122            | ScalarType::RegProc
123            | ScalarType::RegType => {
124                json!(datum.unwrap_uint32())
125            }
126            ScalarType::UInt64 => json!(datum.unwrap_uint64()),
127            ScalarType::Float32 => json!(datum.unwrap_float32()),
128            ScalarType::Float64 => json!(datum.unwrap_float64()),
129            ScalarType::Numeric { .. } => {
130                json!(datum.unwrap_numeric().0.to_standard_notation_string())
131            }
132            // https://stackoverflow.com/questions/10286204/what-is-the-right-json-date-format
133            ScalarType::Date => serde_json::Value::String(format!("{}", datum.unwrap_date())),
134            ScalarType::Time => serde_json::Value::String(format!("{:?}", datum.unwrap_time())),
135            ScalarType::Timestamp { .. } => {
136                let dt = datum.unwrap_timestamp().to_naive().and_utc();
137                let millis = dt.timestamp_millis();
138                let micros = dt.timestamp_subsec_micros()
139                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
140                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
141            }
142            ScalarType::TimestampTz { .. } => {
143                let dt = datum.unwrap_timestamptz().to_utc();
144                let millis = dt.timestamp_millis();
145                let micros = dt.timestamp_subsec_micros()
146                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
147                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
148            }
149            ScalarType::Interval => {
150                serde_json::Value::String(format!("{}", datum.unwrap_interval()))
151            }
152            ScalarType::Bytes => json!(datum.unwrap_bytes()),
153            ScalarType::String | ScalarType::VarChar { .. } | ScalarType::PgLegacyName => {
154                json!(datum.unwrap_str())
155            }
156            ScalarType::Char { length } => {
157                let s = char::format_str_pad(datum.unwrap_str(), *length);
158                serde_json::Value::String(s)
159            }
160            ScalarType::Jsonb => JsonbRef::from_datum(datum).to_serde_json(),
161            ScalarType::Uuid => json!(datum.unwrap_uuid()),
162            ty @ (ScalarType::Array(..) | ScalarType::Int2Vector) => {
163                let array = datum.unwrap_array();
164                let dims = array.dims().into_iter().collect::<Vec<_>>();
165                let mut datums = array.elements().iter();
166                encode_array(&mut datums, &dims, &mut |datum| {
167                    TypedDatum::new(
168                        datum,
169                        &ColumnType {
170                            nullable: true,
171                            scalar_type: ty.unwrap_collection_element_type().clone(),
172                        },
173                    )
174                    .json(number_policy)
175                })
176            }
177            ScalarType::List { element_type, .. } => {
178                let values = datum
179                    .unwrap_list()
180                    .into_iter()
181                    .map(|datum| {
182                        TypedDatum::new(
183                            datum,
184                            &ColumnType {
185                                nullable: true,
186                                scalar_type: (**element_type).clone(),
187                            },
188                        )
189                        .json(number_policy)
190                    })
191                    .collect();
192                serde_json::Value::Array(values)
193            }
194            ScalarType::Record { fields, .. } => {
195                let list = datum.unwrap_list();
196                let fields: Map<String, serde_json::Value> = fields
197                    .iter()
198                    .zip(&list)
199                    .map(|((name, typ), datum)| {
200                        let name = name.to_string();
201                        let datum = TypedDatum::new(datum, typ);
202                        let value = datum.json(number_policy);
203                        (name, value)
204                    })
205                    .collect();
206                fields.into()
207            }
208            ScalarType::Map { value_type, .. } => {
209                let map = datum.unwrap_map();
210                let elements = map
211                    .into_iter()
212                    .map(|(key, datum)| {
213                        let value = TypedDatum::new(
214                            datum,
215                            &ColumnType {
216                                nullable: true,
217                                scalar_type: (**value_type).clone(),
218                            },
219                        )
220                        .json(number_policy);
221                        (key.to_string(), value)
222                    })
223                    .collect();
224                serde_json::Value::Object(elements)
225            }
226            ScalarType::MzTimestamp => json!(datum.unwrap_mz_timestamp().to_string()),
227            ScalarType::Range { .. } => {
228                // Ranges' interiors are not expected to be types whose
229                // string representations are misleading/wrong, e.g.
230                // records.
231                json!(datum.unwrap_range().to_string())
232            }
233            ScalarType::MzAclItem => json!(datum.unwrap_mz_acl_item().to_string()),
234        };
235        // We don't need to recurse into map or object here because those already recursively call
236        // .json() with the number policy to generate the member Values.
237        match (number_policy, value) {
238            (JsonNumberPolicy::KeepAsNumber, value) => value,
239            (JsonNumberPolicy::ConvertNumberToString, serde_json::Value::Number(n)) => {
240                serde_json::Value::String(n.to_string())
241            }
242            (JsonNumberPolicy::ConvertNumberToString, value) => value,
243        }
244    }
245}
246
247fn encode_array<'a>(
248    elems: &mut impl Iterator<Item = Datum<'a>>,
249    dims: &[ArrayDimension],
250    elem_encoder: &mut impl FnMut(Datum<'_>) -> serde_json::Value,
251) -> serde_json::Value {
252    serde_json::Value::Array(match dims {
253        [] => vec![],
254        [dim] => elems.take(dim.length).map(elem_encoder).collect(),
255        [dim, rest @ ..] => (0..dim.length)
256            .map(|_| encode_array(elems, rest, elem_encoder))
257            .collect(),
258    })
259}
260
261fn build_row_schema_field_type(
262    type_namer: &mut Namer,
263    custom_names: &BTreeMap<CatalogItemId, String>,
264    typ: &ColumnType,
265    item_id: Option<CatalogItemId>,
266    options: &SchemaOptions,
267) -> serde_json::Value {
268    let mut field_type = match &typ.scalar_type {
269        ScalarType::AclItem => json!("string"),
270        ScalarType::Bool => json!("boolean"),
271        ScalarType::PgLegacyChar => json!({
272            "type": "fixed",
273            "size": 1,
274        }),
275        ScalarType::Int16 | ScalarType::Int32 => {
276            json!("int")
277        }
278        ScalarType::Int64 => json!("long"),
279        ScalarType::UInt16 => type_namer.unsigned_type(2),
280        ScalarType::UInt32
281        | ScalarType::Oid
282        | ScalarType::RegClass
283        | ScalarType::RegProc
284        | ScalarType::RegType => type_namer.unsigned_type(4),
285        ScalarType::UInt64 => type_namer.unsigned_type(8),
286        ScalarType::Float32 => json!("float"),
287        ScalarType::Float64 => json!("double"),
288        ScalarType::Date => json!({
289            "type": "int",
290            "logicalType": "date",
291        }),
292        ScalarType::Time => json!({
293            "type": "long",
294            "logicalType": "time-micros",
295        }),
296        ScalarType::Timestamp { precision } | ScalarType::TimestampTz { precision } => json!({
297            "type": "long",
298            "logicalType": match precision {
299                Some(precision) if precision.into_u8() <= 3 => "timestamp-millis",
300                _ => "timestamp-micros",
301            },
302        }),
303        ScalarType::Interval => type_namer.interval_type(),
304        ScalarType::Bytes => json!("bytes"),
305        ScalarType::String
306        | ScalarType::Char { .. }
307        | ScalarType::VarChar { .. }
308        | ScalarType::PgLegacyName => {
309            json!("string")
310        }
311        ScalarType::Jsonb => json!({
312            "type": "string",
313            "connect.name": "io.debezium.data.Json",
314        }),
315        ScalarType::Uuid => json!({
316            "type": "string",
317            "logicalType": "uuid",
318        }),
319        ty @ (ScalarType::Array(..) | ScalarType::Int2Vector | ScalarType::List { .. }) => {
320            let inner = build_row_schema_field_type(
321                type_namer,
322                custom_names,
323                &ColumnType {
324                    nullable: true,
325                    scalar_type: ty.unwrap_collection_element_type().clone(),
326                },
327                item_id,
328                options,
329            );
330            json!({
331                "type": "array",
332                "items": inner
333            })
334        }
335        ScalarType::Map { value_type, .. } => {
336            let inner = build_row_schema_field_type(
337                type_namer,
338                custom_names,
339                &ColumnType {
340                    nullable: true,
341                    scalar_type: (**value_type).clone(),
342                },
343                item_id,
344                options,
345            );
346            json!({
347                "type": "map",
348                "values": inner
349            })
350        }
351        ScalarType::Record {
352            fields, custom_id, ..
353        } => {
354            let (name, name_seen) = match custom_id.as_ref().and_then(|id| custom_names.get(id)) {
355                Some(name) => type_namer.valid_name(name),
356                None => (type_namer.anonymous_record_name(), false),
357            };
358            if name_seen {
359                json!(name)
360            } else {
361                let fields = fields.to_vec();
362                let json_fields =
363                    build_row_schema_fields(&fields, type_namer, custom_names, *custom_id, options);
364                if let Some(comment) =
365                    custom_id.and_then(|id| options.doc_comments.get(&DocTarget::Type(id)))
366                {
367                    json!({
368                        "type": "record",
369                        "name": name,
370                        "doc": comment,
371                        "fields": json_fields
372                    })
373                } else {
374                    json!({
375                        "type": "record",
376                        "name": name,
377                        "fields": json_fields
378                    })
379                }
380            }
381        }
382        ScalarType::Numeric { max_scale } => {
383            let (p, s) = match max_scale {
384                Some(max_scale) => (NUMERIC_DATUM_MAX_PRECISION, max_scale.into_u8()),
385                None => (NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION),
386            };
387            json!({
388                "type": "bytes",
389                "logicalType": "decimal",
390                "precision": p,
391                "scale": s,
392            })
393        }
394        ScalarType::MzTimestamp => json!("string"),
395        // https://debezium.io/documentation/reference/stable/connectors/postgresql.html
396        ScalarType::Range { .. } => json!("string"),
397        ScalarType::MzAclItem => json!("string"),
398    };
399    if typ.nullable {
400        // Should be revisited if we ever support a different kind of union scheme.
401        // Currently adding the "null" at the beginning means we can set the default
402        // value to "null" if such a preference is set.
403        field_type = json!(["null", field_type]);
404    }
405    field_type
406}
407
408fn build_row_schema_fields(
409    columns: &[(ColumnName, ColumnType)],
410    type_namer: &mut Namer,
411    custom_names: &BTreeMap<CatalogItemId, String>,
412    item_id: Option<CatalogItemId>,
413    options: &SchemaOptions,
414) -> Vec<serde_json::Value> {
415    let mut fields = Vec::new();
416    let mut field_namer = Namer::default();
417    for (name, typ) in columns.iter() {
418        let (name, _seen) = field_namer.valid_name(name.as_str());
419        let field_type =
420            build_row_schema_field_type(type_namer, custom_names, typ, item_id, options);
421
422        let mut field = json!({
423            "name": name,
424            "type": field_type,
425        });
426
427        // It's a nullable union if the type is an array and the first option is "null"
428        let is_nullable_union = field_type
429            .as_array()
430            .is_some_and(|array| array.first().is_some_and(|first| first == &json!("null")));
431
432        if options.set_null_defaults && is_nullable_union {
433            field
434                .as_object_mut()
435                .expect("`field` initialized to JSON object above")
436                .insert("default".to_string(), json!(null));
437        }
438
439        if let Some(comment) = item_id.and_then(|item_id| {
440            options.doc_comments.get(&DocTarget::Field {
441                object_id: item_id,
442                column_name: name.into(),
443            })
444        }) {
445            field
446                .as_object_mut()
447                .expect("`field` initialized to JSON object above")
448                .insert("doc".to_string(), json!(comment));
449        }
450
451        fields.push(field);
452    }
453    fields
454}
455
456#[derive(Default, Clone, Debug)]
457/// Struct to pass around options to create the json schema
458pub struct SchemaOptions {
459    /// Boolean flag to enable null defaults.
460    pub set_null_defaults: bool,
461    /// Map containing comments for an item or field, used to populate
462    /// documentation in the generated avro schema
463    pub doc_comments: BTreeMap<DocTarget, String>,
464}
465
466/// Builds the JSON for the row schema, which can be independently useful.
467pub fn build_row_schema_json(
468    columns: &[(ColumnName, ColumnType)],
469    name: &str,
470    custom_names: &BTreeMap<CatalogItemId, String>,
471    item_id: Option<CatalogItemId>,
472    options: &SchemaOptions,
473) -> Result<serde_json::Value, anyhow::Error> {
474    let fields = build_row_schema_fields(
475        columns,
476        &mut Namer::default(),
477        custom_names,
478        item_id,
479        options,
480    );
481
482    let _ = mz_avro::schema::Name::parse_simple(name)?;
483    if let Some(comment) =
484        item_id.and_then(|item_id| options.doc_comments.get(&DocTarget::Type(item_id)))
485    {
486        Ok(json!({
487            "type": "record",
488            "doc": comment,
489            "fields": fields,
490            "name": name
491        }))
492    } else {
493        Ok(json!({
494            "type": "record",
495            "fields": fields,
496            "name": name
497        }))
498    }
499}
500
501/// Naming helper for use when constructing an Avro schema.
502#[derive(Default)]
503struct Namer {
504    record_index: usize,
505    seen_interval: bool,
506    seen_unsigneds: BTreeSet<usize>,
507    seen_names: BTreeMap<String, String>,
508    valid_names_count: BTreeMap<String, usize>,
509}
510
511impl Namer {
512    /// Returns the schema for an interval type.
513    fn interval_type(&mut self) -> serde_json::Value {
514        let name = format!("{AVRO_NAMESPACE}.interval");
515        if self.seen_interval {
516            json!(name)
517        } else {
518            self.seen_interval = true;
519            json!({
520            "type": "fixed",
521            "size": 16,
522            "name": name,
523            })
524        }
525    }
526
527    /// Returns the schema for an unsigned integer with the given width.
528    fn unsigned_type(&mut self, width: usize) -> serde_json::Value {
529        let name = format!("{AVRO_NAMESPACE}.uint{width}");
530        if self.seen_unsigneds.contains(&width) {
531            json!(name)
532        } else {
533            self.seen_unsigneds.insert(width);
534            json!({
535                "type": "fixed",
536                "size": width,
537                "name": name,
538            })
539        }
540    }
541
542    /// Returns a name to use for a new anonymous record.
543    fn anonymous_record_name(&mut self) -> String {
544        let out = format!("{AVRO_NAMESPACE}.record{}", self.record_index);
545        self.record_index += 1;
546        out
547    }
548
549    /// Turns `name` into a valid, unique name for use in the Avro schema.
550    ///
551    /// Returns the valid name and whether `name` has been seen before.
552    fn valid_name(&mut self, name: &str) -> (String, bool) {
553        if let Some(valid_name) = self.seen_names.get(name) {
554            (valid_name.into(), true)
555        } else {
556            let mut valid_name = mz_avro::schema::Name::make_valid(name);
557            let valid_name_count = self
558                .valid_names_count
559                .entry(valid_name.clone())
560                .or_default();
561            if *valid_name_count != 0 {
562                valid_name += &valid_name_count.to_string();
563            }
564            *valid_name_count += 1;
565            self.seen_names.insert(name.into(), valid_name.clone());
566            (valid_name, false)
567        }
568    }
569}