1use std::collections::btree_map::Entry;
39use std::collections::{BTreeMap, BTreeSet};
40use std::fmt;
41use std::sync::Arc;
42
43use anyhow::{Context, anyhow, bail};
44use mz_avro::error::Error as AvroError;
45use mz_avro::schema::{
46 ParseSchemaError, Schema, SchemaNode, SchemaPiece, SchemaPieceOrNamed, resolve_schemas,
47};
48use mz_ore::cast::CastFrom;
49use mz_ore::collections::CollectionExt;
50use mz_ore::future::OreFutureExt;
51use mz_ore::retry::Retry;
52use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
53use mz_repr::adt::timestamp::TimestampPrecision;
54use mz_repr::{ColumnName, RelationDesc, SqlColumnType, SqlScalarType, UNKNOWN_COLUMN_NAME};
55use tracing::warn;
56use uuid::Uuid;
57
58use crate::avro::is_null;
59
60pub fn parse_schema(schema: &str, references: &[String]) -> anyhow::Result<Schema> {
61 let schema: serde_json::Value = serde_json::from_str(schema)?;
62 let mut parsed_refs: Vec<Schema> = Vec::with_capacity(references.len());
65 for reference in references {
66 let ref_json: serde_json::Value = serde_json::from_str(reference)?;
67 let parsed = Schema::parse_with_references(&ref_json, &parsed_refs)?;
68 parsed_refs.push(parsed);
69 }
70 Ok(Schema::parse_with_references(&schema, &parsed_refs)?)
71}
72
73pub fn schema_to_relationdesc(schema: Schema) -> Result<RelationDesc, anyhow::Error> {
76 Ok(RelationDesc::from_names_and_types(validate_schema_1(
79 schema.top_node(),
80 )?))
81}
82
83fn validate_schema_1(schema: SchemaNode) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
86 let mut columns = vec![];
87 let mut seen_avro_nodes = Default::default();
88 match schema.inner {
89 SchemaPiece::Record { fields, .. } => {
90 for f in fields {
91 columns.extend(get_named_columns(
92 &mut seen_avro_nodes,
93 schema.step(&f.schema),
94 Some(&f.name),
95 )?);
96 }
97 }
98 _ => {
99 columns.extend(get_named_columns(&mut seen_avro_nodes, schema, None)?);
100 }
101 }
102 Ok(columns)
103}
104
105fn get_union_columns<'a>(
108 seen_avro_nodes: &mut BTreeSet<usize>,
109 schema: SchemaNode<'a>,
110 base_name: Option<&str>,
111) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
112 let us = match schema.inner {
113 SchemaPiece::Union(us) => us,
114 _ => panic!("This function should only be called on unions."),
115 };
116 let mut columns = vec![];
117 let vs = us.variants();
118 if vs.is_empty() || (vs.len() == 1 && is_null(&vs[0])) {
119 bail!(anyhow!("Empty or null-only unions are not supported"));
120 } else {
121 for (i, v) in vs.iter().filter(|v| !is_null(v)).enumerate() {
122 with_recursion_guard(seen_avro_nodes, schema.root, v, |seen| {
123 let node = schema.step(v);
124 if let SchemaPiece::Union(_) = node.inner {
125 unreachable!("Internal error: directly nested avro union!");
126 }
127
128 let name = if vs.len() == 1 || (vs.len() == 2 && vs.iter().any(is_null)) {
129 base_name
132 .map(|n| n.to_owned())
133 .or_else(|| {
134 v.get_piece_and_name(schema.root)
135 .1
136 .map(|full_name| full_name.base_name().to_owned())
137 })
138 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
139 } else {
140 base_name
144 .map(|n| format!("{}{}", n, i + 1))
145 .or_else(|| {
146 v.get_piece_and_name(schema.root)
147 .1
148 .map(|full_name| full_name.base_name().to_owned())
149 })
150 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
151 };
152
153 let ty = validate_schema_2(seen, node)?;
157 columns.push((name.into(), ty.nullable(vs.len() > 1)));
158 Ok(())
159 })?;
160 }
161 }
162 Ok(columns)
163}
164
165fn get_named_columns<'a>(
166 seen_avro_nodes: &mut BTreeSet<usize>,
167 schema: SchemaNode<'a>,
168 base_name: Option<&str>,
169) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
170 if let SchemaPiece::Union(_) = schema.inner {
171 get_union_columns(seen_avro_nodes, schema, base_name)
172 } else {
173 let scalar_type = validate_schema_2(seen_avro_nodes, schema)?;
174 Ok(vec![(
175 base_name.unwrap_or(UNKNOWN_COLUMN_NAME).into(),
178 scalar_type.nullable(false),
179 )])
180 }
181}
182
183fn validate_schema_2(
187 seen_avro_nodes: &mut BTreeSet<usize>,
188 schema: SchemaNode,
189) -> anyhow::Result<SqlScalarType> {
190 Ok(match schema.inner {
191 SchemaPiece::Union(_) => {
192 let columns = get_union_columns(seen_avro_nodes, schema, None)?;
193 if columns.len() != 1 {
194 bail!("Union of more than one non-null type not valid here");
195 }
196 let (_column_name, column_type) = columns.into_element();
197 column_type.scalar_type
204 }
205 SchemaPiece::Null => bail!("null outside of union types is not supported"),
206 SchemaPiece::Boolean => SqlScalarType::Bool,
207 SchemaPiece::Int => SqlScalarType::Int32,
208 SchemaPiece::Long => SqlScalarType::Int64,
209 SchemaPiece::Float => SqlScalarType::Float32,
210 SchemaPiece::Double => SqlScalarType::Float64,
211 SchemaPiece::Date => SqlScalarType::Date,
212 SchemaPiece::TimestampMilli => SqlScalarType::Timestamp {
213 precision: Some(TimestampPrecision::try_from(3).unwrap()),
214 },
215 SchemaPiece::TimestampMicro => SqlScalarType::Timestamp {
216 precision: Some(TimestampPrecision::try_from(6).unwrap()),
217 },
218 SchemaPiece::Decimal {
219 precision, scale, ..
220 } => {
221 if *precision > usize::cast_from(NUMERIC_DATUM_MAX_PRECISION) {
222 bail!(
223 "decimals with precision greater than {} are not supported",
224 NUMERIC_DATUM_MAX_PRECISION
225 )
226 }
227 SqlScalarType::Numeric {
228 max_scale: Some(NumericMaxScale::try_from(*scale)?),
229 }
230 }
231 SchemaPiece::Bytes | SchemaPiece::Fixed { .. } => SqlScalarType::Bytes,
232 SchemaPiece::String | SchemaPiece::Enum { .. } => SqlScalarType::String,
233
234 SchemaPiece::Json => SqlScalarType::Jsonb,
235 SchemaPiece::Uuid => SqlScalarType::Uuid,
236 SchemaPiece::Record { fields, .. } => {
237 let mut columns = vec![];
238 for f in fields {
239 with_recursion_guard(seen_avro_nodes, schema.root, &f.schema, |seen| {
240 columns.extend(get_named_columns(
241 seen,
242 schema.step(&f.schema),
243 Some(&f.name),
244 )?);
245 Ok(())
246 })?;
247 }
248 SqlScalarType::Record {
249 fields: columns.into(),
250 custom_id: None,
251 }
252 }
253 SchemaPiece::Array(inner) => {
254 with_recursion_guard(seen_avro_nodes, schema.root, inner.as_ref(), |seen| {
255 Ok(SqlScalarType::List {
256 element_type: Box::new(validate_schema_2(seen, schema.step(inner))?),
257 custom_id: None,
258 })
259 })?
260 }
261 SchemaPiece::Map(inner) => {
262 with_recursion_guard(seen_avro_nodes, schema.root, inner.as_ref(), |seen| {
263 Ok(SqlScalarType::Map {
264 value_type: Box::new(validate_schema_2(seen, schema.step(inner))?),
265 custom_id: None,
266 })
267 })?
268 }
269 _ => bail!("Unsupported type in schema: {:?}", schema.inner),
270 })
271}
272
273fn with_recursion_guard<T>(
277 seen: &mut BTreeSet<usize>,
278 root: &Schema,
279 node: &SchemaPieceOrNamed,
280 f: impl FnOnce(&mut BTreeSet<usize>) -> anyhow::Result<T>,
281) -> anyhow::Result<T> {
282 let named_idx = match node {
283 SchemaPieceOrNamed::Named(idx) => Some(*idx),
284 SchemaPieceOrNamed::Piece(_) => None,
285 };
286 if let Some(named_idx) = named_idx {
287 if !seen.insert(named_idx) {
288 bail!(
289 "Recursive types are not supported: {}",
290 node.get_human_name(root)
291 );
292 }
293 }
294 let result = f(seen);
295 if let Some(named_idx) = named_idx {
296 seen.remove(&named_idx);
297 }
298 result
299}
300
#[cfg(test)]
mod tests {
    use super::*;

    // Parses `schema` without references, converts it to a relation
    // description, and asserts that the conversion fails with the
    // recursive-type error message.
    fn assert_recursive(schema: &str) {
        let err = schema_to_relationdesc(parse_schema(schema, &[]).expect("schema should parse"))
            .expect_err("recursive schema should be rejected");
        assert!(
            err.to_string()
                .contains("Recursive types are not supported"),
            "unexpected error: {err}"
        );
    }

    // A record whose field refers directly to the record itself.
    #[mz_ore::test]
    fn recursive_record_field() {
        assert_recursive(r#"{"type":"record","name":"a","fields":[{"name":"f","type":"a"}]}"#);
    }

    // The self-reference sits inside a nullable union.
    #[mz_ore::test]
    fn recursive_union() {
        assert_recursive(
            r#"{"type":"record","name":"a","fields":[{"name":"f","type":["a","null"]}]}"#,
        );
    }

    // The self-reference is the item type of an array.
    #[mz_ore::test]
    fn recursive_array() {
        assert_recursive(
            r#"{"type":"record","name":"a","fields":[{"name":"f","type":{"type":"array","items":"a"}}]}"#,
        );
    }

    // The self-reference is the value type of a map.
    #[mz_ore::test]
    fn recursive_map() {
        assert_recursive(
            r#"{"type":"record","name":"a","fields":[{"name":"f","type":{"type":"map","values":"a"}}]}"#,
        );
    }

    // Two sibling fields use the same named type; this is reuse rather than
    // recursion, so the conversion must succeed with one column per field.
    #[mz_ore::test]
    fn repeated_named_type_is_not_recursive() {
        let schema = r#"{
 "type": "record",
 "name": "outer",
 "fields": [
 {"name": "a", "type": {"type": "record", "name": "inner", "fields": [{"name": "x", "type": "int"}]}},
 {"name": "b", "type": "inner"}
 ]
 }"#;
        let desc = schema_to_relationdesc(parse_schema(schema, &[]).expect("schema should parse"))
            .expect("diamond reuse of a named type should be allowed");
        assert_eq!(desc.arity(), 2);
    }

    // The `aws` partition is accepted.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_commercial() {
        assert_eq!(
            registry_name_from_schema_arn(
                "arn:aws:glue:us-east-1:123456789012:schema/myreg/myschema"
            ),
            Some("myreg")
        );
    }

    // The `aws-cn` partition is accepted.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_china_partition() {
        assert_eq!(
            registry_name_from_schema_arn(
                "arn:aws-cn:glue:cn-north-1:123456789012:schema/myreg/myschema"
            ),
            Some("myreg")
        );
    }

    // The `aws-us-gov` partition is accepted.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_govcloud_partition() {
        assert_eq!(
            registry_name_from_schema_arn(
                "arn:aws-us-gov:glue:us-gov-west-1:123456789012:schema/myreg/myschema"
            ),
            Some("myreg")
        );
    }

    // A partition that does not start with `aws` is rejected.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_rejects_non_aws_partition() {
        assert_eq!(
            registry_name_from_schema_arn(
                "arn:gcp:glue:us-east-1:123456789012:schema/myreg/myschema"
            ),
            None
        );
    }

    // Input without the leading `arn:` is rejected.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_rejects_missing_arn_prefix() {
        assert_eq!(registry_name_from_schema_arn("schema/myreg/myschema"), None);
    }

    // An ARN for a service other than `glue` is rejected.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_rejects_non_glue_service() {
        assert_eq!(
            registry_name_from_schema_arn(
                "arn:aws:s3:us-east-1:123456789012:schema/myreg/myschema"
            ),
            None
        );
    }

    // A resource that is not `schema/...` is rejected.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_rejects_missing_schema_segment() {
        assert_eq!(
            registry_name_from_schema_arn("arn:aws:glue:us-east-1:123456789012:table/myreg/foo"),
            None
        );
    }

    // An empty registry segment is rejected.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_rejects_empty_registry() {
        assert_eq!(
            registry_name_from_schema_arn("arn:aws:glue:us-east-1:123456789012:schema//myschema"),
            None
        );
    }

    // Only the first `/` after the registry matters; everything after it is
    // treated as the schema name, slashes included.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_allows_schema_name_with_slashes() {
        assert_eq!(
            registry_name_from_schema_arn(
                "arn:aws:glue:us-east-1:123456789012:schema/myreg/path/like/name"
            ),
            Some("myreg")
        );
    }
}
448
/// Identifies the writer schema that `AvroSchemaResolver::resolve` looked up
/// for a message, in the key space of the registry it came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriterSchemaKey {
    /// A Confluent schema id.
    Confluent(i32),
    /// A Glue schema-version UUID.
    Glue(Uuid),
}
459
460impl fmt::Display for WriterSchemaKey {
461 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462 match self {
463 WriterSchemaKey::Confluent(id) => write!(f, "Confluent schema id {}", id),
464 WriterSchemaKey::Glue(uuid) => write!(f, "Glue schema-version {}", uuid),
465 }
466 }
467}
468
/// Selects how `AvroSchemaResolver` obtains the writer schema of a message.
pub enum WriterSchemaProvider {
    /// Messages carry no registry header; the reader schema is used as is.
    None,
    /// Messages carry a Confluent header. With a `cache`, the schema id from
    /// the header is looked up through it; without one, the header is only
    /// stripped and the reader schema is used.
    Confluent { cache: Option<SchemaCache> },
    /// Messages carry a Glue header. With a `cache`, the schema-version UUID
    /// from the header is looked up through it; without one, the header is
    /// only stripped and the reader schema is used.
    Glue { cache: Option<GlueSchemaCache> },
}
489
490impl fmt::Debug for WriterSchemaProvider {
491 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
492 let tag = match self {
493 WriterSchemaProvider::None => "none",
494 WriterSchemaProvider::Confluent { cache: None } => "confluent (no cache)",
495 WriterSchemaProvider::Confluent { cache: Some(_) } => "confluent",
496 WriterSchemaProvider::Glue { cache: None } => "glue (no cache)",
497 WriterSchemaProvider::Glue { cache: Some(_) } => "glue",
498 };
499 f.debug_tuple("WriterSchemaProvider").field(&tag).finish()
500 }
501}
502
503impl WriterSchemaProvider {
504 pub fn confluent(ccsr_client: Option<mz_ccsr::Client>) -> Self {
508 let cache = ccsr_client.map(SchemaCache::new);
509 WriterSchemaProvider::Confluent { cache }
510 }
511
512 pub fn glue(
522 client_with_registry: Option<(mz_aws_glue_schema_registry::Client, String)>,
523 ) -> Self {
524 let cache = client_with_registry.map(|(c, registry)| GlueSchemaCache::new(c, registry));
525 WriterSchemaProvider::Glue { cache }
526 }
527}
528
/// Resolves, per message, the schema that should be used to decode it.
pub struct AvroSchemaResolver {
    /// The parsed reader schema; used directly whenever no writer schema is
    /// looked up.
    reader_schema: Schema,
    /// How (and whether) writer schemas are obtained.
    writer_schemas: WriterSchemaProvider,
}
533
impl AvroSchemaResolver {
    /// Creates a resolver from the reader schema's JSON text.
    ///
    /// `reader_reference_schemas` are passed to `parse_schema` as the
    /// references of `reader_schema`.
    ///
    /// # Errors
    ///
    /// Fails if `parse_schema` rejects the reader schema or its references.
    pub fn new(
        reader_schema: &str,
        reader_reference_schemas: &[String],
        writer_schemas: WriterSchemaProvider,
    ) -> anyhow::Result<Self> {
        let reader_schema = parse_schema(reader_schema, reader_reference_schemas)?;
        Ok(Self {
            reader_schema,
            writer_schemas,
        })
    }

    /// Determines the schema to decode `bytes` with.
    ///
    /// On success the inner `Ok` holds the payload with any registry header
    /// removed, the schema to use, and the key of the writer schema when one
    /// was looked up through a cache (`None` otherwise).
    ///
    /// # Errors
    ///
    /// The two `Result` layers separate two kinds of failure:
    /// * The outer `Err` is returned when the cache lookup itself fails
    ///   (propagated from the cache's `get` via `?`).
    /// * The inner `Err` is returned when the header cannot be extracted from
    ///   `bytes`, or when the cache reports an error for this schema key.
    pub async fn resolve<'a, 'b>(
        &'a mut self,
        mut bytes: &'b [u8],
    ) -> anyhow::Result<anyhow::Result<(&'b [u8], &'a Schema, Option<WriterSchemaKey>)>> {
        let (resolved_schema, key) = match &mut self.writer_schemas {
            // No header expected: decode with the reader schema as is.
            WriterSchemaProvider::None => (&self.reader_schema, None),

            // Header expected but no cache: strip the header, ignore the id.
            WriterSchemaProvider::Confluent { cache: None } => {
                match crate::confluent::extract_avro_header(bytes) {
                    Ok((_id, adjusted_bytes)) => {
                        bytes = adjusted_bytes;
                        (&self.reader_schema, None)
                    }
                    Err(err) => return Ok(Err(err)),
                }
            }

            // Header expected and cache present: look the id up.
            WriterSchemaProvider::Confluent { cache: Some(cache) } => {
                let (id, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes) {
                    Ok(ok) => ok,
                    Err(err) => return Ok(Err(err)),
                };
                bytes = adjusted_bytes;
                let result = cache
                    .get(id, &self.reader_schema)
                    .await?
                    .with_context(|| format!("failed to resolve Avro schema (id = {id})"));
                let schema = match result {
                    Ok(schema) => schema,
                    Err(err) => return Ok(Err(err)),
                };
                (schema, Some(WriterSchemaKey::Confluent(id)))
            }

            // Header expected but no cache: strip the header, ignore the UUID.
            WriterSchemaProvider::Glue { cache: None } => {
                match crate::glue::extract_avro_header(bytes) {
                    Ok((_uuid, adjusted_bytes)) => {
                        bytes = adjusted_bytes;
                        (&self.reader_schema, None)
                    }
                    Err(err) => return Ok(Err(err)),
                }
            }

            // Header expected and cache present: look the UUID up.
            WriterSchemaProvider::Glue { cache: Some(cache) } => {
                let (uuid, adjusted_bytes) = match crate::glue::extract_avro_header(bytes) {
                    Ok(ok) => ok,
                    Err(err) => return Ok(Err(err)),
                };
                bytes = adjusted_bytes;
                let result = cache
                    .get(uuid, &self.reader_schema)
                    .await?
                    .with_context(|| format!("failed to resolve Avro schema (uuid = {uuid})"));
                let schema = match result {
                    Ok(schema) => schema,
                    Err(err) => return Ok(Err(err)),
                };
                (schema, Some(WriterSchemaKey::Glue(uuid)))
            }
        };
        Ok(Ok((bytes, resolved_schema, key)))
    }
}
620
621impl fmt::Debug for AvroSchemaResolver {
622 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
623 f.debug_struct("AvroSchemaResolver")
624 .field("reader_schema", &self.reader_schema)
625 .field("writer_schemas", &self.writer_schemas)
626 .finish()
627 }
628}
629
/// Caches Glue writer schemas, resolved against a reader schema, by
/// schema-version UUID.
#[derive(Debug)]
pub struct GlueSchemaCache {
    /// Resolution outcome per schema-version UUID. Failed resolutions are
    /// stored too, so they are not fetched again.
    cache: BTreeMap<Uuid, Result<Schema, AvroError>>,
    /// Client used to fetch schema versions that are not cached yet.
    glue_client: mz_aws_glue_schema_registry::Client,
    /// Registry name that the ARN of every fetched schema version must name.
    expected_registry: String,
}
648
649impl GlueSchemaCache {
650 fn new(glue_client: mz_aws_glue_schema_registry::Client, expected_registry: String) -> Self {
651 GlueSchemaCache {
652 cache: BTreeMap::new(),
653 glue_client,
654 expected_registry,
655 }
656 }
657
658 async fn get(
667 &mut self,
668 uuid: Uuid,
669 reader_schema: &Schema,
670 ) -> anyhow::Result<anyhow::Result<&Schema>> {
671 let entry = match self.cache.entry(uuid) {
672 Entry::Occupied(o) => o.into_mut(),
673 Entry::Vacant(v) => {
674 let parsed: Result<Schema, AvroError> = match self
675 .glue_client
676 .get_schema_version_by_id(uuid)
677 .await
678 {
679 Ok(version) => {
680 Self::parse_version(version, uuid, &self.expected_registry, reader_schema)
681 }
682 Err(mz_aws_glue_schema_registry::GetSchemaVersionError::NotFound) => Err(
686 ParseSchemaError::new(format!("Glue schema version {uuid} not found"))
687 .into(),
688 ),
689 Err(e @ mz_aws_glue_schema_registry::GetSchemaVersionError::Other(_)) => {
695 return Err(e.into());
696 }
697 };
698 v.insert(parsed)
699 }
700 };
701 Ok(entry.as_ref().map_err(|e| anyhow::Error::new(e.clone())))
702 }
703
704 fn parse_version(
709 version: mz_aws_glue_schema_registry::SchemaVersion,
710 uuid: Uuid,
711 expected_registry: &str,
712 reader_schema: &Schema,
713 ) -> Result<Schema, AvroError> {
714 use mz_aws_glue_schema_registry::SchemaVersionLifecycleStatus;
715
716 if !matches!(
720 version.lifecycle_status,
721 Some(SchemaVersionLifecycleStatus::Available)
722 ) {
723 return Err(ParseSchemaError::new(format!(
724 "Glue schema version {uuid} is not Available (status: {:?}); \
725 refusing to decode",
726 version.lifecycle_status
727 ))
728 .into());
729 }
730 let definition = version.definition.ok_or_else(|| {
731 ParseSchemaError::new(format!(
732 "Glue schema version {uuid} returned without a definition"
733 ))
734 })?;
735 let arn = version.schema_arn.as_deref().ok_or_else(|| {
740 ParseSchemaError::new(format!(
741 "Glue schema version {uuid} returned without a SchemaArn; \
742 cannot verify registry membership"
743 ))
744 })?;
745 let actual_registry = registry_name_from_schema_arn(arn).ok_or_else(|| {
746 ParseSchemaError::new(format!(
747 "Glue SchemaArn {arn:?} did not match the expected \
748 arn:aws[-...]:glue:<region>:<account>:schema/<registry>/<schema> form"
749 ))
750 })?;
751 if actual_registry != expected_registry {
752 return Err(ParseSchemaError::new(format!(
753 "Glue schema version {uuid} lives in registry {actual_registry:?} \
754 but this source is configured for registry {expected:?}; \
755 refusing to decode",
756 expected = expected_registry,
757 ))
758 .into());
759 }
760 let value: serde_json::Value = serde_json::from_str(&definition)
761 .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {e}")))?;
762 let schema = Schema::parse_with_references(&value, &[])?;
763 resolve_schemas(&schema, reader_schema)
764 }
765}
766
/// Extracts the registry name from a Glue schema ARN.
///
/// The expected form is
/// `arn:<partition>:glue:<region>:<account>:schema/<registry>/<schema>`, where
/// `<partition>` is `aws` or begins with `aws-`. The fields are matched by
/// position, so text that merely contains `:glue:` or `:schema/` somewhere is
/// not accepted. The schema name may itself contain `/`.
///
/// Returns `None` when `arn` does not have this form or the registry segment
/// is empty.
fn registry_name_from_schema_arn(arn: &str) -> Option<&str> {
    // An ARN has five colon-delimited fields ahead of the resource. Capping
    // the split at six keeps any later colons inside the resource field.
    let mut fields = arn.splitn(6, ':');

    if fields.next()? != "arn" {
        return None;
    }
    let partition = fields.next()?;
    if partition != "aws" && !partition.starts_with("aws-") {
        return None;
    }
    if fields.next()? != "glue" {
        return None;
    }
    // Region and account must be present as fields but are not inspected.
    let _region = fields.next()?;
    let _account = fields.next()?;
    let resource = fields.next()?;

    let (registry, _schema) = resource.strip_prefix("schema/")?.split_once('/')?;
    (!registry.is_empty()).then_some(registry)
}
790
/// Caches Confluent writer schemas, resolved against a reader schema, by
/// schema id.
#[derive(Debug)]
pub struct SchemaCache {
    /// Resolution outcome per schema id. Failed resolutions are stored too,
    /// so they are not fetched again.
    cache: BTreeMap<i32, Result<Schema, AvroError>>,
    /// Schema registry client; held in an `Arc` so that the retrying fetch
    /// closure can clone it for each attempt.
    ccsr_client: Arc<mz_ccsr::Client>,
}
800
801impl SchemaCache {
802 fn new(ccsr_client: mz_ccsr::Client) -> SchemaCache {
803 SchemaCache {
804 cache: BTreeMap::new(),
805 ccsr_client: Arc::new(ccsr_client),
806 }
807 }
808
809 async fn get(
819 &mut self,
820 id: i32,
821 reader_schema: &Schema,
822 ) -> anyhow::Result<anyhow::Result<&Schema>> {
823 let entry = match self.cache.entry(id) {
824 Entry::Occupied(o) => o.into_mut(),
825 Entry::Vacant(v) => {
826 let ccsr_client = Arc::clone(&self.ccsr_client);
830
831 let (primary_subject, reference_subjects) = Retry::default()
833 .max_duration(ccsr_client.timeout() * 2)
835 .retry_async_canceling(move |state| {
837 let ccsr_client = Arc::clone(&ccsr_client);
838 async move {
839 let res = ccsr_client.get_subject_and_references_by_id(id).await;
840 match res {
841 Err(e) => {
842 if let Some(timeout) = state.next_backoff {
843 warn!(
844 "transient failure fetching \
845 schema id {}: {:?}, retrying in {:?}",
846 id, e, timeout
847 );
848 }
849 Err(anyhow::Error::from(e))
850 }
851 _ => Ok(res?),
852 }
853 }
854 })
855 .run_in_task(|| format!("fetch_avro_schema:{}", id))
856 .await?;
857
858 let result = Self::parse_with_references(
865 &primary_subject,
866 &reference_subjects,
867 reader_schema,
868 );
869 v.insert(result)
870 }
871 };
872 Ok(entry.as_ref().map_err(|e| anyhow::Error::new(e.clone())))
873 }
874
875 fn parse_with_references(
877 primary_subject: &mz_ccsr::Subject,
878 reference_subjects: &[mz_ccsr::Subject],
879 reader_schema: &Schema,
880 ) -> Result<Schema, AvroError> {
881 let mut reference_schemas: Vec<Schema> = Vec::with_capacity(reference_subjects.len());
883 for subject in reference_subjects {
884 let ref_json: serde_json::Value = serde_json::from_str(&subject.schema.raw)
885 .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
886 let parsed = Schema::parse_with_references(&ref_json, &reference_schemas)?;
887 reference_schemas.push(parsed);
888 }
889
890 let primary_value: serde_json::Value = serde_json::from_str(&primary_subject.schema.raw)
892 .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
893 let schema = Schema::parse_with_references(&primary_value, &reference_schemas)?;
894
895 let resolved = resolve_schemas(&schema, reader_schema)?;
898 Ok(resolved)
899 }
900}