1use std::error::Error;
4use std::ffi::{self, CStr};
5use std::fmt;
6use std::ptr;
7use std::sync::Arc;
8
9use rdkafka_sys as rdsys;
10use rdkafka_sys::types::*;
11
12use crate::util::{KafkaDrop, NativePtr};
13
14pub use rdsys::types::RDKafkaErrorCode;
16
/// Convenience alias for a [`Result`] whose error type is [`KafkaError`].
pub type KafkaResult<T> = Result<T, KafkaError>;
19
/// Common interface for status-like values that can report whether they
/// represent an error.
pub trait IsError {
    /// Returns `true` if this value represents an error.
    fn is_error(&self) -> bool;
}
27
28impl IsError for RDKafkaRespErr {
29 fn is_error(&self) -> bool {
30 *self as i32 != RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR as i32
31 }
32}
33
34impl IsError for RDKafkaConfRes {
35 fn is_error(&self) -> bool {
36 *self as i32 != RDKafkaConfRes::RD_KAFKA_CONF_OK as i32
37 }
38}
39
40impl IsError for RDKafkaError {
41 fn is_error(&self) -> bool {
42 self.0.is_some()
43 }
44}
45
/// Handle to a native `rd_kafka_error_t`.
///
/// The native object is held behind an [`Arc`], so cloning this handle shares
/// the same underlying object rather than copying it. The inner `Option` is
/// `None` when no native error object is attached; in that case
/// [`IsError::is_error`] returns `false`.
#[derive(Clone)]
pub struct RDKafkaError(Option<Arc<NativePtr<rdsys::rd_kafka_error_t>>>);
49
// Registers `rd_kafka_error_destroy` as the destructor that the crate's
// `KafkaDrop` mechanism uses for native error objects.
unsafe impl KafkaDrop for rdsys::rd_kafka_error_t {
    // Short label for this native type.
    // NOTE(review): presumably used in diagnostics emitted by `util` — confirm.
    const TYPE: &'static str = "error";
    // Native function invoked to destroy the error object.
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_error_destroy;
}
54
// These impls assert that the wrapped native error may be moved to, and shared
// between, threads.
// NOTE(review): the accessors visible in this file only reach the native
// object through a `*const` pointer; confirm against librdkafka that an
// `rd_kafka_error_t` is not mutated after it has been created.
unsafe impl Send for RDKafkaError {}
unsafe impl Sync for RDKafkaError {}
57
58impl RDKafkaError {
59 pub(crate) unsafe fn from_ptr(ptr: *mut rdsys::rd_kafka_error_t) -> RDKafkaError {
60 RDKafkaError(NativePtr::from_ptr(ptr).map(Arc::new))
61 }
62
63 fn ptr(&self) -> *const rdsys::rd_kafka_error_t {
64 match &self.0 {
65 None => ptr::null(),
66 Some(p) => p.ptr(),
67 }
68 }
69
70 pub fn code(&self) -> RDKafkaErrorCode {
73 unsafe { rdsys::rd_kafka_error_code(self.ptr()).into() }
74 }
75
76 pub fn name(&self) -> String {
79 let cstr = unsafe { rdsys::rd_kafka_error_name(self.ptr()) };
80 unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
81 }
82
83 pub fn string(&self) -> String {
86 let cstr = unsafe { rdsys::rd_kafka_error_string(self.ptr()) };
87 unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
88 }
89
90 pub fn is_fatal(&self) -> bool {
94 unsafe { rdsys::rd_kafka_error_is_fatal(self.ptr()) != 0 }
95 }
96
97 pub fn is_retriable(&self) -> bool {
99 unsafe { rdsys::rd_kafka_error_is_retriable(self.ptr()) != 0 }
100 }
101
102 pub fn txn_requires_abort(&self) -> bool {
104 unsafe { rdsys::rd_kafka_error_txn_requires_abort(self.ptr()) != 0 }
105 }
106}
107
108impl PartialEq for RDKafkaError {
109 fn eq(&self, other: &RDKafkaError) -> bool {
110 self.code() == other.code()
111 }
112}
113
// Marker impl: equality is defined by the `PartialEq` impl, which compares
// error codes.
impl Eq for RDKafkaError {}
115
116impl fmt::Debug for RDKafkaError {
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 write!(f, "RDKafkaError({})", self)
119 }
120}
121
122impl fmt::Display for RDKafkaError {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 f.write_str(&self.string())
125 }
126}
127
// Uses the default `Error` methods: no `source` is reported for a native error.
impl Error for RDKafkaError {}
129
/// Represents all the errors this crate reports.
///
/// The enum is `#[non_exhaustive]`, so code matching on it outside this crate
/// must include a wildcard arm.
#[derive(Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum KafkaError {
    /// Admin operation creation error, with a textual description.
    AdminOpCreation(String),
    /// Admin operation error, with the associated error code.
    AdminOp(RDKafkaErrorCode),
    /// The client was dropped.
    Canceled,
    /// Client configuration error. The three strings are formatted, in order,
    /// as description, key and value.
    ClientConfig(RDKafkaConfRes, String, String, String),
    /// Client creation error, with a textual description.
    ClientCreation(String),
    /// Consumer commit error.
    ConsumerCommit(RDKafkaErrorCode),
    /// Flush error.
    Flush(RDKafkaErrorCode),
    /// Global error.
    Global(RDKafkaErrorCode),
    /// Group list fetch error.
    GroupListFetch(RDKafkaErrorCode),
    /// Message consumption error.
    MessageConsumption(RDKafkaErrorCode),
    /// Message production error.
    MessageProduction(RDKafkaErrorCode),
    /// Metadata fetch error.
    MetadataFetch(RDKafkaErrorCode),
    /// No message was received within the given poll interval.
    NoMessageReceived,
    /// A string could not be passed across the FFI boundary; wraps the
    /// [`ffi::NulError`] that was raised.
    Nul(ffi::NulError),
    /// OAuth configuration error.
    OAuthConfig(RDKafkaError),
    /// Offset fetch error.
    OffsetFetch(RDKafkaErrorCode),
    /// Partition EOF.
    // NOTE(review): the payload is presumably the partition number — confirm.
    PartitionEOF(i32),
    /// Pause/resume error, with a textual description.
    PauseResume(String),
    /// Seek error, with a textual description.
    Seek(String),
    /// Set partition offset error.
    SetPartitionOffset(RDKafkaErrorCode),
    /// Store offset error.
    StoreOffset(RDKafkaErrorCode),
    /// Subscription error, with a textual description.
    Subscription(String),
    /// Transaction error.
    Transaction(RDKafkaError),
}
185
186impl fmt::Debug for KafkaError {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 match self {
189 KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err),
190 KafkaError::AdminOpCreation(ref err) => {
191 write!(f, "KafkaError (Admin operation creation error: {})", err)
192 }
193 KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
194 KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(
195 f,
196 "KafkaError (Client config error: {} {} {})",
197 desc, key, value
198 ),
199 KafkaError::ClientCreation(ref err) => {
200 write!(f, "KafkaError (Client creation error: {})", err)
201 }
202 KafkaError::ConsumerCommit(err) => {
203 write!(f, "KafkaError (Consumer commit error: {})", err)
204 }
205 KafkaError::Flush(err) => write!(f, "KafkaError (Flush error: {})", err),
206 KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
207 KafkaError::GroupListFetch(err) => {
208 write!(f, "KafkaError (Group list fetch error: {})", err)
209 }
210 KafkaError::MessageConsumption(err) => {
211 write!(f, "KafkaError (Message consumption error: {})", err)
212 }
213 KafkaError::MessageProduction(err) => {
214 write!(f, "KafkaError (Message production error: {})", err)
215 }
216 KafkaError::MetadataFetch(err) => {
217 write!(f, "KafkaError (Metadata fetch error: {})", err)
218 }
219 KafkaError::NoMessageReceived => {
220 write!(f, "No message received within the given poll interval")
221 }
222 KafkaError::Nul(_) => write!(f, "FFI null error"),
223 KafkaError::OAuthConfig(err) => write!(f, "KafkaError (OAuth config error: {})", err),
224 KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
225 KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
226 KafkaError::PauseResume(ref err) => {
227 write!(f, "KafkaError (Pause/resume error: {})", err)
228 }
229 KafkaError::Seek(ref err) => write!(f, "KafkaError (Seek error: {})", err),
230 KafkaError::SetPartitionOffset(err) => {
231 write!(f, "KafkaError (Set partition offset error: {})", err)
232 }
233 KafkaError::StoreOffset(err) => write!(f, "KafkaError (Store offset error: {})", err),
234 KafkaError::Subscription(ref err) => {
235 write!(f, "KafkaError (Subscription error: {})", err)
236 }
237 KafkaError::Transaction(err) => write!(f, "KafkaError (Transaction error: {})", err),
238 }
239 }
240}
241
242impl fmt::Display for KafkaError {
243 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244 match self {
245 KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err),
246 KafkaError::AdminOpCreation(ref err) => {
247 write!(f, "Admin operation creation error: {}", err)
248 }
249 KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
250 KafkaError::ClientConfig(_, ref desc, ref key, ref value) => {
251 write!(f, "Client config error: {} {} {}", desc, key, value)
252 }
253 KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err),
254 KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
255 KafkaError::Flush(err) => write!(f, "Flush error: {}", err),
256 KafkaError::Global(err) => write!(f, "Global error: {}", err),
257 KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
258 KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err),
259 KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
260 KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
261 KafkaError::NoMessageReceived => {
262 write!(f, "No message received within the given poll interval")
263 }
264 KafkaError::Nul(_) => write!(f, "FFI nul error"),
265 KafkaError::OAuthConfig(err) => write!(f, "OAuth config error: {}", err),
266 KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
267 KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
268 KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err),
269 KafkaError::Seek(ref err) => write!(f, "Seek error: {}", err),
270 KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
271 KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
272 KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err),
273 KafkaError::Transaction(err) => write!(f, "Transaction error: {}", err),
274 }
275 }
276}
277
278impl Error for KafkaError {
279 fn source(&self) -> Option<&(dyn Error + 'static)> {
280 match self {
281 KafkaError::AdminOp(_) => None,
282 KafkaError::AdminOpCreation(_) => None,
283 KafkaError::Canceled => None,
284 KafkaError::ClientConfig(..) => None,
285 KafkaError::ClientCreation(_) => None,
286 KafkaError::ConsumerCommit(err) => Some(err),
287 KafkaError::Flush(err) => Some(err),
288 KafkaError::Global(err) => Some(err),
289 KafkaError::GroupListFetch(err) => Some(err),
290 KafkaError::MessageConsumption(err) => Some(err),
291 KafkaError::MessageProduction(err) => Some(err),
292 KafkaError::MetadataFetch(err) => Some(err),
293 KafkaError::NoMessageReceived => None,
294 KafkaError::Nul(_) => None,
295 KafkaError::OAuthConfig(err) => Some(err),
296 KafkaError::OffsetFetch(err) => Some(err),
297 KafkaError::PartitionEOF(_) => None,
298 KafkaError::PauseResume(_) => None,
299 KafkaError::Seek(_) => None,
300 KafkaError::SetPartitionOffset(err) => Some(err),
301 KafkaError::StoreOffset(err) => Some(err),
302 KafkaError::Subscription(_) => None,
303 KafkaError::Transaction(err) => Some(err),
304 }
305 }
306}
307
308impl From<ffi::NulError> for KafkaError {
309 fn from(err: ffi::NulError) -> KafkaError {
310 KafkaError::Nul(err)
311 }
312}
313
314impl KafkaError {
315 #[allow(clippy::match_same_arms)]
317 pub fn rdkafka_error_code(&self) -> Option<RDKafkaErrorCode> {
318 match self {
319 KafkaError::AdminOp(_) => None,
320 KafkaError::AdminOpCreation(_) => None,
321 KafkaError::Canceled => None,
322 KafkaError::ClientConfig(..) => None,
323 KafkaError::ClientCreation(_) => None,
324 KafkaError::ConsumerCommit(err) => Some(*err),
325 KafkaError::Flush(err) => Some(*err),
326 KafkaError::Global(err) => Some(*err),
327 KafkaError::GroupListFetch(err) => Some(*err),
328 KafkaError::MessageConsumption(err) => Some(*err),
329 KafkaError::MessageProduction(err) => Some(*err),
330 KafkaError::MetadataFetch(err) => Some(*err),
331 KafkaError::NoMessageReceived => None,
332 KafkaError::Nul(_) => None,
333 KafkaError::OAuthConfig(err) => Some(err.code()),
334 KafkaError::OffsetFetch(err) => Some(*err),
335 KafkaError::PartitionEOF(_) => None,
336 KafkaError::PauseResume(_) => None,
337 KafkaError::Seek(_) => None,
338 KafkaError::SetPartitionOffset(err) => Some(*err),
339 KafkaError::StoreOffset(err) => Some(*err),
340 KafkaError::Subscription(_) => None,
341 KafkaError::Transaction(err) => Some(err.code()),
342 }
343 }
344}