//! Kafka producers.
//!
//! ## The C librdkafka producer
//!
//! Rust-rdkafka relies on the C librdkafka producer to communicate with Kafka,
//! so in order to understand how the Rust producers work it is important to
//! understand the basics of the C one as well.
//!
//! ### Async
//!
//! The librdkafka producer is completely asynchronous: it maintains a memory
//! buffer where messages waiting to be sent or currently in flight are stored.
//! Once a message is delivered, or once an error has occurred and the maximum
//! number of retries has been reached, the producer enqueues a delivery event
//! with the appropriate delivery result into an internal event queue.
//!
//! The librdkafka user is responsible for calling the `poll` function at
//! regular intervals to process those events; the thread calling `poll` will be
//! the one executing the user-specified delivery callback for every delivery
//! event. If `poll` is not called, or not called frequently enough, the
//! producer will return a [`RDKafkaErrorCode::QueueFull`] error and it won't be
//! able to send any other message until more delivery events are processed via
//! `poll`. The `QueueFull` error can also be returned if Kafka is not able to
//! receive the messages quickly enough.
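//!
//! As a minimal sketch, a Rust-side polling loop with the [`BaseProducer`]
//! (described below) might look like this (the broker address and topic are
//! placeholders):
//!
//! ```no_run
//! use std::time::Duration;
//!
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
//!
//! let producer: BaseProducer = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .create()
//!     .expect("producer creation failed");
//!
//! producer
//!     .send(BaseRecord::to("my-topic").payload("payload").key("key"))
//!     .expect("failed to enqueue message");
//!
//! // Poll at regular intervals to process the delivery events.
//! for _ in 0..10 {
//!     producer.poll(Duration::from_millis(100));
//! }
//!
//! // Flush any pending messages before shutting down.
//! producer.flush(Duration::from_secs(10)).expect("flush failed");
//! ```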
//!
//! ### Error reporting
//!
//! The C library will try to deal with transient errors such as broker
//! disconnections and timeouts. These errors, called global errors, are
//! automatically logged in rust-rdkafka, but they normally don't require any
//! handling as the library resolves them internally. To see the logs, make
//! sure you initialize the logger.
//!
//! As mentioned earlier, errors specific to message production will be reported
//! in the delivery callback.
//!
//! ### Buffering
//!
//! Buffering is done automatically by librdkafka. When `send` is called, the
//! message is enqueued internally and once enough messages have been enqueued,
//! or when enough time has passed, they will be sent to Kafka as a single
//! batch. You can control the behavior of the buffer by configuring the
//! `queue.buffering.max.*` parameters listed below.
//!
//! ## `rust-rdkafka` producers
//!
//! `rust-rdkafka` (rdkafka for brevity) provides two sets of producers:
//! low-level and high-level.
//!
//! ### Low-level producers
//!
//! The lowest-level producer provided by rdkafka is called [`BaseProducer`].
//! The goal of the `BaseProducer` is to be as close as possible to the C one
//! while maintaining a safe Rust interface. In particular, the `BaseProducer`
//! needs to be polled at regular intervals to execute any delivery callback
//! that might be waiting and to make sure the queue doesn't fill up.
//!
//! Another low-level producer is the [`ThreadedProducer`], which is a
//! `BaseProducer` with a dedicated thread for polling.
//!
//! The delivery callback can be defined using a `ProducerContext`. See the
//! [`base_producer`] module for more information.
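//!
//! For example, a minimal sketch of a `ThreadedProducer`, which needs no
//! explicit polling (the broker address and topic are placeholders):
//!
//! ```no_run
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::{BaseRecord, DefaultProducerContext, ThreadedProducer};
//!
//! let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .create()
//!     .expect("producer creation failed");
//!
//! // The dedicated thread polls in the background, so delivery callbacks
//! // run without any explicit `poll` calls here.
//! producer
//!     .send(BaseRecord::to("my-topic").payload("payload").key("key"))
//!     .expect("failed to enqueue message");
//! ```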
//!
//! ### High-level producer
//!
//! At the moment the only high-level producer implemented is the
//! [`FutureProducer`]. The `FutureProducer` doesn't rely on user-defined
//! callbacks to report the delivery or failure of a message; instead, this
//! information will be returned in a future. The `FutureProducer` also uses an
//! internal thread for polling, which makes calling `poll` explicitly
//! unnecessary. The returned future will contain information about the
//! delivered message in case of success, or a copy of the original message in
//! case of failure. Additional computation can be chained to the returned
//! future, and it will be executed by the futures executor once the value is
//! available (for more information, check the documentation of the futures
//! crate).
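//!
//! A minimal sketch of the `FutureProducer` (the broker address and topic are
//! placeholders, and the surrounding async runtime is omitted):
//!
//! ```no_run
//! use std::time::Duration;
//!
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::{FutureProducer, FutureRecord};
//!
//! async fn produce() {
//!     let producer: FutureProducer = ClientConfig::new()
//!         .set("bootstrap.servers", "localhost:9092")
//!         .create()
//!         .expect("producer creation failed");
//!
//!     // The future resolves to the partition and offset of the delivered
//!     // message, or to the error along with the original message.
//!     match producer
//!         .send(
//!             FutureRecord::to("my-topic").payload("payload").key("key"),
//!             Duration::from_secs(5),
//!         )
//!         .await
//!     {
//!         Ok((partition, offset)) => {
//!             println!("delivered to partition {} at offset {}", partition, offset)
//!         }
//!         Err((err, _message)) => eprintln!("delivery failed: {}", err),
//!     }
//! }
//! ```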
//!
//! ## Transactions
//!
//! All rust-rdkafka producers support transactions. Transactional producers
//! work together with transaction-aware consumers configured with the default
//! `isolation.level` of `read_committed`.
//!
//! To configure a producer for transactions, set `transactional.id` to an
//! identifier unique to the application when creating the producer. After
//! creating the producer, you must initialize it with
//! [`Producer::init_transactions`].
//!
//! To start a new transaction, use [`Producer::begin_transaction`]. There can
//! be **only one ongoing transaction** at a time per producer. All records sent
//! after starting a transaction and before committing or aborting it will
//! automatically be associated with that transaction.
//!
//! Once you have initialized transactions on a producer, you are not permitted
//! to produce messages outside of a transaction.
//!
//! Consumer offsets can be sent as part of the ongoing transaction using
//! [`Producer::send_offsets_to_transaction`] and will be committed atomically
//! with the other records sent in the transaction.
//!
//! The current transaction can be committed with
//! [`Producer::commit_transaction`] or aborted using
//! [`Producer::abort_transaction`]. Afterwards, a new transaction can begin.
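//!
//! A minimal sketch of a transactional produce with a [`BaseProducer`] (the
//! broker address, transactional id, and topic are placeholders):
//!
//! ```no_run
//! use std::time::Duration;
//!
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
//!
//! let producer: BaseProducer = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .set("transactional.id", "my-transactional-id")
//!     .create()
//!     .expect("producer creation failed");
//!
//! // Fence previous incarnations and acquire the producer id and epoch.
//! producer
//!     .init_transactions(Duration::from_secs(30))
//!     .expect("init_transactions failed");
//!
//! producer.begin_transaction().expect("begin_transaction failed");
//!
//! // Everything sent from here on is part of the transaction.
//! producer
//!     .send(BaseRecord::to("my-topic").payload("payload").key("key"))
//!     .expect("failed to enqueue message");
//!
//! producer
//!     .commit_transaction(Duration::from_secs(30))
//!     .expect("commit_transaction failed");
//! ```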
//!
//! ### Errors
//!
//! Errors returned by transaction methods may:
//!
//! * be retriable ([`RDKafkaError::is_retriable`]), in which case the operation
//!   that encountered the error may be retried.
//! * require abort ([`RDKafkaError::txn_requires_abort`]), in which case the
//!   current transaction must be aborted and a new transaction begun.
//! * be fatal ([`RDKafkaError::is_fatal`]), in which case the producer must be
//!   stopped and the application terminated.
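//!
//! A sketch of how these cases might be distinguished, assuming `err` was
//! returned by a failed transactional call (the `handle_txn_error` helper is
//! hypothetical):
//!
//! ```
//! use rdkafka::error::KafkaError;
//!
//! fn handle_txn_error(err: &KafkaError) {
//!     if let KafkaError::Transaction(e) = err {
//!         if e.is_retriable() {
//!             // Retry the operation that returned the error.
//!         } else if e.txn_requires_abort() {
//!             // Abort the transaction, then begin a new one.
//!         } else if e.is_fatal() {
//!             // Stop the producer and terminate the application.
//!         }
//!     }
//! }
//! ```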
//!
//! For more details about transactions, see the [Transactional Producer]
//! section of the librdkafka introduction.
//!
//! ## Configuration
//!
//! ### Producer configuration
//!
//! For the configuration parameters common to both producers and consumers,
//! refer to the documentation in the `config` module. The most commonly used
//! producer configuration parameters are listed below. Click
//! [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
//! for the full list.
//!
//! - `queue.buffering.max.messages`: Maximum number of messages allowed on the
//!   producer queue. Default: 100000.
//! - `queue.buffering.max.kbytes`: Maximum total message size sum allowed on
//!   the producer queue. This property has higher priority than
//!   `queue.buffering.max.messages`. Default: 4000000.
//! - `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in
//!   the producer queue to accumulate before sending a request to the brokers.
//!   A higher value allows larger and more effective (less overhead, improved
//!   compression) batches of messages to accumulate at the expense of increased
//!   message delivery latency. Default: 0.
//! - `message.send.max.retries`: How many times to retry sending a failing
//!   batch. Note: retrying may cause reordering. Default: 2.
//! - `compression.codec`: Compression codec to use for compressing message
//!   sets. Default: none.
//! - `request.required.acks`: How many acknowledgements the leader broker must
//!   receive from ISR brokers before responding to the request: 0=the broker
//!   does not send any response/ack to the client; 1=only the leader broker
//!   must ack the message; -1 or all=the broker will block until the message is
//!   committed by all in-sync replicas (ISRs), subject to the broker's
//!   `min.insync.replicas` setting, before sending a response. Default: 1.
//! - `request.timeout.ms`: The ack timeout of the producer request in
//!   milliseconds. This value is only enforced by the broker and relies on
//!   `request.required.acks` being != 0. Default: 5000.
//! - `message.timeout.ms`: Local message timeout. This value is only enforced
//!   locally and limits the time a produced message waits for successful
//!   delivery. A time of 0 is infinite. Default: 300000.
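//!
//! For example, a producer tuned with some of these parameters might be
//! configured as follows (the values are illustrative, not recommendations):
//!
//! ```no_run
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::BaseProducer;
//!
//! let producer: BaseProducer = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .set("queue.buffering.max.ms", "100") // batch for up to 100ms
//!     .set("message.send.max.retries", "3")
//!     .set("compression.codec", "snappy")
//!     .set("message.timeout.ms", "30000")
//!     .create()
//!     .expect("producer creation failed");
//! ```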
//!
//! [`RDKafkaErrorCode::QueueFull`]: crate::error::RDKafkaErrorCode::QueueFull
//! [`RDKafkaError::is_retriable`]: crate::error::RDKafkaError::is_retriable
//! [`RDKafkaError::txn_requires_abort`]: crate::error::RDKafkaError::txn_requires_abort
//! [`RDKafkaError::is_fatal`]: crate::error::RDKafkaError::is_fatal
//! [Transactional Producer]: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#transactional-producer

use std::sync::Arc;

use crate::client::{Client, ClientContext};
use crate::consumer::ConsumerGroupMetadata;
use crate::error::KafkaResult;
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{IntoOpaque, Timeout};

pub mod base_producer;
pub mod future_producer;

#[doc(inline)]
pub use self::base_producer::{BaseProducer, BaseRecord, DeliveryResult, ThreadedProducer};
#[doc(inline)]
pub use self::future_producer::{DeliveryFuture, FutureProducer, FutureRecord};

//
// ********** PRODUCER CONTEXT **********
//

/// Producer-specific context.
///
/// This user-defined object can be used to provide custom callbacks for
/// producer events. Refer to the list of methods to check which callbacks can
/// be specified.
///
/// In particular, it can be used to specify the `delivery` callback that will
/// be called when the acknowledgement for a delivered message is received.
///
/// See also the [`ClientContext`] trait.
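///
/// A minimal sketch of a context that logs delivery results (the
/// `LoggingContext` type is hypothetical):
///
/// ```
/// use rdkafka::client::ClientContext;
/// use rdkafka::producer::{DeliveryResult, ProducerContext};
///
/// struct LoggingContext;
///
/// impl ClientContext for LoggingContext {}
///
/// impl ProducerContext for LoggingContext {
///     type DeliveryOpaque = ();
///
///     fn delivery(&self, result: &DeliveryResult<'_>, _: ()) {
///         match result {
///             Ok(_) => println!("message delivered"),
///             Err((err, _)) => eprintln!("delivery failed: {}", err),
///         }
///     }
/// }
/// ```
///
/// A producer would then be created with it via
/// `ClientConfig::create_with_context`.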
pub trait ProducerContext: ClientContext {
    /// A `DeliveryOpaque` is a user-defined structure that will be passed to
    /// the producer when producing a message, and returned to the `delivery`
    /// method once the message has been delivered (or has failed to be).
    type DeliveryOpaque: IntoOpaque;

    /// This method will be called once the message has been delivered (or
    /// failed to be). The `DeliveryOpaque` will be the one provided by the
    /// user when calling `send`.
    fn delivery(&self, delivery_result: &DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque);
}

/// An inert producer context that can be used when customizations are not
/// required.
#[derive(Clone)]
pub struct DefaultProducerContext;

impl ClientContext for DefaultProducerContext {}
impl ProducerContext for DefaultProducerContext {
    type DeliveryOpaque = ();

    fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {}
}

/// Common trait for all producers.
pub trait Producer<C = DefaultProducerContext>
where
    C: ProducerContext + 'static,
{
    /// Returns the [`Client`] underlying this producer.
    fn client(&self) -> &Client<C>;

    /// Returns a reference to the [`ProducerContext`] used to create this
    /// producer.
    fn context(&self) -> &Arc<C> {
        self.client().context()
    }

    /// Returns the number of messages that are either waiting to be sent or
    /// have been sent but not yet acknowledged.
    fn in_flight_count(&self) -> i32;

    /// Flushes any pending messages.
    ///
    /// This method should be called before termination to ensure delivery of
    /// all enqueued messages. It will call `poll()` internally.
    fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;

    /// Enables sending transactions with this producer.
    ///
    /// # Prerequisites
    ///
    /// * The configuration used to create the producer must include a
    ///   `transactional.id` setting.
    /// * You must not have sent any messages or called any of the other
    ///   transaction-related functions.
    ///
    /// # Details
    ///
    /// This function ensures any transactions initiated by previous producers
    /// with the same `transactional.id` are completed. Any transactions left
    /// open by any such previous producers will be aborted.
    ///
    /// Once previous transactions have been fenced, this function acquires an
    /// internal producer ID and epoch that will be used by all transactional
    /// messages sent by this producer.
    ///
    /// If this function returns successfully, messages may only be sent from
    /// this producer when a transaction is active. See
    /// [`Producer::begin_transaction`].
    ///
    /// This function may block for the specified `timeout`.
    fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;

    /// Begins a new transaction.
    ///
    /// # Prerequisites
    ///
    /// You must have successfully called [`Producer::init_transactions`].
    ///
    /// # Details
    ///
    /// This function begins a new transaction, and implicitly associates that
    /// open transaction with this producer.
    ///
    /// After a successful call to this function, any messages sent via this
    /// producer or any calls to [`Producer::send_offsets_to_transaction`] will
    /// be implicitly associated with this transaction, until the transaction
    /// is finished.
    ///
    /// Finish the transaction by calling [`Producer::commit_transaction`] or
    /// [`Producer::abort_transaction`].
    ///
    /// While a transaction is open, you must perform at least one transaction
    /// operation every `transaction.timeout.ms` to avoid timing out the
    /// transaction on the broker.
    fn begin_transaction(&self) -> KafkaResult<()>;

    /// Associates an offset commit operation with this transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Sends a list of topic partition offsets to the consumer group
    /// coordinator for `cgm`, and marks the offsets as part of the current
    /// transaction. These offsets will be considered committed only if the
    /// transaction is committed successfully.
    ///
    /// The offsets should be the next message your application will consume,
    /// i.e., one greater than the last processed message's offset for each
    /// partition.
    ///
    /// Use this method at the end of a consume-transform-produce loop, prior
    /// to committing the transaction with [`Producer::commit_transaction`].
    ///
    /// This function may block for the specified `timeout`.
    ///
    /// # Hints
    ///
    /// To obtain the correct consumer group metadata, call
    /// [`Consumer::group_metadata`] on the consumer for which offsets are being
    /// committed.
    ///
    /// The consumer must not have automatic commits enabled.
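    ///
    /// # Example
    ///
    /// A minimal sketch of the end of a consume-transform-produce loop,
    /// assuming `producer`, `consumer`, and `offsets` already exist:
    ///
    /// ```no_run
    /// # use std::time::Duration;
    /// # use rdkafka::consumer::{BaseConsumer, Consumer};
    /// # use rdkafka::producer::{BaseProducer, Producer};
    /// # use rdkafka::TopicPartitionList;
    /// # fn example(
    /// #     producer: &BaseProducer,
    /// #     consumer: &BaseConsumer,
    /// #     offsets: &TopicPartitionList,
    /// # ) -> rdkafka::error::KafkaResult<()> {
    /// producer.send_offsets_to_transaction(
    ///     offsets,
    ///     &consumer.group_metadata().expect("missing group metadata"),
    ///     Duration::from_secs(30),
    /// )?;
    /// producer.commit_transaction(Duration::from_secs(30))?;
    /// # Ok(())
    /// # }
    /// ```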
    ///
    /// [`Consumer::group_metadata`]: crate::consumer::Consumer::group_metadata
    fn send_offsets_to_transaction<T: Into<Timeout>>(
        &self,
        offsets: &TopicPartitionList,
        cgm: &ConsumerGroupMetadata,
        timeout: T,
    ) -> KafkaResult<()>;

    /// Commits the current transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Any outstanding messages will be flushed (i.e., delivered) before
    /// actually committing the transaction.
    ///
    /// If any of the outstanding messages fail permanently, the current
    /// transaction will enter an abortable error state and this function will
    /// return an abortable error. You must then call
    /// [`Producer::abort_transaction`] before attempting to create another
    /// transaction.
    ///
    /// This function may block for the specified `timeout`.
    fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;

    /// Aborts the current transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Any outstanding messages will be purged and failed with
    /// [`RDKafkaErrorCode::PurgeInflight`] or [`RDKafkaErrorCode::PurgeQueue`].
    ///
    /// This function should also be used to recover from non-fatal abortable
    /// transaction errors.
    ///
    /// This function may block for the specified `timeout`.
    ///
    /// [`RDKafkaErrorCode::PurgeInflight`]: crate::error::RDKafkaErrorCode::PurgeInflight
    /// [`RDKafkaErrorCode::PurgeQueue`]: crate::error::RDKafkaErrorCode::PurgeQueue
    fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;
}