Skip to main content

mz_testdrive/action/kafka/
verify_topic.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10use std::collections::BTreeMap;
11use std::str;
12use std::time::Duration;
13
14use anyhow::{Context, bail};
15use mz_ore::collections::CollectionExt;
16use mz_ore::retry::Retry;
17use mz_postgres_util::{Sql, query_one, sql};
18use rdkafka::admin::{AdminClient, AdminOptions, ResourceSpecifier};
19
20use crate::action::{ControlFlow, State};
21use crate::parser::BuiltinCommand;
22
/// Identifies the Kafka topic that `kafka-verify-topic` should inspect.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Topic {
    /// The topic written by a Kafka sink, identified by its fully qualified
    /// `database.schema.name`; the topic name itself is looked up in the catalog.
    FromSink(String),
    /// A topic given directly by name.
    Named(String),
}
27
28async fn get_topic(sink: &str, topic_field: &str, state: &State) -> Result<String, anyhow::Error> {
29    let query = sql!(
30        "SELECT {} FROM mz_sinks JOIN mz_kafka_sinks \
31         ON mz_sinks.id = mz_kafka_sinks.id \
32         JOIN mz_schemas s ON s.id = mz_sinks.schema_id \
33         LEFT JOIN mz_databases d ON d.id = s.database_id \
34         WHERE d.name = $1 \
35         AND s.name = $2 \
36         AND mz_sinks.name = $3",
37        Sql::ident(topic_field)
38    );
39    let sink_fields: Vec<&str> = sink.split('.').collect();
40    let result = query_one(
41        &state.materialize.pgclient,
42        query,
43        &[&sink_fields[0], &sink_fields[1], &sink_fields[2]],
44    )
45    .await
46    .context("retrieving topic name")?
47    .get(topic_field);
48    Ok(result)
49}
50
51pub async fn run_verify_topic(
52    mut cmd: BuiltinCommand,
53    state: &State,
54) -> Result<ControlFlow, anyhow::Error> {
55    let source = match (cmd.args.opt_string("sink"), cmd.args.opt_string("topic")) {
56        (Some(sink), None) => Topic::FromSink(sink),
57        (None, Some(topic)) => Topic::Named(topic),
58        (Some(_), Some(_)) => {
59            bail!("Can't provide both `source` and `topic` to kafka-verify-topic")
60        }
61        (None, None) => bail!("kafka-verify-topic expects either `source` or `topic`"),
62    };
63
64    let topic: String = match &source {
65        Topic::FromSink(sink) => get_topic(sink, "topic", state).await?,
66        Topic::Named(name) => name.clone(),
67    };
68
69    let await_value_schema = cmd.args.opt_bool("await-value-schema")?.unwrap_or(false);
70    let await_key_schema = cmd.args.opt_bool("await-key-schema")?.unwrap_or(false);
71
72    let topic_config: Option<serde_json::Value> = cmd.args.opt_parse("topic-config")?;
73    let partition_count: Option<usize> = cmd.args.opt_parse("partition-count")?;
74    let replication_factor: Option<usize> = cmd.args.opt_parse("replication-factor")?;
75
76    cmd.args.done()?;
77
78    println!("Verifying Kafka topic {}", topic);
79
80    let mut config = state.kafka_config.clone();
81    config.set("enable.auto.offset.store", "false");
82
83    let client: AdminClient<_> = config.create().context("creating kafka consumer")?;
84
85    println!("waiting to create Kafka topic...");
86
87    Retry::default()
88        .max_duration(state.default_timeout)
89        .retry_async(|_state| async {
90            let meta = client
91                .inner()
92                .fetch_metadata(None, Duration::from_secs(1))?;
93
94            let topic = meta
95                .topics()
96                .iter()
97                .find(|t| t.name() == topic)
98                .ok_or_else(|| anyhow::anyhow!("topic not found"))?;
99
100            if let Some(partitions) = partition_count {
101                if topic.partitions().len() != partitions {
102                    bail!(
103                        "expected {} partitions but found {}",
104                        partitions,
105                        topic.partitions().len()
106                    );
107                }
108            }
109
110            if let Some(replication_factor) = replication_factor {
111                for partition in topic.partitions() {
112                    if partition.replicas().len() != replication_factor {
113                        bail!(
114                            "expected replication factor {} but found {}",
115                            replication_factor,
116                            partition.replicas().len()
117                        );
118                    }
119                }
120            }
121            Ok(())
122        })
123        .await?;
124
125    if let Some(topic_config) = topic_config {
126        println!("verifying topic configuration...");
127        Retry::default()
128            .max_duration(state.default_timeout)
129            .retry_async(|_state| async {
130                let config = client
131                    .describe_configs(
132                        &[ResourceSpecifier::Topic(&topic)],
133                        &AdminOptions::new().request_timeout(Some(Duration::from_secs(2))),
134                    )
135                    .await?
136                    .into_element()?;
137                let configs = config
138                    .entries
139                    .into_iter()
140                    .map(|e| (e.name, e.value))
141                    .collect::<BTreeMap<_, _>>();
142
143                for (key, value) in topic_config.as_object().expect("json object") {
144                    let value = match value {
145                        serde_json::Value::String(s) => s.as_str(),
146                        _ => bail!("expected string value for key {}", key),
147                    };
148                    if let Some(Some(actual)) = configs.get(key) {
149                        if actual != value {
150                            bail!("expected {}={} but found {}", key, value, actual);
151                        }
152                    } else {
153                        bail!("expected {}={} but not found", key, value);
154                    }
155                }
156
157                Ok(())
158            })
159            .await?;
160    }
161
162    let mut await_schemas = vec![];
163    if await_value_schema {
164        await_schemas.push(format!("{topic}-value"));
165    }
166    if await_key_schema {
167        await_schemas.push(format!("{topic}-key"));
168    }
169
170    for schema_subject in await_schemas {
171        println!("waiting for schema subject {}...", schema_subject);
172        Retry::default()
173            .max_duration(state.default_timeout)
174            .retry_async(|_state| async {
175                state
176                    .ccsr_client
177                    .list_subjects()
178                    .await?
179                    .iter()
180                    .find(|subject| subject == &&schema_subject)
181                    .ok_or_else(|| anyhow::anyhow!("schema not found"))
182                    .map(|_| ())
183            })
184            .await?;
185    }
186
187    Ok(ControlFlow::Continue)
188}