1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12
13use itertools::Itertools;
14use mz_repr::adt::array::ArrayDimension;
15use mz_repr::adt::char;
16use mz_repr::adt::jsonb::JsonbRef;
17use mz_repr::adt::numeric::{NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION};
18use mz_repr::{CatalogItemId, ColumnName, ColumnType, Datum, RelationDesc, ScalarType};
19use serde_json::{Map, json};
20
21use crate::avro::DocTarget;
22use crate::encode::{Encode, TypedDatum, column_names_and_types};
23use crate::envelopes;
24
/// Prefix for the Avro type names this module invents itself (the interval
/// and unsigned-integer `fixed` types and anonymous record types).
const AVRO_NAMESPACE: &str = "com.materialize.sink";
/// Number of microseconds in one millisecond.
const MICROS_PER_MILLIS: u32 = 1000;
27
/// Encodes rows as JSON objects keyed by column name.
pub struct JsonEncoder {
    /// Column names and types in row order. When the encoder is built with
    /// `debezium = true`, this is the output of `envelopes::dbz_envelope`
    /// rather than the raw relation columns.
    columns: Vec<(ColumnName, ColumnType)>,
}
32
33impl JsonEncoder {
34 pub fn new(desc: RelationDesc, debezium: bool) -> Self {
35 let mut columns = column_names_and_types(desc);
36 if debezium {
37 columns = envelopes::dbz_envelope(columns);
38 };
39 JsonEncoder { columns }
40 }
41}
42
43impl Encode for JsonEncoder {
44 fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
45 let value = encode_datums_as_json(row.iter(), self.columns.as_ref());
46 value.to_string().into_bytes()
47 }
48}
49
50impl fmt::Debug for JsonEncoder {
51 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
52 f.debug_struct("JsonEncoder")
53 .field(
54 "schema",
55 &format!(
56 "{:?}",
57 build_row_schema_json(
58 &self.columns,
59 "schema",
60 &BTreeMap::new(),
61 None,
62 &Default::default(),
63 )
64 ),
65 )
66 .finish()
67 }
68}
69
70pub fn encode_datums_as_json<'a, I>(
72 datums: I,
73 names_types: &[(ColumnName, ColumnType)],
74) -> serde_json::Value
75where
76 I: IntoIterator<Item = Datum<'a>>,
77{
78 let value_fields = datums
79 .into_iter()
80 .zip_eq(names_types)
81 .map(|(datum, (name, typ))| {
82 (
83 name.to_string(),
84 TypedDatum::new(datum, typ).json(&JsonNumberPolicy::KeepAsNumber),
85 )
86 })
87 .collect();
88 serde_json::Value::Object(value_fields)
89}
90
/// Controls how numeric values are represented in JSON produced by `ToJson`.
///
/// The policy is applied to the JSON value produced for each datum and is
/// passed down into array, list, record and map elements. Numbers nested
/// inside a JSONB value are not rewritten.
#[derive(Debug)]
pub enum JsonNumberPolicy {
    /// Emit numbers as JSON numbers.
    KeepAsNumber,
    /// Replace JSON numbers with JSON strings holding their textual form.
    ConvertNumberToString,
}
100
/// Conversion of a value into a `serde_json::Value`.
pub trait ToJson {
    /// Converts `self` to JSON, representing numbers according to `number_policy`.
    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value;
}
105
106impl ToJson for TypedDatum<'_> {
107 fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value {
108 let TypedDatum { datum, typ } = self;
109 if typ.nullable && datum.is_null() {
110 return serde_json::Value::Null;
111 }
112 let value = match &typ.scalar_type {
113 ScalarType::AclItem => json!(datum.unwrap_acl_item().to_string()),
114 ScalarType::Bool => json!(datum.unwrap_bool()),
115 ScalarType::PgLegacyChar => json!(datum.unwrap_uint8()),
116 ScalarType::Int16 => json!(datum.unwrap_int16()),
117 ScalarType::Int32 => json!(datum.unwrap_int32()),
118 ScalarType::Int64 => json!(datum.unwrap_int64()),
119 ScalarType::UInt16 => json!(datum.unwrap_uint16()),
120 ScalarType::UInt32
121 | ScalarType::Oid
122 | ScalarType::RegClass
123 | ScalarType::RegProc
124 | ScalarType::RegType => {
125 json!(datum.unwrap_uint32())
126 }
127 ScalarType::UInt64 => json!(datum.unwrap_uint64()),
128 ScalarType::Float32 => json!(datum.unwrap_float32()),
129 ScalarType::Float64 => json!(datum.unwrap_float64()),
130 ScalarType::Numeric { .. } => {
131 json!(datum.unwrap_numeric().0.to_standard_notation_string())
132 }
133 ScalarType::Date => serde_json::Value::String(format!("{}", datum.unwrap_date())),
135 ScalarType::Time => serde_json::Value::String(format!("{:?}", datum.unwrap_time())),
136 ScalarType::Timestamp { .. } => {
137 let dt = datum.unwrap_timestamp().to_naive().and_utc();
138 let millis = dt.timestamp_millis();
139 let micros = dt.timestamp_subsec_micros()
140 - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
141 serde_json::Value::String(format!("{millis}.{micros:0>3}"))
142 }
143 ScalarType::TimestampTz { .. } => {
144 let dt = datum.unwrap_timestamptz().to_utc();
145 let millis = dt.timestamp_millis();
146 let micros = dt.timestamp_subsec_micros()
147 - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
148 serde_json::Value::String(format!("{millis}.{micros:0>3}"))
149 }
150 ScalarType::Interval => {
151 serde_json::Value::String(format!("{}", datum.unwrap_interval()))
152 }
153 ScalarType::Bytes => json!(datum.unwrap_bytes()),
154 ScalarType::String | ScalarType::VarChar { .. } | ScalarType::PgLegacyName => {
155 json!(datum.unwrap_str())
156 }
157 ScalarType::Char { length } => {
158 let s = char::format_str_pad(datum.unwrap_str(), *length);
159 serde_json::Value::String(s)
160 }
161 ScalarType::Jsonb => JsonbRef::from_datum(datum).to_serde_json(),
162 ScalarType::Uuid => json!(datum.unwrap_uuid()),
163 ty @ (ScalarType::Array(..) | ScalarType::Int2Vector) => {
164 let array = datum.unwrap_array();
165 let dims = array.dims().into_iter().collect::<Vec<_>>();
166 let mut datums = array.elements().iter();
167 encode_array(&mut datums, &dims, &mut |datum| {
168 TypedDatum::new(
169 datum,
170 &ColumnType {
171 nullable: true,
172 scalar_type: ty.unwrap_collection_element_type().clone(),
173 },
174 )
175 .json(number_policy)
176 })
177 }
178 ScalarType::List { element_type, .. } => {
179 let values = datum
180 .unwrap_list()
181 .into_iter()
182 .map(|datum| {
183 TypedDatum::new(
184 datum,
185 &ColumnType {
186 nullable: true,
187 scalar_type: (**element_type).clone(),
188 },
189 )
190 .json(number_policy)
191 })
192 .collect();
193 serde_json::Value::Array(values)
194 }
195 ScalarType::Record { fields, .. } => {
196 let list = datum.unwrap_list();
197 let fields: Map<String, serde_json::Value> = fields
198 .iter()
199 .zip_eq(&list)
200 .map(|((name, typ), datum)| {
201 let name = name.to_string();
202 let datum = TypedDatum::new(datum, typ);
203 let value = datum.json(number_policy);
204 (name, value)
205 })
206 .collect();
207 fields.into()
208 }
209 ScalarType::Map { value_type, .. } => {
210 let map = datum.unwrap_map();
211 let elements = map
212 .into_iter()
213 .map(|(key, datum)| {
214 let value = TypedDatum::new(
215 datum,
216 &ColumnType {
217 nullable: true,
218 scalar_type: (**value_type).clone(),
219 },
220 )
221 .json(number_policy);
222 (key.to_string(), value)
223 })
224 .collect();
225 serde_json::Value::Object(elements)
226 }
227 ScalarType::MzTimestamp => json!(datum.unwrap_mz_timestamp().to_string()),
228 ScalarType::Range { .. } => {
229 json!(datum.unwrap_range().to_string())
233 }
234 ScalarType::MzAclItem => json!(datum.unwrap_mz_acl_item().to_string()),
235 };
236 match (number_policy, value) {
239 (JsonNumberPolicy::KeepAsNumber, value) => value,
240 (JsonNumberPolicy::ConvertNumberToString, serde_json::Value::Number(n)) => {
241 serde_json::Value::String(n.to_string())
242 }
243 (JsonNumberPolicy::ConvertNumberToString, value) => value,
244 }
245 }
246}
247
248fn encode_array<'a>(
249 elems: &mut impl Iterator<Item = Datum<'a>>,
250 dims: &[ArrayDimension],
251 elem_encoder: &mut impl FnMut(Datum<'_>) -> serde_json::Value,
252) -> serde_json::Value {
253 serde_json::Value::Array(match dims {
254 [] => vec![],
255 [dim] => elems.take(dim.length).map(elem_encoder).collect(),
256 [dim, rest @ ..] => (0..dim.length)
257 .map(|_| encode_array(elems, rest, elem_encoder))
258 .collect(),
259 })
260}
261
262fn build_row_schema_field_type(
263 type_namer: &mut Namer,
264 custom_names: &BTreeMap<CatalogItemId, String>,
265 typ: &ColumnType,
266 item_id: Option<CatalogItemId>,
267 options: &SchemaOptions,
268) -> serde_json::Value {
269 let mut field_type = match &typ.scalar_type {
270 ScalarType::AclItem => json!("string"),
271 ScalarType::Bool => json!("boolean"),
272 ScalarType::PgLegacyChar => json!({
273 "type": "fixed",
274 "size": 1,
275 }),
276 ScalarType::Int16 | ScalarType::Int32 => {
277 json!("int")
278 }
279 ScalarType::Int64 => json!("long"),
280 ScalarType::UInt16 => type_namer.unsigned_type(2),
281 ScalarType::UInt32
282 | ScalarType::Oid
283 | ScalarType::RegClass
284 | ScalarType::RegProc
285 | ScalarType::RegType => type_namer.unsigned_type(4),
286 ScalarType::UInt64 => type_namer.unsigned_type(8),
287 ScalarType::Float32 => json!("float"),
288 ScalarType::Float64 => json!("double"),
289 ScalarType::Date => json!({
290 "type": "int",
291 "logicalType": "date",
292 }),
293 ScalarType::Time => json!({
294 "type": "long",
295 "logicalType": "time-micros",
296 }),
297 ScalarType::Timestamp { precision } | ScalarType::TimestampTz { precision } => json!({
298 "type": "long",
299 "logicalType": match precision {
300 Some(precision) if precision.into_u8() <= 3 => "timestamp-millis",
301 _ => "timestamp-micros",
302 },
303 }),
304 ScalarType::Interval => type_namer.interval_type(),
305 ScalarType::Bytes => json!("bytes"),
306 ScalarType::String
307 | ScalarType::Char { .. }
308 | ScalarType::VarChar { .. }
309 | ScalarType::PgLegacyName => {
310 json!("string")
311 }
312 ScalarType::Jsonb => json!({
313 "type": "string",
314 "connect.name": "io.debezium.data.Json",
315 }),
316 ScalarType::Uuid => json!({
317 "type": "string",
318 "logicalType": "uuid",
319 }),
320 ty @ (ScalarType::Array(..) | ScalarType::Int2Vector | ScalarType::List { .. }) => {
321 let inner = build_row_schema_field_type(
322 type_namer,
323 custom_names,
324 &ColumnType {
325 nullable: true,
326 scalar_type: ty.unwrap_collection_element_type().clone(),
327 },
328 item_id,
329 options,
330 );
331 json!({
332 "type": "array",
333 "items": inner
334 })
335 }
336 ScalarType::Map { value_type, .. } => {
337 let inner = build_row_schema_field_type(
338 type_namer,
339 custom_names,
340 &ColumnType {
341 nullable: true,
342 scalar_type: (**value_type).clone(),
343 },
344 item_id,
345 options,
346 );
347 json!({
348 "type": "map",
349 "values": inner
350 })
351 }
352 ScalarType::Record {
353 fields, custom_id, ..
354 } => {
355 let (name, name_seen) = match custom_id.as_ref().and_then(|id| custom_names.get(id)) {
356 Some(name) => type_namer.valid_name(name),
357 None => (type_namer.anonymous_record_name(), false),
358 };
359 if name_seen {
360 json!(name)
361 } else {
362 let fields = fields.to_vec();
363 let json_fields =
364 build_row_schema_fields(&fields, type_namer, custom_names, *custom_id, options);
365 if let Some(comment) =
366 custom_id.and_then(|id| options.doc_comments.get(&DocTarget::Type(id)))
367 {
368 json!({
369 "type": "record",
370 "name": name,
371 "doc": comment,
372 "fields": json_fields
373 })
374 } else {
375 json!({
376 "type": "record",
377 "name": name,
378 "fields": json_fields
379 })
380 }
381 }
382 }
383 ScalarType::Numeric { max_scale } => {
384 let (p, s) = match max_scale {
385 Some(max_scale) => (NUMERIC_DATUM_MAX_PRECISION, max_scale.into_u8()),
386 None => (NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION),
387 };
388 json!({
389 "type": "bytes",
390 "logicalType": "decimal",
391 "precision": p,
392 "scale": s,
393 })
394 }
395 ScalarType::MzTimestamp => json!("string"),
396 ScalarType::Range { .. } => json!("string"),
398 ScalarType::MzAclItem => json!("string"),
399 };
400 if typ.nullable {
401 field_type = json!(["null", field_type]);
405 }
406 field_type
407}
408
409fn build_row_schema_fields(
410 columns: &[(ColumnName, ColumnType)],
411 type_namer: &mut Namer,
412 custom_names: &BTreeMap<CatalogItemId, String>,
413 item_id: Option<CatalogItemId>,
414 options: &SchemaOptions,
415) -> Vec<serde_json::Value> {
416 let mut fields = Vec::new();
417 let mut field_namer = Namer::default();
418 for (name, typ) in columns.iter() {
419 let (name, _seen) = field_namer.valid_name(name);
420 let field_type =
421 build_row_schema_field_type(type_namer, custom_names, typ, item_id, options);
422
423 let mut field = json!({
424 "name": name,
425 "type": field_type,
426 });
427
428 let is_nullable_union = field_type
430 .as_array()
431 .is_some_and(|array| array.first().is_some_and(|first| first == &json!("null")));
432
433 if options.set_null_defaults && is_nullable_union {
434 field
435 .as_object_mut()
436 .expect("`field` initialized to JSON object above")
437 .insert("default".to_string(), json!(null));
438 }
439
440 if let Some(comment) = item_id.and_then(|item_id| {
441 options.doc_comments.get(&DocTarget::Field {
442 object_id: item_id,
443 column_name: name.into(),
444 })
445 }) {
446 field
447 .as_object_mut()
448 .expect("`field` initialized to JSON object above")
449 .insert("doc".to_string(), json!(comment));
450 }
451
452 fields.push(field);
453 }
454 fields
455}
456
/// Options that adjust the generated Avro schema.
#[derive(Default, Clone, Debug)]
pub struct SchemaOptions {
    /// When true, fields whose type is a union beginning with `"null"` are
    /// given `"default": null`.
    pub set_null_defaults: bool,
    /// Comments emitted as `"doc"` on matching record types (`DocTarget::Type`)
    /// and fields (`DocTarget::Field`).
    pub doc_comments: BTreeMap<DocTarget, String>,
}
466
467pub fn build_row_schema_json(
469 columns: &[(ColumnName, ColumnType)],
470 name: &str,
471 custom_names: &BTreeMap<CatalogItemId, String>,
472 item_id: Option<CatalogItemId>,
473 options: &SchemaOptions,
474) -> Result<serde_json::Value, anyhow::Error> {
475 let fields = build_row_schema_fields(
476 columns,
477 &mut Namer::default(),
478 custom_names,
479 item_id,
480 options,
481 );
482
483 let _ = mz_avro::schema::Name::parse_simple(name)?;
484 if let Some(comment) =
485 item_id.and_then(|item_id| options.doc_comments.get(&DocTarget::Type(item_id)))
486 {
487 Ok(json!({
488 "type": "record",
489 "doc": comment,
490 "fields": fields,
491 "name": name
492 }))
493 } else {
494 Ok(json!({
495 "type": "record",
496 "fields": fields,
497 "name": name
498 }))
499 }
500}
501
/// Hands out valid Avro names and remembers which named types have already
/// been emitted in the schema being built.
#[derive(Default)]
struct Namer {
    /// Index for the next generated anonymous record name.
    record_index: usize,
    /// Whether the interval `fixed` type has already been emitted.
    seen_interval: bool,
    /// Byte widths whose unsigned-integer `fixed` type has already been emitted.
    seen_unsigneds: BTreeSet<usize>,
    /// Maps each name passed to `valid_name` to the valid name it was assigned.
    seen_names: BTreeMap<String, String>,
    /// Bookkeeping used by `valid_name` to choose the numeric suffix that keeps
    /// a sanitized name distinct from earlier results.
    valid_names_count: BTreeMap<String, usize>,
}
511
512impl Namer {
513 fn interval_type(&mut self) -> serde_json::Value {
515 let name = format!("{AVRO_NAMESPACE}.interval");
516 if self.seen_interval {
517 json!(name)
518 } else {
519 self.seen_interval = true;
520 json!({
521 "type": "fixed",
522 "size": 16,
523 "name": name,
524 })
525 }
526 }
527
528 fn unsigned_type(&mut self, width: usize) -> serde_json::Value {
530 let name = format!("{AVRO_NAMESPACE}.uint{width}");
531 if self.seen_unsigneds.contains(&width) {
532 json!(name)
533 } else {
534 self.seen_unsigneds.insert(width);
535 json!({
536 "type": "fixed",
537 "size": width,
538 "name": name,
539 })
540 }
541 }
542
543 fn anonymous_record_name(&mut self) -> String {
545 let out = format!("{AVRO_NAMESPACE}.record{}", self.record_index);
546 self.record_index += 1;
547 out
548 }
549
550 fn valid_name(&mut self, name: &str) -> (String, bool) {
554 if let Some(valid_name) = self.seen_names.get(name) {
555 (valid_name.into(), true)
556 } else {
557 let mut valid_name = mz_avro::schema::Name::make_valid(name);
558 let valid_name_count = self
559 .valid_names_count
560 .entry(valid_name.clone())
561 .or_default();
562 if *valid_name_count != 0 {
563 valid_name += &valid_name_count.to_string();
564 }
565 *valid_name_count += 1;
566 self.seen_names.insert(name.into(), valid_name.clone());
567 (valid_name, false)
568 }
569 }
570}