1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use std::time::Duration;
use anyhow::bail;
use ore::collections::CollectionExt;
use rdkafka::client::Client;
use rdkafka::consumer::ConsumerContext;
use rdkafka::producer::{DefaultProducerContext, DeliveryResult, ProducerContext};
use rdkafka::ClientContext;
use tracing::{debug, error, info, warn};
/// A Kafka client context whose `ClientContext` implementation forwards
/// librdkafka log messages and errors to the `tracing` macros.
///
/// The type carries no state; it also implements `ConsumerContext` and
/// `ProducerContext` so it can be used for both kinds of clients.
pub struct MzClientContext;
impl ClientContext for MzClientContext {
    /// Re-emits a librdkafka log line through `tracing` under the
    /// `librdkafka` target.
    ///
    /// The librdkafka severity is mapped onto the closest `tracing` level:
    /// `Debug` to `debug!`, `Info` and `Notice` to `info!`, `Warning` to
    /// `warn!`, and everything from `Error` upwards to `error!`. The message
    /// is rendered as the facility followed by the log text.
    fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
        use rdkafka::config::RDKafkaLogLevel as Level;

        // Arms are ordered from least to most severe.
        match level {
            Level::Debug => debug!(target: "librdkafka", "{} {}", fac, log_message),
            // `tracing` has no notice level, so `Notice` shares `info!` with `Info`.
            Level::Info | Level::Notice => {
                info!(target: "librdkafka", "{} {}", fac, log_message)
            }
            Level::Warning => warn!(target: "librdkafka", "{} {}", fac, log_message),
            Level::Error | Level::Critical | Level::Alert | Level::Emerg => {
                error!(target: "librdkafka", "{} {}", fac, log_message);
            }
        }
    }

    /// Reports a librdkafka error, together with its reason string, at the
    /// `tracing` error level.
    fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
        error!("librdkafka: {}: {}", error, reason);
    }
}
// No `ConsumerContext` methods are overridden here, so consumers built with
// `MzClientContext` use the trait's default method implementations.
impl ConsumerContext for MzClientContext {}
impl ProducerContext for MzClientContext {
    /// Reuses the opaque type of `DefaultProducerContext`, since delivery
    /// handling is delegated to it.
    type DeliveryOpaque = <DefaultProducerContext as ProducerContext>::DeliveryOpaque;

    /// Passes the delivery report and its opaque value to
    /// `DefaultProducerContext` unchanged.
    fn delivery(
        &self,
        delivery_result: &DeliveryResult<'_>,
        delivery_opaque: Self::DeliveryOpaque,
    ) {
        <DefaultProducerContext as ProducerContext>::delivery(
            &DefaultProducerContext,
            delivery_result,
            delivery_opaque,
        );
    }
}
/// Returns the IDs of all partitions of `topic`, as reported by the metadata
/// fetched through `client`.
///
/// `timeout` is forwarded to `Client::fetch_metadata`.
///
/// # Errors
///
/// Returns an error if:
/// - the metadata request itself fails,
/// - the response does not contain exactly one topic entry,
/// - the single entry is for a topic other than `topic`, or
/// - the topic has no partitions, which is reported as the topic not existing.
pub fn get_partitions<C: ClientContext>(
    client: &Client<C>,
    topic: &str,
    timeout: Duration,
) -> Result<Vec<i32>, anyhow::Error> {
    // `topic` is already a `&str`; no extra borrow is needed.
    let meta = client.fetch_metadata(Some(topic), timeout)?;

    let topics = meta.topics();
    if topics.len() != 1 {
        bail!(
            "topic {} has {} metadata entries; expected 1",
            topic,
            topics.len()
        );
    }

    // The length check above leaves exactly one entry for `into_element`
    // (from `CollectionExt`) to extract.
    let meta_topic = topics.into_element();
    if meta_topic.name() != topic {
        bail!(
            "got results for wrong topic {} (expected {})",
            meta_topic.name(),
            topic
        );
    }

    let partitions = meta_topic.partitions();
    // A metadata entry without partitions is treated as a nonexistent topic.
    if partitions.is_empty() {
        bail!("topic {} does not exist", topic);
    }

    Ok(partitions.iter().map(|partition| partition.id()).collect())
}