mz_testdrive/action/kafka/
verify_topic.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::str;
12use std::time::Duration;
13
14use anyhow::{Context, bail};
15use mz_ore::collections::CollectionExt;
16use mz_ore::retry::Retry;
17use rdkafka::admin::{AdminClient, AdminOptions, ResourceSpecifier};
18
19use crate::action::{ControlFlow, State};
20use crate::parser::BuiltinCommand;
21
/// Where `kafka-verify-topic` obtains the name of the Kafka topic to verify.
enum Topic {
    /// Resolve the topic from the catalog entry of the named sink, given as a
    /// dot-separated `<database>.<schema>.<sink>` name.
    FromSink(String),
    /// Use the given topic name as-is.
    Named(String),
}
26
27async fn get_topic(sink: &str, topic_field: &str, state: &State) -> Result<String, anyhow::Error> {
28    let query = format!(
29        "SELECT {} FROM mz_sinks JOIN mz_kafka_sinks \
30        ON mz_sinks.id = mz_kafka_sinks.id \
31        JOIN mz_schemas s ON s.id = mz_sinks.schema_id \
32        LEFT JOIN mz_databases d ON d.id = s.database_id \
33        WHERE d.name = $1 \
34        AND s.name = $2 \
35        AND mz_sinks.name = $3",
36        topic_field
37    );
38    let sink_fields: Vec<&str> = sink.split('.').collect();
39    let result = state
40        .materialize
41        .pgclient
42        .query_one(
43            query.as_str(),
44            &[&sink_fields[0], &sink_fields[1], &sink_fields[2]],
45        )
46        .await
47        .context("retrieving topic name")?
48        .get(topic_field);
49    Ok(result)
50}
51
52pub async fn run_verify_topic(
53    mut cmd: BuiltinCommand,
54    state: &State,
55) -> Result<ControlFlow, anyhow::Error> {
56    let source = match (cmd.args.opt_string("sink"), cmd.args.opt_string("topic")) {
57        (Some(sink), None) => Topic::FromSink(sink),
58        (None, Some(topic)) => Topic::Named(topic),
59        (Some(_), Some(_)) => {
60            bail!("Can't provide both `source` and `topic` to kafka-verify-topic")
61        }
62        (None, None) => bail!("kafka-verify-topic expects either `source` or `topic`"),
63    };
64
65    let topic: String = match &source {
66        Topic::FromSink(sink) => get_topic(sink, "topic", state).await?,
67        Topic::Named(name) => name.clone(),
68    };
69
70    let await_value_schema = cmd.args.opt_bool("await-value-schema")?.unwrap_or(false);
71    let await_key_schema = cmd.args.opt_bool("await-key-schema")?.unwrap_or(false);
72
73    let topic_config: Option<serde_json::Value> = cmd.args.opt_parse("topic-config")?;
74    let partition_count: Option<usize> = cmd.args.opt_parse("partition-count")?;
75    let replication_factor: Option<usize> = cmd.args.opt_parse("replication-factor")?;
76
77    cmd.args.done()?;
78
79    println!("Verifying Kafka topic {}", topic);
80
81    let mut config = state.kafka_config.clone();
82    config.set("enable.auto.offset.store", "false");
83
84    let client: AdminClient<_> = config.create().context("creating kafka consumer")?;
85
86    println!("waiting to create Kafka topic...");
87
88    Retry::default()
89        .max_duration(state.default_timeout)
90        .retry_async(|_state| async {
91            let meta = client
92                .inner()
93                .fetch_metadata(None, Duration::from_secs(1))?;
94
95            let topic = meta
96                .topics()
97                .iter()
98                .find(|t| t.name() == topic)
99                .ok_or_else(|| anyhow::anyhow!("topic not found"))?;
100
101            if let Some(partitions) = partition_count {
102                if topic.partitions().len() != partitions {
103                    bail!(
104                        "expected {} partitions but found {}",
105                        partitions,
106                        topic.partitions().len()
107                    );
108                }
109            }
110
111            if let Some(replication_factor) = replication_factor {
112                for partition in topic.partitions() {
113                    if partition.replicas().len() != replication_factor {
114                        bail!(
115                            "expected replication factor {} but found {}",
116                            replication_factor,
117                            partition.replicas().len()
118                        );
119                    }
120                }
121            }
122            Ok(())
123        })
124        .await?;
125
126    if let Some(topic_config) = topic_config {
127        println!("verifying topic configuration...");
128        Retry::default()
129            .max_duration(state.default_timeout)
130            .retry_async(|_state| async {
131                let config = client
132                    .describe_configs(
133                        &[ResourceSpecifier::Topic(&topic)],
134                        &AdminOptions::new().request_timeout(Some(Duration::from_secs(2))),
135                    )
136                    .await?
137                    .into_element()?;
138                let configs = config
139                    .entries
140                    .into_iter()
141                    .map(|e| (e.name, e.value))
142                    .collect::<BTreeMap<_, _>>();
143
144                for (key, value) in topic_config.as_object().expect("json object") {
145                    let value = match value {
146                        serde_json::Value::String(s) => s.as_str(),
147                        _ => bail!("expected string value for key {}", key),
148                    };
149                    if let Some(Some(actual)) = configs.get(key) {
150                        if actual != value {
151                            bail!("expected {}={} but found {}", key, value, actual);
152                        }
153                    } else {
154                        bail!("expected {}={} but not found", key, value);
155                    }
156                }
157
158                Ok(())
159            })
160            .await?;
161    }
162
163    let mut await_schemas = vec![];
164    if await_value_schema {
165        await_schemas.push(format!("{topic}-value"));
166    }
167    if await_key_schema {
168        await_schemas.push(format!("{topic}-key"));
169    }
170
171    for schema_subject in await_schemas {
172        println!("waiting for schema subject {}...", schema_subject);
173        Retry::default()
174            .max_duration(state.default_timeout)
175            .retry_async(|_state| async {
176                state
177                    .ccsr_client
178                    .list_subjects()
179                    .await?
180                    .iter()
181                    .find(|subject| subject == &&schema_subject)
182                    .ok_or_else(|| anyhow::anyhow!("schema not found"))
183                    .map(|_| ())
184            })
185            .await?;
186    }
187
188    Ok(ControlFlow::Continue)
189}