mz_interchange/avro/
encode.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::fmt;
12use std::sync::LazyLock;
13
14use anyhow::Ok;
15use byteorder::{NetworkEndian, WriteBytesExt};
16use chrono::Timelike;
17use itertools::Itertools;
18use mz_avro::Schema;
19use mz_avro::types::{DecimalValue, ToAvro, Value};
20use mz_ore::cast::CastFrom;
21use mz_repr::adt::jsonb::JsonbRef;
22use mz_repr::adt::numeric::{self, NUMERIC_AGG_MAX_PRECISION, NUMERIC_DATUM_MAX_PRECISION};
23use mz_repr::{CatalogItemId, ColumnName, ColumnType, Datum, RelationDesc, Row, ScalarType};
24use serde_json::json;
25
26use crate::encode::{Encode, TypedDatum, column_names_and_types};
27use crate::envelopes::{self, DBZ_ROW_TYPE_ID, ENVELOPE_CUSTOM_NAMES};
28use crate::json::{SchemaOptions, build_row_schema_json};
29
// Schema for Debezium transaction metadata records. See
// https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-transaction-metadata
// for more information. The `data_collections` field is optional (a nullable
// union that defaults to null). Because each consistency topic corresponds to
// exactly one sink, when the field is populated it holds a single entry.
36static DEBEZIUM_TRANSACTION_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
37    Schema::parse(&json!({
38        "type": "record",
39        "name": "envelope",
40        "fields": [
41            {
42                "name": "id",
43                "type": "string"
44            },
45            {
46                "name": "status",
47                "type": "string"
48            },
49            {
50                "name": "event_count",
51                "type": [
52                  "null",
53                  "long"
54                ]
55            },
56            {
57                "name": "data_collections",
58                "type": [
59                    "null",
60                    {
61                        "type": "array",
62                        "items": {
63                            "type": "record",
64                            "name": "data_collection",
65                            "fields": [
66                                {
67                                    "name": "data_collection",
68                                    "type": "string"
69                                },
70                                {
71                                    "name": "event_count",
72                                    "type": "long"
73                                },
74                            ]
75                        }
76                    }
77                ],
78                "default": null,
79            },
80        ]
81    }))
82    .expect("valid schema constructed")
83});
84
85fn encode_avro_header(buf: &mut Vec<u8>, schema_id: i32) {
86    // The first byte is a magic byte (0) that indicates the Confluent
87    // serialization format version, and the next four bytes are a
88    // 32-bit schema ID.
89    //
90    // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
91    buf.write_u8(0).expect("writing to vec cannot fail");
92    buf.write_i32::<NetworkEndian>(schema_id)
93        .expect("writing to vec cannot fail");
94}
95
96fn encode_message_unchecked(
97    schema_id: i32,
98    row: Row,
99    schema: &Schema,
100    columns: &[(ColumnName, ColumnType)],
101) -> Vec<u8> {
102    let mut buf = vec![];
103    encode_avro_header(&mut buf, schema_id);
104    let value = encode_datums_as_avro(row.iter(), columns);
105    mz_avro::encode_unchecked(&value, schema, &mut buf);
106    buf
107}
108
109#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
110pub enum DocTarget {
111    Type(CatalogItemId),
112    Field {
113        object_id: CatalogItemId,
114        column_name: ColumnName,
115    },
116}
117
118impl DocTarget {
119    fn id(&self) -> CatalogItemId {
120        match self {
121            DocTarget::Type(object_id) => *object_id,
122            DocTarget::Field { object_id, .. } => *object_id,
123        }
124    }
125}
126
127/// Generates an Avro schema
128pub struct AvroSchemaGenerator {
129    columns: Vec<(ColumnName, ColumnType)>,
130    schema: Schema,
131}
132
133impl fmt::Debug for AvroSchemaGenerator {
134    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
135        f.debug_struct("SchemaGenerator")
136            .field("writer_schema", &self.schema())
137            .finish()
138    }
139}
140
141impl AvroSchemaGenerator {
142    pub fn new(
143        desc: RelationDesc,
144        debezium: bool,
145        mut doc_options: BTreeMap<DocTarget, String>,
146        avro_fullname: &str,
147        set_null_defaults: bool,
148        sink_from: Option<CatalogItemId>,
149        use_custom_envelope_names: bool,
150    ) -> Result<Self, anyhow::Error> {
151        let mut columns = column_names_and_types(desc);
152        if debezium {
153            columns = envelopes::dbz_envelope(columns);
154            // With DEBEZIUM envelope the message is wrapped into "before" and "after"
155            // with `DBZ_ROW_TYPE_ID` instead of `sink_from`.
156            // Replacing comments for the columns and type in `sink_from` to `DBZ_ROW_TYPE_ID`.
157            if let Some(sink_from_id) = sink_from {
158                let mut new_column_docs = BTreeMap::new();
159                doc_options.iter().for_each(|(k, v)| {
160                    if k.id() == sink_from_id {
161                        match k {
162                            DocTarget::Field { column_name, .. } => {
163                                new_column_docs.insert(
164                                    DocTarget::Field {
165                                        object_id: DBZ_ROW_TYPE_ID,
166                                        column_name: column_name.clone(),
167                                    },
168                                    v.clone(),
169                                );
170                            }
171                            DocTarget::Type(_) => {
172                                new_column_docs.insert(DocTarget::Type(DBZ_ROW_TYPE_ID), v.clone());
173                            }
174                        }
175                    }
176                });
177                doc_options.append(&mut new_column_docs);
178                doc_options.retain(|k, _v| k.id() != sink_from_id);
179            }
180        }
181        let custom_names = if use_custom_envelope_names {
182            &ENVELOPE_CUSTOM_NAMES
183        } else {
184            &BTreeMap::new()
185        };
186        let row_schema = build_row_schema_json(
187            &columns,
188            avro_fullname,
189            custom_names,
190            sink_from,
191            &SchemaOptions {
192                set_null_defaults,
193                doc_comments: doc_options,
194            },
195        )?;
196        let schema = Schema::parse(&row_schema).expect("valid schema constructed");
197        Ok(AvroSchemaGenerator { columns, schema })
198    }
199
200    pub fn schema(&self) -> &Schema {
201        &self.schema
202    }
203
204    pub fn columns(&self) -> &[(ColumnName, ColumnType)] {
205        &self.columns
206    }
207}
208
209/// Manages encoding of Avro-encoded bytes.
210pub struct AvroEncoder {
211    columns: Vec<(ColumnName, ColumnType)>,
212    schema: Schema,
213    schema_id: i32,
214}
215
216impl fmt::Debug for AvroEncoder {
217    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
218        f.debug_struct("AvroEncoder")
219            .field("writer_schema", &self.schema)
220            .finish()
221    }
222}
223
224impl AvroEncoder {
225    pub fn new(desc: RelationDesc, debezium: bool, schema: &str, schema_id: i32) -> Self {
226        let mut columns = column_names_and_types(desc);
227        if debezium {
228            columns = envelopes::dbz_envelope(columns);
229        };
230        AvroEncoder {
231            columns,
232            schema: Schema::parse(&serde_json::from_str(schema).expect("valid schema json"))
233                .expect("valid schema"),
234            schema_id,
235        }
236    }
237}
238
239impl Encode for AvroEncoder {
240    fn encode_unchecked(&self, row: Row) -> Vec<u8> {
241        encode_message_unchecked(self.schema_id, row, &self.schema, &self.columns)
242    }
243
244    fn hash(&self, buf: &[u8]) -> u64 {
245        // Compute a stable hash by ignoring the avro header which might contain a
246        // non-deterministic schema id.
247        let (_schema_id, payload) = crate::confluent::extract_avro_header(buf).unwrap();
248        seahash::hash(payload)
249    }
250}
251
252/// Encodes a sequence of `Datum` as Avro (key and value), using supplied column names and types.
253pub fn encode_datums_as_avro<'a, I>(datums: I, names_types: &[(ColumnName, ColumnType)]) -> Value
254where
255    I: IntoIterator<Item = Datum<'a>>,
256{
257    let value_fields: Vec<(String, Value)> = names_types
258        .iter()
259        .zip_eq(datums)
260        .map(|((name, typ), datum)| {
261            let name = name.as_str().to_owned();
262            (name, TypedDatum::new(datum, typ).avro())
263        })
264        .collect();
265    let v = Value::Record(value_fields);
266    v
267}
268
269impl<'a> mz_avro::types::ToAvro for TypedDatum<'a> {
270    fn avro(self) -> Value {
271        let TypedDatum { datum, typ } = self;
272        if typ.nullable && datum.is_null() {
273            Value::Union {
274                index: 0,
275                inner: Box::new(Value::Null),
276                n_variants: 2,
277                null_variant: Some(0),
278            }
279        } else {
280            let mut val = match &typ.scalar_type {
281                ScalarType::AclItem => Value::String(datum.unwrap_acl_item().to_string()),
282                ScalarType::Bool => Value::Boolean(datum.unwrap_bool()),
283                ScalarType::PgLegacyChar => {
284                    Value::Fixed(1, datum.unwrap_uint8().to_le_bytes().into())
285                }
286                ScalarType::Int16 => Value::Int(i32::from(datum.unwrap_int16())),
287                ScalarType::Int32 => Value::Int(datum.unwrap_int32()),
288                ScalarType::Int64 => Value::Long(datum.unwrap_int64()),
289                ScalarType::UInt16 => Value::Fixed(2, datum.unwrap_uint16().to_be_bytes().into()),
290                ScalarType::UInt32 => Value::Fixed(4, datum.unwrap_uint32().to_be_bytes().into()),
291                ScalarType::UInt64 => Value::Fixed(8, datum.unwrap_uint64().to_be_bytes().into()),
292                ScalarType::Oid
293                | ScalarType::RegClass
294                | ScalarType::RegProc
295                | ScalarType::RegType => {
296                    Value::Fixed(4, datum.unwrap_uint32().to_be_bytes().into())
297                }
298                ScalarType::Float32 => Value::Float(datum.unwrap_float32()),
299                ScalarType::Float64 => Value::Double(datum.unwrap_float64()),
300                ScalarType::Numeric { max_scale } => {
301                    let mut d = datum.unwrap_numeric().0;
302                    let (unscaled, precision, scale) = match max_scale {
303                        Some(max_scale) => {
304                            // Values must be rescaled to resaturate trailing zeroes
305                            numeric::rescale(&mut d, max_scale.into_u8()).unwrap();
306                            (
307                                numeric::numeric_to_twos_complement_be(d).to_vec(),
308                                NUMERIC_DATUM_MAX_PRECISION,
309                                max_scale.into_u8(),
310                            )
311                        }
312                        // Decimals without specified scale must nonetheless be
313                        // expressed as a fixed scale, so we write everything as
314                        // a 78-digit number with a scale of 39, which
315                        // definitively expresses all valid numeric values.
316                        None => (
317                            numeric::numeric_to_twos_complement_wide(d).to_vec(),
318                            NUMERIC_AGG_MAX_PRECISION,
319                            NUMERIC_DATUM_MAX_PRECISION,
320                        ),
321                    };
322                    Value::Decimal(DecimalValue {
323                        unscaled,
324                        precision: usize::cast_from(precision),
325                        scale: usize::cast_from(scale),
326                    })
327                }
328                ScalarType::Date => Value::Date(datum.unwrap_date().unix_epoch_days()),
329                ScalarType::Time => Value::Long({
330                    let time = datum.unwrap_time();
331                    i64::from(time.num_seconds_from_midnight()) * 1_000_000
332                        + i64::from(time.nanosecond()) / 1_000
333                }),
334                ScalarType::Timestamp { .. } => {
335                    Value::Timestamp(datum.unwrap_timestamp().to_naive())
336                }
337                ScalarType::TimestampTz { .. } => {
338                    Value::Timestamp(datum.unwrap_timestamptz().to_naive())
339                }
340                // SQL intervals and Avro durations differ quite a lot (signed
341                // vs unsigned, different int sizes), so SQL intervals are their
342                // own bespoke type.
343                ScalarType::Interval => Value::Fixed(16, {
344                    let iv = datum.unwrap_interval();
345                    let mut buf = Vec::with_capacity(16);
346                    buf.extend(iv.months.to_le_bytes());
347                    buf.extend(iv.days.to_le_bytes());
348                    buf.extend(iv.micros.to_le_bytes());
349                    debug_assert_eq!(buf.len(), 16);
350                    buf
351                }),
352                ScalarType::Bytes => Value::Bytes(Vec::from(datum.unwrap_bytes())),
353                ScalarType::String | ScalarType::VarChar { .. } | ScalarType::PgLegacyName => {
354                    Value::String(datum.unwrap_str().to_owned())
355                }
356                ScalarType::Char { length } => {
357                    let s = mz_repr::adt::char::format_str_pad(datum.unwrap_str(), *length);
358                    Value::String(s)
359                }
360                ScalarType::Jsonb => Value::Json(JsonbRef::from_datum(datum).to_serde_json()),
361                ScalarType::Uuid => Value::Uuid(datum.unwrap_uuid()),
362                ty @ (ScalarType::Array(..) | ScalarType::Int2Vector | ScalarType::List { .. }) => {
363                    let list = match ty {
364                        ScalarType::Array(_) | ScalarType::Int2Vector => {
365                            datum.unwrap_array().elements()
366                        }
367                        ScalarType::List { .. } => datum.unwrap_list(),
368                        _ => unreachable!(),
369                    };
370
371                    let values = list
372                        .into_iter()
373                        .map(|datum| {
374                            TypedDatum::new(
375                                datum,
376                                &ColumnType {
377                                    nullable: true,
378                                    scalar_type: ty.unwrap_collection_element_type().clone(),
379                                },
380                            )
381                            .avro()
382                        })
383                        .collect();
384                    Value::Array(values)
385                }
386                ScalarType::Map { value_type, .. } => {
387                    let map = datum.unwrap_map();
388                    let elements = map
389                        .into_iter()
390                        .map(|(key, datum)| {
391                            let value = TypedDatum::new(
392                                datum,
393                                &ColumnType {
394                                    nullable: true,
395                                    scalar_type: (**value_type).clone(),
396                                },
397                            )
398                            .avro();
399                            (key.to_string(), value)
400                        })
401                        .collect();
402                    Value::Map(elements)
403                }
404                ScalarType::Record { fields, .. } => {
405                    let list = datum.unwrap_list();
406                    let fields = fields
407                        .iter()
408                        .zip(&list)
409                        .map(|((name, typ), datum)| {
410                            let name = name.to_string();
411                            let datum = TypedDatum::new(datum, typ);
412                            let value = datum.avro();
413                            (name, value)
414                        })
415                        .collect();
416                    Value::Record(fields)
417                }
418                ScalarType::MzTimestamp => Value::String(datum.unwrap_mz_timestamp().to_string()),
419                ScalarType::Range { .. } => Value::String(datum.unwrap_range().to_string()),
420                ScalarType::MzAclItem => Value::String(datum.unwrap_mz_acl_item().to_string()),
421            };
422            if typ.nullable {
423                val = Value::Union {
424                    index: 1,
425                    inner: Box::new(val),
426                    n_variants: 2,
427                    null_variant: Some(0),
428                };
429            }
430            val
431        }
432    }
433}
434
435pub fn get_debezium_transaction_schema() -> &'static Schema {
436    &DEBEZIUM_TRANSACTION_SCHEMA
437}
438
439pub fn encode_debezium_transaction_unchecked(
440    schema_id: i32,
441    collection: &str,
442    id: &str,
443    status: &str,
444    message_count: Option<i64>,
445) -> Vec<u8> {
446    let mut buf = Vec::new();
447    encode_avro_header(&mut buf, schema_id);
448
449    let transaction_id = Value::String(id.to_owned());
450    let status = Value::String(status.to_owned());
451    let event_count = match message_count {
452        None => Value::Union {
453            index: 0,
454            inner: Box::new(Value::Null),
455            n_variants: 2,
456            null_variant: Some(0),
457        },
458        Some(count) => Value::Union {
459            index: 1,
460            inner: Box::new(Value::Long(count)),
461            n_variants: 2,
462            null_variant: Some(0),
463        },
464    };
465
466    let data_collections = if let Some(message_count) = message_count {
467        let collection = Value::Record(vec![
468            ("data_collection".into(), Value::String(collection.into())),
469            ("event_count".into(), Value::Long(message_count)),
470        ]);
471        Value::Union {
472            index: 1,
473            inner: Box::new(Value::Array(vec![collection])),
474            n_variants: 2,
475            null_variant: Some(0),
476        }
477    } else {
478        Value::Union {
479            index: 0,
480            inner: Box::new(Value::Null),
481            n_variants: 2,
482            null_variant: Some(0),
483        }
484    };
485
486    let record_contents = vec![
487        ("id".into(), transaction_id),
488        ("status".into(), status),
489        ("event_count".into(), event_count),
490        ("data_collections".into(), data_collections),
491    ];
492    let avro = Value::Record(record_contents);
493    debug_assert!(avro.validate(DEBEZIUM_TRANSACTION_SCHEMA.top_node()));
494    mz_avro::encode_unchecked(&avro, &DEBEZIUM_TRANSACTION_SCHEMA, &mut buf);
495    buf
496}