1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910use anyhow::{Context, bail};
11use mz_ore::collections::CollectionExt;
12use rdkafka::{Offset, TopicPartitionList};
1314use crate::action::{ControlFlow, State};
15use crate::parser::BuiltinCommand;
1617pub async fn run_delete_records(
18mut cmd: BuiltinCommand,
19 state: &State,
20) -> Result<ControlFlow, anyhow::Error> {
21let topic_prefix = format!("testdrive-{}", cmd.args.string("topic")?);
22let partition = cmd.args.parse("partition")?;
23let offset = cmd.args.parse("offset")?;
24 cmd.args.done()?;
2526let topic_name = format!("{}-{}", topic_prefix, state.seed);
27println!(
28"Deleting records up to offset {offset} from partition {partition} of topic {topic_name}",
29 );
3031let mut tpl = TopicPartitionList::new();
32 tpl.add_partition_offset(&topic_name, partition, Offset::Offset(offset))
33 .context("internal error: adding partition to delete records topic partition list")?;
34let res = state
35 .kafka_admin
36 .delete_records(&tpl, &state.kafka_admin_opts)
37 .await
38.context("deleting records")?;
39if res.count() != 1 {
40bail!(
41"kafka record deletion returned {} results, but exactly one result was expected",
42 res.count()
43 );
44 }
45 res.elements().into_element().error()?;
4647Ok(ControlFlow::Continue)
48}