mz_testdrive/action/kafka/verify_topic.rs
1use std::collections::BTreeMap;
11use std::str;
12use std::time::Duration;
13
14use anyhow::{Context, bail};
15use mz_ore::collections::CollectionExt;
16use mz_ore::retry::Retry;
17use rdkafka::admin::{AdminClient, AdminOptions, ResourceSpecifier};
18
19use crate::action::{ControlFlow, State};
20use crate::parser::BuiltinCommand;
21
/// Where `kafka-verify-topic` gets the name of the topic to verify.
enum Topic {
    /// The command named a sink (`database.schema.sink`); the topic is looked
    /// up in the catalog.
    FromSink(String),
    /// The command named the topic directly.
    Named(String),
}
26
27async fn get_topic(sink: &str, topic_field: &str, state: &State) -> Result<String, anyhow::Error> {
28 let query = format!(
29 "SELECT {} FROM mz_sinks JOIN mz_kafka_sinks \
30 ON mz_sinks.id = mz_kafka_sinks.id \
31 JOIN mz_schemas s ON s.id = mz_sinks.schema_id \
32 LEFT JOIN mz_databases d ON d.id = s.database_id \
33 WHERE d.name = $1 \
34 AND s.name = $2 \
35 AND mz_sinks.name = $3",
36 topic_field
37 );
38 let sink_fields: Vec<&str> = sink.split('.').collect();
39 let result = state
40 .materialize
41 .pgclient
42 .query_one(
43 query.as_str(),
44 &[&sink_fields[0], &sink_fields[1], &sink_fields[2]],
45 )
46 .await
47 .context("retrieving topic name")?
48 .get(topic_field);
49 Ok(result)
50}
51
52pub async fn run_verify_topic(
53 mut cmd: BuiltinCommand,
54 state: &State,
55) -> Result<ControlFlow, anyhow::Error> {
56 let source = match (cmd.args.opt_string("sink"), cmd.args.opt_string("topic")) {
57 (Some(sink), None) => Topic::FromSink(sink),
58 (None, Some(topic)) => Topic::Named(topic),
59 (Some(_), Some(_)) => {
60 bail!("Can't provide both `source` and `topic` to kafka-verify-topic")
61 }
62 (None, None) => bail!("kafka-verify-topic expects either `source` or `topic`"),
63 };
64
65 let topic: String = match &source {
66 Topic::FromSink(sink) => get_topic(sink, "topic", state).await?,
67 Topic::Named(name) => name.clone(),
68 };
69
70 let await_value_schema = cmd.args.opt_bool("await-value-schema")?.unwrap_or(false);
71 let await_key_schema = cmd.args.opt_bool("await-key-schema")?.unwrap_or(false);
72
73 let topic_config: Option<serde_json::Value> = cmd.args.opt_parse("topic-config")?;
74 let partition_count: Option<usize> = cmd.args.opt_parse("partition-count")?;
75 let replication_factor: Option<usize> = cmd.args.opt_parse("replication-factor")?;
76
77 cmd.args.done()?;
78
79 println!("Verifying Kafka topic {}", topic);
80
81 let mut config = state.kafka_config.clone();
82 config.set("enable.auto.offset.store", "false");
83
84 let client: AdminClient<_> = config.create().context("creating kafka consumer")?;
85
86 println!("waiting to create Kafka topic...");
87
88 Retry::default()
89 .max_duration(state.default_timeout)
90 .retry_async(|_state| async {
91 let meta = client
92 .inner()
93 .fetch_metadata(None, Duration::from_secs(1))?;
94
95 let topic = meta
96 .topics()
97 .iter()
98 .find(|t| t.name() == topic)
99 .ok_or_else(|| anyhow::anyhow!("topic not found"))?;
100
101 if let Some(partitions) = partition_count {
102 if topic.partitions().len() != partitions {
103 bail!(
104 "expected {} partitions but found {}",
105 partitions,
106 topic.partitions().len()
107 );
108 }
109 }
110
111 if let Some(replication_factor) = replication_factor {
112 for partition in topic.partitions() {
113 if partition.replicas().len() != replication_factor {
114 bail!(
115 "expected replication factor {} but found {}",
116 replication_factor,
117 partition.replicas().len()
118 );
119 }
120 }
121 }
122 Ok(())
123 })
124 .await?;
125
126 if let Some(topic_config) = topic_config {
127 println!("verifying topic configuration...");
128 Retry::default()
129 .max_duration(state.default_timeout)
130 .retry_async(|_state| async {
131 let config = client
132 .describe_configs(
133 &[ResourceSpecifier::Topic(&topic)],
134 &AdminOptions::new().request_timeout(Some(Duration::from_secs(2))),
135 )
136 .await?
137 .into_element()?;
138 let configs = config
139 .entries
140 .into_iter()
141 .map(|e| (e.name, e.value))
142 .collect::<BTreeMap<_, _>>();
143
144 for (key, value) in topic_config.as_object().expect("json object") {
145 let value = match value {
146 serde_json::Value::String(s) => s.as_str(),
147 _ => bail!("expected string value for key {}", key),
148 };
149 if let Some(Some(actual)) = configs.get(key) {
150 if actual != value {
151 bail!("expected {}={} but found {}", key, value, actual);
152 }
153 } else {
154 bail!("expected {}={} but not found", key, value);
155 }
156 }
157
158 Ok(())
159 })
160 .await?;
161 }
162
163 let mut await_schemas = vec![];
164 if await_value_schema {
165 await_schemas.push(format!("{topic}-value"));
166 }
167 if await_key_schema {
168 await_schemas.push(format!("{topic}-key"));
169 }
170
171 for schema_subject in await_schemas {
172 println!("waiting for schema subject {}...", schema_subject);
173 Retry::default()
174 .max_duration(state.default_timeout)
175 .retry_async(|_state| async {
176 state
177 .ccsr_client
178 .list_subjects()
179 .await?
180 .iter()
181 .find(|subject| subject == &&schema_subject)
182 .ok_or_else(|| anyhow::anyhow!("schema not found"))
183 .map(|_| ())
184 })
185 .await?;
186 }
187
188 Ok(ControlFlow::Continue)
189}