rdkafka/
admin.rs

1//! Admin client.
2//!
3//! The main object is the [`AdminClient`] struct.
4//!
5//! [`AdminClient`]: struct.AdminClient.html
6
7use std::collections::HashMap;
8use std::ffi::{c_void, CStr, CString};
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::thread::{self, JoinHandle};
15use std::time::Duration;
16
17use futures_channel::oneshot;
18use futures_util::future::{self, Either, FutureExt};
19use futures_util::ready;
20
21use rdkafka_sys as rdsys;
22use rdkafka_sys::types::*;
23
24use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
25use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
26use crate::error::{IsError, KafkaError, KafkaResult};
27use crate::log::{trace, warn};
28use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
29use crate::TopicPartitionList;
30
31//
32// ********** ADMIN CLIENT **********
33//
34
/// A client for the Kafka admin API.
///
/// `AdminClient` provides programmatic access to managing a Kafka cluster,
/// notably manipulating topics, partitions, and configuration parameters.
pub struct AdminClient<C: ClientContext + 'static> {
    // Underlying client handle; exposed to callers through `inner()`.
    client: Client<C>,
    // Queue handed to every admin request issued by this client and read by
    // the background polling thread.
    queue: Arc<NativeQueue>,
    // Set to `true` on drop to ask the polling thread to exit.
    should_stop: Arc<AtomicBool>,
    // Join handle of the polling thread; taken and joined on drop.
    handle: Option<JoinHandle<()>>,
}
45
46impl<C: ClientContext> AdminClient<C> {
47    /// Creates new topics according to the provided `NewTopic` specifications.
48    ///
49    /// Note that while the API supports creating multiple topics at once, it
50    /// is not transactional. Creation of some topics may succeed while others
51    /// fail. Be sure to check the result of each individual operation.
52    pub fn create_topics<'a, I>(
53        &self,
54        topics: I,
55        opts: &AdminOptions,
56    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
57    where
58        I: IntoIterator<Item = &'a NewTopic<'a>>,
59    {
60        match self.create_topics_inner(topics, opts) {
61            Ok(rx) => Either::Left(CreateTopicsFuture { rx }),
62            Err(err) => Either::Right(future::err(err)),
63        }
64    }
65
66    fn create_topics_inner<'a, I>(
67        &self,
68        topics: I,
69        opts: &AdminOptions,
70    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
71    where
72        I: IntoIterator<Item = &'a NewTopic<'a>>,
73    {
74        let mut native_topics = Vec::new();
75        let mut err_buf = ErrBuf::new();
76        for t in topics {
77            native_topics.push(t.to_native(&mut err_buf)?);
78        }
79        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
80        unsafe {
81            rdsys::rd_kafka_CreateTopics(
82                self.client.native_ptr(),
83                native_topics.as_c_array(),
84                native_topics.len(),
85                native_opts.ptr(),
86                self.queue.ptr(),
87            );
88        }
89        Ok(rx)
90    }
91
92    /// Deletes the named topics.
93    ///
94    /// Note that while the API supports deleting multiple topics at once, it is
95    /// not transactional. Deletion of some topics may succeed while others
96    /// fail. Be sure to check the result of each individual operation.
97    pub fn delete_topics(
98        &self,
99        topic_names: &[&str],
100        opts: &AdminOptions,
101    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>> {
102        match self.delete_topics_inner(topic_names, opts) {
103            Ok(rx) => Either::Left(DeleteTopicsFuture { rx }),
104            Err(err) => Either::Right(future::err(err)),
105        }
106    }
107
108    fn delete_topics_inner(
109        &self,
110        topic_names: &[&str],
111        opts: &AdminOptions,
112    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
113        let mut native_topics = Vec::new();
114        let mut err_buf = ErrBuf::new();
115        for tn in topic_names {
116            let tn_c = CString::new(*tn)?;
117            let native_topic = unsafe {
118                NativeDeleteTopic::from_ptr(rdsys::rd_kafka_DeleteTopic_new(tn_c.as_ptr())).unwrap()
119            };
120            native_topics.push(native_topic);
121        }
122        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
123        unsafe {
124            rdsys::rd_kafka_DeleteTopics(
125                self.client.native_ptr(),
126                native_topics.as_c_array(),
127                native_topics.len(),
128                native_opts.ptr(),
129                self.queue.ptr(),
130            );
131        }
132        Ok(rx)
133    }
134
135    /// Adds additional partitions to existing topics according to the provided
136    /// `NewPartitions` specifications.
137    ///
138    /// Note that while the API supports creating partitions for multiple topics
139    /// at once, it is not transactional. Creation of partitions for some topics
140    /// may succeed while others fail. Be sure to check the result of each
141    /// individual operation.
142    pub fn create_partitions<'a, I>(
143        &self,
144        partitions: I,
145        opts: &AdminOptions,
146    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
147    where
148        I: IntoIterator<Item = &'a NewPartitions<'a>>,
149    {
150        match self.create_partitions_inner(partitions, opts) {
151            Ok(rx) => Either::Left(CreatePartitionsFuture { rx }),
152            Err(err) => Either::Right(future::err(err)),
153        }
154    }
155
156    fn create_partitions_inner<'a, I>(
157        &self,
158        partitions: I,
159        opts: &AdminOptions,
160    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
161    where
162        I: IntoIterator<Item = &'a NewPartitions<'a>>,
163    {
164        let mut native_partitions = Vec::new();
165        let mut err_buf = ErrBuf::new();
166        for p in partitions {
167            native_partitions.push(p.to_native(&mut err_buf)?);
168        }
169        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
170        unsafe {
171            rdsys::rd_kafka_CreatePartitions(
172                self.client.native_ptr(),
173                native_partitions.as_c_array(),
174                native_partitions.len(),
175                native_opts.ptr(),
176                self.queue.ptr(),
177            );
178        }
179        Ok(rx)
180    }
181
182    /// Deletes records from a topic.
183    ///
184    /// The provided `offsets` is a topic partition list specifying which
185    /// records to delete from a list of topic partitions. For each entry in the
186    /// list, the messages at offsets before the specified offsets (exclusive)
187    /// in the specified partition will be deleted. Use offset [`Offset::End`]
188    /// to delete all records in the partition.
189    ///
190    /// Returns a topic partition list describing the result of the deletion. If
191    /// the operation succeeded for a partition, the offset for that partition
192    /// will be set to the post-deletion low-water mark for that partition. If
193    /// the operation failed for a partition, there will be an error for that
194    /// partition's entry in the list.
195    pub fn delete_records(
196        &self,
197        offsets: &TopicPartitionList,
198        opts: &AdminOptions,
199    ) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
200        match self.delete_records_inner(offsets, opts) {
201            Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
202            Err(err) => Either::Right(future::err(err)),
203        }
204    }
205
206    fn delete_records_inner(
207        &self,
208        offsets: &TopicPartitionList,
209        opts: &AdminOptions,
210    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
211        let mut err_buf = ErrBuf::new();
212        let delete_records = unsafe {
213            NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr()))
214        }
215        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
216        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
217        unsafe {
218            rdsys::rd_kafka_DeleteRecords(
219                self.client.native_ptr(),
220                &mut delete_records.ptr(),
221                1,
222                native_opts.ptr(),
223                self.queue.ptr(),
224            );
225        }
226        Ok(rx)
227    }
228
229    /// Retrieves the configuration parameters for the specified resources.
230    ///
231    /// Note that while the API supports describing multiple configurations at
232    /// once, it is not transactional. There is no guarantee that you will see
233    /// a consistent snapshot of the configuration across different resources.
234    pub fn describe_configs<'a, I>(
235        &self,
236        configs: I,
237        opts: &AdminOptions,
238    ) -> impl Future<Output = KafkaResult<Vec<ConfigResourceResult>>>
239    where
240        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
241    {
242        match self.describe_configs_inner(configs, opts) {
243            Ok(rx) => Either::Left(DescribeConfigsFuture { rx }),
244            Err(err) => Either::Right(future::err(err)),
245        }
246    }
247
248    fn describe_configs_inner<'a, I>(
249        &self,
250        configs: I,
251        opts: &AdminOptions,
252    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
253    where
254        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
255    {
256        let mut native_configs = Vec::new();
257        let mut err_buf = ErrBuf::new();
258        for c in configs {
259            let (name, typ) = match c {
260                ResourceSpecifier::Topic(name) => (
261                    CString::new(*name)?,
262                    RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
263                ),
264                ResourceSpecifier::Group(name) => (
265                    CString::new(*name)?,
266                    RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
267                ),
268                ResourceSpecifier::Broker(id) => (
269                    CString::new(format!("{}", id))?,
270                    RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
271                ),
272            };
273            native_configs.push(unsafe {
274                NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(
275                    typ,
276                    name.as_ptr(),
277                ))
278                .unwrap()
279            });
280        }
281        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
282        unsafe {
283            rdsys::rd_kafka_DescribeConfigs(
284                self.client.native_ptr(),
285                native_configs.as_c_array(),
286                native_configs.len(),
287                native_opts.ptr(),
288                self.queue.ptr(),
289            );
290        }
291        Ok(rx)
292    }
293
294    /// Sets configuration parameters for the specified resources.
295    ///
296    /// Note that while the API supports altering multiple resources at once, it
297    /// is not transactional. Alteration of some resources may succeed while
298    /// others fail. Be sure to check the result of each individual operation.
299    pub fn alter_configs<'a, I>(
300        &self,
301        configs: I,
302        opts: &AdminOptions,
303    ) -> impl Future<Output = KafkaResult<Vec<AlterConfigsResult>>>
304    where
305        I: IntoIterator<Item = &'a AlterConfig<'a>>,
306    {
307        match self.alter_configs_inner(configs, opts) {
308            Ok(rx) => Either::Left(AlterConfigsFuture { rx }),
309            Err(err) => Either::Right(future::err(err)),
310        }
311    }
312
313    fn alter_configs_inner<'a, I>(
314        &self,
315        configs: I,
316        opts: &AdminOptions,
317    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
318    where
319        I: IntoIterator<Item = &'a AlterConfig<'a>>,
320    {
321        let mut native_configs = Vec::new();
322        let mut err_buf = ErrBuf::new();
323        for c in configs {
324            native_configs.push(c.to_native(&mut err_buf)?);
325        }
326        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
327        unsafe {
328            rdsys::rd_kafka_AlterConfigs(
329                self.client.native_ptr(),
330                native_configs.as_c_array(),
331                native_configs.len(),
332                native_opts.ptr(),
333                self.queue.ptr(),
334            );
335        }
336        Ok(rx)
337    }
338
    /// Returns the client underlying this admin client.
    ///
    /// The reference borrows from `self`; the admin client keeps ownership of
    /// the underlying client.
    pub fn inner(&self) -> &Client<C> {
        &self.client
    }
343}
344
345impl FromClientConfig for AdminClient<DefaultClientContext> {
346    fn from_config(config: &ClientConfig) -> KafkaResult<AdminClient<DefaultClientContext>> {
347        AdminClient::from_config_and_context(config, DefaultClientContext)
348    }
349}
350
351impl<C: ClientContext> FromClientConfigAndContext<C> for AdminClient<C> {
352    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<AdminClient<C>> {
353        let native_config = config.create_native_config()?;
354        // librdkafka only provides consumer and producer types. We follow the
355        // example of the Python bindings in choosing to pretend to be a
356        // producer, as producer clients are allegedly more lightweight. [0]
357        //
358        // [0]: https://github.com/confluentinc/confluent-kafka-python/blob/bfb07dfbca47c256c840aaace83d3fe26c587360/confluent_kafka/src/Admin.c#L1492-L1493
359        let client = Client::new(
360            config,
361            native_config,
362            RDKafkaType::RD_KAFKA_PRODUCER,
363            context,
364        )?;
365        let queue = Arc::new(client.new_native_queue());
366        let should_stop = Arc::new(AtomicBool::new(false));
367        let handle = start_poll_thread(queue.clone(), should_stop.clone());
368        Ok(AdminClient {
369            client,
370            queue,
371            should_stop,
372            handle: Some(handle),
373        })
374    }
375}
376
377impl<C: ClientContext> Drop for AdminClient<C> {
378    fn drop(&mut self) {
379        trace!("Stopping polling");
380        self.should_stop.store(true, Ordering::Relaxed);
381        trace!("Waiting for polling thread termination");
382        match self.handle.take().unwrap().join() {
383            Ok(()) => trace!("Polling stopped"),
384            Err(e) => warn!("Failure while terminating thread: {:?}", e),
385        };
386    }
387}
388
389fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> JoinHandle<()> {
390    thread::Builder::new()
391        .name("admin client polling thread".into())
392        .spawn(move || {
393            trace!("Admin polling thread loop started");
394            loop {
395                let event = queue.poll(Duration::from_millis(100));
396                if event.is_null() {
397                    if should_stop.load(Ordering::Relaxed) {
398                        // We received nothing and the thread should stop, so
399                        // break the loop.
400                        break;
401                    }
402                    continue;
403                }
404                let event = unsafe { NativeEvent::from_ptr(event).unwrap() };
405                let tx: Box<oneshot::Sender<NativeEvent>> =
406                    unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
407                let _ = tx.send(event);
408            }
409            trace!("Admin polling thread loop terminated");
410        })
411        .expect("Failed to start polling thread")
412}
413
// Owned handle to a librdkafka event, as delivered by the admin polling
// thread to the per-request futures.
type NativeEvent = NativePtr<RDKafkaEvent>;

// Registers `rd_kafka_event_destroy` as the destructor for events.
// NOTE(review): presumably `NativePtr` invokes `DROP` when it is dropped and
// uses `TYPE` only for diagnostics -- confirm in `crate::util`.
unsafe impl KafkaDrop for RDKafkaEvent {
    const TYPE: &'static str = "event";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_event_destroy;
}

// Events are created on the polling thread and moved across a oneshot
// channel to whichever thread polls the request future, hence `Send`.
// NOTE(review): the soundness argument for `Sync` is not stated in this
// file -- confirm against librdkafka's thread-safety guarantees for events.
unsafe impl Send for NativeEvent {}
unsafe impl Sync for NativeEvent {}
423
424impl NativePtr<RDKafkaEvent> {
425    fn check_error(&self) -> KafkaResult<()> {
426        let err = unsafe { rdsys::rd_kafka_event_error(self.ptr()) };
427        if err.is_error() {
428            Err(KafkaError::AdminOp(err.into()))
429        } else {
430            Ok(())
431        }
432    }
433}
434
435//
436// ********** ADMIN OPTIONS **********
437//
438
/// Options for an admin API request.
///
/// Built with the chained setter methods; every option starts out unset
/// (`None` / `false`), in which case it is not passed on to librdkafka.
#[derive(Default)]
pub struct AdminOptions {
    // Overall request timeout; see `request_timeout()`.
    request_timeout: Option<Timeout>,
    // Broker-side operation timeout; see `operation_timeout()`.
    operation_timeout: Option<Timeout>,
    // Whether the broker should only validate the request.
    validate_only: bool,
    // Explicit target broker, if any; see `broker_id()`.
    broker_id: Option<i32>,
}
447
448impl AdminOptions {
449    /// Creates a new `AdminOptions`.
450    pub fn new() -> AdminOptions {
451        AdminOptions::default()
452    }
453
454    /// Sets the overall request timeout, including broker lookup, request
455    /// transmission, operation time on broker, and response.
456    ///
457    /// Defaults to the `socket.timeout.ms` configuration parameter.
458    pub fn request_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
459        self.request_timeout = timeout.map(Into::into);
460        self
461    }
462
463    /// Sets the broker's operation timeout, such as the timeout for
464    /// CreateTopics to complete the creation of topics on the controller before
465    /// returning a result to the application.
466    ///
467    /// If unset (the default), the API calls will return immediately after
468    /// triggering the operation.
469    ///
470    /// Only the CreateTopics, DeleteTopics, and CreatePartitions API calls
471    /// respect this option.
472    pub fn operation_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
473        self.operation_timeout = timeout.map(Into::into);
474        self
475    }
476
477    /// Tells the broker to only validate the request, without performing the
478    /// requested operation.
479    ///
480    /// Defaults to false.
481    pub fn validate_only(mut self, validate_only: bool) -> Self {
482        self.validate_only = validate_only;
483        self
484    }
485
486    /// Override what broker the admin request will be sent to.
487    ///
488    /// By default, a reasonable broker will be selected automatically. See the
489    /// librdkafka docs on `rd_kafka_AdminOptions_set_broker` for details.
490    pub fn broker_id<T: Into<Option<i32>>>(mut self, broker_id: T) -> Self {
491        self.broker_id = broker_id.into();
492        self
493    }
494
495    fn to_native(
496        &self,
497        client: *mut RDKafka,
498        err_buf: &mut ErrBuf,
499    ) -> KafkaResult<(NativeAdminOptions, oneshot::Receiver<NativeEvent>)> {
500        let native_opts = unsafe {
501            NativeAdminOptions::from_ptr(rdsys::rd_kafka_AdminOptions_new(
502                client,
503                RDKafkaAdminOp::RD_KAFKA_ADMIN_OP_ANY,
504            ))
505            .unwrap()
506        };
507
508        if let Some(timeout) = self.request_timeout {
509            let res = unsafe {
510                rdsys::rd_kafka_AdminOptions_set_request_timeout(
511                    native_opts.ptr(),
512                    timeout.as_millis(),
513                    err_buf.as_mut_ptr(),
514                    err_buf.capacity(),
515                )
516            };
517            check_rdkafka_invalid_arg(res, err_buf)?;
518        }
519
520        if let Some(timeout) = self.operation_timeout {
521            let res = unsafe {
522                rdsys::rd_kafka_AdminOptions_set_operation_timeout(
523                    native_opts.ptr(),
524                    timeout.as_millis(),
525                    err_buf.as_mut_ptr(),
526                    err_buf.capacity(),
527                )
528            };
529            check_rdkafka_invalid_arg(res, err_buf)?;
530        }
531
532        if self.validate_only {
533            let res = unsafe {
534                rdsys::rd_kafka_AdminOptions_set_validate_only(
535                    native_opts.ptr(),
536                    1, // true
537                    err_buf.as_mut_ptr(),
538                    err_buf.capacity(),
539                )
540            };
541            check_rdkafka_invalid_arg(res, err_buf)?;
542        }
543
544        if let Some(broker_id) = self.broker_id {
545            let res = unsafe {
546                rdsys::rd_kafka_AdminOptions_set_broker(
547                    native_opts.ptr(),
548                    broker_id,
549                    err_buf.as_mut_ptr(),
550                    err_buf.capacity(),
551                )
552            };
553            check_rdkafka_invalid_arg(res, err_buf)?;
554        }
555
556        let (tx, rx) = oneshot::channel();
557        let tx = Box::into_raw(Box::new(tx)) as *mut c_void;
558        unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr(), tx) };
559
560        Ok((native_opts, rx))
561    }
562}
563
// Registers `rd_kafka_AdminOptions_destroy` as the destructor for native
// admin options.
// NOTE(review): presumably `NativePtr` invokes `DROP` when it is dropped and
// uses `TYPE` only for diagnostics -- confirm in `crate::util`.
unsafe impl KafkaDrop for RDKafkaAdminOptions {
    const TYPE: &'static str = "admin options";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_AdminOptions_destroy;
}

// Owned handle to native admin options, produced by `AdminOptions::to_native`.
type NativeAdminOptions = NativePtr<RDKafkaAdminOptions>;
570
571fn check_rdkafka_invalid_arg(res: RDKafkaRespErr, err_buf: &ErrBuf) -> KafkaResult<()> {
572    match res.into() {
573        RDKafkaErrorCode::NoError => Ok(()),
574        RDKafkaErrorCode::InvalidArgument => {
575            let msg = if err_buf.len() == 0 {
576                "invalid argument".into()
577            } else {
578                err_buf.to_string()
579            };
580            Err(KafkaError::AdminOpCreation(msg))
581        }
582        res => Err(KafkaError::AdminOpCreation(format!(
583            "setting admin options returned unexpected error code {}",
584            res
585        ))),
586    }
587}
588
589//
590// ********** RESPONSE HANDLING **********
591//
592
/// The result of an individual CreateTopic, DeleteTopic, or
/// CreatePartition operation.
///
/// `Ok` holds the name of the topic the operation succeeded for; `Err` holds
/// the topic name together with the error code reported for it.
pub type TopicResult = Result<String, (String, RDKafkaErrorCode)>;
596
597fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Vec<TopicResult> {
598    let mut out = Vec::with_capacity(n);
599    for i in 0..n {
600        let topic = unsafe { *topics.add(i) };
601        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_topic_result_name(topic)) };
602        let err = unsafe { rdsys::rd_kafka_topic_result_error(topic) };
603        if err.is_error() {
604            out.push(Err((name, err.into())));
605        } else {
606            out.push(Ok(name));
607        }
608    }
609    out
610}
611
612//
613// Create topic handling
614//
615
/// Configuration for a CreateTopic operation.
///
/// Construct with [`NewTopic::new`] and add configuration parameters with
/// [`NewTopic::set`].
#[derive(Debug)]
pub struct NewTopic<'a> {
    /// The name of the new topic.
    pub name: &'a str,
    /// The initial number of partitions.
    ///
    /// When `replication` is [`TopicReplication::Variable`], this must equal
    /// the number of entries in the assignment.
    pub num_partitions: i32,
    /// The initial replication configuration.
    pub replication: TopicReplication<'a>,
    /// The initial configuration parameters for the topic, as key/value
    /// pairs.
    pub config: Vec<(&'a str, &'a str)>,
}
628
629impl<'a> NewTopic<'a> {
630    /// Creates a new `NewTopic`.
631    pub fn new(
632        name: &'a str,
633        num_partitions: i32,
634        replication: TopicReplication<'a>,
635    ) -> NewTopic<'a> {
636        NewTopic {
637            name,
638            num_partitions,
639            replication,
640            config: Vec::new(),
641        }
642    }
643
    /// Sets a new parameter in the initial topic configuration.
    ///
    /// The pair is appended to `config`; an existing entry with the same key
    /// is not replaced. Returns `self` so that calls can be chained.
    pub fn set(mut self, key: &'a str, value: &'a str) -> NewTopic<'a> {
        self.config.push((key, value));
        self
    }
649
650    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewTopic> {
651        let name = CString::new(self.name)?;
652        let repl = match self.replication {
653            TopicReplication::Fixed(n) => n,
654            TopicReplication::Variable(partitions) => {
655                if partitions.len() as i32 != self.num_partitions {
656                    return Err(KafkaError::AdminOpCreation(format!(
657                        "replication configuration for topic '{}' assigns {} partition(s), \
658                         which does not match the specified number of partitions ({})",
659                        self.name,
660                        partitions.len(),
661                        self.num_partitions,
662                    )));
663                }
664                -1
665            }
666        };
667        // N.B.: we wrap topic immediately, so that it is destroyed via the
668        // NativeNewTopic's Drop implementation if replica assignment or config
669        // installation fails.
670        let topic = unsafe {
671            NativeNewTopic::from_ptr(rdsys::rd_kafka_NewTopic_new(
672                name.as_ptr(),
673                self.num_partitions,
674                repl,
675                err_buf.as_mut_ptr(),
676                err_buf.capacity(),
677            ))
678        }
679        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
680
681        if let TopicReplication::Variable(assignment) = self.replication {
682            for (partition_id, broker_ids) in assignment.iter().enumerate() {
683                let res = unsafe {
684                    rdsys::rd_kafka_NewTopic_set_replica_assignment(
685                        topic.ptr(),
686                        partition_id as i32,
687                        broker_ids.as_ptr() as *mut i32,
688                        broker_ids.len(),
689                        err_buf.as_mut_ptr(),
690                        err_buf.capacity(),
691                    )
692                };
693                check_rdkafka_invalid_arg(res, err_buf)?;
694            }
695        }
696        for (key, val) in &self.config {
697            let key_c = CString::new(*key)?;
698            let val_c = CString::new(*val)?;
699            let res = unsafe {
700                rdsys::rd_kafka_NewTopic_set_config(topic.ptr(), key_c.as_ptr(), val_c.as_ptr())
701            };
702            check_rdkafka_invalid_arg(res, err_buf)?;
703        }
704        Ok(topic)
705    }
706}
707
/// An assignment of partitions to replicas.
///
/// Each element in the outer slice corresponds to the partition with that
/// index. The inner slice specifies the broker IDs to which replicas of that
/// partition should be assigned.
///
/// For example, `&[&[1, 2], &[2, 3]]` places partition 0 on brokers 1 and 2
/// and partition 1 on brokers 2 and 3.
pub type PartitionAssignment<'a> = &'a [&'a [i32]];
714
/// Replication configuration for a new topic.
#[derive(Debug)]
pub enum TopicReplication<'a> {
    /// All partitions should use the same fixed replication factor.
    Fixed(i32),
    /// Each partition should use the replica assignment from
    /// `PartitionAssignment`.
    ///
    /// The assignment must contain exactly one entry per partition of the
    /// new topic; otherwise the request is rejected before it is sent.
    Variable(PartitionAssignment<'a>),
}
724
// Owned handle to a native NewTopic object, produced by `NewTopic::to_native`.
type NativeNewTopic = NativePtr<RDKafkaNewTopic>;

// Registers `rd_kafka_NewTopic_destroy` as the destructor for NewTopic
// objects.
// NOTE(review): presumably `NativePtr` invokes `DROP` when it is dropped and
// uses `TYPE` only for diagnostics -- confirm in `crate::util`.
unsafe impl KafkaDrop for RDKafkaNewTopic {
    const TYPE: &'static str = "new topic";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewTopic_destroy;
}
731
// Future returned by `AdminClient::create_topics`; resolves once the polling
// thread delivers the result event for the request.
struct CreateTopicsFuture {
    // Receives the result event from the admin polling thread.
    rx: oneshot::Receiver<NativeEvent>,
}
735
736impl Future for CreateTopicsFuture {
737    type Output = KafkaResult<Vec<TopicResult>>;
738
739    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
740        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
741        event.check_error()?;
742        let res = unsafe { rdsys::rd_kafka_event_CreateTopics_result(event.ptr()) };
743        if res.is_null() {
744            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
745            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
746                "create topics request received response of incorrect type ({})",
747                typ
748            ))));
749        }
750        let mut n = 0;
751        let topics = unsafe { rdsys::rd_kafka_CreateTopics_result_topics(res, &mut n) };
752        Poll::Ready(Ok(build_topic_results(topics, n)))
753    }
754}
755
756//
757// Delete topic handling
758//
759
// Owned handle to a native DeleteTopic object, built per topic name in
// `AdminClient::delete_topics_inner`.
type NativeDeleteTopic = NativePtr<RDKafkaDeleteTopic>;

// Registers `rd_kafka_DeleteTopic_destroy` as the destructor for DeleteTopic
// objects.
// NOTE(review): presumably `NativePtr` invokes `DROP` when it is dropped and
// uses `TYPE` only for diagnostics -- confirm in `crate::util`.
unsafe impl KafkaDrop for RDKafkaDeleteTopic {
    const TYPE: &'static str = "delete topic";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteTopic_destroy;
}
766
// Future returned by `AdminClient::delete_topics`; resolves once the polling
// thread delivers the result event for the request.
struct DeleteTopicsFuture {
    // Receives the result event from the admin polling thread.
    rx: oneshot::Receiver<NativeEvent>,
}
770
771impl Future for DeleteTopicsFuture {
772    type Output = KafkaResult<Vec<TopicResult>>;
773
774    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
775        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
776        event.check_error()?;
777        let res = unsafe { rdsys::rd_kafka_event_DeleteTopics_result(event.ptr()) };
778        if res.is_null() {
779            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
780            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
781                "delete topics request received response of incorrect type ({})",
782                typ
783            ))));
784        }
785        let mut n = 0;
786        let topics = unsafe { rdsys::rd_kafka_DeleteTopics_result_topics(res, &mut n) };
787        Poll::Ready(Ok(build_topic_results(topics, n)))
788    }
789}
790
791//
792// Create partitions handling
793//
794
/// Configuration for a CreatePartitions operation.
///
/// Construct with [`NewPartitions::new`] and optionally attach a replica
/// assignment with [`NewPartitions::assign`].
pub struct NewPartitions<'a> {
    /// The name of the topic to which partitions should be added.
    pub topic_name: &'a str,
    /// The total number of partitions after the operation completes.
    pub new_partition_count: usize,
    /// The replica assignments for the new partitions.
    ///
    /// Must not contain more entries than `new_partition_count`; such a
    /// request is rejected before it is sent.
    pub assignment: Option<PartitionAssignment<'a>>,
}
804
805impl<'a> NewPartitions<'a> {
806    /// Creates a new `NewPartitions`.
807    pub fn new(topic_name: &'a str, new_partition_count: usize) -> NewPartitions<'a> {
808        NewPartitions {
809            topic_name,
810            new_partition_count,
811            assignment: None,
812        }
813    }
814
815    /// Sets the partition replica assignment for the new partitions. Only
816    /// assignments for newly created replicas should be included.
817    pub fn assign(mut self, assignment: PartitionAssignment<'a>) -> NewPartitions<'_> {
818        self.assignment = Some(assignment);
819        self
820    }
821
822    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewPartitions> {
823        let name = CString::new(self.topic_name)?;
824        if let Some(assignment) = self.assignment {
825            // If assignment contains more than self.new_partition_count
826            // entries, we'll trip an assertion in librdkafka that crashes the
827            // process. Note that this check isn't a guarantee that the
828            // partition assignment is valid, since the assignment should only
829            // contain entries for the *new* partitions added, and not any
830            // existing partitions, but we can let the server handle that
831            // validation--we just need to make sure not to crash librdkafka.
832            if assignment.len() > self.new_partition_count {
833                return Err(KafkaError::AdminOpCreation(format!(
834                    "partition assignment for topic '{}' assigns {} partition(s), \
835                     which is more than the requested total number of partitions ({})",
836                    self.topic_name,
837                    assignment.len(),
838                    self.new_partition_count,
839                )));
840            }
841        }
842        // N.B.: we wrap partition immediately, so that it is destroyed via
843        // NativeNewPartitions's Drop implementation if replica assignment or
844        // config installation fails.
845        let partitions = unsafe {
846            NativeNewPartitions::from_ptr(rdsys::rd_kafka_NewPartitions_new(
847                name.as_ptr(),
848                self.new_partition_count,
849                err_buf.as_mut_ptr(),
850                err_buf.capacity(),
851            ))
852        }
853        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
854
855        if let Some(assignment) = self.assignment {
856            for (partition_id, broker_ids) in assignment.iter().enumerate() {
857                let res = unsafe {
858                    rdsys::rd_kafka_NewPartitions_set_replica_assignment(
859                        partitions.ptr(),
860                        partition_id as i32,
861                        broker_ids.as_ptr() as *mut i32,
862                        broker_ids.len(),
863                        err_buf.as_mut_ptr(),
864                        err_buf.capacity(),
865                    )
866                };
867                check_rdkafka_invalid_arg(res, err_buf)?;
868            }
869        }
870        Ok(partitions)
871    }
872}
873
/// Owned handle to a native librdkafka NewPartitions object, as produced by
/// `rd_kafka_NewPartitions_new`.
type NativeNewPartitions = NativePtr<RDKafkaNewPartitions>;

// Associates the native NewPartitions type with its librdkafka destructor.
unsafe impl KafkaDrop for RDKafkaNewPartitions {
    // Human-readable label for this native type (presumably used in
    // diagnostics -- confirm against `KafkaDrop` in `util`).
    const TYPE: &'static str = "new partitions";
    // librdkafka function that releases the native object.
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewPartitions_destroy;
}
880
881struct CreatePartitionsFuture {
882    rx: oneshot::Receiver<NativeEvent>,
883}
884
885impl Future for CreatePartitionsFuture {
886    type Output = KafkaResult<Vec<TopicResult>>;
887
888    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
889        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
890        event.check_error()?;
891        let res = unsafe { rdsys::rd_kafka_event_CreatePartitions_result(event.ptr()) };
892        if res.is_null() {
893            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
894            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
895                "create partitions request received response of incorrect type ({})",
896                typ
897            ))));
898        }
899        let mut n = 0;
900        let topics = unsafe { rdsys::rd_kafka_CreatePartitions_result_topics(res, &mut n) };
901        Poll::Ready(Ok(build_topic_results(topics, n)))
902    }
903}
904
905//
906// Delete records handling
907//
908
/// Owned handle to a native librdkafka DeleteRecords object.
type NativeDeleteRecords = NativePtr<RDKafkaDeleteRecords>;

// Associates the native DeleteRecords type with its librdkafka destructor.
unsafe impl KafkaDrop for RDKafkaDeleteRecords {
    // Human-readable label for this native type (presumably used in
    // diagnostics -- confirm against `KafkaDrop` in `util`).
    const TYPE: &'static str = "delete records";
    // librdkafka function that releases the native object.
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy;
}
915
916struct DeleteRecordsFuture {
917    rx: oneshot::Receiver<NativeEvent>,
918}
919
920impl Future for DeleteRecordsFuture {
921    type Output = KafkaResult<TopicPartitionList>;
922
923    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
924        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
925        event.check_error()?;
926        let res = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) };
927        if res.is_null() {
928            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
929            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
930                "delete records request received response of incorrect type ({})",
931                typ
932            ))));
933        }
934        let tpl = unsafe {
935            let tpl = rdsys::rd_kafka_DeleteRecords_result_offsets(res);
936            TopicPartitionList::from_ptr(rdsys::rd_kafka_topic_partition_list_copy(tpl))
937        };
938        Poll::Ready(Ok(tpl))
939    }
940}
941
942//
943// Describe configs handling
944//
945
/// The result of an individual DescribeConfig operation.
///
/// `Ok` carries the described resource together with its configuration
/// entries; `Err` carries the error code for that resource.
pub type ConfigResourceResult = Result<ConfigResource, RDKafkaErrorCode>;
948
/// Specification of a configurable resource.
///
/// This is the borrowed form; [`OwnedResourceSpecifier`] is the equivalent
/// that owns its data.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ResourceSpecifier<'a> {
    /// A topic resource, identified by its name.
    Topic(&'a str),
    /// A group resource, identified by its ID.
    Group(&'a str),
    /// A broker resource, identified by its ID.
    Broker(i32),
}
959
/// A `ResourceSpecifier` that owns its data.
///
/// Unlike the borrowed form, the topic and group names are stored as
/// `String`s, so values of this type are not tied to the lifetime of a
/// request or response buffer. Like the borrowed form it can be cloned.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum OwnedResourceSpecifier {
    /// A topic resource, identified by its name.
    Topic(String),
    /// A group resource, identified by its ID.
    Group(String),
    /// A broker resource, identified by its ID.
    Broker(i32),
}
970
/// The source of a configuration entry.
///
/// The enum carries no data, so it is `Copy`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConfigSource {
    /// Unknown. Note that Kafka brokers before v1.1.0 do not reliably provide
    /// configuration source information.
    Unknown,
    /// A dynamic topic configuration.
    DynamicTopic,
    /// A dynamic broker configuration.
    DynamicBroker,
    /// The default dynamic broker configuration.
    DynamicDefaultBroker,
    /// The static broker configuration.
    StaticBroker,
    /// The hardcoded default configuration.
    Default,
}
988
/// An individual configuration parameter for a `ConfigResource`.
#[derive(Debug, Eq, PartialEq)]
pub struct ConfigEntry {
    /// The name of the configuration parameter.
    pub name: String,
    /// The value of the configuration parameter, or `None` when no value was
    /// reported for it.
    pub value: Option<String>,
    /// The source of the configuration parameter.
    pub source: ConfigSource,
    /// Whether the configuration parameter is read only.
    pub is_read_only: bool,
    /// Whether the configuration parameter currently has the default value.
    pub is_default: bool,
    /// Whether the configuration parameter contains sensitive data.
    pub is_sensitive: bool,
}
1005
1006/// A configurable resource and its current configuration values.
1007#[derive(Debug)]
1008pub struct ConfigResource {
1009    /// Identifies the resource.
1010    pub specifier: OwnedResourceSpecifier,
1011    /// The current configuration parameters.
1012    pub entries: Vec<ConfigEntry>,
1013}
1014
1015impl ConfigResource {
1016    /// Builds a `HashMap` of configuration entries, keyed by configuration
1017    /// entry name.
1018    pub fn entry_map(&self) -> HashMap<&str, &ConfigEntry> {
1019        self.entries.iter().map(|e| (&*e.name, e)).collect()
1020    }
1021
1022    /// Searches the configuration entries to find the named parameter.
1023    ///
1024    /// For more efficient lookups, use `entry_map` to build a `HashMap`
1025    /// instead.
1026    pub fn get(&self, name: &str) -> Option<&ConfigEntry> {
1027        self.entries.iter().find(|e| e.name == name)
1028    }
1029}
1030
/// Owned handle to a native librdkafka ConfigResource object, as produced by
/// `rd_kafka_ConfigResource_new`.
type NativeConfigResource = NativePtr<RDKafkaConfigResource>;

// Associates the native ConfigResource type with its librdkafka destructor.
unsafe impl KafkaDrop for RDKafkaConfigResource {
    // Human-readable label for this native type (presumably used in
    // diagnostics -- confirm against `KafkaDrop` in `util`).
    const TYPE: &'static str = "config resource";
    // librdkafka function that releases the native object.
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_ConfigResource_destroy;
}
1037
1038fn extract_config_specifier(
1039    resource: *const RDKafkaConfigResource,
1040) -> KafkaResult<OwnedResourceSpecifier> {
1041    let typ = unsafe { rdsys::rd_kafka_ConfigResource_type(resource) };
1042    match typ {
1043        RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC => {
1044            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
1045            Ok(OwnedResourceSpecifier::Topic(name))
1046        }
1047        RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP => {
1048            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
1049            Ok(OwnedResourceSpecifier::Group(name))
1050        }
1051        RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER => {
1052            let name = unsafe { CStr::from_ptr(rdsys::rd_kafka_ConfigResource_name(resource)) }
1053                .to_string_lossy();
1054            match name.parse::<i32>() {
1055                Ok(id) => Ok(OwnedResourceSpecifier::Broker(id)),
1056                Err(_) => Err(KafkaError::AdminOpCreation(format!(
1057                    "bogus broker ID in kafka response: {}",
1058                    name
1059                ))),
1060            }
1061        }
1062        _ => Err(KafkaError::AdminOpCreation(format!(
1063            "bogus resource type in kafka response: {:?}",
1064            typ
1065        ))),
1066    }
1067}
1068
1069fn extract_config_source(config_source: RDKafkaConfigSource) -> KafkaResult<ConfigSource> {
1070    match config_source {
1071        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG => Ok(ConfigSource::Unknown),
1072        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG => {
1073            Ok(ConfigSource::DynamicTopic)
1074        }
1075        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG => {
1076            Ok(ConfigSource::DynamicBroker)
1077        }
1078        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG => {
1079            Ok(ConfigSource::DynamicDefaultBroker)
1080        }
1081        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG => {
1082            Ok(ConfigSource::StaticBroker)
1083        }
1084        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG => Ok(ConfigSource::Default),
1085        _ => Err(KafkaError::AdminOpCreation(format!(
1086            "bogus config source type in kafka response: {:?}",
1087            config_source,
1088        ))),
1089    }
1090}
1091
1092struct DescribeConfigsFuture {
1093    rx: oneshot::Receiver<NativeEvent>,
1094}
1095
1096impl Future for DescribeConfigsFuture {
1097    type Output = KafkaResult<Vec<ConfigResourceResult>>;
1098
1099    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1100        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1101        event.check_error()?;
1102        let res = unsafe { rdsys::rd_kafka_event_DescribeConfigs_result(event.ptr()) };
1103        if res.is_null() {
1104            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1105            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1106                "describe configs request received response of incorrect type ({})",
1107                typ
1108            ))));
1109        }
1110        let mut n = 0;
1111        let resources = unsafe { rdsys::rd_kafka_DescribeConfigs_result_resources(res, &mut n) };
1112        let mut out = Vec::with_capacity(n);
1113        for i in 0..n {
1114            let resource = unsafe { *resources.add(i) };
1115            let specifier = extract_config_specifier(resource)?;
1116            let mut entries_out = Vec::new();
1117            let mut n = 0;
1118            let entries = unsafe { rdsys::rd_kafka_ConfigResource_configs(resource, &mut n) };
1119            for j in 0..n {
1120                let entry = unsafe { *entries.add(j) };
1121                let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigEntry_name(entry)) };
1122                let value = unsafe {
1123                    let value = rdsys::rd_kafka_ConfigEntry_value(entry);
1124                    if value.is_null() {
1125                        None
1126                    } else {
1127                        Some(cstr_to_owned(value))
1128                    }
1129                };
1130                entries_out.push(ConfigEntry {
1131                    name,
1132                    value,
1133                    source: extract_config_source(unsafe {
1134                        rdsys::rd_kafka_ConfigEntry_source(entry)
1135                    })?,
1136                    is_read_only: unsafe { rdsys::rd_kafka_ConfigEntry_is_read_only(entry) } != 0,
1137                    is_default: unsafe { rdsys::rd_kafka_ConfigEntry_is_default(entry) } != 0,
1138                    is_sensitive: unsafe { rdsys::rd_kafka_ConfigEntry_is_sensitive(entry) } != 0,
1139                });
1140            }
1141            out.push(Ok(ConfigResource {
1142                specifier,
1143                entries: entries_out,
1144            }))
1145        }
1146        Poll::Ready(Ok(out))
1147    }
1148}
1149
1150//
1151// Alter configs handling
1152//
1153
/// The result of an individual AlterConfig operation.
///
/// `Ok` carries the specifier of the altered resource; `Err` pairs the
/// specifier of the resource with the error code for it.
pub type AlterConfigsResult =
    Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaErrorCode)>;
1157
1158/// Configuration for an AlterConfig operation.
1159pub struct AlterConfig<'a> {
1160    /// Identifies the resource to be altered.
1161    pub specifier: ResourceSpecifier<'a>,
1162    /// The configuration parameters to be updated.
1163    pub entries: HashMap<&'a str, &'a str>,
1164}
1165
1166impl<'a> AlterConfig<'a> {
1167    /// Creates a new `AlterConfig`.
1168    pub fn new(specifier: ResourceSpecifier<'_>) -> AlterConfig<'_> {
1169        AlterConfig {
1170            specifier,
1171            entries: HashMap::new(),
1172        }
1173    }
1174
1175    /// Sets the configuration parameter named `key` to the specified `value`.
1176    pub fn set(mut self, key: &'a str, value: &'a str) -> AlterConfig<'a> {
1177        self.entries.insert(key, value);
1178        self
1179    }
1180
1181    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeConfigResource> {
1182        let (name, typ) = match self.specifier {
1183            ResourceSpecifier::Topic(name) => (
1184                CString::new(name)?,
1185                RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
1186            ),
1187            ResourceSpecifier::Group(name) => (
1188                CString::new(name)?,
1189                RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
1190            ),
1191            ResourceSpecifier::Broker(id) => (
1192                CString::new(format!("{}", id))?,
1193                RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
1194            ),
1195        };
1196        // N.B.: we wrap config immediately, so that it is destroyed via the
1197        // NativeNewTopic's Drop implementation if config installation fails.
1198        let config = unsafe {
1199            NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(typ, name.as_ptr()))
1200                .unwrap()
1201        };
1202        for (key, val) in &self.entries {
1203            let key_c = CString::new(*key)?;
1204            let val_c = CString::new(*val)?;
1205            let res = unsafe {
1206                rdsys::rd_kafka_ConfigResource_set_config(
1207                    config.ptr(),
1208                    key_c.as_ptr(),
1209                    val_c.as_ptr(),
1210                )
1211            };
1212            check_rdkafka_invalid_arg(res, err_buf)?;
1213        }
1214        Ok(config)
1215    }
1216}
1217
1218struct AlterConfigsFuture {
1219    rx: oneshot::Receiver<NativeEvent>,
1220}
1221
1222impl Future for AlterConfigsFuture {
1223    type Output = KafkaResult<Vec<AlterConfigsResult>>;
1224
1225    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1226        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1227        event.check_error()?;
1228        let res = unsafe { rdsys::rd_kafka_event_AlterConfigs_result(event.ptr()) };
1229        if res.is_null() {
1230            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1231            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1232                "alter configs request received response of incorrect type ({})",
1233                typ
1234            ))));
1235        }
1236        let mut n = 0;
1237        let resources = unsafe { rdsys::rd_kafka_AlterConfigs_result_resources(res, &mut n) };
1238        let mut out = Vec::with_capacity(n);
1239        for i in 0..n {
1240            let resource = unsafe { *resources.add(i) };
1241            let specifier = extract_config_specifier(resource)?;
1242            out.push(Ok(specifier));
1243        }
1244        Poll::Ready(Ok(out))
1245    }
1246}