mz_testdrive/action/kafka/
add_partitions.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cmp;
11use std::time::Duration;
12
13use anyhow::{Context, bail};
14use mz_ore::collections::CollectionExt;
15use mz_ore::retry::Retry;
16use rdkafka::admin::NewPartitions;
17use rdkafka::producer::Producer;
18
19use crate::action::{ControlFlow, State};
20use crate::parser::BuiltinCommand;
21
22pub async fn run_add_partitions(
23    mut cmd: BuiltinCommand,
24    state: &mut State,
25) -> Result<ControlFlow, anyhow::Error> {
26    let topic_prefix = format!("testdrive-{}", cmd.args.string("topic")?);
27    let total_partitions = cmd.args.opt_parse("total-partitions")?.unwrap_or(1);
28    cmd.args.done()?;
29
30    let topic_name = format!("{}-{}", topic_prefix, state.seed);
31    println!(
32        "Raising partition count of Kafka topic {} to {}",
33        topic_name, total_partitions
34    );
35
36    match state.kafka_topics.get(&topic_name) {
37        Some(current_partitions) => {
38            if total_partitions <= *current_partitions {
39                bail!(
40                    "new partition count {} is not greater than current partition count {}",
41                    total_partitions,
42                    *current_partitions
43                );
44            }
45        }
46        None => {
47            // ignore that the topic was not created by this instance of kafka-create-topic
48        }
49    }
50
51    let partitions = NewPartitions::new(&topic_name, total_partitions);
52    let res = state
53        .kafka_admin
54        .create_partitions(&[partitions], &state.kafka_admin_opts)
55        .await
56        .context("creating partitions")?;
57    if res.len() != 1 {
58        bail!(
59            "kafka partition addition returned {} results, but exactly one result was expected",
60            res.len()
61        );
62    }
63    if let Err((_topic_name, e)) = res.into_element() {
64        return Err(e.into());
65    }
66
67    Retry::default()
68        .max_duration(state.default_timeout)
69        .retry_async_canceling(|_| async {
70            let metadata = state.kafka_producer.client().fetch_metadata(
71                Some(&topic_name),
72                Some(cmp::max(state.default_timeout, Duration::from_secs(1))),
73            )?;
74            if metadata.topics().len() != 1 {
75                bail!("metadata fetch returned no topics");
76            }
77            let topic = metadata.topics().into_element();
78            if topic.partitions().len() != total_partitions {
79                bail!(
80                    "topic {} has {} partitions when exactly {} was expected",
81                    topic_name,
82                    topic.partitions().len(),
83                    total_partitions,
84                );
85            }
86            Ok(())
87        })
88        .await?;
89
90    state.kafka_topics.insert(topic_name, total_partitions);
91    Ok(ControlFlow::Continue)
92}