rdkafka/
util.rs

1//! Utility functions and types.
2
3use std::ffi::CStr;
4use std::fmt;
5use std::future::Future;
6use std::ops::Deref;
7use std::os::raw::c_char;
8use std::os::raw::c_void;
9use std::ptr;
10use std::ptr::NonNull;
11use std::slice;
12use std::sync::Arc;
13#[cfg(feature = "naive-runtime")]
14use std::thread;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17#[cfg(feature = "naive-runtime")]
18use futures_channel::oneshot;
19#[cfg(feature = "naive-runtime")]
20use futures_util::future::{FutureExt, Map};
21
22use crate::log::trace;
23
24use rdkafka_sys as rdsys;
25
26/// Returns a tuple representing the version of `librdkafka` in hexadecimal and
27/// string format.
28pub fn get_rdkafka_version() -> (u16, String) {
29    let version_number = unsafe { rdsys::rd_kafka_version() } as u16;
30    let c_str = unsafe { CStr::from_ptr(rdsys::rd_kafka_version_str()) };
31    (version_number, c_str.to_string_lossy().into_owned())
32}
33
/// Specifies a timeout for a Kafka operation.
///
/// The derived ordering follows variant order, so every [`Timeout::After`]
/// compares less than [`Timeout::Never`].
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum Timeout {
    /// Time out after the specified duration elapses.
    After(Duration),
    /// Block forever.
    Never,
}

impl Timeout {
    /// Converts a timeout to Kafka's expected representation: a number of
    /// milliseconds, with `-1` standing for "block forever".
    ///
    /// Durations longer than `i32::MAX` milliseconds are clamped to
    /// `i32::MAX`.
    pub(crate) fn as_millis(&self) -> i32 {
        match self {
            // A bare `as i32` would wrap for large durations and could even
            // produce -1, silently turning a finite timeout into `Never`.
            Timeout::After(d) => d.as_millis().min(i32::MAX as u128) as i32,
            Timeout::Never => -1,
        }
    }
}
52
53impl std::ops::SubAssign for Timeout {
54    fn sub_assign(&mut self, other: Self) {
55        match (self, other) {
56            (Timeout::After(lhs), Timeout::After(rhs)) => *lhs -= rhs,
57            (Timeout::Never, Timeout::After(_)) => (),
58            _ => panic!("subtraction of Timeout::Never is ill-defined"),
59        }
60    }
61}
62
63impl From<Duration> for Timeout {
64    fn from(d: Duration) -> Timeout {
65        Timeout::After(d)
66    }
67}
68
69impl From<Option<Duration>> for Timeout {
70    fn from(v: Option<Duration>) -> Timeout {
71        match v {
72            None => Timeout::Never,
73            Some(d) => Timeout::After(d),
74        }
75    }
76}
77
/// Converts the given time to the number of milliseconds since the Unix
/// epoch.
///
/// A time that lies before the epoch yields `0`.
pub fn millis_to_epoch(time: SystemTime) -> i64 {
    let since_epoch = match time.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // `duration_since` fails for pre-epoch times; treat those as zero.
        Err(_) => Duration::from_secs(0),
    };
    since_epoch.as_millis() as i64
}
84
85/// Returns the current time in milliseconds since the Unix epoch.
86pub fn current_time_millis() -> i64 {
87    millis_to_epoch(SystemTime::now())
88}
89
/// Views `size` elements of type `T` starting at `ptr` as a shared slice,
/// or returns `None` when `ptr` is null.
///
/// # Safety
///
/// A non-null `ptr` is handed straight to [`slice::from_raw_parts`], so it
/// must satisfy that function's requirements for `size` elements of `T` and
/// for the chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
    if ptr.is_null() {
        return None;
    }
    let elements = ptr as *const T;
    Some(slice::from_raw_parts(elements, size))
}
99
/// Views `size` elements of type `T` starting at `ptr` as a mutable slice,
/// or returns `None` when `ptr` is null.
///
/// # Safety
///
/// A non-null `ptr` is cast to `*mut T` and handed to
/// [`slice::from_raw_parts_mut`], so it must satisfy that function's
/// requirements (including exclusive access) for `size` elements of `T` and
/// for the chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(
    ptr: *const c_void,
    size: usize,
) -> Option<&'a mut [T]> {
    if ptr.is_null() {
        return None;
    }
    let elements = ptr as *mut T;
    Some(slice::from_raw_parts_mut(elements, size))
}
110
/// Converts a pointer to an array to a slice. If the pointer is null or the
/// size is zero, returns a zero-length slice.
///
/// # Safety
///
/// When `ptr` is non-null and `size` is non-zero, the pair is handed to
/// [`slice::from_raw_parts`] and must satisfy that function's requirements
/// for `size` elements of `T` and for the chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
    let has_elements = !ptr.is_null() && size != 0;
    if has_elements {
        slice::from_raw_parts(ptr as *const T, size)
    } else {
        &[]
    }
}
120
/// Round-trips a Rust value through an untyped raw pointer.
///
/// This is how opaque objects are handed to the C library and later
/// recovered from it.
pub trait IntoOpaque: Send + Sync + Sized {
    /// Consumes the value and returns it as a raw pointer.
    fn into_ptr(self) -> *mut c_void;

    /// Rebuilds the original Rust value from a raw pointer.
    ///
    /// # Safety
    ///
    /// `ptr` must have been produced by [into_ptr](IntoOpaque::into_ptr).
    ///
    /// Calling this more than once for the same pointer must be avoided
    /// whenever that would cause an aliasing violation (e.g. [Box]).
    unsafe fn from_ptr(ptr: *mut c_void) -> Self;
}
139
140impl IntoOpaque for () {
141    fn into_ptr(self) -> *mut c_void {
142        ptr::null_mut()
143    }
144
145    unsafe fn from_ptr(_: *mut c_void) -> Self {}
146}
147
148impl IntoOpaque for usize {
149    fn into_ptr(self) -> *mut c_void {
150        self as *mut c_void
151    }
152
153    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
154        ptr as usize
155    }
156}
157
158impl<T: Send + Sync> IntoOpaque for Box<T> {
159    fn into_ptr(self) -> *mut c_void {
160        Box::into_raw(self) as *mut c_void
161    }
162
163    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
164        Box::from_raw(ptr as *mut T)
165    }
166}
167
168impl<T: Send + Sync> IntoOpaque for Arc<T> {
169    fn into_ptr(self) -> *mut c_void {
170        Arc::into_raw(self) as *mut c_void
171    }
172
173    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
174        Arc::from_raw(ptr as *const T)
175    }
176}
177
/// Copies a C string into an owned [`String`].
///
/// Bytes that are not valid UTF-8 are replaced with U+FFFD.
///
/// # Safety
///
/// `cstr` must point to a valid, null-terminated C string.
pub unsafe fn cstr_to_owned(cstr: *const c_char) -> String {
    let borrowed = CStr::from_ptr(cstr);
    String::from_utf8_lossy(borrowed.to_bytes()).into_owned()
}
188
/// A fixed-size, zero-initialised byte buffer whose raw pointer can be handed
/// out for a C string to be written into.
pub(crate) struct ErrBuf {
    buf: [u8; ErrBuf::MAX_ERR_LEN],
}

impl ErrBuf {
    /// Size of the buffer in bytes.
    const MAX_ERR_LEN: usize = 512;

    /// Creates a buffer filled with zero bytes.
    pub fn new() -> ErrBuf {
        Self {
            buf: [0u8; Self::MAX_ERR_LEN],
        }
    }

    /// Returns the buffer as a mutable C `char` pointer.
    pub fn as_mut_ptr(&mut self) -> *mut c_char {
        let bytes: *mut u8 = self.buf.as_mut_ptr();
        bytes as *mut c_char
    }

    /// Returns the bytes up to and including the first NUL byte.
    ///
    /// # Panics
    ///
    /// Panics if the buffer contains no NUL byte.
    pub fn filled(&self) -> &[u8] {
        let nul_at = self.buf.iter().position(|&byte| byte == 0).unwrap();
        &self.buf[..=nul_at]
    }

    /// Length of [`filled`](ErrBuf::filled), which counts the terminating
    /// NUL byte.
    pub fn len(&self) -> usize {
        let used = self.filled();
        used.len()
    }

    /// Total size of the underlying buffer in bytes.
    pub fn capacity(&self) -> usize {
        self.buf.len()
    }
}
219
220impl Default for ErrBuf {
221    fn default() -> ErrBuf {
222        ErrBuf::new()
223    }
224}
225
226impl fmt::Display for ErrBuf {
227    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228        write!(
229            f,
230            "{}",
231            CStr::from_bytes_with_nul(self.filled())
232                .unwrap()
233                .to_string_lossy()
234        )
235    }
236}
237
/// Exposes a container's elements as a C-style array of `*mut T` pointers.
pub(crate) trait AsCArray<T> {
    /// Returns a pointer to the first `*mut T` element of the array.
    fn as_c_array(&self) -> *mut *mut T;
}
242
243impl<T: KafkaDrop> AsCArray<T> for Vec<NativePtr<T>> {
244    fn as_c_array(&self) -> *mut *mut T {
245        self.as_ptr() as *mut *mut T
246    }
247}
248
249pub(crate) struct NativePtr<T>
250where
251    T: KafkaDrop,
252{
253    ptr: NonNull<T>,
254}
255
256impl<T> Drop for NativePtr<T>
257where
258    T: KafkaDrop,
259{
260    fn drop(&mut self) {
261        trace!("Destroying {}: {:?}", T::TYPE, self.ptr);
262        unsafe { T::DROP(self.ptr.as_ptr()) }
263        trace!("Destroyed {}: {:?}", T::TYPE, self.ptr);
264    }
265}
266
/// Describes how a native type wrapped in a `NativePtr` is destroyed.
///
/// # Safety
///
/// `NativePtr`'s `Drop` implementation calls `DROP` on the pointer it holds.
/// NOTE(review): the exact contract is not spelled out in this file;
/// presumably `DROP` must be the correct destructor for `Self` — confirm.
pub(crate) unsafe trait KafkaDrop {
    /// Human-readable type name, used in trace log messages.
    const TYPE: &'static str;
    /// Function called with the raw pointer to destroy the object.
    const DROP: unsafe extern "C" fn(*mut Self);
}
271
272impl<T> Deref for NativePtr<T>
273where
274    T: KafkaDrop,
275{
276    type Target = T;
277    fn deref(&self) -> &Self::Target {
278        unsafe { self.ptr.as_ref() }
279    }
280}
281
282impl<T> fmt::Debug for NativePtr<T>
283where
284    T: KafkaDrop,
285{
286    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
287        self.ptr.fmt(f)
288    }
289}
290
291impl<T> NativePtr<T>
292where
293    T: KafkaDrop,
294{
295    pub(crate) unsafe fn from_ptr(ptr: *mut T) -> Option<Self> {
296        NonNull::new(ptr).map(|ptr| Self { ptr })
297    }
298
299    pub(crate) fn ptr(&self) -> *mut T {
300        self.ptr.as_ptr()
301    }
302}
303
/// An abstraction over asynchronous runtimes.
///
/// Rust has several asynchronous runtimes. rust-rdkafka uses Tokio by
/// default, via the [`TokioRuntime`], but any runtime that can implement
/// this trait can be plugged in instead.
///
/// For an example of using the [smol] runtime with rust-rdkafka, see the
/// [runtime_smol] example.
///
/// For an example of using the [async-std] runtime with rust-rdkafka, see the
/// [runtime_async_std] example.
///
/// [smol]: https://docs.rs/smol
/// [async-std]: https://docs.rs/async-std
/// [runtime_smol]: https://github.com/fede1024/rust-rdkafka/tree/master/examples/runtime_smol.rs
/// [runtime_async_std]: https://github.com/fede1024/rust-rdkafka/tree/master/examples/runtime_async_std.rs
pub trait AsyncRuntime: Send + Sync + 'static {
    /// The future type produced by
    /// [`delay_for`](AsyncRuntime::delay_for).
    type Delay: Future<Output = ()> + Send;

    /// Spawns an asynchronous task.
    ///
    /// The task should be polled to completion, unless the runtime exits
    /// first. With some runtimes this requires an explicit "detach" step.
    fn spawn<T>(task: T)
    where
        T: Future<Output = ()> + Send + 'static;

    /// Builds a future that resolves once `duration` has elapsed.
    fn delay_for(duration: Duration) -> Self::Delay;
}
336
/// The [`AsyncRuntime`] that is used when none is named explicitly.
///
/// With the `tokio` feature enabled this is the [`TokioRuntime`]; otherwise,
/// with the `naive-runtime` feature enabled, it is the [`NaiveRuntime`].
///
/// With neither feature enabled it is `()`, which does not implement
/// `AsyncRuntime`: using it as one fails to compile, so a custom runtime
/// must be specified explicitly wherever one is required.
#[cfg(all(not(feature = "tokio"), not(feature = "naive-runtime")))]
pub type DefaultRuntime = ();

/// The [`AsyncRuntime`] that is used when none is named explicitly.
///
/// With the `tokio` feature enabled this is the [`TokioRuntime`]; otherwise,
/// with the `naive-runtime` feature enabled, it is the [`NaiveRuntime`].
///
/// With neither feature enabled it is `()`, which does not implement
/// `AsyncRuntime`: using it as one fails to compile, so a custom runtime
/// must be specified explicitly wherever one is required.
#[cfg(all(feature = "naive-runtime", not(feature = "tokio")))]
pub type DefaultRuntime = NaiveRuntime;

/// The [`AsyncRuntime`] that is used when none is named explicitly.
///
/// With the `tokio` feature enabled this is the [`TokioRuntime`]; otherwise,
/// with the `naive-runtime` feature enabled, it is the [`NaiveRuntime`].
///
/// With neither feature enabled it is `()`, which does not implement
/// `AsyncRuntime`: using it as one fails to compile, so a custom runtime
/// must be specified explicitly wherever one is required.
#[cfg(feature = "tokio")]
pub type DefaultRuntime = TokioRuntime;
372
/// An [`AsyncRuntime`] implementation backed by the executor in the
/// [`futures_executor`](futures_executor) crate.
///
/// Avoid this runtime when performance matters: the futures executor has no
/// timer, so this implementation leans heavily on threads to make up for it.
#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
pub struct NaiveRuntime;

#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
impl AsyncRuntime for NaiveRuntime {
    type Delay = Map<oneshot::Receiver<()>, fn(Result<(), oneshot::Canceled>)>;

    /// Runs `task` to completion on a newly spawned thread. The thread's
    /// join handle is dropped, so the thread is never joined.
    fn spawn<T>(task: T)
    where
        T: Future<Output = ()> + Send + 'static,
    {
        thread::spawn(move || futures_executor::block_on(task));
    }

    /// Spawns a thread that sleeps for `duration` and then signals a oneshot
    /// channel. The returned future completes when the receiving end does,
    /// discarding whether a value or a cancellation was received.
    fn delay_for(duration: Duration) -> Self::Delay {
        let (done_tx, done_rx) = oneshot::channel();
        thread::spawn(move || {
            thread::sleep(duration);
            done_tx.send(())
        });
        done_rx.map(|_| ())
    }
}
404
/// An [`AsyncRuntime`] implementation backed by [Tokio](tokio).
///
/// The crate uses this runtime by default everywhere, unless the `tokio`
/// feature is disabled.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub struct TokioRuntime;

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl AsyncRuntime for TokioRuntime {
    type Delay = tokio::time::Sleep;

    /// Hands `task` to `tokio::spawn`; the returned join handle is dropped.
    fn spawn<T>(task: T)
    where
        T: Future<Output = ()> + Send + 'static,
    {
        let _detached = tokio::spawn(task);
    }

    /// Returns the future produced by `tokio::time::sleep(duration)`.
    fn delay_for(duration: Duration) -> Self::Delay {
        let sleep = tokio::time::sleep(duration);
        sleep
    }
}