1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12
13use itertools::Itertools;
14use mz_repr::adt::array::ArrayDimension;
15use mz_repr::adt::char;
16use mz_repr::adt::jsonb::JsonbRef;
17use mz_repr::adt::numeric::{NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION};
18use mz_repr::{CatalogItemId, ColumnName, Datum, RelationDesc, SqlColumnType, SqlScalarType};
19use serde_json::{Map, json};
20
21use crate::avro::DocTarget;
22use crate::encode::{Encode, TypedDatum, column_names_and_types};
23use crate::envelopes;
24
25const AVRO_NAMESPACE: &str = "com.materialize.sink";
26const MICROS_PER_MILLIS: u32 = 1_000;
27
/// Encodes rows as JSON text, one JSON object per row, keyed by column name.
pub struct JsonEncoder {
    /// The column names and types used to encode each row. When the encoder
    /// was constructed with `debezium: true`, these are the columns of the
    /// Debezium envelope (see [`JsonEncoder::new`]).
    columns: Vec<(ColumnName, SqlColumnType)>,
}
32
33impl JsonEncoder {
34 pub fn new(desc: RelationDesc, debezium: bool) -> Self {
35 let mut columns = column_names_and_types(desc);
36 if debezium {
37 columns = envelopes::dbz_envelope(columns);
38 };
39 JsonEncoder { columns }
40 }
41}
42
43impl Encode for JsonEncoder {
44 fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
45 let value = encode_datums_as_json(row.iter(), self.columns.as_ref());
46 value.to_string().into_bytes()
47 }
48}
49
50impl fmt::Debug for JsonEncoder {
51 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
52 f.debug_struct("JsonEncoder")
53 .field(
54 "schema",
55 &format!(
56 "{:?}",
57 build_row_schema_json(
58 &self.columns,
59 "schema",
60 &BTreeMap::new(),
61 None,
62 &Default::default(),
63 )
64 ),
65 )
66 .finish()
67 }
68}
69
70pub fn encode_datums_as_json<'a, I>(
72 datums: I,
73 names_types: &[(ColumnName, SqlColumnType)],
74) -> serde_json::Value
75where
76 I: IntoIterator<Item = Datum<'a>>,
77{
78 let value_fields = datums
79 .into_iter()
80 .zip_eq(names_types)
81 .map(|(datum, (name, typ))| {
82 (
83 name.to_string(),
84 TypedDatum::new(datum, typ).json(&JsonNumberPolicy::KeepAsNumber),
85 )
86 })
87 .collect();
88 serde_json::Value::Object(value_fields)
89}
90
/// Policy for how [`ToJson::json`] renders numeric values in its output.
#[derive(Debug)]
pub enum JsonNumberPolicy {
    /// Leave numeric values as JSON numbers.
    KeepAsNumber,
    /// Convert any top-level `serde_json::Value::Number` in the result into a
    /// `serde_json::Value::String` of its textual form.
    ConvertNumberToString,
}
100
/// Conversion of a value into a [`serde_json::Value`].
pub trait ToJson {
    /// Transforms this value into JSON, rendering numbers according to
    /// `number_policy` (see [`JsonNumberPolicy`]).
    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value;
}
105
impl ToJson for TypedDatum<'_> {
    /// Converts a typed datum to JSON, dispatching on its SQL scalar type.
    ///
    /// Nullable null datums become JSON `null`. Container types (arrays,
    /// lists, records, maps) recurse with `nullable: true` element types,
    /// propagating `number_policy` to every element. The policy is applied to
    /// the final value at the end of this function.
    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value {
        let TypedDatum { datum, typ } = self;
        if typ.nullable && datum.is_null() {
            return serde_json::Value::Null;
        }
        let value = match &typ.scalar_type {
            SqlScalarType::AclItem => json!(datum.unwrap_acl_item().to_string()),
            SqlScalarType::Bool => json!(datum.unwrap_bool()),
            SqlScalarType::PgLegacyChar => json!(datum.unwrap_uint8()),
            SqlScalarType::Int16 => json!(datum.unwrap_int16()),
            SqlScalarType::Int32 => json!(datum.unwrap_int32()),
            SqlScalarType::Int64 => json!(datum.unwrap_int64()),
            SqlScalarType::UInt16 => json!(datum.unwrap_uint16()),
            SqlScalarType::UInt32
            | SqlScalarType::Oid
            | SqlScalarType::RegClass
            | SqlScalarType::RegProc
            | SqlScalarType::RegType => {
                json!(datum.unwrap_uint32())
            }
            SqlScalarType::UInt64 => json!(datum.unwrap_uint64()),
            SqlScalarType::Float32 => json!(datum.unwrap_float32()),
            SqlScalarType::Float64 => json!(datum.unwrap_float64()),
            // Numerics are rendered as strings to avoid losing precision in a
            // JSON number.
            SqlScalarType::Numeric { .. } => {
                json!(datum.unwrap_numeric().0.to_standard_notation_string())
            }
            SqlScalarType::Date => serde_json::Value::String(format!("{}", datum.unwrap_date())),
            SqlScalarType::Time => serde_json::Value::String(format!("{:?}", datum.unwrap_time())),
            // Timestamps render as "<millis>.<micros>": milliseconds since the
            // Unix epoch, then the sub-millisecond microsecond remainder
            // (0..=999) zero-padded to three digits.
            // NOTE(review): for pre-epoch timestamps `timestamp_millis` is
            // negative while the remainder stays non-negative — confirm that
            // consumers expect this representation.
            SqlScalarType::Timestamp { .. } => {
                let dt = datum.unwrap_timestamp().to_naive().and_utc();
                let millis = dt.timestamp_millis();
                let micros = dt.timestamp_subsec_micros()
                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
            }
            SqlScalarType::TimestampTz { .. } => {
                let dt = datum.unwrap_timestamptz().to_utc();
                let millis = dt.timestamp_millis();
                let micros = dt.timestamp_subsec_micros()
                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
            }
            SqlScalarType::Interval => {
                serde_json::Value::String(format!("{}", datum.unwrap_interval()))
            }
            SqlScalarType::Bytes => json!(datum.unwrap_bytes()),
            SqlScalarType::String | SqlScalarType::VarChar { .. } | SqlScalarType::PgLegacyName => {
                json!(datum.unwrap_str())
            }
            // Blank-pad `char(n)` values to their declared length.
            SqlScalarType::Char { length } => {
                let s = char::format_str_pad(datum.unwrap_str(), *length);
                serde_json::Value::String(s)
            }
            SqlScalarType::Jsonb => JsonbRef::from_datum(datum).to_serde_json(),
            SqlScalarType::Uuid => json!(datum.unwrap_uuid()),
            // Arrays are stored flat with a dimension list; `encode_array`
            // rebuilds the nested JSON array structure from `dims`.
            ty @ (SqlScalarType::Array(..) | SqlScalarType::Int2Vector) => {
                let array = datum.unwrap_array();
                let dims = array.dims().into_iter().collect::<Vec<_>>();
                let mut datums = array.elements().iter();
                encode_array(&mut datums, &dims, &mut |datum| {
                    TypedDatum::new(
                        datum,
                        &SqlColumnType {
                            nullable: true,
                            scalar_type: ty.unwrap_collection_element_type().clone(),
                        },
                    )
                    .json(number_policy)
                })
            }
            SqlScalarType::List { element_type, .. } => {
                let values = datum
                    .unwrap_list()
                    .into_iter()
                    .map(|datum| {
                        TypedDatum::new(
                            datum,
                            &SqlColumnType {
                                nullable: true,
                                scalar_type: (**element_type).clone(),
                            },
                        )
                        .json(number_policy)
                    })
                    .collect();
                serde_json::Value::Array(values)
            }
            // Records become JSON objects keyed by field name; `zip_eq` panics
            // if the datum list and the field list disagree in length.
            SqlScalarType::Record { fields, .. } => {
                let list = datum.unwrap_list();
                let fields: Map<String, serde_json::Value> = fields
                    .iter()
                    .zip_eq(&list)
                    .map(|((name, typ), datum)| {
                        let name = name.to_string();
                        let datum = TypedDatum::new(datum, typ);
                        let value = datum.json(number_policy);
                        (name, value)
                    })
                    .collect();
                fields.into()
            }
            SqlScalarType::Map { value_type, .. } => {
                let map = datum.unwrap_map();
                let elements = map
                    .into_iter()
                    .map(|(key, datum)| {
                        let value = TypedDatum::new(
                            datum,
                            &SqlColumnType {
                                nullable: true,
                                scalar_type: (**value_type).clone(),
                            },
                        )
                        .json(number_policy);
                        (key.to_string(), value)
                    })
                    .collect();
                serde_json::Value::Object(elements)
            }
            SqlScalarType::MzTimestamp => json!(datum.unwrap_mz_timestamp().to_string()),
            SqlScalarType::Range { .. } => {
                json!(datum.unwrap_range().to_string())
            }
            SqlScalarType::MzAclItem => json!(datum.unwrap_mz_acl_item().to_string()),
        };
        // Apply the number policy to the result. Only a top-level Number is
        // converted; numbers nested inside containers were already handled by
        // the recursive calls above.
        match (number_policy, value) {
            (JsonNumberPolicy::KeepAsNumber, value) => value,
            (JsonNumberPolicy::ConvertNumberToString, serde_json::Value::Number(n)) => {
                serde_json::Value::String(n.to_string())
            }
            (JsonNumberPolicy::ConvertNumberToString, value) => value,
        }
    }
}
247
248fn encode_array<'a>(
249 elems: &mut impl Iterator<Item = Datum<'a>>,
250 dims: &[ArrayDimension],
251 elem_encoder: &mut impl FnMut(Datum<'_>) -> serde_json::Value,
252) -> serde_json::Value {
253 serde_json::Value::Array(match dims {
254 [] => vec![],
255 [dim] => elems.take(dim.length).map(elem_encoder).collect(),
256 [dim, rest @ ..] => (0..dim.length)
257 .map(|_| encode_array(elems, rest, elem_encoder))
258 .collect(),
259 })
260}
261
/// Builds the Avro schema JSON for a single column type, recursing into
/// container types.
///
/// Named Avro types (fixed intervals/unsigneds, records) are fully defined on
/// first use and referenced by name afterwards, via `type_namer`. Records with
/// a `custom_id` take their name from `custom_names` and their doc comment
/// from `options.doc_comments`. A nullable column is wrapped in a
/// `["null", <type>]` union at the end.
fn build_row_schema_field_type(
    type_namer: &mut Namer,
    custom_names: &BTreeMap<CatalogItemId, String>,
    typ: &SqlColumnType,
    item_id: Option<CatalogItemId>,
    options: &SchemaOptions,
) -> serde_json::Value {
    let mut field_type = match &typ.scalar_type {
        SqlScalarType::AclItem => json!("string"),
        SqlScalarType::Bool => json!("boolean"),
        SqlScalarType::PgLegacyChar => json!({
            "type": "fixed",
            "size": 1,
        }),
        SqlScalarType::Int16 | SqlScalarType::Int32 => {
            json!("int")
        }
        SqlScalarType::Int64 => json!("long"),
        // Unsigned integers have no native Avro type; they are encoded as
        // named fixed types of the given byte width.
        SqlScalarType::UInt16 => type_namer.unsigned_type(2),
        SqlScalarType::UInt32
        | SqlScalarType::Oid
        | SqlScalarType::RegClass
        | SqlScalarType::RegProc
        | SqlScalarType::RegType => type_namer.unsigned_type(4),
        SqlScalarType::UInt64 => type_namer.unsigned_type(8),
        SqlScalarType::Float32 => json!("float"),
        SqlScalarType::Float64 => json!("double"),
        SqlScalarType::Date => json!({
            "type": "int",
            "logicalType": "date",
        }),
        SqlScalarType::Time => json!({
            "type": "long",
            "logicalType": "time-micros",
        }),
        // Precision <= 3 fits in milliseconds; anything else (including
        // unspecified precision) uses microseconds.
        SqlScalarType::Timestamp { precision } | SqlScalarType::TimestampTz { precision } => {
            json!({
                "type": "long",
                "logicalType": match precision {
                    Some(precision) if precision.into_u8() <= 3 => "timestamp-millis",
                    _ => "timestamp-micros",
                },
            })
        }
        SqlScalarType::Interval => type_namer.interval_type(),
        SqlScalarType::Bytes => json!("bytes"),
        SqlScalarType::String
        | SqlScalarType::Char { .. }
        | SqlScalarType::VarChar { .. }
        | SqlScalarType::PgLegacyName => {
            json!("string")
        }
        // JSON payloads ride as strings, tagged for Debezium-aware consumers.
        SqlScalarType::Jsonb => json!({
            "type": "string",
            "connect.name": "io.debezium.data.Json",
        }),
        SqlScalarType::Uuid => json!({
            "type": "string",
            "logicalType": "uuid",
        }),
        // Arrays and lists both map to an Avro array of their (nullable)
        // element type; multidimensional SQL arrays are flattened here.
        ty
        @ (SqlScalarType::Array(..) | SqlScalarType::Int2Vector | SqlScalarType::List { .. }) => {
            let inner = build_row_schema_field_type(
                type_namer,
                custom_names,
                &SqlColumnType {
                    nullable: true,
                    scalar_type: ty.unwrap_collection_element_type().clone(),
                },
                item_id,
                options,
            );
            json!({
                "type": "array",
                "items": inner
            })
        }
        SqlScalarType::Map { value_type, .. } => {
            let inner = build_row_schema_field_type(
                type_namer,
                custom_names,
                &SqlColumnType {
                    nullable: true,
                    scalar_type: (**value_type).clone(),
                },
                item_id,
                options,
            );
            json!({
                "type": "map",
                "values": inner
            })
        }
        SqlScalarType::Record {
            fields, custom_id, ..
        } => {
            // Prefer the user-supplied name for custom types; otherwise
            // generate an anonymous record name.
            let (name, name_seen) = match custom_id.as_ref().and_then(|id| custom_names.get(id)) {
                Some(name) => type_namer.valid_name(name),
                None => (type_namer.anonymous_record_name(), false),
            };
            if name_seen {
                // Already defined earlier in this schema: refer to it by name.
                json!(name)
            } else {
                let fields = fields.to_vec();
                let json_fields =
                    build_row_schema_fields(&fields, type_namer, custom_names, *custom_id, options);
                if let Some(comment) =
                    custom_id.and_then(|id| options.doc_comments.get(&DocTarget::Type(id)))
                {
                    json!({
                        "type": "record",
                        "name": name,
                        "doc": comment,
                        "fields": json_fields
                    })
                } else {
                    json!({
                        "type": "record",
                        "name": name,
                        "fields": json_fields
                    })
                }
            }
        }
        SqlScalarType::Numeric { max_scale } => {
            // NOTE(review): for unconstrained numerics the scale is set to the
            // max *precision* constant — this mirrors how unconstrained
            // numerics are stored, but confirm against the decimal logical
            // type's requirements.
            let (p, s) = match max_scale {
                Some(max_scale) => (NUMERIC_DATUM_MAX_PRECISION, max_scale.into_u8()),
                None => (NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION),
            };
            json!({
                "type": "bytes",
                "logicalType": "decimal",
                "precision": p,
                "scale": s,
            })
        }
        SqlScalarType::MzTimestamp => json!("string"),
        SqlScalarType::Range { .. } => json!("string"),
        SqlScalarType::MzAclItem => json!("string"),
    };
    if typ.nullable {
        // Avro represents nullability as a union with "null" listed first.
        field_type = json!(["null", field_type]);
    }
    field_type
}
411
412fn build_row_schema_fields(
413 columns: &[(ColumnName, SqlColumnType)],
414 type_namer: &mut Namer,
415 custom_names: &BTreeMap<CatalogItemId, String>,
416 item_id: Option<CatalogItemId>,
417 options: &SchemaOptions,
418) -> Vec<serde_json::Value> {
419 let mut fields = Vec::new();
420 let mut field_namer = Namer::default();
421 for (name, typ) in columns.iter() {
422 let (name, _seen) = field_namer.valid_name(name);
423 let field_type =
424 build_row_schema_field_type(type_namer, custom_names, typ, item_id, options);
425
426 let mut field = json!({
427 "name": name,
428 "type": field_type,
429 });
430
431 let is_nullable_union = field_type
433 .as_array()
434 .is_some_and(|array| array.first().is_some_and(|first| first == &json!("null")));
435
436 if options.set_null_defaults && is_nullable_union {
437 field
438 .as_object_mut()
439 .expect("`field` initialized to JSON object above")
440 .insert("default".to_string(), json!(null));
441 }
442
443 if let Some(comment) = item_id.and_then(|item_id| {
444 options.doc_comments.get(&DocTarget::Field {
445 object_id: item_id,
446 column_name: name.into(),
447 })
448 }) {
449 field
450 .as_object_mut()
451 .expect("`field` initialized to JSON object above")
452 .insert("doc".to_string(), json!(comment));
453 }
454
455 fields.push(field);
456 }
457 fields
458}
459
/// Options controlling how the JSON form of an Avro row schema is built.
#[derive(Default, Clone, Debug)]
pub struct SchemaOptions {
    /// Whether to attach `"default": null` to nullable (union-with-null) fields.
    pub set_null_defaults: bool,
    /// Doc comments to attach, keyed by the schema type or field they describe.
    pub doc_comments: BTreeMap<DocTarget, String>,
}
469
470pub fn build_row_schema_json(
472 columns: &[(ColumnName, SqlColumnType)],
473 name: &str,
474 custom_names: &BTreeMap<CatalogItemId, String>,
475 item_id: Option<CatalogItemId>,
476 options: &SchemaOptions,
477) -> Result<serde_json::Value, anyhow::Error> {
478 let fields = build_row_schema_fields(
479 columns,
480 &mut Namer::default(),
481 custom_names,
482 item_id,
483 options,
484 );
485
486 let _ = mz_avro::schema::Name::parse_simple(name)?;
487 if let Some(comment) =
488 item_id.and_then(|item_id| options.doc_comments.get(&DocTarget::Type(item_id)))
489 {
490 Ok(json!({
491 "type": "record",
492 "doc": comment,
493 "fields": fields,
494 "name": name
495 }))
496 } else {
497 Ok(json!({
498 "type": "record",
499 "fields": fields,
500 "name": name
501 }))
502 }
503}
504
/// Tracks which named Avro types and names have already been emitted within a
/// single schema, so that each named type is fully defined once and referenced
/// by name thereafter, and so generated names are unique.
#[derive(Default)]
struct Namer {
    /// Counter for generating unique anonymous record names.
    record_index: usize,
    /// Whether the named interval fixed type has been defined yet.
    seen_interval: bool,
    /// Byte widths of the unsigned fixed types defined so far.
    seen_unsigneds: BTreeSet<usize>,
    /// Maps each original name to the validated Avro name assigned to it.
    seen_names: BTreeMap<String, String>,
    /// How many times each validated base name has been handed out; used to
    /// disambiguate collisions by appending the count.
    valid_names_count: BTreeMap<String, usize>,
}
514
515impl Namer {
516 fn interval_type(&mut self) -> serde_json::Value {
518 let name = format!("{AVRO_NAMESPACE}.interval");
519 if self.seen_interval {
520 json!(name)
521 } else {
522 self.seen_interval = true;
523 json!({
524 "type": "fixed",
525 "size": 16,
526 "name": name,
527 })
528 }
529 }
530
531 fn unsigned_type(&mut self, width: usize) -> serde_json::Value {
533 let name = format!("{AVRO_NAMESPACE}.uint{width}");
534 if self.seen_unsigneds.contains(&width) {
535 json!(name)
536 } else {
537 self.seen_unsigneds.insert(width);
538 json!({
539 "type": "fixed",
540 "size": width,
541 "name": name,
542 })
543 }
544 }
545
546 fn anonymous_record_name(&mut self) -> String {
548 let out = format!("{AVRO_NAMESPACE}.record{}", self.record_index);
549 self.record_index += 1;
550 out
551 }
552
553 fn valid_name(&mut self, name: &str) -> (String, bool) {
557 if let Some(valid_name) = self.seen_names.get(name) {
558 (valid_name.into(), true)
559 } else {
560 let mut valid_name = mz_avro::schema::Name::make_valid(name);
561 let valid_name_count = self
562 .valid_names_count
563 .entry(valid_name.clone())
564 .or_default();
565 if *valid_name_count != 0 {
566 valid_name += &valid_name_count.to_string();
567 }
568 *valid_name_count += 1;
569 self.seen_names.insert(name.into(), valid_name.clone());
570 (valid_name, false)
571 }
572 }
573}