mz_testdrive/action/
schema_registry.rs
1use std::time::Duration;
11
12use anyhow::{Context, bail};
13use mz_ccsr::{SchemaReference, SchemaType};
14use mz_ore::retry::Retry;
15use mz_ore::str::StrExt;
16
17use crate::action::{ControlFlow, State};
18use crate::format::avro;
19use crate::parser::BuiltinCommand;
20
21pub async fn run_publish(
22 mut cmd: BuiltinCommand,
23 state: &State,
24) -> Result<ControlFlow, anyhow::Error> {
25 let subject = cmd.args.string("subject")?;
27 let schema_type = match cmd.args.string("schema-type")?.as_str() {
28 "avro" => SchemaType::Avro,
29 "json" => SchemaType::Json,
30 "protobuf" => SchemaType::Protobuf,
31 s => bail!("unknown schema type: {}", s),
32 };
33 let references_in = match cmd.args.opt_string("references") {
34 None => vec![],
35 Some(s) => s.split(',').map(|s| s.to_string()).collect(),
36 };
37 cmd.args.done()?;
38 let schema = cmd.input.join("\n");
39
40 println!(
42 "Publishing schema for subject {} to the schema registry...",
43 subject.quoted(),
44 );
45 let mut references = vec![];
46 for reference in references_in {
47 let subject = state
48 .ccsr_client
49 .get_subject_latest(&reference)
50 .await
51 .with_context(|| format!("fetching reference {}", reference))?;
52 references.push(SchemaReference {
53 name: subject.name,
54 subject: reference.to_string(),
55 version: subject.version,
56 })
57 }
58 state
59 .ccsr_client
60 .publish_schema(&subject, &schema, schema_type, &references)
61 .await
62 .context("publishing schema")?;
63 Ok(ControlFlow::Continue)
64}
65
66pub async fn run_verify(
67 mut cmd: BuiltinCommand,
68 state: &State,
69) -> Result<ControlFlow, anyhow::Error> {
70 let subject = cmd.args.string("subject")?;
72 match cmd.args.string("schema-type")?.as_str() {
73 "avro" => (),
74 f => bail!("unknown format: {}", f),
75 };
76 let compatibility_level = cmd.args.opt_string("compatibility-level");
77 cmd.args.done()?;
78 let expected_schema = match &cmd.input[..] {
79 [expected_schema] => {
80 avro::parse_schema(expected_schema).context("parsing expected avro schema")?
81 }
82 _ => bail!("unable to read expected schema input"),
83 };
84
85 println!(
87 "Verifying contents of latest schema for subject {} in the schema registry...",
88 subject.quoted(),
89 );
90
91 let actual_schema = mz_ore::retry::Retry::default()
94 .max_duration(state.default_timeout)
95 .retry_async(|_| async {
96 match state.ccsr_client.get_schema_by_subject(&subject).await {
97 Ok(s) => mz_ore::retry::RetryResult::Ok(s.raw),
98 Err(
99 e @ mz_ccsr::GetBySubjectError::SubjectNotFound
100 | e @ mz_ccsr::GetBySubjectError::VersionNotFound(_),
101 ) => mz_ore::retry::RetryResult::RetryableErr(e),
102 Err(e) => mz_ore::retry::RetryResult::FatalErr(e),
103 }
104 })
105 .await
106 .context("fetching schema")?;
107
108 let actual_schema = avro::parse_schema(&actual_schema).context("parsing actual avro schema")?;
109 if expected_schema != actual_schema {
110 bail!(
111 "schema did not match\nexpected:\n{:?}\n\nactual:\n{:?}",
112 expected_schema,
113 actual_schema,
114 );
115 }
116
117 if let Some(compatibility_level) = compatibility_level {
118 println!(
119 "Verifying compatibility level of subject {} in the schema registry...",
120 subject.quoted(),
121 );
122 let res = state.ccsr_client.get_subject_config(&subject).await?;
123 if compatibility_level != res.compatibility_level.to_string() {
124 bail!(
125 "compatibility level did not match\nexpected: {}\nactual: {}",
126 compatibility_level,
127 res.compatibility_level,
128 );
129 }
130 }
131 Ok(ControlFlow::Continue)
132}
133
134pub async fn run_wait(
135 mut cmd: BuiltinCommand,
136 state: &State,
137) -> Result<ControlFlow, anyhow::Error> {
138 let topic = cmd.args.string("topic")?;
140 let subjects = [format!("{}-value", topic), format!("{}-key", topic)];
141
142 cmd.args.done()?;
143 cmd.assert_no_input()?;
144
145 let mut waiting_for_kafka = false;
148
149 println!(
150 "Waiting for schema for subjects {:?} to become available in the schema registry...",
151 subjects
152 );
153
154 let topic = &topic;
155 let subjects = &subjects;
156 Retry::default()
157 .initial_backoff(Duration::from_millis(50))
158 .factor(1.5)
159 .max_duration(state.timeout)
160 .retry_async_canceling(|_| async move {
161 if !waiting_for_kafka {
162 futures::future::try_join_all(subjects.iter().map(|subject| async move {
163 state
164 .ccsr_client
165 .get_schema_by_subject(subject)
168 .await
169 .context("fetching schema")
170 .and(Ok(()))
171 }))
172 .await?;
173
174 waiting_for_kafka = true;
175 println!("Waiting for Kafka topic {} to exist", topic);
176 }
177
178 if waiting_for_kafka {
179 super::kafka::check_topic_exists(topic, state).await?
180 }
181
182 Ok::<(), anyhow::Error>(())
183 })
184 .await?;
185
186 Ok(ControlFlow::Continue)
187}