rdkafka/consumer/mod.rs
1//! Kafka consumers.
2
3use std::ptr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use rdkafka_sys as rdsys;
8use rdkafka_sys::types::*;
9
10use crate::client::{Client, ClientContext, NativeClient};
11use crate::error::KafkaResult;
12use crate::groups::GroupList;
13use crate::log::{error, trace};
14use crate::message::BorrowedMessage;
15use crate::metadata::Metadata;
16use crate::topic_partition_list::{Offset, TopicPartitionList};
17use crate::util::{cstr_to_owned, KafkaDrop, NativePtr, Timeout};
18
19pub mod base_consumer;
20pub mod stream_consumer;
21
22// Re-exports.
23#[doc(inline)]
24pub use self::base_consumer::BaseConsumer;
25#[doc(inline)]
26pub use self::stream_consumer::{MessageStream, StreamConsumer};
27
28/// Rebalance information.
29#[derive(Clone, Debug)]
30pub enum Rebalance<'a> {
31    /// A new partition assignment is received.
32    Assign(&'a TopicPartitionList),
33    /// A new partition revocation is received.
34    Revoke(&'a TopicPartitionList),
35    /// Unexpected error from Kafka.
36    Error(String),
37}
38
39/// Consumer-specific context.
40///
41/// This user-defined object can be used to provide custom callbacks for
42/// consumer events. Refer to the list of methods to check which callbacks can
43/// be specified.
44///
45/// See also the [`ClientContext`] trait.
46pub trait ConsumerContext: ClientContext {
47    /// Implements the default rebalancing strategy and calls the
48    /// [`pre_rebalance`](ConsumerContext::pre_rebalance) and
49    /// [`post_rebalance`](ConsumerContext::post_rebalance) methods. If this
50    /// method is overridden, it will be responsibility of the user to call them
51    /// if needed.
52    fn rebalance(
53        &self,
54        native_client: &NativeClient,
55        err: RDKafkaRespErr,
56        tpl: &mut TopicPartitionList,
57    ) {
58        let rebalance = match err {
59            RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => Rebalance::Assign(tpl),
60            RDKafkaRespErr::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => Rebalance::Revoke(tpl),
61            _ => {
62                let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(err)) };
63                error!("Error rebalancing: {}", error);
64                Rebalance::Error(error)
65            }
66        };
67
68        trace!("Running pre-rebalance with {:?}", rebalance);
69        self.pre_rebalance(&rebalance);
70
71        trace!("Running rebalance with {:?}", rebalance);
72        // Execute rebalance
73        unsafe {
74            match err {
75                RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => {
76                    match native_client.rebalance_protocol() {
77                        RebalanceProtocol::Cooperative => {
78                            rdsys::rd_kafka_incremental_assign(native_client.ptr(), tpl.ptr());
79                        }
80                        _ => {
81                            rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr());
82                        }
83                    }
84                }
85                _ => match native_client.rebalance_protocol() {
86                    RebalanceProtocol::Cooperative => {
87                        rdsys::rd_kafka_incremental_unassign(native_client.ptr(), tpl.ptr());
88                    }
89                    _ => {
90                        rdsys::rd_kafka_assign(native_client.ptr(), ptr::null());
91                    }
92                },
93            }
94        }
95        trace!("Running post-rebalance with {:?}", rebalance);
96        self.post_rebalance(&rebalance);
97    }
98
99    /// Pre-rebalance callback. This method will run before the rebalance and
100    /// should terminate its execution quickly.
101    #[allow(unused_variables)]
102    fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
103
104    /// Post-rebalance callback. This method will run after the rebalance and
105    /// should terminate its execution quickly.
106    #[allow(unused_variables)]
107    fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
108
109    // TODO: convert pointer to structure
110    /// Post commit callback. This method will run after a group of offsets was
111    /// committed to the offset store.
112    #[allow(unused_variables)]
113    fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {}
114
115    /// Returns the minimum interval at which to poll the main queue, which
116    /// services the logging, stats, and error callbacks.
117    ///
118    /// The main queue is polled once whenever [`BaseConsumer::poll`] is called.
119    /// If `poll` is called with a timeout that is larger than this interval,
120    /// then the main queue will be polled at that interval while the consumer
121    /// queue is blocked.
122    ///
123    /// For example, if the main queue's minimum poll interval is 200ms and
124    /// `poll` is called with a timeout of 1s, then `poll` may block for up to
125    /// 1s waiting for a message, but it will poll the main queue every 200ms
126    /// while it is waiting.
127    ///
128    /// By default, the minimum poll interval for the main queue is 1s.
129    fn main_queue_min_poll_interval(&self) -> Timeout {
130        Timeout::After(Duration::from_secs(1))
131    }
132}
133
134/// An inert [`ConsumerContext`] that can be used when no customizations are
135/// needed.
136#[derive(Clone, Debug, Default)]
137pub struct DefaultConsumerContext;
138
139impl ClientContext for DefaultConsumerContext {}
140impl ConsumerContext for DefaultConsumerContext {}
141
/// Specifies whether a commit should be performed synchronously or
/// asynchronously.
///
/// A commit is performed via [`Consumer::commit`] or one of its variants.
///
/// Regardless of the `CommitMode`, the commit APIs enqueue the commit request
/// in a local work queue. A separate worker thread picks up this commit request
/// and forwards it to the Kafka broker over the network.
///
/// The difference between [`CommitMode::Sync`] and [`CommitMode::Async`] is in
/// whether the caller waits for the Kafka broker to respond that it finished
/// handling the commit request.
///
/// Note that the commit APIs are not async in the Rust sense due to the lack of
/// a callback-based interface exposed by librdkafka. See
/// [librdkafka#3212](https://github.com/edenhill/librdkafka/issues/3212).
///
/// The explicit discriminants (`Sync = 0`, `Async = 1`) are part of the
/// type's definition and must not be changed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CommitMode {
    /// In `Sync` mode, the caller blocks until the Kafka broker finishes
    /// processing the commit request.
    Sync = 0,

    /// In `Async` mode, the caller enqueues the commit request in a local
    /// work queue and returns immediately.
    Async = 1,
}
168
169/// Consumer group metadata.
170///
171/// For use with [`Producer::send_offsets_to_transaction`].
172///
173/// [`Producer::send_offsets_to_transaction`]: crate::producer::Producer::send_offsets_to_transaction
174pub struct ConsumerGroupMetadata(NativePtr<RDKafkaConsumerGroupMetadata>);
175
176impl ConsumerGroupMetadata {
177    pub(crate) fn ptr(&self) -> *const RDKafkaConsumerGroupMetadata {
178        self.0.ptr()
179    }
180}
181
182unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata {
183    const TYPE: &'static str = "consumer_group_metadata";
184    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_consumer_group_metadata_destroy;
185}
186
187unsafe impl Send for ConsumerGroupMetadata {}
188unsafe impl Sync for ConsumerGroupMetadata {}
189
/// The rebalance protocol for a consumer.
///
/// This is a plain, field-less enum, so it is cheap to copy and can be
/// compared and debug-printed directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RebalanceProtocol {
    /// The consumer has not (yet) joined a group.
    None,
    /// Eager rebalance protocol.
    Eager,
    /// Cooperative rebalance protocol.
    Cooperative,
}
199
200/// Common trait for all consumers.
201///
202/// # Note about object safety
203///
204/// Doing type erasure on consumers is expected to be rare (eg. `Box<dyn
205/// Consumer>`). Therefore, the API is optimised for the case where a concrete
206/// type is available. As a result, some methods are not available on trait
207/// objects, since they are generic.
208pub trait Consumer<C = DefaultConsumerContext>
209where
210    C: ConsumerContext + 'static,
211{
212    /// Returns the [`Client`] underlying this consumer.
213    fn client(&self) -> &Client<C>;
214
215    /// Returns a reference to the [`ConsumerContext`] used to create this
216    /// consumer.
217    fn context(&self) -> &Arc<C> {
218        self.client().context()
219    }
220
221    /// Returns the current consumer group metadata associated with the
222    /// consumer.
223    ///
224    /// If the consumer was not configured with a `group.id`, returns `None`.
225    /// For use with [`Producer::send_offsets_to_transaction`].
226    ///
227    /// [`Producer::send_offsets_to_transaction`]: crate::producer::Producer::send_offsets_to_transaction
228    fn group_metadata(&self) -> Option<ConsumerGroupMetadata>;
229
230    /// Subscribes the consumer to a list of topics.
231    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>;
232
233    /// Unsubscribes the current subscription list.
234    fn unsubscribe(&self);
235
236    /// Manually assigns topics and partitions to the consumer. If used,
237    /// automatic consumer rebalance won't be activated.
238    fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>;
239
240    /// Seeks to `offset` for the specified `topic` and `partition`. After a
241    /// successful call to `seek`, the next poll of the consumer will return the
242    /// message with `offset`.
243    fn seek<T: Into<Timeout>>(
244        &self,
245        topic: &str,
246        partition: i32,
247        offset: Offset,
248        timeout: T,
249    ) -> KafkaResult<()>;
250
251    /// Commits the offset of the specified message. The commit can be sync
252    /// (blocking), or async. Notice that when a specific offset is committed,
253    /// all the previous offsets are considered committed as well. Use this
254    /// method only if you are processing messages in order.
255    ///
256    /// The highest committed offset is interpreted as the next message to be
257    /// consumed in the event that a consumer rehydrates its local state from
258    /// the Kafka broker (i.e. consumer server restart). This means that,
259    /// in general, the offset of your [`TopicPartitionList`] should equal
260    /// 1 plus the offset from your last consumed message.
261    fn commit(
262        &self,
263        topic_partition_list: &TopicPartitionList,
264        mode: CommitMode,
265    ) -> KafkaResult<()>;
266
267    /// Commits the current consumer state. Notice that if the consumer fails
268    /// after a message has been received, but before the message has been
269    /// processed by the user code, this might lead to data loss. Check the
270    /// "at-least-once delivery" section in the readme for more information.
271    fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()>;
272
273    /// Commit the provided message. Note that this will also automatically
274    /// commit every message with lower offset within the same partition.
275    ///
276    /// This method is exactly equivalent to invoking [`Consumer::commit`]
277    /// with a [`TopicPartitionList`] which copies the topic and partition
278    /// from the message and adds 1 to the offset of the message.
279    fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()>;
280
281    /// Stores offset to be used on the next (auto)commit. When
282    /// using this `enable.auto.offset.store` should be set to `false` in the
283    /// config.
284    fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()>;
285
286    /// Like [`Consumer::store_offset`], but the offset to store is derived from
287    /// the provided message.
288    fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>;
289
290    /// Store offsets to be used on the next (auto)commit. When using this
291    /// `enable.auto.offset.store` should be set to `false` in the config.
292    fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>;
293
294    /// Returns the current topic subscription.
295    fn subscription(&self) -> KafkaResult<TopicPartitionList>;
296
297    /// Returns the current partition assignment.
298    fn assignment(&self) -> KafkaResult<TopicPartitionList>;
299
300    /// Retrieves the committed offsets for topics and partitions.
301    fn committed<T>(&self, timeout: T) -> KafkaResult<TopicPartitionList>
302    where
303        T: Into<Timeout>,
304        Self: Sized;
305
306    /// Retrieves the committed offsets for specified topics and partitions.
307    fn committed_offsets<T>(
308        &self,
309        tpl: TopicPartitionList,
310        timeout: T,
311    ) -> KafkaResult<TopicPartitionList>
312    where
313        T: Into<Timeout>;
314
315    /// Looks up the offsets for this consumer's partitions by timestamp.
316    fn offsets_for_timestamp<T>(
317        &self,
318        timestamp: i64,
319        timeout: T,
320    ) -> KafkaResult<TopicPartitionList>
321    where
322        T: Into<Timeout>,
323        Self: Sized;
324
325    /// Looks up the offsets for the specified partitions by timestamp.
326    fn offsets_for_times<T>(
327        &self,
328        timestamps: TopicPartitionList,
329        timeout: T,
330    ) -> KafkaResult<TopicPartitionList>
331    where
332        T: Into<Timeout>,
333        Self: Sized;
334
335    /// Retrieve current positions (offsets) for topics and partitions.
336    fn position(&self) -> KafkaResult<TopicPartitionList>;
337
338    /// Returns the metadata information for the specified topic, or for all
339    /// topics in the cluster if no topic is specified.
340    fn fetch_metadata<T>(&self, topic: Option<&str>, timeout: T) -> KafkaResult<Metadata>
341    where
342        T: Into<Timeout>,
343        Self: Sized;
344
345    /// Returns the low and high watermarks for a specific topic and partition.
346    fn fetch_watermarks<T>(
347        &self,
348        topic: &str,
349        partition: i32,
350        timeout: T,
351    ) -> KafkaResult<(i64, i64)>
352    where
353        T: Into<Timeout>,
354        Self: Sized;
355
356    /// Returns the group membership information for the given group. If no group is
357    /// specified, all groups will be returned.
358    fn fetch_group_list<T>(&self, group: Option<&str>, timeout: T) -> KafkaResult<GroupList>
359    where
360        T: Into<Timeout>,
361        Self: Sized;
362
363    /// Pauses consumption for the provided list of partitions.
364    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;
365
366    /// Resumes consumption for the provided list of partitions.
367    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;
368
369    /// Reports the rebalance protocol in use.
370    fn rebalance_protocol(&self) -> RebalanceProtocol;
371}