rdkafka/
error.rs

1//! Error manipulations.
2
3use std::error::Error;
4use std::ffi::{self, CStr};
5use std::fmt;
6use std::ptr;
7use std::sync::Arc;
8
9use rdkafka_sys as rdsys;
10use rdkafka_sys::types::*;
11
12use crate::util::{KafkaDrop, NativePtr};
13
14// Re-export rdkafka error code
15pub use rdsys::types::RDKafkaErrorCode;
16
17/// Kafka result.
18pub type KafkaResult<T> = Result<T, KafkaError>;
19
20/// Verify if the value represents an error condition.
21///
22/// Some librdkafka codes are informational, rather than true errors.
23pub trait IsError {
24    /// Reports whether the value represents an error.
25    fn is_error(&self) -> bool;
26}
27
28impl IsError for RDKafkaRespErr {
29    fn is_error(&self) -> bool {
30        *self as i32 != RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR as i32
31    }
32}
33
34impl IsError for RDKafkaConfRes {
35    fn is_error(&self) -> bool {
36        *self as i32 != RDKafkaConfRes::RD_KAFKA_CONF_OK as i32
37    }
38}
39
40impl IsError for RDKafkaError {
41    fn is_error(&self) -> bool {
42        self.0.is_some()
43    }
44}
45
46/// Native rdkafka error.
47#[derive(Clone)]
48pub struct RDKafkaError(Option<Arc<NativePtr<rdsys::rd_kafka_error_t>>>);
49
50unsafe impl KafkaDrop for rdsys::rd_kafka_error_t {
51    const TYPE: &'static str = "error";
52    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_error_destroy;
53}
54
55unsafe impl Send for RDKafkaError {}
56unsafe impl Sync for RDKafkaError {}
57
58impl RDKafkaError {
59    pub(crate) unsafe fn from_ptr(ptr: *mut rdsys::rd_kafka_error_t) -> RDKafkaError {
60        RDKafkaError(NativePtr::from_ptr(ptr).map(Arc::new))
61    }
62
63    fn ptr(&self) -> *const rdsys::rd_kafka_error_t {
64        match &self.0 {
65            None => ptr::null(),
66            Some(p) => p.ptr(),
67        }
68    }
69
70    /// Returns the error code or [`RDKafkaErrorCode::NoError`] if the error is
71    /// null.
72    pub fn code(&self) -> RDKafkaErrorCode {
73        unsafe { rdsys::rd_kafka_error_code(self.ptr()).into() }
74    }
75
76    /// Returns the error code name, e.g., "ERR_UNKNOWN_MEMBER_ID" or an empty
77    /// string if the error is null.
78    pub fn name(&self) -> String {
79        let cstr = unsafe { rdsys::rd_kafka_error_name(self.ptr()) };
80        unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
81    }
82
83    /// Returns a human readable error string or an empty string if the error is
84    /// null.
85    pub fn string(&self) -> String {
86        let cstr = unsafe { rdsys::rd_kafka_error_string(self.ptr()) };
87        unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
88    }
89
90    /// Reports whether the error is a fatal error.
91    ///
92    /// A fatal error indicates that the client instance is no longer usable.
93    pub fn is_fatal(&self) -> bool {
94        unsafe { rdsys::rd_kafka_error_is_fatal(self.ptr()) != 0 }
95    }
96
97    /// Reports whether the operation that encountered the error can be retried.
98    pub fn is_retriable(&self) -> bool {
99        unsafe { rdsys::rd_kafka_error_is_retriable(self.ptr()) != 0 }
100    }
101
102    /// Reports whether the error is an abortable transaction error.
103    pub fn txn_requires_abort(&self) -> bool {
104        unsafe { rdsys::rd_kafka_error_txn_requires_abort(self.ptr()) != 0 }
105    }
106}
107
108impl PartialEq for RDKafkaError {
109    fn eq(&self, other: &RDKafkaError) -> bool {
110        self.code() == other.code()
111    }
112}
113
114impl Eq for RDKafkaError {}
115
116impl fmt::Debug for RDKafkaError {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        write!(f, "RDKafkaError({})", self)
119    }
120}
121
122impl fmt::Display for RDKafkaError {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        f.write_str(&self.string())
125    }
126}
127
128impl Error for RDKafkaError {}
129
130// TODO: consider using macro
131
132/// Represents all possible Kafka errors.
133///
134/// If applicable, check the underlying [`RDKafkaErrorCode`] to get details.
135#[derive(Clone, PartialEq, Eq)]
136#[non_exhaustive]
137pub enum KafkaError {
138    /// Creation of admin operation failed.
139    AdminOpCreation(String),
140    /// The admin operation itself failed.
141    AdminOp(RDKafkaErrorCode),
142    /// The client was dropped before the operation completed.
143    Canceled,
144    /// Invalid client configuration.
145    ClientConfig(RDKafkaConfRes, String, String, String),
146    /// Client creation failed.
147    ClientCreation(String),
148    /// Consumer commit failed.
149    ConsumerCommit(RDKafkaErrorCode),
150    /// Flushing failed
151    Flush(RDKafkaErrorCode),
152    /// Global error.
153    Global(RDKafkaErrorCode),
154    /// Group list fetch failed.
155    GroupListFetch(RDKafkaErrorCode),
156    /// Message consumption failed.
157    MessageConsumption(RDKafkaErrorCode),
158    /// Message production error.
159    MessageProduction(RDKafkaErrorCode),
160    /// Metadata fetch error.
161    MetadataFetch(RDKafkaErrorCode),
162    /// No message was received.
163    NoMessageReceived,
164    /// Unexpected null pointer
165    Nul(ffi::NulError),
166    /// OAuth configuration failed.
167    OAuthConfig(RDKafkaError),
168    /// Offset fetch failed.
169    OffsetFetch(RDKafkaErrorCode),
170    /// End of partition reached.
171    PartitionEOF(i32),
172    /// Pause/Resume failed.
173    PauseResume(String),
174    /// Seeking a partition failed.
175    Seek(String),
176    /// Setting partition offset failed.
177    SetPartitionOffset(RDKafkaErrorCode),
178    /// Offset store failed.
179    StoreOffset(RDKafkaErrorCode),
180    /// Subscription creation failed.
181    Subscription(String),
182    /// Transaction error.
183    Transaction(RDKafkaError),
184}
185
186impl fmt::Debug for KafkaError {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        match self {
189            KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err),
190            KafkaError::AdminOpCreation(ref err) => {
191                write!(f, "KafkaError (Admin operation creation error: {})", err)
192            }
193            KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
194            KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(
195                f,
196                "KafkaError (Client config error: {} {} {})",
197                desc, key, value
198            ),
199            KafkaError::ClientCreation(ref err) => {
200                write!(f, "KafkaError (Client creation error: {})", err)
201            }
202            KafkaError::ConsumerCommit(err) => {
203                write!(f, "KafkaError (Consumer commit error: {})", err)
204            }
205            KafkaError::Flush(err) => write!(f, "KafkaError (Flush error: {})", err),
206            KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
207            KafkaError::GroupListFetch(err) => {
208                write!(f, "KafkaError (Group list fetch error: {})", err)
209            }
210            KafkaError::MessageConsumption(err) => {
211                write!(f, "KafkaError (Message consumption error: {})", err)
212            }
213            KafkaError::MessageProduction(err) => {
214                write!(f, "KafkaError (Message production error: {})", err)
215            }
216            KafkaError::MetadataFetch(err) => {
217                write!(f, "KafkaError (Metadata fetch error: {})", err)
218            }
219            KafkaError::NoMessageReceived => {
220                write!(f, "No message received within the given poll interval")
221            }
222            KafkaError::Nul(_) => write!(f, "FFI null error"),
223            KafkaError::OAuthConfig(err) => write!(f, "KafkaError (OAuth config error: {})", err),
224            KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
225            KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
226            KafkaError::PauseResume(ref err) => {
227                write!(f, "KafkaError (Pause/resume error: {})", err)
228            }
229            KafkaError::Seek(ref err) => write!(f, "KafkaError (Seek error: {})", err),
230            KafkaError::SetPartitionOffset(err) => {
231                write!(f, "KafkaError (Set partition offset error: {})", err)
232            }
233            KafkaError::StoreOffset(err) => write!(f, "KafkaError (Store offset error: {})", err),
234            KafkaError::Subscription(ref err) => {
235                write!(f, "KafkaError (Subscription error: {})", err)
236            }
237            KafkaError::Transaction(err) => write!(f, "KafkaError (Transaction error: {})", err),
238        }
239    }
240}
241
242impl fmt::Display for KafkaError {
243    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244        match self {
245            KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err),
246            KafkaError::AdminOpCreation(ref err) => {
247                write!(f, "Admin operation creation error: {}", err)
248            }
249            KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
250            KafkaError::ClientConfig(_, ref desc, ref key, ref value) => {
251                write!(f, "Client config error: {} {} {}", desc, key, value)
252            }
253            KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err),
254            KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
255            KafkaError::Flush(err) => write!(f, "Flush error: {}", err),
256            KafkaError::Global(err) => write!(f, "Global error: {}", err),
257            KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
258            KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err),
259            KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
260            KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
261            KafkaError::NoMessageReceived => {
262                write!(f, "No message received within the given poll interval")
263            }
264            KafkaError::Nul(_) => write!(f, "FFI nul error"),
265            KafkaError::OAuthConfig(err) => write!(f, "OAuth config error: {}", err),
266            KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
267            KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
268            KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err),
269            KafkaError::Seek(ref err) => write!(f, "Seek error: {}", err),
270            KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
271            KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
272            KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err),
273            KafkaError::Transaction(err) => write!(f, "Transaction error: {}", err),
274        }
275    }
276}
277
278impl Error for KafkaError {
279    fn source(&self) -> Option<&(dyn Error + 'static)> {
280        match self {
281            KafkaError::AdminOp(_) => None,
282            KafkaError::AdminOpCreation(_) => None,
283            KafkaError::Canceled => None,
284            KafkaError::ClientConfig(..) => None,
285            KafkaError::ClientCreation(_) => None,
286            KafkaError::ConsumerCommit(err) => Some(err),
287            KafkaError::Flush(err) => Some(err),
288            KafkaError::Global(err) => Some(err),
289            KafkaError::GroupListFetch(err) => Some(err),
290            KafkaError::MessageConsumption(err) => Some(err),
291            KafkaError::MessageProduction(err) => Some(err),
292            KafkaError::MetadataFetch(err) => Some(err),
293            KafkaError::NoMessageReceived => None,
294            KafkaError::Nul(_) => None,
295            KafkaError::OAuthConfig(err) => Some(err),
296            KafkaError::OffsetFetch(err) => Some(err),
297            KafkaError::PartitionEOF(_) => None,
298            KafkaError::PauseResume(_) => None,
299            KafkaError::Seek(_) => None,
300            KafkaError::SetPartitionOffset(err) => Some(err),
301            KafkaError::StoreOffset(err) => Some(err),
302            KafkaError::Subscription(_) => None,
303            KafkaError::Transaction(err) => Some(err),
304        }
305    }
306}
307
308impl From<ffi::NulError> for KafkaError {
309    fn from(err: ffi::NulError) -> KafkaError {
310        KafkaError::Nul(err)
311    }
312}
313
314impl KafkaError {
315    /// Returns the [`RDKafkaErrorCode`] underlying this error, if any.
316    #[allow(clippy::match_same_arms)]
317    pub fn rdkafka_error_code(&self) -> Option<RDKafkaErrorCode> {
318        match self {
319            KafkaError::AdminOp(_) => None,
320            KafkaError::AdminOpCreation(_) => None,
321            KafkaError::Canceled => None,
322            KafkaError::ClientConfig(..) => None,
323            KafkaError::ClientCreation(_) => None,
324            KafkaError::ConsumerCommit(err) => Some(*err),
325            KafkaError::Flush(err) => Some(*err),
326            KafkaError::Global(err) => Some(*err),
327            KafkaError::GroupListFetch(err) => Some(*err),
328            KafkaError::MessageConsumption(err) => Some(*err),
329            KafkaError::MessageProduction(err) => Some(*err),
330            KafkaError::MetadataFetch(err) => Some(*err),
331            KafkaError::NoMessageReceived => None,
332            KafkaError::Nul(_) => None,
333            KafkaError::OAuthConfig(err) => Some(err.code()),
334            KafkaError::OffsetFetch(err) => Some(*err),
335            KafkaError::PartitionEOF(_) => None,
336            KafkaError::PauseResume(_) => None,
337            KafkaError::Seek(_) => None,
338            KafkaError::SetPartitionOffset(err) => Some(*err),
339            KafkaError::StoreOffset(err) => Some(*err),
340            KafkaError::Subscription(_) => None,
341            KafkaError::Transaction(err) => Some(err.code()),
342        }
343    }
344}