1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use std::time::Duration;
use anyhow::Context;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord};
use kafka_util::client::MzClientContext;
/// A small Kafka client that pairs a record producer with the broker address
/// it was configured with.
pub struct KafkaClient {
/// Producer used by `send` and `send_key_value` to enqueue records.
producer: FutureProducer<MzClientContext>,
/// The `bootstrap.servers` value given to `new`; kept so that `create_topic`
/// can configure its own admin client against the same brokers.
kafka_url: String,
}
impl KafkaClient {
    /// Builds a client whose producer talks to the brokers at `kafka_url`.
    ///
    /// `kafka_url` is set as `bootstrap.servers` and `group_id` as `group.id`.
    /// Every `(key, value)` pair in `configs` is then applied to the same
    /// configuration, after those two settings.
    ///
    /// # Errors
    ///
    /// Returns the error raised while creating the producer from the
    /// assembled configuration.
    pub fn new(
        kafka_url: &str,
        group_id: &str,
        configs: &[(&str, &str)],
    ) -> Result<KafkaClient, anyhow::Error> {
        let mut config = ClientConfig::new();
        config.set("bootstrap.servers", kafka_url);
        config.set("group.id", group_id);
        for (key, val) in configs {
            config.set(*key, *val);
        }

        let producer = config.create_with_context(MzClientContext)?;

        Ok(KafkaClient {
            producer,
            kafka_url: kafka_url.to_string(),
        })
    }

    /// Creates the topic `topic_name` by delegating to
    /// `kafka_util::admin::ensure_topic`.
    ///
    /// The topic is described with `partitions` partitions, a fixed
    /// replication factor of `replication`, and the topic-level settings in
    /// `configs`. `timeout` is forwarded as the admin request timeout.
    ///
    /// A new admin client is built on every call. It is configured only with
    /// the `bootstrap.servers` value stored on this client; the extra settings
    /// passed to `new` are not applied to it.
    ///
    /// # Errors
    ///
    /// Returns an error if the admin client cannot be created or if
    /// `ensure_topic` fails; the latter is annotated with the topic name.
    pub async fn create_topic(
        &self,
        topic_name: &str,
        partitions: i32,
        replication: i32,
        configs: &[(&str, &str)],
        timeout: Option<Duration>,
    ) -> Result<(), anyhow::Error> {
        let mut config = ClientConfig::new();
        config.set("bootstrap.servers", &self.kafka_url);

        // This function already reports failures through `Result`, so a
        // client-construction error is propagated rather than turned into a
        // panic.
        let client = config
            .create::<AdminClient<_>>()
            .context("creating admin kafka client failed")?;

        let admin_opts = AdminOptions::new().request_timeout(timeout);

        let mut topic = NewTopic::new(
            topic_name,
            partitions,
            TopicReplication::Fixed(replication),
        );
        for (key, val) in configs {
            topic = topic.set(key, val);
        }

        // `with_context` only formats the message when an error occurs.
        kafka_util::admin::ensure_topic(&client, &admin_opts, &topic)
            .await
            .with_context(|| format!("creating Kafka topic: {}", topic_name))?;

        Ok(())
    }

    /// Enqueues a record without a key on `topic_name`, using `message` as the
    /// payload and the current UTC time (in milliseconds) as its timestamp.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` reported by the producer when the record
    /// cannot be enqueued. The rejected record itself is discarded.
    pub fn send(&self, topic_name: &str, message: &[u8]) -> Result<DeliveryFuture, KafkaError> {
        // No key is ever set; the key type is spelled out only so the
        // record's type parameters can be resolved.
        let record: FutureRecord<&Vec<u8>, _> = FutureRecord::to(topic_name)
            .payload(message)
            .timestamp(chrono::Utc::now().timestamp_millis());

        self.producer.send_result(record).map_err(|(e, _message)| e)
    }

    /// Enqueues a record with key `key` on `topic_name`, timestamped with the
    /// current UTC time in milliseconds.
    ///
    /// When `message` is `Some`, its bytes become the payload; when it is
    /// `None`, the record is sent with a key and no payload.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` reported by the producer when the record
    /// cannot be enqueued. The rejected record itself is discarded.
    pub fn send_key_value(
        &self,
        topic_name: &str,
        key: &[u8],
        message: Option<Vec<u8>>,
    ) -> Result<DeliveryFuture, KafkaError> {
        let mut record: FutureRecord<_, _> = FutureRecord::to(topic_name)
            .key(key)
            .timestamp(chrono::Utc::now().timestamp_millis());
        // Borrow the payload so `message` outlives the record that refers to it.
        if let Some(payload) = &message {
            record = record.payload(payload);
        }

        self.producer.send_result(record).map_err(|(e, _message)| e)
    }
}