1use std::ffi::CStr;
4use std::fmt;
5use std::future::Future;
6use std::ops::Deref;
7use std::os::raw::c_char;
8use std::os::raw::c_void;
9use std::ptr;
10use std::ptr::NonNull;
11use std::slice;
12use std::sync::Arc;
13#[cfg(feature = "naive-runtime")]
14use std::thread;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17#[cfg(feature = "naive-runtime")]
18use futures_channel::oneshot;
19#[cfg(feature = "naive-runtime")]
20use futures_util::future::{FutureExt, Map};
21
22use crate::log::trace;
23
24use rdkafka_sys as rdsys;
25
26pub fn get_rdkafka_version() -> (u16, String) {
29 let version_number = unsafe { rdsys::rd_kafka_version() } as u16;
30 let c_str = unsafe { CStr::from_ptr(rdsys::rd_kafka_version_str()) };
31 (version_number, c_str.to_string_lossy().into_owned())
32}
33
/// A timeout for an operation: either a bounded wait or no limit at all.
///
/// The derived ordering follows declaration order, so every `After(_)` value
/// compares less than `Never`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum Timeout {
    /// Give up once the given duration has elapsed.
    After(Duration),
    /// Wait without any time limit.
    Never,
}

impl Timeout {
    /// Converts the timeout to a signed number of milliseconds.
    ///
    /// `Never` maps to `-1`. `After(d)` maps to the whole milliseconds in `d`,
    /// saturating at `i32::MAX`. Saturation matters because a plain `as i32`
    /// cast wraps: a large duration could become negative, or even exactly
    /// `-1`, which is the value reserved here for `Never`.
    pub(crate) fn as_millis(&self) -> i32 {
        match self {
            // The value is clamped to `i32::MAX` first, so the cast is lossless.
            Timeout::After(d) => d.as_millis().min(i32::MAX as u128) as i32,
            Timeout::Never => -1,
        }
    }
}

/// Subtracts elapsed time from a timeout in place.
///
/// * `After(a) -= After(b)` leaves `After(a - b)`, clamped at zero when `b`
///   exceeds `a` (a raw `Duration` subtraction would panic on underflow).
/// * `Never -= After(_)` leaves `Never` unchanged.
///
/// # Panics
///
/// Panics when the right-hand side is `Timeout::Never`.
impl std::ops::SubAssign for Timeout {
    fn sub_assign(&mut self, other: Self) {
        match (self, other) {
            (Timeout::After(lhs), Timeout::After(rhs)) => {
                // An exhausted budget is reported as zero remaining time.
                *lhs = lhs.checked_sub(rhs).unwrap_or(Duration::from_secs(0));
            }
            (Timeout::Never, Timeout::After(_)) => (),
            (_, Timeout::Never) => panic!("subtraction of Timeout::Never is ill-defined"),
        }
    }
}
62
63impl From<Duration> for Timeout {
64 fn from(d: Duration) -> Timeout {
65 Timeout::After(d)
66 }
67}
68
69impl From<Option<Duration>> for Timeout {
70 fn from(v: Option<Duration>) -> Timeout {
71 match v {
72 None => Timeout::Never,
73 Some(d) => Timeout::After(d),
74 }
75 }
76}
77
/// Converts a `SystemTime` to whole milliseconds since the Unix epoch.
///
/// Times earlier than the epoch yield `0`. Sub-millisecond precision is
/// truncated.
pub fn millis_to_epoch(time: SystemTime) -> i64 {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        // `time` precedes the epoch: report zero rather than an error.
        Err(_) => 0,
    }
}
84
85pub fn current_time_millis() -> i64 {
87 millis_to_epoch(SystemTime::now())
88}
89
/// Reinterprets a raw pointer and element count as a shared slice of `T`.
///
/// Returns `None` when `ptr` is null; otherwise returns `Some` slice of
/// `size` elements starting at `ptr`.
///
/// # Safety
///
/// When `ptr` is non-null, the caller must satisfy every requirement of
/// [`slice::from_raw_parts`] for `ptr` cast to `*const T`, `size` elements and
/// the chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
    if ptr.is_null() {
        return None;
    }
    let first = ptr as *const T;
    Some(slice::from_raw_parts(first, size))
}
99
/// Reinterprets a raw pointer and element count as a mutable slice of `T`.
///
/// Returns `None` when `ptr` is null; otherwise returns `Some` mutable slice
/// of `size` elements starting at `ptr`. The parameter is typed as a `*const`
/// pointer, but the memory is accessed mutably.
///
/// # Safety
///
/// When `ptr` is non-null, the caller must satisfy every requirement of
/// [`slice::from_raw_parts_mut`] for `ptr` cast to `*mut T`, `size` elements
/// and the chosen lifetime `'a`, including exclusive access to the memory.
pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(
    ptr: *const c_void,
    size: usize,
) -> Option<&'a mut [T]> {
    if !ptr.is_null() {
        let first = ptr as *mut T;
        Some(slice::from_raw_parts_mut(first, size))
    } else {
        None
    }
}
110
/// Reinterprets a raw pointer and element count as a shared slice of `T`,
/// substituting an empty slice when there is nothing to read.
///
/// An empty slice is returned when `ptr` is null or `size` is zero; otherwise
/// the slice covers `size` elements starting at `ptr`.
///
/// # Safety
///
/// When `ptr` is non-null and `size` is non-zero, the caller must satisfy
/// every requirement of [`slice::from_raw_parts`] for `ptr` cast to
/// `*const T`, `size` elements and the chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
    if size == 0 || ptr.is_null() {
        return &[];
    }
    slice::from_raw_parts(ptr.cast::<T>(), size)
}
120
/// Conversion of a value to and from an opaque `*mut c_void` pointer.
///
/// Implementors must be `Send + Sync + Sized`.
pub trait IntoOpaque: Send + Sync + Sized {
    /// Consumes `self` and returns its opaque pointer representation.
    fn into_ptr(self) -> *mut c_void;

    /// Rebuilds a value from an opaque pointer.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably the pointer must come from `into_ptr` of the
    /// same implementing type and must not be converted back more than once —
    /// confirm against the implementations and callers.
    unsafe fn from_ptr(_: *mut c_void) -> Self;
}
139
140impl IntoOpaque for () {
141 fn into_ptr(self) -> *mut c_void {
142 ptr::null_mut()
143 }
144
145 unsafe fn from_ptr(_: *mut c_void) -> Self {}
146}
147
148impl IntoOpaque for usize {
149 fn into_ptr(self) -> *mut c_void {
150 self as *mut c_void
151 }
152
153 unsafe fn from_ptr(ptr: *mut c_void) -> Self {
154 ptr as usize
155 }
156}
157
158impl<T: Send + Sync> IntoOpaque for Box<T> {
159 fn into_ptr(self) -> *mut c_void {
160 Box::into_raw(self) as *mut c_void
161 }
162
163 unsafe fn from_ptr(ptr: *mut c_void) -> Self {
164 Box::from_raw(ptr as *mut T)
165 }
166}
167
168impl<T: Send + Sync> IntoOpaque for Arc<T> {
169 fn into_ptr(self) -> *mut c_void {
170 Arc::into_raw(self) as *mut c_void
171 }
172
173 unsafe fn from_ptr(ptr: *mut c_void) -> Self {
174 Arc::from_raw(ptr as *const T)
175 }
176}
177
/// Copies a NUL-terminated C string into an owned `String`.
///
/// Bytes that are not valid UTF-8 are replaced with U+FFFD.
///
/// # Safety
///
/// `cstr` must satisfy the requirements of [`CStr::from_ptr`]; in particular
/// it must be non-null and point to a NUL-terminated string that stays valid
/// for the duration of the call.
pub unsafe fn cstr_to_owned(cstr: *const c_char) -> String {
    let borrowed = CStr::from_ptr(cstr);
    String::from_utf8_lossy(borrowed.to_bytes()).into_owned()
}
188
/// Fixed-size, zero-initialized byte buffer exposed as a `*mut c_char`.
///
/// NOTE(review): presumably handed to C functions that write a
/// NUL-terminated error message into it — confirm against the callers.
pub(crate) struct ErrBuf {
    buf: [u8; ErrBuf::MAX_ERR_LEN],
}

impl ErrBuf {
    /// Size of the buffer in bytes.
    const MAX_ERR_LEN: usize = 512;

    /// Creates a buffer with every byte set to zero.
    pub fn new() -> ErrBuf {
        let buf = [0u8; Self::MAX_ERR_LEN];
        Self { buf }
    }

    /// Returns a mutable C-character pointer to the start of the buffer.
    pub fn as_mut_ptr(&mut self) -> *mut c_char {
        self.buf.as_mut_ptr().cast::<c_char>()
    }

    /// Returns the bytes up to and including the first NUL byte.
    ///
    /// # Panics
    ///
    /// Panics if the buffer contains no NUL byte.
    pub fn filled(&self) -> &[u8] {
        let nul_at = self.buf.iter().position(|&byte| byte == 0).unwrap();
        &self.buf[..=nul_at]
    }

    /// Returns the length of [`ErrBuf::filled`], which counts the
    /// terminating NUL byte.
    pub fn len(&self) -> usize {
        let message = self.filled();
        message.len()
    }

    /// Returns the total size of the buffer in bytes.
    pub fn capacity(&self) -> usize {
        Self::MAX_ERR_LEN
    }
}
219
220impl Default for ErrBuf {
221 fn default() -> ErrBuf {
222 ErrBuf::new()
223 }
224}
225
226impl fmt::Display for ErrBuf {
227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228 write!(
229 f,
230 "{}",
231 CStr::from_bytes_with_nul(self.filled())
232 .unwrap()
233 .to_string_lossy()
234 )
235 }
236}
237
/// Exposes a collection as a C-style array of `*mut T` pointers.
pub(crate) trait AsCArray<T> {
    /// Returns a pointer to the first element of the array of `*mut T`.
    ///
    /// NOTE(review): presumably the pointer is only valid while `self` is
    /// alive and unmodified — confirm against the implementations.
    fn as_c_array(&self) -> *mut *mut T;
}
242
243impl<T: KafkaDrop> AsCArray<T> for Vec<NativePtr<T>> {
244 fn as_c_array(&self) -> *mut *mut T {
245 self.as_ptr() as *mut *mut T
246 }
247}
248
249pub(crate) struct NativePtr<T>
250where
251 T: KafkaDrop,
252{
253 ptr: NonNull<T>,
254}
255
256impl<T> Drop for NativePtr<T>
257where
258 T: KafkaDrop,
259{
260 fn drop(&mut self) {
261 trace!("Destroying {}: {:?}", T::TYPE, self.ptr);
262 unsafe { T::DROP(self.ptr.as_ptr()) }
263 trace!("Destroyed {}: {:?}", T::TYPE, self.ptr);
264 }
265}
266
/// Describes how to destroy a native type held in a `NativePtr`.
///
/// # Safety
///
/// NOTE(review): presumably `DROP` must be the correct destructor for
/// pointers to `Self`, since `NativePtr`'s `Drop` impl calls it
/// unconditionally — confirm against the implementors.
pub(crate) unsafe trait KafkaDrop {
    /// Human-readable type name, used in the trace log lines emitted when a
    /// `NativePtr` is dropped.
    const TYPE: &'static str;
    /// C function invoked with the raw pointer when a `NativePtr` is dropped.
    const DROP: unsafe extern "C" fn(*mut Self);
}
271
272impl<T> Deref for NativePtr<T>
273where
274 T: KafkaDrop,
275{
276 type Target = T;
277 fn deref(&self) -> &Self::Target {
278 unsafe { self.ptr.as_ref() }
279 }
280}
281
282impl<T> fmt::Debug for NativePtr<T>
283where
284 T: KafkaDrop,
285{
286 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
287 self.ptr.fmt(f)
288 }
289}
290
291impl<T> NativePtr<T>
292where
293 T: KafkaDrop,
294{
295 pub(crate) unsafe fn from_ptr(ptr: *mut T) -> Option<Self> {
296 NonNull::new(ptr).map(|ptr| Self { ptr })
297 }
298
299 pub(crate) fn ptr(&self) -> *mut T {
300 self.ptr.as_ptr()
301 }
302}
303
/// Abstraction over an asynchronous runtime: spawning tasks and creating
/// delay futures.
///
/// Both operations are associated functions, so no runtime value is needed.
pub trait AsyncRuntime: Send + Sync + 'static {
    /// The future type returned by [`AsyncRuntime::delay_for`].
    type Delay: Future<Output = ()> + Send;

    /// Hands `task` to the runtime for execution. Nothing is returned, so the
    /// caller gets no handle to the spawned task.
    fn spawn<T>(task: T)
    where
        T: Future<Output = ()> + Send + 'static;

    /// Creates a delay future for `duration`.
    ///
    /// NOTE(review): presumably the future completes once `duration` has
    /// elapsed — confirm against the implementations.
    fn delay_for(duration: Duration) -> Self::Delay;
}
336
/// The default runtime type. With neither the `tokio` nor the
/// `naive-runtime` feature enabled, this is the unit type.
#[cfg(not(any(feature = "tokio", feature = "naive-runtime")))]
pub type DefaultRuntime = ();

/// The default runtime type. With `naive-runtime` enabled and `tokio`
/// disabled, this is `NaiveRuntime`.
#[cfg(all(not(feature = "tokio"), feature = "naive-runtime"))]
pub type DefaultRuntime = NaiveRuntime;

/// The default runtime type. With `tokio` enabled, this is `TokioRuntime`,
/// regardless of whether `naive-runtime` is also enabled.
#[cfg(feature = "tokio")]
pub type DefaultRuntime = TokioRuntime;
372
/// An [`AsyncRuntime`] built on plain OS threads: every spawned task and
/// every delay gets its own thread.
#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
pub struct NaiveRuntime;

#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
impl AsyncRuntime for NaiveRuntime {
    type Delay = Map<oneshot::Receiver<()>, fn(Result<(), oneshot::Canceled>)>;

    /// Runs `task` to completion with `futures_executor::block_on` on a newly
    /// spawned thread. The join handle is dropped, so the thread is detached.
    fn spawn<T>(task: T)
    where
        T: Future<Output = ()> + Send + 'static,
    {
        let _detached = thread::spawn(move || futures_executor::block_on(task));
    }

    /// Returns a future that completes when a helper thread, after sleeping
    /// for `duration`, signals a oneshot channel.
    fn delay_for(duration: Duration) -> Self::Delay {
        let (done_tx, done_rx) = oneshot::channel();
        let _timer = thread::spawn(move || {
            thread::sleep(duration);
            // The send result is the thread's return value; nobody joins the
            // thread, so a failed send (receiver dropped) goes unnoticed.
            done_tx.send(())
        });
        // Discard the channel result so the future's output is `()`; a plain
        // `fn` pointer keeps the associated `Delay` type nameable.
        let discard: fn(Result<(), oneshot::Canceled>) = |_| ();
        done_rx.map(discard)
    }
}
404
/// An [`AsyncRuntime`] that delegates to Tokio.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub struct TokioRuntime;

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl AsyncRuntime for TokioRuntime {
    type Delay = tokio::time::Sleep;

    /// Spawns `task` with `tokio::spawn`. The join handle is dropped, so the
    /// task's completion is not observed here.
    fn spawn<T>(task: T)
    where
        T: Future<Output = ()> + Send + 'static,
    {
        let _detached = tokio::spawn(task);
    }

    /// Returns the future produced by `tokio::time::sleep` for `duration`.
    fn delay_for(duration: Duration) -> Self::Delay {
        use tokio::time::sleep;

        sleep(duration)
    }
}