1use std::collections::btree_map::Entry;
39use std::collections::{BTreeMap, BTreeSet};
40use std::fmt;
41use std::sync::Arc;
42
43use anyhow::{Context, anyhow, bail};
44use mz_avro::error::Error as AvroError;
45use mz_avro::schema::{
46 ParseSchemaError, Schema, SchemaNode, SchemaPiece, SchemaPieceOrNamed, resolve_schemas,
47};
48use mz_ore::cast::CastFrom;
49use mz_ore::collections::CollectionExt;
50use mz_ore::future::OreFutureExt;
51use mz_ore::retry::Retry;
52use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
53use mz_repr::adt::timestamp::TimestampPrecision;
54use mz_repr::{ColumnName, RelationDesc, SqlColumnType, SqlScalarType, UNKNOWN_COLUMN_NAME};
55use tracing::warn;
56
57use crate::avro::is_null;
58
59pub fn parse_schema(schema: &str, references: &[String]) -> anyhow::Result<Schema> {
60 let schema: serde_json::Value = serde_json::from_str(schema)?;
61 let mut parsed_refs: Vec<Schema> = Vec::with_capacity(references.len());
64 for reference in references {
65 let ref_json: serde_json::Value = serde_json::from_str(reference)?;
66 let parsed = Schema::parse_with_references(&ref_json, &parsed_refs)?;
67 parsed_refs.push(parsed);
68 }
69 Ok(Schema::parse_with_references(&schema, &parsed_refs)?)
70}
71
72pub fn schema_to_relationdesc(schema: Schema) -> Result<RelationDesc, anyhow::Error> {
75 Ok(RelationDesc::from_names_and_types(validate_schema_1(
78 schema.top_node(),
79 )?))
80}
81
82fn validate_schema_1(schema: SchemaNode) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
85 let mut columns = vec![];
86 let mut seen_avro_nodes = Default::default();
87 match schema.inner {
88 SchemaPiece::Record { fields, .. } => {
89 for f in fields {
90 columns.extend(get_named_columns(
91 &mut seen_avro_nodes,
92 schema.step(&f.schema),
93 Some(&f.name),
94 )?);
95 }
96 }
97 _ => {
98 columns.extend(get_named_columns(&mut seen_avro_nodes, schema, None)?);
99 }
100 }
101 Ok(columns)
102}
103
104fn get_union_columns<'a>(
107 seen_avro_nodes: &mut BTreeSet<usize>,
108 schema: SchemaNode<'a>,
109 base_name: Option<&str>,
110) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
111 let us = match schema.inner {
112 SchemaPiece::Union(us) => us,
113 _ => panic!("This function should only be called on unions."),
114 };
115 let mut columns = vec![];
116 let vs = us.variants();
117 if vs.is_empty() || (vs.len() == 1 && is_null(&vs[0])) {
118 bail!(anyhow!("Empty or null-only unions are not supported"));
119 } else {
120 for (i, v) in vs.iter().filter(|v| !is_null(v)).enumerate() {
121 let named_idx = match v {
122 SchemaPieceOrNamed::Named(idx) => Some(*idx),
123 SchemaPieceOrNamed::Piece(_) => None,
124 };
125 if let Some(named_idx) = named_idx {
126 if !seen_avro_nodes.insert(named_idx) {
127 bail!(
128 "Recursive types are not supported: {}",
129 v.get_human_name(schema.root)
130 );
131 }
132 }
133 let node = schema.step(v);
134 if let SchemaPiece::Union(_) = node.inner {
135 unreachable!("Internal error: directly nested avro union!");
136 }
137
138 let name = if vs.len() == 1 || (vs.len() == 2 && vs.iter().any(is_null)) {
139 base_name
142 .map(|n| n.to_owned())
143 .or_else(|| {
144 v.get_piece_and_name(schema.root)
145 .1
146 .map(|full_name| full_name.base_name().to_owned())
147 })
148 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
149 } else {
150 base_name
154 .map(|n| format!("{}{}", n, i + 1))
155 .or_else(|| {
156 v.get_piece_and_name(schema.root)
157 .1
158 .map(|full_name| full_name.base_name().to_owned())
159 })
160 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
161 };
162
163 let ty = validate_schema_2(seen_avro_nodes, node)?;
167 columns.push((name.into(), ty.nullable(vs.len() > 1)));
168 if let Some(named_idx) = named_idx {
169 seen_avro_nodes.remove(&named_idx);
170 }
171 }
172 }
173 Ok(columns)
174}
175
176fn get_named_columns<'a>(
177 seen_avro_nodes: &mut BTreeSet<usize>,
178 schema: SchemaNode<'a>,
179 base_name: Option<&str>,
180) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
181 if let SchemaPiece::Union(_) = schema.inner {
182 get_union_columns(seen_avro_nodes, schema, base_name)
183 } else {
184 let scalar_type = validate_schema_2(seen_avro_nodes, schema)?;
185 Ok(vec![(
186 base_name.unwrap_or(UNKNOWN_COLUMN_NAME).into(),
189 scalar_type.nullable(false),
190 )])
191 }
192}
193
194fn validate_schema_2(
198 seen_avro_nodes: &mut BTreeSet<usize>,
199 schema: SchemaNode,
200) -> anyhow::Result<SqlScalarType> {
201 Ok(match schema.inner {
202 SchemaPiece::Union(_) => {
203 let columns = get_union_columns(seen_avro_nodes, schema, None)?;
204 if columns.len() != 1 {
205 bail!("Union of more than one non-null type not valid here");
206 }
207 let (_column_name, column_type) = columns.into_element();
208 column_type.scalar_type
215 }
216 SchemaPiece::Null => bail!("null outside of union types is not supported"),
217 SchemaPiece::Boolean => SqlScalarType::Bool,
218 SchemaPiece::Int => SqlScalarType::Int32,
219 SchemaPiece::Long => SqlScalarType::Int64,
220 SchemaPiece::Float => SqlScalarType::Float32,
221 SchemaPiece::Double => SqlScalarType::Float64,
222 SchemaPiece::Date => SqlScalarType::Date,
223 SchemaPiece::TimestampMilli => SqlScalarType::Timestamp {
224 precision: Some(TimestampPrecision::try_from(3).unwrap()),
225 },
226 SchemaPiece::TimestampMicro => SqlScalarType::Timestamp {
227 precision: Some(TimestampPrecision::try_from(6).unwrap()),
228 },
229 SchemaPiece::Decimal {
230 precision, scale, ..
231 } => {
232 if *precision > usize::cast_from(NUMERIC_DATUM_MAX_PRECISION) {
233 bail!(
234 "decimals with precision greater than {} are not supported",
235 NUMERIC_DATUM_MAX_PRECISION
236 )
237 }
238 SqlScalarType::Numeric {
239 max_scale: Some(NumericMaxScale::try_from(*scale)?),
240 }
241 }
242 SchemaPiece::Bytes | SchemaPiece::Fixed { .. } => SqlScalarType::Bytes,
243 SchemaPiece::String | SchemaPiece::Enum { .. } => SqlScalarType::String,
244
245 SchemaPiece::Json => SqlScalarType::Jsonb,
246 SchemaPiece::Uuid => SqlScalarType::Uuid,
247 SchemaPiece::Record { fields, .. } => {
248 let mut columns = vec![];
249 for f in fields {
250 let named_idx = match &f.schema {
251 SchemaPieceOrNamed::Named(idx) => Some(*idx),
252 SchemaPieceOrNamed::Piece(_) => None,
253 };
254 if let Some(named_idx) = named_idx {
255 if !seen_avro_nodes.insert(named_idx) {
256 bail!(
257 "Recursive types are not supported: {}",
258 f.schema.get_human_name(schema.root)
259 );
260 }
261 }
262 let next_node = schema.step(&f.schema);
263 columns.extend(get_named_columns(
264 seen_avro_nodes,
265 next_node,
266 Some(&f.name),
267 )?);
268 if let Some(named_idx) = named_idx {
269 seen_avro_nodes.remove(&named_idx);
270 }
271 }
272 SqlScalarType::Record {
273 fields: columns.into(),
274 custom_id: None,
275 }
276 }
277 SchemaPiece::Array(inner) => {
278 let named_idx = match inner.as_ref() {
279 SchemaPieceOrNamed::Named(idx) => Some(*idx),
280 SchemaPieceOrNamed::Piece(_) => None,
281 };
282 if let Some(named_idx) = named_idx {
283 if !seen_avro_nodes.insert(named_idx) {
284 bail!(
285 "Recursive types are not supported: {}",
286 inner.get_human_name(schema.root)
287 );
288 }
289 }
290 let next_node = schema.step(inner);
291 let ret = SqlScalarType::List {
292 element_type: Box::new(validate_schema_2(seen_avro_nodes, next_node)?),
293 custom_id: None,
294 };
295 if let Some(named_idx) = named_idx {
296 seen_avro_nodes.remove(&named_idx);
297 }
298 ret
299 }
300 SchemaPiece::Map(inner) => SqlScalarType::Map {
301 value_type: Box::new(validate_schema_2(seen_avro_nodes, schema.step(inner))?),
302 custom_id: None,
303 },
304
305 _ => bail!("Unsupported type in schema: {:?}", schema.inner),
306 })
307}
308
309pub struct ConfluentAvroResolver {
310 reader_schema: Schema,
311 writer_schemas: Option<SchemaCache>,
312 confluent_wire_format: bool,
313}
314
315impl ConfluentAvroResolver {
316 pub fn new(
317 reader_schema: &str,
318 reader_reference_schemas: &[String],
319 ccsr_client: Option<mz_ccsr::Client>,
320 confluent_wire_format: bool,
321 ) -> anyhow::Result<Self> {
322 let reader_schema = parse_schema(reader_schema, reader_reference_schemas)?;
324 let writer_schemas = ccsr_client.map(SchemaCache::new).transpose()?;
325 Ok(Self {
326 reader_schema,
327 writer_schemas,
328 confluent_wire_format,
329 })
330 }
331
332 pub async fn resolve<'a, 'b>(
333 &'a mut self,
334 mut bytes: &'b [u8],
335 ) -> anyhow::Result<anyhow::Result<(&'b [u8], &'a Schema, Option<i32>)>> {
336 let (resolved_schema, schema_id) = match &mut self.writer_schemas {
337 Some(cache) => {
338 debug_assert!(
339 self.confluent_wire_format,
340 "We should have set 'confluent_wire_format' everywhere \
341 that can lead to this branch"
342 );
343 let (schema_id, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes)
345 {
346 Ok(ok) => ok,
347 Err(err) => return Ok(Err(err)),
348 };
349 bytes = adjusted_bytes;
350 let result = cache
351 .get(schema_id, &self.reader_schema)
352 .await?
354 .with_context(|| format!("failed to resolve Avro schema (id = {})", schema_id));
355 let schema = match result {
356 Ok(schema) => schema,
357 Err(err) => return Ok(Err(err)),
358 };
359 (schema, Some(schema_id))
360 }
361
362 None => {
366 if self.confluent_wire_format {
367 let (_, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes) {
369 Ok(ok) => ok,
370 Err(err) => return Ok(Err(err)),
371 };
372 bytes = adjusted_bytes;
373 }
374 (&self.reader_schema, None)
375 }
376 };
377 Ok(Ok((bytes, resolved_schema, schema_id)))
378 }
379}
380
381impl fmt::Debug for ConfluentAvroResolver {
382 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
383 f.debug_struct("ConfluentAvroResolver")
384 .field("reader_schema", &self.reader_schema)
385 .field(
386 "write_schema",
387 if self.writer_schemas.is_some() {
388 &"some"
389 } else {
390 &"none"
391 },
392 )
393 .finish()
394 }
395}
396
397#[derive(Debug)]
398struct SchemaCache {
399 cache: BTreeMap<i32, Result<Schema, AvroError>>,
400 ccsr_client: Arc<mz_ccsr::Client>,
401}
402
403impl SchemaCache {
404 fn new(ccsr_client: mz_ccsr::Client) -> Result<SchemaCache, anyhow::Error> {
405 Ok(SchemaCache {
406 cache: BTreeMap::new(),
407 ccsr_client: Arc::new(ccsr_client),
408 })
409 }
410
411 async fn get(
421 &mut self,
422 id: i32,
423 reader_schema: &Schema,
424 ) -> anyhow::Result<anyhow::Result<&Schema>> {
425 let entry = match self.cache.entry(id) {
426 Entry::Occupied(o) => o.into_mut(),
427 Entry::Vacant(v) => {
428 let ccsr_client = Arc::clone(&self.ccsr_client);
432
433 let (primary_subject, reference_subjects) = Retry::default()
435 .max_duration(ccsr_client.timeout() * 2)
437 .retry_async_canceling(move |state| {
439 let ccsr_client = Arc::clone(&ccsr_client);
440 async move {
441 let res = ccsr_client.get_subject_and_references_by_id(id).await;
442 match res {
443 Err(e) => {
444 if let Some(timeout) = state.next_backoff {
445 warn!(
446 "transient failure fetching \
447 schema id {}: {:?}, retrying in {:?}",
448 id, e, timeout
449 );
450 }
451 Err(anyhow::Error::from(e))
452 }
453 _ => Ok(res?),
454 }
455 }
456 })
457 .run_in_task(|| format!("fetch_avro_schema:{}", id))
458 .await?;
459
460 let result = Self::parse_with_references(
467 &primary_subject,
468 &reference_subjects,
469 reader_schema,
470 );
471 v.insert(result)
472 }
473 };
474 Ok(entry.as_ref().map_err(|e| anyhow::Error::new(e.clone())))
475 }
476
477 fn parse_with_references(
479 primary_subject: &mz_ccsr::Subject,
480 reference_subjects: &[mz_ccsr::Subject],
481 reader_schema: &Schema,
482 ) -> Result<Schema, AvroError> {
483 let mut reference_schemas: Vec<Schema> = Vec::with_capacity(reference_subjects.len());
485 for subject in reference_subjects {
486 let ref_json: serde_json::Value = serde_json::from_str(&subject.schema.raw)
487 .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
488 let parsed = Schema::parse_with_references(&ref_json, &reference_schemas)?;
489 reference_schemas.push(parsed);
490 }
491
492 let primary_value: serde_json::Value = serde_json::from_str(&primary_subject.schema.raw)
494 .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
495 let schema = Schema::parse_with_references(&primary_value, &reference_schemas)?;
496
497 let resolved = resolve_schemas(&schema, reader_schema)?;
500 Ok(resolved)
501 }
502}