Skip to main content

mz_interchange/avro/
schema.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Conversion from Avro schemas to Materialize `RelationDesc`s.
11//!
12//! A few notes for posterity on how this conversion happens are in order.
13//!
14//! If the schema is an Avro record, we flatten it to its fields, which become the columns
15//! of the relation.
16//!
17//! Each individual field is then converted to its SQL equivalent. For most types, this
18//! conversion is the obvious one. The only non-trivial counterexample is Avro unions.
19//!
20//! Since Avro types are not nullable by default, the typical way normal (i.e., nullable)
21//! SQL fields are represented in Avro is by a union of the underlying type with the
22//! singleton type { Null }; in Avro schema notation, this is `["null", "TheType"]`.
23//! We shall call union types following this pattern _Nullability-Pattern Unions_.
24//! We shall call all other union types (e.g. `["MyType1", "MyType2"]` or `["null", "MyType1", "MyType2"]`) _Essential Unions_.
25//! Since there is an obvious way to represent Nullability-Pattern Unions, but not Essential Unions, in the SQL type system,
26//! we must handle Essential Unions with a bit of a hack (at least until Materialize supports union or sum types, which may be never).
27//!
//! When an Essential Union appears as one of the fields of a record, we expand
//! it to _n_ columns in SQL, where _n_ is the number of non-null variants in the union. These
//! columns will be given names created by appending each variant's 1-based index (counting only
//! the non-null variants) to the overall name of the field. For example, if an Essential Union in
//! a field named `"Foo"` has schema `[int, bool]`, it will expand to the columns `"Foo1": int,
//! "Foo2": bool`. There is an implicit constraint, upheld by the source pipeline, that only one
//! such column will be non-`null` at a time.
32//!
33//! When an Essential Union appears _elsewhere_ than as one of the fields of a record,
34//! there is nothing we can do, because we expect to be able to turn it into exactly one
35//! SQL type, not a series of them. Thus, in these cases, we just bail. For example, it's
36//! not possible to ingest an array or map whose element type is an Essential Union.
37
38use std::collections::btree_map::Entry;
39use std::collections::{BTreeMap, BTreeSet};
40use std::fmt;
41use std::sync::Arc;
42
43use anyhow::{Context, anyhow, bail};
44use mz_avro::error::Error as AvroError;
45use mz_avro::schema::{
46    ParseSchemaError, Schema, SchemaNode, SchemaPiece, SchemaPieceOrNamed, resolve_schemas,
47};
48use mz_ore::cast::CastFrom;
49use mz_ore::collections::CollectionExt;
50use mz_ore::future::OreFutureExt;
51use mz_ore::retry::Retry;
52use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
53use mz_repr::adt::timestamp::TimestampPrecision;
54use mz_repr::{ColumnName, RelationDesc, SqlColumnType, SqlScalarType, UNKNOWN_COLUMN_NAME};
55use tracing::warn;
56
57use crate::avro::is_null;
58
59pub fn parse_schema(schema: &str, references: &[String]) -> anyhow::Result<Schema> {
60    let schema: serde_json::Value = serde_json::from_str(schema)?;
61    // Parse reference schemas incrementally: each reference may depend on previous ones.
62    // References must be provided in dependency order (dependencies first).
63    let mut parsed_refs: Vec<Schema> = Vec::with_capacity(references.len());
64    for reference in references {
65        let ref_json: serde_json::Value = serde_json::from_str(reference)?;
66        let parsed = Schema::parse_with_references(&ref_json, &parsed_refs)?;
67        parsed_refs.push(parsed);
68    }
69    Ok(Schema::parse_with_references(&schema, &parsed_refs)?)
70}
71
72/// Converts an Apache Avro schema into a list of column names and types.
73// TODO(petrosagg): find a way to make this a TryFrom impl somewhere
74pub fn schema_to_relationdesc(schema: Schema) -> Result<RelationDesc, anyhow::Error> {
75    // TODO(petrosagg): call directly into validate_schema_2 and do the Record flattening once
76    // we're in RelationDesc land
77    Ok(RelationDesc::from_names_and_types(validate_schema_1(
78        schema.top_node(),
79    )?))
80}
81
82/// Convert an Avro schema to a series of columns and names, flattening the top-level record,
83/// if the top node is indeed a record.
84fn validate_schema_1(schema: SchemaNode) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
85    let mut columns = vec![];
86    let mut seen_avro_nodes = Default::default();
87    match schema.inner {
88        SchemaPiece::Record { fields, .. } => {
89            for f in fields {
90                columns.extend(get_named_columns(
91                    &mut seen_avro_nodes,
92                    schema.step(&f.schema),
93                    Some(&f.name),
94                )?);
95            }
96        }
97        _ => {
98            columns.extend(get_named_columns(&mut seen_avro_nodes, schema, None)?);
99        }
100    }
101    Ok(columns)
102}
103
104/// Get the series of (one or more) SQL columns corresponding to an Avro union.
105/// See module comments for details.
106fn get_union_columns<'a>(
107    seen_avro_nodes: &mut BTreeSet<usize>,
108    schema: SchemaNode<'a>,
109    base_name: Option<&str>,
110) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
111    let us = match schema.inner {
112        SchemaPiece::Union(us) => us,
113        _ => panic!("This function should only be called on unions."),
114    };
115    let mut columns = vec![];
116    let vs = us.variants();
117    if vs.is_empty() || (vs.len() == 1 && is_null(&vs[0])) {
118        bail!(anyhow!("Empty or null-only unions are not supported"));
119    } else {
120        for (i, v) in vs.iter().filter(|v| !is_null(v)).enumerate() {
121            let named_idx = match v {
122                SchemaPieceOrNamed::Named(idx) => Some(*idx),
123                SchemaPieceOrNamed::Piece(_) => None,
124            };
125            if let Some(named_idx) = named_idx {
126                if !seen_avro_nodes.insert(named_idx) {
127                    bail!(
128                        "Recursive types are not supported: {}",
129                        v.get_human_name(schema.root)
130                    );
131                }
132            }
133            let node = schema.step(v);
134            if let SchemaPiece::Union(_) = node.inner {
135                unreachable!("Internal error: directly nested avro union!");
136            }
137
138            let name = if vs.len() == 1 || (vs.len() == 2 && vs.iter().any(is_null)) {
139                // There is only one non-null variant in the
140                // union, so we can use the field name directly.
141                base_name
142                    .map(|n| n.to_owned())
143                    .or_else(|| {
144                        v.get_piece_and_name(schema.root)
145                            .1
146                            .map(|full_name| full_name.base_name().to_owned())
147                    })
148                    .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
149            } else {
150                // There are multiple non-null variants in the
151                // union, so we need to invent field names for
152                // each variant.
153                base_name
154                    .map(|n| format!("{}{}", n, i + 1))
155                    .or_else(|| {
156                        v.get_piece_and_name(schema.root)
157                            .1
158                            .map(|full_name| full_name.base_name().to_owned())
159                    })
160                    .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
161            };
162
163            // If there is more than one variant in the union,
164            // the column's output type is nullable, as this
165            // column will be null whenever it is uninhabited.
166            let ty = validate_schema_2(seen_avro_nodes, node)?;
167            columns.push((name.into(), ty.nullable(vs.len() > 1)));
168            if let Some(named_idx) = named_idx {
169                seen_avro_nodes.remove(&named_idx);
170            }
171        }
172    }
173    Ok(columns)
174}
175
176fn get_named_columns<'a>(
177    seen_avro_nodes: &mut BTreeSet<usize>,
178    schema: SchemaNode<'a>,
179    base_name: Option<&str>,
180) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
181    if let SchemaPiece::Union(_) = schema.inner {
182        get_union_columns(seen_avro_nodes, schema, base_name)
183    } else {
184        let scalar_type = validate_schema_2(seen_avro_nodes, schema)?;
185        Ok(vec![(
186            // TODO(benesch): we should do better than this when there's no base
187            // name, e.g., invent a name based on the type.
188            base_name.unwrap_or(UNKNOWN_COLUMN_NAME).into(),
189            scalar_type.nullable(false),
190        )])
191    }
192}
193
194/// Get the single column corresponding to a schema node.
195/// It is an error if this node should correspond to more than one column
196/// (because it is an Essential Union in the sense described in the module docs).
197fn validate_schema_2(
198    seen_avro_nodes: &mut BTreeSet<usize>,
199    schema: SchemaNode,
200) -> anyhow::Result<SqlScalarType> {
201    Ok(match schema.inner {
202        SchemaPiece::Union(_) => {
203            let columns = get_union_columns(seen_avro_nodes, schema, None)?;
204            if columns.len() != 1 {
205                bail!("Union of more than one non-null type not valid here");
206            }
207            let (_column_name, column_type) = columns.into_element();
208            // It's okay to lose the nullability information here, as it's not relevant to
209            // any higher layer. This will either be included in an array or map type,
210            // where all values are nullable. It can't be included as a top-level column
211            // or as a record type, where nullability is actually tracked, because in
212            // those cases we will have already gone through the `Union` code path in
213            // `get_named_columns`.
214            column_type.scalar_type
215        }
216        SchemaPiece::Null => bail!("null outside of union types is not supported"),
217        SchemaPiece::Boolean => SqlScalarType::Bool,
218        SchemaPiece::Int => SqlScalarType::Int32,
219        SchemaPiece::Long => SqlScalarType::Int64,
220        SchemaPiece::Float => SqlScalarType::Float32,
221        SchemaPiece::Double => SqlScalarType::Float64,
222        SchemaPiece::Date => SqlScalarType::Date,
223        SchemaPiece::TimestampMilli => SqlScalarType::Timestamp {
224            precision: Some(TimestampPrecision::try_from(3).unwrap()),
225        },
226        SchemaPiece::TimestampMicro => SqlScalarType::Timestamp {
227            precision: Some(TimestampPrecision::try_from(6).unwrap()),
228        },
229        SchemaPiece::Decimal {
230            precision, scale, ..
231        } => {
232            if *precision > usize::cast_from(NUMERIC_DATUM_MAX_PRECISION) {
233                bail!(
234                    "decimals with precision greater than {} are not supported",
235                    NUMERIC_DATUM_MAX_PRECISION
236                )
237            }
238            SqlScalarType::Numeric {
239                max_scale: Some(NumericMaxScale::try_from(*scale)?),
240            }
241        }
242        SchemaPiece::Bytes | SchemaPiece::Fixed { .. } => SqlScalarType::Bytes,
243        SchemaPiece::String | SchemaPiece::Enum { .. } => SqlScalarType::String,
244
245        SchemaPiece::Json => SqlScalarType::Jsonb,
246        SchemaPiece::Uuid => SqlScalarType::Uuid,
247        SchemaPiece::Record { fields, .. } => {
248            let mut columns = vec![];
249            for f in fields {
250                let named_idx = match &f.schema {
251                    SchemaPieceOrNamed::Named(idx) => Some(*idx),
252                    SchemaPieceOrNamed::Piece(_) => None,
253                };
254                if let Some(named_idx) = named_idx {
255                    if !seen_avro_nodes.insert(named_idx) {
256                        bail!(
257                            "Recursive types are not supported: {}",
258                            f.schema.get_human_name(schema.root)
259                        );
260                    }
261                }
262                let next_node = schema.step(&f.schema);
263                columns.extend(get_named_columns(
264                    seen_avro_nodes,
265                    next_node,
266                    Some(&f.name),
267                )?);
268                if let Some(named_idx) = named_idx {
269                    seen_avro_nodes.remove(&named_idx);
270                }
271            }
272            SqlScalarType::Record {
273                fields: columns.into(),
274                custom_id: None,
275            }
276        }
277        SchemaPiece::Array(inner) => {
278            let named_idx = match inner.as_ref() {
279                SchemaPieceOrNamed::Named(idx) => Some(*idx),
280                SchemaPieceOrNamed::Piece(_) => None,
281            };
282            if let Some(named_idx) = named_idx {
283                if !seen_avro_nodes.insert(named_idx) {
284                    bail!(
285                        "Recursive types are not supported: {}",
286                        inner.get_human_name(schema.root)
287                    );
288                }
289            }
290            let next_node = schema.step(inner);
291            let ret = SqlScalarType::List {
292                element_type: Box::new(validate_schema_2(seen_avro_nodes, next_node)?),
293                custom_id: None,
294            };
295            if let Some(named_idx) = named_idx {
296                seen_avro_nodes.remove(&named_idx);
297            }
298            ret
299        }
300        SchemaPiece::Map(inner) => SqlScalarType::Map {
301            value_type: Box::new(validate_schema_2(seen_avro_nodes, schema.step(inner))?),
302            custom_id: None,
303        },
304
305        _ => bail!("Unsupported type in schema: {:?}", schema.inner),
306    })
307}
308
309pub struct ConfluentAvroResolver {
310    reader_schema: Schema,
311    writer_schemas: Option<SchemaCache>,
312    confluent_wire_format: bool,
313}
314
315impl ConfluentAvroResolver {
316    pub fn new(
317        reader_schema: &str,
318        reader_reference_schemas: &[String],
319        ccsr_client: Option<mz_ccsr::Client>,
320        confluent_wire_format: bool,
321    ) -> anyhow::Result<Self> {
322        // parse_schema handles incremental parsing of references (dependencies first)
323        let reader_schema = parse_schema(reader_schema, reader_reference_schemas)?;
324        let writer_schemas = ccsr_client.map(SchemaCache::new).transpose()?;
325        Ok(Self {
326            reader_schema,
327            writer_schemas,
328            confluent_wire_format,
329        })
330    }
331
332    pub async fn resolve<'a, 'b>(
333        &'a mut self,
334        mut bytes: &'b [u8],
335    ) -> anyhow::Result<anyhow::Result<(&'b [u8], &'a Schema, Option<i32>)>> {
336        let (resolved_schema, schema_id) = match &mut self.writer_schemas {
337            Some(cache) => {
338                debug_assert!(
339                    self.confluent_wire_format,
340                    "We should have set 'confluent_wire_format' everywhere \
341                     that can lead to this branch"
342                );
343                // XXX(guswynn): use destructuring assignments when they are stable
344                let (schema_id, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes)
345                {
346                    Ok(ok) => ok,
347                    Err(err) => return Ok(Err(err)),
348                };
349                bytes = adjusted_bytes;
350                let result = cache
351                    .get(schema_id, &self.reader_schema)
352                    // The outer Result describes transient errors so use ? here to propagate
353                    .await?
354                    .with_context(|| format!("failed to resolve Avro schema (id = {})", schema_id));
355                let schema = match result {
356                    Ok(schema) => schema,
357                    Err(err) => return Ok(Err(err)),
358                };
359                (schema, Some(schema_id))
360            }
361
362            // If we haven't been asked to use a schema registry, we have no way
363            // to discover the writer's schema. That's ok; we'll just use the
364            // reader's schema and hope it lines up.
365            None => {
366                if self.confluent_wire_format {
367                    // validate and just move the bytes buffer ahead
368                    let (_, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes) {
369                        Ok(ok) => ok,
370                        Err(err) => return Ok(Err(err)),
371                    };
372                    bytes = adjusted_bytes;
373                }
374                (&self.reader_schema, None)
375            }
376        };
377        Ok(Ok((bytes, resolved_schema, schema_id)))
378    }
379}
380
381impl fmt::Debug for ConfluentAvroResolver {
382    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
383        f.debug_struct("ConfluentAvroResolver")
384            .field("reader_schema", &self.reader_schema)
385            .field(
386                "write_schema",
387                if self.writer_schemas.is_some() {
388                    &"some"
389                } else {
390                    &"none"
391                },
392            )
393            .finish()
394    }
395}
396
397#[derive(Debug)]
398struct SchemaCache {
399    cache: BTreeMap<i32, Result<Schema, AvroError>>,
400    ccsr_client: Arc<mz_ccsr::Client>,
401}
402
403impl SchemaCache {
404    fn new(ccsr_client: mz_ccsr::Client) -> Result<SchemaCache, anyhow::Error> {
405        Ok(SchemaCache {
406            cache: BTreeMap::new(),
407            ccsr_client: Arc::new(ccsr_client),
408        })
409    }
410
411    /// Looks up the writer schema for ID. If the schema is literally identical
412    /// to the reader schema, as determined by the reader schema fingerprint
413    /// that this schema cache was initialized with, returns the schema directly.
414    /// If not, performs schema resolution on the reader and writer and
415    /// returns the result.
416    ///
417    /// This method also handles schema references: if the schema references types
418    /// defined in other schemas, those schemas are fetched and their types are made
419    /// available during parsing.
420    async fn get(
421        &mut self,
422        id: i32,
423        reader_schema: &Schema,
424    ) -> anyhow::Result<anyhow::Result<&Schema>> {
425        let entry = match self.cache.entry(id) {
426            Entry::Occupied(o) => o.into_mut(),
427            Entry::Vacant(v) => {
428                // An issue with _fetching_ the schema should be returned
429                // immediately, and not cached, since it might get better on the
430                // next retry.
431                let ccsr_client = Arc::clone(&self.ccsr_client);
432
433                // Fetch schema with its references (if any)
434                let (primary_subject, reference_subjects) = Retry::default()
435                    // Twice the timeout of the ccsr client so we can attempt 2 requests.
436                    .max_duration(ccsr_client.timeout() * 2)
437                    // Canceling because ultimately it's just non-mutating HTTP requests.
438                    .retry_async_canceling(move |state| {
439                        let ccsr_client = Arc::clone(&ccsr_client);
440                        async move {
441                            let res = ccsr_client.get_subject_and_references_by_id(id).await;
442                            match res {
443                                Err(e) => {
444                                    if let Some(timeout) = state.next_backoff {
445                                        warn!(
446                                            "transient failure fetching \
447                                                schema id {}: {:?}, retrying in {:?}",
448                                            id, e, timeout
449                                        );
450                                    }
451                                    Err(anyhow::Error::from(e))
452                                }
453                                _ => Ok(res?),
454                            }
455                        }
456                    })
457                    .run_in_task(|| format!("fetch_avro_schema:{}", id))
458                    .await?;
459
460                // Now, we've gotten some json back, so we want to cache it (regardless of whether it's a valid
461                // avro schema, it won't change).
462                //
463                // However, we can't just cache it directly, since resolving schemas takes significant CPU work,
464                // which we don't want to repeat for every record. So, parse and resolve it, and cache the
465                // result (whether schema or error).
466                let result = Self::parse_with_references(
467                    &primary_subject,
468                    &reference_subjects,
469                    reader_schema,
470                );
471                v.insert(result)
472            }
473        };
474        Ok(entry.as_ref().map_err(|e| anyhow::Error::new(e.clone())))
475    }
476
477    /// Parse a schema along with its references and resolve against the reader schema.
478    fn parse_with_references(
479        primary_subject: &mz_ccsr::Subject,
480        reference_subjects: &[mz_ccsr::Subject],
481        reader_schema: &Schema,
482    ) -> Result<Schema, AvroError> {
483        // Parse referenced schemas incrementally: each reference may depend on previous ones.
484        let mut reference_schemas: Vec<Schema> = Vec::with_capacity(reference_subjects.len());
485        for subject in reference_subjects {
486            let ref_json: serde_json::Value = serde_json::from_str(&subject.schema.raw)
487                .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
488            let parsed = Schema::parse_with_references(&ref_json, &reference_schemas)?;
489            reference_schemas.push(parsed);
490        }
491
492        // Parse primary schema, using references, if present.
493        let primary_value: serde_json::Value = serde_json::from_str(&primary_subject.schema.raw)
494            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
495        let schema = Schema::parse_with_references(&primary_value, &reference_schemas)?;
496
497        // Schema fingerprints don't actually capture whether two schemas are meaningfully
498        // different, because they strip out logical types. Thus, resolve in all cases.
499        let resolved = resolve_schemas(&schema, reader_schema)?;
500        Ok(resolved)
501    }
502}