1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12
13use mz_repr::adt::array::ArrayDimension;
14use mz_repr::adt::char;
15use mz_repr::adt::jsonb::JsonbRef;
16use mz_repr::adt::numeric::{NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION};
17use mz_repr::{CatalogItemId, ColumnName, ColumnType, Datum, RelationDesc, ScalarType};
18use serde_json::{Map, json};
19
20use crate::avro::DocTarget;
21use crate::encode::{Encode, TypedDatum, column_names_and_types};
22use crate::envelopes;
23
/// Namespace prefix for the named Avro types generated in this module
/// (the shared interval type, unsigned `fixed` types, and anonymous records).
const AVRO_NAMESPACE: &str = "com.materialize.sink";
/// Number of microseconds in one millisecond; used to split off the
/// sub-millisecond remainder when rendering timestamps.
const MICROS_PER_MILLIS: u32 = 1000;
26
27pub struct JsonEncoder {
29 columns: Vec<(ColumnName, ColumnType)>,
30}
31
32impl JsonEncoder {
33 pub fn new(desc: RelationDesc, debezium: bool) -> Self {
34 let mut columns = column_names_and_types(desc);
35 if debezium {
36 columns = envelopes::dbz_envelope(columns);
37 };
38 JsonEncoder { columns }
39 }
40}
41
42impl Encode for JsonEncoder {
43 fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
44 let value = encode_datums_as_json(row.iter(), self.columns.as_ref());
45 value.to_string().into_bytes()
46 }
47}
48
49impl fmt::Debug for JsonEncoder {
50 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
51 f.debug_struct("JsonEncoder")
52 .field(
53 "schema",
54 &format!(
55 "{:?}",
56 build_row_schema_json(
57 &self.columns,
58 "schema",
59 &BTreeMap::new(),
60 None,
61 &Default::default(),
62 )
63 ),
64 )
65 .finish()
66 }
67}
68
69pub fn encode_datums_as_json<'a, I>(
71 datums: I,
72 names_types: &[(ColumnName, ColumnType)],
73) -> serde_json::Value
74where
75 I: IntoIterator<Item = Datum<'a>>,
76{
77 let value_fields = datums
78 .into_iter()
79 .zip(names_types)
80 .map(|(datum, (name, typ))| {
81 (
82 name.to_string(),
83 TypedDatum::new(datum, typ).json(&JsonNumberPolicy::KeepAsNumber),
84 )
85 })
86 .collect();
87 serde_json::Value::Object(value_fields)
88}
89
/// How numeric results of a JSON conversion are represented.
#[derive(Debug)]
pub enum JsonNumberPolicy {
    /// Leave JSON numbers as numbers.
    KeepAsNumber,
    /// Replace a JSON number with a JSON string holding the same number text.
    ConvertNumberToString,
}
99
100pub trait ToJson {
101 fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value;
103}
104
105impl ToJson for TypedDatum<'_> {
106 fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value {
107 let TypedDatum { datum, typ } = self;
108 if typ.nullable && datum.is_null() {
109 return serde_json::Value::Null;
110 }
111 let value = match &typ.scalar_type {
112 ScalarType::AclItem => json!(datum.unwrap_acl_item().to_string()),
113 ScalarType::Bool => json!(datum.unwrap_bool()),
114 ScalarType::PgLegacyChar => json!(datum.unwrap_uint8()),
115 ScalarType::Int16 => json!(datum.unwrap_int16()),
116 ScalarType::Int32 => json!(datum.unwrap_int32()),
117 ScalarType::Int64 => json!(datum.unwrap_int64()),
118 ScalarType::UInt16 => json!(datum.unwrap_uint16()),
119 ScalarType::UInt32
120 | ScalarType::Oid
121 | ScalarType::RegClass
122 | ScalarType::RegProc
123 | ScalarType::RegType => {
124 json!(datum.unwrap_uint32())
125 }
126 ScalarType::UInt64 => json!(datum.unwrap_uint64()),
127 ScalarType::Float32 => json!(datum.unwrap_float32()),
128 ScalarType::Float64 => json!(datum.unwrap_float64()),
129 ScalarType::Numeric { .. } => {
130 json!(datum.unwrap_numeric().0.to_standard_notation_string())
131 }
132 ScalarType::Date => serde_json::Value::String(format!("{}", datum.unwrap_date())),
134 ScalarType::Time => serde_json::Value::String(format!("{:?}", datum.unwrap_time())),
135 ScalarType::Timestamp { .. } => {
136 let dt = datum.unwrap_timestamp().to_naive().and_utc();
137 let millis = dt.timestamp_millis();
138 let micros = dt.timestamp_subsec_micros()
139 - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
140 serde_json::Value::String(format!("{millis}.{micros:0>3}"))
141 }
142 ScalarType::TimestampTz { .. } => {
143 let dt = datum.unwrap_timestamptz().to_utc();
144 let millis = dt.timestamp_millis();
145 let micros = dt.timestamp_subsec_micros()
146 - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
147 serde_json::Value::String(format!("{millis}.{micros:0>3}"))
148 }
149 ScalarType::Interval => {
150 serde_json::Value::String(format!("{}", datum.unwrap_interval()))
151 }
152 ScalarType::Bytes => json!(datum.unwrap_bytes()),
153 ScalarType::String | ScalarType::VarChar { .. } | ScalarType::PgLegacyName => {
154 json!(datum.unwrap_str())
155 }
156 ScalarType::Char { length } => {
157 let s = char::format_str_pad(datum.unwrap_str(), *length);
158 serde_json::Value::String(s)
159 }
160 ScalarType::Jsonb => JsonbRef::from_datum(datum).to_serde_json(),
161 ScalarType::Uuid => json!(datum.unwrap_uuid()),
162 ty @ (ScalarType::Array(..) | ScalarType::Int2Vector) => {
163 let array = datum.unwrap_array();
164 let dims = array.dims().into_iter().collect::<Vec<_>>();
165 let mut datums = array.elements().iter();
166 encode_array(&mut datums, &dims, &mut |datum| {
167 TypedDatum::new(
168 datum,
169 &ColumnType {
170 nullable: true,
171 scalar_type: ty.unwrap_collection_element_type().clone(),
172 },
173 )
174 .json(number_policy)
175 })
176 }
177 ScalarType::List { element_type, .. } => {
178 let values = datum
179 .unwrap_list()
180 .into_iter()
181 .map(|datum| {
182 TypedDatum::new(
183 datum,
184 &ColumnType {
185 nullable: true,
186 scalar_type: (**element_type).clone(),
187 },
188 )
189 .json(number_policy)
190 })
191 .collect();
192 serde_json::Value::Array(values)
193 }
194 ScalarType::Record { fields, .. } => {
195 let list = datum.unwrap_list();
196 let fields: Map<String, serde_json::Value> = fields
197 .iter()
198 .zip(&list)
199 .map(|((name, typ), datum)| {
200 let name = name.to_string();
201 let datum = TypedDatum::new(datum, typ);
202 let value = datum.json(number_policy);
203 (name, value)
204 })
205 .collect();
206 fields.into()
207 }
208 ScalarType::Map { value_type, .. } => {
209 let map = datum.unwrap_map();
210 let elements = map
211 .into_iter()
212 .map(|(key, datum)| {
213 let value = TypedDatum::new(
214 datum,
215 &ColumnType {
216 nullable: true,
217 scalar_type: (**value_type).clone(),
218 },
219 )
220 .json(number_policy);
221 (key.to_string(), value)
222 })
223 .collect();
224 serde_json::Value::Object(elements)
225 }
226 ScalarType::MzTimestamp => json!(datum.unwrap_mz_timestamp().to_string()),
227 ScalarType::Range { .. } => {
228 json!(datum.unwrap_range().to_string())
232 }
233 ScalarType::MzAclItem => json!(datum.unwrap_mz_acl_item().to_string()),
234 };
235 match (number_policy, value) {
238 (JsonNumberPolicy::KeepAsNumber, value) => value,
239 (JsonNumberPolicy::ConvertNumberToString, serde_json::Value::Number(n)) => {
240 serde_json::Value::String(n.to_string())
241 }
242 (JsonNumberPolicy::ConvertNumberToString, value) => value,
243 }
244 }
245}
246
247fn encode_array<'a>(
248 elems: &mut impl Iterator<Item = Datum<'a>>,
249 dims: &[ArrayDimension],
250 elem_encoder: &mut impl FnMut(Datum<'_>) -> serde_json::Value,
251) -> serde_json::Value {
252 serde_json::Value::Array(match dims {
253 [] => vec![],
254 [dim] => elems.take(dim.length).map(elem_encoder).collect(),
255 [dim, rest @ ..] => (0..dim.length)
256 .map(|_| encode_array(elems, rest, elem_encoder))
257 .collect(),
258 })
259}
260
261fn build_row_schema_field_type(
262 type_namer: &mut Namer,
263 custom_names: &BTreeMap<CatalogItemId, String>,
264 typ: &ColumnType,
265 item_id: Option<CatalogItemId>,
266 options: &SchemaOptions,
267) -> serde_json::Value {
268 let mut field_type = match &typ.scalar_type {
269 ScalarType::AclItem => json!("string"),
270 ScalarType::Bool => json!("boolean"),
271 ScalarType::PgLegacyChar => json!({
272 "type": "fixed",
273 "size": 1,
274 }),
275 ScalarType::Int16 | ScalarType::Int32 => {
276 json!("int")
277 }
278 ScalarType::Int64 => json!("long"),
279 ScalarType::UInt16 => type_namer.unsigned_type(2),
280 ScalarType::UInt32
281 | ScalarType::Oid
282 | ScalarType::RegClass
283 | ScalarType::RegProc
284 | ScalarType::RegType => type_namer.unsigned_type(4),
285 ScalarType::UInt64 => type_namer.unsigned_type(8),
286 ScalarType::Float32 => json!("float"),
287 ScalarType::Float64 => json!("double"),
288 ScalarType::Date => json!({
289 "type": "int",
290 "logicalType": "date",
291 }),
292 ScalarType::Time => json!({
293 "type": "long",
294 "logicalType": "time-micros",
295 }),
296 ScalarType::Timestamp { precision } | ScalarType::TimestampTz { precision } => json!({
297 "type": "long",
298 "logicalType": match precision {
299 Some(precision) if precision.into_u8() <= 3 => "timestamp-millis",
300 _ => "timestamp-micros",
301 },
302 }),
303 ScalarType::Interval => type_namer.interval_type(),
304 ScalarType::Bytes => json!("bytes"),
305 ScalarType::String
306 | ScalarType::Char { .. }
307 | ScalarType::VarChar { .. }
308 | ScalarType::PgLegacyName => {
309 json!("string")
310 }
311 ScalarType::Jsonb => json!({
312 "type": "string",
313 "connect.name": "io.debezium.data.Json",
314 }),
315 ScalarType::Uuid => json!({
316 "type": "string",
317 "logicalType": "uuid",
318 }),
319 ty @ (ScalarType::Array(..) | ScalarType::Int2Vector | ScalarType::List { .. }) => {
320 let inner = build_row_schema_field_type(
321 type_namer,
322 custom_names,
323 &ColumnType {
324 nullable: true,
325 scalar_type: ty.unwrap_collection_element_type().clone(),
326 },
327 item_id,
328 options,
329 );
330 json!({
331 "type": "array",
332 "items": inner
333 })
334 }
335 ScalarType::Map { value_type, .. } => {
336 let inner = build_row_schema_field_type(
337 type_namer,
338 custom_names,
339 &ColumnType {
340 nullable: true,
341 scalar_type: (**value_type).clone(),
342 },
343 item_id,
344 options,
345 );
346 json!({
347 "type": "map",
348 "values": inner
349 })
350 }
351 ScalarType::Record {
352 fields, custom_id, ..
353 } => {
354 let (name, name_seen) = match custom_id.as_ref().and_then(|id| custom_names.get(id)) {
355 Some(name) => type_namer.valid_name(name),
356 None => (type_namer.anonymous_record_name(), false),
357 };
358 if name_seen {
359 json!(name)
360 } else {
361 let fields = fields.to_vec();
362 let json_fields =
363 build_row_schema_fields(&fields, type_namer, custom_names, *custom_id, options);
364 if let Some(comment) =
365 custom_id.and_then(|id| options.doc_comments.get(&DocTarget::Type(id)))
366 {
367 json!({
368 "type": "record",
369 "name": name,
370 "doc": comment,
371 "fields": json_fields
372 })
373 } else {
374 json!({
375 "type": "record",
376 "name": name,
377 "fields": json_fields
378 })
379 }
380 }
381 }
382 ScalarType::Numeric { max_scale } => {
383 let (p, s) = match max_scale {
384 Some(max_scale) => (NUMERIC_DATUM_MAX_PRECISION, max_scale.into_u8()),
385 None => (NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION),
386 };
387 json!({
388 "type": "bytes",
389 "logicalType": "decimal",
390 "precision": p,
391 "scale": s,
392 })
393 }
394 ScalarType::MzTimestamp => json!("string"),
395 ScalarType::Range { .. } => json!("string"),
397 ScalarType::MzAclItem => json!("string"),
398 };
399 if typ.nullable {
400 field_type = json!(["null", field_type]);
404 }
405 field_type
406}
407
408fn build_row_schema_fields(
409 columns: &[(ColumnName, ColumnType)],
410 type_namer: &mut Namer,
411 custom_names: &BTreeMap<CatalogItemId, String>,
412 item_id: Option<CatalogItemId>,
413 options: &SchemaOptions,
414) -> Vec<serde_json::Value> {
415 let mut fields = Vec::new();
416 let mut field_namer = Namer::default();
417 for (name, typ) in columns.iter() {
418 let (name, _seen) = field_namer.valid_name(name.as_str());
419 let field_type =
420 build_row_schema_field_type(type_namer, custom_names, typ, item_id, options);
421
422 let mut field = json!({
423 "name": name,
424 "type": field_type,
425 });
426
427 let is_nullable_union = field_type
429 .as_array()
430 .is_some_and(|array| array.first().is_some_and(|first| first == &json!("null")));
431
432 if options.set_null_defaults && is_nullable_union {
433 field
434 .as_object_mut()
435 .expect("`field` initialized to JSON object above")
436 .insert("default".to_string(), json!(null));
437 }
438
439 if let Some(comment) = item_id.and_then(|item_id| {
440 options.doc_comments.get(&DocTarget::Field {
441 object_id: item_id,
442 column_name: name.into(),
443 })
444 }) {
445 field
446 .as_object_mut()
447 .expect("`field` initialized to JSON object above")
448 .insert("doc".to_string(), json!(comment));
449 }
450
451 fields.push(field);
452 }
453 fields
454}
455
456#[derive(Default, Clone, Debug)]
457pub struct SchemaOptions {
459 pub set_null_defaults: bool,
461 pub doc_comments: BTreeMap<DocTarget, String>,
464}
465
466pub fn build_row_schema_json(
468 columns: &[(ColumnName, ColumnType)],
469 name: &str,
470 custom_names: &BTreeMap<CatalogItemId, String>,
471 item_id: Option<CatalogItemId>,
472 options: &SchemaOptions,
473) -> Result<serde_json::Value, anyhow::Error> {
474 let fields = build_row_schema_fields(
475 columns,
476 &mut Namer::default(),
477 custom_names,
478 item_id,
479 options,
480 );
481
482 let _ = mz_avro::schema::Name::parse_simple(name)?;
483 if let Some(comment) =
484 item_id.and_then(|item_id| options.doc_comments.get(&DocTarget::Type(item_id)))
485 {
486 Ok(json!({
487 "type": "record",
488 "doc": comment,
489 "fields": fields,
490 "name": name
491 }))
492 } else {
493 Ok(json!({
494 "type": "record",
495 "fields": fields,
496 "name": name
497 }))
498 }
499}
500
/// Hands out names for one schema (or one record's fields) and remembers which
/// named types have already been defined.
#[derive(Default)]
struct Namer {
    /// Index used for the next anonymous record type name.
    record_index: usize,
    /// Whether the shared interval type has already been emitted.
    seen_interval: bool,
    /// Byte widths of the unsigned `fixed` types already emitted.
    seen_unsigneds: BTreeSet<usize>,
    /// Original name -> sanitized, de-duplicated name already handed out for it.
    seen_names: BTreeMap<String, String>,
    /// Bookkeeping used by `valid_name` to derive unique numeric suffixes for
    /// sanitized names.
    valid_names_count: BTreeMap<String, usize>,
}
510
511impl Namer {
512 fn interval_type(&mut self) -> serde_json::Value {
514 let name = format!("{AVRO_NAMESPACE}.interval");
515 if self.seen_interval {
516 json!(name)
517 } else {
518 self.seen_interval = true;
519 json!({
520 "type": "fixed",
521 "size": 16,
522 "name": name,
523 })
524 }
525 }
526
527 fn unsigned_type(&mut self, width: usize) -> serde_json::Value {
529 let name = format!("{AVRO_NAMESPACE}.uint{width}");
530 if self.seen_unsigneds.contains(&width) {
531 json!(name)
532 } else {
533 self.seen_unsigneds.insert(width);
534 json!({
535 "type": "fixed",
536 "size": width,
537 "name": name,
538 })
539 }
540 }
541
542 fn anonymous_record_name(&mut self) -> String {
544 let out = format!("{AVRO_NAMESPACE}.record{}", self.record_index);
545 self.record_index += 1;
546 out
547 }
548
549 fn valid_name(&mut self, name: &str) -> (String, bool) {
553 if let Some(valid_name) = self.seen_names.get(name) {
554 (valid_name.into(), true)
555 } else {
556 let mut valid_name = mz_avro::schema::Name::make_valid(name);
557 let valid_name_count = self
558 .valid_names_count
559 .entry(valid_name.clone())
560 .or_default();
561 if *valid_name_count != 0 {
562 valid_name += &valid_name_count.to_string();
563 }
564 *valid_name_count += 1;
565 self.seen_names.insert(name.into(), valid_name.clone());
566 (valid_name, false)
567 }
568 }
569}