1//! Cluster metadata.
23use std::ffi::CStr;
4use std::slice;
56use rdkafka_sys as rdsys;
7use rdkafka_sys::types::*;
89use crate::error::IsError;
10use crate::util::{KafkaDrop, NativePtr};
1112/// Broker metadata information.
13pub struct MetadataBroker(RDKafkaMetadataBroker);
1415impl MetadataBroker {
16/// Returns the id of the broker.
17pub fn id(&self) -> i32 {
18self.0.id
19 }
2021/// Returns the host name of the broker.
22pub fn host(&self) -> &str {
23unsafe {
24 CStr::from_ptr(self.0.host)
25 .to_str()
26 .expect("Broker host is not a valid UTF-8 string")
27 }
28 }
2930/// Returns the port of the broker.
31pub fn port(&self) -> i32 {
32self.0.port
33 }
34}
3536/// Partition metadata information.
37pub struct MetadataPartition(RDKafkaMetadataPartition);
3839impl MetadataPartition {
40/// Returns the id of the partition.
41pub fn id(&self) -> i32 {
42self.0.id
43 }
4445/// Returns the broker id of the leader broker for the partition.
46pub fn leader(&self) -> i32 {
47self.0.leader
48 }
4950// TODO: return result?
51/// Returns the metadata error for the partition, or `None` if there is no
52 /// error.
53pub fn error(&self) -> Option<RDKafkaRespErr> {
54if self.0.err.is_error() {
55Some(self.0.err)
56 } else {
57None
58}
59 }
6061/// Returns the broker IDs of the replicas.
62pub fn replicas(&self) -> &[i32] {
63unsafe { slice::from_raw_parts(self.0.replicas, self.0.replica_cnt as usize) }
64 }
6566/// Returns the broker IDs of the in-sync replicas.
67pub fn isr(&self) -> &[i32] {
68unsafe { slice::from_raw_parts(self.0.isrs, self.0.isr_cnt as usize) }
69 }
70}
7172/// Topic metadata information.
73pub struct MetadataTopic(RDKafkaMetadataTopic);
7475impl MetadataTopic {
76/// Returns the name of the topic.
77pub fn name(&self) -> &str {
78unsafe {
79 CStr::from_ptr(self.0.topic)
80 .to_str()
81 .expect("Topic name is not a valid UTF-8 string")
82 }
83 }
8485/// Returns the partition metadata information for all the partitions.
86pub fn partitions(&self) -> &[MetadataPartition] {
87unsafe {
88 slice::from_raw_parts(
89self.0.partitions as *const MetadataPartition,
90self.0.partition_cnt as usize,
91 )
92 }
93 }
9495/// Returns the metadata error for the topic, or `None` if there was no
96 /// error.
97pub fn error(&self) -> Option<RDKafkaRespErr> {
98if self.0.err.is_error() {
99Some(self.0.err)
100 } else {
101None
102}
103 }
104}
105106/// Metadata container.
107///
108/// This structure wraps the metadata pointer returned by rdkafka-sys, and
109/// deallocates all the native resources when dropped.
110pub struct Metadata(NativePtr<RDKafkaMetadata>);
111112unsafe impl KafkaDrop for RDKafkaMetadata {
113const TYPE: &'static str = "metadata";
114const DROP: unsafe extern "C" fn(*mut Self) = drop_metadata;
115}
116117unsafe extern "C" fn drop_metadata(ptr: *mut RDKafkaMetadata) {
118 rdsys::rd_kafka_metadata_destroy(ptr as *const _)
119}
120121impl Metadata {
122/// Creates a new Metadata container given a pointer to the native rdkafka-sys metadata.
123pub(crate) unsafe fn from_ptr(ptr: *const RDKafkaMetadata) -> Metadata {
124 Metadata(NativePtr::from_ptr(ptr as *mut _).unwrap())
125 }
126127/// Returns the ID of the broker originating this metadata.
128pub fn orig_broker_id(&self) -> i32 {
129self.0.orig_broker_id
130 }
131132/// Returns the hostname of the broker originating this metadata.
133pub fn orig_broker_name(&self) -> &str {
134unsafe {
135 CStr::from_ptr(self.0.orig_broker_name)
136 .to_str()
137 .expect("Broker name is not a valid UTF-8 string")
138 }
139 }
140141/// Returns the metadata information for all the brokers in the cluster.
142pub fn brokers(&self) -> &[MetadataBroker] {
143unsafe {
144 slice::from_raw_parts(
145self.0.brokers as *const MetadataBroker,
146self.0.broker_cnt as usize,
147 )
148 }
149 }
150151/// Returns the metadata information for all the topics in the cluster.
152pub fn topics(&self) -> &[MetadataTopic] {
153unsafe {
154 slice::from_raw_parts(
155self.0.topics as *const MetadataTopic,
156self.0.topic_cnt as usize,
157 )
158 }
159 }
160}
161162unsafe impl Send for Metadata {}
163unsafe impl Sync for Metadata {}