rdkafka/consumer/mod.rs
//! Kafka consumers.

use std::ptr;
use std::sync::Arc;
use std::time::Duration;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::{Client, ClientContext, NativeClient};
use crate::error::KafkaResult;
use crate::groups::GroupList;
use crate::log::{error, trace};
use crate::message::BorrowedMessage;
use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{cstr_to_owned, KafkaDrop, NativePtr, Timeout};

pub mod base_consumer;
pub mod stream_consumer;

// Re-exports.
#[doc(inline)]
pub use self::base_consumer::BaseConsumer;
#[doc(inline)]
pub use self::stream_consumer::{MessageStream, StreamConsumer};

/// Rebalance information.
#[derive(Clone, Debug)]
pub enum Rebalance<'a> {
    /// A new partition assignment is received.
    Assign(&'a TopicPartitionList),
    /// An existing partition assignment is revoked.
    Revoke(&'a TopicPartitionList),
    /// Unexpected error from Kafka.
    Error(String),
}

/// Consumer-specific context.
///
/// This user-defined object can be used to provide custom callbacks for
/// consumer events. Refer to the list of methods to check which callbacks can
/// be specified.
///
/// See also the [`ClientContext`] trait.
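///
/// # Example
///
/// A minimal sketch of a custom context that logs rebalance events (the
/// `LoggingContext` name is illustrative, not part of the crate):
///
/// ```no_run
/// use rdkafka::client::ClientContext;
/// use rdkafka::consumer::{ConsumerContext, Rebalance};
///
/// struct LoggingContext;
///
/// impl ClientContext for LoggingContext {}
///
/// impl ConsumerContext for LoggingContext {
///     // Runs before the rebalance is executed by the underlying client.
///     fn pre_rebalance(&self, rebalance: &Rebalance) {
///         println!("pre-rebalance: {:?}", rebalance);
///     }
///
///     // Runs after the new assignment has been applied.
///     fn post_rebalance(&self, rebalance: &Rebalance) {
///         println!("post-rebalance: {:?}", rebalance);
///     }
/// }
/// ```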
pub trait ConsumerContext: ClientContext {
    /// Implements the default rebalancing strategy and calls the
    /// [`pre_rebalance`](ConsumerContext::pre_rebalance) and
    /// [`post_rebalance`](ConsumerContext::post_rebalance) methods. If this
    /// method is overridden, it is the user's responsibility to call them if
    /// needed.
    fn rebalance(
        &self,
        native_client: &NativeClient,
        err: RDKafkaRespErr,
        tpl: &mut TopicPartitionList,
    ) {
        let rebalance = match err {
            RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => Rebalance::Assign(tpl),
            RDKafkaRespErr::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => Rebalance::Revoke(tpl),
            _ => {
                let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(err)) };
                error!("Error rebalancing: {}", error);
                Rebalance::Error(error)
            }
        };

        trace!("Running pre-rebalance with {:?}", rebalance);
        self.pre_rebalance(&rebalance);

        trace!("Running rebalance with {:?}", rebalance);
        // Execute the rebalance, using the incremental APIs when the
        // cooperative protocol is in use.
        unsafe {
            match err {
                RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => {
                    match native_client.rebalance_protocol() {
                        RebalanceProtocol::Cooperative => {
                            rdsys::rd_kafka_incremental_assign(native_client.ptr(), tpl.ptr());
                        }
                        _ => {
                            rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr());
                        }
                    }
                }
                _ => match native_client.rebalance_protocol() {
                    RebalanceProtocol::Cooperative => {
                        rdsys::rd_kafka_incremental_unassign(native_client.ptr(), tpl.ptr());
                    }
                    _ => {
                        rdsys::rd_kafka_assign(native_client.ptr(), ptr::null());
                    }
                },
            }
        }
        trace!("Running post-rebalance with {:?}", rebalance);
        self.post_rebalance(&rebalance);
    }

    /// Pre-rebalance callback. This method will run before the rebalance and
    /// should terminate its execution quickly.
    #[allow(unused_variables)]
    fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}

    /// Post-rebalance callback. This method will run after the rebalance and
    /// should terminate its execution quickly.
    #[allow(unused_variables)]
    fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}

    /// Post-commit callback. This method will run after a group of offsets
    /// was committed to the offset store.
    #[allow(unused_variables)]
    fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {}

    /// Returns the minimum interval at which to poll the main queue, which
    /// services the logging, stats, and error callbacks.
    ///
    /// The main queue is polled once whenever [`BaseConsumer::poll`] is called.
    /// If `poll` is called with a timeout that is larger than this interval,
    /// then the main queue will be polled at that interval while the consumer
    /// queue is blocked.
    ///
    /// For example, if the main queue's minimum poll interval is 200ms and
    /// `poll` is called with a timeout of 1s, then `poll` may block for up to
    /// 1s waiting for a message, but it will poll the main queue every 200ms
    /// while it is waiting.
    ///
    /// By default, the minimum poll interval for the main queue is 1s.
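    ///
    /// # Example
    ///
    /// A minimal sketch of a context that polls the main queue more often
    /// (the `FastPollContext` name is illustrative, not part of the crate):
    ///
    /// ```no_run
    /// use std::time::Duration;
    ///
    /// use rdkafka::client::ClientContext;
    /// use rdkafka::consumer::ConsumerContext;
    /// use rdkafka::util::Timeout;
    ///
    /// struct FastPollContext;
    ///
    /// impl ClientContext for FastPollContext {}
    ///
    /// impl ConsumerContext for FastPollContext {
    ///     // Poll the main queue every 200ms instead of the default 1s.
    ///     fn main_queue_min_poll_interval(&self) -> Timeout {
    ///         Timeout::After(Duration::from_millis(200))
    ///     }
    /// }
    /// ```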
    fn main_queue_min_poll_interval(&self) -> Timeout {
        Timeout::After(Duration::from_secs(1))
    }
}

/// An inert [`ConsumerContext`] that can be used when no customizations are
/// needed.
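///
/// # Example
///
/// A minimal sketch: the default context is used implicitly when a consumer
/// is created without an explicit context (the broker address and group id
/// below are placeholders):
///
/// ```no_run
/// use rdkafka::config::ClientConfig;
/// use rdkafka::consumer::BaseConsumer;
///
/// // `create` uses `DefaultConsumerContext` under the hood.
/// let consumer: BaseConsumer = ClientConfig::new()
///     .set("bootstrap.servers", "localhost:9092")
///     .set("group.id", "example-group")
///     .create()
///     .expect("consumer creation failed");
/// ```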
#[derive(Clone, Debug, Default)]
pub struct DefaultConsumerContext;

impl ClientContext for DefaultConsumerContext {}
impl ConsumerContext for DefaultConsumerContext {}

/// Specifies whether a commit should be performed synchronously or
/// asynchronously.
///
/// A commit is performed via [`Consumer::commit`] or one of its variants.
///
/// Regardless of the `CommitMode`, the commit APIs enqueue the commit request
/// in a local work queue. A separate worker thread picks up this commit
/// request and forwards it to the Kafka broker over the network.
///
/// The difference between [`CommitMode::Sync`] and [`CommitMode::Async`] is in
/// whether the caller waits for the Kafka broker to respond that it finished
/// handling the commit request.
///
/// Note that the commit APIs are not async in the Rust sense due to the lack
/// of a callback-based interface exposed by librdkafka. See
/// [librdkafka#3212](https://github.com/edenhill/librdkafka/issues/3212).
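///
/// # Example
///
/// A minimal sketch of committing a polled message synchronously (`consumer`
/// is assumed to be an already configured [`BaseConsumer`]):
///
/// ```no_run
/// use std::time::Duration;
///
/// use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer};
///
/// fn commit_next(consumer: &BaseConsumer) -> rdkafka::error::KafkaResult<()> {
///     if let Some(Ok(message)) = consumer.poll(Duration::from_secs(1)) {
///         // `Sync` blocks until the broker acknowledges the commit;
///         // `Async` would return as soon as the request is enqueued.
///         consumer.commit_message(&message, CommitMode::Sync)?;
///     }
///     Ok(())
/// }
/// ```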
#[derive(Clone, Copy, Debug)]
pub enum CommitMode {
    /// In `Sync` mode, the caller blocks until the Kafka broker finishes
    /// processing the commit request.
    Sync = 0,

    /// In `Async` mode, the caller enqueues the commit request in a local
    /// work queue and returns immediately.
    Async = 1,
}

/// Consumer group metadata.
///
/// For use with [`Producer::send_offsets_to_transaction`].
///
/// [`Producer::send_offsets_to_transaction`]: crate::producer::Producer::send_offsets_to_transaction
pub struct ConsumerGroupMetadata(NativePtr<RDKafkaConsumerGroupMetadata>);

impl ConsumerGroupMetadata {
    pub(crate) fn ptr(&self) -> *const RDKafkaConsumerGroupMetadata {
        self.0.ptr()
    }
}

unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata {
    const TYPE: &'static str = "consumer_group_metadata";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_consumer_group_metadata_destroy;
}

unsafe impl Send for ConsumerGroupMetadata {}
unsafe impl Sync for ConsumerGroupMetadata {}

/// The rebalance protocol for a consumer.
pub enum RebalanceProtocol {
    /// The consumer has not (yet) joined a group.
    None,
    /// Eager rebalance protocol.
    Eager,
    /// Cooperative rebalance protocol.
    Cooperative,
}

/// Common trait for all consumers.
///
/// # Note about object safety
///
/// Doing type erasure on consumers is expected to be rare (e.g., `Box<dyn
/// Consumer>`). Therefore, the API is optimized for the case where a concrete
/// type is available. As a result, some methods are not available on trait
/// objects, since they are generic.
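///
/// # Example
///
/// A minimal sketch of a subscribe-and-poll loop with a [`BaseConsumer`] (the
/// broker address, group id, and topic name are placeholders):
///
/// ```no_run
/// use std::time::Duration;
///
/// use rdkafka::config::ClientConfig;
/// use rdkafka::consumer::{BaseConsumer, Consumer};
/// use rdkafka::message::Message;
///
/// let consumer: BaseConsumer = ClientConfig::new()
///     .set("bootstrap.servers", "localhost:9092")
///     .set("group.id", "example-group")
///     .create()
///     .expect("consumer creation failed");
/// consumer.subscribe(&["example-topic"]).expect("subscription failed");
///
/// loop {
///     match consumer.poll(Duration::from_secs(1)) {
///         Some(Ok(message)) => println!("received offset {}", message.offset()),
///         Some(Err(e)) => eprintln!("consumer error: {}", e),
///         None => {} // Poll timed out with no pending message.
///     }
/// }
/// ```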
pub trait Consumer<C = DefaultConsumerContext>
where
    C: ConsumerContext + 'static,
{
    /// Returns the [`Client`] underlying this consumer.
    fn client(&self) -> &Client<C>;

    /// Returns a reference to the [`ConsumerContext`] used to create this
    /// consumer.
    fn context(&self) -> &Arc<C> {
        self.client().context()
    }

    /// Returns the current consumer group metadata associated with the
    /// consumer.
    ///
    /// If the consumer was not configured with a `group.id`, returns `None`.
    /// For use with [`Producer::send_offsets_to_transaction`].
    ///
    /// [`Producer::send_offsets_to_transaction`]: crate::producer::Producer::send_offsets_to_transaction
    fn group_metadata(&self) -> Option<ConsumerGroupMetadata>;

    /// Subscribes the consumer to a list of topics.
    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>;

    /// Unsubscribes the current subscription list.
    fn unsubscribe(&self);

    /// Manually assigns topics and partitions to the consumer. If used,
    /// automatic consumer rebalancing won't be activated.
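    ///
    /// # Example
    ///
    /// A minimal sketch (the topic name is a placeholder; `consumer` is
    /// assumed to be an already configured [`BaseConsumer`]):
    ///
    /// ```no_run
    /// use rdkafka::consumer::{BaseConsumer, Consumer};
    /// use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
    ///
    /// fn assign_partition(consumer: &BaseConsumer) -> rdkafka::error::KafkaResult<()> {
    ///     let mut assignment = TopicPartitionList::new();
    ///     // Consume partition 0 of the topic from the beginning.
    ///     assignment.add_partition_offset("example-topic", 0, Offset::Beginning)?;
    ///     consumer.assign(&assignment)?;
    ///     Ok(())
    /// }
    /// ```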
    fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>;

    /// Seeks to `offset` for the specified `topic` and `partition`. After a
    /// successful call to `seek`, the next poll of the consumer will return
    /// the message with `offset`.
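    ///
    /// # Example
    ///
    /// A minimal sketch that rewinds one partition to a fixed offset (names
    /// and numbers are placeholders):
    ///
    /// ```no_run
    /// use std::time::Duration;
    ///
    /// use rdkafka::consumer::{BaseConsumer, Consumer};
    /// use rdkafka::topic_partition_list::Offset;
    ///
    /// fn rewind(consumer: &BaseConsumer) -> rdkafka::error::KafkaResult<()> {
    ///     consumer.seek("example-topic", 0, Offset::Offset(42), Duration::from_secs(1))
    /// }
    /// ```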
    fn seek<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        offset: Offset,
        timeout: T,
    ) -> KafkaResult<()>;

    /// Commits the offset of the specified message. The commit can be sync
    /// (blocking), or async. Notice that when a specific offset is committed,
    /// all the previous offsets are considered committed as well. Use this
    /// method only if you are processing messages in order.
    ///
    /// The highest committed offset is interpreted as the next message to be
    /// consumed in the event that a consumer rehydrates its local state from
    /// the Kafka broker (e.g., on consumer restart). This means that, in
    /// general, the offset of your [`TopicPartitionList`] should equal 1 plus
    /// the offset of your last consumed message.
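    ///
    /// # Example
    ///
    /// A minimal sketch that commits the position after a processed message
    /// (`message` is a previously received [`BorrowedMessage`]):
    ///
    /// ```no_run
    /// use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer};
    /// use rdkafka::message::{BorrowedMessage, Message};
    /// use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
    ///
    /// fn commit_after(
    ///     consumer: &BaseConsumer,
    ///     message: &BorrowedMessage,
    /// ) -> rdkafka::error::KafkaResult<()> {
    ///     let mut tpl = TopicPartitionList::new();
    ///     // Commit the offset of the *next* message to consume.
    ///     tpl.add_partition_offset(
    ///         message.topic(),
    ///         message.partition(),
    ///         Offset::Offset(message.offset() + 1),
    ///     )?;
    ///     consumer.commit(&tpl, CommitMode::Sync)
    /// }
    /// ```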
    fn commit(
        &self,
        topic_partition_list: &TopicPartitionList,
        mode: CommitMode,
    ) -> KafkaResult<()>;

    /// Commits the current consumer state. Notice that if the consumer fails
    /// after a message has been received, but before the message has been
    /// processed by the user code, this might lead to data loss. Check the
    /// "at-least-once delivery" section in the readme for more information.
    fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()>;

    /// Commits the provided message. Note that this will also automatically
    /// commit every message with a lower offset within the same partition.
    ///
    /// This method is exactly equivalent to invoking [`Consumer::commit`]
    /// with a [`TopicPartitionList`] which copies the topic and partition
    /// from the message and adds 1 to the offset of the message.
    fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()>;

    /// Stores the offset to be used on the next (auto)commit. When using
    /// this method, `enable.auto.offset.store` should be set to `false` in
    /// the config.
    fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()>;

    /// Like [`Consumer::store_offset`], but the offset to store is derived
    /// from the provided message.
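    ///
    /// # Example
    ///
    /// A minimal sketch (the consumer is assumed to have been created with
    /// `enable.auto.offset.store` set to `false`; `message` is a previously
    /// received [`BorrowedMessage`]):
    ///
    /// ```no_run
    /// use rdkafka::consumer::{BaseConsumer, Consumer};
    /// use rdkafka::message::BorrowedMessage;
    ///
    /// fn after_processing(
    ///     consumer: &BaseConsumer,
    ///     message: &BorrowedMessage,
    /// ) -> rdkafka::error::KafkaResult<()> {
    ///     // Mark the message as processed; the stored offset is picked up
    ///     // by the next (auto)commit.
    ///     consumer.store_offset_from_message(message)
    /// }
    /// ```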
    fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>;

    /// Stores the offsets to be used on the next (auto)commit. When using
    /// this method, `enable.auto.offset.store` should be set to `false` in
    /// the config.
    fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>;

    /// Returns the current topic subscription.
    fn subscription(&self) -> KafkaResult<TopicPartitionList>;

    /// Returns the current partition assignment.
    fn assignment(&self) -> KafkaResult<TopicPartitionList>;

    /// Retrieves the committed offsets for topics and partitions.
    fn committed<T>(&self, timeout: T) -> KafkaResult<TopicPartitionList>
    where
        T: Into<Timeout>,
        Self: Sized;

    /// Retrieves the committed offsets for the specified topics and
    /// partitions.
    fn committed_offsets<T>(
        &self,
        tpl: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList>
    where
        T: Into<Timeout>;

    /// Looks up the offsets for this consumer's partitions by timestamp.
    fn offsets_for_timestamp<T>(
        &self,
        timestamp: i64,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList>
    where
        T: Into<Timeout>,
        Self: Sized;

    /// Looks up the offsets for the specified partitions by timestamp.
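    ///
    /// # Example
    ///
    /// A minimal sketch that finds the first offsets at or after a timestamp
    /// (milliseconds since the Unix epoch; names and numbers are
    /// placeholders). The `offset` field of each input entry carries the
    /// timestamp to look up:
    ///
    /// ```no_run
    /// use std::time::Duration;
    ///
    /// use rdkafka::consumer::{BaseConsumer, Consumer};
    /// use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
    ///
    /// fn offsets_at(consumer: &BaseConsumer) -> rdkafka::error::KafkaResult<TopicPartitionList> {
    ///     let mut timestamps = TopicPartitionList::new();
    ///     timestamps.add_partition_offset("example-topic", 0, Offset::Offset(1_600_000_000_000))?;
    ///     consumer.offsets_for_times(timestamps, Duration::from_secs(1))
    /// }
    /// ```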
    fn offsets_for_times<T>(
        &self,
        timestamps: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList>
    where
        T: Into<Timeout>,
        Self: Sized;

    /// Retrieves the current positions (offsets) for topics and partitions.
    fn position(&self) -> KafkaResult<TopicPartitionList>;

    /// Returns the metadata information for the specified topic, or for all
    /// topics in the cluster if no topic is specified.
    fn fetch_metadata<T>(&self, topic: Option<&str>, timeout: T) -> KafkaResult<Metadata>
    where
        T: Into<Timeout>,
        Self: Sized;

    /// Returns the low and high watermarks for a specific topic and partition.
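    ///
    /// # Example
    ///
    /// A minimal sketch (the topic name is a placeholder):
    ///
    /// ```no_run
    /// use std::time::Duration;
    ///
    /// use rdkafka::consumer::{BaseConsumer, Consumer};
    ///
    /// fn partition_span(consumer: &BaseConsumer) -> rdkafka::error::KafkaResult<()> {
    ///     let (low, high) = consumer.fetch_watermarks("example-topic", 0, Duration::from_secs(1))?;
    ///     println!("partition 0 spans offsets {}..{}", low, high);
    ///     Ok(())
    /// }
    /// ```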
    fn fetch_watermarks<T>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)>
    where
        T: Into<Timeout>,
        Self: Sized;

    /// Returns the group membership information for the given group. If no
    /// group is specified, all groups will be returned.
    fn fetch_group_list<T>(&self, group: Option<&str>, timeout: T) -> KafkaResult<GroupList>
    where
        T: Into<Timeout>,
        Self: Sized;

    /// Pauses consumption for the provided list of partitions.
    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;

    /// Resumes consumption for the provided list of partitions.
    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;

    /// Reports the rebalance protocol in use.
    fn rebalance_protocol(&self) -> RebalanceProtocol;
}
371}