rdkafka/producer/mod.rs
//! Kafka producers.
//!
//! ## The C librdkafka producer
//!
//! Rust-rdkafka relies on the C librdkafka producer to communicate with Kafka,
//! so in order to understand how the Rust producers work, it is important to
//! understand the basics of the C one as well.
//!
//! ### Async
//!
//! The librdkafka producer is completely asynchronous: it maintains a memory
//! buffer where messages waiting to be sent or currently in flight are stored.
//! Once a message is delivered, or an error occurs and the maximum number of
//! retries has been reached, the producer will enqueue a delivery event with
//! the appropriate delivery result into an internal event queue.
//!
//! The librdkafka user is responsible for calling the `poll` function at
//! regular intervals to process those events; the thread calling `poll` will be
//! the one executing the user-specified delivery callback for every delivery
//! event. If `poll` is not called, or not called frequently enough, the
//! producer will return an [`RDKafkaErrorCode::QueueFull`] error and it won't
//! be able to send any more messages until more delivery events are processed
//! via `poll`. The `QueueFull` error can also be returned if Kafka is not able
//! to receive the messages quickly enough.
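//!
//! A minimal sketch of the send-and-poll cycle with a [`BaseProducer`] (the
//! topic name and broker address are placeholders):
//!
//! ```no_run
//! use std::time::Duration;
//!
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
//!
//! let producer: BaseProducer = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .create()
//!     .expect("producer creation failed");
//!
//! // Enqueue a message; this returns immediately, before delivery happens.
//! if let Err((e, _record)) = producer.send(
//!     BaseRecord::to("my-topic").key("key").payload("payload"),
//! ) {
//!     eprintln!("failed to enqueue message: {}", e);
//! }
//!
//! // Serve pending delivery events; call this at regular intervals.
//! producer.poll(Duration::from_millis(100));
//! ```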
//!
//! ### Error reporting
//!
//! The C library will try to deal with all the transient errors, such as
//! broker disconnections and timeouts. These errors, called global errors, are
//! automatically logged in rust-rdkafka, but they normally don't require any
//! handling, as the library handles them internally. To see the logs, make
//! sure you initialize the logger.
//!
//! As mentioned earlier, errors specific to message production will be reported
//! in the delivery callback.
//!
//! ### Buffering
//!
//! Buffering is done automatically by librdkafka. When `send` is called, the
//! message is enqueued internally and once enough messages have been enqueued,
//! or when enough time has passed, they will be sent to Kafka as a single
//! batch. You can control the behavior of the buffer by configuring the
//! `queue.buffering.max.*` parameters listed below.
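//!
//! For example, a producer can be configured to accumulate messages for up to
//! 100 milliseconds before sending them as a batch (the values here are
//! illustrative, not recommendations):
//!
//! ```no_run
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::BaseProducer;
//!
//! let producer: BaseProducer = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     // Wait up to 100ms for messages to accumulate, trading latency for
//!     // larger and more efficient batches.
//!     .set("queue.buffering.max.ms", "100")
//!     // Allow at most 100000 messages to be buffered locally.
//!     .set("queue.buffering.max.messages", "100000")
//!     .create()
//!     .expect("producer creation failed");
//! ```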
//!
//! ## `rust-rdkafka` producers
//!
//! `rust-rdkafka` (rdkafka for brevity) provides two sets of producers:
//! low-level and high-level.
//!
//! ### Low-level producers
//!
//! The lowest-level producer provided by rdkafka is called [`BaseProducer`].
//! The goal of the `BaseProducer` is to be as close as possible to the C one
//! while maintaining a safe Rust interface. In particular, the `BaseProducer`
//! needs to be polled at regular intervals to execute any delivery callback
//! that might be waiting and to make sure the queue doesn't fill up.
//!
//! Another low-level producer is the [`ThreadedProducer`], which is a
//! `BaseProducer` with a dedicated thread for polling.
//!
//! The delivery callback can be defined using a `ProducerContext`. See the
//! [`base_producer`] module for more information.
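//!
//! A minimal sketch of a custom `ProducerContext` that logs delivery results
//! (the name `LoggingContext` is illustrative):
//!
//! ```no_run
//! use rdkafka::client::ClientContext;
//! use rdkafka::config::ClientConfig;
//! use rdkafka::message::Message;
//! use rdkafka::producer::{BaseProducer, DeliveryResult, ProducerContext};
//!
//! struct LoggingContext;
//!
//! impl ClientContext for LoggingContext {}
//!
//! impl ProducerContext for LoggingContext {
//!     // No per-message data is attached to deliveries in this example.
//!     type DeliveryOpaque = ();
//!
//!     fn delivery(&self, result: &DeliveryResult<'_>, _opaque: ()) {
//!         match result {
//!             Ok(msg) => println!("delivered to partition {}", msg.partition()),
//!             Err((e, _msg)) => eprintln!("delivery failed: {}", e),
//!         }
//!     }
//! }
//!
//! let producer: BaseProducer<LoggingContext> = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .create_with_context(LoggingContext)
//!     .expect("producer creation failed");
//! ```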
//!
//! ### High-level producer
//!
//! At the moment the only high-level producer implemented is the
//! [`FutureProducer`]. The `FutureProducer` doesn't rely on user-defined
//! callbacks to report the delivery or failure of a message; instead, this
//! information will be returned in a future. The `FutureProducer` also uses an
//! internal thread for polling, which makes calling `poll` explicitly
//! unnecessary. The returned future will contain information about the
//! delivered message in case of success, or a copy of the original message in
//! case of failure. Additional computation can be chained to the returned
//! future, and it will be executed by the future executor once the value is
//! available (for more information, check the documentation of the futures
//! crate).
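//!
//! A minimal sketch of sending with a [`FutureProducer`] (the topic name and
//! broker address are placeholders):
//!
//! ```no_run
//! use std::time::Duration;
//!
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::{FutureProducer, FutureRecord};
//!
//! async fn produce() {
//!     let producer: FutureProducer = ClientConfig::new()
//!         .set("bootstrap.servers", "localhost:9092")
//!         .create()
//!         .expect("producer creation failed");
//!
//!     // The future resolves once the delivery result is known.
//!     let delivery = producer
//!         .send(
//!             FutureRecord::to("my-topic").key("key").payload("payload"),
//!             Duration::from_secs(5),
//!         )
//!         .await;
//!
//!     match delivery {
//!         Ok((partition, offset)) => {
//!             println!("delivered to partition {} at offset {}", partition, offset)
//!         }
//!         Err((e, _msg)) => eprintln!("delivery failed: {}", e),
//!     }
//! }
//! ```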
//!
//! ## Transactions
//!
//! All rust-rdkafka producers support transactions. Transactional producers
//! work together with transaction-aware consumers configured with the default
//! `isolation.level` of `read_committed`.
//!
//! To configure a producer for transactions, set `transactional.id` to an
//! identifier unique to the application when creating the producer. After
//! creating the producer, you must initialize it with
//! [`Producer::init_transactions`].
//!
//! To start a new transaction use [`Producer::begin_transaction`]. There can be
//! **only one ongoing transaction** at a time per producer. All records sent
//! after starting a transaction and before committing or aborting it will
//! automatically be associated with that transaction.
//!
//! Once you have initialized transactions on a producer, you are not permitted
//! to produce messages outside of a transaction.
//!
//! Consumer offsets can be sent as part of the ongoing transaction using
//! `send_offsets_to_transaction` and will be committed atomically with the
//! other records sent in the transaction.
//!
//! The current transaction can be committed with
//! [`Producer::commit_transaction`] or aborted using
//! [`Producer::abort_transaction`]. Afterwards, a new transaction can begin.
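//!
//! A minimal transactional produce sketch (the transactional ID, topic name,
//! and timeouts are illustrative):
//!
//! ```no_run
//! use std::time::Duration;
//!
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
//!
//! let producer: BaseProducer = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .set("transactional.id", "my-transactional-id")
//!     .create()
//!     .expect("producer creation failed");
//!
//! // Fence out any previous producer with the same transactional.id.
//! producer
//!     .init_transactions(Duration::from_secs(30))
//!     .expect("init_transactions failed");
//!
//! producer.begin_transaction().expect("begin_transaction failed");
//!
//! // Messages sent from now on belong to the open transaction.
//! producer
//!     .send(BaseRecord::to("my-topic").key("key").payload("payload"))
//!     .expect("failed to enqueue message");
//!
//! // Commit (or abort) to finish the transaction.
//! producer
//!     .commit_transaction(Duration::from_secs(30))
//!     .expect("commit_transaction failed");
//! ```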
//!
//! ### Errors
//!
//! Errors returned by transaction methods may:
//!
//! * be retriable ([`RDKafkaError::is_retriable`]), in which case the operation
//!   that encountered the error may be retried.
//! * require abort ([`RDKafkaError::txn_requires_abort`]), in which case the
//!   current transaction must be aborted and a new transaction begun.
//! * be fatal ([`RDKafkaError::is_fatal`]), in which case the producer must be
//!   stopped and the application terminated.
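//!
//! A sketch of classifying these cases when committing, assuming the
//! transactional error is surfaced as the `KafkaError::Transaction` variant
//! (the helper name is illustrative):
//!
//! ```no_run
//! use std::time::Duration;
//!
//! use rdkafka::error::KafkaError;
//! use rdkafka::producer::{BaseProducer, Producer};
//!
//! fn commit_with_recovery(producer: &BaseProducer) {
//!     match producer.commit_transaction(Duration::from_secs(30)) {
//!         Ok(()) => (),
//!         Err(KafkaError::Transaction(e)) if e.is_retriable() => {
//!             // Transient error: the commit may simply be retried.
//!         }
//!         Err(KafkaError::Transaction(e)) if e.txn_requires_abort() => {
//!             // Abortable error: abort, then begin a new transaction.
//!             producer
//!                 .abort_transaction(Duration::from_secs(30))
//!                 .expect("abort_transaction failed");
//!         }
//!         Err(e) => {
//!             // Fatal or unexpected error: stop the producer.
//!             panic!("fatal transaction error: {}", e);
//!         }
//!     }
//! }
//! ```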
//!
//! For more details about transactions, see the [Transactional Producer]
//! section of the librdkafka introduction.
//!
//! ## Configuration
//!
//! ### Producer configuration
//!
//! For the configuration parameters common to both producers and consumers,
//! refer to the documentation in the `config` module. The most commonly used
//! producer configuration parameters are listed below. Click
//! [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
//! for the full list.
//!
//! - `queue.buffering.max.messages`: Maximum number of messages allowed on the
//!   producer queue. Default: 100000.
//! - `queue.buffering.max.kbytes`: Maximum total message size sum allowed on
//!   the producer queue. This property has higher priority than
//!   `queue.buffering.max.messages`. Default: 4000000.
//! - `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in
//!   the producer queue to accumulate before sending a request to the brokers.
//!   A higher value allows larger and more effective (less overhead, improved
//!   compression) batches of messages to accumulate at the expense of increased
//!   message delivery latency. Default: 0.
//! - `message.send.max.retries`: How many times to retry sending a failing
//!   batch. Note: retrying may cause reordering. Default: 2.
//! - `compression.codec`: Compression codec to use for compressing message
//!   sets. Default: none.
//! - `request.required.acks`: How many acknowledgements the leader broker must
//!   receive from in-sync replicas (ISRs) before responding to the request:
//!   0=the broker does not send any response/ack to the client, 1=only the
//!   leader broker must ack the message, -1 or all=the broker will block until
//!   the message is committed by all in-sync replicas, as constrained by the
//!   broker's `min.insync.replicas` setting, before sending a response.
//!   Default: 1.
//! - `request.timeout.ms`: The ack timeout of the producer request in
//!   milliseconds. This value is only enforced by the broker and relies on
//!   `request.required.acks` being != 0. Default: 5000.
//! - `message.timeout.ms`: Local message timeout. This value is only enforced
//!   locally and limits the time a produced message waits for successful
//!   delivery. A time of 0 is infinite. Default: 300000.
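//!
//! A sketch of applying several of these parameters when building a producer
//! (the values are illustrative, not recommendations):
//!
//! ```no_run
//! use rdkafka::config::ClientConfig;
//! use rdkafka::producer::FutureProducer;
//!
//! let producer: FutureProducer = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     // Require acknowledgement from all in-sync replicas.
//!     .set("request.required.acks", "-1")
//!     // Compress message batches with snappy.
//!     .set("compression.codec", "snappy")
//!     // Give up on messages not delivered within one minute.
//!     .set("message.timeout.ms", "60000")
//!     .create()
//!     .expect("producer creation failed");
//! ```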
//!
//! [`RDKafkaErrorCode::QueueFull`]: crate::error::RDKafkaErrorCode::QueueFull
//! [`RDKafkaError::is_retriable`]: crate::error::RDKafkaError::is_retriable
//! [`RDKafkaError::txn_requires_abort`]: crate::error::RDKafkaError::txn_requires_abort
//! [`RDKafkaError::is_fatal`]: crate::error::RDKafkaError::is_fatal
//! [Transactional Producer]: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#transactional-producer

use std::sync::Arc;

use crate::client::{Client, ClientContext};
use crate::consumer::ConsumerGroupMetadata;
use crate::error::KafkaResult;
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{IntoOpaque, Timeout};

pub mod base_producer;
pub mod future_producer;

#[doc(inline)]
pub use self::base_producer::{BaseProducer, BaseRecord, DeliveryResult, ThreadedProducer};
#[doc(inline)]
pub use self::future_producer::{DeliveryFuture, FutureProducer, FutureRecord};

//
// ********** PRODUCER CONTEXT **********
//

/// Producer-specific context.
///
/// This user-defined object can be used to provide custom callbacks for
/// producer events. Refer to the list of methods to check which callbacks can
/// be specified.
///
/// In particular, it can be used to specify the `delivery` callback that will
/// be called when the acknowledgement for a delivered message is received.
///
/// See also the [`ClientContext`] trait.
pub trait ProducerContext: ClientContext {
    /// A `DeliveryOpaque` is a user-defined structure that will be passed to
    /// the producer when producing a message, and returned to the `delivery`
    /// method once the message has been delivered, or failed to be.
    type DeliveryOpaque: IntoOpaque;

    /// This method will be called once the message has been delivered (or
    /// failed to be). The `DeliveryOpaque` will be the one provided by the
    /// user when calling `send`.
    fn delivery(&self, delivery_result: &DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque);
}

/// An inert producer context that can be used when customizations are not
/// required.
#[derive(Clone)]
pub struct DefaultProducerContext;

impl ClientContext for DefaultProducerContext {}
impl ProducerContext for DefaultProducerContext {
    type DeliveryOpaque = ();

    fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {}
}

/// Common trait for all producers.
pub trait Producer<C = DefaultProducerContext>
where
    C: ProducerContext + 'static,
{
    /// Returns the [`Client`] underlying this producer.
    fn client(&self) -> &Client<C>;

    /// Returns a reference to the [`ProducerContext`] used to create this
    /// producer.
    fn context(&self) -> &Arc<C> {
        self.client().context()
    }

    /// Returns the number of messages that are either waiting to be sent or
    /// have been sent but are waiting to be acknowledged.
    fn in_flight_count(&self) -> i32;

    /// Flushes any pending messages.
    ///
    /// This method should be called before termination to ensure delivery of
    /// all enqueued messages. It will call `poll()` internally.
    fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;

    /// Enable sending transactions with this producer.
    ///
    /// # Prerequisites
    ///
    /// * The configuration used to create the producer must include a
    ///   `transactional.id` setting.
    /// * You must not have sent any messages or called any of the other
    ///   transaction-related functions.
    ///
    /// # Details
    ///
    /// This function ensures any transactions initiated by previous producers
    /// with the same `transactional.id` are completed. Any transactions left
    /// open by any such previous producers will be aborted.
    ///
    /// Once previous transactions have been fenced, this function acquires an
    /// internal producer ID and epoch that will be used by all transactional
    /// messages sent by this producer.
    ///
    /// If this function returns successfully, messages may only be sent via
    /// this producer when a transaction is active. See
    /// [`Producer::begin_transaction`].
    ///
    /// This function may block for the specified `timeout`.
    fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;

    /// Begins a new transaction.
    ///
    /// # Prerequisites
    ///
    /// You must have successfully called [`Producer::init_transactions`].
    ///
    /// # Details
    ///
    /// This function begins a new transaction, and implicitly associates that
    /// open transaction with this producer.
    ///
    /// After a successful call to this function, any messages sent via this
    /// producer or any calls to [`Producer::send_offsets_to_transaction`] will
    /// be implicitly associated with this transaction, until the transaction is
    /// finished.
    ///
    /// Finish the transaction by calling [`Producer::commit_transaction`] or
    /// [`Producer::abort_transaction`].
    ///
    /// While a transaction is open, you must perform at least one transaction
    /// operation every `transaction.timeout.ms` to avoid timing out the
    /// transaction on the broker.
    fn begin_transaction(&self) -> KafkaResult<()>;

    /// Associates an offset commit operation with this transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Sends a list of topic partition offsets to the consumer group
    /// coordinator for `cgm`, and marks the offsets as part of the current
    /// transaction. These offsets will be considered committed only if the
    /// transaction is committed successfully.
    ///
    /// The offsets should be the next message your application will consume,
    /// i.e., one greater than the last processed message's offset for each
    /// partition.
    ///
    /// Use this method at the end of a consume-transform-produce loop, prior to
    /// committing the transaction with [`Producer::commit_transaction`].
    ///
    /// This function may block for the specified `timeout`.
    ///
    /// # Hints
    ///
    /// To obtain the correct consumer group metadata, call
    /// [`Consumer::group_metadata`] on the consumer for which offsets are being
    /// committed.
    ///
    /// The consumer must not have automatic commits enabled.
    ///
    /// [`Consumer::group_metadata`]: crate::consumer::Consumer::group_metadata
    fn send_offsets_to_transaction<T: Into<Timeout>>(
        &self,
        offsets: &TopicPartitionList,
        cgm: &ConsumerGroupMetadata,
        timeout: T,
    ) -> KafkaResult<()>;

    /// Commits the current transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Any outstanding messages will be flushed (i.e., delivered) before
    /// actually committing the transaction.
    ///
    /// If any of the outstanding messages fail permanently, the current
    /// transaction will enter an abortable error state and this function will
    /// return an abortable error. You must then call
    /// [`Producer::abort_transaction`] before attempting to create another
    /// transaction.
    ///
    /// This function may block for the specified `timeout`.
    fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;

    /// Aborts the current transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Any outstanding messages will be purged and failed with
    /// [`RDKafkaErrorCode::PurgeInflight`] or [`RDKafkaErrorCode::PurgeQueue`].
    ///
    /// This function should also be used to recover from non-fatal abortable
    /// transaction errors.
    ///
    /// This function may block for the specified `timeout`.
    ///
    /// [`RDKafkaErrorCode::PurgeInflight`]: crate::error::RDKafkaErrorCode::PurgeInflight
    /// [`RDKafkaErrorCode::PurgeQueue`]: crate::error::RDKafkaErrorCode::PurgeQueue
    fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()>;
}