mz_testdrive/action/
schema_registry.rs1use std::time::Duration;
11
12use anyhow::{Context, bail};
13use mz_ccsr::{SchemaReference, SchemaType};
14use mz_ore::retry::Retry;
15use mz_ore::str::StrExt;
16use serde_json::Value as JsonValue;
17
18use crate::action::{ControlFlow, State};
19use crate::format::avro;
20use crate::parser::BuiltinCommand;
21
22fn extract_avro_fullname(schema_json: &str) -> anyhow::Result<String> {
25 let value: JsonValue =
26 serde_json::from_str(schema_json).context("parsing schema JSON to extract fullname")?;
27
28 let name = value
29 .get("name")
30 .and_then(|v| v.as_str())
31 .ok_or_else(|| anyhow::anyhow!("schema missing 'name' field"))?;
32
33 let namespace = value.get("namespace").and_then(|v| v.as_str());
34
35 if name.contains('.') {
37 Ok(name.to_string())
38 } else if let Some(ns) = namespace {
39 Ok(format!("{}.{}", ns, name))
40 } else {
41 Ok(name.to_string())
42 }
43}
44
45pub async fn run_publish(
46 mut cmd: BuiltinCommand,
47 state: &State,
48) -> Result<ControlFlow, anyhow::Error> {
49 let subject = cmd.args.string("subject")?;
51 let schema_type = match cmd.args.string("schema-type")?.as_str() {
52 "avro" => SchemaType::Avro,
53 "json" => SchemaType::Json,
54 "protobuf" => SchemaType::Protobuf,
55 s => bail!("unknown schema type: {}", s),
56 };
57 let references_in = match cmd.args.opt_string("references") {
58 None => vec![],
59 Some(s) => s.split(',').map(|s| s.to_string()).collect(),
60 };
61 cmd.args.done()?;
62 let schema = cmd.input.join("\n");
63
64 println!(
66 "Publishing schema for subject {} to the schema registry...",
67 subject.quoted(),
68 );
69 let mut references = vec![];
70 for reference in references_in {
71 let subject = state
72 .ccsr_client
73 .get_subject_latest(&reference)
74 .await
75 .with_context(|| format!("fetching reference {}", reference))?;
76 let type_name = match schema_type {
77 SchemaType::Avro => extract_avro_fullname(&subject.schema.raw).with_context(|| {
81 format!("extracting type name from reference schema {}", reference)
82 })?,
83 SchemaType::Protobuf | SchemaType::Json => subject.name,
84 };
85
86 references.push(SchemaReference {
87 name: type_name,
88 subject: reference.to_string(),
89 version: subject.version,
90 })
91 }
92 state
93 .ccsr_client
94 .publish_schema(&subject, &schema, schema_type, &references)
95 .await
96 .context("publishing schema")?;
97 Ok(ControlFlow::Continue)
98}
99
100pub async fn run_verify(
101 mut cmd: BuiltinCommand,
102 state: &State,
103) -> Result<ControlFlow, anyhow::Error> {
104 let subject = cmd.args.string("subject")?;
106 match cmd.args.string("schema-type")?.as_str() {
107 "avro" => (),
108 f => bail!("unknown format: {}", f),
109 };
110 let compatibility_level = cmd.args.opt_string("compatibility-level");
111 cmd.args.done()?;
112 let expected_schema = match &cmd.input[..] {
113 [expected_schema] => {
114 avro::parse_schema(expected_schema, &[]).context("parsing expected avro schema")?
115 }
116 _ => bail!("unable to read expected schema input"),
117 };
118
119 println!(
121 "Verifying contents of latest schema for subject {} in the schema registry...",
122 subject.quoted(),
123 );
124
125 let actual_schema = mz_ore::retry::Retry::default()
128 .max_duration(state.default_timeout)
129 .retry_async(|_| async {
130 match state.ccsr_client.get_schema_by_subject(&subject).await {
131 Ok(s) => mz_ore::retry::RetryResult::Ok(s.raw),
132 Err(
133 e @ mz_ccsr::GetBySubjectError::SubjectNotFound
134 | e @ mz_ccsr::GetBySubjectError::VersionNotFound(_),
135 ) => mz_ore::retry::RetryResult::RetryableErr(e),
136 Err(e) => mz_ore::retry::RetryResult::FatalErr(e),
137 }
138 })
139 .await
140 .context("fetching schema")?;
141
142 let actual_schema =
143 avro::parse_schema(&actual_schema, &[]).context("parsing actual avro schema")?;
144
145 if expected_schema != actual_schema {
146 bail!(
147 "schema did not match\nexpected:\n{:?}\n\nactual:\n{:?}",
148 expected_schema,
149 actual_schema,
150 );
151 }
152
153 if let Some(compatibility_level) = compatibility_level {
154 println!(
155 "Verifying compatibility level of subject {} in the schema registry...",
156 subject.quoted(),
157 );
158 let res = state.ccsr_client.get_subject_config(&subject).await?;
159 if compatibility_level != res.compatibility_level.to_string() {
160 bail!(
161 "compatibility level did not match\nexpected: {}\nactual: {}",
162 compatibility_level,
163 res.compatibility_level,
164 );
165 }
166 }
167 Ok(ControlFlow::Continue)
168}
169
170pub async fn run_wait(
171 mut cmd: BuiltinCommand,
172 state: &State,
173) -> Result<ControlFlow, anyhow::Error> {
174 let topic = cmd.args.string("topic")?;
176 let subjects = [format!("{}-value", topic), format!("{}-key", topic)];
177
178 cmd.args.done()?;
179 cmd.assert_no_input()?;
180
181 let mut waiting_for_kafka = false;
184
185 println!(
186 "Waiting for schema for subjects {:?} to become available in the schema registry...",
187 subjects
188 );
189
190 let topic = &topic;
191 let subjects = &subjects;
192 Retry::default()
193 .initial_backoff(Duration::from_millis(50))
194 .factor(1.5)
195 .max_duration(state.timeout)
196 .retry_async_canceling(|_| async move {
197 if !waiting_for_kafka {
198 futures::future::try_join_all(subjects.iter().map(|subject| async move {
199 state
200 .ccsr_client
201 .get_schema_by_subject(subject)
204 .await
205 .context("fetching schema")
206 .and(Ok(()))
207 }))
208 .await?;
209
210 waiting_for_kafka = true;
211 println!("Waiting for Kafka topic {} to exist", topic);
212 }
213
214 if waiting_for_kafka {
215 super::kafka::check_topic_exists(topic, state).await?
216 }
217
218 Ok::<(), anyhow::Error>(())
219 })
220 .await?;
221
222 Ok(ControlFlow::Continue)
223}