rdkafka/consumer/stream_consumer.rs

//! High-level consumers with a [`Stream`](futures_util::Stream) interface.

use std::ffi::CString;
use std::marker::PhantomData;
use std::os::raw::c_void;
use std::pin::Pin;
use std::ptr;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::Duration;

use crate::log::trace;
use futures_channel::oneshot;
use futures_util::future::{self, Either, FutureExt};
use futures_util::pin_mut;
use futures_util::stream::{Stream, StreamExt};
use slab::Slab;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::{Client, NativeQueue};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::consumer::base_consumer::BaseConsumer;
use crate::consumer::{
    CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
    RebalanceProtocol,
};
use crate::error::{KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::message::BorrowedMessage;
use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{AsyncRuntime, DefaultRuntime, NativePtr, Timeout};

/// Callback invoked by librdkafka when the watched queue goes from empty to
/// non-empty. The opaque pointer is the `WakerSlab` registered in
/// `enable_nonempty_callback`.
unsafe extern "C" fn native_message_queue_nonempty_cb(_: *mut RDKafka, opaque_ptr: *mut c_void) {
    let wakers = &*(opaque_ptr as *const WakerSlab);
    wakers.wake_all();
}

/// Installs the non-empty callback on `queue`, passing a raw pointer to
/// `wakers` as the opaque argument.
unsafe fn enable_nonempty_callback(queue: &NativeQueue, wakers: &Arc<WakerSlab>) {
    rdsys::rd_kafka_queue_cb_event_enable(
        queue.ptr(),
        Some(native_message_queue_nonempty_cb),
        Arc::as_ptr(wakers) as *mut c_void,
    )
}

/// Removes a callback previously installed by `enable_nonempty_callback`.
unsafe fn disable_nonempty_callback(queue: &NativeQueue) {
    rdsys::rd_kafka_queue_cb_event_enable(queue.ptr(), None, ptr::null_mut())
}

/// Wakers for every live [`MessageStream`] reading from a particular queue.
struct WakerSlab {
    wakers: Mutex<Slab<Option<Waker>>>,
}

impl WakerSlab {
    fn new() -> WakerSlab {
        WakerSlab {
            wakers: Mutex::new(Slab::new()),
        }
    }

    fn wake_all(&self) {
        let mut wakers = self.wakers.lock().unwrap();
        for (_, waker) in wakers.iter_mut() {
            if let Some(waker) = waker.take() {
                waker.wake();
            }
        }
    }

    fn register(&self) -> usize {
        let mut wakers = self.wakers.lock().expect("lock poisoned");
        wakers.insert(None)
    }

    fn unregister(&self, slot: usize) {
        let mut wakers = self.wakers.lock().expect("lock poisoned");
        wakers.remove(slot);
    }

    fn set_waker(&self, slot: usize, waker: Waker) {
        let mut wakers = self.wakers.lock().expect("lock poisoned");
        wakers[slot] = Some(waker);
    }
}

/// A stream of messages from a [`StreamConsumer`].
///
/// See the documentation of [`StreamConsumer::stream`] for details.
pub struct MessageStream<'a> {
    wakers: &'a WakerSlab,
    queue: &'a NativeQueue,
    slot: usize,
}

impl<'a> MessageStream<'a> {
    fn new(wakers: &'a WakerSlab, queue: &'a NativeQueue) -> MessageStream<'a> {
        let slot = wakers.register();
        MessageStream {
            wakers,
            queue,
            slot,
        }
    }

    fn poll(&self) -> Option<KafkaResult<BorrowedMessage<'a>>> {
        unsafe {
            NativePtr::from_ptr(rdsys::rd_kafka_consume_queue(self.queue.ptr(), 0))
                .map(|p| BorrowedMessage::from_consumer(p, self.queue))
        }
    }
}

impl<'a> Stream for MessageStream<'a> {
    type Item = KafkaResult<BorrowedMessage<'a>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // If there is a message ready, yield it immediately to avoid taking
        // the lock in `self.wakers.set_waker`.
        if let Some(message) = self.poll() {
            return Poll::Ready(Some(message));
        }

        // Otherwise, we need to wait for a message to become available. Store
        // the waker so that we are woken up if the queue flips from empty to
        // non-empty. We have to store the waker repeatedly in case this future
        // migrates between tasks.
        self.wakers.set_waker(self.slot, cx.waker().clone());

        // Check whether a new message became available after we installed the
        // waker. This avoids a race where `poll` returns None to indicate that
        // the queue is empty, but the queue becomes non-empty before we've
        // installed the waker.
        match self.poll() {
            None => Poll::Pending,
            Some(message) => Poll::Ready(Some(message)),
        }
    }
}

impl<'a> Drop for MessageStream<'a> {
    fn drop(&mut self) {
        self.wakers.unregister(self.slot);
    }
}

/// A high-level consumer with a [`Stream`](futures_util::Stream) interface.
///
/// This consumer doesn't need to be polled explicitly. Extracting an item from
/// the stream returned by the [`stream`](StreamConsumer::stream) method will
/// implicitly poll the underlying Kafka consumer.
///
/// If you activate the consumer group protocol by calling
/// [`subscribe`](Consumer::subscribe), the stream consumer will integrate with
/// librdkafka's liveness detection as described in [KIP-62]. You must be sure
/// that you attempt to extract a message from the stream consumer at least
/// every `max.poll.interval.ms` milliseconds, or librdkafka will assume that
/// the processing thread is wedged and leave the consumer group.
///
/// [KIP-62]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
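///
/// A minimal usage sketch follows. It is illustrative only: the broker
/// address, group id, and topic name are placeholders, and an async executor
/// (e.g. Tokio, via the default [`AsyncRuntime`]) is assumed to drive the
/// futures.
///
/// ```no_run
/// use rdkafka::ClientConfig;
/// use rdkafka::consumer::{Consumer, StreamConsumer};
/// use rdkafka::Message;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let consumer: StreamConsumer = ClientConfig::new()
///     .set("group.id", "example-group")
///     .set("bootstrap.servers", "localhost:9092")
///     .create()?;
/// consumer.subscribe(&["example-topic"])?;
///
/// loop {
///     // `recv` resolves once librdkafka has a message (or an error) for us.
///     let message = consumer.recv().await?;
///     println!("received a message at offset {}", message.offset());
/// }
/// # }
/// ```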
#[must_use = "Consumer polling thread will stop immediately if unused"]
pub struct StreamConsumer<C = DefaultConsumerContext, R = DefaultRuntime>
where
    C: ConsumerContext + 'static,
{
    base: BaseConsumer<C>,
    wakers: Arc<WakerSlab>,
    queue: NativeQueue,
    _shutdown_trigger: oneshot::Sender<()>,
    _runtime: PhantomData<R>,
}

impl<R> FromClientConfig for StreamConsumer<DefaultConsumerContext, R>
where
    R: AsyncRuntime,
{
    fn from_config(config: &ClientConfig) -> KafkaResult<Self> {
        StreamConsumer::from_config_and_context(config, DefaultConsumerContext)
    }
}

/// Creates a new `StreamConsumer` starting from a [`ClientConfig`].
impl<C, R> FromClientConfigAndContext<C> for StreamConsumer<C, R>
where
    C: ConsumerContext + 'static,
    R: AsyncRuntime,
{
    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<Self> {
        let native_config = config.create_native_config()?;
        let poll_interval = {
            let millis: u64 = native_config
                .get("max.poll.interval.ms")?
                .parse()
                .expect("librdkafka validated config value is valid u64");
            Duration::from_millis(millis)
        };

        let base = BaseConsumer::new(config, native_config, context)?;
        let native_ptr = base.client().native_ptr() as usize;

        // Redirect rdkafka's main queue to the consumer queue so that we only
        // need to listen to the consumer queue to observe events like
        // rebalancings and stats.
        unsafe { rdsys::rd_kafka_poll_set_consumer(base.client().native_ptr()) };

        let queue = base.client().consumer_queue().ok_or_else(|| {
            KafkaError::ClientCreation("librdkafka failed to create consumer queue".into())
        })?;
        let wakers = Arc::new(WakerSlab::new());
        unsafe { enable_nonempty_callback(&queue, &wakers) }

        // We need to make sure we poll the consumer at least once every max
        // poll interval, *unless* the processing task has wedged. To accomplish
        // this, we launch a background task that sends spurious wakeup
        // notifications at half the max poll interval. An unwedged processing
        // task will wake up and poll the consumer with plenty of time to spare,
        // while a wedged processing task will not.
        //
        // The default max poll interval is 5m, so there is essentially no
        // performance impact to these spurious wakeups.
        let (shutdown_trigger, shutdown_tripwire) = oneshot::channel();
        let mut shutdown_tripwire = shutdown_tripwire.fuse();
        R::spawn({
            let wakers = wakers.clone();
            async move {
                trace!("Starting stream consumer wake loop: 0x{:x}", native_ptr);
                loop {
                    let delay = R::delay_for(poll_interval / 2).fuse();
                    pin_mut!(delay);
                    match future::select(&mut delay, &mut shutdown_tripwire).await {
                        Either::Left(_) => wakers.wake_all(),
                        Either::Right(_) => break,
                    }
                }
                trace!("Shut down stream consumer wake loop: 0x{:x}", native_ptr);
            }
        });

        Ok(StreamConsumer {
            base,
            wakers,
            queue,
            _shutdown_trigger: shutdown_trigger,
            _runtime: PhantomData,
        })
    }
}

impl<C, R> StreamConsumer<C, R>
where
    C: ConsumerContext + 'static,
{
    /// Constructs a stream that yields messages from this consumer.
    ///
    /// It is legal to have multiple live message streams for the same consumer,
    /// and to move those message streams across threads. Note, however, that
    /// the message streams share the same underlying state. A message received
    /// by the consumer will be delivered to only one of the live message
    /// streams. If you seek the underlying consumer, all message streams
    /// created from the consumer will begin to draw messages from the new
    /// position of the consumer.
    ///
    /// If you want multiple independent views of a Kafka topic, create multiple
    /// consumers, not multiple message streams.
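    ///
    /// A minimal sketch of draining a stream (it assumes an async context and
    /// a consumer that has already been subscribed or assigned):
    ///
    /// ```no_run
    /// use futures::stream::StreamExt;
    /// use rdkafka::Message;
    /// # use rdkafka::consumer::StreamConsumer;
    ///
    /// # async fn example(consumer: StreamConsumer) {
    /// let mut stream = consumer.stream();
    /// while let Some(message) = stream.next().await {
    ///     match message {
    ///         Ok(message) => println!("received message at offset {}", message.offset()),
    ///         Err(e) => eprintln!("kafka error: {}", e),
    ///     }
    /// }
    /// # }
    /// ```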
    pub fn stream(&self) -> MessageStream<'_> {
        MessageStream::new(&self.wakers, &self.queue)
    }

    /// Receives the next message from the stream.
    ///
    /// This method will block until the next message is available or an error
    /// occurs. It is legal to call `recv` from multiple threads simultaneously.
    ///
    /// This method is [cancellation safe].
    ///
    /// Note that this method is exactly as efficient as constructing a
    /// single-use message stream and extracting one message from it:
    ///
    /// ```
    /// use futures::stream::StreamExt;
    /// # use rdkafka::consumer::StreamConsumer;
    ///
    /// # async fn example(consumer: StreamConsumer) {
    /// consumer.stream().next().await.expect("MessageStream never returns None");
    /// # }
    /// ```
    ///
    /// [cancellation safe]: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
    pub async fn recv(&self) -> Result<BorrowedMessage<'_>, KafkaError> {
        self.stream()
            .next()
            .await
            .expect("kafka streams never terminate")
    }

    /// Splits messages for the specified partition into their own stream.
    ///
    /// If the `topic` or `partition` is invalid, returns `None`.
    ///
    /// After calling this method, newly-fetched messages for the specified
    /// partition will be returned via [`StreamPartitionQueue::recv`] rather
    /// than [`StreamConsumer::recv`]. Note that there may be buffered messages
    /// for the specified partition that will continue to be returned by
    /// `StreamConsumer::recv`. For best results, call `split_partition_queue`
    /// before the first call to `StreamConsumer::recv`.
    ///
    /// You must periodically await `StreamConsumer::recv`, even if no messages
    /// are expected, to serve callbacks. Consider using a background task like:
    ///
    /// ```
    /// # use rdkafka::consumer::StreamConsumer;
    /// # use tokio::task::JoinHandle;
    /// # async fn example(stream_consumer: StreamConsumer) -> JoinHandle<()> {
    /// tokio::spawn(async move {
    ///     let message = stream_consumer.recv().await;
    ///     panic!("main stream consumer queue unexpectedly received message: {:?}", message);
    /// })
    /// # }
    /// ```
    ///
    /// Note that calling [`Consumer::assign`] will deactivate any existing
    /// partition queues. You will need to call this method for every partition
    /// that should be split after every call to `assign`.
    ///
    /// Beware that this method is implemented for `&Arc<Self>`, not `&self`.
    /// You will need to wrap your consumer in an `Arc` in order to call this
    /// method. This design permits moving the partition queue to another thread
    /// while ensuring the partition queue does not outlive the consumer.
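    ///
    /// A sketch of the full split pattern follows. It is illustrative only:
    /// the topic name and partition are placeholders, and it assumes the
    /// consumer is already subscribed to (or assigned) that partition and that
    /// a Tokio runtime is driving the futures.
    ///
    /// ```no_run
    /// use std::sync::Arc;
    /// # use rdkafka::consumer::StreamConsumer;
    ///
    /// # async fn example(consumer: StreamConsumer) -> Result<(), Box<dyn std::error::Error>> {
    /// let consumer = Arc::new(consumer);
    /// let partition_queue = consumer
    ///     .split_partition_queue("example-topic", 0)
    ///     .expect("topic and partition should be valid");
    ///
    /// // Keep driving the main queue so that rebalance and other callbacks
    /// // are still served.
    /// let main_consumer = Arc::clone(&consumer);
    /// tokio::spawn(async move {
    ///     let message = main_consumer.recv().await;
    ///     panic!("main stream consumer queue unexpectedly received message: {:?}", message);
    /// });
    ///
    /// let _message = partition_queue.recv().await?;
    /// # Ok(())
    /// # }
    /// ```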
    pub fn split_partition_queue(
        self: &Arc<Self>,
        topic: &str,
        partition: i32,
    ) -> Option<StreamPartitionQueue<C, R>> {
        let topic = match CString::new(topic) {
            Ok(topic) => topic,
            Err(_) => return None,
        };
        let queue = unsafe {
            NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition(
                self.base.client().native_ptr(),
                topic.as_ptr(),
                partition,
            ))
        };
        queue.map(|queue| {
            let wakers = Arc::new(WakerSlab::new());
            unsafe {
                rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut());
                enable_nonempty_callback(&queue, &wakers);
            }
            StreamPartitionQueue {
                queue,
                wakers,
                _consumer: self.clone(),
            }
        })
    }
}

impl<C, R> Consumer<C> for StreamConsumer<C, R>
where
    C: ConsumerContext,
{
    fn client(&self) -> &Client<C> {
        self.base.client()
    }

    fn group_metadata(&self) -> Option<ConsumerGroupMetadata> {
        self.base.group_metadata()
    }

    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
        self.base.subscribe(topics)
    }

    fn unsubscribe(&self) {
        self.base.unsubscribe();
    }

    fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        self.base.assign(assignment)
    }

    fn seek<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        offset: Offset,
        timeout: T,
    ) -> KafkaResult<()> {
        self.base.seek(topic, partition, offset, timeout)
    }

    fn commit(
        &self,
        topic_partition_list: &TopicPartitionList,
        mode: CommitMode,
    ) -> KafkaResult<()> {
        self.base.commit(topic_partition_list, mode)
    }

    fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()> {
        self.base.commit_consumer_state(mode)
    }

    fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()> {
        self.base.commit_message(message, mode)
    }

    fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
        self.base.store_offset(topic, partition, offset)
    }

    fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
        self.base.store_offset_from_message(message)
    }

    fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {
        self.base.store_offsets(tpl)
    }

    fn subscription(&self) -> KafkaResult<TopicPartitionList> {
        self.base.subscription()
    }

    fn assignment(&self) -> KafkaResult<TopicPartitionList> {
        self.base.assignment()
    }

    fn committed<T>(&self, timeout: T) -> KafkaResult<TopicPartitionList>
    where
        T: Into<Timeout>,
        Self: Sized,
    {
        self.base.committed(timeout)
    }

    fn committed_offsets<T>(
        &self,
        tpl: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList>
    where
        T: Into<Timeout>,
    {
        self.base.committed_offsets(tpl, timeout)
    }

    fn offsets_for_timestamp<T>(
        &self,
        timestamp: i64,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList>
    where
        T: Into<Timeout>,
        Self: Sized,
    {
        self.base.offsets_for_timestamp(timestamp, timeout)
    }

    fn offsets_for_times<T>(
        &self,
        timestamps: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList>
    where
        T: Into<Timeout>,
        Self: Sized,
    {
        self.base.offsets_for_times(timestamps, timeout)
    }

    fn position(&self) -> KafkaResult<TopicPartitionList> {
        self.base.position()
    }

    fn fetch_metadata<T>(&self, topic: Option<&str>, timeout: T) -> KafkaResult<Metadata>
    where
        T: Into<Timeout>,
        Self: Sized,
    {
        self.base.fetch_metadata(topic, timeout)
    }

    fn fetch_watermarks<T>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)>
    where
        T: Into<Timeout>,
        Self: Sized,
    {
        self.base.fetch_watermarks(topic, partition, timeout)
    }

    fn fetch_group_list<T>(&self, group: Option<&str>, timeout: T) -> KafkaResult<GroupList>
    where
        T: Into<Timeout>,
        Self: Sized,
    {
        self.base.fetch_group_list(group, timeout)
    }

    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        self.base.pause(partitions)
    }

    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        self.base.resume(partitions)
    }

    fn rebalance_protocol(&self) -> RebalanceProtocol {
        self.base.rebalance_protocol()
    }
}

/// A message queue for a single partition of a [`StreamConsumer`].
///
/// See the documentation of [`StreamConsumer::split_partition_queue`] for
/// details.
pub struct StreamPartitionQueue<C, R = DefaultRuntime>
where
    C: ConsumerContext + 'static,
{
    queue: NativeQueue,
    wakers: Arc<WakerSlab>,
    _consumer: Arc<StreamConsumer<C, R>>,
}

impl<C, R> StreamPartitionQueue<C, R>
where
    C: ConsumerContext,
{
    /// Constructs a stream that yields messages from this partition.
    ///
    /// It is legal to have multiple live message streams for the same
    /// partition, and to move those message streams across threads. Note,
    /// however, that the message streams share the same underlying state. A
    /// message received by the partition will be delivered to only one of the
    /// live message streams. If you seek the underlying partition, all message
    /// streams created from the partition will begin to draw messages from the
    /// new position of the partition.
    ///
    /// If you want multiple independent views of a Kafka partition, create
    /// multiple consumers, not multiple partition streams.
    pub fn stream(&self) -> MessageStream<'_> {
        MessageStream::new(&self.wakers, &self.queue)
    }

    /// Receives the next message from the stream.
    ///
    /// This method will block until the next message is available or an error
    /// occurs. It is legal to call `recv` from multiple threads simultaneously.
    ///
    /// This method is [cancellation safe].
    ///
    /// Note that this method is exactly as efficient as constructing a
    /// single-use message stream and extracting one message from it:
    ///
    /// ```
    /// use futures::stream::StreamExt;
    /// # use rdkafka::consumer::ConsumerContext;
    /// # use rdkafka::consumer::stream_consumer::StreamPartitionQueue;
    ///
    /// # async fn example<C>(partition_queue: StreamPartitionQueue<C>)
    /// # where
    /// #     C: ConsumerContext {
    /// partition_queue.stream().next().await.expect("MessageStream never returns None");
    /// # }
    /// ```
    ///
    /// [cancellation safe]: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
    pub async fn recv(&self) -> Result<BorrowedMessage<'_>, KafkaError> {
        self.stream()
            .next()
            .await
            .expect("kafka streams never terminate")
    }
}

impl<C, R> Drop for StreamPartitionQueue<C, R>
where
    C: ConsumerContext,
{
    fn drop(&mut self) {
        unsafe { disable_nonempty_callback(&self.queue) }
    }
}