1use std::collections::HashMap;
26use std::ffi::{CStr, CString};
27use std::iter::FromIterator;
28use std::os::raw::c_char;
29use std::ptr;
30
31use rdkafka_sys as rdsys;
32use rdkafka_sys::types::*;
33
34use crate::client::ClientContext;
35use crate::error::{IsError, KafkaError, KafkaResult};
36use crate::log::{log_enabled, DEBUG, INFO, WARN};
37use crate::util::{ErrBuf, KafkaDrop, NativePtr};
38
/// Log levels used when talking to librdkafka.
///
/// Each variant carries an explicit integer discriminant (`0` through `7`).
/// `RDKafkaLogLevel::from_int` performs the reverse mapping, so the numbering
/// must stay in sync with it.
///
/// NOTE(review): the names and numbering look like the classic syslog
/// severities (lower number = more severe) — confirm against librdkafka.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RDKafkaLogLevel {
    /// Emergency; numeric level `0`.
    Emerg = 0,
    /// Alert; numeric level `1`.
    Alert = 1,
    /// Critical; numeric level `2`.
    Critical = 2,
    /// Error; numeric level `3`.
    Error = 3,
    /// Warning; numeric level `4`.
    Warning = 4,
    /// Notice; numeric level `5`.
    Notice = 5,
    /// Informational; numeric level `6`.
    Info = 6,
    /// Debug; numeric level `7`.
    Debug = 7,
}
63
64impl RDKafkaLogLevel {
65 pub(crate) fn from_int(level: i32) -> RDKafkaLogLevel {
66 match level {
67 0 => RDKafkaLogLevel::Emerg,
68 1 => RDKafkaLogLevel::Alert,
69 2 => RDKafkaLogLevel::Critical,
70 3 => RDKafkaLogLevel::Error,
71 4 => RDKafkaLogLevel::Warning,
72 5 => RDKafkaLogLevel::Notice,
73 6 => RDKafkaLogLevel::Info,
74 _ => RDKafkaLogLevel::Debug,
75 }
76 }
77}
78
79pub struct NativeClientConfig {
85 ptr: NativePtr<RDKafkaConf>,
86}
87
88unsafe impl KafkaDrop for RDKafkaConf {
89 const TYPE: &'static str = "client config";
90 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_conf_destroy;
91}
92
93impl NativeClientConfig {
94 pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConf) -> NativeClientConfig {
96 NativeClientConfig {
97 ptr: NativePtr::from_ptr(ptr).unwrap(),
98 }
99 }
100
101 pub fn ptr(&self) -> *mut RDKafkaConf {
103 self.ptr.ptr()
104 }
105
106 pub fn get(&self, key: &str) -> KafkaResult<String> {
113 let make_err = |res| {
114 KafkaError::ClientConfig(
115 res,
116 match res {
117 RDKafkaConfRes::RD_KAFKA_CONF_UNKNOWN => "Unknown configuration name",
118 RDKafkaConfRes::RD_KAFKA_CONF_INVALID => "Invalid configuration value",
119 RDKafkaConfRes::RD_KAFKA_CONF_OK => "OK",
120 }
121 .into(),
122 key.into(),
123 "".into(),
124 )
125 };
126 let key_c = CString::new(key.to_string())?;
127
128 let mut size = 0_usize;
130 let res = unsafe {
131 rdsys::rd_kafka_conf_get(self.ptr(), key_c.as_ptr(), ptr::null_mut(), &mut size)
132 };
133 if res.is_error() {
134 return Err(make_err(res));
135 }
136
137 let mut buf = vec![0_u8; size];
140 let res = unsafe {
141 rdsys::rd_kafka_conf_get(
142 self.ptr(),
143 key_c.as_ptr(),
144 buf.as_mut_ptr() as *mut c_char,
145 &mut size,
146 )
147 };
148 if res.is_error() {
149 return Err(make_err(res));
150 }
151
152 Ok(CStr::from_bytes_with_nul(&buf)
154 .unwrap()
155 .to_string_lossy()
156 .into())
157 }
158}
159
160#[derive(Clone, Debug)]
162pub struct ClientConfig {
163 conf_map: HashMap<String, String>,
164 pub log_level: RDKafkaLogLevel,
167}
168
169impl Default for ClientConfig {
170 fn default() -> Self {
171 Self::new()
172 }
173}
174
175impl ClientConfig {
176 pub fn new() -> ClientConfig {
178 ClientConfig {
179 conf_map: HashMap::new(),
180 log_level: log_level_from_global_config(),
181 }
182 }
183
184 pub fn get(&self, key: &str) -> Option<&str> {
194 self.conf_map.get(key).map(|val| val.as_str())
195 }
196
197 pub fn set<K, V>(&mut self, key: K, value: V) -> &mut ClientConfig
202 where
203 K: Into<String>,
204 V: Into<String>,
205 {
206 self.conf_map.insert(key.into(), value.into());
207 self
208 }
209
210 pub fn remove<'a>(&'a mut self, key: &str) -> &'a mut ClientConfig {
212 self.conf_map.remove(key);
213 self
214 }
215
216 pub fn set_log_level(&mut self, log_level: RDKafkaLogLevel) -> &mut ClientConfig {
219 self.log_level = log_level;
220 self
221 }
222
223 pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
225 let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) };
226 let mut err_buf = ErrBuf::new();
227 for (key, value) in &self.conf_map {
228 let key_c = CString::new(key.to_string())?;
229 let value_c = CString::new(value.to_string())?;
230 let ret = unsafe {
231 rdsys::rd_kafka_conf_set(
232 conf.ptr(),
233 key_c.as_ptr(),
234 value_c.as_ptr(),
235 err_buf.as_mut_ptr(),
236 err_buf.capacity(),
237 )
238 };
239 if ret.is_error() {
240 return Err(KafkaError::ClientConfig(
241 ret,
242 err_buf.to_string(),
243 key.to_string(),
244 value.to_string(),
245 ));
246 }
247 }
248 Ok(conf)
249 }
250
251 pub fn create<T: FromClientConfig>(&self) -> KafkaResult<T> {
253 T::from_config(self)
254 }
255
256 pub fn create_with_context<C, T>(&self, context: C) -> KafkaResult<T>
258 where
259 C: ClientContext,
260 T: FromClientConfigAndContext<C>,
261 {
262 T::from_config_and_context(self, context)
263 }
264}
265
266impl FromIterator<(String, String)> for ClientConfig {
267 fn from_iter<I>(iter: I) -> ClientConfig
268 where
269 I: IntoIterator<Item = (String, String)>,
270 {
271 let mut config = ClientConfig::new();
272 config.extend(iter);
273 config
274 }
275}
276
277impl Extend<(String, String)> for ClientConfig {
278 fn extend<I>(&mut self, iter: I)
279 where
280 I: IntoIterator<Item = (String, String)>,
281 {
282 self.conf_map.extend(iter)
283 }
284}
285
286fn log_level_from_global_config() -> RDKafkaLogLevel {
288 if log_enabled!(target: "librdkafka", DEBUG) {
289 RDKafkaLogLevel::Debug
290 } else if log_enabled!(target: "librdkafka", INFO) {
291 RDKafkaLogLevel::Info
292 } else if log_enabled!(target: "librdkafka", WARN) {
293 RDKafkaLogLevel::Warning
294 } else {
295 RDKafkaLogLevel::Error
296 }
297}
298
299pub trait FromClientConfig: Sized {
301 fn from_config(_: &ClientConfig) -> KafkaResult<Self>;
304}
305
306pub trait FromClientConfigAndContext<C: ClientContext>: Sized {
308 fn from_config_and_context(_: &ClientConfig, _: C) -> KafkaResult<Self>;
310}
311
#[cfg(test)]
mod tests {
    use super::ClientConfig;

    /// Collecting pairs and then extending must keep untouched keys, overwrite
    /// repeated keys, and add new ones.
    #[test]
    fn test_client_config_set_map() {
        let initial: Vec<(String, String)> =
            vec![("a".into(), "1".into()), ("b".into(), "1".into())];
        let mut config = initial.into_iter().collect::<ClientConfig>();

        let overrides: [(String, String); 2] =
            [("b".into(), "2".into()), ("c".into(), "3".into())];
        config.extend(overrides);

        for (key, expected) in [("a", "1"), ("b", "2"), ("c", "3")] {
            assert_eq!(config.get(key).unwrap(), expected);
        }
    }
}