1use std::collections::BTreeMap;
11use std::fmt;
12use std::sync::LazyLock;
13
14use anyhow::Ok;
15use byteorder::{NetworkEndian, WriteBytesExt};
16use chrono::Timelike;
17use itertools::Itertools;
18use mz_avro::Schema;
19use mz_avro::types::{DecimalValue, ToAvro, Value};
20use mz_ore::cast::CastFrom;
21use mz_repr::adt::jsonb::JsonbRef;
22use mz_repr::adt::numeric::{self, NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION};
23use mz_repr::{CatalogItemId, ColumnName, ColumnType, Datum, RelationDesc, Row, ScalarType};
24use serde_json::json;
25
26use crate::encode::{Encode, TypedDatum, column_names_and_types};
27use crate::envelopes::{self, DBZ_ROW_TYPE_ID, ENVELOPE_CUSTOM_NAMES};
28use crate::json::{SchemaOptions, build_row_schema_json};
29
30static DEBEZIUM_TRANSACTION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
37 Schema::parse(&json!({
38 "type": "record",
39 "name": "envelope",
40 "fields": [
41 {
42 "name": "id",
43 "type": "string"
44 },
45 {
46 "name": "status",
47 "type": "string"
48 },
49 {
50 "name": "event_count",
51 "type": [
52 "null",
53 "long"
54 ]
55 },
56 {
57 "name": "data_collections",
58 "type": [
59 "null",
60 {
61 "type": "array",
62 "items": {
63 "type": "record",
64 "name": "data_collection",
65 "fields": [
66 {
67 "name": "data_collection",
68 "type": "string"
69 },
70 {
71 "name": "event_count",
72 "type": "long"
73 },
74 ]
75 }
76 }
77 ],
78 "default": null,
79 },
80 ]
81 }))
82 .expect("valid schema constructed")
83});
84
/// Appends the message header to `buf`: a single zero magic byte followed by
/// `schema_id` as a 4-byte big-endian (network order) integer.
///
/// This is the framing that `crate::confluent::extract_avro_header` strips off
/// again when hashing.
fn encode_avro_header(buf: &mut Vec<u8>, schema_id: i32) {
    const MAGIC_BYTE: u8 = 0;
    buf.push(MAGIC_BYTE);
    buf.extend_from_slice(&schema_id.to_be_bytes());
}
95
96fn encode_message_unchecked(
97 schema_id: i32,
98 row: Row,
99 schema: &Schema,
100 columns: &[(ColumnName, ColumnType)],
101) -> Vec<u8> {
102 let mut buf = vec![];
103 encode_avro_header(&mut buf, schema_id);
104 let value = encode_datums_as_avro(row.iter(), columns);
105 mz_avro::encode_unchecked(&value, schema, &mut buf);
106 buf
107}
108
109#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
110pub enum DocTarget {
111 Type(CatalogItemId),
112 Field {
113 object_id: CatalogItemId,
114 column_name: ColumnName,
115 },
116}
117
118impl DocTarget {
119 fn id(&self) -> CatalogItemId {
120 match self {
121 DocTarget::Type(object_id) => *object_id,
122 DocTarget::Field { object_id, .. } => *object_id,
123 }
124 }
125}
126
127pub struct AvroSchemaGenerator {
129 columns: Vec<(ColumnName, ColumnType)>,
130 schema: Schema,
131}
132
133impl fmt::Debug for AvroSchemaGenerator {
134 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
135 f.debug_struct("SchemaGenerator")
136 .field("writer_schema", &self.schema())
137 .finish()
138 }
139}
140
141impl AvroSchemaGenerator {
142 pub fn new(
143 desc: RelationDesc,
144 debezium: bool,
145 mut doc_options: BTreeMap<DocTarget, String>,
146 avro_fullname: &str,
147 set_null_defaults: bool,
148 sink_from: Option<CatalogItemId>,
149 use_custom_envelope_names: bool,
150 ) -> Result<Self, anyhow::Error> {
151 let mut columns = column_names_and_types(desc);
152 if debezium {
153 columns = envelopes::dbz_envelope(columns);
154 if let Some(sink_from_id) = sink_from {
158 let mut new_column_docs = BTreeMap::new();
159 doc_options.iter().for_each(|(k, v)| {
160 if k.id() == sink_from_id {
161 match k {
162 DocTarget::Field { column_name, .. } => {
163 new_column_docs.insert(
164 DocTarget::Field {
165 object_id: DBZ_ROW_TYPE_ID,
166 column_name: column_name.clone(),
167 },
168 v.clone(),
169 );
170 }
171 DocTarget::Type(_) => {
172 new_column_docs.insert(DocTarget::Type(DBZ_ROW_TYPE_ID), v.clone());
173 }
174 }
175 }
176 });
177 doc_options.append(&mut new_column_docs);
178 doc_options.retain(|k, _v| k.id() != sink_from_id);
179 }
180 }
181 let custom_names = if use_custom_envelope_names {
182 &ENVELOPE_CUSTOM_NAMES
183 } else {
184 &BTreeMap::new()
185 };
186 let row_schema = build_row_schema_json(
187 &columns,
188 avro_fullname,
189 custom_names,
190 sink_from,
191 &SchemaOptions {
192 set_null_defaults,
193 doc_comments: doc_options,
194 },
195 )?;
196 let schema = Schema::parse(&row_schema).expect("valid schema constructed");
197 Ok(AvroSchemaGenerator { columns, schema })
198 }
199
200 pub fn schema(&self) -> &Schema {
201 &self.schema
202 }
203
204 pub fn columns(&self) -> &[(ColumnName, ColumnType)] {
205 &self.columns
206 }
207}
208
209pub struct AvroEncoder {
211 columns: Vec<(ColumnName, ColumnType)>,
212 schema: Schema,
213 schema_id: i32,
214}
215
216impl fmt::Debug for AvroEncoder {
217 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
218 f.debug_struct("AvroEncoder")
219 .field("writer_schema", &self.schema)
220 .finish()
221 }
222}
223
224impl AvroEncoder {
225 pub fn new(desc: RelationDesc, debezium: bool, schema: &str, schema_id: i32) -> Self {
226 let mut columns = column_names_and_types(desc);
227 if debezium {
228 columns = envelopes::dbz_envelope(columns);
229 };
230 AvroEncoder {
231 columns,
232 schema: Schema::parse(&serde_json::from_str(schema).expect("valid schema json"))
233 .expect("valid schema"),
234 schema_id,
235 }
236 }
237}
238
239impl Encode for AvroEncoder {
240 fn encode_unchecked(&self, row: Row) -> Vec<u8> {
241 encode_message_unchecked(self.schema_id, row, &self.schema, &self.columns)
242 }
243
244 fn hash(&self, buf: &[u8]) -> u64 {
245 let (_schema_id, payload) = crate::confluent::extract_avro_header(buf).unwrap();
248 seahash::hash(payload)
249 }
250}
251
252pub fn encode_datums_as_avro<'a, I>(datums: I, names_types: &[(ColumnName, ColumnType)]) -> Value
254where
255 I: IntoIterator<Item = Datum<'a>>,
256{
257 let value_fields: Vec<(String, Value)> = names_types
258 .iter()
259 .zip_eq(datums)
260 .map(|((name, typ), datum)| {
261 let name = name.as_str().to_owned();
262 (name, TypedDatum::new(datum, typ).avro())
263 })
264 .collect();
265 let v = Value::Record(value_fields);
266 v
267}
268
269impl<'a> mz_avro::types::ToAvro for TypedDatum<'a> {
270 fn avro(self) -> Value {
271 let TypedDatum { datum, typ } = self;
272 if typ.nullable && datum.is_null() {
273 Value::Union {
274 index: 0,
275 inner: Box::new(Value::Null),
276 n_variants: 2,
277 null_variant: Some(0),
278 }
279 } else {
280 let mut val = match &typ.scalar_type {
281 ScalarType::AclItem => Value::String(datum.unwrap_acl_item().to_string()),
282 ScalarType::Bool => Value::Boolean(datum.unwrap_bool()),
283 ScalarType::PgLegacyChar => {
284 Value::Fixed(1, datum.unwrap_uint8().to_le_bytes().into())
285 }
286 ScalarType::Int16 => Value::Int(i32::from(datum.unwrap_int16())),
287 ScalarType::Int32 => Value::Int(datum.unwrap_int32()),
288 ScalarType::Int64 => Value::Long(datum.unwrap_int64()),
289 ScalarType::UInt16 => Value::Fixed(2, datum.unwrap_uint16().to_be_bytes().into()),
290 ScalarType::UInt32 => Value::Fixed(4, datum.unwrap_uint32().to_be_bytes().into()),
291 ScalarType::UInt64 => Value::Fixed(8, datum.unwrap_uint64().to_be_bytes().into()),
292 ScalarType::Oid
293 | ScalarType::RegClass
294 | ScalarType::RegProc
295 | ScalarType::RegType => {
296 Value::Fixed(4, datum.unwrap_uint32().to_be_bytes().into())
297 }
298 ScalarType::Float32 => Value::Float(datum.unwrap_float32()),
299 ScalarType::Float64 => Value::Double(datum.unwrap_float64()),
300 ScalarType::Numeric { max_scale } => {
301 let mut d = datum.unwrap_numeric().0;
302 let (unscaled, precision, scale) = match max_scale {
303 Some(max_scale) => {
304 numeric::rescale(&mut d, max_scale.into_u8()).unwrap();
306 (
307 numeric::numeric_to_twos_complement_be(d).to_vec(),
308 NUMERIC_DATUM_MAX_PRECISION,
309 max_scale.into_u8(),
310 )
311 }
312 None => (
317 numeric::numeric_to_twos_complement_wide(d).to_vec(),
318 NUMERIC_AGG_MAX_PRECISION,
319 NUMERIC_DATUM_MAX_PRECISION,
320 ),
321 };
322 Value::Decimal(DecimalValue {
323 unscaled,
324 precision: usize::cast_from(precision),
325 scale: usize::cast_from(scale),
326 })
327 }
328 ScalarType::Date => Value::Date(datum.unwrap_date().unix_epoch_days()),
329 ScalarType::Time => Value::Long({
330 let time = datum.unwrap_time();
331 i64::from(time.num_seconds_from_midnight()) * 1_000_000
332 + i64::from(time.nanosecond()) / 1_000
333 }),
334 ScalarType::Timestamp { .. } => {
335 Value::Timestamp(datum.unwrap_timestamp().to_naive())
336 }
337 ScalarType::TimestampTz { .. } => {
338 Value::Timestamp(datum.unwrap_timestamptz().to_naive())
339 }
340 ScalarType::Interval => Value::Fixed(16, {
344 let iv = datum.unwrap_interval();
345 let mut buf = Vec::with_capacity(16);
346 buf.extend(iv.months.to_le_bytes());
347 buf.extend(iv.days.to_le_bytes());
348 buf.extend(iv.micros.to_le_bytes());
349 debug_assert_eq!(buf.len(), 16);
350 buf
351 }),
352 ScalarType::Bytes => Value::Bytes(Vec::from(datum.unwrap_bytes())),
353 ScalarType::String | ScalarType::VarChar { .. } | ScalarType::PgLegacyName => {
354 Value::String(datum.unwrap_str().to_owned())
355 }
356 ScalarType::Char { length } => {
357 let s = mz_repr::adt::char::format_str_pad(datum.unwrap_str(), *length);
358 Value::String(s)
359 }
360 ScalarType::Jsonb => Value::Json(JsonbRef::from_datum(datum).to_serde_json()),
361 ScalarType::Uuid => Value::Uuid(datum.unwrap_uuid()),
362 ty @ (ScalarType::Array(..) | ScalarType::Int2Vector | ScalarType::List { .. }) => {
363 let list = match ty {
364 ScalarType::Array(_) | ScalarType::Int2Vector => {
365 datum.unwrap_array().elements()
366 }
367 ScalarType::List { .. } => datum.unwrap_list(),
368 _ => unreachable!(),
369 };
370
371 let values = list
372 .into_iter()
373 .map(|datum| {
374 TypedDatum::new(
375 datum,
376 &ColumnType {
377 nullable: true,
378 scalar_type: ty.unwrap_collection_element_type().clone(),
379 },
380 )
381 .avro()
382 })
383 .collect();
384 Value::Array(values)
385 }
386 ScalarType::Map { value_type, .. } => {
387 let map = datum.unwrap_map();
388 let elements = map
389 .into_iter()
390 .map(|(key, datum)| {
391 let value = TypedDatum::new(
392 datum,
393 &ColumnType {
394 nullable: true,
395 scalar_type: (**value_type).clone(),
396 },
397 )
398 .avro();
399 (key.to_string(), value)
400 })
401 .collect();
402 Value::Map(elements)
403 }
404 ScalarType::Record { fields, .. } => {
405 let list = datum.unwrap_list();
406 let fields = fields
407 .iter()
408 .zip(&list)
409 .map(|((name, typ), datum)| {
410 let name = name.to_string();
411 let datum = TypedDatum::new(datum, typ);
412 let value = datum.avro();
413 (name, value)
414 })
415 .collect();
416 Value::Record(fields)
417 }
418 ScalarType::MzTimestamp => Value::String(datum.unwrap_mz_timestamp().to_string()),
419 ScalarType::Range { .. } => Value::String(datum.unwrap_range().to_string()),
420 ScalarType::MzAclItem => Value::String(datum.unwrap_mz_acl_item().to_string()),
421 };
422 if typ.nullable {
423 val = Value::Union {
424 index: 1,
425 inner: Box::new(val),
426 n_variants: 2,
427 null_variant: Some(0),
428 };
429 }
430 val
431 }
432 }
433}
434
435pub fn get_debezium_transaction_schema() -> &'static Schema {
436 &DEBEZIUM_TRANSACTION_SCHEMA
437}
438
439pub fn encode_debezium_transaction_unchecked(
440 schema_id: i32,
441 collection: &str,
442 id: &str,
443 status: &str,
444 message_count: Option<i64>,
445) -> Vec<u8> {
446 let mut buf = Vec::new();
447 encode_avro_header(&mut buf, schema_id);
448
449 let transaction_id = Value::String(id.to_owned());
450 let status = Value::String(status.to_owned());
451 let event_count = match message_count {
452 None => Value::Union {
453 index: 0,
454 inner: Box::new(Value::Null),
455 n_variants: 2,
456 null_variant: Some(0),
457 },
458 Some(count) => Value::Union {
459 index: 1,
460 inner: Box::new(Value::Long(count)),
461 n_variants: 2,
462 null_variant: Some(0),
463 },
464 };
465
466 let data_collections = if let Some(message_count) = message_count {
467 let collection = Value::Record(vec![
468 ("data_collection".into(), Value::String(collection.into())),
469 ("event_count".into(), Value::Long(message_count)),
470 ]);
471 Value::Union {
472 index: 1,
473 inner: Box::new(Value::Array(vec![collection])),
474 n_variants: 2,
475 null_variant: Some(0),
476 }
477 } else {
478 Value::Union {
479 index: 0,
480 inner: Box::new(Value::Null),
481 n_variants: 2,
482 null_variant: Some(0),
483 }
484 };
485
486 let record_contents = vec![
487 ("id".into(), transaction_id),
488 ("status".into(), status),
489 ("event_count".into(), event_count),
490 ("data_collections".into(), data_collections),
491 ];
492 let avro = Value::Record(record_contents);
493 debug_assert!(avro.validate(DEBEZIUM_TRANSACTION_SCHEMA.top_node()));
494 mz_avro::encode_unchecked(&avro, &DEBEZIUM_TRANSACTION_SCHEMA, &mut buf);
495 buf
496}