mz_testdrive/action/kafka/
verify_commit.rs
1use std::time::Duration;
11
12use anyhow::{Context, bail};
13use mz_ore::retry::Retry;
14use mz_ore::str::StrExt;
15use rdkafka::consumer::{Consumer, StreamConsumer};
16use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
17
18use crate::action::{ControlFlow, State};
19use crate::parser::BuiltinCommand;
20
21pub async fn run_verify_commit(
22 mut cmd: BuiltinCommand,
23 state: &State,
24) -> Result<ControlFlow, anyhow::Error> {
25 let consumer_group_id = cmd.args.string("consumer-group-id")?;
26 let topic = cmd.args.string("topic")?;
27 let partition = cmd.args.parse("partition")?;
28 cmd.args.done()?;
29
30 let topic = format!("testdrive-{}-{}", topic, state.seed);
31 let expected_offset = match &cmd.input[..] {
32 [line] => Offset::Offset(line.parse().context("parsing expected offset")?),
33 _ => bail!("kafka-verify-commit requires a single expected offset as input"),
34 };
35
36 println!(
37 "Verifying committed Kafka offset for topic {} and consumer group {}...",
38 topic.quoted(),
39 consumer_group_id.quoted(),
40 );
41
42 let mut config = state.kafka_config.clone();
43 config.set("group.id", &consumer_group_id);
44 Retry::default()
45 .max_duration(state.default_timeout)
46 .retry_async_canceling(|_| async {
47 let config = config.clone();
48 let mut tpl = TopicPartitionList::new();
49 tpl.add_partition(&topic, partition);
50 let committed_tpl = mz_ore::task::spawn_blocking(
51 || "kakfa_committed_offsets".to_string(),
52 move || {
53 let consumer: StreamConsumer =
54 config.create().context("creating kafka consumer")?;
55
56 Ok::<_, anyhow::Error>(
57 consumer.committed_offsets(tpl, Duration::from_secs(10))?,
58 )
59 },
60 )
61 .await
62 .unwrap()?;
63
64 let found_offset = committed_tpl.elements_for_topic(&topic)[0].offset();
65 if found_offset != expected_offset {
66 bail!(
67 "found committed offset `{:?}` does not match expected offset `{:?}`",
68 found_offset,
69 expected_offset
70 );
71 }
72 Ok(())
73 })
74 .await?;
75 Ok(ControlFlow::Continue)
76}