mz_testdrive/action/kafka/
wait_topic.rs1use std::time::Duration;
11
12use mz_ore::retry::Retry;
13
14use crate::action::{ControlFlow, State};
15use crate::parser::BuiltinCommand;
16
17pub async fn run_wait_topic(
18 mut cmd: BuiltinCommand,
19 state: &State,
20) -> Result<ControlFlow, anyhow::Error> {
21 let topic = cmd.args.string("topic")?;
22
23 println!("Waiting for Kafka topic {} to exist", topic);
24 Retry::default()
25 .initial_backoff(Duration::from_millis(50))
26 .factor(1.5)
27 .max_duration(state.timeout)
28 .retry_async_canceling(|_| async { check_topic_exists(&topic, &*state).await })
29 .await?;
30
31 Ok(ControlFlow::Continue)
32}
33
34pub(crate) async fn check_topic_exists(topic: &str, state: &State) -> Result<(), anyhow::Error> {
35 let metadata = state
36 .kafka_admin
37 .inner()
38 .fetch_metadata(None, Some(Duration::from_secs(10)))?;
44
45 let topic_exists = metadata.topics().iter().any(|t| t.name() == topic);
46 if !topic_exists {
47 Err(anyhow::anyhow!("topic {} doesn't exist", topic))
48 } else {
49 Ok(())
50 }
51}