// rdkafka/config.rs
1//! Producer and consumer configuration.
2//!
3//! ## C library configuration
4//!
5//! The Rust library will forward all the configuration to the C library. The
6//! most frequently used parameters are listed here.
7//!
8//! ### Frequently used parameters
9//!
10//! For producer-specific and consumer-specific parameters check the producer
11//! and consumer modules documentation. The full list of available parameters is
12//! available in the [librdkafka documentation][librdkafka-config].
13//!
14//! - `client.id`: Client identifier. Default: `rdkafka`.
15//! - `bootstrap.servers`: Initial list of brokers as a CSV list of broker host
16//!    or host:port. Default: empty.
17//! - `message.max.bytes`: Maximum message size. Default: 1000000.
18//! - `debug`: A comma-separated list of debug contexts to enable. Use 'all' to
19//!    print all the debugging information. Default: empty (off).
//! - `statistics.interval.ms`: How often the statistics callback
//!    specified in the [`ClientContext`] will be called. Default: 0 (disabled).
22//!
23//! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
24
25use std::collections::HashMap;
26use std::ffi::{CStr, CString};
27use std::iter::FromIterator;
28use std::os::raw::c_char;
29use std::ptr;
30
31use rdkafka_sys as rdsys;
32use rdkafka_sys::types::*;
33
34use crate::client::ClientContext;
35use crate::error::{IsError, KafkaError, KafkaResult};
36use crate::log::{log_enabled, DEBUG, INFO, WARN};
37use crate::util::{ErrBuf, KafkaDrop, NativePtr};
38
/// The log levels supported by librdkafka.
#[derive(Copy, Clone, Debug)]
pub enum RDKafkaLogLevel {
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Emerg = 0,
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Alert = 1,
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Critical = 2,
    /// Equivalent to [`Level::Error`](log::Level::Error) from the log crate.
    Error = 3,
    /// Equivalent to [`Level::Warn`](log::Level::Warn) from the log crate.
    Warning = 4,
    /// Higher priority than [`Level::Info`](log::Level::Info) from the log
    /// crate.
    Notice = 5,
    /// Equivalent to [`Level::Info`](log::Level::Info) from the log crate.
    Info = 6,
    /// Equivalent to [`Level::Debug`](log::Level::Debug) from the log crate.
    Debug = 7,
}
63
64impl RDKafkaLogLevel {
65    pub(crate) fn from_int(level: i32) -> RDKafkaLogLevel {
66        match level {
67            0 => RDKafkaLogLevel::Emerg,
68            1 => RDKafkaLogLevel::Alert,
69            2 => RDKafkaLogLevel::Critical,
70            3 => RDKafkaLogLevel::Error,
71            4 => RDKafkaLogLevel::Warning,
72            5 => RDKafkaLogLevel::Notice,
73            6 => RDKafkaLogLevel::Info,
74            _ => RDKafkaLogLevel::Debug,
75        }
76    }
77}
78
79//
80// ********** CLIENT CONFIG **********
81//
82
/// A native rdkafka-sys client config.
pub struct NativeClientConfig {
    // Owned pointer to the underlying `RDKafkaConf`; the native object is
    // destroyed when this wrapper is dropped (via its `KafkaDrop` impl).
    ptr: NativePtr<RDKafkaConf>,
}
87
// SAFETY: `rd_kafka_conf_destroy` is librdkafka's designated destructor for
// `RDKafkaConf` pointers, so it is sound to run it once when the wrapper that
// owns the pointer is dropped.
unsafe impl KafkaDrop for RDKafkaConf {
    /// Human-readable name of the wrapped native type.
    const TYPE: &'static str = "client config";
    /// Native destructor invoked on drop.
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_conf_destroy;
}
92
impl NativeClientConfig {
    /// Wraps a pointer to an `RDKafkaConfig` object and returns a new `NativeClientConfig`.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a valid `RDKafkaConf`. Ownership is transferred to
    /// the returned wrapper, which destroys the native object on drop.
    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConf) -> NativeClientConfig {
        NativeClientConfig {
            // Panics on a null pointer: receiving null here is a programming
            // error, not a recoverable runtime condition.
            ptr: NativePtr::from_ptr(ptr).unwrap(),
        }
    }

    /// Returns the pointer to the librdkafka RDKafkaConf structure.
    pub fn ptr(&self) -> *mut RDKafkaConf {
        self.ptr.ptr()
    }

    /// Gets the value of a parameter in the configuration.
    ///
    /// This method reflects librdkafka's view of the current value of the
    /// parameter. If the parameter was overridden by the user, it returns the
    /// user-specified value. Otherwise, it returns librdkafka's default value
    /// for the parameter.
    pub fn get(&self, key: &str) -> KafkaResult<String> {
        // Builds the error for a failed `rd_kafka_conf_get` call; the value
        // slot is left empty since no value could be read.
        let make_err = |res| {
            KafkaError::ClientConfig(
                res,
                match res {
                    RDKafkaConfRes::RD_KAFKA_CONF_UNKNOWN => "Unknown configuration name",
                    RDKafkaConfRes::RD_KAFKA_CONF_INVALID => "Invalid configuration value",
                    RDKafkaConfRes::RD_KAFKA_CONF_OK => "OK",
                }
                .into(),
                key.into(),
                "".into(),
            )
        };
        // Fails with a `NulError` if `key` contains an interior NUL byte.
        let key_c = CString::new(key.to_string())?;

        // Call with a `NULL` buffer to determine the size of the string.
        let mut size = 0_usize;
        let res = unsafe {
            rdsys::rd_kafka_conf_get(self.ptr(), key_c.as_ptr(), ptr::null_mut(), &mut size)
        };
        if res.is_error() {
            return Err(make_err(res));
        }

        // Allocate a buffer of that size and call again to get the actual
        // string.
        // NOTE(review): this relies on the reported `size` including the
        // trailing NUL byte, which `from_bytes_with_nul` below requires —
        // confirm against the `rd_kafka_conf_get` API docs.
        let mut buf = vec![0_u8; size];
        let res = unsafe {
            rdsys::rd_kafka_conf_get(
                self.ptr(),
                key_c.as_ptr(),
                buf.as_mut_ptr() as *mut c_char,
                &mut size,
            )
        };
        if res.is_error() {
            return Err(make_err(res));
        }

        // Convert the C string to a Rust string, replacing any invalid UTF-8
        // sequences rather than failing.
        Ok(CStr::from_bytes_with_nul(&buf)
            .unwrap()
            .to_string_lossy()
            .into())
    }
}
159
/// Client configuration.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    // Key-value pairs forwarded verbatim to librdkafka when a native
    // configuration is created.
    conf_map: HashMap<String, String>,
    /// The librdkafka logging level. Refer to [`RDKafkaLogLevel`] for the list
    /// of available levels.
    pub log_level: RDKafkaLogLevel,
}
168
169impl Default for ClientConfig {
170    fn default() -> Self {
171        Self::new()
172    }
173}
174
175impl ClientConfig {
176    /// Creates a new empty configuration.
177    pub fn new() -> ClientConfig {
178        ClientConfig {
179            conf_map: HashMap::new(),
180            log_level: log_level_from_global_config(),
181        }
182    }
183
184    /// Gets the value of a parameter in the configuration.
185    ///
186    /// Returns the current value set for `key`, or `None` if no value for `key`
187    /// exists.
188    ///
189    /// Note that this method will only ever return values that were installed
190    /// by a call to [`ClientConfig::set`]. To retrieve librdkafka's default
191    /// value for a parameter, build a [`NativeClientConfig`] and then call
192    /// [`NativeClientConfig::get`] on the resulting object.
193    pub fn get(&self, key: &str) -> Option<&str> {
194        self.conf_map.get(key).map(|val| val.as_str())
195    }
196
197    /// Sets a parameter in the configuration.
198    ///
199    /// If there is an existing value for `key` in the configuration, it is
200    /// overridden with the new `value`.
201    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut ClientConfig
202    where
203        K: Into<String>,
204        V: Into<String>,
205    {
206        self.conf_map.insert(key.into(), value.into());
207        self
208    }
209
210    /// Removes a parameter from the configuration.
211    pub fn remove<'a>(&'a mut self, key: &str) -> &'a mut ClientConfig {
212        self.conf_map.remove(key);
213        self
214    }
215
216    /// Sets the log level of the client. If not specified, the log level will be calculated based
217    /// on the global log level of the log crate.
218    pub fn set_log_level(&mut self, log_level: RDKafkaLogLevel) -> &mut ClientConfig {
219        self.log_level = log_level;
220        self
221    }
222
223    /// Builds a native librdkafka configuration.
224    pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
225        let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) };
226        let mut err_buf = ErrBuf::new();
227        for (key, value) in &self.conf_map {
228            let key_c = CString::new(key.to_string())?;
229            let value_c = CString::new(value.to_string())?;
230            let ret = unsafe {
231                rdsys::rd_kafka_conf_set(
232                    conf.ptr(),
233                    key_c.as_ptr(),
234                    value_c.as_ptr(),
235                    err_buf.as_mut_ptr(),
236                    err_buf.capacity(),
237                )
238            };
239            if ret.is_error() {
240                return Err(KafkaError::ClientConfig(
241                    ret,
242                    err_buf.to_string(),
243                    key.to_string(),
244                    value.to_string(),
245                ));
246            }
247        }
248        Ok(conf)
249    }
250
251    /// Uses the current configuration to create a new Consumer or Producer.
252    pub fn create<T: FromClientConfig>(&self) -> KafkaResult<T> {
253        T::from_config(self)
254    }
255
256    /// Uses the current configuration and the provided context to create a new Consumer or Producer.
257    pub fn create_with_context<C, T>(&self, context: C) -> KafkaResult<T>
258    where
259        C: ClientContext,
260        T: FromClientConfigAndContext<C>,
261    {
262        T::from_config_and_context(self, context)
263    }
264}
265
266impl FromIterator<(String, String)> for ClientConfig {
267    fn from_iter<I>(iter: I) -> ClientConfig
268    where
269        I: IntoIterator<Item = (String, String)>,
270    {
271        let mut config = ClientConfig::new();
272        config.extend(iter);
273        config
274    }
275}
276
277impl Extend<(String, String)> for ClientConfig {
278    fn extend<I>(&mut self, iter: I)
279    where
280        I: IntoIterator<Item = (String, String)>,
281    {
282        self.conf_map.extend(iter)
283    }
284}
285
286/// Return the log level
287fn log_level_from_global_config() -> RDKafkaLogLevel {
288    if log_enabled!(target: "librdkafka", DEBUG) {
289        RDKafkaLogLevel::Debug
290    } else if log_enabled!(target: "librdkafka", INFO) {
291        RDKafkaLogLevel::Info
292    } else if log_enabled!(target: "librdkafka", WARN) {
293        RDKafkaLogLevel::Warning
294    } else {
295        RDKafkaLogLevel::Error
296    }
297}
298
299/// Create a new client based on the provided configuration.
300pub trait FromClientConfig: Sized {
301    /// Creates a client from a client configuration. The default client context
302    /// will be used.
303    fn from_config(_: &ClientConfig) -> KafkaResult<Self>;
304}
305
306/// Create a new client based on the provided configuration and context.
307pub trait FromClientConfigAndContext<C: ClientContext>: Sized {
308    /// Creates a client from a client configuration and a client context.
309    fn from_config_and_context(_: &ClientConfig, _: C) -> KafkaResult<Self>;
310}
311
#[cfg(test)]
mod tests {
    use super::ClientConfig;

    #[test]
    fn test_client_config_set_map() {
        // Build from pairs, then extend: "b" must be overwritten, "c" added.
        let pairs = vec![("a".into(), "1".into()), ("b".into(), "1".into())];
        let mut config: ClientConfig = pairs.into_iter().collect();
        config.extend([("b".into(), "2".into()), ("c".into(), "3".into())]);

        for (key, expected) in [("a", "1"), ("b", "2"), ("c", "3")] {
            assert_eq!(config.get(key).unwrap(), expected);
        }
    }
}