use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::sync::Arc;

use anyhow::{Context, bail};
use mz_avro::error::Error as AvroError;
use mz_avro::schema::{
    ParseSchemaError, Schema, SchemaNode, SchemaPiece, SchemaPieceOrNamed, resolve_schemas,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::future::OreFutureExt;
use mz_ore::retry::Retry;
use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
use mz_repr::adt::timestamp::TimestampPrecision;
use mz_repr::{ColumnName, RelationDesc, SqlColumnType, SqlScalarType, UNKNOWN_COLUMN_NAME};
use tracing::warn;

use crate::avro::is_null;

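/// Parses an Avro schema from its JSON representation, first parsing any
/// schema `references` in order so that named types in later references and
/// in the primary schema can resolve against them.
///
/// A minimal usage sketch, assuming a self-contained record schema with no
/// references:
///
/// ```ignore
/// let schema = parse_schema(
///     r#"{"type": "record", "name": "row", "fields": [{"name": "a", "type": "long"}]}"#,
///     &[],
/// )?;
/// ```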
pub fn parse_schema(schema: &str, references: &[String]) -> anyhow::Result<Schema> {
    let schema: serde_json::Value = serde_json::from_str(schema)?;
    let mut parsed_refs: Vec<Schema> = Vec::with_capacity(references.len());
    for reference in references {
        let ref_json: serde_json::Value = serde_json::from_str(reference)?;
        let parsed = Schema::parse_with_references(&ref_json, &parsed_refs)?;
        parsed_refs.push(parsed);
    }
    Ok(Schema::parse_with_references(&schema, &parsed_refs)?)
}

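/// Converts an Avro schema into the `RelationDesc` describing the rows it
/// decodes to: a top-level record contributes one column per field, while
/// any other top-level type becomes a single column.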
pub fn schema_to_relationdesc(schema: Schema) -> Result<RelationDesc, anyhow::Error> {
    Ok(RelationDesc::from_names_and_types(validate_schema_1(
        schema.top_node(),
    )?))
}

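/// Computes the column names and SQL column types for a top-level Avro
/// schema node.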
fn validate_schema_1(schema: SchemaNode) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
    let mut columns = vec![];
    let mut seen_avro_nodes = BTreeSet::new();
    match schema.inner {
        SchemaPiece::Record { fields, .. } => {
            for f in fields {
                columns.extend(get_named_columns(
                    &mut seen_avro_nodes,
                    schema.step(&f.schema),
                    Some(&f.name),
                )?);
            }
        }
        _ => {
            columns.extend(get_named_columns(&mut seen_avro_nodes, schema, None)?);
        }
    }
    Ok(columns)
}

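/// Computes the columns for an Avro union.
///
/// A union with exactly one non-null variant becomes a single column
/// (nullable when the union also admits `null`); a union with several
/// non-null variants becomes one column per variant, each nullable, with a
/// 1-based index appended to the base name to disambiguate. Panics if
/// `schema` is not a union.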
fn get_union_columns<'a>(
    seen_avro_nodes: &mut BTreeSet<usize>,
    schema: SchemaNode<'a>,
    base_name: Option<&str>,
) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
    let us = match schema.inner {
        SchemaPiece::Union(us) => us,
        _ => panic!("This function should only be called on unions."),
    };
    let mut columns = vec![];
    let vs = us.variants();
    if vs.is_empty() || (vs.len() == 1 && is_null(&vs[0])) {
        bail!("Empty or null-only unions are not supported");
    } else {
        for (i, v) in vs.iter().filter(|v| !is_null(v)).enumerate() {
            let named_idx = match v {
                SchemaPieceOrNamed::Named(idx) => Some(*idx),
                _ => None,
            };
            if let Some(named_idx) = named_idx {
                if !seen_avro_nodes.insert(named_idx) {
                    bail!(
                        "Recursive types are not supported: {}",
                        v.get_human_name(schema.root)
                    );
                }
            }
            let node = schema.step(v);
            if let SchemaPiece::Union(_) = node.inner {
                unreachable!("Internal error: directly nested avro union!");
            }

            // If the union has exactly one non-null variant, name the column
            // after the field itself; otherwise disambiguate the variants
            // with a 1-based index suffix.
            let name = if vs.len() == 1 || (vs.len() == 2 && vs.iter().any(is_null)) {
                base_name
                    .map(|n| n.to_owned())
                    .or_else(|| {
                        v.get_piece_and_name(schema.root)
                            .1
                            .map(|full_name| full_name.base_name().to_owned())
                    })
                    .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
            } else {
                base_name
                    .map(|n| format!("{}{}", n, i + 1))
                    .or_else(|| {
                        v.get_piece_and_name(schema.root)
                            .1
                            .map(|full_name| full_name.base_name().to_owned())
                    })
                    .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
            };

            let ty = validate_schema_2(seen_avro_nodes, node)?;
            columns.push((name.into(), ty.nullable(vs.len() > 1)));
            if let Some(named_idx) = named_idx {
                seen_avro_nodes.remove(&named_idx);
            }
        }
    }
    Ok(columns)
}

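/// Computes the columns for an arbitrary Avro schema node: unions are
/// expanded via `get_union_columns`; every other type becomes a single
/// non-nullable column.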
fn get_named_columns<'a>(
    seen_avro_nodes: &mut BTreeSet<usize>,
    schema: SchemaNode<'a>,
    base_name: Option<&str>,
) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
    if let SchemaPiece::Union(_) = schema.inner {
        get_union_columns(seen_avro_nodes, schema, base_name)
    } else {
        let scalar_type = validate_schema_2(seen_avro_nodes, schema)?;
        Ok(vec![(
            base_name.unwrap_or(UNKNOWN_COLUMN_NAME).into(),
            scalar_type.nullable(false),
        )])
    }
}

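/// Maps an Avro schema node to the `SqlScalarType` it decodes to, using
/// `seen_avro_nodes` to detect and reject recursive types.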
fn validate_schema_2(
    seen_avro_nodes: &mut BTreeSet<usize>,
    schema: SchemaNode,
) -> anyhow::Result<SqlScalarType> {
    Ok(match schema.inner {
        SchemaPiece::Union(_) => {
            let columns = get_union_columns(seen_avro_nodes, schema, None)?;
            if columns.len() != 1 {
                bail!("Union of more than one non-null type not valid here");
            }
            let (_column_name, column_type) = columns.into_element();
            column_type.scalar_type
        }
        SchemaPiece::Null => bail!("null outside of union types is not supported"),
        SchemaPiece::Boolean => SqlScalarType::Bool,
        SchemaPiece::Int => SqlScalarType::Int32,
        SchemaPiece::Long => SqlScalarType::Int64,
        SchemaPiece::Float => SqlScalarType::Float32,
        SchemaPiece::Double => SqlScalarType::Float64,
        SchemaPiece::Date => SqlScalarType::Date,
        SchemaPiece::TimestampMilli => SqlScalarType::Timestamp {
            precision: Some(TimestampPrecision::try_from(3).unwrap()),
        },
        SchemaPiece::TimestampMicro => SqlScalarType::Timestamp {
            precision: Some(TimestampPrecision::try_from(6).unwrap()),
        },
        SchemaPiece::Decimal {
            precision, scale, ..
        } => {
            if *precision > usize::cast_from(NUMERIC_DATUM_MAX_PRECISION) {
                bail!(
                    "decimals with precision greater than {} are not supported",
                    NUMERIC_DATUM_MAX_PRECISION
                )
            }
            SqlScalarType::Numeric {
                max_scale: Some(NumericMaxScale::try_from(*scale)?),
            }
        }
        SchemaPiece::Bytes | SchemaPiece::Fixed { .. } => SqlScalarType::Bytes,
        SchemaPiece::String | SchemaPiece::Enum { .. } => SqlScalarType::String,
        SchemaPiece::Json => SqlScalarType::Jsonb,
        SchemaPiece::Uuid => SqlScalarType::Uuid,
        SchemaPiece::Record { fields, .. } => {
            let mut columns = vec![];
            for f in fields {
                let named_idx = match &f.schema {
                    SchemaPieceOrNamed::Named(idx) => Some(*idx),
                    _ => None,
                };
                if let Some(named_idx) = named_idx {
                    if !seen_avro_nodes.insert(named_idx) {
                        bail!(
                            "Recursive types are not supported: {}",
                            f.schema.get_human_name(schema.root)
                        );
                    }
                }
                let next_node = schema.step(&f.schema);
                columns.extend(get_named_columns(seen_avro_nodes, next_node, Some(&f.name))?);
                if let Some(named_idx) = named_idx {
                    seen_avro_nodes.remove(&named_idx);
                }
            }
            SqlScalarType::Record {
                fields: columns.into(),
                custom_id: None,
            }
        }
        SchemaPiece::Array(inner) => {
            let named_idx = match inner.as_ref() {
                SchemaPieceOrNamed::Named(idx) => Some(*idx),
                _ => None,
            };
            if let Some(named_idx) = named_idx {
                if !seen_avro_nodes.insert(named_idx) {
                    bail!(
                        "Recursive types are not supported: {}",
                        inner.get_human_name(schema.root)
                    );
                }
            }
            let next_node = schema.step(inner);
            let ret = SqlScalarType::List {
                element_type: Box::new(validate_schema_2(seen_avro_nodes, next_node)?),
                custom_id: None,
            };
            if let Some(named_idx) = named_idx {
                seen_avro_nodes.remove(&named_idx);
            }
            ret
        }
        SchemaPiece::Map(inner) => SqlScalarType::Map {
            value_type: Box::new(validate_schema_2(seen_avro_nodes, schema.step(inner))?),
            custom_id: None,
        },
        _ => bail!("Unsupported type in schema: {:?}", schema.inner),
    })
}

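/// Resolves the writer schema for Avro-encoded messages, optionally
/// consulting a Confluent Schema Registry.
///
/// With a schema registry client, each message's writer schema is fetched by
/// the id in its Confluent wire-format header and resolved against the
/// reader schema; without one, the reader schema is used as-is (stripping
/// the header first if `confluent_wire_format` is set).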
pub struct ConfluentAvroResolver {
    reader_schema: Schema,
    writer_schemas: Option<SchemaCache>,
    confluent_wire_format: bool,
}

impl ConfluentAvroResolver {
    pub fn new(
        reader_schema: &str,
        reader_reference_schemas: &[String],
        ccsr_client: Option<mz_ccsr::Client>,
        confluent_wire_format: bool,
    ) -> anyhow::Result<Self> {
        let reader_schema = parse_schema(reader_schema, reader_reference_schemas)?;
        let writer_schemas = ccsr_client.map(SchemaCache::new).transpose()?;
        Ok(Self {
            reader_schema,
            writer_schemas,
            confluent_wire_format,
        })
    }

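    /// Resolves the writer schema for a single Avro message.
    ///
    /// Returns a nested `Result`: the outer `Err` is fatal (e.g. the schema
    /// registry could not be reached), while the inner `Err` is specific to
    /// this message (e.g. a malformed Confluent header) and leaves the
    /// resolver usable for subsequent messages. On success, yields the
    /// message payload with any header stripped, the resolved schema, and
    /// the schema id, if one was present.
    ///
    /// A minimal usage sketch, assuming a resolver built for the Confluent
    /// wire format:
    ///
    /// ```ignore
    /// let (payload, schema, schema_id) = resolver.resolve(&bytes).await??;
    /// ```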
    pub async fn resolve<'a, 'b>(
        &'a mut self,
        mut bytes: &'b [u8],
    ) -> anyhow::Result<anyhow::Result<(&'b [u8], &'a Schema, Option<i32>)>> {
        let (resolved_schema, schema_id) = match &mut self.writer_schemas {
            Some(cache) => {
                debug_assert!(
                    self.confluent_wire_format,
                    "We should have set 'confluent_wire_format' everywhere \
                     that can lead to this branch"
                );
                let (schema_id, adjusted_bytes) =
                    match crate::confluent::extract_avro_header(bytes) {
                        Ok(ok) => ok,
                        Err(err) => return Ok(Err(err)),
                    };
                bytes = adjusted_bytes;
                let result = cache
                    .get(schema_id, &self.reader_schema)
                    .await?
                    .with_context(|| {
                        format!("failed to resolve Avro schema (id = {})", schema_id)
                    });
                let schema = match result {
                    Ok(schema) => schema,
                    Err(err) => return Ok(Err(err)),
                };
                (schema, Some(schema_id))
            }

            None => {
                // Without a schema registry client we cannot look up the
                // writer schema, so assume it matches the reader schema;
                // still strip the Confluent header if one is expected.
                if self.confluent_wire_format {
                    let (_, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes) {
                        Ok(ok) => ok,
                        Err(err) => return Ok(Err(err)),
                    };
                    bytes = adjusted_bytes;
                }
                (&self.reader_schema, None)
            }
        };
        Ok(Ok((bytes, resolved_schema, schema_id)))
    }
}

impl fmt::Debug for ConfluentAvroResolver {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("ConfluentAvroResolver")
            .field("reader_schema", &self.reader_schema)
            .field(
                "writer_schemas",
                if self.writer_schemas.is_some() {
                    &"some"
                } else {
                    &"none"
                },
            )
            .finish()
    }
}

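/// A cache of writer schemas fetched from a Confluent Schema Registry, keyed
/// by schema id.
///
/// Parse and resolution failures are cached alongside successes, so a
/// permanently broken schema is not re-fetched for every message; transient
/// fetch errors are not cached.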
#[derive(Debug)]
struct SchemaCache {
    cache: BTreeMap<i32, Result<Schema, AvroError>>,
    ccsr_client: Arc<mz_ccsr::Client>,
}

impl SchemaCache {
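    /// Creates an empty cache backed by the given schema registry client.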
    fn new(ccsr_client: mz_ccsr::Client) -> Result<SchemaCache, anyhow::Error> {
        Ok(SchemaCache {
            cache: BTreeMap::new(),
            ccsr_client: Arc::new(ccsr_client),
        })
    }

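    /// Returns the schema with the given id, resolved against
    /// `reader_schema`, fetching it from the registry (with retries) on a
    /// cache miss.
    ///
    /// As with `ConfluentAvroResolver::resolve`, the outer `Result` carries
    /// fatal errors and the inner `Result` carries per-schema errors.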
    async fn get(
        &mut self,
        id: i32,
        reader_schema: &Schema,
    ) -> anyhow::Result<anyhow::Result<&Schema>> {
        let entry = match self.cache.entry(id) {
            Entry::Occupied(o) => o.into_mut(),
            Entry::Vacant(v) => {
                let ccsr_client = Arc::clone(&self.ccsr_client);
                let (primary_subject, reference_subjects) = Retry::default()
                    .max_duration(ccsr_client.timeout() * 2)
                    .retry_async_canceling(move |state| {
                        let ccsr_client = Arc::clone(&ccsr_client);
                        async move {
                            let res = ccsr_client.get_subject_and_references_by_id(id).await;
                            match res {
                                Err(e) => {
                                    if let Some(timeout) = state.next_backoff {
                                        warn!(
                                            "transient failure fetching \
                                             schema id {}: {:?}, retrying in {:?}",
                                            id, e, timeout
                                        );
                                    }
                                    Err(anyhow::Error::from(e))
                                }
                                _ => Ok(res?),
                            }
                        }
                    })
                    .run_in_task(|| format!("fetch_avro_schema:{}", id))
                    .await?;

                let result = Self::parse_with_references(
                    &primary_subject,
                    &reference_subjects,
                    reader_schema,
                );
                v.insert(result)
            }
        };
        Ok(entry.as_ref().map_err(|e| anyhow::Error::new(e.clone())))
    }

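    /// Parses the fetched `primary_subject` together with its
    /// `reference_subjects` into a `Schema`, then resolves that writer
    /// schema against `reader_schema`.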
    fn parse_with_references(
        primary_subject: &mz_ccsr::Subject,
        reference_subjects: &[mz_ccsr::Subject],
        reader_schema: &Schema,
    ) -> Result<Schema, AvroError> {
        let mut reference_schemas: Vec<Schema> = Vec::with_capacity(reference_subjects.len());
        for subject in reference_subjects {
            let ref_json: serde_json::Value = serde_json::from_str(&subject.schema.raw)
                .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
            let parsed = Schema::parse_with_references(&ref_json, &reference_schemas)?;
            reference_schemas.push(parsed);
        }

        let primary_value: serde_json::Value = serde_json::from_str(&primary_subject.schema.raw)
            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
        let schema = Schema::parse_with_references(&primary_value, &reference_schemas)?;

        let resolved = resolve_schemas(&schema, reader_schema)?;
        Ok(resolved)
    }
}