rdkafka/producer/
future_producer.rs

//! High-level, futures-enabled Kafka producer.
//!
//! See [`FutureProducer`] for details.
// TODO: extend docs
5
6use std::error::Error;
7use std::future::Future;
8use std::io;
9use std::marker::PhantomData;
10use std::net::SocketAddr;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::{Duration, Instant};
15
16use futures_channel::oneshot;
17use futures_util::FutureExt;
18
19use crate::client::{Client, ClientContext, DefaultClientContext, OAuthToken};
20use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel};
21use crate::consumer::ConsumerGroupMetadata;
22use crate::error::{KafkaError, KafkaResult, RDKafkaErrorCode};
23use crate::message::{Message, OwnedHeaders, OwnedMessage, Timestamp, ToBytes};
24use crate::producer::{BaseRecord, DeliveryResult, Producer, ProducerContext, ThreadedProducer};
25use crate::statistics::Statistics;
26use crate::topic_partition_list::TopicPartitionList;
27use crate::util::{AsyncRuntime, DefaultRuntime, IntoOpaque, Timeout};
28
29//
30// ********** FUTURE PRODUCER **********
31//
32
33/// A record for the future producer.
34///
35/// Like [`BaseRecord`], but specific to the [`FutureProducer`]. The only
36/// difference is that the [FutureRecord] doesn't provide custom delivery opaque
37/// object.
38#[derive(Debug)]
39pub struct FutureRecord<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> {
40    /// Required destination topic.
41    pub topic: &'a str,
42    /// Optional destination partition.
43    pub partition: Option<i32>,
44    /// Optional payload.
45    pub payload: Option<&'a P>,
46    /// Optional key.
47    pub key: Option<&'a K>,
48    /// Optional timestamp.
49    pub timestamp: Option<i64>,
50    /// Optional message headers.
51    pub headers: Option<OwnedHeaders>,
52}
53
54impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> FutureRecord<'a, K, P> {
55    /// Creates a new record with the specified topic name.
56    pub fn to(topic: &'a str) -> FutureRecord<'a, K, P> {
57        FutureRecord {
58            topic,
59            partition: None,
60            payload: None,
61            key: None,
62            timestamp: None,
63            headers: None,
64        }
65    }
66
67    fn from_base_record<D: IntoOpaque>(
68        base_record: BaseRecord<'a, K, P, D>,
69    ) -> FutureRecord<'a, K, P> {
70        FutureRecord {
71            topic: base_record.topic,
72            partition: base_record.partition,
73            key: base_record.key,
74            payload: base_record.payload,
75            timestamp: base_record.timestamp,
76            headers: base_record.headers,
77        }
78    }
79
80    /// Sets the destination partition of the record.
81    pub fn partition(mut self, partition: i32) -> FutureRecord<'a, K, P> {
82        self.partition = Some(partition);
83        self
84    }
85
86    /// Sets the destination payload of the record.
87    pub fn payload(mut self, payload: &'a P) -> FutureRecord<'a, K, P> {
88        self.payload = Some(payload);
89        self
90    }
91
92    /// Sets the destination key of the record.
93    pub fn key(mut self, key: &'a K) -> FutureRecord<'a, K, P> {
94        self.key = Some(key);
95        self
96    }
97
98    /// Sets the destination timestamp of the record.
99    pub fn timestamp(mut self, timestamp: i64) -> FutureRecord<'a, K, P> {
100        self.timestamp = Some(timestamp);
101        self
102    }
103
104    /// Sets the headers of the record.
105    pub fn headers(mut self, headers: OwnedHeaders) -> FutureRecord<'a, K, P> {
106        self.headers = Some(headers);
107        self
108    }
109
110    fn into_base_record<D: IntoOpaque>(self, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
111        BaseRecord {
112            topic: self.topic,
113            partition: self.partition,
114            key: self.key,
115            payload: self.payload,
116            timestamp: self.timestamp,
117            headers: self.headers,
118            delivery_opaque,
119        }
120    }
121}
122
123/// The [`ProducerContext`] used by the [`FutureProducer`].
124///
125/// This context will use a [`Future`] as its `DeliveryOpaque` and will complete
126/// the future when the message is delivered (or failed to).
127#[derive(Clone)]
128pub struct FutureProducerContext<C: ClientContext + 'static> {
129    wrapped_context: C,
130}
131
132/// Represents the result of message production as performed from the
133/// `FutureProducer`.
134///
135/// If message delivery was successful, `OwnedDeliveryResult` will return the
136/// partition and offset of the message. If the message failed to be delivered
137/// an error will be returned, together with an owned copy of the original
138/// message.
139pub type OwnedDeliveryResult = Result<(i32, i64), (KafkaError, OwnedMessage)>;
140
141// Delegates all the methods calls to the wrapped context.
142impl<C: ClientContext + 'static> ClientContext for FutureProducerContext<C> {
143    const ENABLE_REFRESH_OAUTH_TOKEN: bool = C::ENABLE_REFRESH_OAUTH_TOKEN;
144
145    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
146        self.wrapped_context.log(level, fac, log_message);
147    }
148
149    fn stats(&self, statistics: Statistics) {
150        self.wrapped_context.stats(statistics);
151    }
152
153    fn stats_raw(&self, statistics: &[u8]) {
154        self.wrapped_context.stats_raw(statistics)
155    }
156
157    fn error(&self, error: KafkaError, reason: &str) {
158        self.wrapped_context.error(error, reason);
159    }
160
161    fn resolve_broker_addr(&self, host: &str, port: u16) -> Result<Vec<SocketAddr>, io::Error> {
162        self.wrapped_context.resolve_broker_addr(host, port)
163    }
164
165    fn generate_oauth_token(
166        &self,
167        oauthbearer_config: Option<&str>,
168    ) -> Result<OAuthToken, Box<dyn Error>> {
169        self.wrapped_context
170            .generate_oauth_token(oauthbearer_config)
171    }
172}
173
174impl<C: ClientContext + 'static> ProducerContext for FutureProducerContext<C> {
175    type DeliveryOpaque = Box<oneshot::Sender<OwnedDeliveryResult>>;
176
177    fn delivery(
178        &self,
179        delivery_result: &DeliveryResult<'_>,
180        tx: Box<oneshot::Sender<OwnedDeliveryResult>>,
181    ) {
182        let owned_delivery_result = match *delivery_result {
183            Ok(ref message) => Ok((message.partition(), message.offset())),
184            Err((ref error, ref message)) => Err((error.clone(), message.detach())),
185        };
186        let _ = tx.send(owned_delivery_result); // TODO: handle error
187    }
188}
189
190/// A producer that returns a [`Future`] for every message being produced.
191///
192/// Since message production in rdkafka is asynchronous, the caller cannot
193/// immediately know if the delivery of the message was successful or not. The
194/// FutureProducer provides this information in a [`Future`], which will be
195/// completed once the information becomes available.
196///
197/// This producer has an internal polling thread and as such it doesn't need to
198/// be polled. It can be cheaply cloned to get a reference to the same
199/// underlying producer. The internal polling thread will be terminated when the
200/// `FutureProducer` goes out of scope.
201#[must_use = "Producer polling thread will stop immediately if unused"]
202pub struct FutureProducer<C = DefaultClientContext, R = DefaultRuntime>
203where
204    C: ClientContext + 'static,
205{
206    producer: Arc<ThreadedProducer<FutureProducerContext<C>>>,
207    _runtime: PhantomData<R>,
208}
209
210impl<C, R> Clone for FutureProducer<C, R>
211where
212    C: ClientContext + 'static,
213{
214    fn clone(&self) -> FutureProducer<C, R> {
215        FutureProducer {
216            producer: self.producer.clone(),
217            _runtime: PhantomData,
218        }
219    }
220}
221
222impl<R> FromClientConfig for FutureProducer<DefaultClientContext, R>
223where
224    R: AsyncRuntime,
225{
226    fn from_config(config: &ClientConfig) -> KafkaResult<FutureProducer<DefaultClientContext, R>> {
227        FutureProducer::from_config_and_context(config, DefaultClientContext)
228    }
229}
230
231impl<C, R> FromClientConfigAndContext<C> for FutureProducer<C, R>
232where
233    C: ClientContext + 'static,
234    R: AsyncRuntime,
235{
236    fn from_config_and_context(
237        config: &ClientConfig,
238        context: C,
239    ) -> KafkaResult<FutureProducer<C, R>> {
240        let future_context = FutureProducerContext {
241            wrapped_context: context,
242        };
243        let threaded_producer = ThreadedProducer::from_config_and_context(config, future_context)?;
244        Ok(FutureProducer {
245            producer: Arc::new(threaded_producer),
246            _runtime: PhantomData,
247        })
248    }
249}
250
251/// A [`Future`] wrapping the result of the message production.
252///
253/// Once completed, the future will contain an `OwnedDeliveryResult` with
254/// information on the delivery status of the message. If the producer is
255/// dropped before the delivery status is received, the future will instead
256/// resolve with [`oneshot::Canceled`].
257pub struct DeliveryFuture {
258    rx: oneshot::Receiver<OwnedDeliveryResult>,
259}
260
261impl Future for DeliveryFuture {
262    type Output = Result<OwnedDeliveryResult, oneshot::Canceled>;
263
264    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
265        self.rx.poll_unpin(cx)
266    }
267}
268
269impl<C, R> FutureProducer<C, R>
270where
271    C: ClientContext + 'static,
272    R: AsyncRuntime,
273{
274    /// Sends a message to Kafka, returning the result of the send.
275    ///
276    /// The `queue_timeout` parameter controls how long to retry for if the
277    /// librdkafka producer queue is full. Set it to `Timeout::Never` to retry
278    /// forever or `Timeout::After(0)` to never block. If the timeout is reached
279    /// and the queue is still full, an [`RDKafkaErrorCode::QueueFull`] error will
280    /// be reported in the [`OwnedDeliveryResult`].
281    ///
282    /// Keep in mind that `queue_timeout` only applies to the first phase of the
283    /// send operation. Once the message is queued, the underlying librdkafka
284    /// client has separate timeout parameters that apply, like
285    /// `delivery.timeout.ms`.
286    ///
287    /// See also the [`FutureProducer::send_result`] method, which will not
288    /// retry the queue operation if the queue is full.
289    pub async fn send<K, P, T>(
290        &self,
291        record: FutureRecord<'_, K, P>,
292        queue_timeout: T,
293    ) -> OwnedDeliveryResult
294    where
295        K: ToBytes + ?Sized,
296        P: ToBytes + ?Sized,
297        T: Into<Timeout>,
298    {
299        let start_time = Instant::now();
300        let queue_timeout = queue_timeout.into();
301        let can_retry = || match queue_timeout {
302            Timeout::Never => true,
303            Timeout::After(t) if start_time.elapsed() < t => true,
304            _ => false,
305        };
306
307        let (tx, rx) = oneshot::channel();
308        let mut base_record = record.into_base_record(Box::new(tx));
309
310        loop {
311            match self.producer.send(base_record) {
312                Err((e, record))
313                    if e == KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)
314                        && can_retry() =>
315                {
316                    base_record = record;
317                    R::delay_for(Duration::from_millis(100)).await;
318                }
319                Ok(_) => {
320                    // We hold a reference to the producer, so it should not be
321                    // possible for the producer to vanish and cancel the
322                    // oneshot.
323                    break rx.await.expect("producer unexpectedly dropped");
324                }
325                Err((e, record)) => {
326                    let owned_message = OwnedMessage::new(
327                        record.payload.map(|p| p.to_bytes().to_vec()),
328                        record.key.map(|k| k.to_bytes().to_vec()),
329                        record.topic.to_owned(),
330                        record
331                            .timestamp
332                            .map_or(Timestamp::NotAvailable, Timestamp::CreateTime),
333                        record.partition.unwrap_or(-1),
334                        0,
335                        record.headers,
336                    );
337                    break Err((e, owned_message));
338                }
339            }
340        }
341    }
342
343    /// Like [`FutureProducer::send`], but if enqueuing fails, an error will be
344    /// returned immediately, alongside the [`FutureRecord`] provided.
345    pub fn send_result<'a, K, P>(
346        &self,
347        record: FutureRecord<'a, K, P>,
348    ) -> Result<DeliveryFuture, (KafkaError, FutureRecord<'a, K, P>)>
349    where
350        K: ToBytes + ?Sized,
351        P: ToBytes + ?Sized,
352    {
353        let (tx, rx) = oneshot::channel();
354        let base_record = record.into_base_record(Box::new(tx));
355        self.producer
356            .send(base_record)
357            .map(|()| DeliveryFuture { rx })
358            .map_err(|(e, record)| (e, FutureRecord::from_base_record(record)))
359    }
360
361    /// Polls the internal producer.
362    ///
363    /// This is not normally required since the `FutureProducer` has a thread
364    /// dedicated to calling `poll` regularly.
365    pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
366        self.producer.poll(timeout);
367    }
368}
369
370impl<C, R> Producer<FutureProducerContext<C>> for FutureProducer<C, R>
371where
372    C: ClientContext + 'static,
373    R: AsyncRuntime,
374{
375    fn client(&self) -> &Client<FutureProducerContext<C>> {
376        self.producer.client()
377    }
378
379    fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
380        self.producer.flush(timeout)
381    }
382
383    fn in_flight_count(&self) -> i32 {
384        self.producer.in_flight_count()
385    }
386
387    fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
388        self.producer.init_transactions(timeout)
389    }
390
391    fn begin_transaction(&self) -> KafkaResult<()> {
392        self.producer.begin_transaction()
393    }
394
395    fn send_offsets_to_transaction<T: Into<Timeout>>(
396        &self,
397        offsets: &TopicPartitionList,
398        cgm: &ConsumerGroupMetadata,
399        timeout: T,
400    ) -> KafkaResult<()> {
401        self.producer
402            .send_offsets_to_transaction(offsets, cgm, timeout)
403    }
404
405    fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
406        self.producer.commit_transaction(timeout)
407    }
408
409    fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
410        self.producer.abort_transaction(timeout)
411    }
412}
413
#[cfg(test)]
mod tests {
    // Smoke tests only: check that nothing panics and that each struct
    // implements the traits it is expected to (Clone, Send, Sync etc.).
    // Behavior is covered by the integration tests.
    use super::*;
    use crate::config::ClientConfig;

    // A context that deliberately does not implement `Clone`.
    struct TestContext;

    impl ClientContext for TestContext {}
    impl ProducerContext for TestContext {
        type DeliveryOpaque = Box<i32>;

        fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {
            unimplemented!()
        }
    }

    // The documentation promises that the future producer is `Clone`.
    #[test]
    fn test_future_producer_clone() {
        let producer: FutureProducer = ClientConfig::new().create().unwrap();
        let _producer_clone = producer.clone();
    }

    // The future producer must be cloneable even if its context is not `Clone`.
    #[test]
    fn test_base_future_topic_send_sync() {
        let producer: FutureProducer<TestContext> = ClientConfig::new()
            .create_with_context(TestContext)
            .unwrap();
        let _producer_clone = producer.clone();
    }
}