
mz_interchange/avro/schema.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Conversion from Avro schemas to Materialize `RelationDesc`s.
//!
//! A few notes for posterity on how this conversion happens are in order.
//!
//! If the schema is an Avro record, we flatten it to its fields, which become the columns
//! of the relation.
//!
//! Each individual field is then converted to its SQL equivalent. For most types, this
//! conversion is the obvious one. The only non-trivial counterexample is Avro unions.
//!
//! Since Avro types are not nullable by default, the typical way normal (i.e., nullable)
//! SQL fields are represented in Avro is by a union of the underlying type with the
//! singleton type { Null }; in Avro schema notation, this is `["null", "TheType"]`.
//! We shall call union types following this pattern _Nullability-Pattern Unions_,
//! and all other union types (e.g., `["MyType1", "MyType2"]` or
//! `["null", "MyType1", "MyType2"]`) _Essential Unions_.
//! Since there is an obvious way to represent Nullability-Pattern Unions, but not
//! Essential Unions, in the SQL type system, we must handle Essential Unions with a bit
//! of a hack (at least until Materialize supports union or sum types, which may be never).
//!
//! When an Essential Union appears as one of the fields of a record, we expand it to
//! _n_ columns in SQL, where _n_ is the number of non-null variants in the union. These
//! columns are named by appending each variant's 1-based index to the overall name of
//! the field. For example, if an Essential Union in a field named `"Foo"` has schema
//! `[int, bool]`, it expands to the columns `"Foo1": int, "Foo2": bool`. The source
//! pipeline upholds an implicit constraint that at most one such column is non-`null`
//! at a time.
//!
//! When an Essential Union appears _elsewhere_ than as one of the fields of a record,
//! there is nothing we can do, because we expect to be able to turn it into exactly one
//! SQL type, not a series of them. Thus, in these cases, we just bail. For example, it's
//! not possible to ingest an array or map whose element type is an Essential Union.
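//!
//! To illustrate (a sketch of the rules above, not an excerpt from a real test):
//!
//! ```text
//! {"type": "record", "name": "row", "fields": [
//!     {"name": "a", "type": ["null", "long"]},   <- Nullability-Pattern Union
//!     {"name": "b", "type": ["int", "boolean"]}  <- Essential Union
//! ]}
//! ```
//!
//! flattens to three columns: a nullable `bigint` named `a`, plus `b1` (nullable
//! `integer`) and `b2` (nullable `boolean`).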

use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::sync::Arc;

use anyhow::{Context, bail};
use mz_avro::error::Error as AvroError;
use mz_avro::schema::{
    ParseSchemaError, Schema, SchemaNode, SchemaPiece, SchemaPieceOrNamed, resolve_schemas,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::future::OreFutureExt;
use mz_ore::retry::Retry;
use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
use mz_repr::adt::timestamp::TimestampPrecision;
use mz_repr::{ColumnName, RelationDesc, SqlColumnType, SqlScalarType, UNKNOWN_COLUMN_NAME};
use tracing::warn;

use crate::avro::is_null;

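/// Parses an Avro schema from its JSON representation, first parsing any
/// `references` it depends on.
///
/// `references` must be given in dependency order (dependencies first), since
/// each reference is parsed against only the references preceding it. For
/// example (illustrative; the type names are hypothetical), a schema that
/// mentions a named type `Address` must be preceded by the reference schema
/// that defines `Address`.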
pub fn parse_schema(schema: &str, references: &[String]) -> anyhow::Result<Schema> {
    let schema: serde_json::Value = serde_json::from_str(schema)?;
    // Parse reference schemas incrementally: each reference may depend on previous ones.
    // References must be provided in dependency order (dependencies first).
    let mut parsed_refs: Vec<Schema> = Vec::with_capacity(references.len());
    for reference in references {
        let ref_json: serde_json::Value = serde_json::from_str(reference)?;
        let parsed = Schema::parse_with_references(&ref_json, &parsed_refs)?;
        parsed_refs.push(parsed);
    }
    Ok(Schema::parse_with_references(&schema, &parsed_refs)?)
}

/// Converts an Apache Avro schema into a list of column names and types.
// TODO(petrosagg): find a way to make this a TryFrom impl somewhere
pub fn schema_to_relationdesc(schema: Schema) -> Result<RelationDesc, anyhow::Error> {
    // TODO(petrosagg): call directly into validate_schema_2 and do the Record flattening once
    // we're in RelationDesc land
    Ok(RelationDesc::from_names_and_types(validate_schema_1(
        schema.top_node(),
    )?))
}

/// Converts an Avro schema to a series of columns and names, flattening the
/// top-level record if the top node is indeed a record.
fn validate_schema_1(schema: SchemaNode) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
    let mut columns = vec![];
    let mut seen_avro_nodes = BTreeSet::new();
    match schema.inner {
        SchemaPiece::Record { fields, .. } => {
            for f in fields {
                columns.extend(get_named_columns(
                    &mut seen_avro_nodes,
                    schema.step(&f.schema),
                    Some(&f.name),
                )?);
            }
        }
        _ => {
            columns.extend(get_named_columns(&mut seen_avro_nodes, schema, None)?);
        }
    }
    Ok(columns)
}

/// Gets the series of (one or more) SQL columns corresponding to an Avro union.
/// See the module comments for details.
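///
/// For example (a sketch of the naming scheme, not an excerpt from a test):
///
/// ```text
/// ["null", "long"]   -> one nullable bigint column, keeping the base name
/// ["int", "boolean"] -> two nullable columns, `<base_name>1` (integer)
///                       and `<base_name>2` (boolean)
/// ```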
fn get_union_columns<'a>(
    seen_avro_nodes: &mut BTreeSet<usize>,
    schema: SchemaNode<'a>,
    base_name: Option<&str>,
) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
    let us = match schema.inner {
        SchemaPiece::Union(us) => us,
        _ => panic!("This function should only be called on unions."),
    };
    let mut columns = vec![];
    let vs = us.variants();
    if vs.is_empty() || (vs.len() == 1 && is_null(&vs[0])) {
        bail!("Empty or null-only unions are not supported");
    } else {
        for (i, v) in vs.iter().filter(|v| !is_null(v)).enumerate() {
            let named_idx = match v {
                SchemaPieceOrNamed::Named(idx) => Some(*idx),
                _ => None,
            };
            if let Some(named_idx) = named_idx {
                if !seen_avro_nodes.insert(named_idx) {
                    bail!(
                        "Recursive types are not supported: {}",
                        v.get_human_name(schema.root)
                    );
                }
            }
            let node = schema.step(v);
            if let SchemaPiece::Union(_) = node.inner {
                unreachable!("Internal error: directly nested avro union!");
            }

            let name = if vs.len() == 1 || (vs.len() == 2 && vs.iter().any(is_null)) {
                // There is only one non-null variant in the
                // union, so we can use the field name directly.
                base_name
                    .map(|n| n.to_owned())
                    .or_else(|| {
                        v.get_piece_and_name(schema.root)
                            .1
                            .map(|full_name| full_name.base_name().to_owned())
                    })
                    .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
            } else {
                // There are multiple non-null variants in the
                // union, so we need to invent field names for
                // each variant.
                base_name
                    .map(|n| format!("{}{}", n, i + 1))
                    .or_else(|| {
                        v.get_piece_and_name(schema.root)
                            .1
                            .map(|full_name| full_name.base_name().to_owned())
                    })
                    .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
            };

            // If there is more than one variant in the union,
            // the column's output type is nullable, as this
            // column will be null whenever it is uninhabited.
            let ty = validate_schema_2(seen_avro_nodes, node)?;
            columns.push((name.into(), ty.nullable(vs.len() > 1)));
            if let Some(named_idx) = named_idx {
                seen_avro_nodes.remove(&named_idx);
            }
        }
    }
    Ok(columns)
}

fn get_named_columns<'a>(
    seen_avro_nodes: &mut BTreeSet<usize>,
    schema: SchemaNode<'a>,
    base_name: Option<&str>,
) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
    if let SchemaPiece::Union(_) = schema.inner {
        get_union_columns(seen_avro_nodes, schema, base_name)
    } else {
        let scalar_type = validate_schema_2(seen_avro_nodes, schema)?;
        Ok(vec![(
            // TODO(benesch): we should do better than this when there's no base
            // name, e.g., invent a name based on the type.
            base_name.unwrap_or(UNKNOWN_COLUMN_NAME).into(),
            scalar_type.nullable(false),
        )])
    }
}

/// Gets the single SQL scalar type corresponding to a schema node.
/// It is an error if this node corresponds to more than one column
/// (because it is an Essential Union in the sense described in the module docs).
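///
/// A few representative mappings, for orientation (see the `match` below for
/// the full list):
///
/// ```text
/// "long"                             -> SqlScalarType::Int64
/// {"type": "array", "items": "long"} -> SqlScalarType::List { element_type: Int64, .. }
/// ["null", "string"]                 -> SqlScalarType::String (nullability dropped here)
/// ```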
fn validate_schema_2(
    seen_avro_nodes: &mut BTreeSet<usize>,
    schema: SchemaNode,
) -> anyhow::Result<SqlScalarType> {
    Ok(match schema.inner {
        SchemaPiece::Union(_) => {
            let columns = get_union_columns(seen_avro_nodes, schema, None)?;
            if columns.len() != 1 {
                bail!("Union of more than one non-null type not valid here");
            }
            let (_column_name, column_type) = columns.into_element();
            // It's okay to lose the nullability information here, as it's not relevant to
            // any higher layer. This will either be included in an array or map type,
            // where all values are nullable. It can't be included as a top-level column
            // or as a record type, where nullability is actually tracked, because in
            // those cases we will have already gone through the `Union` code path in
            // `get_named_columns`.
            column_type.scalar_type
        }
        SchemaPiece::Null => bail!("null outside of union types is not supported"),
        SchemaPiece::Boolean => SqlScalarType::Bool,
        SchemaPiece::Int => SqlScalarType::Int32,
        SchemaPiece::Long => SqlScalarType::Int64,
        SchemaPiece::Float => SqlScalarType::Float32,
        SchemaPiece::Double => SqlScalarType::Float64,
        SchemaPiece::Date => SqlScalarType::Date,
        SchemaPiece::TimestampMilli => SqlScalarType::Timestamp {
            precision: Some(TimestampPrecision::try_from(3).unwrap()),
        },
        SchemaPiece::TimestampMicro => SqlScalarType::Timestamp {
            precision: Some(TimestampPrecision::try_from(6).unwrap()),
        },
        SchemaPiece::Decimal {
            precision, scale, ..
        } => {
            if *precision > usize::cast_from(NUMERIC_DATUM_MAX_PRECISION) {
                bail!(
                    "decimals with precision greater than {} are not supported",
                    NUMERIC_DATUM_MAX_PRECISION
                )
            }
            SqlScalarType::Numeric {
                max_scale: Some(NumericMaxScale::try_from(*scale)?),
            }
        }
        SchemaPiece::Bytes | SchemaPiece::Fixed { .. } => SqlScalarType::Bytes,
        SchemaPiece::String | SchemaPiece::Enum { .. } => SqlScalarType::String,

        SchemaPiece::Json => SqlScalarType::Jsonb,
        SchemaPiece::Uuid => SqlScalarType::Uuid,
        SchemaPiece::Record { fields, .. } => {
            let mut columns = vec![];
            for f in fields {
                let named_idx = match &f.schema {
                    SchemaPieceOrNamed::Named(idx) => Some(*idx),
                    _ => None,
                };
                if let Some(named_idx) = named_idx {
                    if !seen_avro_nodes.insert(named_idx) {
                        bail!(
                            "Recursive types are not supported: {}",
                            f.schema.get_human_name(schema.root)
                        );
                    }
                }
                let next_node = schema.step(&f.schema);
                columns.extend(
                    get_named_columns(seen_avro_nodes, next_node, Some(&f.name))?.into_iter(),
                );
                if let Some(named_idx) = named_idx {
                    seen_avro_nodes.remove(&named_idx);
                }
            }
            SqlScalarType::Record {
                fields: columns.into(),
                custom_id: None,
            }
        }
        SchemaPiece::Array(inner) => {
            let named_idx = match inner.as_ref() {
                SchemaPieceOrNamed::Named(idx) => Some(*idx),
                _ => None,
            };
            if let Some(named_idx) = named_idx {
                if !seen_avro_nodes.insert(named_idx) {
                    bail!(
                        "Recursive types are not supported: {}",
                        inner.get_human_name(schema.root)
                    );
                }
            }
            let next_node = schema.step(inner);
            let ret = SqlScalarType::List {
                element_type: Box::new(validate_schema_2(seen_avro_nodes, next_node)?),
                custom_id: None,
            };
            if let Some(named_idx) = named_idx {
                seen_avro_nodes.remove(&named_idx);
            }
            ret
        }
        SchemaPiece::Map(inner) => SqlScalarType::Map {
            value_type: Box::new(validate_schema_2(seen_avro_nodes, schema.step(inner))?),
            custom_id: None,
        },

        _ => bail!("Unsupported type in schema: {:?}", schema.inner),
    })
}

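/// Resolves the writer schema for Avro-encoded messages, optionally consulting
/// the Confluent Schema Registry.
///
/// When `confluent_wire_format` is set, each message is expected to begin with
/// the Confluent wire-format header (a zero magic byte followed by a
/// big-endian 32-bit schema ID) ahead of the Avro-encoded payload.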
pub struct ConfluentAvroResolver {
    reader_schema: Schema,
    writer_schemas: Option<SchemaCache>,
    confluent_wire_format: bool,
}

impl ConfluentAvroResolver {
    pub fn new(
        reader_schema: &str,
        reader_reference_schemas: &[String],
        ccsr_client: Option<mz_ccsr::Client>,
        confluent_wire_format: bool,
    ) -> anyhow::Result<Self> {
        // parse_schema handles incremental parsing of references (dependencies first).
        let reader_schema = parse_schema(reader_schema, reader_reference_schemas)?;
        let writer_schemas = ccsr_client.map(SchemaCache::new).transpose()?;
        Ok(Self {
            reader_schema,
            writer_schemas,
            confluent_wire_format,
        })
    }

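    /// Resolves the writer schema for `bytes` and strips any wire-format
    /// header, returning the remaining Avro payload, the schema to decode it
    /// with, and the schema ID (if one was present).
    ///
    /// The nested `Result` distinguishes failure modes: the outer `Result`
    /// carries transient errors that the caller may retry, while the inner
    /// `Result` carries permanent errors specific to this message.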
    pub async fn resolve<'a, 'b>(
        &'a mut self,
        mut bytes: &'b [u8],
    ) -> anyhow::Result<anyhow::Result<(&'b [u8], &'a Schema, Option<i32>)>> {
        let (resolved_schema, schema_id) = match &mut self.writer_schemas {
            Some(cache) => {
                debug_assert!(
                    self.confluent_wire_format,
                    "We should have set 'confluent_wire_format' everywhere \
                     that can lead to this branch"
                );
                // XXX(guswynn): use destructuring assignments when they are stable
                let (schema_id, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes)
                {
                    Ok(ok) => ok,
                    Err(err) => return Ok(Err(err)),
                };
                bytes = adjusted_bytes;
                let result = cache
                    .get(schema_id, &self.reader_schema)
                    // The outer Result describes transient errors, so use ? here to propagate.
                    .await?
                    .with_context(|| format!("failed to resolve Avro schema (id = {})", schema_id));
                let schema = match result {
                    Ok(schema) => schema,
                    Err(err) => return Ok(Err(err)),
                };
                (schema, Some(schema_id))
            }

            // If we haven't been asked to use a schema registry, we have no way
            // to discover the writer's schema. That's ok; we'll just use the
            // reader's schema and hope it lines up.
            None => {
                if self.confluent_wire_format {
                    // Validate the header and advance past it.
                    let (_, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes) {
                        Ok(ok) => ok,
                        Err(err) => return Ok(Err(err)),
                    };
                    bytes = adjusted_bytes;
                }
                (&self.reader_schema, None)
            }
        };
        Ok(Ok((bytes, resolved_schema, schema_id)))
    }
}

impl fmt::Debug for ConfluentAvroResolver {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("ConfluentAvroResolver")
            .field("reader_schema", &self.reader_schema)
            .field(
                "writer_schemas",
                if self.writer_schemas.is_some() {
                    &"some"
                } else {
                    &"none"
                },
            )
            .finish()
    }
}

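/// A cache from schema IDs to parsed schemas (resolved against the reader
/// schema), populated on demand from the Confluent Schema Registry.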
#[derive(Debug)]
struct SchemaCache {
    cache: BTreeMap<i32, Result<Schema, AvroError>>,
    ccsr_client: Arc<mz_ccsr::Client>,
}

impl SchemaCache {
    fn new(ccsr_client: mz_ccsr::Client) -> Result<SchemaCache, anyhow::Error> {
        Ok(SchemaCache {
            cache: BTreeMap::new(),
            ccsr_client: Arc::new(ccsr_client),
        })
    }

    /// Looks up the writer schema for `id`, fetching it from the registry on
    /// first use and caching the outcome. The writer schema is always resolved
    /// against the reader schema before being cached, since schema
    /// fingerprints alone cannot tell whether two schemas are meaningfully
    /// identical (see `parse_with_references`).
    ///
    /// This method also handles schema references: if the schema references types
    /// defined in other schemas, those schemas are fetched and their types are made
    /// available during parsing.
    async fn get(
        &mut self,
        id: i32,
        reader_schema: &Schema,
    ) -> anyhow::Result<anyhow::Result<&Schema>> {
        let entry = match self.cache.entry(id) {
            Entry::Occupied(o) => o.into_mut(),
            Entry::Vacant(v) => {
                // An issue with _fetching_ the schema should be returned
                // immediately, and not cached, since it might get better on the
                // next retry.
                let ccsr_client = Arc::clone(&self.ccsr_client);

                // Fetch the schema along with its references (if any).
                let (primary_subject, reference_subjects) = Retry::default()
                    // Twice the timeout of the ccsr client so we can attempt 2 requests.
                    .max_duration(ccsr_client.timeout() * 2)
                    // Canceling is safe because these are non-mutating HTTP requests.
                    .retry_async_canceling(move |state| {
                        let ccsr_client = Arc::clone(&ccsr_client);
                        async move {
                            let res = ccsr_client.get_subject_and_references_by_id(id).await;
                            match res {
                                Err(e) => {
                                    if let Some(timeout) = state.next_backoff {
                                        warn!(
                                            "transient failure fetching \
                                             schema id {}: {:?}, retrying in {:?}",
                                            id, e, timeout
                                        );
                                    }
                                    Err(anyhow::Error::from(e))
                                }
                                _ => Ok(res?),
                            }
                        }
                    })
                    .run_in_task(|| format!("fetch_avro_schema:{}", id))
                    .await?;

                // We've gotten some JSON back, so we want to cache it: whether or not
                // it's a valid Avro schema, it won't change.
                //
                // However, we can't just cache it directly, since resolving schemas takes
                // significant CPU work, which we don't want to repeat for every record.
                // So parse and resolve it, and cache the result (whether schema or error).
                let result = Self::parse_with_references(
                    &primary_subject,
                    &reference_subjects,
                    reader_schema,
                );
                v.insert(result)
            }
        };
        Ok(entry.as_ref().map_err(|e| anyhow::Error::new(e.clone())))
    }

    /// Parses a schema along with its references and resolves it against the reader schema.
    fn parse_with_references(
        primary_subject: &mz_ccsr::Subject,
        reference_subjects: &[mz_ccsr::Subject],
        reader_schema: &Schema,
    ) -> Result<Schema, AvroError> {
        // Parse referenced schemas incrementally: each reference may depend on previous ones.
        let mut reference_schemas: Vec<Schema> = Vec::with_capacity(reference_subjects.len());
        for subject in reference_subjects {
            let ref_json: serde_json::Value = serde_json::from_str(&subject.schema.raw)
                .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
            let parsed = Schema::parse_with_references(&ref_json, &reference_schemas)?;
            reference_schemas.push(parsed);
        }

        // Parse the primary schema, using the references, if present.
        let primary_value: serde_json::Value = serde_json::from_str(&primary_subject.schema.raw)
            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
        let schema = Schema::parse_with_references(&primary_value, &reference_schemas)?;

        // Schema fingerprints don't actually capture whether two schemas are meaningfully
        // different, because they strip out logical types. Thus, resolve in all cases.
        let resolved = resolve_schemas(&schema, reader_schema)?;
        Ok(resolved)
    }
}