1use std::collections::btree_map::Entry;
39use std::collections::{BTreeMap, BTreeSet};
40use std::fmt;
41use std::str::FromStr;
42use std::sync::Arc;
43
44use anyhow::{Context, anyhow, bail};
45use mz_avro::error::Error as AvroError;
46use mz_avro::schema::{Schema, SchemaNode, SchemaPiece, SchemaPieceOrNamed, resolve_schemas};
47use mz_ore::cast::CastFrom;
48use mz_ore::collections::CollectionExt;
49use mz_ore::future::OreFutureExt;
50use mz_ore::retry::Retry;
51use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
52use mz_repr::adt::timestamp::TimestampPrecision;
53use mz_repr::{ColumnName, ColumnType, RelationDesc, ScalarType};
54use tracing::warn;
55
56use crate::avro::is_null;
57
58pub fn parse_schema(schema: &str) -> anyhow::Result<Schema> {
59 let schema = serde_json::from_str(schema)?;
60 Ok(Schema::parse(&schema)?)
61}
62
63pub fn schema_to_relationdesc(schema: Schema) -> Result<RelationDesc, anyhow::Error> {
66 Ok(RelationDesc::from_names_and_types(validate_schema_1(
69 schema.top_node(),
70 )?))
71}
72
73fn validate_schema_1(schema: SchemaNode) -> anyhow::Result<Vec<(ColumnName, ColumnType)>> {
76 let mut columns = vec![];
77 let mut seen_avro_nodes = Default::default();
78 match schema.inner {
79 SchemaPiece::Record { fields, .. } => {
80 for f in fields {
81 columns.extend(get_named_columns(
82 &mut seen_avro_nodes,
83 schema.step(&f.schema),
84 Some(&f.name),
85 )?);
86 }
87 }
88 _ => {
89 columns.extend(get_named_columns(&mut seen_avro_nodes, schema, None)?);
90 }
91 }
92 Ok(columns)
93}
94
95fn get_union_columns<'a>(
98 seen_avro_nodes: &mut BTreeSet<usize>,
99 schema: SchemaNode<'a>,
100 base_name: Option<&str>,
101) -> anyhow::Result<Vec<(ColumnName, ColumnType)>> {
102 let us = match schema.inner {
103 SchemaPiece::Union(us) => us,
104 _ => panic!("This function should only be called on unions."),
105 };
106 let mut columns = vec![];
107 let vs = us.variants();
108 if vs.is_empty() || (vs.len() == 1 && is_null(&vs[0])) {
109 bail!(anyhow!("Empty or null-only unions are not supported"));
110 } else {
111 for (i, v) in vs.iter().filter(|v| !is_null(v)).enumerate() {
112 let named_idx = match v {
113 SchemaPieceOrNamed::Named(idx) => Some(*idx),
114 _ => None,
115 };
116 if let Some(named_idx) = named_idx {
117 if !seen_avro_nodes.insert(named_idx) {
118 bail!(
119 "Recursive types are not supported: {}",
120 v.get_human_name(schema.root)
121 );
122 }
123 }
124 let node = schema.step(v);
125 if let SchemaPiece::Union(_) = node.inner {
126 unreachable!("Internal error: directly nested avro union!");
127 }
128
129 let name = if vs.len() == 1 || (vs.len() == 2 && vs.iter().any(is_null)) {
130 base_name
133 .map(|n| n.to_owned())
134 .or_else(|| {
135 v.get_piece_and_name(schema.root)
136 .1
137 .map(|full_name| full_name.base_name().to_owned())
138 })
139 .unwrap_or_else(|| "?column?".into())
140 } else {
141 base_name
145 .map(|n| format!("{}{}", n, i + 1))
146 .or_else(|| {
147 v.get_piece_and_name(schema.root)
148 .1
149 .map(|full_name| full_name.base_name().to_owned())
150 })
151 .unwrap_or_else(|| "?column?".into())
152 };
153
154 let ty = validate_schema_2(seen_avro_nodes, node)?;
158 columns.push((name.into(), ty.nullable(vs.len() > 1)));
159 if let Some(named_idx) = named_idx {
160 seen_avro_nodes.remove(&named_idx);
161 }
162 }
163 }
164 Ok(columns)
165}
166
167fn get_named_columns<'a>(
168 seen_avro_nodes: &mut BTreeSet<usize>,
169 schema: SchemaNode<'a>,
170 base_name: Option<&str>,
171) -> anyhow::Result<Vec<(ColumnName, ColumnType)>> {
172 if let SchemaPiece::Union(_) = schema.inner {
173 get_union_columns(seen_avro_nodes, schema, base_name)
174 } else {
175 let scalar_type = validate_schema_2(seen_avro_nodes, schema)?;
176 Ok(vec![(
177 base_name.unwrap_or("?column?").into(),
180 scalar_type.nullable(false),
181 )])
182 }
183}
184
185fn validate_schema_2(
189 seen_avro_nodes: &mut BTreeSet<usize>,
190 schema: SchemaNode,
191) -> anyhow::Result<ScalarType> {
192 Ok(match schema.inner {
193 SchemaPiece::Union(_) => {
194 let columns = get_union_columns(seen_avro_nodes, schema, None)?;
195 if columns.len() != 1 {
196 bail!("Union of more than one non-null type not valid here");
197 }
198 let (_column_name, column_type) = columns.into_element();
199 column_type.scalar_type
206 }
207 SchemaPiece::Null => bail!("null outside of union types is not supported"),
208 SchemaPiece::Boolean => ScalarType::Bool,
209 SchemaPiece::Int => ScalarType::Int32,
210 SchemaPiece::Long => ScalarType::Int64,
211 SchemaPiece::Float => ScalarType::Float32,
212 SchemaPiece::Double => ScalarType::Float64,
213 SchemaPiece::Date => ScalarType::Date,
214 SchemaPiece::TimestampMilli => ScalarType::Timestamp {
215 precision: Some(TimestampPrecision::try_from(3).unwrap()),
216 },
217 SchemaPiece::TimestampMicro => ScalarType::Timestamp {
218 precision: Some(TimestampPrecision::try_from(6).unwrap()),
219 },
220 SchemaPiece::Decimal {
221 precision, scale, ..
222 } => {
223 if *precision > usize::cast_from(NUMERIC_DATUM_MAX_PRECISION) {
224 bail!(
225 "decimals with precision greater than {} are not supported",
226 NUMERIC_DATUM_MAX_PRECISION
227 )
228 }
229 ScalarType::Numeric {
230 max_scale: Some(NumericMaxScale::try_from(*scale)?),
231 }
232 }
233 SchemaPiece::Bytes | SchemaPiece::Fixed { .. } => ScalarType::Bytes,
234 SchemaPiece::String | SchemaPiece::Enum { .. } => ScalarType::String,
235
236 SchemaPiece::Json => ScalarType::Jsonb,
237 SchemaPiece::Uuid => ScalarType::Uuid,
238 SchemaPiece::Record { fields, .. } => {
239 let mut columns = vec![];
240 for f in fields {
241 let named_idx = match &f.schema {
242 SchemaPieceOrNamed::Named(idx) => Some(*idx),
243 _ => None,
244 };
245 if let Some(named_idx) = named_idx {
246 if !seen_avro_nodes.insert(named_idx) {
247 bail!(
248 "Recursive types are not supported: {}",
249 f.schema.get_human_name(schema.root)
250 );
251 }
252 }
253 let next_node = schema.step(&f.schema);
254 columns.extend(
255 get_named_columns(seen_avro_nodes, next_node, Some(&f.name))?.into_iter(),
256 );
257 if let Some(named_idx) = named_idx {
258 seen_avro_nodes.remove(&named_idx);
259 }
260 }
261 ScalarType::Record {
262 fields: columns.into(),
263 custom_id: None,
264 }
265 }
266 SchemaPiece::Array(inner) => {
267 let named_idx = match inner.as_ref() {
268 SchemaPieceOrNamed::Named(idx) => Some(*idx),
269 _ => None,
270 };
271 if let Some(named_idx) = named_idx {
272 if !seen_avro_nodes.insert(named_idx) {
273 bail!(
274 "Recursive types are not supported: {}",
275 inner.get_human_name(schema.root)
276 );
277 }
278 }
279 let next_node = schema.step(inner);
280 let ret = ScalarType::List {
281 element_type: Box::new(validate_schema_2(seen_avro_nodes, next_node)?),
282 custom_id: None,
283 };
284 if let Some(named_idx) = named_idx {
285 seen_avro_nodes.remove(&named_idx);
286 }
287 ret
288 }
289 SchemaPiece::Map(inner) => ScalarType::Map {
290 value_type: Box::new(validate_schema_2(seen_avro_nodes, schema.step(inner))?),
291 custom_id: None,
292 },
293
294 _ => bail!("Unsupported type in schema: {:?}", schema.inner),
295 })
296}
297
/// Chooses the schema used to decode an Avro-encoded message, stripping the message header first
/// when one is expected.
pub struct ConfluentAvroResolver {
    /// Schema the decoded data is read as. It is also the resolution target for writer schemas.
    reader_schema: Schema,
    /// Writer schemas fetched from the schema registry, keyed by id.
    /// `None` when no registry client was supplied; `reader_schema` is then used directly.
    writer_schemas: Option<SchemaCache>,
    /// Whether each message starts with the Confluent header, which `resolve` strips.
    confluent_wire_format: bool,
}
303
304impl ConfluentAvroResolver {
305 pub fn new(
306 reader_schema: &str,
307 ccsr_client: Option<mz_ccsr::Client>,
308 confluent_wire_format: bool,
309 ) -> anyhow::Result<Self> {
310 let reader_schema = parse_schema(reader_schema)?;
311 let writer_schemas = ccsr_client.map(SchemaCache::new).transpose()?;
312 Ok(Self {
313 reader_schema,
314 writer_schemas,
315 confluent_wire_format,
316 })
317 }
318
319 pub async fn resolve<'a, 'b>(
320 &'a mut self,
321 mut bytes: &'b [u8],
322 ) -> anyhow::Result<anyhow::Result<(&'b [u8], &'a Schema, Option<i32>)>> {
323 let (resolved_schema, schema_id) = match &mut self.writer_schemas {
324 Some(cache) => {
325 debug_assert!(
326 self.confluent_wire_format,
327 "We should have set 'confluent_wire_format' everywhere \
328 that can lead to this branch"
329 );
330 let (schema_id, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes)
332 {
333 Ok(ok) => ok,
334 Err(err) => return Ok(Err(err)),
335 };
336 bytes = adjusted_bytes;
337 let result = cache
338 .get(schema_id, &self.reader_schema)
339 .await?
341 .with_context(|| format!("failed to resolve Avro schema (id = {})", schema_id));
342 let schema = match result {
343 Ok(schema) => schema,
344 Err(err) => return Ok(Err(err)),
345 };
346 (schema, Some(schema_id))
347 }
348
349 None => {
353 if self.confluent_wire_format {
354 let (_, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes) {
356 Ok(ok) => ok,
357 Err(err) => return Ok(Err(err)),
358 };
359 bytes = adjusted_bytes;
360 }
361 (&self.reader_schema, None)
362 }
363 };
364 Ok(Ok((bytes, resolved_schema, schema_id)))
365 }
366}
367
368impl fmt::Debug for ConfluentAvroResolver {
369 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
370 f.debug_struct("ConfluentAvroResolver")
371 .field("reader_schema", &self.reader_schema)
372 .field(
373 "write_schema",
374 if self.writer_schemas.is_some() {
375 &"some"
376 } else {
377 &"none"
378 },
379 )
380 .finish()
381 }
382}
383
/// Cache of writer schemas fetched from a schema registry, already resolved against a reader
/// schema.
#[derive(Debug)]
struct SchemaCache {
    /// Outcome of parsing and resolving each schema id.
    /// Failures are stored too, so a bad schema is not fetched again.
    cache: BTreeMap<i32, Result<Schema, AvroError>>,
    /// Registry client, shared with each fetch attempt via `Arc::clone`.
    ccsr_client: Arc<mz_ccsr::Client>,
}
389
390impl SchemaCache {
391 fn new(ccsr_client: mz_ccsr::Client) -> Result<SchemaCache, anyhow::Error> {
392 Ok(SchemaCache {
393 cache: BTreeMap::new(),
394 ccsr_client: Arc::new(ccsr_client),
395 })
396 }
397
398 async fn get(
404 &mut self,
405 id: i32,
406 reader_schema: &Schema,
407 ) -> anyhow::Result<anyhow::Result<&Schema>> {
408 let entry = match self.cache.entry(id) {
409 Entry::Occupied(o) => o.into_mut(),
410 Entry::Vacant(v) => {
411 let ccsr_client = Arc::clone(&self.ccsr_client);
415 let response = Retry::default()
416 .max_duration(ccsr_client.timeout() * 2)
418 .retry_async_canceling(move |state| {
420 let ccsr_client = Arc::clone(&ccsr_client);
421 async move {
422 let res = ccsr_client.get_schema_by_id(id).await;
423 match res {
424 Err(e) => {
425 if let Some(timeout) = state.next_backoff {
426 warn!(
427 "transient failure fetching \
428 schema id {}: {:?}, retrying in {:?}",
429 id, e, timeout
430 );
431 }
432 Err(anyhow::Error::from(e))
433 }
434 _ => Ok(res?),
435 }
436 }
437 })
438 .run_in_task(|| format!("fetch_avro_schema:{}", id))
439 .await?;
440 let result = Schema::from_str(&response.raw).and_then(|schema| {
447 let resolved = resolve_schemas(&schema, reader_schema)?;
450 Ok(resolved)
451 });
452 v.insert(result)
453 }
454 };
455 Ok(entry.as_ref().map_err(|e| anyhow::Error::new(e.clone())))
456 }
457}