// rdkafka/producer/base_producer.rs

1//! Low-level Kafka producers.
2//!
3//! For more information about the producers provided in rdkafka, refer to the
4//! [`producer`](super) module documentation.
5//!
6//! ## `BaseProducer`
7//!
8//! The [`BaseProducer`] is a low level Kafka producer designed to be as similar
9//! as possible to the underlying C librdkafka producer, while maintaining a
10//! safe Rust interface.
11//!
//! Production of messages is fully asynchronous. The librdkafka producer takes
//! care of buffering requests together according to its configuration and of
//! sending them efficiently. Once a message has been delivered, or its retries
//! have been exhausted, a callback known as the delivery callback is invoked.
16//!
17//! The `BaseProducer` requires a [`ProducerContext`] which will be used to
18//! specify the delivery callback and the
19//! [`DeliveryOpaque`](ProducerContext::DeliveryOpaque). The `DeliveryOpaque` is
20//! a user-defined type that the user can pass to the `send` method of the
21//! producer, and that the producer will then forward to the delivery callback
22//! of the corresponding message. The `DeliveryOpaque` is useful in case the
23//! delivery callback requires additional information about the message (such as
24//! message id for example).
25//!
26//! ### Calling poll
27//!
//! To execute delivery callbacks, the `poll` method of the producer should be
//! called regularly. If `poll` is not called, or not called often enough,
//! sending will eventually fail with an [`RDKafkaErrorCode::QueueFull`] error.
31//!
32//! ## `ThreadedProducer`
33//!
//! The `ThreadedProducer` is a wrapper around the `BaseProducer` which spawns a
//! thread dedicated to calling `poll` on the producer at regular intervals, so
//! that the user doesn't have to. The thread is started when the producer is
//! created, and it is stopped once the producer and all of its clones have been
//! dropped.
38//!
//! An [`RDKafkaErrorCode::QueueFull`] error can still be returned if the
//! polling thread is not fast enough, or if Kafka is unable to receive data and
//! acknowledge messages quickly enough. If this error is returned, the caller
//! should wait and try again.
43
44use std::ffi::CString;
45use std::mem;
46use std::os::raw::c_void;
47use std::ptr;
48use std::sync::atomic::{AtomicBool, Ordering};
49use std::sync::Arc;
50use std::thread::{self, JoinHandle};
51use std::time::Duration;
52
53use rdkafka_sys as rdsys;
54use rdkafka_sys::rd_kafka_vtype_t::*;
55use rdkafka_sys::types::*;
56
57use crate::client::Client;
58use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
59use crate::consumer::ConsumerGroupMetadata;
60use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
61use crate::log::{trace, warn};
62use crate::message::{BorrowedMessage, OwnedHeaders, ToBytes};
63use crate::producer::{DefaultProducerContext, Producer, ProducerContext};
64use crate::topic_partition_list::TopicPartitionList;
65use crate::util::{IntoOpaque, Timeout};
66
67pub use crate::message::DeliveryResult;
68
69/// Callback that gets called from librdkafka every time a message succeeds or fails to be
70/// delivered.
71unsafe extern "C" fn delivery_cb<C: ProducerContext>(
72    _client: *mut RDKafka,
73    msg: *const RDKafkaMessage,
74    opaque: *mut c_void,
75) {
76    let producer_context = &mut *(opaque as *mut C);
77    let delivery_opaque = C::DeliveryOpaque::from_ptr((*msg)._private);
78    let owner = 42u8;
79    // Wrap the message pointer into a BorrowedMessage that will only live for the body of this
80    // function.
81    let delivery_result = BorrowedMessage::from_dr_callback(msg as *mut RDKafkaMessage, &owner);
82    trace!("Delivery event received: {:?}", delivery_result);
83    producer_context.delivery(&delivery_result, delivery_opaque);
84    match delivery_result {
85        // Do not free the message, librdkafka will do it for us
86        Ok(message) | Err((_, message)) => mem::forget(message),
87    }
88}
89
90//
91// ********** BASE PRODUCER **********
92//
93
94/// A record for the [`BaseProducer`] and [`ThreadedProducer`].
95///
96/// The `BaseRecord` is a structure that can be used to provide a new record to
97/// [`BaseProducer::send`] or [`ThreadedProducer::send`]. Since most fields are
98/// optional, a `BaseRecord` can be constructed using the builder pattern.
99///
100/// # Examples
101///
102/// This example will create a `BaseRecord` with no
103/// [`DeliveryOpaque`](ProducerContext::DeliveryOpaque):
104///
105/// ```rust,no_run
106/// # use rdkafka::producer::BaseRecord;
107/// # use rdkafka::message::ToBytes;
108/// let record = BaseRecord::to("topic_name")  // destination topic
109///     .key(&[1, 2, 3, 4])                    // message key
110///     .payload("content")                    // message payload
111///     .partition(5);                         // target partition
112/// ```
113///
114/// The following example will build a similar record, but it will use a number
115/// as the `DeliveryOpaque` for the message:
116///
117/// ```rust,no_run
118/// # use rdkafka::producer::BaseRecord;
119/// # use rdkafka::message::ToBytes;
120/// let record = BaseRecord::with_opaque_to("topic_name", 123) // destination topic and message id
121///     .key(&[1, 2, 3, 4])                                    // message key
122///     .payload("content")                                    // message payload
123///     .partition(5);                                         // target partition
124/// ```
125#[derive(Debug)]
126pub struct BaseRecord<'a, K: ToBytes + ?Sized = (), P: ToBytes + ?Sized = (), D: IntoOpaque = ()> {
127    /// Required destination topic.
128    pub topic: &'a str,
129    /// Optional destination partition.
130    pub partition: Option<i32>,
131    /// Optional payload.
132    pub payload: Option<&'a P>,
133    /// Optional key.
134    pub key: Option<&'a K>,
135    /// Optional timestamp.
136    ///
137    /// Note that Kafka represents timestamps as the number of milliseconds
138    /// since the Unix epoch.
139    pub timestamp: Option<i64>,
140    /// Optional message headers.
141    pub headers: Option<OwnedHeaders>,
142    /// Required delivery opaque (defaults to `()` if not required).
143    pub delivery_opaque: D,
144}
145
146impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized, D: IntoOpaque> BaseRecord<'a, K, P, D> {
147    /// Creates a new record with the specified topic name and delivery opaque.
148    pub fn with_opaque_to(topic: &'a str, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
149        BaseRecord {
150            topic,
151            partition: None,
152            payload: None,
153            key: None,
154            timestamp: None,
155            headers: None,
156            delivery_opaque,
157        }
158    }
159
160    /// Sets the destination partition of the record.
161    pub fn partition(mut self, partition: i32) -> BaseRecord<'a, K, P, D> {
162        self.partition = Some(partition);
163        self
164    }
165
166    /// Sets the payload of the record.
167    pub fn payload(mut self, payload: &'a P) -> BaseRecord<'a, K, P, D> {
168        self.payload = Some(payload);
169        self
170    }
171
172    /// Sets the key of the record.
173    pub fn key(mut self, key: &'a K) -> BaseRecord<'a, K, P, D> {
174        self.key = Some(key);
175        self
176    }
177
178    /// Sets the timestamp of the record.
179    ///
180    /// Note that Kafka represents timestamps as the number of milliseconds
181    /// since the Unix epoch.
182    pub fn timestamp(mut self, timestamp: i64) -> BaseRecord<'a, K, P, D> {
183        self.timestamp = Some(timestamp);
184        self
185    }
186
187    /// Sets the headers of the record.
188    pub fn headers(mut self, headers: OwnedHeaders) -> BaseRecord<'a, K, P, D> {
189        self.headers = Some(headers);
190        self
191    }
192}
193
194impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> BaseRecord<'a, K, P, ()> {
195    /// Creates a new record with the specified topic name.
196    pub fn to(topic: &'a str) -> BaseRecord<'a, K, P, ()> {
197        BaseRecord {
198            topic,
199            partition: None,
200            payload: None,
201            key: None,
202            timestamp: None,
203            headers: None,
204            delivery_opaque: (),
205        }
206    }
207}
208
209impl FromClientConfig for BaseProducer<DefaultProducerContext> {
210    /// Creates a new `BaseProducer` starting from a configuration.
211    fn from_config(config: &ClientConfig) -> KafkaResult<BaseProducer<DefaultProducerContext>> {
212        BaseProducer::from_config_and_context(config, DefaultProducerContext)
213    }
214}
215
216impl<C> FromClientConfigAndContext<C> for BaseProducer<C>
217where
218    C: ProducerContext,
219{
220    /// Creates a new `BaseProducer` starting from a configuration and a
221    /// context.
222    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseProducer<C>> {
223        let native_config = config.create_native_config()?;
224        unsafe { rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::<C>)) };
225        let client = Client::new(
226            config,
227            native_config,
228            RDKafkaType::RD_KAFKA_PRODUCER,
229            context,
230        )?;
231        Ok(BaseProducer::from_client(client))
232    }
233}
234
235/// Lowest level Kafka producer.
236///
237/// The `BaseProducer` needs to be polled at regular intervals in order to serve
238/// queued delivery report callbacks (for more information, refer to the
239/// module-level documentation). This producer can be cheaply cloned to create a
240/// new reference to the same underlying producer.
241///
242/// # Example usage
243///
244/// This code will send a message to Kafka. No custom [`ProducerContext`] is
245/// specified, so the [`DefaultProducerContext`] will be used. To see how to use
246/// a producer context, refer to the examples in the [`examples`] folder.
247///
248/// ```rust
249/// use rdkafka::config::ClientConfig;
250/// use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
251/// use std::time::Duration;
252///
253/// let producer: BaseProducer = ClientConfig::new()
254///     .set("bootstrap.servers", "kafka:9092")
255///     .create()
256///     .expect("Producer creation error");
257///
258/// producer.send(
259///     BaseRecord::to("destination_topic")
260///         .payload("this is the payload")
261///         .key("and this is a key"),
262/// ).expect("Failed to enqueue");
263///
264/// // Poll at regular intervals to process all the asynchronous delivery events.
265/// for _ in 0..10 {
266///     producer.poll(Duration::from_millis(100));
267/// }
268///
269/// // And/or flush the producer before dropping it.
270/// producer.flush(Duration::from_secs(1));
271/// ```
272///
273/// [`examples`]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/
274pub struct BaseProducer<C = DefaultProducerContext>
275where
276    C: ProducerContext + 'static,
277{
278    client_arc: Arc<Client<C>>,
279}
280
281impl<C> BaseProducer<C>
282where
283    C: ProducerContext,
284{
285    /// Creates a base producer starting from a Client.
286    fn from_client(client: Client<C>) -> BaseProducer<C> {
287        BaseProducer {
288            client_arc: Arc::new(client),
289        }
290    }
291
292    /// Polls the producer, returning the number of events served.
293    ///
294    /// Regular calls to `poll` are required to process the events and execute
295    /// the message delivery callbacks.
296    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> i32 {
297        unsafe { rdsys::rd_kafka_poll(self.native_ptr(), timeout.into().as_millis()) }
298    }
299
300    /// Returns a pointer to the native Kafka client.
301    fn native_ptr(&self) -> *mut RDKafka {
302        self.client_arc.native_ptr()
303    }
304
305    /// Sends a message to Kafka.
306    ///
307    /// Message fields such as key, payload, partition, timestamp etc. are
308    /// provided to this method via a [`BaseRecord`]. If the message is
309    /// correctly enqueued in the producer's memory buffer, the method will take
310    /// ownership of the record and return immediately; in case of failure to
311    /// enqueue, the original record is returned, alongside an error code. If
312    /// the message fails to be produced after being enqueued in the buffer, the
313    /// [`ProducerContext::delivery`] method will be called asynchronously, with
314    /// the provided [`ProducerContext::DeliveryOpaque`].
315    ///
316    /// When no partition is specified the underlying Kafka library picks a
317    /// partition based on a hash of the key. If no key is specified, a random
318    /// partition will be used. To correctly handle errors, the delivery
319    /// callback should be implemented.
320    ///
321    /// Note that this method will never block.
322    // Simplifying the return type requires generic associated types, which are
323    // unstable.
324    pub fn send<'a, K, P>(
325        &self,
326        mut record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
327    ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
328    where
329        K: ToBytes + ?Sized,
330        P: ToBytes + ?Sized,
331    {
332        fn as_bytes(opt: Option<&(impl ?Sized + ToBytes)>) -> (*mut c_void, usize) {
333            match opt.map(ToBytes::to_bytes) {
334                None => (ptr::null_mut(), 0),
335                Some(p) => (p.as_ptr() as *mut c_void, p.len()),
336            }
337        }
338        let (payload_ptr, payload_len) = as_bytes(record.payload);
339        let (key_ptr, key_len) = as_bytes(record.key);
340        let topic_cstring = CString::new(record.topic.to_owned()).unwrap();
341        let opaque_ptr = record.delivery_opaque.into_ptr();
342        let produce_error = unsafe {
343            rdsys::rd_kafka_producev(
344                self.native_ptr(),
345                RD_KAFKA_VTYPE_TOPIC,
346                topic_cstring.as_ptr(),
347                RD_KAFKA_VTYPE_PARTITION,
348                record.partition.unwrap_or(-1),
349                RD_KAFKA_VTYPE_MSGFLAGS,
350                rdsys::RD_KAFKA_MSG_F_COPY as i32,
351                RD_KAFKA_VTYPE_VALUE,
352                payload_ptr,
353                payload_len,
354                RD_KAFKA_VTYPE_KEY,
355                key_ptr,
356                key_len,
357                RD_KAFKA_VTYPE_OPAQUE,
358                opaque_ptr,
359                RD_KAFKA_VTYPE_TIMESTAMP,
360                record.timestamp.unwrap_or(0),
361                RD_KAFKA_VTYPE_HEADERS,
362                record
363                    .headers
364                    .as_ref()
365                    .map_or(ptr::null_mut(), OwnedHeaders::ptr),
366                RD_KAFKA_VTYPE_END,
367            )
368        };
369        if produce_error.is_error() {
370            record.delivery_opaque = unsafe { C::DeliveryOpaque::from_ptr(opaque_ptr) };
371            Err((KafkaError::MessageProduction(produce_error.into()), record))
372        } else {
373            // The kafka producer now owns the headers
374            mem::forget(record.headers);
375            Ok(())
376        }
377    }
378}
379
380impl<C> Producer<C> for BaseProducer<C>
381where
382    C: ProducerContext,
383{
384    fn client(&self) -> &Client<C> {
385        &*self.client_arc
386    }
387
388    fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
389        let ret = unsafe { rdsys::rd_kafka_flush(self.native_ptr(), timeout.into().as_millis()) };
390        if ret.is_error() {
391            Err(KafkaError::Flush(ret.into()))
392        } else {
393            Ok(())
394        }
395    }
396
397    fn in_flight_count(&self) -> i32 {
398        unsafe { rdsys::rd_kafka_outq_len(self.native_ptr()) }
399    }
400
401    fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
402        let ret = unsafe {
403            RDKafkaError::from_ptr(rdsys::rd_kafka_init_transactions(
404                self.native_ptr(),
405                timeout.into().as_millis(),
406            ))
407        };
408        if ret.is_error() {
409            Err(KafkaError::Transaction(ret))
410        } else {
411            Ok(())
412        }
413    }
414
415    fn begin_transaction(&self) -> KafkaResult<()> {
416        let ret =
417            unsafe { RDKafkaError::from_ptr(rdsys::rd_kafka_begin_transaction(self.native_ptr())) };
418        if ret.is_error() {
419            Err(KafkaError::Transaction(ret))
420        } else {
421            Ok(())
422        }
423    }
424
425    fn send_offsets_to_transaction<T: Into<Timeout>>(
426        &self,
427        offsets: &TopicPartitionList,
428        cgm: &ConsumerGroupMetadata,
429        timeout: T,
430    ) -> KafkaResult<()> {
431        let ret = unsafe {
432            RDKafkaError::from_ptr(rdsys::rd_kafka_send_offsets_to_transaction(
433                self.native_ptr(),
434                offsets.ptr(),
435                cgm.ptr(),
436                timeout.into().as_millis(),
437            ))
438        };
439        if ret.is_error() {
440            Err(KafkaError::Transaction(ret))
441        } else {
442            Ok(())
443        }
444    }
445
446    fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
447        let ret = unsafe {
448            RDKafkaError::from_ptr(rdsys::rd_kafka_commit_transaction(
449                self.native_ptr(),
450                timeout.into().as_millis(),
451            ))
452        };
453        if ret.is_error() {
454            Err(KafkaError::Transaction(ret))
455        } else {
456            Ok(())
457        }
458    }
459
460    fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
461        let ret = unsafe {
462            RDKafkaError::from_ptr(rdsys::rd_kafka_abort_transaction(
463                self.native_ptr(),
464                timeout.into().as_millis(),
465            ))
466        };
467        if ret.is_error() {
468            Err(KafkaError::Transaction(ret))
469        } else {
470            Ok(())
471        }
472    }
473}
474
475impl<C> Clone for BaseProducer<C>
476where
477    C: ProducerContext,
478{
479    fn clone(&self) -> BaseProducer<C> {
480        BaseProducer {
481            client_arc: self.client_arc.clone(),
482        }
483    }
484}
485
486//
487// ********** THREADED PRODUCER **********
488//
489
490/// A low-level Kafka producer with a separate thread for event handling.
491///
492/// The `ThreadedProducer` is a [`BaseProducer`] with a separate thread
493/// dedicated to calling `poll` at regular intervals in order to execute any
494/// queued events, such as delivery notifications. The thread will be
495/// automatically stopped when the producer is dropped.
496#[must_use = "The threaded producer will stop immediately if unused"]
497pub struct ThreadedProducer<C>
498where
499    C: ProducerContext + 'static,
500{
501    producer: BaseProducer<C>,
502    should_stop: Arc<AtomicBool>,
503    handle: Option<Arc<JoinHandle<()>>>,
504}
505
506impl FromClientConfig for ThreadedProducer<DefaultProducerContext> {
507    fn from_config(config: &ClientConfig) -> KafkaResult<ThreadedProducer<DefaultProducerContext>> {
508        ThreadedProducer::from_config_and_context(config, DefaultProducerContext)
509    }
510}
511
512impl<C> FromClientConfigAndContext<C> for ThreadedProducer<C>
513where
514    C: ProducerContext + 'static,
515{
516    fn from_config_and_context(
517        config: &ClientConfig,
518        context: C,
519    ) -> KafkaResult<ThreadedProducer<C>> {
520        let producer = BaseProducer::from_config_and_context(config, context)?;
521        let should_stop = Arc::new(AtomicBool::new(false));
522        let thread = {
523            let producer = producer.clone();
524            let should_stop = should_stop.clone();
525            thread::Builder::new()
526                .name("producer polling thread".to_string())
527                .spawn(move || {
528                    trace!("Polling thread loop started");
529                    loop {
530                        let n = producer.poll(Duration::from_millis(100));
531                        if n == 0 {
532                            if should_stop.load(Ordering::Relaxed) {
533                                // We received nothing and the thread should
534                                // stop, so break the loop.
535                                break;
536                            }
537                        } else {
538                            trace!("Received {} events", n);
539                        }
540                    }
541                    trace!("Polling thread loop terminated");
542                })
543                .expect("Failed to start polling thread")
544        };
545        Ok(ThreadedProducer {
546            producer,
547            should_stop,
548            handle: Some(Arc::new(thread)),
549        })
550    }
551}
552
553impl<C> ThreadedProducer<C>
554where
555    C: ProducerContext + 'static,
556{
557    /// Sends a message to Kafka.
558    ///
559    /// See the documentation for [`BaseProducer::send`] for details.
560    // Simplifying the return type requires generic associated types, which are
561    // unstable.
562    pub fn send<'a, K, P>(
563        &self,
564        record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
565    ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
566    where
567        K: ToBytes + ?Sized,
568        P: ToBytes + ?Sized,
569    {
570        self.producer.send(record)
571    }
572
573    /// Polls the internal producer.
574    ///
575    /// This is not normally required since the `ThreadedProducer` has a thread
576    /// dedicated to calling `poll` regularly.
577    pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
578        self.producer.poll(timeout);
579    }
580}
581
582impl<C> Producer<C> for ThreadedProducer<C>
583where
584    C: ProducerContext + 'static,
585{
586    fn client(&self) -> &Client<C> {
587        self.producer.client()
588    }
589
590    fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
591        self.producer.flush(timeout)
592    }
593
594    fn in_flight_count(&self) -> i32 {
595        self.producer.in_flight_count()
596    }
597
598    fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
599        self.producer.init_transactions(timeout)
600    }
601
602    fn begin_transaction(&self) -> KafkaResult<()> {
603        self.producer.begin_transaction()
604    }
605
606    fn send_offsets_to_transaction<T: Into<Timeout>>(
607        &self,
608        offsets: &TopicPartitionList,
609        cgm: &ConsumerGroupMetadata,
610        timeout: T,
611    ) -> KafkaResult<()> {
612        self.producer
613            .send_offsets_to_transaction(offsets, cgm, timeout)
614    }
615
616    fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
617        self.producer.commit_transaction(timeout)
618    }
619
620    fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
621        self.producer.abort_transaction(timeout)
622    }
623}
624
625impl<C: ProducerContext + 'static> Clone for ThreadedProducer<C> {
626    fn clone(&self) -> Self {
627        Self {
628            producer: self.producer.clone(),
629            should_stop: Arc::clone(&self.should_stop),
630            handle: self.handle.clone(),
631        }
632    }
633}
634
635impl<C> Drop for ThreadedProducer<C>
636where
637    C: ProducerContext + 'static,
638{
639    fn drop(&mut self) {
640        trace!("Destroy ThreadedProducer");
641        if let Some(handle) = self.handle.take().and_then(Arc::into_inner) {
642            trace!("Stopping polling");
643            self.should_stop.store(true, Ordering::Relaxed);
644            trace!("Waiting for polling thread termination");
645            match handle.join() {
646                Ok(()) => trace!("Polling stopped"),
647                Err(e) => warn!("Failure while terminating thread: {:?}", e),
648            };
649        }
650        trace!("ThreadedProducer destroyed");
651    }
652}
653
#[cfg(test)]
mod tests {
    // Just test that there are no panics, and that each struct implements the
    // expected traits (Clone, Send, Sync etc.). Behavior is tested in the
    // integration tests.
    use super::*;
    use crate::config::ClientConfig;
    use crate::producer::DefaultProducerContext;

    /// Compiles only if `T` can be moved to and shared between threads.
    fn assert_send_sync<T: Send + Sync>() {}

    // Verify that the producer is clone, according to documentation.
    #[test]
    fn test_base_producer_clone() {
        let producer = ClientConfig::new().create::<BaseProducer<_>>().unwrap();
        let _producer_clone = producer.clone();
    }

    // Both producers are meant to be shared across threads; make that a
    // compile-time guarantee, as the module comment above promises.
    #[test]
    fn test_producers_are_send_and_sync() {
        assert_send_sync::<BaseProducer<DefaultProducerContext>>();
        assert_send_sync::<ThreadedProducer<DefaultProducerContext>>();
    }

    // Clones of a threaded producer share one polling thread. Dropping all of
    // them must stop and join that thread exactly once, without panicking.
    #[test]
    fn test_threaded_producer_clone_and_drop() {
        let producer = ClientConfig::new()
            .create::<ThreadedProducer<_>>()
            .unwrap();
        let producer_clone = producer.clone();
        drop(producer);
        drop(producer_clone);
    }
}