1use std::error::Error;
7use std::future::Future;
8use std::io;
9use std::marker::PhantomData;
10use std::net::SocketAddr;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::{Duration, Instant};
15
16use futures_channel::oneshot;
17use futures_util::FutureExt;
18
19use crate::client::{Client, ClientContext, DefaultClientContext, OAuthToken};
20use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel};
21use crate::consumer::ConsumerGroupMetadata;
22use crate::error::{KafkaError, KafkaResult, RDKafkaErrorCode};
23use crate::message::{Message, OwnedHeaders, OwnedMessage, Timestamp, ToBytes};
24use crate::producer::{BaseRecord, DeliveryResult, Producer, ProducerContext, ThreadedProducer};
25use crate::statistics::Statistics;
26use crate::topic_partition_list::TopicPartitionList;
27use crate::util::{AsyncRuntime, DefaultRuntime, IntoOpaque, Timeout};
28
29#[derive(Debug)]
39pub struct FutureRecord<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> {
40 pub topic: &'a str,
42 pub partition: Option<i32>,
44 pub payload: Option<&'a P>,
46 pub key: Option<&'a K>,
48 pub timestamp: Option<i64>,
50 pub headers: Option<OwnedHeaders>,
52}
53
54impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> FutureRecord<'a, K, P> {
55 pub fn to(topic: &'a str) -> FutureRecord<'a, K, P> {
57 FutureRecord {
58 topic,
59 partition: None,
60 payload: None,
61 key: None,
62 timestamp: None,
63 headers: None,
64 }
65 }
66
67 fn from_base_record<D: IntoOpaque>(
68 base_record: BaseRecord<'a, K, P, D>,
69 ) -> FutureRecord<'a, K, P> {
70 FutureRecord {
71 topic: base_record.topic,
72 partition: base_record.partition,
73 key: base_record.key,
74 payload: base_record.payload,
75 timestamp: base_record.timestamp,
76 headers: base_record.headers,
77 }
78 }
79
80 pub fn partition(mut self, partition: i32) -> FutureRecord<'a, K, P> {
82 self.partition = Some(partition);
83 self
84 }
85
86 pub fn payload(mut self, payload: &'a P) -> FutureRecord<'a, K, P> {
88 self.payload = Some(payload);
89 self
90 }
91
92 pub fn key(mut self, key: &'a K) -> FutureRecord<'a, K, P> {
94 self.key = Some(key);
95 self
96 }
97
98 pub fn timestamp(mut self, timestamp: i64) -> FutureRecord<'a, K, P> {
100 self.timestamp = Some(timestamp);
101 self
102 }
103
104 pub fn headers(mut self, headers: OwnedHeaders) -> FutureRecord<'a, K, P> {
106 self.headers = Some(headers);
107 self
108 }
109
110 fn into_base_record<D: IntoOpaque>(self, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
111 BaseRecord {
112 topic: self.topic,
113 partition: self.partition,
114 key: self.key,
115 payload: self.payload,
116 timestamp: self.timestamp,
117 headers: self.headers,
118 delivery_opaque,
119 }
120 }
121}
122
123#[derive(Clone)]
128pub struct FutureProducerContext<C: ClientContext + 'static> {
129 wrapped_context: C,
130}
131
132pub type OwnedDeliveryResult = Result<(i32, i64), (KafkaError, OwnedMessage)>;
140
141impl<C: ClientContext + 'static> ClientContext for FutureProducerContext<C> {
143 const ENABLE_REFRESH_OAUTH_TOKEN: bool = C::ENABLE_REFRESH_OAUTH_TOKEN;
144
145 fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
146 self.wrapped_context.log(level, fac, log_message);
147 }
148
149 fn stats(&self, statistics: Statistics) {
150 self.wrapped_context.stats(statistics);
151 }
152
153 fn stats_raw(&self, statistics: &[u8]) {
154 self.wrapped_context.stats_raw(statistics)
155 }
156
157 fn error(&self, error: KafkaError, reason: &str) {
158 self.wrapped_context.error(error, reason);
159 }
160
161 fn resolve_broker_addr(&self, host: &str, port: u16) -> Result<Vec<SocketAddr>, io::Error> {
162 self.wrapped_context.resolve_broker_addr(host, port)
163 }
164
165 fn generate_oauth_token(
166 &self,
167 oauthbearer_config: Option<&str>,
168 ) -> Result<OAuthToken, Box<dyn Error>> {
169 self.wrapped_context
170 .generate_oauth_token(oauthbearer_config)
171 }
172}
173
174impl<C: ClientContext + 'static> ProducerContext for FutureProducerContext<C> {
175 type DeliveryOpaque = Box<oneshot::Sender<OwnedDeliveryResult>>;
176
177 fn delivery(
178 &self,
179 delivery_result: &DeliveryResult<'_>,
180 tx: Box<oneshot::Sender<OwnedDeliveryResult>>,
181 ) {
182 let owned_delivery_result = match *delivery_result {
183 Ok(ref message) => Ok((message.partition(), message.offset())),
184 Err((ref error, ref message)) => Err((error.clone(), message.detach())),
185 };
186 let _ = tx.send(owned_delivery_result); }
188}
189
190#[must_use = "Producer polling thread will stop immediately if unused"]
202pub struct FutureProducer<C = DefaultClientContext, R = DefaultRuntime>
203where
204 C: ClientContext + 'static,
205{
206 producer: Arc<ThreadedProducer<FutureProducerContext<C>>>,
207 _runtime: PhantomData<R>,
208}
209
210impl<C, R> Clone for FutureProducer<C, R>
211where
212 C: ClientContext + 'static,
213{
214 fn clone(&self) -> FutureProducer<C, R> {
215 FutureProducer {
216 producer: self.producer.clone(),
217 _runtime: PhantomData,
218 }
219 }
220}
221
222impl<R> FromClientConfig for FutureProducer<DefaultClientContext, R>
223where
224 R: AsyncRuntime,
225{
226 fn from_config(config: &ClientConfig) -> KafkaResult<FutureProducer<DefaultClientContext, R>> {
227 FutureProducer::from_config_and_context(config, DefaultClientContext)
228 }
229}
230
231impl<C, R> FromClientConfigAndContext<C> for FutureProducer<C, R>
232where
233 C: ClientContext + 'static,
234 R: AsyncRuntime,
235{
236 fn from_config_and_context(
237 config: &ClientConfig,
238 context: C,
239 ) -> KafkaResult<FutureProducer<C, R>> {
240 let future_context = FutureProducerContext {
241 wrapped_context: context,
242 };
243 let threaded_producer = ThreadedProducer::from_config_and_context(config, future_context)?;
244 Ok(FutureProducer {
245 producer: Arc::new(threaded_producer),
246 _runtime: PhantomData,
247 })
248 }
249}
250
251pub struct DeliveryFuture {
258 rx: oneshot::Receiver<OwnedDeliveryResult>,
259}
260
261impl Future for DeliveryFuture {
262 type Output = Result<OwnedDeliveryResult, oneshot::Canceled>;
263
264 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
265 self.rx.poll_unpin(cx)
266 }
267}
268
269impl<C, R> FutureProducer<C, R>
270where
271 C: ClientContext + 'static,
272 R: AsyncRuntime,
273{
274 pub async fn send<K, P, T>(
290 &self,
291 record: FutureRecord<'_, K, P>,
292 queue_timeout: T,
293 ) -> OwnedDeliveryResult
294 where
295 K: ToBytes + ?Sized,
296 P: ToBytes + ?Sized,
297 T: Into<Timeout>,
298 {
299 let start_time = Instant::now();
300 let queue_timeout = queue_timeout.into();
301 let can_retry = || match queue_timeout {
302 Timeout::Never => true,
303 Timeout::After(t) if start_time.elapsed() < t => true,
304 _ => false,
305 };
306
307 let (tx, rx) = oneshot::channel();
308 let mut base_record = record.into_base_record(Box::new(tx));
309
310 loop {
311 match self.producer.send(base_record) {
312 Err((e, record))
313 if e == KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)
314 && can_retry() =>
315 {
316 base_record = record;
317 R::delay_for(Duration::from_millis(100)).await;
318 }
319 Ok(_) => {
320 break rx.await.expect("producer unexpectedly dropped");
324 }
325 Err((e, record)) => {
326 let owned_message = OwnedMessage::new(
327 record.payload.map(|p| p.to_bytes().to_vec()),
328 record.key.map(|k| k.to_bytes().to_vec()),
329 record.topic.to_owned(),
330 record
331 .timestamp
332 .map_or(Timestamp::NotAvailable, Timestamp::CreateTime),
333 record.partition.unwrap_or(-1),
334 0,
335 record.headers,
336 );
337 break Err((e, owned_message));
338 }
339 }
340 }
341 }
342
343 pub fn send_result<'a, K, P>(
346 &self,
347 record: FutureRecord<'a, K, P>,
348 ) -> Result<DeliveryFuture, (KafkaError, FutureRecord<'a, K, P>)>
349 where
350 K: ToBytes + ?Sized,
351 P: ToBytes + ?Sized,
352 {
353 let (tx, rx) = oneshot::channel();
354 let base_record = record.into_base_record(Box::new(tx));
355 self.producer
356 .send(base_record)
357 .map(|()| DeliveryFuture { rx })
358 .map_err(|(e, record)| (e, FutureRecord::from_base_record(record)))
359 }
360
361 pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
366 self.producer.poll(timeout);
367 }
368}
369
370impl<C, R> Producer<FutureProducerContext<C>> for FutureProducer<C, R>
371where
372 C: ClientContext + 'static,
373 R: AsyncRuntime,
374{
375 fn client(&self) -> &Client<FutureProducerContext<C>> {
376 self.producer.client()
377 }
378
379 fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
380 self.producer.flush(timeout)
381 }
382
383 fn in_flight_count(&self) -> i32 {
384 self.producer.in_flight_count()
385 }
386
387 fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
388 self.producer.init_transactions(timeout)
389 }
390
391 fn begin_transaction(&self) -> KafkaResult<()> {
392 self.producer.begin_transaction()
393 }
394
395 fn send_offsets_to_transaction<T: Into<Timeout>>(
396 &self,
397 offsets: &TopicPartitionList,
398 cgm: &ConsumerGroupMetadata,
399 timeout: T,
400 ) -> KafkaResult<()> {
401 self.producer
402 .send_offsets_to_transaction(offsets, cgm, timeout)
403 }
404
405 fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
406 self.producer.commit_transaction(timeout)
407 }
408
409 fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
410 self.producer.abort_transaction(timeout)
411 }
412}
413
414#[cfg(test)]
415mod tests {
416 use super::*;
419 use crate::config::ClientConfig;
420
421 struct TestContext;
422
423 impl ClientContext for TestContext {}
424 impl ProducerContext for TestContext {
425 type DeliveryOpaque = Box<i32>;
426
427 fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {
428 unimplemented!()
429 }
430 }
431
432 #[test]
434 fn test_future_producer_clone() {
435 let producer = ClientConfig::new().create::<FutureProducer>().unwrap();
436 let _producer_clone = producer.clone();
437 }
438
439 #[test]
441 fn test_base_future_topic_send_sync() {
442 let test_context = TestContext;
443 let producer = ClientConfig::new()
444 .create_with_context::<_, FutureProducer<TestContext>>(test_context)
445 .unwrap();
446 let _producer_clone = producer.clone();
447 }
448}