1use std::ffi::CString;
45use std::mem;
46use std::os::raw::c_void;
47use std::ptr;
48use std::sync::atomic::{AtomicBool, Ordering};
49use std::sync::Arc;
50use std::thread::{self, JoinHandle};
51use std::time::Duration;
52
53use rdkafka_sys as rdsys;
54use rdkafka_sys::rd_kafka_vtype_t::*;
55use rdkafka_sys::types::*;
56
57use crate::client::Client;
58use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
59use crate::consumer::ConsumerGroupMetadata;
60use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
61use crate::log::{trace, warn};
62use crate::message::{BorrowedMessage, OwnedHeaders, ToBytes};
63use crate::producer::{DefaultProducerContext, Producer, ProducerContext};
64use crate::topic_partition_list::TopicPartitionList;
65use crate::util::{IntoOpaque, Timeout};
66
67pub use crate::message::DeliveryResult;
68
69unsafe extern "C" fn delivery_cb<C: ProducerContext>(
72 _client: *mut RDKafka,
73 msg: *const RDKafkaMessage,
74 opaque: *mut c_void,
75) {
76 let producer_context = &mut *(opaque as *mut C);
77 let delivery_opaque = C::DeliveryOpaque::from_ptr((*msg)._private);
78 let owner = 42u8;
79 let delivery_result = BorrowedMessage::from_dr_callback(msg as *mut RDKafkaMessage, &owner);
82 trace!("Delivery event received: {:?}", delivery_result);
83 producer_context.delivery(&delivery_result, delivery_opaque);
84 match delivery_result {
85 Ok(message) | Err((_, message)) => mem::forget(message),
87 }
88}
89
90#[derive(Debug)]
126pub struct BaseRecord<'a, K: ToBytes + ?Sized = (), P: ToBytes + ?Sized = (), D: IntoOpaque = ()> {
127 pub topic: &'a str,
129 pub partition: Option<i32>,
131 pub payload: Option<&'a P>,
133 pub key: Option<&'a K>,
135 pub timestamp: Option<i64>,
140 pub headers: Option<OwnedHeaders>,
142 pub delivery_opaque: D,
144}
145
146impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized, D: IntoOpaque> BaseRecord<'a, K, P, D> {
147 pub fn with_opaque_to(topic: &'a str, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
149 BaseRecord {
150 topic,
151 partition: None,
152 payload: None,
153 key: None,
154 timestamp: None,
155 headers: None,
156 delivery_opaque,
157 }
158 }
159
160 pub fn partition(mut self, partition: i32) -> BaseRecord<'a, K, P, D> {
162 self.partition = Some(partition);
163 self
164 }
165
166 pub fn payload(mut self, payload: &'a P) -> BaseRecord<'a, K, P, D> {
168 self.payload = Some(payload);
169 self
170 }
171
172 pub fn key(mut self, key: &'a K) -> BaseRecord<'a, K, P, D> {
174 self.key = Some(key);
175 self
176 }
177
178 pub fn timestamp(mut self, timestamp: i64) -> BaseRecord<'a, K, P, D> {
183 self.timestamp = Some(timestamp);
184 self
185 }
186
187 pub fn headers(mut self, headers: OwnedHeaders) -> BaseRecord<'a, K, P, D> {
189 self.headers = Some(headers);
190 self
191 }
192}
193
194impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> BaseRecord<'a, K, P, ()> {
195 pub fn to(topic: &'a str) -> BaseRecord<'a, K, P, ()> {
197 BaseRecord {
198 topic,
199 partition: None,
200 payload: None,
201 key: None,
202 timestamp: None,
203 headers: None,
204 delivery_opaque: (),
205 }
206 }
207}
208
209impl FromClientConfig for BaseProducer<DefaultProducerContext> {
210 fn from_config(config: &ClientConfig) -> KafkaResult<BaseProducer<DefaultProducerContext>> {
212 BaseProducer::from_config_and_context(config, DefaultProducerContext)
213 }
214}
215
216impl<C> FromClientConfigAndContext<C> for BaseProducer<C>
217where
218 C: ProducerContext,
219{
220 fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseProducer<C>> {
223 let native_config = config.create_native_config()?;
224 unsafe { rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::<C>)) };
225 let client = Client::new(
226 config,
227 native_config,
228 RDKafkaType::RD_KAFKA_PRODUCER,
229 context,
230 )?;
231 Ok(BaseProducer::from_client(client))
232 }
233}
234
235pub struct BaseProducer<C = DefaultProducerContext>
275where
276 C: ProducerContext + 'static,
277{
278 client_arc: Arc<Client<C>>,
279}
280
281impl<C> BaseProducer<C>
282where
283 C: ProducerContext,
284{
285 fn from_client(client: Client<C>) -> BaseProducer<C> {
287 BaseProducer {
288 client_arc: Arc::new(client),
289 }
290 }
291
292 pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> i32 {
297 unsafe { rdsys::rd_kafka_poll(self.native_ptr(), timeout.into().as_millis()) }
298 }
299
300 fn native_ptr(&self) -> *mut RDKafka {
302 self.client_arc.native_ptr()
303 }
304
305 pub fn send<'a, K, P>(
325 &self,
326 mut record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
327 ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
328 where
329 K: ToBytes + ?Sized,
330 P: ToBytes + ?Sized,
331 {
332 fn as_bytes(opt: Option<&(impl ?Sized + ToBytes)>) -> (*mut c_void, usize) {
333 match opt.map(ToBytes::to_bytes) {
334 None => (ptr::null_mut(), 0),
335 Some(p) => (p.as_ptr() as *mut c_void, p.len()),
336 }
337 }
338 let (payload_ptr, payload_len) = as_bytes(record.payload);
339 let (key_ptr, key_len) = as_bytes(record.key);
340 let topic_cstring = CString::new(record.topic.to_owned()).unwrap();
341 let opaque_ptr = record.delivery_opaque.into_ptr();
342 let produce_error = unsafe {
343 rdsys::rd_kafka_producev(
344 self.native_ptr(),
345 RD_KAFKA_VTYPE_TOPIC,
346 topic_cstring.as_ptr(),
347 RD_KAFKA_VTYPE_PARTITION,
348 record.partition.unwrap_or(-1),
349 RD_KAFKA_VTYPE_MSGFLAGS,
350 rdsys::RD_KAFKA_MSG_F_COPY as i32,
351 RD_KAFKA_VTYPE_VALUE,
352 payload_ptr,
353 payload_len,
354 RD_KAFKA_VTYPE_KEY,
355 key_ptr,
356 key_len,
357 RD_KAFKA_VTYPE_OPAQUE,
358 opaque_ptr,
359 RD_KAFKA_VTYPE_TIMESTAMP,
360 record.timestamp.unwrap_or(0),
361 RD_KAFKA_VTYPE_HEADERS,
362 record
363 .headers
364 .as_ref()
365 .map_or(ptr::null_mut(), OwnedHeaders::ptr),
366 RD_KAFKA_VTYPE_END,
367 )
368 };
369 if produce_error.is_error() {
370 record.delivery_opaque = unsafe { C::DeliveryOpaque::from_ptr(opaque_ptr) };
371 Err((KafkaError::MessageProduction(produce_error.into()), record))
372 } else {
373 mem::forget(record.headers);
375 Ok(())
376 }
377 }
378}
379
380impl<C> Producer<C> for BaseProducer<C>
381where
382 C: ProducerContext,
383{
384 fn client(&self) -> &Client<C> {
385 &*self.client_arc
386 }
387
388 fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
389 let ret = unsafe { rdsys::rd_kafka_flush(self.native_ptr(), timeout.into().as_millis()) };
390 if ret.is_error() {
391 Err(KafkaError::Flush(ret.into()))
392 } else {
393 Ok(())
394 }
395 }
396
397 fn in_flight_count(&self) -> i32 {
398 unsafe { rdsys::rd_kafka_outq_len(self.native_ptr()) }
399 }
400
401 fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
402 let ret = unsafe {
403 RDKafkaError::from_ptr(rdsys::rd_kafka_init_transactions(
404 self.native_ptr(),
405 timeout.into().as_millis(),
406 ))
407 };
408 if ret.is_error() {
409 Err(KafkaError::Transaction(ret))
410 } else {
411 Ok(())
412 }
413 }
414
415 fn begin_transaction(&self) -> KafkaResult<()> {
416 let ret =
417 unsafe { RDKafkaError::from_ptr(rdsys::rd_kafka_begin_transaction(self.native_ptr())) };
418 if ret.is_error() {
419 Err(KafkaError::Transaction(ret))
420 } else {
421 Ok(())
422 }
423 }
424
425 fn send_offsets_to_transaction<T: Into<Timeout>>(
426 &self,
427 offsets: &TopicPartitionList,
428 cgm: &ConsumerGroupMetadata,
429 timeout: T,
430 ) -> KafkaResult<()> {
431 let ret = unsafe {
432 RDKafkaError::from_ptr(rdsys::rd_kafka_send_offsets_to_transaction(
433 self.native_ptr(),
434 offsets.ptr(),
435 cgm.ptr(),
436 timeout.into().as_millis(),
437 ))
438 };
439 if ret.is_error() {
440 Err(KafkaError::Transaction(ret))
441 } else {
442 Ok(())
443 }
444 }
445
446 fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
447 let ret = unsafe {
448 RDKafkaError::from_ptr(rdsys::rd_kafka_commit_transaction(
449 self.native_ptr(),
450 timeout.into().as_millis(),
451 ))
452 };
453 if ret.is_error() {
454 Err(KafkaError::Transaction(ret))
455 } else {
456 Ok(())
457 }
458 }
459
460 fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
461 let ret = unsafe {
462 RDKafkaError::from_ptr(rdsys::rd_kafka_abort_transaction(
463 self.native_ptr(),
464 timeout.into().as_millis(),
465 ))
466 };
467 if ret.is_error() {
468 Err(KafkaError::Transaction(ret))
469 } else {
470 Ok(())
471 }
472 }
473}
474
475impl<C> Clone for BaseProducer<C>
476where
477 C: ProducerContext,
478{
479 fn clone(&self) -> BaseProducer<C> {
480 BaseProducer {
481 client_arc: self.client_arc.clone(),
482 }
483 }
484}
485
486#[must_use = "The threaded producer will stop immediately if unused"]
497pub struct ThreadedProducer<C>
498where
499 C: ProducerContext + 'static,
500{
501 producer: BaseProducer<C>,
502 should_stop: Arc<AtomicBool>,
503 handle: Option<Arc<JoinHandle<()>>>,
504}
505
506impl FromClientConfig for ThreadedProducer<DefaultProducerContext> {
507 fn from_config(config: &ClientConfig) -> KafkaResult<ThreadedProducer<DefaultProducerContext>> {
508 ThreadedProducer::from_config_and_context(config, DefaultProducerContext)
509 }
510}
511
512impl<C> FromClientConfigAndContext<C> for ThreadedProducer<C>
513where
514 C: ProducerContext + 'static,
515{
516 fn from_config_and_context(
517 config: &ClientConfig,
518 context: C,
519 ) -> KafkaResult<ThreadedProducer<C>> {
520 let producer = BaseProducer::from_config_and_context(config, context)?;
521 let should_stop = Arc::new(AtomicBool::new(false));
522 let thread = {
523 let producer = producer.clone();
524 let should_stop = should_stop.clone();
525 thread::Builder::new()
526 .name("producer polling thread".to_string())
527 .spawn(move || {
528 trace!("Polling thread loop started");
529 loop {
530 let n = producer.poll(Duration::from_millis(100));
531 if n == 0 {
532 if should_stop.load(Ordering::Relaxed) {
533 break;
536 }
537 } else {
538 trace!("Received {} events", n);
539 }
540 }
541 trace!("Polling thread loop terminated");
542 })
543 .expect("Failed to start polling thread")
544 };
545 Ok(ThreadedProducer {
546 producer,
547 should_stop,
548 handle: Some(Arc::new(thread)),
549 })
550 }
551}
552
553impl<C> ThreadedProducer<C>
554where
555 C: ProducerContext + 'static,
556{
557 pub fn send<'a, K, P>(
563 &self,
564 record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
565 ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
566 where
567 K: ToBytes + ?Sized,
568 P: ToBytes + ?Sized,
569 {
570 self.producer.send(record)
571 }
572
573 pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
578 self.producer.poll(timeout);
579 }
580}
581
582impl<C> Producer<C> for ThreadedProducer<C>
583where
584 C: ProducerContext + 'static,
585{
586 fn client(&self) -> &Client<C> {
587 self.producer.client()
588 }
589
590 fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
591 self.producer.flush(timeout)
592 }
593
594 fn in_flight_count(&self) -> i32 {
595 self.producer.in_flight_count()
596 }
597
598 fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
599 self.producer.init_transactions(timeout)
600 }
601
602 fn begin_transaction(&self) -> KafkaResult<()> {
603 self.producer.begin_transaction()
604 }
605
606 fn send_offsets_to_transaction<T: Into<Timeout>>(
607 &self,
608 offsets: &TopicPartitionList,
609 cgm: &ConsumerGroupMetadata,
610 timeout: T,
611 ) -> KafkaResult<()> {
612 self.producer
613 .send_offsets_to_transaction(offsets, cgm, timeout)
614 }
615
616 fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
617 self.producer.commit_transaction(timeout)
618 }
619
620 fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
621 self.producer.abort_transaction(timeout)
622 }
623}
624
625impl<C: ProducerContext + 'static> Clone for ThreadedProducer<C> {
626 fn clone(&self) -> Self {
627 Self {
628 producer: self.producer.clone(),
629 should_stop: Arc::clone(&self.should_stop),
630 handle: self.handle.clone(),
631 }
632 }
633}
634
635impl<C> Drop for ThreadedProducer<C>
636where
637 C: ProducerContext + 'static,
638{
639 fn drop(&mut self) {
640 trace!("Destroy ThreadedProducer");
641 if let Some(handle) = self.handle.take().and_then(Arc::into_inner) {
642 trace!("Stopping polling");
643 self.should_stop.store(true, Ordering::Relaxed);
644 trace!("Waiting for polling thread termination");
645 match handle.join() {
646 Ok(()) => trace!("Polling stopped"),
647 Err(e) => warn!("Failure while terminating thread: {:?}", e),
648 };
649 }
650 trace!("ThreadedProducer destroyed");
651 }
652}
653
654#[cfg(test)]
655mod tests {
656 use super::*;
660 use crate::config::ClientConfig;
661
662 #[test]
664 fn test_base_producer_clone() {
665 let producer = ClientConfig::new().create::<BaseProducer<_>>().unwrap();
666 let _producer_clone = producer.clone();
667 }
668}