1//! Group membership API.
23use std::ffi::CStr;
4use std::fmt;
5use std::slice;
67use rdkafka_sys as rdsys;
8use rdkafka_sys::types::*;
910use crate::util::{KafkaDrop, NativePtr};
1112/// Group member information container.
13pub struct GroupMemberInfo(RDKafkaGroupMemberInfo);
1415impl GroupMemberInfo {
16/// Returns the ID of the member.
17pub fn id(&self) -> &str {
18unsafe {
19 CStr::from_ptr(self.0.member_id)
20 .to_str()
21 .expect("Member id is not a valid UTF-8 string")
22 }
23 }
2425/// Returns the client ID of the member.
26pub fn client_id(&self) -> &str {
27unsafe {
28 CStr::from_ptr(self.0.client_id)
29 .to_str()
30 .expect("Client id is not a valid UTF-8 string")
31 }
32 }
3334/// Return the client host of the member.
35pub fn client_host(&self) -> &str {
36unsafe {
37 CStr::from_ptr(self.0.client_host)
38 .to_str()
39 .expect("Member host is not a valid UTF-8 string")
40 }
41 }
4243/// Return the metadata of the member.
44pub fn metadata(&self) -> Option<&[u8]> {
45unsafe {
46if self.0.member_metadata.is_null() {
47None
48} else {
49Some(slice::from_raw_parts::<u8>(
50self.0.member_metadata as *const u8,
51self.0.member_metadata_size as usize,
52 ))
53 }
54 }
55 }
5657/// Return the partition assignment of the member.
58pub fn assignment(&self) -> Option<&[u8]> {
59unsafe {
60if self.0.member_assignment.is_null() {
61None
62} else {
63Some(slice::from_raw_parts::<u8>(
64self.0.member_assignment as *const u8,
65self.0.member_assignment_size as usize,
66 ))
67 }
68 }
69 }
70}
7172/// Group information container.
73pub struct GroupInfo(RDKafkaGroupInfo);
7475impl GroupInfo {
76/// Returns the name of the group.
77pub fn name(&self) -> &str {
78unsafe {
79 CStr::from_ptr(self.0.group)
80 .to_str()
81 .expect("Group name is not a valid UTF-8 string")
82 }
83 }
8485/// Returns the members of the group.
86pub fn members(&self) -> &[GroupMemberInfo] {
87unsafe {
88 slice::from_raw_parts(
89self.0.members as *const GroupMemberInfo,
90self.0.member_cnt as usize,
91 )
92 }
93 }
9495/// Returns the state of the group.
96pub fn state(&self) -> &str {
97unsafe {
98 CStr::from_ptr(self.0.state)
99 .to_str()
100 .expect("State is not a valid UTF-8 string")
101 }
102 }
103104/// Returns the protocol of the group.
105pub fn protocol(&self) -> &str {
106unsafe {
107 CStr::from_ptr(self.0.protocol)
108 .to_str()
109 .expect("Protocol name is not a valid UTF-8 string")
110 }
111 }
112113/// Returns the protocol type of the group.
114pub fn protocol_type(&self) -> &str {
115unsafe {
116 CStr::from_ptr(self.0.protocol_type)
117 .to_str()
118 .expect("Protocol type is not a valid UTF-8 string")
119 }
120 }
121}
122123impl fmt::Debug for GroupInfo {
124fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125write!(f, "{}", self.name())
126 }
127}
128129/// List of groups.
130///
131/// This structure wraps the pointer returned by rdkafka-sys, and deallocates
132/// all the native resources when dropped.
133pub struct GroupList(NativePtr<RDKafkaGroupList>);
134135unsafe impl KafkaDrop for RDKafkaGroupList {
136const TYPE: &'static str = "group";
137const DROP: unsafe extern "C" fn(*mut Self) = drop_group_list;
138}
139140unsafe extern "C" fn drop_group_list(ptr: *mut RDKafkaGroupList) {
141 rdsys::rd_kafka_group_list_destroy(ptr as *const _)
142}
143144impl GroupList {
145/// Creates a new group list given a pointer to the native rdkafka-sys group list structure.
146pub(crate) unsafe fn from_ptr(ptr: *const RDKafkaGroupList) -> GroupList {
147 GroupList(NativePtr::from_ptr(ptr as *mut _).unwrap())
148 }
149150/// Returns all the groups in the list.
151pub fn groups(&self) -> &[GroupInfo] {
152unsafe {
153 slice::from_raw_parts(self.0.groups as *const GroupInfo, self.0.group_cnt as usize)
154 }
155 }
156}