mz_testdrive/action/kafka/
verify_commit.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use anyhow::{Context, bail};
13use mz_ore::retry::Retry;
14use mz_ore::str::StrExt;
15use rdkafka::consumer::{Consumer, StreamConsumer};
16use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
17
18use crate::action::{ControlFlow, State};
19use crate::parser::BuiltinCommand;
20
21pub async fn run_verify_commit(
22    mut cmd: BuiltinCommand,
23    state: &State,
24) -> Result<ControlFlow, anyhow::Error> {
25    let consumer_group_id = cmd.args.string("consumer-group-id")?;
26    let topic = cmd.args.string("topic")?;
27    let partition = cmd.args.parse("partition")?;
28    cmd.args.done()?;
29
30    let topic = format!("testdrive-{}-{}", topic, state.seed);
31    let expected_offset = match &cmd.input[..] {
32        [line] => Offset::Offset(line.parse().context("parsing expected offset")?),
33        _ => bail!("kafka-verify-commit requires a single expected offset as input"),
34    };
35
36    println!(
37        "Verifying committed Kafka offset for topic {} and consumer group {}...",
38        topic.quoted(),
39        consumer_group_id.quoted(),
40    );
41
42    let mut config = state.kafka_config.clone();
43    config.set("group.id", &consumer_group_id);
44    Retry::default()
45        .max_duration(state.default_timeout)
46        .retry_async_canceling(|_| async {
47            let config = config.clone();
48            let mut tpl = TopicPartitionList::new();
49            tpl.add_partition(&topic, partition);
50            let committed_tpl = mz_ore::task::spawn_blocking(
51                || "kakfa_committed_offsets".to_string(),
52                move || {
53                    let consumer: StreamConsumer =
54                        config.create().context("creating kafka consumer")?;
55
56                    Ok::<_, anyhow::Error>(
57                        consumer.committed_offsets(tpl, Duration::from_secs(10))?,
58                    )
59                },
60            )
61            .await
62            .unwrap()?;
63
64            let found_offset = committed_tpl.elements_for_topic(&topic)[0].offset();
65            if found_offset != expected_offset {
66                bail!(
67                    "found committed offset `{:?}` does not match expected offset `{:?}`",
68                    found_offset,
69                    expected_offset
70                );
71            }
72            Ok(())
73        })
74        .await?;
75    Ok(ControlFlow::Continue)
76}