mz_testdrive/action/kafka/
add_partitions.rs
1use std::cmp;
11use std::time::Duration;
12
13use anyhow::{Context, bail};
14use mz_ore::collections::CollectionExt;
15use mz_ore::retry::Retry;
16use rdkafka::admin::NewPartitions;
17use rdkafka::producer::Producer;
18
19use crate::action::{ControlFlow, State};
20use crate::parser::BuiltinCommand;
21
22pub async fn run_add_partitions(
23 mut cmd: BuiltinCommand,
24 state: &mut State,
25) -> Result<ControlFlow, anyhow::Error> {
26 let topic_prefix = format!("testdrive-{}", cmd.args.string("topic")?);
27 let total_partitions = cmd.args.opt_parse("total-partitions")?.unwrap_or(1);
28 cmd.args.done()?;
29
30 let topic_name = format!("{}-{}", topic_prefix, state.seed);
31 println!(
32 "Raising partition count of Kafka topic {} to {}",
33 topic_name, total_partitions
34 );
35
36 match state.kafka_topics.get(&topic_name) {
37 Some(current_partitions) => {
38 if total_partitions <= *current_partitions {
39 bail!(
40 "new partition count {} is not greater than current partition count {}",
41 total_partitions,
42 *current_partitions
43 );
44 }
45 }
46 None => {
47 }
49 }
50
51 let partitions = NewPartitions::new(&topic_name, total_partitions);
52 let res = state
53 .kafka_admin
54 .create_partitions(&[partitions], &state.kafka_admin_opts)
55 .await
56 .context("creating partitions")?;
57 if res.len() != 1 {
58 bail!(
59 "kafka partition addition returned {} results, but exactly one result was expected",
60 res.len()
61 );
62 }
63 if let Err((_topic_name, e)) = res.into_element() {
64 return Err(e.into());
65 }
66
67 Retry::default()
68 .max_duration(state.default_timeout)
69 .retry_async_canceling(|_| async {
70 let metadata = state.kafka_producer.client().fetch_metadata(
71 Some(&topic_name),
72 Some(cmp::max(state.default_timeout, Duration::from_secs(1))),
73 )?;
74 if metadata.topics().len() != 1 {
75 bail!("metadata fetch returned no topics");
76 }
77 let topic = metadata.topics().into_element();
78 if topic.partitions().len() != total_partitions {
79 bail!(
80 "topic {} has {} partitions when exactly {} was expected",
81 topic_name,
82 topic.partitions().len(),
83 total_partitions,
84 );
85 }
86 Ok(())
87 })
88 .await?;
89
90 state.kafka_topics.insert(topic_name, total_partitions);
91 Ok(ControlFlow::Continue)
92}