use std::ffi::CStr;
use std::fmt;
use std::slice;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::util::{KafkaDrop, NativePtr};
pub struct GroupMemberInfo(RDKafkaGroupMemberInfo);
impl GroupMemberInfo {
pub fn id(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.member_id)
.to_str()
.expect("Member id is not a valid UTF-8 string")
}
}
pub fn client_id(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.client_id)
.to_str()
.expect("Client id is not a valid UTF-8 string")
}
}
pub fn client_host(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.client_host)
.to_str()
.expect("Member host is not a valid UTF-8 string")
}
}
pub fn metadata(&self) -> Option<&[u8]> {
unsafe {
if self.0.member_metadata.is_null() {
None
} else {
Some(slice::from_raw_parts::<u8>(
self.0.member_metadata as *const u8,
self.0.member_metadata_size as usize,
))
}
}
}
pub fn assignment(&self) -> Option<&[u8]> {
unsafe {
if self.0.member_assignment.is_null() {
None
} else {
Some(slice::from_raw_parts::<u8>(
self.0.member_assignment as *const u8,
self.0.member_assignment_size as usize,
))
}
}
}
}
pub struct GroupInfo(RDKafkaGroupInfo);
impl GroupInfo {
pub fn name(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.group)
.to_str()
.expect("Group name is not a valid UTF-8 string")
}
}
pub fn members(&self) -> &[GroupMemberInfo] {
unsafe {
slice::from_raw_parts(
self.0.members as *const GroupMemberInfo,
self.0.member_cnt as usize,
)
}
}
pub fn state(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.state)
.to_str()
.expect("State is not a valid UTF-8 string")
}
}
pub fn protocol(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.protocol)
.to_str()
.expect("Protocol name is not a valid UTF-8 string")
}
}
pub fn protocol_type(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.protocol_type)
.to_str()
.expect("Protocol type is not a valid UTF-8 string")
}
}
}
impl fmt::Debug for GroupInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.name())
}
}
pub struct GroupList(NativePtr<RDKafkaGroupList>);
unsafe impl KafkaDrop for RDKafkaGroupList {
const TYPE: &'static str = "group";
const DROP: unsafe extern "C" fn(*mut Self) = drop_group_list;
}
unsafe extern "C" fn drop_group_list(ptr: *mut RDKafkaGroupList) {
rdsys::rd_kafka_group_list_destroy(ptr as *const _)
}
impl GroupList {
pub(crate) unsafe fn from_ptr(ptr: *const RDKafkaGroupList) -> GroupList {
GroupList(NativePtr::from_ptr(ptr as *mut _).unwrap())
}
pub fn groups(&self) -> &[GroupInfo] {
unsafe {
slice::from_raw_parts(self.0.groups as *const GroupInfo, self.0.group_cnt as usize)
}
}
}