mz_test_util/kafka/
kafka_client.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Kafka topic management
11
12use std::time::Duration;
13
14use anyhow::Context;
15use mz_kafka_util::admin::EnsureTopicConfig;
16use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
17use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
18use rdkafka::error::KafkaError;
19use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
20
21pub struct KafkaClient {
22    producer: FutureProducer<MzClientContext>,
23    kafka_url: String,
24}
25
26impl KafkaClient {
27    pub fn new(
28        kafka_url: &str,
29        group_id: &str,
30        configs: &[(&str, &str)],
31    ) -> Result<KafkaClient, anyhow::Error> {
32        let mut config = create_new_client_config_simple();
33        config.set("bootstrap.servers", kafka_url);
34        config.set("group.id", group_id);
35        for (key, val) in configs {
36            config.set(*key, *val);
37        }
38
39        let producer = config.create_with_context(MzClientContext::default())?;
40
41        Ok(KafkaClient {
42            producer,
43            kafka_url: kafka_url.to_string(),
44        })
45    }
46
47    pub async fn create_topic(
48        &self,
49        topic_name: &str,
50        partitions: i32,
51        replication: i32,
52        configs: &[(&str, &str)],
53        timeout: Option<Duration>,
54    ) -> Result<(), anyhow::Error> {
55        let mut config = create_new_client_config_simple();
56        config.set("bootstrap.servers", &self.kafka_url);
57
58        let client = config
59            .create::<AdminClient<_>>()
60            .expect("creating admin kafka client failed");
61
62        let admin_opts = AdminOptions::new().request_timeout(timeout);
63
64        let mut topic = NewTopic::new(topic_name, partitions, TopicReplication::Fixed(replication));
65        for (key, val) in configs {
66            topic = topic.set(key, val);
67        }
68
69        mz_kafka_util::admin::ensure_topic(&client, &admin_opts, &topic, EnsureTopicConfig::Check)
70            .await
71            .context(format!("creating Kafka topic: {}", topic_name))?;
72
73        Ok(())
74    }
75
76    pub fn send(&self, topic_name: &str, message: &[u8]) -> Result<DeliveryFuture, KafkaError> {
77        let record: FutureRecord<&Vec<u8>, _> = FutureRecord::to(topic_name)
78            .payload(message)
79            .timestamp(chrono::Utc::now().timestamp_millis());
80        self.producer.send_result(record).map_err(|(e, _message)| e)
81    }
82
83    pub fn send_key_value(
84        &self,
85        topic_name: &str,
86        key: &[u8],
87        message: Option<Vec<u8>>,
88    ) -> Result<DeliveryFuture, KafkaError> {
89        let mut record: FutureRecord<_, _> = FutureRecord::to(topic_name)
90            .key(key)
91            .timestamp(chrono::Utc::now().timestamp_millis());
92        if let Some(message) = &message {
93            record = record.payload(message);
94        }
95        self.producer.send_result(record).map_err(|(e, _message)| e)
96    }
97}