mz_testdrive/action/kafka/
verify_topic.rs1use std::collections::BTreeMap;
11use std::str;
12use std::time::Duration;
13
14use anyhow::{Context, bail};
15use mz_ore::collections::CollectionExt;
16use mz_ore::retry::Retry;
17use mz_postgres_util::{Sql, query_one, sql};
18use rdkafka::admin::{AdminClient, AdminOptions, ResourceSpecifier};
19
20use crate::action::{ControlFlow, State};
21use crate::parser::BuiltinCommand;
22
/// How the Kafka topic to verify was specified on the command.
enum Topic {
    /// The topic is looked up from the Kafka sink with this name
    /// (taken from the command's `sink` argument).
    FromSink(String),
    /// The topic name was supplied directly (the command's `topic` argument).
    Named(String),
}
27
28async fn get_topic(sink: &str, topic_field: &str, state: &State) -> Result<String, anyhow::Error> {
29 let query = sql!(
30 "SELECT {} FROM mz_sinks JOIN mz_kafka_sinks \
31 ON mz_sinks.id = mz_kafka_sinks.id \
32 JOIN mz_schemas s ON s.id = mz_sinks.schema_id \
33 LEFT JOIN mz_databases d ON d.id = s.database_id \
34 WHERE d.name = $1 \
35 AND s.name = $2 \
36 AND mz_sinks.name = $3",
37 Sql::ident(topic_field)
38 );
39 let sink_fields: Vec<&str> = sink.split('.').collect();
40 let result = query_one(
41 &state.materialize.pgclient,
42 query,
43 &[&sink_fields[0], &sink_fields[1], &sink_fields[2]],
44 )
45 .await
46 .context("retrieving topic name")?
47 .get(topic_field);
48 Ok(result)
49}
50
51pub async fn run_verify_topic(
52 mut cmd: BuiltinCommand,
53 state: &State,
54) -> Result<ControlFlow, anyhow::Error> {
55 let source = match (cmd.args.opt_string("sink"), cmd.args.opt_string("topic")) {
56 (Some(sink), None) => Topic::FromSink(sink),
57 (None, Some(topic)) => Topic::Named(topic),
58 (Some(_), Some(_)) => {
59 bail!("Can't provide both `source` and `topic` to kafka-verify-topic")
60 }
61 (None, None) => bail!("kafka-verify-topic expects either `source` or `topic`"),
62 };
63
64 let topic: String = match &source {
65 Topic::FromSink(sink) => get_topic(sink, "topic", state).await?,
66 Topic::Named(name) => name.clone(),
67 };
68
69 let await_value_schema = cmd.args.opt_bool("await-value-schema")?.unwrap_or(false);
70 let await_key_schema = cmd.args.opt_bool("await-key-schema")?.unwrap_or(false);
71
72 let topic_config: Option<serde_json::Value> = cmd.args.opt_parse("topic-config")?;
73 let partition_count: Option<usize> = cmd.args.opt_parse("partition-count")?;
74 let replication_factor: Option<usize> = cmd.args.opt_parse("replication-factor")?;
75
76 cmd.args.done()?;
77
78 println!("Verifying Kafka topic {}", topic);
79
80 let mut config = state.kafka_config.clone();
81 config.set("enable.auto.offset.store", "false");
82
83 let client: AdminClient<_> = config.create().context("creating kafka consumer")?;
84
85 println!("waiting to create Kafka topic...");
86
87 Retry::default()
88 .max_duration(state.default_timeout)
89 .retry_async(|_state| async {
90 let meta = client
91 .inner()
92 .fetch_metadata(None, Duration::from_secs(1))?;
93
94 let topic = meta
95 .topics()
96 .iter()
97 .find(|t| t.name() == topic)
98 .ok_or_else(|| anyhow::anyhow!("topic not found"))?;
99
100 if let Some(partitions) = partition_count {
101 if topic.partitions().len() != partitions {
102 bail!(
103 "expected {} partitions but found {}",
104 partitions,
105 topic.partitions().len()
106 );
107 }
108 }
109
110 if let Some(replication_factor) = replication_factor {
111 for partition in topic.partitions() {
112 if partition.replicas().len() != replication_factor {
113 bail!(
114 "expected replication factor {} but found {}",
115 replication_factor,
116 partition.replicas().len()
117 );
118 }
119 }
120 }
121 Ok(())
122 })
123 .await?;
124
125 if let Some(topic_config) = topic_config {
126 println!("verifying topic configuration...");
127 Retry::default()
128 .max_duration(state.default_timeout)
129 .retry_async(|_state| async {
130 let config = client
131 .describe_configs(
132 &[ResourceSpecifier::Topic(&topic)],
133 &AdminOptions::new().request_timeout(Some(Duration::from_secs(2))),
134 )
135 .await?
136 .into_element()?;
137 let configs = config
138 .entries
139 .into_iter()
140 .map(|e| (e.name, e.value))
141 .collect::<BTreeMap<_, _>>();
142
143 for (key, value) in topic_config.as_object().expect("json object") {
144 let value = match value {
145 serde_json::Value::String(s) => s.as_str(),
146 _ => bail!("expected string value for key {}", key),
147 };
148 if let Some(Some(actual)) = configs.get(key) {
149 if actual != value {
150 bail!("expected {}={} but found {}", key, value, actual);
151 }
152 } else {
153 bail!("expected {}={} but not found", key, value);
154 }
155 }
156
157 Ok(())
158 })
159 .await?;
160 }
161
162 let mut await_schemas = vec![];
163 if await_value_schema {
164 await_schemas.push(format!("{topic}-value"));
165 }
166 if await_key_schema {
167 await_schemas.push(format!("{topic}-key"));
168 }
169
170 for schema_subject in await_schemas {
171 println!("waiting for schema subject {}...", schema_subject);
172 Retry::default()
173 .max_duration(state.default_timeout)
174 .retry_async(|_state| async {
175 state
176 .ccsr_client
177 .list_subjects()
178 .await?
179 .iter()
180 .find(|subject| subject == &&schema_subject)
181 .ok_or_else(|| anyhow::anyhow!("schema not found"))
182 .map(|_| ())
183 })
184 .await?;
185 }
186
187 Ok(ControlFlow::Continue)
188}