mz_test_util/kafka/
kafka_client.rs
1use std::time::Duration;
13
14use anyhow::Context;
15use mz_kafka_util::admin::EnsureTopicConfig;
16use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
17use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
18use rdkafka::error::KafkaError;
19use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
20
21pub struct KafkaClient {
22 producer: FutureProducer<MzClientContext>,
23 kafka_url: String,
24}
25
26impl KafkaClient {
27 pub fn new(
28 kafka_url: &str,
29 group_id: &str,
30 configs: &[(&str, &str)],
31 ) -> Result<KafkaClient, anyhow::Error> {
32 let mut config = create_new_client_config_simple();
33 config.set("bootstrap.servers", kafka_url);
34 config.set("group.id", group_id);
35 for (key, val) in configs {
36 config.set(*key, *val);
37 }
38
39 let producer = config.create_with_context(MzClientContext::default())?;
40
41 Ok(KafkaClient {
42 producer,
43 kafka_url: kafka_url.to_string(),
44 })
45 }
46
47 pub async fn create_topic(
48 &self,
49 topic_name: &str,
50 partitions: i32,
51 replication: i32,
52 configs: &[(&str, &str)],
53 timeout: Option<Duration>,
54 ) -> Result<(), anyhow::Error> {
55 let mut config = create_new_client_config_simple();
56 config.set("bootstrap.servers", &self.kafka_url);
57
58 let client = config
59 .create::<AdminClient<_>>()
60 .expect("creating admin kafka client failed");
61
62 let admin_opts = AdminOptions::new().request_timeout(timeout);
63
64 let mut topic = NewTopic::new(topic_name, partitions, TopicReplication::Fixed(replication));
65 for (key, val) in configs {
66 topic = topic.set(key, val);
67 }
68
69 mz_kafka_util::admin::ensure_topic(&client, &admin_opts, &topic, EnsureTopicConfig::Check)
70 .await
71 .context(format!("creating Kafka topic: {}", topic_name))?;
72
73 Ok(())
74 }
75
76 pub fn send(&self, topic_name: &str, message: &[u8]) -> Result<DeliveryFuture, KafkaError> {
77 let record: FutureRecord<&Vec<u8>, _> = FutureRecord::to(topic_name)
78 .payload(message)
79 .timestamp(chrono::Utc::now().timestamp_millis());
80 self.producer.send_result(record).map_err(|(e, _message)| e)
81 }
82
83 pub fn send_key_value(
84 &self,
85 topic_name: &str,
86 key: &[u8],
87 message: Option<Vec<u8>>,
88 ) -> Result<DeliveryFuture, KafkaError> {
89 let mut record: FutureRecord<_, _> = FutureRecord::to(topic_name)
90 .key(key)
91 .timestamp(chrono::Utc::now().timestamp_millis());
92 if let Some(message) = &message {
93 record = record.payload(message);
94 }
95 self.producer.send_result(record).map_err(|(e, _message)| e)
96 }
97}