mz_interchange/avro/schema.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Conversion from Avro schemas to Materialize `RelationDesc`s.
//!
//! A few notes for posterity on how this conversion happens are in order.
//!
//! If the schema is an Avro record, we flatten it to its fields, which become the columns
//! of the relation.
//!
//! Each individual field is then converted to its SQL equivalent. For most types, this
//! conversion is the obvious one. The only non-trivial case is Avro unions.
//!
//! Since Avro types are not nullable by default, the typical way normal (i.e., nullable)
//! SQL fields are represented in Avro is by a union of the underlying type with the
//! singleton type { Null }; in Avro schema notation, this is `["null", "TheType"]`.
//! We shall call union types following this pattern _Nullability-Pattern Unions_, and
//! all other union types (e.g., `["MyType1", "MyType2"]` or
//! `["null", "MyType1", "MyType2"]`) _Essential Unions_.
//! Since there is an obvious way to represent Nullability-Pattern Unions in the SQL
//! type system, but not Essential Unions, we must handle Essential Unions with a bit
//! of a hack (at least until Materialize supports union or sum types, which may be
//! never).
//!
//! When an Essential Union appears as one of the fields of a record, we expand it to
//! _n_ columns in SQL, where _n_ is the number of non-null variants in the union.
//! These columns are named by appending each variant's 1-based index to the name of
//! the field. For example, if an Essential Union in a field named `"Foo"` has schema
//! `[int, bool]`, it expands to the columns `"Foo1": int, "Foo2": bool`. There is an
//! implicit constraint, upheld by the source pipeline, that at most one such column
//! is non-`null` at a time.
//!
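//! To make the two cases concrete side by side (an illustrative sketch, not a
//! schema from this crate), consider this record schema:
//!
//! ```text
//! {
//!   "type": "record",
//!   "name": "row",
//!   "fields": [
//!     {"name": "a", "type": ["null", "long"]},
//!     {"name": "b", "type": ["int", "boolean"]}
//!   ]
//! }
//! ```
//!
//! Field `a` is a Nullability-Pattern Union and becomes a single nullable `bigint`
//! column named `a`, while field `b` is an Essential Union and expands to two
//! nullable columns, `b1` (`integer`) and `b2` (`boolean`).
//!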
//! When an Essential Union appears _elsewhere_ than as one of the fields of a record,
//! there is nothing we can do, because we expect to be able to turn it into exactly one
//! SQL type, not a series of them. Thus, in these cases, we just bail. For example, it's
//! not possible to ingest an array or map whose element type is an Essential Union.

use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::{Context, bail};
use mz_avro::error::Error as AvroError;
use mz_avro::schema::{Schema, SchemaNode, SchemaPiece, SchemaPieceOrNamed, resolve_schemas};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::future::OreFutureExt;
use mz_ore::retry::Retry;
use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
use mz_repr::adt::timestamp::TimestampPrecision;
use mz_repr::{ColumnName, ColumnType, RelationDesc, ScalarType};
use tracing::warn;

use crate::avro::is_null;

/// Parses a JSON-encoded Avro schema.
pub fn parse_schema(schema: &str) -> anyhow::Result<Schema> {
    let schema = serde_json::from_str(schema)?;
    Ok(Schema::parse(&schema)?)
}

/// Converts an Apache Avro schema into a list of column names and types.
// TODO(petrosagg): find a way to make this a TryFrom impl somewhere
pub fn schema_to_relationdesc(schema: Schema) -> Result<RelationDesc, anyhow::Error> {
    // TODO(petrosagg): call directly into validate_schema_2 and do the Record flattening once
    // we're in RelationDesc land
    Ok(RelationDesc::from_names_and_types(validate_schema_1(
        schema.top_node(),
    )?))
}

/// Converts an Avro schema to a series of column names and types, flattening the
/// top-level record if the top node is indeed a record.
fn validate_schema_1(schema: SchemaNode) -> anyhow::Result<Vec<(ColumnName, ColumnType)>> {
    let mut columns = vec![];
    let mut seen_avro_nodes = BTreeSet::new();
    match schema.inner {
        SchemaPiece::Record { fields, .. } => {
            for f in fields {
                columns.extend(get_named_columns(
                    &mut seen_avro_nodes,
                    schema.step(&f.schema),
                    Some(&f.name),
                )?);
            }
        }
        _ => {
            columns.extend(get_named_columns(&mut seen_avro_nodes, schema, None)?);
        }
    }
    Ok(columns)
}

/// Get the series of (one or more) SQL columns corresponding to an Avro union.
/// See module comments for details.
fn get_union_columns<'a>(
    seen_avro_nodes: &mut BTreeSet<usize>,
    schema: SchemaNode<'a>,
    base_name: Option<&str>,
) -> anyhow::Result<Vec<(ColumnName, ColumnType)>> {
    let us = match schema.inner {
        SchemaPiece::Union(us) => us,
        _ => panic!("This function should only be called on unions."),
    };
    let mut columns = vec![];
    let vs = us.variants();
    if vs.is_empty() || (vs.len() == 1 && is_null(&vs[0])) {
        bail!("Empty or null-only unions are not supported");
    } else {
        for (i, v) in vs.iter().filter(|v| !is_null(v)).enumerate() {
            let named_idx = match v {
                SchemaPieceOrNamed::Named(idx) => Some(*idx),
                _ => None,
            };
            if let Some(named_idx) = named_idx {
                if !seen_avro_nodes.insert(named_idx) {
                    bail!(
                        "Recursive types are not supported: {}",
                        v.get_human_name(schema.root)
                    );
                }
            }
            let node = schema.step(v);
            if let SchemaPiece::Union(_) = node.inner {
                unreachable!("Internal error: directly nested avro union!");
            }

            let name = if vs.len() == 1 || (vs.len() == 2 && vs.iter().any(is_null)) {
                // There is only one non-null variant in the
                // union, so we can use the field name directly.
                base_name
                    .map(|n| n.to_owned())
                    .or_else(|| {
                        v.get_piece_and_name(schema.root)
                            .1
                            .map(|full_name| full_name.base_name().to_owned())
                    })
                    .unwrap_or_else(|| "?column?".into())
            } else {
                // There are multiple non-null variants in the
                // union, so we need to invent field names for
                // each variant.
                base_name
                    .map(|n| format!("{}{}", n, i + 1))
                    .or_else(|| {
                        v.get_piece_and_name(schema.root)
                            .1
                            .map(|full_name| full_name.base_name().to_owned())
                    })
                    .unwrap_or_else(|| "?column?".into())
            };

            // If there is more than one variant in the union,
            // the column's output type is nullable, as this
            // column will be null whenever it is uninhabited.
            let ty = validate_schema_2(seen_avro_nodes, node)?;
            columns.push((name.into(), ty.nullable(vs.len() > 1)));
            if let Some(named_idx) = named_idx {
                seen_avro_nodes.remove(&named_idx);
            }
        }
    }
    Ok(columns)
}

fn get_named_columns<'a>(
    seen_avro_nodes: &mut BTreeSet<usize>,
    schema: SchemaNode<'a>,
    base_name: Option<&str>,
) -> anyhow::Result<Vec<(ColumnName, ColumnType)>> {
    if let SchemaPiece::Union(_) = schema.inner {
        get_union_columns(seen_avro_nodes, schema, base_name)
    } else {
        let scalar_type = validate_schema_2(seen_avro_nodes, schema)?;
        Ok(vec![(
            // TODO(benesch): we should do better than this when there's no base
            // name, e.g., invent a name based on the type.
            base_name.unwrap_or("?column?").into(),
            scalar_type.nullable(false),
        )])
    }
}

/// Get the single column corresponding to a schema node.
/// It is an error if this node should correspond to more than one column
/// (because it is an Essential Union in the sense described in the module docs).
fn validate_schema_2(
    seen_avro_nodes: &mut BTreeSet<usize>,
    schema: SchemaNode,
) -> anyhow::Result<ScalarType> {
    Ok(match schema.inner {
        SchemaPiece::Union(_) => {
            let columns = get_union_columns(seen_avro_nodes, schema, None)?;
            if columns.len() != 1 {
                bail!("Union of more than one non-null type not valid here");
            }
            let (_column_name, column_type) = columns.into_element();
            // It's okay to lose the nullability information here, as it's not relevant to
            // any higher layer. This will either be included in an array or map type,
            // where all values are nullable. It can't be included as a top-level column
            // or as a record type, where nullability is actually tracked, because in
            // those cases we will have already gone through the `Union` code path in
            // `get_named_columns`.
            column_type.scalar_type
        }
        SchemaPiece::Null => bail!("null outside of union types is not supported"),
        SchemaPiece::Boolean => ScalarType::Bool,
        SchemaPiece::Int => ScalarType::Int32,
        SchemaPiece::Long => ScalarType::Int64,
        SchemaPiece::Float => ScalarType::Float32,
        SchemaPiece::Double => ScalarType::Float64,
        SchemaPiece::Date => ScalarType::Date,
        SchemaPiece::TimestampMilli => ScalarType::Timestamp {
            precision: Some(TimestampPrecision::try_from(3).unwrap()),
        },
        SchemaPiece::TimestampMicro => ScalarType::Timestamp {
            precision: Some(TimestampPrecision::try_from(6).unwrap()),
        },
        SchemaPiece::Decimal {
            precision, scale, ..
        } => {
            if *precision > usize::cast_from(NUMERIC_DATUM_MAX_PRECISION) {
                bail!(
                    "decimals with precision greater than {} are not supported",
                    NUMERIC_DATUM_MAX_PRECISION
                )
            }
            ScalarType::Numeric {
                max_scale: Some(NumericMaxScale::try_from(*scale)?),
            }
        }
        SchemaPiece::Bytes | SchemaPiece::Fixed { .. } => ScalarType::Bytes,
        SchemaPiece::String | SchemaPiece::Enum { .. } => ScalarType::String,

        SchemaPiece::Json => ScalarType::Jsonb,
        SchemaPiece::Uuid => ScalarType::Uuid,
        SchemaPiece::Record { fields, .. } => {
            let mut columns = vec![];
            for f in fields {
                let named_idx = match &f.schema {
                    SchemaPieceOrNamed::Named(idx) => Some(*idx),
                    _ => None,
                };
                if let Some(named_idx) = named_idx {
                    if !seen_avro_nodes.insert(named_idx) {
                        bail!(
                            "Recursive types are not supported: {}",
                            f.schema.get_human_name(schema.root)
                        );
                    }
                }
                let next_node = schema.step(&f.schema);
                columns.extend(
                    get_named_columns(seen_avro_nodes, next_node, Some(&f.name))?.into_iter(),
                );
                if let Some(named_idx) = named_idx {
                    seen_avro_nodes.remove(&named_idx);
                }
            }
            ScalarType::Record {
                fields: columns.into(),
                custom_id: None,
            }
        }
        SchemaPiece::Array(inner) => {
            let named_idx = match inner.as_ref() {
                SchemaPieceOrNamed::Named(idx) => Some(*idx),
                _ => None,
            };
            if let Some(named_idx) = named_idx {
                if !seen_avro_nodes.insert(named_idx) {
                    bail!(
                        "Recursive types are not supported: {}",
                        inner.get_human_name(schema.root)
                    );
                }
            }
            let next_node = schema.step(inner);
            let ret = ScalarType::List {
                element_type: Box::new(validate_schema_2(seen_avro_nodes, next_node)?),
                custom_id: None,
            };
            if let Some(named_idx) = named_idx {
                seen_avro_nodes.remove(&named_idx);
            }
            ret
        }
        SchemaPiece::Map(inner) => ScalarType::Map {
            value_type: Box::new(validate_schema_2(seen_avro_nodes, schema.step(inner))?),
            custom_id: None,
        },

        _ => bail!("Unsupported type in schema: {:?}", schema.inner),
    })
}

/// Resolves writer schemas fetched from a Confluent-compatible schema
/// registry (if one is configured) against a fixed reader schema.
pub struct ConfluentAvroResolver {
    reader_schema: Schema,
    writer_schemas: Option<SchemaCache>,
    confluent_wire_format: bool,
}

impl ConfluentAvroResolver {
    pub fn new(
        reader_schema: &str,
        ccsr_client: Option<mz_ccsr::Client>,
        confluent_wire_format: bool,
    ) -> anyhow::Result<Self> {
        let reader_schema = parse_schema(reader_schema)?;
        let writer_schemas = ccsr_client.map(SchemaCache::new).transpose()?;
        Ok(Self {
            reader_schema,
            writer_schemas,
            confluent_wire_format,
        })
    }

    /// Resolves the writer schema for one Avro-encoded record. The outer
    /// `Result` describes transient errors, which may succeed on retry; the
    /// inner `Result` describes errors that are permanent for these
    /// particular bytes (e.g., a malformed header or an unresolvable schema).
    pub async fn resolve<'a, 'b>(
        &'a mut self,
        mut bytes: &'b [u8],
    ) -> anyhow::Result<anyhow::Result<(&'b [u8], &'a Schema, Option<i32>)>> {
        let (resolved_schema, schema_id) = match &mut self.writer_schemas {
            Some(cache) => {
                debug_assert!(
                    self.confluent_wire_format,
                    "We should have set 'confluent_wire_format' everywhere \
                     that can lead to this branch"
                );
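                // For context: the Confluent wire format prefixes each Avro
                // payload with a one-byte magic value (0x00) followed by the
                // writer schema's ID as a big-endian 32-bit integer;
                // `extract_avro_header` checks and strips that prefix,
                // yielding the schema ID and the remaining Avro bytes.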
                // XXX(guswynn): use destructuring assignments when they are stable
                let (schema_id, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes)
                {
                    Ok(ok) => ok,
                    Err(err) => return Ok(Err(err)),
                };
                bytes = adjusted_bytes;
                let result = cache
                    .get(schema_id, &self.reader_schema)
                    // The outer Result describes transient errors so use ? here to propagate
                    .await?
                    .with_context(|| format!("failed to resolve Avro schema (id = {})", schema_id));
                let schema = match result {
                    Ok(schema) => schema,
                    Err(err) => return Ok(Err(err)),
                };
                (schema, Some(schema_id))
            }

            // If we haven't been asked to use a schema registry, we have no way
            // to discover the writer's schema. That's ok; we'll just use the
            // reader's schema and hope it lines up.
            None => {
                if self.confluent_wire_format {
                    // validate and just move the bytes buffer ahead
                    let (_, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes) {
                        Ok(ok) => ok,
                        Err(err) => return Ok(Err(err)),
                    };
                    bytes = adjusted_bytes;
                }
                (&self.reader_schema, None)
            }
        };
        Ok(Ok((bytes, resolved_schema, schema_id)))
    }
}

impl fmt::Debug for ConfluentAvroResolver {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("ConfluentAvroResolver")
            .field("reader_schema", &self.reader_schema)
            .field(
                "writer_schemas",
                if self.writer_schemas.is_some() {
                    &"some"
                } else {
                    &"none"
                },
            )
            .finish()
    }
}

#[derive(Debug)]
struct SchemaCache {
    cache: BTreeMap<i32, Result<Schema, AvroError>>,
    ccsr_client: Arc<mz_ccsr::Client>,
}

impl SchemaCache {
    fn new(ccsr_client: mz_ccsr::Client) -> Result<SchemaCache, anyhow::Error> {
        Ok(SchemaCache {
            cache: BTreeMap::new(),
            ccsr_client: Arc::new(ccsr_client),
        })
    }

    /// Looks up the writer schema for the given ID and performs schema
    /// resolution against the reader schema, returning the resolved schema.
    /// Parse and resolution results (success or failure) are cached per ID;
    /// failures to *fetch* the schema are returned without being cached.
    async fn get(
        &mut self,
        id: i32,
        reader_schema: &Schema,
    ) -> anyhow::Result<anyhow::Result<&Schema>> {
        let entry = match self.cache.entry(id) {
            Entry::Occupied(o) => o.into_mut(),
            Entry::Vacant(v) => {
                // An issue with _fetching_ the schema should be returned
                // immediately, and not cached, since it might get better on the
                // next retry.
                let ccsr_client = Arc::clone(&self.ccsr_client);
                let response = Retry::default()
                    // Twice the timeout of the ccsr client so we can attempt 2 requests.
                    .max_duration(ccsr_client.timeout() * 2)
                    // Canceling because ultimately it's just non-mutating HTTP requests.
                    .retry_async_canceling(move |state| {
                        let ccsr_client = Arc::clone(&ccsr_client);
                        async move {
                            let res = ccsr_client.get_schema_by_id(id).await;
                            match res {
                                Err(e) => {
                                    if let Some(timeout) = state.next_backoff {
                                        warn!(
                                            "transient failure fetching \
                                                schema id {}: {:?}, retrying in {:?}",
                                            id, e, timeout
                                        );
                                    }
                                    Err(anyhow::Error::from(e))
                                }
                                _ => Ok(res?),
                            }
                        }
                    })
                    .run_in_task(|| format!("fetch_avro_schema:{}", id))
                    .await?;
                // Now that we've gotten some JSON back, we want to cache it: regardless
                // of whether it's a valid Avro schema, it won't change on a refetch.
                //
                // However, we can't just cache it directly, since resolving schemas takes
                // significant CPU work, which we don't want to repeat for every record.
                // So, parse and resolve it, and cache the result (whether schema or error).
                let result = Schema::from_str(&response.raw).and_then(|schema| {
                    // Schema fingerprints don't actually capture whether two schemas are
                    // meaningfully different, because they strip out logical types. Thus,
                    // resolve in all cases.
                    let resolved = resolve_schemas(&schema, reader_schema)?;
                    Ok(resolved)
                });
                v.insert(result)
            }
        };
        Ok(entry.as_ref().map_err(|e| anyhow::Error::new(e.clone())))
    }
}
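
// A minimal illustrative test sketch, not from the original source: it
// exercises the union-handling rules described in the module docs on a
// hypothetical schema literal, and assumes `ColumnName::as_str` is available.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn essential_unions_expand_to_one_column_per_variant() {
        let schema = parse_schema(
            r#"{
                "type": "record",
                "name": "row",
                "fields": [
                    {"name": "a", "type": ["null", "long"]},
                    {"name": "b", "type": ["int", "boolean"]}
                ]
            }"#,
        )
        .expect("valid Avro schema");
        // `a` follows the nullability pattern and stays one nullable column;
        // `b` is an Essential Union and expands to the columns `b1` and `b2`.
        let columns = validate_schema_1(schema.top_node()).expect("supported schema");
        let names: Vec<_> = columns.iter().map(|(name, _ty)| name.as_str()).collect();
        assert_eq!(names, &["a", "b1", "b2"]);
    }
}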