rdkafka/consumer/
base_consumer.rs

1//! Low-level consumers.
2
3use std::cmp;
4use std::ffi::CString;
5use std::mem::ManuallyDrop;
6use std::os::raw::c_void;
7use std::ptr;
8use std::sync::Arc;
9
10use rdkafka_sys as rdsys;
11use rdkafka_sys::types::*;
12
13use crate::client::{Client, NativeClient, NativeQueue};
14use crate::config::{
15    ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
16};
17use crate::consumer::{
18    CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
19    RebalanceProtocol,
20};
21use crate::error::{IsError, KafkaError, KafkaResult};
22use crate::groups::GroupList;
23use crate::message::{BorrowedMessage, Message};
24use crate::metadata::Metadata;
25use crate::topic_partition_list::{Offset, TopicPartitionList};
26use crate::util::{cstr_to_owned, NativePtr, Timeout};
27
/// Native offset-commit callback. Invoked by librdkafka whenever an offset
/// commit completes (successfully or not); converts the native error code and
/// partition list and forwards both to the user's
/// [`ConsumerContext::commit_callback`].
pub(crate) unsafe extern "C" fn native_commit_cb<C: ConsumerContext>(
    _conf: *mut RDKafka,
    err: RDKafkaRespErr,
    offsets: *mut RDKafkaTopicPartitionList,
    opaque_ptr: *mut c_void,
) {
    // The opaque pointer was registered as a pointer to the consumer context
    // when the callback was installed, so this cast recovers it.
    let context = &mut *(opaque_ptr as *mut C);
    let commit_error = if err.is_error() {
        Err(KafkaError::ConsumerCommit(err.into()))
    } else {
        Ok(())
    };
    if offsets.is_null() {
        // librdkafka supplied no offset list; give the callback an empty one
        // rather than a dangling wrapper.
        let tpl = TopicPartitionList::new();
        context.commit_callback(commit_error, &tpl);
    } else {
        // The native list is owned by librdkafka for the duration of the
        // callback: ManuallyDrop prevents our wrapper from destroying it.
        let tpl = ManuallyDrop::new(TopicPartitionList::from_ptr(offsets));
        context.commit_callback(commit_error, &tpl);
    }
}
48
/// Native rebalance callback. This callback will run on every rebalance, and it will call the
/// rebalance method defined in the current `Context`.
unsafe extern "C" fn native_rebalance_cb<C: ConsumerContext>(
    rk: *mut RDKafka,
    err: RDKafkaRespErr,
    native_tpl: *mut RDKafkaTopicPartitionList,
    opaque_ptr: *mut c_void,
) {
    // The opaque pointer was registered as a pointer to the consumer context.
    let context = &mut *(opaque_ptr as *mut C);
    // Both the client handle and the partition list remain owned by
    // librdkafka for the duration of the callback: wrap them in ManuallyDrop
    // so the Rust wrappers do not free them when they fall out of scope.
    let native_client = ManuallyDrop::new(NativeClient::from_ptr(rk));
    let mut tpl = ManuallyDrop::new(TopicPartitionList::from_ptr(native_tpl));
    context.rebalance(&native_client, err, &mut tpl);
}
62
/// A low-level consumer that requires manual polling.
///
/// This consumer must be periodically polled to make progress on rebalancing,
/// callbacks and to receive messages.
pub struct BaseConsumer<C = DefaultConsumerContext>
where
    C: ConsumerContext + 'static,
{
    // Underlying client handle wrapping the native librdkafka consumer.
    client: Client<C>,
    // Upper bound on how long a single blocking poll of the consumer queue
    // may last, so the main (client-level) queue is served regularly too.
    main_queue_min_poll_interval: Timeout,
}
74
75impl FromClientConfig for BaseConsumer {
76    fn from_config(config: &ClientConfig) -> KafkaResult<BaseConsumer> {
77        BaseConsumer::from_config_and_context(config, DefaultConsumerContext)
78    }
79}
80
81/// Creates a new `BaseConsumer` starting from a `ClientConfig`.
82impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C> {
83    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseConsumer<C>> {
84        BaseConsumer::new(config, config.create_native_config()?, context)
85    }
86}
87
impl<C> BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Creates a new consumer from a config, a pre-built native config, and a
    /// context.
    ///
    /// The native rebalance and offset-commit callbacks are registered on the
    /// native config *before* the client is created, so they are active from
    /// the very first poll.
    pub(crate) fn new(
        config: &ClientConfig,
        native_config: NativeClientConfig,
        context: C,
    ) -> KafkaResult<BaseConsumer<C>> {
        unsafe {
            rdsys::rd_kafka_conf_set_rebalance_cb(
                native_config.ptr(),
                Some(native_rebalance_cb::<C>),
            );
            rdsys::rd_kafka_conf_set_offset_commit_cb(
                native_config.ptr(),
                Some(native_commit_cb::<C>),
            );
        }
        // Capture the interval before `context` is moved into the client.
        let main_queue_min_poll_interval = context.main_queue_min_poll_interval();
        let client = Client::new(
            config,
            native_config,
            RDKafkaType::RD_KAFKA_CONSUMER,
            context,
        )?;
        Ok(BaseConsumer {
            client,
            main_queue_min_poll_interval,
        })
    }

    /// Polls the consumer for messages and returns a pointer to the native rdkafka-sys struct.
    /// This method is for internal use only. Use poll instead.
    pub(crate) fn poll_raw(&self, mut timeout: Timeout) -> Option<NativePtr<RDKafkaMessage>> {
        loop {
            // Serve the main (client-level) queue without blocking so that
            // stats/error/log callbacks keep firing while we wait for
            // messages below.
            unsafe { rdsys::rd_kafka_poll(self.client.native_ptr(), 0) };
            // Block on the consumer queue for at most
            // `main_queue_min_poll_interval`, so the main queue above is
            // served at least that often even with an infinite timeout.
            let op_timeout = cmp::min(timeout, self.main_queue_min_poll_interval);
            let message_ptr = unsafe {
                NativePtr::from_ptr(rdsys::rd_kafka_consumer_poll(
                    self.client.native_ptr(),
                    op_timeout.as_millis(),
                ))
            };
            if let Some(message_ptr) = message_ptr {
                break Some(message_ptr);
            }
            if op_timeout >= timeout {
                // Overall timeout exhausted without receiving a message.
                break None;
            }
            // Deduct the slice we just waited and poll again.
            timeout -= op_timeout;
        }
    }

    /// Polls the consumer for new messages.
    ///
    /// It won't block for more than the specified timeout. Use zero `Duration` for non-blocking
    /// call. With no timeout it blocks until an event is received.
    ///
    /// This method should be called at regular intervals, even if no message is expected,
    /// to serve any queued callbacks waiting to be called. This is especially important for
    /// automatic consumer rebalance, as the rebalance function will be executed by the thread
    /// calling the poll() function.
    ///
    /// # Lifetime
    ///
    /// The returned message lives in the memory of the consumer and cannot outlive it.
    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        self.poll_raw(timeout.into())
            .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, self) })
    }

    /// Returns an iterator over the available messages.
    ///
    /// It repeatedly calls [`poll`](#method.poll) with no timeout.
    ///
    /// Note that it's also possible to iterate over the consumer directly.
    ///
    /// # Examples
    ///
    /// All these are equivalent and will receive messages without timing out.
    ///
    /// ```rust,no_run
    /// # let consumer: rdkafka::consumer::BaseConsumer<_> = rdkafka::ClientConfig::new()
    /// #    .create()
    /// #    .unwrap();
    /// #
    /// loop {
    ///   let message = consumer.poll(None);
    ///   // Handle the message
    /// }
    /// ```
    ///
    /// ```rust,no_run
    /// # let consumer: rdkafka::consumer::BaseConsumer<_> = rdkafka::ClientConfig::new()
    /// #    .create()
    /// #    .unwrap();
    /// #
    /// for message in consumer.iter() {
    ///   // Handle the message
    /// }
    /// ```
    ///
    /// ```rust,no_run
    /// # let consumer: rdkafka::consumer::BaseConsumer<_> = rdkafka::ClientConfig::new()
    /// #    .create()
    /// #    .unwrap();
    /// #
    /// for message in &consumer {
    ///   // Handle the message
    /// }
    /// ```
    pub fn iter(&self) -> Iter<'_, C> {
        Iter(self)
    }

    /// Splits messages for the specified partition into their own queue.
    ///
    /// If the `topic` or `partition` is invalid, returns `None`.
    ///
    /// After calling this method, newly-fetched messages for the specified
    /// partition will be returned via [`PartitionQueue::poll`] rather than
    /// [`BaseConsumer::poll`]. Note that there may be buffered messages for the
    /// specified partition that will continue to be returned by
    /// `BaseConsumer::poll`. For best results, call `split_partition_queue`
    /// before the first call to `BaseConsumer::poll`.
    ///
    /// You must continue to call `BaseConsumer::poll`, even if no messages are
    /// expected, to serve callbacks.
    ///
    /// Note that calling [`Consumer::assign`] will deactivate any existing
    /// partition queues. You will need to call this method for every partition
    /// that should be split after every call to `assign`.
    ///
    /// Beware that this method is implemented for `&Arc<Self>`, not `&self`.
    /// You will need to wrap your consumer in an `Arc` in order to call this
    /// method. This design permits moving the partition queue to another thread
    /// while ensuring the partition queue does not outlive the consumer.
    pub fn split_partition_queue(
        self: &Arc<Self>,
        topic: &str,
        partition: i32,
    ) -> Option<PartitionQueue<C>> {
        // Topic names containing interior NUL bytes cannot be represented as
        // C strings; treat them as invalid.
        let topic = match CString::new(topic) {
            Ok(topic) => topic,
            Err(_) => return None,
        };
        // Returns null (mapped to None by NativeQueue::from_ptr) if the
        // topic/partition is unknown to librdkafka.
        let queue = unsafe {
            NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition(
                self.client.native_ptr(),
                topic.as_ptr(),
                partition,
            ))
        };
        queue.map(|queue| {
            // Disable forwarding to the main consumer queue so messages for
            // this partition are delivered only via the split queue.
            unsafe { rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut()) }
            PartitionQueue::new(self.clone(), queue)
        })
    }
}
248
impl<C> Consumer<C> for BaseConsumer<C>
where
    C: ConsumerContext,
{
    fn client(&self) -> &Client<C> {
        &self.client
    }

    fn group_metadata(&self) -> Option<ConsumerGroupMetadata> {
        // `rd_kafka_consumer_group_metadata` returns null when no group
        // metadata is available; NativePtr::from_ptr maps that to None.
        let ptr = unsafe {
            NativePtr::from_ptr(rdsys::rd_kafka_consumer_group_metadata(
                self.client.native_ptr(),
            ))
        }?;
        Some(ConsumerGroupMetadata(ptr))
    }

    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
        // Build a partition list with one unassigned entry per topic, as
        // `rd_kafka_subscribe` expects.
        let mut tpl = TopicPartitionList::new();
        for topic in topics {
            tpl.add_topic_unassigned(topic);
        }
        let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tpl.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    fn unsubscribe(&self) {
        unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) };
    }

    fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), assignment.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    fn seek<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        offset: Offset,
        timeout: T,
    ) -> KafkaResult<()> {
        let topic = self.client.native_topic(topic)?;
        let ret_code = match offset.to_raw() {
            Some(offset) => unsafe {
                rdsys::rd_kafka_seek(topic.ptr(), partition, offset, timeout.into().as_millis())
            },
            // Offsets that have no raw librdkafka representation cannot be
            // sought to; report them without calling into the native library.
            None => return Err(KafkaError::Seek("Local: Unrepresentable offset".into())),
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Seek(error));
        };
        Ok(())
    }

    fn commit(
        &self,
        topic_partition_list: &TopicPartitionList,
        mode: CommitMode,
    ) -> KafkaResult<()> {
        // `mode as i32` selects sync (0) vs async (1) commit in librdkafka.
        let error = unsafe {
            rdsys::rd_kafka_commit(
                self.client.native_ptr(),
                topic_partition_list.ptr(),
                mode as i32,
            )
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()> {
        // Passing a null list tells librdkafka to commit the consumer's
        // current position for all assigned partitions.
        let error = unsafe {
            rdsys::rd_kafka_commit(self.client.native_ptr(), ptr::null_mut(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit_message(self.client.native_ptr(), message.ptr(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
        let topic = self.client.native_topic(topic)?;
        let error = unsafe { rdsys::rd_kafka_offset_store(topic.ptr(), partition, offset) };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset())
        };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {
        let error = unsafe { rdsys::rd_kafka_offsets_store(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    fn subscription(&self) -> KafkaResult<TopicPartitionList> {
        // librdkafka allocates the list and hands ownership back through the
        // out-parameter; `TopicPartitionList::from_ptr` takes ownership.
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_subscription(self.client.native_ptr(), &mut tpl_ptr) };

        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    fn assignment(&self) -> KafkaResult<TopicPartitionList> {
        // Same ownership pattern as `subscription` above.
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };

        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    fn committed<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<TopicPartitionList> {
        // Fetch the current assignment, then look up the committed offsets
        // for exactly those partitions.
        let mut tpl_ptr = ptr::null_mut();
        let assignment_error =
            unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if assignment_error.is_error() {
            return Err(KafkaError::MetadataFetch(assignment_error.into()));
        }

        self.committed_offsets(unsafe { TopicPartitionList::from_ptr(tpl_ptr) }, timeout)
    }

    fn committed_offsets<T: Into<Timeout>>(
        &self,
        tpl: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        // `rd_kafka_committed` fills the offset field of `tpl` in place.
        let committed_error = unsafe {
            rdsys::rd_kafka_committed(
                self.client.native_ptr(),
                tpl.ptr(),
                timeout.into().as_millis(),
            )
        };

        if committed_error.is_error() {
            Err(KafkaError::MetadataFetch(committed_error.into()))
        } else {
            Ok(tpl)
        }
    }

    fn offsets_for_timestamp<T: Into<Timeout>>(
        &self,
        timestamp: i64,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        // Start from the current assignment so every assigned partition is
        // queried for the given timestamp.
        let mut tpl_ptr = ptr::null_mut();
        let assignment_error =
            unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if assignment_error.is_error() {
            return Err(KafkaError::MetadataFetch(assignment_error.into()));
        }
        let mut tpl = unsafe { TopicPartitionList::from_ptr(tpl_ptr) };

        // Set the timestamp we want in the offset field for every partition as
        // librdkafka expects.
        tpl.set_all_offsets(Offset::Offset(timestamp))?;

        self.offsets_for_times(tpl, timeout)
    }

    // `timestamps` is a `TopicPartitionList` with timestamps instead of
    // offsets.
    fn offsets_for_times<T: Into<Timeout>>(
        &self,
        timestamps: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        // This call will then put the offset in the offset field of this topic
        // partition list.
        let offsets_for_times_error = unsafe {
            rdsys::rd_kafka_offsets_for_times(
                self.client.native_ptr(),
                timestamps.ptr(),
                timeout.into().as_millis(),
            )
        };

        if offsets_for_times_error.is_error() {
            Err(KafkaError::MetadataFetch(offsets_for_times_error.into()))
        } else {
            Ok(timestamps)
        }
    }

    fn position(&self) -> KafkaResult<TopicPartitionList> {
        // `rd_kafka_position` fills the offset field of the assignment list
        // with the consumer's current positions.
        let tpl = self.assignment()?;
        let error = unsafe { rdsys::rd_kafka_position(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(tpl)
        }
    }

    fn fetch_metadata<T: Into<Timeout>>(
        &self,
        topic: Option<&str>,
        timeout: T,
    ) -> KafkaResult<Metadata> {
        self.client.fetch_metadata(topic, timeout)
    }

    fn fetch_watermarks<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)> {
        self.client.fetch_watermarks(topic, partition, timeout)
    }

    fn fetch_group_list<T: Into<Timeout>>(
        &self,
        group: Option<&str>,
        timeout: T,
    ) -> KafkaResult<GroupList> {
        self.client.fetch_group_list(group, timeout)
    }

    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_pause_partitions(self.client.native_ptr(), partitions.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }

    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code = unsafe {
            rdsys::rd_kafka_resume_partitions(self.client.native_ptr(), partitions.ptr())
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }

    fn rebalance_protocol(&self) -> RebalanceProtocol {
        self.client.native_client().rebalance_protocol()
    }
}
542
/// A convenience iterator over the messages in a [`BaseConsumer`].
///
/// Each call to [`Iter::next`] simply calls [`BaseConsumer::poll`] with an
/// infinite timeout.
///
/// Note that since each poll uses an infinite timeout, iteration blocks
/// until a message (or an error event) arrives; it never yields `None`.
pub struct Iter<'a, C>(&'a BaseConsumer<C>)
where
    C: ConsumerContext + 'static;
550
551impl<'a, C> Iterator for Iter<'a, C>
552where
553    C: ConsumerContext,
554{
555    type Item = KafkaResult<BorrowedMessage<'a>>;
556
557    fn next(&mut self) -> Option<Self::Item> {
558        loop {
559            if let Some(item) = self.0.poll(None) {
560                return Some(item);
561            }
562        }
563    }
564}
565
566impl<'a, C> IntoIterator for &'a BaseConsumer<C>
567where
568    C: ConsumerContext,
569{
570    type Item = KafkaResult<BorrowedMessage<'a>>;
571    type IntoIter = Iter<'a, C>;
572
573    fn into_iter(self) -> Self::IntoIter {
574        self.iter()
575    }
576}
577
/// A message queue for a single partition.
pub struct PartitionQueue<C>
where
    C: ConsumerContext + 'static,
{
    // Keeps the consumer alive for as long as this queue exists, so the
    // native queue never outlives the native consumer it belongs to.
    consumer: Arc<BaseConsumer<C>>,
    // The underlying native partition queue.
    queue: NativeQueue,
    // Owns the callback registered with librdkafka, keeping it alive until
    // it is replaced or the queue is dropped. The double `Box` is required
    // because `dyn Fn` is a fat pointer and cannot be passed through a thin
    // C `void*` directly.
    nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
}
587
impl<C> PartitionQueue<C>
where
    C: ConsumerContext,
{
    /// Wraps a native partition queue together with the consumer that owns
    /// it. No nonempty callback is registered initially.
    pub(crate) fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
        PartitionQueue {
            consumer,
            queue,
            nonempty_callback: None,
        }
    }

    /// Polls the partition for new messages.
    ///
    /// The `timeout` parameter controls how long to block if no messages are
    /// available.
    ///
    /// Remember that you must also call [`BaseConsumer::poll`] on the
    /// associated consumer regularly, even if no messages are expected, to
    /// serve callbacks.
    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        unsafe {
            NativePtr::from_ptr(rdsys::rd_kafka_consume_queue(
                self.queue.ptr(),
                timeout.into().as_millis(),
            ))
        }
        // Tie the message's lifetime to the owning consumer.
        .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, &self.consumer) })
    }

    /// Sets a callback that will be invoked whenever the queue becomes
    /// nonempty.
    pub fn set_nonempty_callback<F>(&mut self, f: F)
    where
        F: Fn() + Send + Sync + 'static,
    {
        // SAFETY: we keep `F` alive until the next call to
        // `rd_kafka_queue_cb_event_enable`. That might be the next call to
        // `set_nonempty_callback` or it might be when the queue is dropped. The
        // double indirection is required because `&dyn Fn` is a fat pointer.

        // C trampoline: recovers the double-boxed closure from the opaque
        // pointer and invokes it.
        unsafe extern "C" fn native_message_queue_nonempty_cb(
            _: *mut RDKafka,
            opaque_ptr: *mut c_void,
        ) {
            let f = opaque_ptr as *const *const (dyn Fn() + Send + Sync);
            (**f)();
        }

        let f: Box<Box<dyn Fn() + Send + Sync>> = Box::new(Box::new(f));
        unsafe {
            rdsys::rd_kafka_queue_cb_event_enable(
                self.queue.ptr(),
                Some(native_message_queue_nonempty_cb),
                &*f as *const _ as *mut c_void,
            )
        }
        // Store the box so the closure outlives the native registration;
        // dropping the previous value (if any) is safe because librdkafka
        // now holds a pointer to the new closure.
        self.nonempty_callback = Some(f);
    }
}
648
impl<C> Drop for PartitionQueue<C>
where
    C: ConsumerContext,
{
    fn drop(&mut self) {
        // Deregister any nonempty callback before `nonempty_callback` (and
        // the closure it owns) is dropped, so librdkafka is never left with
        // a dangling callback pointer.
        unsafe { rdsys::rd_kafka_queue_cb_event_enable(self.queue.ptr(), None, ptr::null_mut()) }
    }
}