mz_testdrive/action/kafka/
delete_records.rs1use anyhow::{Context, bail};
11use mz_ore::collections::CollectionExt;
12use rdkafka::{Offset, TopicPartitionList};
13
14use crate::action::{ControlFlow, State};
15use crate::parser::BuiltinCommand;
16
17pub async fn run_delete_records(
18 mut cmd: BuiltinCommand,
19 state: &State,
20) -> Result<ControlFlow, anyhow::Error> {
21 let topic_prefix = format!("testdrive-{}", cmd.args.string("topic")?);
22 let partition = cmd.args.parse("partition")?;
23 let offset = cmd.args.parse("offset")?;
24 cmd.args.done()?;
25
26 let topic_name = format!("{}-{}", topic_prefix, state.seed);
27 println!(
28 "Deleting records up to offset {offset} from partition {partition} of topic {topic_name}",
29 );
30
31 let mut tpl = TopicPartitionList::new();
32 tpl.add_partition_offset(&topic_name, partition, Offset::Offset(offset))
33 .context("internal error: adding partition to delete records topic partition list")?;
34 let res = state
35 .kafka_admin
36 .delete_records(&tpl, &state.kafka_admin_opts)
37 .await
38 .context("deleting records")?;
39 if res.count() != 1 {
40 bail!(
41 "kafka record deletion returned {} results, but exactly one result was expected",
42 res.count()
43 );
44 }
45 res.elements().into_element().error()?;
46
47 Ok(ControlFlow::Continue)
48}