1use std::cmp;
4use std::ffi::CString;
5use std::mem::ManuallyDrop;
6use std::os::raw::c_void;
7use std::ptr;
8use std::sync::Arc;
9
10use rdkafka_sys as rdsys;
11use rdkafka_sys::types::*;
12
13use crate::client::{Client, NativeClient, NativeQueue};
14use crate::config::{
15 ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
16};
17use crate::consumer::{
18 CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
19 RebalanceProtocol,
20};
21use crate::error::{IsError, KafkaError, KafkaResult};
22use crate::groups::GroupList;
23use crate::message::{BorrowedMessage, Message};
24use crate::metadata::Metadata;
25use crate::topic_partition_list::{Offset, TopicPartitionList};
26use crate::util::{cstr_to_owned, NativePtr, Timeout};
27
28pub(crate) unsafe extern "C" fn native_commit_cb<C: ConsumerContext>(
29 _conf: *mut RDKafka,
30 err: RDKafkaRespErr,
31 offsets: *mut RDKafkaTopicPartitionList,
32 opaque_ptr: *mut c_void,
33) {
34 let context = &mut *(opaque_ptr as *mut C);
35 let commit_error = if err.is_error() {
36 Err(KafkaError::ConsumerCommit(err.into()))
37 } else {
38 Ok(())
39 };
40 if offsets.is_null() {
41 let tpl = TopicPartitionList::new();
42 context.commit_callback(commit_error, &tpl);
43 } else {
44 let tpl = ManuallyDrop::new(TopicPartitionList::from_ptr(offsets));
45 context.commit_callback(commit_error, &tpl);
46 }
47}
48
49unsafe extern "C" fn native_rebalance_cb<C: ConsumerContext>(
52 rk: *mut RDKafka,
53 err: RDKafkaRespErr,
54 native_tpl: *mut RDKafkaTopicPartitionList,
55 opaque_ptr: *mut c_void,
56) {
57 let context = &mut *(opaque_ptr as *mut C);
58 let native_client = ManuallyDrop::new(NativeClient::from_ptr(rk));
59 let mut tpl = ManuallyDrop::new(TopicPartitionList::from_ptr(native_tpl));
60 context.rebalance(&native_client, err, &mut tpl);
61}
62
63pub struct BaseConsumer<C = DefaultConsumerContext>
68where
69 C: ConsumerContext + 'static,
70{
71 client: Client<C>,
72 main_queue_min_poll_interval: Timeout,
73}
74
75impl FromClientConfig for BaseConsumer {
76 fn from_config(config: &ClientConfig) -> KafkaResult<BaseConsumer> {
77 BaseConsumer::from_config_and_context(config, DefaultConsumerContext)
78 }
79}
80
81impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C> {
83 fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseConsumer<C>> {
84 BaseConsumer::new(config, config.create_native_config()?, context)
85 }
86}
87
88impl<C> BaseConsumer<C>
89where
90 C: ConsumerContext,
91{
92 pub(crate) fn new(
93 config: &ClientConfig,
94 native_config: NativeClientConfig,
95 context: C,
96 ) -> KafkaResult<BaseConsumer<C>> {
97 unsafe {
98 rdsys::rd_kafka_conf_set_rebalance_cb(
99 native_config.ptr(),
100 Some(native_rebalance_cb::<C>),
101 );
102 rdsys::rd_kafka_conf_set_offset_commit_cb(
103 native_config.ptr(),
104 Some(native_commit_cb::<C>),
105 );
106 }
107 let main_queue_min_poll_interval = context.main_queue_min_poll_interval();
108 let client = Client::new(
109 config,
110 native_config,
111 RDKafkaType::RD_KAFKA_CONSUMER,
112 context,
113 )?;
114 Ok(BaseConsumer {
115 client,
116 main_queue_min_poll_interval,
117 })
118 }
119
120 pub(crate) fn poll_raw(&self, mut timeout: Timeout) -> Option<NativePtr<RDKafkaMessage>> {
123 loop {
124 unsafe { rdsys::rd_kafka_poll(self.client.native_ptr(), 0) };
125 let op_timeout = cmp::min(timeout, self.main_queue_min_poll_interval);
126 let message_ptr = unsafe {
127 NativePtr::from_ptr(rdsys::rd_kafka_consumer_poll(
128 self.client.native_ptr(),
129 op_timeout.as_millis(),
130 ))
131 };
132 if let Some(message_ptr) = message_ptr {
133 break Some(message_ptr);
134 }
135 if op_timeout >= timeout {
136 break None;
137 }
138 timeout -= op_timeout;
139 }
140 }
141
142 pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
156 self.poll_raw(timeout.into())
157 .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, self) })
158 }
159
160 pub fn iter(&self) -> Iter<'_, C> {
201 Iter(self)
202 }
203
204 pub fn split_partition_queue(
227 self: &Arc<Self>,
228 topic: &str,
229 partition: i32,
230 ) -> Option<PartitionQueue<C>> {
231 let topic = match CString::new(topic) {
232 Ok(topic) => topic,
233 Err(_) => return None,
234 };
235 let queue = unsafe {
236 NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition(
237 self.client.native_ptr(),
238 topic.as_ptr(),
239 partition,
240 ))
241 };
242 queue.map(|queue| {
243 unsafe { rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut()) }
244 PartitionQueue::new(self.clone(), queue)
245 })
246 }
247}
248
249impl<C> Consumer<C> for BaseConsumer<C>
250where
251 C: ConsumerContext,
252{
253 fn client(&self) -> &Client<C> {
254 &self.client
255 }
256
257 fn group_metadata(&self) -> Option<ConsumerGroupMetadata> {
258 let ptr = unsafe {
259 NativePtr::from_ptr(rdsys::rd_kafka_consumer_group_metadata(
260 self.client.native_ptr(),
261 ))
262 }?;
263 Some(ConsumerGroupMetadata(ptr))
264 }
265
266 fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
267 let mut tpl = TopicPartitionList::new();
268 for topic in topics {
269 tpl.add_topic_unassigned(topic);
270 }
271 let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tpl.ptr()) };
272 if ret_code.is_error() {
273 let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
274 return Err(KafkaError::Subscription(error));
275 };
276 Ok(())
277 }
278
279 fn unsubscribe(&self) {
280 unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) };
281 }
282
283 fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
284 let ret_code =
285 unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), assignment.ptr()) };
286 if ret_code.is_error() {
287 let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
288 return Err(KafkaError::Subscription(error));
289 };
290 Ok(())
291 }
292
293 fn seek<T: Into<Timeout>>(
294 &self,
295 topic: &str,
296 partition: i32,
297 offset: Offset,
298 timeout: T,
299 ) -> KafkaResult<()> {
300 let topic = self.client.native_topic(topic)?;
301 let ret_code = match offset.to_raw() {
302 Some(offset) => unsafe {
303 rdsys::rd_kafka_seek(topic.ptr(), partition, offset, timeout.into().as_millis())
304 },
305 None => return Err(KafkaError::Seek("Local: Unrepresentable offset".into())),
306 };
307 if ret_code.is_error() {
308 let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
309 return Err(KafkaError::Seek(error));
310 };
311 Ok(())
312 }
313
314 fn commit(
315 &self,
316 topic_partition_list: &TopicPartitionList,
317 mode: CommitMode,
318 ) -> KafkaResult<()> {
319 let error = unsafe {
320 rdsys::rd_kafka_commit(
321 self.client.native_ptr(),
322 topic_partition_list.ptr(),
323 mode as i32,
324 )
325 };
326 if error.is_error() {
327 Err(KafkaError::ConsumerCommit(error.into()))
328 } else {
329 Ok(())
330 }
331 }
332
333 fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()> {
334 let error = unsafe {
335 rdsys::rd_kafka_commit(self.client.native_ptr(), ptr::null_mut(), mode as i32)
336 };
337 if error.is_error() {
338 Err(KafkaError::ConsumerCommit(error.into()))
339 } else {
340 Ok(())
341 }
342 }
343
344 fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()> {
345 let error = unsafe {
346 rdsys::rd_kafka_commit_message(self.client.native_ptr(), message.ptr(), mode as i32)
347 };
348 if error.is_error() {
349 Err(KafkaError::ConsumerCommit(error.into()))
350 } else {
351 Ok(())
352 }
353 }
354
355 fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
356 let topic = self.client.native_topic(topic)?;
357 let error = unsafe { rdsys::rd_kafka_offset_store(topic.ptr(), partition, offset) };
358 if error.is_error() {
359 Err(KafkaError::StoreOffset(error.into()))
360 } else {
361 Ok(())
362 }
363 }
364
365 fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
366 let error = unsafe {
367 rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset())
368 };
369 if error.is_error() {
370 Err(KafkaError::StoreOffset(error.into()))
371 } else {
372 Ok(())
373 }
374 }
375
376 fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {
377 let error = unsafe { rdsys::rd_kafka_offsets_store(self.client.native_ptr(), tpl.ptr()) };
378 if error.is_error() {
379 Err(KafkaError::StoreOffset(error.into()))
380 } else {
381 Ok(())
382 }
383 }
384
385 fn subscription(&self) -> KafkaResult<TopicPartitionList> {
386 let mut tpl_ptr = ptr::null_mut();
387 let error = unsafe { rdsys::rd_kafka_subscription(self.client.native_ptr(), &mut tpl_ptr) };
388
389 if error.is_error() {
390 Err(KafkaError::MetadataFetch(error.into()))
391 } else {
392 Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
393 }
394 }
395
396 fn assignment(&self) -> KafkaResult<TopicPartitionList> {
397 let mut tpl_ptr = ptr::null_mut();
398 let error = unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
399
400 if error.is_error() {
401 Err(KafkaError::MetadataFetch(error.into()))
402 } else {
403 Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
404 }
405 }
406
407 fn committed<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<TopicPartitionList> {
408 let mut tpl_ptr = ptr::null_mut();
409 let assignment_error =
410 unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
411 if assignment_error.is_error() {
412 return Err(KafkaError::MetadataFetch(assignment_error.into()));
413 }
414
415 self.committed_offsets(unsafe { TopicPartitionList::from_ptr(tpl_ptr) }, timeout)
416 }
417
418 fn committed_offsets<T: Into<Timeout>>(
419 &self,
420 tpl: TopicPartitionList,
421 timeout: T,
422 ) -> KafkaResult<TopicPartitionList> {
423 let committed_error = unsafe {
424 rdsys::rd_kafka_committed(
425 self.client.native_ptr(),
426 tpl.ptr(),
427 timeout.into().as_millis(),
428 )
429 };
430
431 if committed_error.is_error() {
432 Err(KafkaError::MetadataFetch(committed_error.into()))
433 } else {
434 Ok(tpl)
435 }
436 }
437
438 fn offsets_for_timestamp<T: Into<Timeout>>(
439 &self,
440 timestamp: i64,
441 timeout: T,
442 ) -> KafkaResult<TopicPartitionList> {
443 let mut tpl_ptr = ptr::null_mut();
444 let assignment_error =
445 unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
446 if assignment_error.is_error() {
447 return Err(KafkaError::MetadataFetch(assignment_error.into()));
448 }
449 let mut tpl = unsafe { TopicPartitionList::from_ptr(tpl_ptr) };
450
451 tpl.set_all_offsets(Offset::Offset(timestamp))?;
454
455 self.offsets_for_times(tpl, timeout)
456 }
457
458 fn offsets_for_times<T: Into<Timeout>>(
461 &self,
462 timestamps: TopicPartitionList,
463 timeout: T,
464 ) -> KafkaResult<TopicPartitionList> {
465 let offsets_for_times_error = unsafe {
468 rdsys::rd_kafka_offsets_for_times(
469 self.client.native_ptr(),
470 timestamps.ptr(),
471 timeout.into().as_millis(),
472 )
473 };
474
475 if offsets_for_times_error.is_error() {
476 Err(KafkaError::MetadataFetch(offsets_for_times_error.into()))
477 } else {
478 Ok(timestamps)
479 }
480 }
481
482 fn position(&self) -> KafkaResult<TopicPartitionList> {
483 let tpl = self.assignment()?;
484 let error = unsafe { rdsys::rd_kafka_position(self.client.native_ptr(), tpl.ptr()) };
485 if error.is_error() {
486 Err(KafkaError::MetadataFetch(error.into()))
487 } else {
488 Ok(tpl)
489 }
490 }
491
492 fn fetch_metadata<T: Into<Timeout>>(
493 &self,
494 topic: Option<&str>,
495 timeout: T,
496 ) -> KafkaResult<Metadata> {
497 self.client.fetch_metadata(topic, timeout)
498 }
499
500 fn fetch_watermarks<T: Into<Timeout>>(
501 &self,
502 topic: &str,
503 partition: i32,
504 timeout: T,
505 ) -> KafkaResult<(i64, i64)> {
506 self.client.fetch_watermarks(topic, partition, timeout)
507 }
508
509 fn fetch_group_list<T: Into<Timeout>>(
510 &self,
511 group: Option<&str>,
512 timeout: T,
513 ) -> KafkaResult<GroupList> {
514 self.client.fetch_group_list(group, timeout)
515 }
516
517 fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
518 let ret_code =
519 unsafe { rdsys::rd_kafka_pause_partitions(self.client.native_ptr(), partitions.ptr()) };
520 if ret_code.is_error() {
521 let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
522 return Err(KafkaError::PauseResume(error));
523 };
524 Ok(())
525 }
526
527 fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
528 let ret_code = unsafe {
529 rdsys::rd_kafka_resume_partitions(self.client.native_ptr(), partitions.ptr())
530 };
531 if ret_code.is_error() {
532 let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
533 return Err(KafkaError::PauseResume(error));
534 };
535 Ok(())
536 }
537
538 fn rebalance_protocol(&self) -> RebalanceProtocol {
539 self.client.native_client().rebalance_protocol()
540 }
541}
542
543pub struct Iter<'a, C>(&'a BaseConsumer<C>)
548where
549 C: ConsumerContext + 'static;
550
551impl<'a, C> Iterator for Iter<'a, C>
552where
553 C: ConsumerContext,
554{
555 type Item = KafkaResult<BorrowedMessage<'a>>;
556
557 fn next(&mut self) -> Option<Self::Item> {
558 loop {
559 if let Some(item) = self.0.poll(None) {
560 return Some(item);
561 }
562 }
563 }
564}
565
566impl<'a, C> IntoIterator for &'a BaseConsumer<C>
567where
568 C: ConsumerContext,
569{
570 type Item = KafkaResult<BorrowedMessage<'a>>;
571 type IntoIter = Iter<'a, C>;
572
573 fn into_iter(self) -> Self::IntoIter {
574 self.iter()
575 }
576}
577
578pub struct PartitionQueue<C>
580where
581 C: ConsumerContext + 'static,
582{
583 consumer: Arc<BaseConsumer<C>>,
584 queue: NativeQueue,
585 nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
586}
587
588impl<C> PartitionQueue<C>
589where
590 C: ConsumerContext,
591{
592 pub(crate) fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
593 PartitionQueue {
594 consumer,
595 queue,
596 nonempty_callback: None,
597 }
598 }
599
600 pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
609 unsafe {
610 NativePtr::from_ptr(rdsys::rd_kafka_consume_queue(
611 self.queue.ptr(),
612 timeout.into().as_millis(),
613 ))
614 }
615 .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, &self.consumer) })
616 }
617
618 pub fn set_nonempty_callback<F>(&mut self, f: F)
621 where
622 F: Fn() + Send + Sync + 'static,
623 {
624 unsafe extern "C" fn native_message_queue_nonempty_cb(
630 _: *mut RDKafka,
631 opaque_ptr: *mut c_void,
632 ) {
633 let f = opaque_ptr as *const *const (dyn Fn() + Send + Sync);
634 (**f)();
635 }
636
637 let f: Box<Box<dyn Fn() + Send + Sync>> = Box::new(Box::new(f));
638 unsafe {
639 rdsys::rd_kafka_queue_cb_event_enable(
640 self.queue.ptr(),
641 Some(native_message_queue_nonempty_cb),
642 &*f as *const _ as *mut c_void,
643 )
644 }
645 self.nonempty_callback = Some(f);
646 }
647}
648
649impl<C> Drop for PartitionQueue<C>
650where
651 C: ConsumerContext,
652{
653 fn drop(&mut self) {
654 unsafe { rdsys::rd_kafka_queue_cb_event_enable(self.queue.ptr(), None, ptr::null_mut()) }
655 }
656}