Skip to main content

mz_testdrive/action/
schema_registry.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use anyhow::{Context, bail};
13use mz_ccsr::{SchemaReference, SchemaType};
14use mz_ore::retry::Retry;
15use mz_ore::str::StrExt;
16use serde_json::Value as JsonValue;
17
18use crate::action::{ControlFlow, State};
19use crate::format::avro;
20use crate::parser::BuiltinCommand;
21
22/// Extracts the fully qualified name from an Avro schema JSON string.
23/// For record types, this combines namespace and name (e.g., "com.example.User").
24fn extract_avro_fullname(schema_json: &str) -> anyhow::Result<String> {
25    let value: JsonValue =
26        serde_json::from_str(schema_json).context("parsing schema JSON to extract fullname")?;
27
28    let name = value
29        .get("name")
30        .and_then(|v| v.as_str())
31        .ok_or_else(|| anyhow::anyhow!("schema missing 'name' field"))?;
32
33    let namespace = value.get("namespace").and_then(|v| v.as_str());
34
35    // If name contains dots, it's already fully qualified
36    if name.contains('.') {
37        Ok(name.to_string())
38    } else if let Some(ns) = namespace {
39        Ok(format!("{}.{}", ns, name))
40    } else {
41        Ok(name.to_string())
42    }
43}
44
45pub async fn run_publish(
46    mut cmd: BuiltinCommand,
47    state: &State,
48) -> Result<ControlFlow, anyhow::Error> {
49    // Parse arguments.
50    let subject = cmd.args.string("subject")?;
51    let schema_type = match cmd.args.string("schema-type")?.as_str() {
52        "avro" => SchemaType::Avro,
53        "json" => SchemaType::Json,
54        "protobuf" => SchemaType::Protobuf,
55        s => bail!("unknown schema type: {}", s),
56    };
57    let references_in = match cmd.args.opt_string("references") {
58        None => vec![],
59        Some(s) => s.split(',').map(|s| s.to_string()).collect(),
60    };
61    cmd.args.done()?;
62    let schema = cmd.input.join("\n");
63
64    // Run action.
65    println!(
66        "Publishing schema for subject {} to the schema registry...",
67        subject.quoted(),
68    );
69    let mut references = vec![];
70    for reference in references_in {
71        let subject = state
72            .ccsr_client
73            .get_subject_latest(&reference)
74            .await
75            .with_context(|| format!("fetching reference {}", reference))?;
76        let type_name = match schema_type {
77            // Extract the fully qualified Avro type name from the schema.
78            // The Schema Registry reference `name` field should be the type name
79            // (e.g., "com.example.Address"), not the subject name.
80            SchemaType::Avro => extract_avro_fullname(&subject.schema.raw).with_context(|| {
81                format!("extracting type name from reference schema {}", reference)
82            })?,
83            SchemaType::Protobuf | SchemaType::Json => subject.name,
84        };
85
86        references.push(SchemaReference {
87            name: type_name,
88            subject: reference.to_string(),
89            version: subject.version,
90        })
91    }
92    state
93        .ccsr_client
94        .publish_schema(&subject, &schema, schema_type, &references)
95        .await
96        .context("publishing schema")?;
97    Ok(ControlFlow::Continue)
98}
99
100pub async fn run_verify(
101    mut cmd: BuiltinCommand,
102    state: &State,
103) -> Result<ControlFlow, anyhow::Error> {
104    // Parse arguments.
105    let subject = cmd.args.string("subject")?;
106    match cmd.args.string("schema-type")?.as_str() {
107        "avro" => (),
108        f => bail!("unknown format: {}", f),
109    };
110    let compatibility_level = cmd.args.opt_string("compatibility-level");
111    cmd.args.done()?;
112    let expected_schema = match &cmd.input[..] {
113        [expected_schema] => {
114            avro::parse_schema(expected_schema, &[]).context("parsing expected avro schema")?
115        }
116        _ => bail!("unable to read expected schema input"),
117    };
118
119    // Run action.
120    println!(
121        "Verifying contents of latest schema for subject {} in the schema registry...",
122        subject.quoted(),
123    );
124
125    // Finding the published schema is retryable because it's published
126    // asynchronously and only after the source/sink is created.
127    let actual_schema = mz_ore::retry::Retry::default()
128        .max_duration(state.default_timeout)
129        .retry_async(|_| async {
130            match state.ccsr_client.get_schema_by_subject(&subject).await {
131                Ok(s) => mz_ore::retry::RetryResult::Ok(s.raw),
132                Err(
133                    e @ mz_ccsr::GetBySubjectError::SubjectNotFound
134                    | e @ mz_ccsr::GetBySubjectError::VersionNotFound(_),
135                ) => mz_ore::retry::RetryResult::RetryableErr(e),
136                Err(e) => mz_ore::retry::RetryResult::FatalErr(e),
137            }
138        })
139        .await
140        .context("fetching schema")?;
141
142    let actual_schema =
143        avro::parse_schema(&actual_schema, &[]).context("parsing actual avro schema")?;
144
145    if expected_schema != actual_schema {
146        bail!(
147            "schema did not match\nexpected:\n{:?}\n\nactual:\n{:?}",
148            expected_schema,
149            actual_schema,
150        );
151    }
152
153    if let Some(compatibility_level) = compatibility_level {
154        println!(
155            "Verifying compatibility level of subject {} in the schema registry...",
156            subject.quoted(),
157        );
158        let res = state.ccsr_client.get_subject_config(&subject).await?;
159        if compatibility_level != res.compatibility_level.to_string() {
160            bail!(
161                "compatibility level did not match\nexpected: {}\nactual: {}",
162                compatibility_level,
163                res.compatibility_level,
164            );
165        }
166    }
167    Ok(ControlFlow::Continue)
168}
169
170pub async fn run_wait(
171    mut cmd: BuiltinCommand,
172    state: &State,
173) -> Result<ControlFlow, anyhow::Error> {
174    // Parse arguments.
175    let topic = cmd.args.string("topic")?;
176    let subjects = [format!("{}-value", topic), format!("{}-key", topic)];
177
178    cmd.args.done()?;
179    cmd.assert_no_input()?;
180
181    // Run action.
182
183    let mut waiting_for_kafka = false;
184
185    println!(
186        "Waiting for schema for subjects {:?} to become available in the schema registry...",
187        subjects
188    );
189
190    let topic = &topic;
191    let subjects = &subjects;
192    Retry::default()
193        .initial_backoff(Duration::from_millis(50))
194        .factor(1.5)
195        .max_duration(state.timeout)
196        .retry_async_canceling(|_| async move {
197            if !waiting_for_kafka {
198                futures::future::try_join_all(subjects.iter().map(|subject| async move {
199                    state
200                        .ccsr_client
201                        // This doesn't take `ccsr_client` by `&mut self`, so it should be safe to cancel
202                        // by try-joining.
203                        .get_schema_by_subject(subject)
204                        .await
205                        .context("fetching schema")
206                        .and(Ok(()))
207                }))
208                .await?;
209
210                waiting_for_kafka = true;
211                println!("Waiting for Kafka topic {} to exist", topic);
212            }
213
214            if waiting_for_kafka {
215                super::kafka::check_topic_exists(topic, state).await?
216            }
217
218            Ok::<(), anyhow::Error>(())
219        })
220        .await?;
221
222    Ok(ControlFlow::Continue)
223}