1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12
13use mz_repr::adt::array::ArrayDimension;
14use mz_repr::adt::char;
15use mz_repr::adt::jsonb::JsonbRef;
16use mz_repr::adt::numeric::{NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION};
17use mz_repr::{CatalogItemId, ColumnName, Datum, RelationDesc, SqlColumnType, SqlScalarType};
18use serde_json::{Map, json};
19
20use crate::avro::DocTarget;
21use crate::encode::{Encode, TypedDatum, column_names_and_types};
22use crate::envelopes;
23
/// Namespace under which all named Avro types generated for sinks live.
const AVRO_NAMESPACE: &str = "com.materialize.sink";
/// Microseconds per millisecond; used to split a timestamp's subsecond
/// component into a millisecond part and a leftover-microseconds part.
const MICROS_PER_MILLIS: u32 = 1_000;
26
/// Encodes rows as JSON objects, one key per column.
pub struct JsonEncoder {
    // Column names and types in row order; these determine the JSON object
    // keys and how each datum is rendered. May have been wrapped in a
    // Debezium envelope by `JsonEncoder::new`.
    columns: Vec<(ColumnName, SqlColumnType)>,
}
31
32impl JsonEncoder {
33 pub fn new(desc: RelationDesc, debezium: bool) -> Self {
34 let mut columns = column_names_and_types(desc);
35 if debezium {
36 columns = envelopes::dbz_envelope(columns);
37 };
38 JsonEncoder { columns }
39 }
40}
41
impl Encode for JsonEncoder {
    /// Serializes `row` as a UTF-8 JSON object keyed by column name.
    ///
    /// NOTE(review): "unchecked" presumably means the row is assumed to
    /// match `self.columns` — no validation happens here; confirm against
    /// the `Encode` trait's contract.
    fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
        let value = encode_datums_as_json(row.iter(), self.columns.as_ref());
        value.to_string().into_bytes()
    }
}
48
impl fmt::Debug for JsonEncoder {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Render the columns as the row schema they would produce (a
        // `Result<serde_json::Value, _>` formatted via `{:?}` into a
        // string) rather than dumping the raw column list.
        f.debug_struct("JsonEncoder")
            .field(
                "schema",
                &format!(
                    "{:?}",
                    build_row_schema_json(
                        &self.columns,
                        "schema",
                        &BTreeMap::new(),
                        None,
                        &Default::default(),
                    )
                ),
            )
            .finish()
    }
}
68
69pub fn encode_datums_as_json<'a, I>(
71 datums: I,
72 names_types: &[(ColumnName, SqlColumnType)],
73) -> serde_json::Value
74where
75 I: IntoIterator<Item = Datum<'a>>,
76{
77 let value_fields = datums
78 .into_iter()
79 .zip(names_types)
80 .map(|(datum, (name, typ))| {
81 (
82 name.to_string(),
83 TypedDatum::new(datum, typ).json(&JsonNumberPolicy::KeepAsNumber),
84 )
85 })
86 .collect();
87 serde_json::Value::Object(value_fields)
88}
89
/// Controls how numeric JSON values are rendered by [`ToJson::json`].
#[derive(Debug)]
pub enum JsonNumberPolicy {
    /// Leave numbers as JSON numbers.
    KeepAsNumber,
    /// Render numbers as JSON strings instead. Applied recursively: every
    /// level of nested arrays/records/maps converts its own numbers.
    ConvertNumberToString,
}
99
/// Conversion of a value into a [`serde_json::Value`].
pub trait ToJson {
    /// Consumes `self` and produces its JSON rendering, with numbers
    /// handled according to `number_policy`.
    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value;
}
104
impl ToJson for TypedDatum<'_> {
    /// Renders the datum as the JSON value conventionally used for its SQL
    /// scalar type, then applies `number_policy` to the produced value.
    fn json(self, number_policy: &JsonNumberPolicy) -> serde_json::Value {
        let TypedDatum { datum, typ } = self;
        if typ.nullable && datum.is_null() {
            return serde_json::Value::Null;
        }
        let value = match &typ.scalar_type {
            SqlScalarType::AclItem => json!(datum.unwrap_acl_item().to_string()),
            SqlScalarType::Bool => json!(datum.unwrap_bool()),
            SqlScalarType::PgLegacyChar => json!(datum.unwrap_uint8()),
            SqlScalarType::Int16 => json!(datum.unwrap_int16()),
            SqlScalarType::Int32 => json!(datum.unwrap_int32()),
            SqlScalarType::Int64 => json!(datum.unwrap_int64()),
            SqlScalarType::UInt16 => json!(datum.unwrap_uint16()),
            // The OID-like types are all backed by a uint32 datum.
            SqlScalarType::UInt32
            | SqlScalarType::Oid
            | SqlScalarType::RegClass
            | SqlScalarType::RegProc
            | SqlScalarType::RegType => {
                json!(datum.unwrap_uint32())
            }
            SqlScalarType::UInt64 => json!(datum.unwrap_uint64()),
            SqlScalarType::Float32 => json!(datum.unwrap_float32()),
            SqlScalarType::Float64 => json!(datum.unwrap_float64()),
            // Numerics are rendered as strings, avoiding any precision loss
            // a JSON number could introduce.
            SqlScalarType::Numeric { .. } => {
                json!(datum.unwrap_numeric().0.to_standard_notation_string())
            }
            SqlScalarType::Date => serde_json::Value::String(format!("{}", datum.unwrap_date())),
            SqlScalarType::Time => serde_json::Value::String(format!("{:?}", datum.unwrap_time())),
            // Timestamps are rendered as "<millis-since-epoch>.<micros>",
            // where the fractional part is the microseconds remaining past
            // the millisecond boundary, zero-padded to three digits.
            SqlScalarType::Timestamp { .. } => {
                let dt = datum.unwrap_timestamp().to_naive().and_utc();
                let millis = dt.timestamp_millis();
                let micros = dt.timestamp_subsec_micros()
                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
            }
            // Same encoding as `Timestamp`, after normalizing to UTC.
            SqlScalarType::TimestampTz { .. } => {
                let dt = datum.unwrap_timestamptz().to_utc();
                let millis = dt.timestamp_millis();
                let micros = dt.timestamp_subsec_micros()
                    - (dt.timestamp_subsec_millis() * MICROS_PER_MILLIS);
                serde_json::Value::String(format!("{millis}.{micros:0>3}"))
            }
            SqlScalarType::Interval => {
                serde_json::Value::String(format!("{}", datum.unwrap_interval()))
            }
            SqlScalarType::Bytes => json!(datum.unwrap_bytes()),
            SqlScalarType::String | SqlScalarType::VarChar { .. } | SqlScalarType::PgLegacyName => {
                json!(datum.unwrap_str())
            }
            // `char(n)` values are padded out to their declared length.
            SqlScalarType::Char { length } => {
                let s = char::format_str_pad(datum.unwrap_str(), *length);
                serde_json::Value::String(s)
            }
            SqlScalarType::Jsonb => JsonbRef::from_datum(datum).to_serde_json(),
            SqlScalarType::Uuid => json!(datum.unwrap_uuid()),
            // Arrays become nested JSON arrays mirroring their dimensions.
            ty @ (SqlScalarType::Array(..) | SqlScalarType::Int2Vector) => {
                let array = datum.unwrap_array();
                let dims = array.dims().into_iter().collect::<Vec<_>>();
                let mut datums = array.elements().iter();
                encode_array(&mut datums, &dims, &mut |datum| {
                    TypedDatum::new(
                        datum,
                        &SqlColumnType {
                            // Elements are always treated as nullable.
                            nullable: true,
                            scalar_type: ty.unwrap_collection_element_type().clone(),
                        },
                    )
                    .json(number_policy)
                })
            }
            SqlScalarType::List { element_type, .. } => {
                let values = datum
                    .unwrap_list()
                    .into_iter()
                    .map(|datum| {
                        TypedDatum::new(
                            datum,
                            &SqlColumnType {
                                // Elements are always treated as nullable.
                                nullable: true,
                                scalar_type: (**element_type).clone(),
                            },
                        )
                        .json(number_policy)
                    })
                    .collect();
                serde_json::Value::Array(values)
            }
            // Records become JSON objects; field descriptors and datums are
            // paired positionally.
            SqlScalarType::Record { fields, .. } => {
                let list = datum.unwrap_list();
                let fields: Map<String, serde_json::Value> = fields
                    .iter()
                    .zip(&list)
                    .map(|((name, typ), datum)| {
                        let name = name.to_string();
                        let datum = TypedDatum::new(datum, typ);
                        let value = datum.json(number_policy);
                        (name, value)
                    })
                    .collect();
                fields.into()
            }
            SqlScalarType::Map { value_type, .. } => {
                let map = datum.unwrap_map();
                let elements = map
                    .into_iter()
                    .map(|(key, datum)| {
                        let value = TypedDatum::new(
                            datum,
                            &SqlColumnType {
                                // Values are always treated as nullable.
                                nullable: true,
                                scalar_type: (**value_type).clone(),
                            },
                        )
                        .json(number_policy);
                        (key.to_string(), value)
                    })
                    .collect();
                serde_json::Value::Object(elements)
            }
            SqlScalarType::MzTimestamp => json!(datum.unwrap_mz_timestamp().to_string()),
            SqlScalarType::Range { .. } => {
                json!(datum.unwrap_range().to_string())
            }
            SqlScalarType::MzAclItem => json!(datum.unwrap_mz_acl_item().to_string()),
        };
        // Post-process numbers per the requested policy. Nested values have
        // already had the policy applied by the recursive calls above.
        match (number_policy, value) {
            (JsonNumberPolicy::KeepAsNumber, value) => value,
            (JsonNumberPolicy::ConvertNumberToString, serde_json::Value::Number(n)) => {
                serde_json::Value::String(n.to_string())
            }
            (JsonNumberPolicy::ConvertNumberToString, value) => value,
        }
    }
}
246
247fn encode_array<'a>(
248 elems: &mut impl Iterator<Item = Datum<'a>>,
249 dims: &[ArrayDimension],
250 elem_encoder: &mut impl FnMut(Datum<'_>) -> serde_json::Value,
251) -> serde_json::Value {
252 serde_json::Value::Array(match dims {
253 [] => vec![],
254 [dim] => elems.take(dim.length).map(elem_encoder).collect(),
255 [dim, rest @ ..] => (0..dim.length)
256 .map(|_| encode_array(elems, rest, elem_encoder))
257 .collect(),
258 })
259}
260
/// Builds the Avro schema JSON for a single column type.
///
/// Named types (fixed types and records) are registered with `type_namer`
/// so that each is fully defined only once and referenced by name
/// thereafter. `custom_names` can override the generated name of a record
/// type with a known `custom_id`. A nullable column type is wrapped in a
/// `["null", T]` union.
fn build_row_schema_field_type(
    type_namer: &mut Namer,
    custom_names: &BTreeMap<CatalogItemId, String>,
    typ: &SqlColumnType,
    item_id: Option<CatalogItemId>,
    options: &SchemaOptions,
) -> serde_json::Value {
    let mut field_type = match &typ.scalar_type {
        SqlScalarType::AclItem => json!("string"),
        SqlScalarType::Bool => json!("boolean"),
        // A single-byte fixed type.
        SqlScalarType::PgLegacyChar => json!({
            "type": "fixed",
            "size": 1,
        }),
        SqlScalarType::Int16 | SqlScalarType::Int32 => {
            json!("int")
        }
        SqlScalarType::Int64 => json!("long"),
        // Unsigned integers have no native Avro representation; they are
        // emitted as named fixed types of the given byte width.
        SqlScalarType::UInt16 => type_namer.unsigned_type(2),
        SqlScalarType::UInt32
        | SqlScalarType::Oid
        | SqlScalarType::RegClass
        | SqlScalarType::RegProc
        | SqlScalarType::RegType => type_namer.unsigned_type(4),
        SqlScalarType::UInt64 => type_namer.unsigned_type(8),
        SqlScalarType::Float32 => json!("float"),
        SqlScalarType::Float64 => json!("double"),
        SqlScalarType::Date => json!({
            "type": "int",
            "logicalType": "date",
        }),
        SqlScalarType::Time => json!({
            "type": "long",
            "logicalType": "time-micros",
        }),
        // Millisecond precision suffices when the declared precision is at
        // most 3 fractional digits; otherwise fall back to microseconds.
        SqlScalarType::Timestamp { precision } | SqlScalarType::TimestampTz { precision } => {
            json!({
                "type": "long",
                "logicalType": match precision {
                    Some(precision) if precision.into_u8() <= 3 => "timestamp-millis",
                    _ => "timestamp-micros",
                },
            })
        }
        SqlScalarType::Interval => type_namer.interval_type(),
        SqlScalarType::Bytes => json!("bytes"),
        SqlScalarType::String
        | SqlScalarType::Char { .. }
        | SqlScalarType::VarChar { .. }
        | SqlScalarType::PgLegacyName => {
            json!("string")
        }
        // JSON payloads travel as strings, tagged for Debezium consumers.
        SqlScalarType::Jsonb => json!({
            "type": "string",
            "connect.name": "io.debezium.data.Json",
        }),
        SqlScalarType::Uuid => json!({
            "type": "string",
            "logicalType": "uuid",
        }),
        // Arrays and lists become Avro arrays of their (nullable) element
        // type.
        ty
        @ (SqlScalarType::Array(..) | SqlScalarType::Int2Vector | SqlScalarType::List { .. }) => {
            let inner = build_row_schema_field_type(
                type_namer,
                custom_names,
                &SqlColumnType {
                    nullable: true,
                    scalar_type: ty.unwrap_collection_element_type().clone(),
                },
                item_id,
                options,
            );
            json!({
                "type": "array",
                "items": inner
            })
        }
        SqlScalarType::Map { value_type, .. } => {
            let inner = build_row_schema_field_type(
                type_namer,
                custom_names,
                &SqlColumnType {
                    nullable: true,
                    scalar_type: (**value_type).clone(),
                },
                item_id,
                options,
            );
            json!({
                "type": "map",
                "values": inner
            })
        }
        SqlScalarType::Record {
            fields, custom_id, ..
        } => {
            // Prefer a caller-supplied name for this record; otherwise mint
            // an anonymous one. `name_seen` is true when this named type was
            // already fully defined earlier in the schema.
            let (name, name_seen) = match custom_id.as_ref().and_then(|id| custom_names.get(id)) {
                Some(name) => type_namer.valid_name(name),
                None => (type_namer.anonymous_record_name(), false),
            };
            if name_seen {
                // Avro allows a named type to be defined only once; later
                // occurrences reference it by name.
                json!(name)
            } else {
                let fields = fields.to_vec();
                let json_fields =
                    build_row_schema_fields(&fields, type_namer, custom_names, *custom_id, options);
                if let Some(comment) =
                    custom_id.and_then(|id| options.doc_comments.get(&DocTarget::Type(id)))
                {
                    json!({
                        "type": "record",
                        "name": name,
                        "doc": comment,
                        "fields": json_fields
                    })
                } else {
                    json!({
                        "type": "record",
                        "name": name,
                        "fields": json_fields
                    })
                }
            }
        }
        // Decimal logical type: a declared max scale fixes the scale; an
        // unconstrained numeric uses the aggregate max precision with the
        // datum max precision as its scale.
        SqlScalarType::Numeric { max_scale } => {
            let (p, s) = match max_scale {
                Some(max_scale) => (NUMERIC_DATUM_MAX_PRECISION, max_scale.into_u8()),
                None => (NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION),
            };
            json!({
                "type": "bytes",
                "logicalType": "decimal",
                "precision": p,
                "scale": s,
            })
        }
        SqlScalarType::MzTimestamp => json!("string"),
        SqlScalarType::Range { .. } => json!("string"),
        SqlScalarType::MzAclItem => json!("string"),
    };
    // Nullable columns become a union with "null" first, which lets a null
    // default be attached by the caller.
    if typ.nullable {
        field_type = json!(["null", field_type]);
    }
    field_type
}
410
/// Builds the Avro field schema JSON for each column in `columns`.
///
/// `type_namer` is shared across the entire schema build so that named
/// types are defined exactly once; field names only need to be unique
/// within this record, so they are deduplicated with a fresh local namer.
fn build_row_schema_fields(
    columns: &[(ColumnName, SqlColumnType)],
    type_namer: &mut Namer,
    custom_names: &BTreeMap<CatalogItemId, String>,
    item_id: Option<CatalogItemId>,
    options: &SchemaOptions,
) -> Vec<serde_json::Value> {
    let mut fields = Vec::new();
    let mut field_namer = Namer::default();
    for (name, typ) in columns.iter() {
        let (name, _seen) = field_namer.valid_name(name);
        let field_type =
            build_row_schema_field_type(type_namer, custom_names, typ, item_id, options);

        let mut field = json!({
            "name": name,
            "type": field_type,
        });

        // A nullable column is rendered as the union ["null", T]; detect
        // that exact shape so a null default can be attached below.
        let is_nullable_union = field_type
            .as_array()
            .is_some_and(|array| array.first().is_some_and(|first| first == &json!("null")));

        if options.set_null_defaults && is_nullable_union {
            field
                .as_object_mut()
                .expect("`field` initialized to JSON object above")
                .insert("default".to_string(), json!(null));
        }

        // Attach the column's doc comment, if one was supplied for this
        // object/column pair.
        if let Some(comment) = item_id.and_then(|item_id| {
            options.doc_comments.get(&DocTarget::Field {
                object_id: item_id,
                column_name: name.into(),
            })
        }) {
            field
                .as_object_mut()
                .expect("`field` initialized to JSON object above")
                .insert("doc".to_string(), json!(comment));
        }

        fields.push(field);
    }
    fields
}
458
/// Options controlling the schema JSON produced by [`build_row_schema_json`].
#[derive(Default, Clone, Debug)]
pub struct SchemaOptions {
    /// Whether to attach a `"default": null` to fields whose type is a
    /// nullable (`["null", T]`) union.
    pub set_null_defaults: bool,
    /// Doc comments to embed in the schema, keyed by the type or field
    /// they document.
    pub doc_comments: BTreeMap<DocTarget, String>,
}
468
469pub fn build_row_schema_json(
471 columns: &[(ColumnName, SqlColumnType)],
472 name: &str,
473 custom_names: &BTreeMap<CatalogItemId, String>,
474 item_id: Option<CatalogItemId>,
475 options: &SchemaOptions,
476) -> Result<serde_json::Value, anyhow::Error> {
477 let fields = build_row_schema_fields(
478 columns,
479 &mut Namer::default(),
480 custom_names,
481 item_id,
482 options,
483 );
484
485 let _ = mz_avro::schema::Name::parse_simple(name)?;
486 if let Some(comment) =
487 item_id.and_then(|item_id| options.doc_comments.get(&DocTarget::Type(item_id)))
488 {
489 Ok(json!({
490 "type": "record",
491 "doc": comment,
492 "fields": fields,
493 "name": name
494 }))
495 } else {
496 Ok(json!({
497 "type": "record",
498 "fields": fields,
499 "name": name
500 }))
501 }
502}
503
/// Assigns and tracks unique, Avro-valid names for the named types and
/// fields generated during a single schema build.
#[derive(Default)]
struct Namer {
    // Next numeric suffix for an anonymous record name.
    record_index: usize,
    // Whether the named interval fixed type has already been defined.
    seen_interval: bool,
    // Byte widths for which a named unsigned fixed type has been defined.
    seen_unsigneds: BTreeSet<usize>,
    // Maps each original name to the valid name it was assigned.
    seen_names: BTreeMap<String, String>,
    // How many times each sanitized base name has been handed out; used to
    // disambiguate collisions with a numeric suffix.
    valid_names_count: BTreeMap<String, usize>,
}
513
impl Namer {
    /// Returns the schema for the interval type: a named 16-byte fixed
    /// definition on first use, and a bare name reference thereafter
    /// (a named Avro type may be fully defined only once per schema).
    fn interval_type(&mut self) -> serde_json::Value {
        let name = format!("{AVRO_NAMESPACE}.interval");
        if self.seen_interval {
            json!(name)
        } else {
            self.seen_interval = true;
            json!({
                "type": "fixed",
                "size": 16,
                "name": name,
            })
        }
    }

    /// Returns the schema for an unsigned integer of `width` bytes: a
    /// named fixed definition on first use, a name reference thereafter.
    fn unsigned_type(&mut self, width: usize) -> serde_json::Value {
        let name = format!("{AVRO_NAMESPACE}.uint{width}");
        if self.seen_unsigneds.contains(&width) {
            json!(name)
        } else {
            self.seen_unsigneds.insert(width);
            json!({
                "type": "fixed",
                "size": width,
                "name": name,
            })
        }
    }

    /// Mints a fresh name for an anonymous record type.
    fn anonymous_record_name(&mut self) -> String {
        let out = format!("{AVRO_NAMESPACE}.record{}", self.record_index);
        self.record_index += 1;
        out
    }

    /// Turns `name` into a valid Avro name, deduplicated against names
    /// returned previously. The boolean is true when `name` was seen
    /// before (in which case its previously assigned name is returned).
    fn valid_name(&mut self, name: &str) -> (String, bool) {
        if let Some(valid_name) = self.seen_names.get(name) {
            (valid_name.into(), true)
        } else {
            let mut valid_name = mz_avro::schema::Name::make_valid(name);
            // Distinct inputs may sanitize to the same base name; append
            // the running count to keep the results distinguishable.
            let valid_name_count = self
                .valid_names_count
                .entry(valid_name.clone())
                .or_default();
            if *valid_name_count != 0 {
                valid_name += &valid_name_count.to_string();
            }
            *valid_name_count += 1;
            self.seen_names.insert(name.into(), valid_name.clone());
            (valid_name, false)
        }
    }
}