use std::collections::HashMap;
use std::ffi::{c_void, CStr, CString};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread::{self, JoinHandle};
use std::time::Duration;

use futures_channel::oneshot;
use futures_util::future::{self, Either, FutureExt};
use futures_util::ready;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{trace, warn};
use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
use crate::TopicPartitionList;

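/// A client for the Kafka admin API.
///
/// `AdminClient` provides programmatic access to managing a Kafka cluster,
/// notably manipulating topics, partitions, and configuration parameters.
///
/// A minimal usage sketch, assuming this module is exposed as `rdkafka::admin`
/// and that the broker address and topic name below are placeholders:
///
/// ```no_run
/// # use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
/// # use rdkafka::client::DefaultClientContext;
/// # use rdkafka::config::ClientConfig;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
///     .set("bootstrap.servers", "localhost:9092")
///     .create()?;
/// let topic = NewTopic::new("example-topic", 3, TopicReplication::Fixed(1));
/// let results = admin
///     .create_topics(&[topic], &AdminOptions::new())
///     .await?;
/// println!("{:?}", results);
/// # Ok(())
/// # }
/// ```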
pub struct AdminClient<C: ClientContext + 'static> {
    client: Client<C>,
    queue: Arc<NativeQueue>,
    should_stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl<C: ClientContext> AdminClient<C> {
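    /// Creates new topics according to the provided `NewTopic` specifications.
    ///
    /// Note that while the API supports creating multiple topics at once, it
    /// is not transactional: creation may succeed for some topics and fail
    /// for others; check the per-topic entries in the returned vector.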
    pub fn create_topics<'a, I>(
        &self,
        topics: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
    where
        I: IntoIterator<Item = &'a NewTopic<'a>>,
    {
        match self.create_topics_inner(topics, opts) {
            Ok(rx) => Either::Left(CreateTopicsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn create_topics_inner<'a, I>(
        &self,
        topics: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a NewTopic<'a>>,
    {
        let mut native_topics = Vec::new();
        let mut err_buf = ErrBuf::new();
        for t in topics {
            native_topics.push(t.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_CreateTopics(
                self.client.native_ptr(),
                native_topics.as_c_array(),
                native_topics.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

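    /// Deletes the named topics.
    ///
    /// As with `create_topics`, the operation is not transactional: some
    /// deletions may succeed while others fail.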
    pub fn delete_topics(
        &self,
        topic_names: &[&str],
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>> {
        match self.delete_topics_inner(topic_names, opts) {
            Ok(rx) => Either::Left(DeleteTopicsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn delete_topics_inner(
        &self,
        topic_names: &[&str],
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
        let mut native_topics = Vec::new();
        let mut err_buf = ErrBuf::new();
        for tn in topic_names {
            let tn_c = CString::new(*tn)?;
            let native_topic = unsafe {
                NativeDeleteTopic::from_ptr(rdsys::rd_kafka_DeleteTopic_new(tn_c.as_ptr())).unwrap()
            };
            native_topics.push(native_topic);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_DeleteTopics(
                self.client.native_ptr(),
                native_topics.as_c_array(),
                native_topics.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

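    /// Adds partitions to existing topics according to the provided
    /// `NewPartitions` specifications.
    ///
    /// As with the other admin operations, this is not transactional:
    /// partitions may be added to some topics and not others.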
    pub fn create_partitions<'a, I>(
        &self,
        partitions: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
    where
        I: IntoIterator<Item = &'a NewPartitions<'a>>,
    {
        match self.create_partitions_inner(partitions, opts) {
            Ok(rx) => Either::Left(CreatePartitionsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn create_partitions_inner<'a, I>(
        &self,
        partitions: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a NewPartitions<'a>>,
    {
        let mut native_partitions = Vec::new();
        let mut err_buf = ErrBuf::new();
        for p in partitions {
            native_partitions.push(p.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_CreatePartitions(
                self.client.native_ptr(),
                native_partitions.as_c_array(),
                native_partitions.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

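    /// Deletes records from one or more topic partitions.
    ///
    /// For each entry in the provided `TopicPartitionList`, records with
    /// offsets smaller than the listed offset are deleted from the given
    /// partition; `Offset::End` removes all records in the partition. The
    /// returned list reports the resulting low watermark (or error) per
    /// partition.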
    pub fn delete_records(
        &self,
        offsets: &TopicPartitionList,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
        match self.delete_records_inner(offsets, opts) {
            Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn delete_records_inner(
        &self,
        offsets: &TopicPartitionList,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
        let mut err_buf = ErrBuf::new();
        let delete_records = unsafe {
            NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr()))
        }
        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_DeleteRecords(
                self.client.native_ptr(),
                &mut delete_records.ptr(),
                1,
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

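    /// Retrieves the configuration parameters for the specified resources.
    ///
    /// Note that this is not transactional: describing some resources may
    /// succeed while describing others fails.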
    pub fn describe_configs<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<ConfigResourceResult>>>
    where
        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
    {
        match self.describe_configs_inner(configs, opts) {
            Ok(rx) => Either::Left(DescribeConfigsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn describe_configs_inner<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
    {
        let mut native_configs = Vec::new();
        let mut err_buf = ErrBuf::new();
        for c in configs {
            let (name, typ) = match c {
                ResourceSpecifier::Topic(name) => (
                    CString::new(*name)?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
                ),
                ResourceSpecifier::Group(name) => (
                    CString::new(*name)?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
                ),
                ResourceSpecifier::Broker(id) => (
                    CString::new(format!("{}", id))?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
                ),
            };
            native_configs.push(unsafe {
                NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(
                    typ,
                    name.as_ptr(),
                ))
                .unwrap()
            });
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_DescribeConfigs(
                self.client.native_ptr(),
                native_configs.as_c_array(),
                native_configs.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

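    /// Updates the configuration parameters for the specified resources.
    ///
    /// Note that this is not transactional: alteration may succeed for some
    /// resources and fail for others.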
    pub fn alter_configs<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<AlterConfigsResult>>>
    where
        I: IntoIterator<Item = &'a AlterConfig<'a>>,
    {
        match self.alter_configs_inner(configs, opts) {
            Ok(rx) => Either::Left(AlterConfigsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn alter_configs_inner<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a AlterConfig<'a>>,
    {
        let mut native_configs = Vec::new();
        let mut err_buf = ErrBuf::new();
        for c in configs {
            native_configs.push(c.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_AlterConfigs(
                self.client.native_ptr(),
                native_configs.as_c_array(),
                native_configs.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

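    /// Returns the `Client` underlying this admin client.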
    pub fn inner(&self) -> &Client<C> {
        &self.client
    }
}

impl FromClientConfig for AdminClient<DefaultClientContext> {
    fn from_config(config: &ClientConfig) -> KafkaResult<AdminClient<DefaultClientContext>> {
        AdminClient::from_config_and_context(config, DefaultClientContext)
    }
}

impl<C: ClientContext> FromClientConfigAndContext<C> for AdminClient<C> {
    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<AdminClient<C>> {
        let native_config = config.create_native_config()?;
        let client = Client::new(
            config,
            native_config,
            RDKafkaType::RD_KAFKA_PRODUCER,
            context,
        )?;
        let queue = Arc::new(client.new_native_queue());
        let should_stop = Arc::new(AtomicBool::new(false));
        let handle = start_poll_thread(queue.clone(), should_stop.clone());
        Ok(AdminClient {
            client,
            queue,
            should_stop,
            handle: Some(handle),
        })
    }
}

impl<C: ClientContext> Drop for AdminClient<C> {
    fn drop(&mut self) {
        trace!("Stopping polling");
        self.should_stop.store(true, Ordering::Relaxed);
        trace!("Waiting for polling thread termination");
        match self.handle.take().unwrap().join() {
            Ok(()) => trace!("Polling stopped"),
            Err(e) => warn!("Failure while terminating thread: {:?}", e),
        };
    }
}

fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> JoinHandle<()> {
    thread::Builder::new()
        .name("admin client polling thread".into())
        .spawn(move || {
            trace!("Admin polling thread loop started");
            loop {
                let event = queue.poll(Duration::from_millis(100));
                if event.is_null() {
                    if should_stop.load(Ordering::Relaxed) {
                        break;
                    }
                    continue;
                }
                let event = unsafe { NativeEvent::from_ptr(event).unwrap() };
                let tx: Box<oneshot::Sender<NativeEvent>> =
                    unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
                let _ = tx.send(event);
            }
            trace!("Admin polling thread loop terminated");
        })
        .expect("Failed to start polling thread")
}

type NativeEvent = NativePtr<RDKafkaEvent>;

unsafe impl KafkaDrop for RDKafkaEvent {
    const TYPE: &'static str = "event";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_event_destroy;
}

unsafe impl Send for NativeEvent {}
unsafe impl Sync for NativeEvent {}

impl NativePtr<RDKafkaEvent> {
    fn check_error(&self) -> KafkaResult<()> {
        let err = unsafe { rdsys::rd_kafka_event_error(self.ptr()) };
        if err.is_error() {
            Err(KafkaError::AdminOp(err.into()))
        } else {
            Ok(())
        }
    }
}

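/// Options for an admin API request.
///
/// A minimal builder-style sketch; the timeout value is only illustrative and
/// the path assumes this module is exposed as `rdkafka::admin`:
///
/// ```
/// use std::time::Duration;
///
/// use rdkafka::admin::AdminOptions;
///
/// let opts = AdminOptions::new()
///     .request_timeout(Some(Duration::from_secs(5)))
///     .validate_only(true);
/// ```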
#[derive(Default)]
pub struct AdminOptions {
    request_timeout: Option<Timeout>,
    operation_timeout: Option<Timeout>,
    validate_only: bool,
    broker_id: Option<i32>,
}

impl AdminOptions {
    pub fn new() -> AdminOptions {
        AdminOptions::default()
    }

    pub fn request_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
        self.request_timeout = timeout.map(Into::into);
        self
    }

    pub fn operation_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
        self.operation_timeout = timeout.map(Into::into);
        self
    }

    pub fn validate_only(mut self, validate_only: bool) -> Self {
        self.validate_only = validate_only;
        self
    }

    pub fn broker_id<T: Into<Option<i32>>>(mut self, broker_id: T) -> Self {
        self.broker_id = broker_id.into();
        self
    }

    fn to_native(
        &self,
        client: *mut RDKafka,
        err_buf: &mut ErrBuf,
    ) -> KafkaResult<(NativeAdminOptions, oneshot::Receiver<NativeEvent>)> {
        let native_opts = unsafe {
            NativeAdminOptions::from_ptr(rdsys::rd_kafka_AdminOptions_new(
                client,
                RDKafkaAdminOp::RD_KAFKA_ADMIN_OP_ANY,
            ))
            .unwrap()
        };

        if let Some(timeout) = self.request_timeout {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_request_timeout(
                    native_opts.ptr(),
                    timeout.as_millis(),
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        if let Some(timeout) = self.operation_timeout {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_operation_timeout(
                    native_opts.ptr(),
                    timeout.as_millis(),
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        if self.validate_only {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_validate_only(
                    native_opts.ptr(),
                    1,
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        if let Some(broker_id) = self.broker_id {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_broker(
                    native_opts.ptr(),
                    broker_id,
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        let (tx, rx) = oneshot::channel();
        let tx = Box::into_raw(Box::new(tx)) as *mut c_void;
        unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr(), tx) };

        Ok((native_opts, rx))
    }
}

unsafe impl KafkaDrop for RDKafkaAdminOptions {
    const TYPE: &'static str = "admin options";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_AdminOptions_destroy;
}

type NativeAdminOptions = NativePtr<RDKafkaAdminOptions>;

fn check_rdkafka_invalid_arg(res: RDKafkaRespErr, err_buf: &ErrBuf) -> KafkaResult<()> {
    match res.into() {
        RDKafkaErrorCode::NoError => Ok(()),
        RDKafkaErrorCode::InvalidArgument => {
            let msg = if err_buf.len() == 0 {
                "invalid argument".into()
            } else {
                err_buf.to_string()
            };
            Err(KafkaError::AdminOpCreation(msg))
        }
        res => Err(KafkaError::AdminOpCreation(format!(
            "setting admin options returned unexpected error code {}",
            res
        ))),
    }
}

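/// The result of an individual topic operation (create topic, delete topic,
/// or create partitions): the topic name on success, or the topic name paired
/// with the error code on failure.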
pub type TopicResult = Result<String, (String, RDKafkaErrorCode)>;

fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Vec<TopicResult> {
    let mut out = Vec::with_capacity(n);
    for i in 0..n {
        let topic = unsafe { *topics.add(i) };
        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_topic_result_name(topic)) };
        let err = unsafe { rdsys::rd_kafka_topic_result_error(topic) };
        if err.is_error() {
            out.push(Err((name, err.into())));
        } else {
            out.push(Ok(name));
        }
    }
    out
}

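/// Configuration for a "create topic" operation.
///
/// For example, a topic with three partitions, a replication factor of one,
/// and a non-default retention setting might be described like this (the
/// values are only illustrative, and the path assumes this module is exposed
/// as `rdkafka::admin`):
///
/// ```
/// use rdkafka::admin::{NewTopic, TopicReplication};
///
/// let topic = NewTopic::new("my-topic", 3, TopicReplication::Fixed(1))
///     .set("retention.ms", "60000");
/// ```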
#[derive(Debug)]
pub struct NewTopic<'a> {
    pub name: &'a str,
    pub num_partitions: i32,
    pub replication: TopicReplication<'a>,
    pub config: Vec<(&'a str, &'a str)>,
}

impl<'a> NewTopic<'a> {
    pub fn new(
        name: &'a str,
        num_partitions: i32,
        replication: TopicReplication<'a>,
    ) -> NewTopic<'a> {
        NewTopic {
            name,
            num_partitions,
            replication,
            config: Vec::new(),
        }
    }

    pub fn set(mut self, key: &'a str, value: &'a str) -> NewTopic<'a> {
        self.config.push((key, value));
        self
    }

    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewTopic> {
        let name = CString::new(self.name)?;
        let repl = match self.replication {
            TopicReplication::Fixed(n) => n,
            TopicReplication::Variable(partitions) => {
                if partitions.len() as i32 != self.num_partitions {
                    return Err(KafkaError::AdminOpCreation(format!(
                        "replication configuration for topic '{}' assigns {} partition(s), \
                         which does not match the specified number of partitions ({})",
                        self.name,
                        partitions.len(),
                        self.num_partitions,
                    )));
                }
                -1
            }
        };
        let topic = unsafe {
            NativeNewTopic::from_ptr(rdsys::rd_kafka_NewTopic_new(
                name.as_ptr(),
                self.num_partitions,
                repl,
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            ))
        }
        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;

        if let TopicReplication::Variable(assignment) = self.replication {
            for (partition_id, broker_ids) in assignment.iter().enumerate() {
                let res = unsafe {
                    rdsys::rd_kafka_NewTopic_set_replica_assignment(
                        topic.ptr(),
                        partition_id as i32,
                        broker_ids.as_ptr() as *mut i32,
                        broker_ids.len(),
                        err_buf.as_mut_ptr(),
                        err_buf.capacity(),
                    )
                };
                check_rdkafka_invalid_arg(res, err_buf)?;
            }
        }
        for (key, val) in &self.config {
            let key_c = CString::new(*key)?;
            let val_c = CString::new(*val)?;
            let res = unsafe {
                rdsys::rd_kafka_NewTopic_set_config(topic.ptr(), key_c.as_ptr(), val_c.as_ptr())
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }
        Ok(topic)
    }
}

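/// An assignment of partitions to replicas.
///
/// Each element of the outer slice corresponds to the partition with that
/// index; the inner slice lists the IDs of the brokers that should host
/// replicas of that partition.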
pub type PartitionAssignment<'a> = &'a [&'a [i32]];

#[derive(Debug)]
pub enum TopicReplication<'a> {
    Fixed(i32),
    Variable(PartitionAssignment<'a>),
}

type NativeNewTopic = NativePtr<RDKafkaNewTopic>;

unsafe impl KafkaDrop for RDKafkaNewTopic {
    const TYPE: &'static str = "new topic";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewTopic_destroy;
}

struct CreateTopicsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for CreateTopicsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_CreateTopics_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "create topics request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let topics = unsafe { rdsys::rd_kafka_CreateTopics_result_topics(res, &mut n) };
        Poll::Ready(Ok(build_topic_results(topics, n)))
    }
}

type NativeDeleteTopic = NativePtr<RDKafkaDeleteTopic>;

unsafe impl KafkaDrop for RDKafkaDeleteTopic {
    const TYPE: &'static str = "delete topic";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteTopic_destroy;
}

struct DeleteTopicsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteTopicsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_DeleteTopics_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "delete topics request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let topics = unsafe { rdsys::rd_kafka_DeleteTopics_result_topics(res, &mut n) };
        Poll::Ready(Ok(build_topic_results(topics, n)))
    }
}

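/// Configuration for a "create partitions" operation.
///
/// For example, growing a topic to six partitions might be requested like
/// this (the topic name and count are only illustrative, and the path assumes
/// this module is exposed as `rdkafka::admin`):
///
/// ```
/// use rdkafka::admin::NewPartitions;
///
/// let partitions = NewPartitions::new("my-topic", 6);
/// ```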
pub struct NewPartitions<'a> {
    pub topic_name: &'a str,
    pub new_partition_count: usize,
    pub assignment: Option<PartitionAssignment<'a>>,
}

impl<'a> NewPartitions<'a> {
    pub fn new(topic_name: &'a str, new_partition_count: usize) -> NewPartitions<'a> {
        NewPartitions {
            topic_name,
            new_partition_count,
            assignment: None,
        }
    }

    pub fn assign(mut self, assignment: PartitionAssignment<'a>) -> NewPartitions<'_> {
        self.assignment = Some(assignment);
        self
    }

    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewPartitions> {
        let name = CString::new(self.topic_name)?;
        if let Some(assignment) = self.assignment {
            if assignment.len() > self.new_partition_count {
                return Err(KafkaError::AdminOpCreation(format!(
                    "partition assignment for topic '{}' assigns {} partition(s), \
                     which is more than the requested total number of partitions ({})",
                    self.topic_name,
                    assignment.len(),
                    self.new_partition_count,
                )));
            }
        }
        let partitions = unsafe {
            NativeNewPartitions::from_ptr(rdsys::rd_kafka_NewPartitions_new(
                name.as_ptr(),
                self.new_partition_count,
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            ))
        }
        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;

        if let Some(assignment) = self.assignment {
            for (partition_id, broker_ids) in assignment.iter().enumerate() {
                let res = unsafe {
                    rdsys::rd_kafka_NewPartitions_set_replica_assignment(
                        partitions.ptr(),
                        partition_id as i32,
                        broker_ids.as_ptr() as *mut i32,
                        broker_ids.len(),
                        err_buf.as_mut_ptr(),
                        err_buf.capacity(),
                    )
                };
                check_rdkafka_invalid_arg(res, err_buf)?;
            }
        }
        Ok(partitions)
    }
}

type NativeNewPartitions = NativePtr<RDKafkaNewPartitions>;

unsafe impl KafkaDrop for RDKafkaNewPartitions {
    const TYPE: &'static str = "new partitions";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewPartitions_destroy;
}

struct CreatePartitionsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for CreatePartitionsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_CreatePartitions_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "create partitions request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let topics = unsafe { rdsys::rd_kafka_CreatePartitions_result_topics(res, &mut n) };
        Poll::Ready(Ok(build_topic_results(topics, n)))
    }
}

type NativeDeleteRecords = NativePtr<RDKafkaDeleteRecords>;

unsafe impl KafkaDrop for RDKafkaDeleteRecords {
    const TYPE: &'static str = "delete records";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy;
}

struct DeleteRecordsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteRecordsFuture {
    type Output = KafkaResult<TopicPartitionList>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "delete records request received response of incorrect type ({})",
                typ
            ))));
        }
        let tpl = unsafe {
            let tpl = rdsys::rd_kafka_DeleteRecords_result_offsets(res);
            TopicPartitionList::from_ptr(rdsys::rd_kafka_topic_partition_list_copy(tpl))
        };
        Poll::Ready(Ok(tpl))
    }
}

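/// The result of an individual "describe config" operation: the resource and
/// its configuration entries on success, or the error code on failure.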
pub type ConfigResourceResult = Result<ConfigResource, RDKafkaErrorCode>;

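/// Specification of a configurable resource: a topic, a consumer group, or a
/// broker, identified by name or by broker ID.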
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ResourceSpecifier<'a> {
    Topic(&'a str),
    Group(&'a str),
    Broker(i32),
}

#[derive(Debug, Eq, PartialEq)]
pub enum OwnedResourceSpecifier {
    Topic(String),
    Group(String),
    Broker(i32),
}

#[derive(Debug, Eq, PartialEq)]
pub enum ConfigSource {
    Unknown,
    DynamicTopic,
    DynamicBroker,
    DynamicDefaultBroker,
    StaticBroker,
    Default,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ConfigEntry {
    pub name: String,
    pub value: Option<String>,
    pub source: ConfigSource,
    pub is_read_only: bool,
    pub is_default: bool,
    pub is_sensitive: bool,
}

#[derive(Debug)]
pub struct ConfigResource {
    pub specifier: OwnedResourceSpecifier,
    pub entries: Vec<ConfigEntry>,
}

impl ConfigResource {
    pub fn entry_map(&self) -> HashMap<&str, &ConfigEntry> {
        self.entries.iter().map(|e| (&*e.name, e)).collect()
    }

    pub fn get(&self, name: &str) -> Option<&ConfigEntry> {
        self.entries.iter().find(|e| e.name == name)
    }
}

type NativeConfigResource = NativePtr<RDKafkaConfigResource>;

unsafe impl KafkaDrop for RDKafkaConfigResource {
    const TYPE: &'static str = "config resource";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_ConfigResource_destroy;
}

fn extract_config_specifier(
    resource: *const RDKafkaConfigResource,
) -> KafkaResult<OwnedResourceSpecifier> {
    let typ = unsafe { rdsys::rd_kafka_ConfigResource_type(resource) };
    match typ {
        RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC => {
            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
            Ok(OwnedResourceSpecifier::Topic(name))
        }
        RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP => {
            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
            Ok(OwnedResourceSpecifier::Group(name))
        }
        RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER => {
            let name = unsafe { CStr::from_ptr(rdsys::rd_kafka_ConfigResource_name(resource)) }
                .to_string_lossy();
            match name.parse::<i32>() {
                Ok(id) => Ok(OwnedResourceSpecifier::Broker(id)),
                Err(_) => Err(KafkaError::AdminOpCreation(format!(
                    "bogus broker ID in kafka response: {}",
                    name
                ))),
            }
        }
        _ => Err(KafkaError::AdminOpCreation(format!(
            "bogus resource type in kafka response: {:?}",
            typ
        ))),
    }
}

fn extract_config_source(config_source: RDKafkaConfigSource) -> KafkaResult<ConfigSource> {
    match config_source {
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG => Ok(ConfigSource::Unknown),
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG => {
            Ok(ConfigSource::DynamicTopic)
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG => {
            Ok(ConfigSource::DynamicBroker)
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG => {
            Ok(ConfigSource::DynamicDefaultBroker)
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG => {
            Ok(ConfigSource::StaticBroker)
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG => Ok(ConfigSource::Default),
        _ => Err(KafkaError::AdminOpCreation(format!(
            "bogus config source type in kafka response: {:?}",
            config_source,
        ))),
    }
}

struct DescribeConfigsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DescribeConfigsFuture {
    type Output = KafkaResult<Vec<ConfigResourceResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_DescribeConfigs_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "describe configs request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let resources = unsafe { rdsys::rd_kafka_DescribeConfigs_result_resources(res, &mut n) };
        let mut out = Vec::with_capacity(n);
        for i in 0..n {
            let resource = unsafe { *resources.add(i) };
            let specifier = extract_config_specifier(resource)?;
            let mut entries_out = Vec::new();
            let mut n = 0;
            let entries = unsafe { rdsys::rd_kafka_ConfigResource_configs(resource, &mut n) };
            for j in 0..n {
                let entry = unsafe { *entries.add(j) };
                let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigEntry_name(entry)) };
                let value = unsafe {
                    let value = rdsys::rd_kafka_ConfigEntry_value(entry);
                    if value.is_null() {
                        None
                    } else {
                        Some(cstr_to_owned(value))
                    }
                };
                entries_out.push(ConfigEntry {
                    name,
                    value,
                    source: extract_config_source(unsafe {
                        rdsys::rd_kafka_ConfigEntry_source(entry)
                    })?,
                    is_read_only: unsafe { rdsys::rd_kafka_ConfigEntry_is_read_only(entry) } != 0,
                    is_default: unsafe { rdsys::rd_kafka_ConfigEntry_is_default(entry) } != 0,
                    is_sensitive: unsafe { rdsys::rd_kafka_ConfigEntry_is_sensitive(entry) } != 0,
                });
            }
            out.push(Ok(ConfigResource {
                specifier,
                entries: entries_out,
            }))
        }
        Poll::Ready(Ok(out))
    }
}

pub type AlterConfigsResult =
    Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaErrorCode)>;

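/// Configuration for an "alter config" operation.
///
/// For example, changing a topic's retention might be specified like this
/// (the values are only illustrative, and the path assumes this module is
/// exposed as `rdkafka::admin`):
///
/// ```
/// use rdkafka::admin::{AlterConfig, ResourceSpecifier};
///
/// let alteration = AlterConfig::new(ResourceSpecifier::Topic("my-topic"))
///     .set("retention.ms", "3600000");
/// ```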
pub struct AlterConfig<'a> {
    pub specifier: ResourceSpecifier<'a>,
    pub entries: HashMap<&'a str, &'a str>,
}

impl<'a> AlterConfig<'a> {
    pub fn new(specifier: ResourceSpecifier<'_>) -> AlterConfig<'_> {
        AlterConfig {
            specifier,
            entries: HashMap::new(),
        }
    }

    pub fn set(mut self, key: &'a str, value: &'a str) -> AlterConfig<'a> {
        self.entries.insert(key, value);
        self
    }

    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeConfigResource> {
        let (name, typ) = match self.specifier {
            ResourceSpecifier::Topic(name) => (
                CString::new(name)?,
                RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
            ),
            ResourceSpecifier::Group(name) => (
                CString::new(name)?,
                RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
            ),
            ResourceSpecifier::Broker(id) => (
                CString::new(format!("{}", id))?,
                RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
            ),
        };
        let config = unsafe {
            NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(typ, name.as_ptr()))
                .unwrap()
        };
        for (key, val) in &self.entries {
            let key_c = CString::new(*key)?;
            let val_c = CString::new(*val)?;
            let res = unsafe {
                rdsys::rd_kafka_ConfigResource_set_config(
                    config.ptr(),
                    key_c.as_ptr(),
                    val_c.as_ptr(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }
        Ok(config)
    }
}

struct AlterConfigsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for AlterConfigsFuture {
    type Output = KafkaResult<Vec<AlterConfigsResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_AlterConfigs_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "alter configs request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let resources = unsafe { rdsys::rd_kafka_AlterConfigs_result_resources(res, &mut n) };
        let mut out = Vec::with_capacity(n);
        for i in 0..n {
            let resource = unsafe { *resources.add(i) };
            let specifier = extract_config_specifier(resource)?;
            out.push(Ok(specifier));
        }
        Poll::Ready(Ok(out))
    }
}