use std::collections::HashMap;
use std::ffi::{c_void, CStr, CString};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use futures_channel::oneshot;
use futures_util::future::{self, Either, FutureExt};
use futures_util::ready;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{trace, warn};
use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
use crate::TopicPartitionList;
/// A client for the Kafka admin API.
///
/// Bundles the underlying [`Client`], the native queue handed to every admin
/// request, and the stop flag and join handle of the background polling
/// thread that is started when the client is built from a configuration.
pub struct AdminClient<C: ClientContext + 'static> {
// Underlying Kafka client; exposed read-only through `inner`.
client: Client<C>,
// Queue passed to each admin request and polled by the background thread.
queue: Arc<NativeQueue>,
// Set to `true` on drop to ask the polling thread to exit.
should_stop: Arc<AtomicBool>,
// Join handle of the polling thread; taken and joined on drop.
handle: Option<JoinHandle<()>>,
}
impl<C: ClientContext> AdminClient<C> {
    /// Submits a request to create the given topics.
    ///
    /// The returned future resolves to one [`TopicResult`] per topic reported
    /// in the response. If the request cannot be built, the future resolves
    /// immediately to that error.
    pub fn create_topics<'a, I>(
        &self,
        topics: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
    where
        I: IntoIterator<Item = &'a NewTopic<'a>>,
    {
        match self.create_topics_inner(topics, opts) {
            Ok(rx) => Either::Left(CreateTopicsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    /// Converts the topic specifications and options to their native form and
    /// enqueues the request; the receiver yields the result event.
    fn create_topics_inner<'a, I>(
        &self,
        topics: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a NewTopic<'a>>,
    {
        let mut native_topics = Vec::new();
        let mut err_buf = ErrBuf::new();
        for t in topics {
            native_topics.push(t.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_CreateTopics(
                self.client.native_ptr(),
                native_topics.as_c_array(),
                native_topics.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Submits a request to delete the named topics.
    ///
    /// The returned future resolves to one [`TopicResult`] per topic reported
    /// in the response. A topic name containing an interior NUL byte makes the
    /// future resolve immediately to an error.
    pub fn delete_topics(
        &self,
        topic_names: &[&str],
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>> {
        match self.delete_topics_inner(topic_names, opts) {
            Ok(rx) => Either::Left(DeleteTopicsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    /// Builds the native delete-topic objects and options and enqueues the
    /// request; the receiver yields the result event.
    fn delete_topics_inner(
        &self,
        topic_names: &[&str],
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
        let mut native_topics = Vec::new();
        let mut err_buf = ErrBuf::new();
        for tn in topic_names {
            let tn_c = CString::new(*tn)?;
            let native_topic = unsafe {
                NativeDeleteTopic::from_ptr(rdsys::rd_kafka_DeleteTopic_new(tn_c.as_ptr())).unwrap()
            };
            native_topics.push(native_topic);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_DeleteTopics(
                self.client.native_ptr(),
                native_topics.as_c_array(),
                native_topics.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Submits a request to add partitions to existing topics, as described by
    /// the given [`NewPartitions`] specifications.
    ///
    /// The returned future resolves to one [`TopicResult`] per topic reported
    /// in the response.
    pub fn create_partitions<'a, I>(
        &self,
        partitions: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
    where
        I: IntoIterator<Item = &'a NewPartitions<'a>>,
    {
        match self.create_partitions_inner(partitions, opts) {
            Ok(rx) => Either::Left(CreatePartitionsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    /// Converts the partition specifications and options to their native form
    /// and enqueues the request; the receiver yields the result event.
    fn create_partitions_inner<'a, I>(
        &self,
        partitions: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a NewPartitions<'a>>,
    {
        let mut native_partitions = Vec::new();
        let mut err_buf = ErrBuf::new();
        for p in partitions {
            native_partitions.push(p.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_CreatePartitions(
                self.client.native_ptr(),
                native_partitions.as_c_array(),
                native_partitions.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Submits a request to delete records according to the offsets in
    /// `offsets`.
    ///
    /// The returned future resolves to a copy of the topic partition list
    /// carried by the response.
    pub fn delete_records(
        &self,
        offsets: &TopicPartitionList,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
        match self.delete_records_inner(offsets, opts) {
            Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    /// Builds the native delete-records object and options and enqueues the
    /// request; the receiver yields the result event.
    fn delete_records_inner(
        &self,
        offsets: &TopicPartitionList,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
        let mut err_buf = ErrBuf::new();
        let delete_records = unsafe {
            NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr()))
        }
        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            // The API takes an array of delete-records objects; exactly one is passed.
            rdsys::rd_kafka_DeleteRecords(
                self.client.native_ptr(),
                &mut delete_records.ptr(),
                1,
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Submits a request to describe the configuration of the given
    /// resources.
    ///
    /// The returned future resolves to one [`ConfigResourceResult`] per
    /// resource reported in the response. A resource name that cannot be
    /// turned into a native resource (interior NUL byte, or a name the native
    /// constructor rejects) makes the future resolve immediately to an error.
    pub fn describe_configs<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<ConfigResourceResult>>>
    where
        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
    {
        match self.describe_configs_inner(configs, opts) {
            Ok(rx) => Either::Left(DescribeConfigsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    /// Builds one native config resource per specifier plus the options and
    /// enqueues the request; the receiver yields the result event.
    fn describe_configs_inner<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
    {
        let mut native_configs = Vec::new();
        let mut err_buf = ErrBuf::new();
        for c in configs {
            let (name, typ) = match c {
                ResourceSpecifier::Topic(name) => (
                    CString::new(*name)?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
                ),
                ResourceSpecifier::Group(name) => (
                    CString::new(*name)?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
                ),
                ResourceSpecifier::Broker(id) => (
                    CString::new(id.to_string())?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
                ),
            };
            // The native constructor can hand back a null pointer (e.g. for an
            // empty resource name); report that as an error instead of
            // panicking on caller-supplied input.
            let resource = unsafe {
                NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(
                    typ,
                    name.as_ptr(),
                ))
            }
            .ok_or_else(|| {
                KafkaError::AdminOpCreation(format!(
                    "invalid configuration resource name '{}'",
                    name.to_string_lossy()
                ))
            })?;
            native_configs.push(resource);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_DescribeConfigs(
                self.client.native_ptr(),
                native_configs.as_c_array(),
                native_configs.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Submits a request to alter the configuration of the given resources.
    ///
    /// The returned future resolves to one [`AlterConfigsResult`] per resource
    /// reported in the response.
    pub fn alter_configs<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<AlterConfigsResult>>>
    where
        I: IntoIterator<Item = &'a AlterConfig<'a>>,
    {
        match self.alter_configs_inner(configs, opts) {
            Ok(rx) => Either::Left(AlterConfigsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    /// Converts the alterations and options to their native form and enqueues
    /// the request; the receiver yields the result event.
    fn alter_configs_inner<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a AlterConfig<'a>>,
    {
        let mut native_configs = Vec::new();
        let mut err_buf = ErrBuf::new();
        for c in configs {
            native_configs.push(c.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_AlterConfigs(
                self.client.native_ptr(),
                native_configs.as_c_array(),
                native_configs.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Returns a reference to the underlying [`Client`].
    pub fn inner(&self) -> &Client<C> {
        &self.client
    }
}
impl FromClientConfig for AdminClient<DefaultClientContext> {
    /// Builds an admin client from `config` using a [`DefaultClientContext`].
    fn from_config(config: &ClientConfig) -> KafkaResult<AdminClient<DefaultClientContext>> {
        let context = DefaultClientContext;
        Self::from_config_and_context(config, context)
    }
}
impl<C: ClientContext> FromClientConfigAndContext<C> for AdminClient<C> {
    /// Builds an admin client from `config` with a caller-supplied context.
    ///
    /// Creates the underlying client as a producer-type handle, allocates a
    /// dedicated native queue on it, and starts the polling thread on that
    /// queue.
    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<AdminClient<C>> {
        let client = Client::new(
            config,
            config.create_native_config()?,
            RDKafkaType::RD_KAFKA_PRODUCER,
            context,
        )?;
        let queue = Arc::new(client.new_native_queue());
        let should_stop = Arc::new(AtomicBool::new(false));
        // The polling thread gets its own handles to the queue and stop flag.
        let poller = start_poll_thread(Arc::clone(&queue), Arc::clone(&should_stop));
        Ok(AdminClient {
            client,
            queue,
            should_stop,
            handle: Some(poller),
        })
    }
}
impl<C: ClientContext> Drop for AdminClient<C> {
    /// Signals the polling thread to stop and waits for it to finish.
    fn drop(&mut self) {
        trace!("Stopping polling");
        self.should_stop.store(true, Ordering::Relaxed);
        trace!("Waiting for polling thread termination");
        let poller = self.handle.take().unwrap();
        let outcome = poller.join();
        match outcome {
            Err(e) => warn!("Failure while terminating thread: {:?}", e),
            Ok(()) => trace!("Polling stopped"),
        };
    }
}
/// Spawns the thread that drains `queue` and routes each event to its waiter.
///
/// Every event's opaque is interpreted as a boxed oneshot sender, which
/// receives the event. The stop flag is only consulted when a poll comes back
/// empty, so queued events are delivered before the thread exits.
///
/// # Panics
///
/// Panics if the thread cannot be spawned.
fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> JoinHandle<()> {
    let poll_loop = move || {
        trace!("Admin polling thread loop started");
        loop {
            let raw_event = queue.poll(Duration::from_millis(100));
            if !raw_event.is_null() {
                let event = unsafe { NativeEvent::from_ptr(raw_event).unwrap() };
                let sender: Box<oneshot::Sender<NativeEvent>> =
                    unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
                // A send failure means the receiving side is gone; nothing to do.
                let _ = sender.send(event);
            } else if should_stop.load(Ordering::Relaxed) {
                break;
            }
        }
        trace!("Admin polling thread loop terminated");
    };
    thread::Builder::new()
        .name("admin client polling thread".into())
        .spawn(poll_loop)
        .expect("Failed to start polling thread")
}
/// Wrapper around a raw `RDKafkaEvent` pointer.
type NativeEvent = NativePtr<RDKafkaEvent>;
// Names `rd_kafka_event_destroy` as the destructor for native events.
unsafe impl KafkaDrop for RDKafkaEvent {
const TYPE: &'static str = "event";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_event_destroy;
}
// Events are obtained on the polling thread and handed through a oneshot
// channel to whichever thread polls the corresponding future.
// NOTE(review): soundness relies on librdkafka events being usable from any
// thread — confirm against librdkafka's threading guarantees.
unsafe impl Send for NativeEvent {}
unsafe impl Sync for NativeEvent {}
impl NativePtr<RDKafkaEvent> {
    /// Returns `Err(KafkaError::AdminOp(..))` if the event carries an error
    /// code, and `Ok(())` otherwise.
    fn check_error(&self) -> KafkaResult<()> {
        match unsafe { rdsys::rd_kafka_event_error(self.ptr()) } {
            code if code.is_error() => Err(KafkaError::AdminOp(code.into())),
            _ => Ok(()),
        }
    }
}
/// Options for an admin API request.
///
/// The default value leaves every option unset.
#[derive(Default)]
pub struct AdminOptions {
// Request timeout; only forwarded to the native options when `Some`.
request_timeout: Option<Timeout>,
// Operation timeout; only forwarded to the native options when `Some`.
operation_timeout: Option<Timeout>,
// When `true`, the validate-only flag is set on the native options.
validate_only: bool,
// Broker ID to set on the native options; left unset when `None`.
broker_id: Option<i32>,
}
impl AdminOptions {
    /// Creates a new `AdminOptions` with every option unset.
    pub fn new() -> AdminOptions {
        Self::default()
    }

    /// Sets the request timeout. `None` leaves the native default in place.
    pub fn request_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
        self.request_timeout = timeout.map(|t| t.into());
        self
    }

    /// Sets the operation timeout. `None` leaves the native default in place.
    pub fn operation_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
        self.operation_timeout = timeout.map(|t| t.into());
        self
    }

    /// Sets whether the validate-only flag is applied to the request.
    pub fn validate_only(mut self, validate_only: bool) -> Self {
        self.validate_only = validate_only;
        self
    }

    /// Sets the broker ID applied to the request; `None` leaves it unset.
    pub fn broker_id<T: Into<Option<i32>>>(mut self, broker_id: T) -> Self {
        self.broker_id = broker_id.into();
        self
    }

    /// Builds the native admin options for `client`.
    ///
    /// Only options that were explicitly set are applied. A oneshot channel
    /// is created last: its sender is boxed and installed as the options'
    /// opaque pointer, and its receiver is returned alongside the options.
    ///
    /// # Errors
    ///
    /// Returns `KafkaError::AdminOpCreation` if any native setter reports a
    /// failure.
    fn to_native(
        &self,
        client: *mut RDKafka,
        err_buf: &mut ErrBuf,
    ) -> KafkaResult<(NativeAdminOptions, oneshot::Receiver<NativeEvent>)> {
        let raw_opts = unsafe {
            rdsys::rd_kafka_AdminOptions_new(client, RDKafkaAdminOp::RD_KAFKA_ADMIN_OP_ANY)
        };
        let opts = unsafe { NativeAdminOptions::from_ptr(raw_opts) }.unwrap();

        if let Some(timeout) = self.request_timeout {
            let rc = unsafe {
                rdsys::rd_kafka_AdminOptions_set_request_timeout(
                    opts.ptr(),
                    timeout.as_millis(),
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(rc, err_buf)?;
        }

        if let Some(timeout) = self.operation_timeout {
            let rc = unsafe {
                rdsys::rd_kafka_AdminOptions_set_operation_timeout(
                    opts.ptr(),
                    timeout.as_millis(),
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(rc, err_buf)?;
        }

        if self.validate_only {
            let rc = unsafe {
                rdsys::rd_kafka_AdminOptions_set_validate_only(
                    opts.ptr(),
                    1, // the native flag is an integer boolean
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(rc, err_buf)?;
        }

        if let Some(broker_id) = self.broker_id {
            let rc = unsafe {
                rdsys::rd_kafka_AdminOptions_set_broker(
                    opts.ptr(),
                    broker_id,
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(rc, err_buf)?;
        }

        let (sender, receiver) = oneshot::channel();
        // Ownership of the boxed sender passes to the native options as a raw pointer.
        let opaque = Box::into_raw(Box::new(sender)) as *mut c_void;
        unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(opts.ptr(), opaque) };
        Ok((opts, receiver))
    }
}
// Names `rd_kafka_AdminOptions_destroy` as the destructor for native admin options.
unsafe impl KafkaDrop for RDKafkaAdminOptions {
const TYPE: &'static str = "admin options";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_AdminOptions_destroy;
}
/// Wrapper around a raw `RDKafkaAdminOptions` pointer.
type NativeAdminOptions = NativePtr<RDKafkaAdminOptions>;
fn check_rdkafka_invalid_arg(res: RDKafkaRespErr, err_buf: &ErrBuf) -> KafkaResult<()> {
match res.into() {
RDKafkaErrorCode::NoError => Ok(()),
RDKafkaErrorCode::InvalidArgument => {
let msg = if err_buf.len() == 0 {
"invalid argument".into()
} else {
err_buf.to_string()
};
Err(KafkaError::AdminOpCreation(msg))
}
res => Err(KafkaError::AdminOpCreation(format!(
"setting admin options returned unexpected error code {}",
res
))),
}
}
/// The result of an individual topic operation.
///
/// `Ok` carries the topic name; `Err` carries the topic name together with
/// the error code reported for that topic.
pub type TopicResult = Result<String, (String, RDKafkaErrorCode)>;
/// Converts a native array of `n` topic results into a vector of
/// [`TopicResult`]s, preserving order.
///
/// The caller must pass a pointer to at least `n` readable topic-result
/// pointers; each one is dereferenced.
fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Vec<TopicResult> {
    (0..n)
        .map(|idx| {
            let topic = unsafe { *topics.add(idx) };
            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_topic_result_name(topic)) };
            let code = unsafe { rdsys::rd_kafka_topic_result_error(topic) };
            if code.is_error() {
                Err((name, code.into()))
            } else {
                Ok(name)
            }
        })
        .collect()
}
/// Configuration for a topic to be created through the admin client.
#[derive(Debug)]
pub struct NewTopic<'a> {
/// The name of the new topic.
pub name: &'a str,
/// The number of partitions requested for the topic.
pub num_partitions: i32,
/// Either a fixed replication factor or an explicit per-partition assignment.
pub replication: TopicReplication<'a>,
/// Key/value configuration pairs applied to the topic, in insertion order.
pub config: Vec<(&'a str, &'a str)>,
}
impl<'a> NewTopic<'a> {
    /// Creates a topic specification with the given name, partition count and
    /// replication, and no extra configuration.
    pub fn new(
        name: &'a str,
        num_partitions: i32,
        replication: TopicReplication<'a>,
    ) -> NewTopic<'a> {
        let config = Vec::new();
        NewTopic {
            name,
            num_partitions,
            replication,
            config,
        }
    }

    /// Appends a configuration key/value pair and returns the updated value.
    pub fn set(mut self, key: &'a str, value: &'a str) -> NewTopic<'a> {
        self.config.push((key, value));
        self
    }

    /// Builds the native new-topic object.
    ///
    /// # Errors
    ///
    /// Fails if the name or a config entry contains an interior NUL byte, if
    /// a variable assignment does not list exactly `num_partitions`
    /// partitions, or if any native call reports a failure.
    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewTopic> {
        let name_c = CString::new(self.name)?;
        // With an explicit assignment the native replication factor is -1.
        let replication_factor = match self.replication {
            TopicReplication::Fixed(factor) => factor,
            TopicReplication::Variable(partitions)
                if partitions.len() as i32 == self.num_partitions =>
            {
                -1
            }
            TopicReplication::Variable(partitions) => {
                return Err(KafkaError::AdminOpCreation(format!(
                    "replication configuration for topic '{}' assigns {} partition(s), \
                     which does not match the specified number of partitions ({})",
                    self.name,
                    partitions.len(),
                    self.num_partitions,
                )));
            }
        };

        let native = unsafe {
            NativeNewTopic::from_ptr(rdsys::rd_kafka_NewTopic_new(
                name_c.as_ptr(),
                self.num_partitions,
                replication_factor,
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            ))
        }
        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;

        if let TopicReplication::Variable(assignment) = self.replication {
            // The position of each broker list is the partition it applies to.
            for (partition, brokers) in assignment.iter().enumerate() {
                let rc = unsafe {
                    rdsys::rd_kafka_NewTopic_set_replica_assignment(
                        native.ptr(),
                        partition as i32,
                        brokers.as_ptr() as *mut i32,
                        brokers.len(),
                        err_buf.as_mut_ptr(),
                        err_buf.capacity(),
                    )
                };
                check_rdkafka_invalid_arg(rc, err_buf)?;
            }
        }

        for &(key, value) in self.config.iter() {
            let key_c = CString::new(key)?;
            let value_c = CString::new(value)?;
            let rc = unsafe {
                rdsys::rd_kafka_NewTopic_set_config(native.ptr(), key_c.as_ptr(), value_c.as_ptr())
            };
            check_rdkafka_invalid_arg(rc, err_buf)?;
        }

        Ok(native)
    }
}
/// An explicit replica assignment: one slice of broker IDs per partition,
/// where the position in the outer slice identifies the partition.
pub type PartitionAssignment<'a> = &'a [&'a [i32]];
/// Replication configuration for a new topic.
#[derive(Debug)]
pub enum TopicReplication<'a> {
/// Use the same replication factor for every partition.
Fixed(i32),
/// Use an explicit broker assignment for each partition.
Variable(PartitionAssignment<'a>),
}
/// Wrapper around a raw `RDKafkaNewTopic` pointer.
type NativeNewTopic = NativePtr<RDKafkaNewTopic>;
// Names `rd_kafka_NewTopic_destroy` as the destructor for native new-topic objects.
unsafe impl KafkaDrop for RDKafkaNewTopic {
const TYPE: &'static str = "new topic";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewTopic_destroy;
}
/// Future that resolves once the result event of a create-topics request
/// arrives on `rx`.
struct CreateTopicsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for CreateTopicsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    /// Waits for the event, then decodes it into per-topic results.
    ///
    /// Resolves to `KafkaError::Canceled` if the sending side was dropped,
    /// and to an error if the event carries an error or has the wrong type.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = match ready!(self.rx.poll_unpin(cx)) {
            Ok(event) => event,
            Err(_) => return Poll::Ready(Err(KafkaError::Canceled)),
        };
        if let Err(err) = event.check_error() {
            return Poll::Ready(Err(err));
        }
        let result = unsafe { rdsys::rd_kafka_event_CreateTopics_result(event.ptr()) };
        if result.is_null() {
            let event_type = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            let message = format!(
                "create topics request received response of incorrect type ({})",
                event_type
            );
            return Poll::Ready(Err(KafkaError::AdminOpCreation(message)));
        }
        let mut count = 0;
        let topics = unsafe { rdsys::rd_kafka_CreateTopics_result_topics(result, &mut count) };
        Poll::Ready(Ok(build_topic_results(topics, count)))
    }
}
/// Wrapper around a raw `RDKafkaDeleteTopic` pointer.
type NativeDeleteTopic = NativePtr<RDKafkaDeleteTopic>;
// Names `rd_kafka_DeleteTopic_destroy` as the destructor for native delete-topic objects.
unsafe impl KafkaDrop for RDKafkaDeleteTopic {
const TYPE: &'static str = "delete topic";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteTopic_destroy;
}
/// Future that resolves once the result event of a delete-topics request
/// arrives on `rx`.
struct DeleteTopicsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteTopicsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    /// Waits for the event, then decodes it into per-topic results.
    ///
    /// Resolves to `KafkaError::Canceled` if the sending side was dropped,
    /// and to an error if the event carries an error or has the wrong type.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = match ready!(self.rx.poll_unpin(cx)) {
            Ok(event) => event,
            Err(_) => return Poll::Ready(Err(KafkaError::Canceled)),
        };
        if let Err(err) = event.check_error() {
            return Poll::Ready(Err(err));
        }
        let result = unsafe { rdsys::rd_kafka_event_DeleteTopics_result(event.ptr()) };
        if result.is_null() {
            let event_type = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            let message = format!(
                "delete topics request received response of incorrect type ({})",
                event_type
            );
            return Poll::Ready(Err(KafkaError::AdminOpCreation(message)));
        }
        let mut count = 0;
        let topics = unsafe { rdsys::rd_kafka_DeleteTopics_result_topics(result, &mut count) };
        Poll::Ready(Ok(build_topic_results(topics, count)))
    }
}
/// Configuration for adding partitions to an existing topic.
pub struct NewPartitions<'a> {
/// The name of the topic to which partitions are added.
pub topic_name: &'a str,
/// The requested total number of partitions for the topic.
pub new_partition_count: usize,
/// Optional explicit replica assignment; must not list more partitions
/// than `new_partition_count`.
pub assignment: Option<PartitionAssignment<'a>>,
}
impl<'a> NewPartitions<'a> {
    /// Creates a specification for `topic_name` with the given total
    /// partition count and no explicit assignment.
    pub fn new(topic_name: &'a str, new_partition_count: usize) -> NewPartitions<'a> {
        let assignment = None;
        NewPartitions {
            topic_name,
            new_partition_count,
            assignment,
        }
    }

    /// Sets the explicit replica assignment and returns the updated value.
    pub fn assign(mut self, assignment: PartitionAssignment<'a>) -> NewPartitions<'_> {
        self.assignment = Some(assignment);
        self
    }

    /// Builds the native new-partitions object.
    ///
    /// # Errors
    ///
    /// Fails if the topic name contains an interior NUL byte, if the
    /// assignment lists more partitions than `new_partition_count`, or if a
    /// native call reports a failure.
    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewPartitions> {
        let name_c = CString::new(self.topic_name)?;
        match self.assignment {
            Some(assignment) if assignment.len() > self.new_partition_count => {
                return Err(KafkaError::AdminOpCreation(format!(
                    "partition assignment for topic '{}' assigns {} partition(s), \
                     which is more than the requested total number of partitions ({})",
                    self.topic_name,
                    assignment.len(),
                    self.new_partition_count,
                )));
            }
            _ => {}
        }

        let native = unsafe {
            NativeNewPartitions::from_ptr(rdsys::rd_kafka_NewPartitions_new(
                name_c.as_ptr(),
                self.new_partition_count,
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            ))
        }
        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;

        if let Some(assignment) = self.assignment {
            // The position of each broker list is passed as the partition index.
            for (index, brokers) in assignment.iter().enumerate() {
                let rc = unsafe {
                    rdsys::rd_kafka_NewPartitions_set_replica_assignment(
                        native.ptr(),
                        index as i32,
                        brokers.as_ptr() as *mut i32,
                        brokers.len(),
                        err_buf.as_mut_ptr(),
                        err_buf.capacity(),
                    )
                };
                check_rdkafka_invalid_arg(rc, err_buf)?;
            }
        }

        Ok(native)
    }
}
/// Wrapper around a raw `RDKafkaNewPartitions` pointer.
type NativeNewPartitions = NativePtr<RDKafkaNewPartitions>;
// Names `rd_kafka_NewPartitions_destroy` as the destructor for native new-partitions objects.
unsafe impl KafkaDrop for RDKafkaNewPartitions {
const TYPE: &'static str = "new partitions";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewPartitions_destroy;
}
/// Future that resolves once the result event of a create-partitions request
/// arrives on `rx`.
struct CreatePartitionsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for CreatePartitionsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    /// Waits for the event, then decodes it into per-topic results.
    ///
    /// Resolves to `KafkaError::Canceled` if the sending side was dropped,
    /// and to an error if the event carries an error or has the wrong type.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = match ready!(self.rx.poll_unpin(cx)) {
            Ok(event) => event,
            Err(_) => return Poll::Ready(Err(KafkaError::Canceled)),
        };
        if let Err(err) = event.check_error() {
            return Poll::Ready(Err(err));
        }
        let result = unsafe { rdsys::rd_kafka_event_CreatePartitions_result(event.ptr()) };
        if result.is_null() {
            let event_type = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            let message = format!(
                "create partitions request received response of incorrect type ({})",
                event_type
            );
            return Poll::Ready(Err(KafkaError::AdminOpCreation(message)));
        }
        let mut count = 0;
        let topics = unsafe { rdsys::rd_kafka_CreatePartitions_result_topics(result, &mut count) };
        Poll::Ready(Ok(build_topic_results(topics, count)))
    }
}
/// Wrapper around a raw `RDKafkaDeleteRecords` pointer.
type NativeDeleteRecords = NativePtr<RDKafkaDeleteRecords>;
// Names `rd_kafka_DeleteRecords_destroy` as the destructor for native delete-records objects.
unsafe impl KafkaDrop for RDKafkaDeleteRecords {
const TYPE: &'static str = "delete records";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy;
}
/// Future that resolves once the result event of a delete-records request
/// arrives on `rx`.
struct DeleteRecordsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteRecordsFuture {
    type Output = KafkaResult<TopicPartitionList>;

    /// Waits for the event, then returns a copy of the offsets list it holds.
    ///
    /// Resolves to `KafkaError::Canceled` if the sending side was dropped,
    /// and to an error if the event carries an error or has the wrong type.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = match ready!(self.rx.poll_unpin(cx)) {
            Ok(event) => event,
            Err(_) => return Poll::Ready(Err(KafkaError::Canceled)),
        };
        if let Err(err) = event.check_error() {
            return Poll::Ready(Err(err));
        }
        let result = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) };
        if result.is_null() {
            let event_type = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            let message = format!(
                "delete records request received response of incorrect type ({})",
                event_type
            );
            return Poll::Ready(Err(KafkaError::AdminOpCreation(message)));
        }
        // The list is copied before the event goes out of scope at the end of
        // this call.
        let offsets = unsafe {
            let borrowed = rdsys::rd_kafka_DeleteRecords_result_offsets(result);
            TopicPartitionList::from_ptr(rdsys::rd_kafka_topic_partition_list_copy(borrowed))
        };
        Poll::Ready(Ok(offsets))
    }
}
/// The result of an individual describe-configs operation.
///
/// `Ok` carries the described resource; `Err` carries an error code.
pub type ConfigResourceResult = Result<ConfigResource, RDKafkaErrorCode>;
/// Identifies a configurable resource, borrowing any name it contains.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ResourceSpecifier<'a> {
/// A topic, identified by name.
Topic(&'a str),
/// A group, identified by name.
Group(&'a str),
/// A broker, identified by numeric ID.
Broker(i32),
}
/// Identifies a configurable resource, owning any name it contains.
///
/// Mirrors [`ResourceSpecifier`] with owned strings.
#[derive(Debug, Eq, PartialEq)]
pub enum OwnedResourceSpecifier {
/// A topic, identified by name.
Topic(String),
/// A group, identified by name.
Group(String),
/// A broker, identified by numeric ID.
Broker(i32),
}
/// The source of a configuration entry, as reported in a describe-configs
/// response.
#[derive(Debug, Eq, PartialEq)]
pub enum ConfigSource {
/// Corresponds to `RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG`.
Unknown,
/// Corresponds to `RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG`.
DynamicTopic,
/// Corresponds to `RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG`.
DynamicBroker,
/// Corresponds to `RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG`.
DynamicDefaultBroker,
/// Corresponds to `RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG`.
StaticBroker,
/// Corresponds to `RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG`.
Default,
}
/// A single configuration entry of a described resource.
#[derive(Debug, Eq, PartialEq)]
pub struct ConfigEntry {
/// The name of the configuration parameter.
pub name: String,
/// The value of the parameter; `None` when the response holds no value.
pub value: Option<String>,
/// Where the value of the parameter comes from.
pub source: ConfigSource,
/// The read-only flag reported for the entry.
pub is_read_only: bool,
/// The is-default flag reported for the entry.
pub is_default: bool,
/// The sensitive flag reported for the entry.
pub is_sensitive: bool,
}
/// A described resource together with its configuration entries.
#[derive(Debug)]
pub struct ConfigResource {
    /// Identifies the resource.
    pub specifier: OwnedResourceSpecifier,
    /// The configuration entries reported for the resource.
    pub entries: Vec<ConfigEntry>,
}

impl ConfigResource {
    /// Builds a map from entry name to entry.
    ///
    /// If several entries share a name, the last one wins.
    pub fn entry_map(&self) -> HashMap<&str, &ConfigEntry> {
        let mut by_name = HashMap::new();
        for entry in &self.entries {
            by_name.insert(entry.name.as_str(), entry);
        }
        by_name
    }

    /// Returns the first entry whose name equals `name`, if any.
    pub fn get(&self, name: &str) -> Option<&ConfigEntry> {
        for entry in &self.entries {
            if entry.name == name {
                return Some(entry);
            }
        }
        None
    }
}
/// Wrapper around a raw `RDKafkaConfigResource` pointer.
type NativeConfigResource = NativePtr<RDKafkaConfigResource>;
// Names `rd_kafka_ConfigResource_destroy` as the destructor for native config resources.
unsafe impl KafkaDrop for RDKafkaConfigResource {
const TYPE: &'static str = "config resource";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_ConfigResource_destroy;
}
/// Reads the type and name of a native config resource and converts them
/// into an [`OwnedResourceSpecifier`].
///
/// # Errors
///
/// Returns `KafkaError::AdminOpCreation` if the resource type is not topic,
/// group or broker, or if a broker resource's name does not parse as an
/// `i32`.
fn extract_config_specifier(
    resource: *const RDKafkaConfigResource,
) -> KafkaResult<OwnedResourceSpecifier> {
    let typ = unsafe { rdsys::rd_kafka_ConfigResource_type(resource) };
    // Only evaluated in the arms that need an owned copy of the name.
    let owned_name = || unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
    match typ {
        RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC => {
            Ok(OwnedResourceSpecifier::Topic(owned_name()))
        }
        RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP => {
            Ok(OwnedResourceSpecifier::Group(owned_name()))
        }
        RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER => {
            let raw_id = unsafe { CStr::from_ptr(rdsys::rd_kafka_ConfigResource_name(resource)) }
                .to_string_lossy();
            raw_id
                .parse::<i32>()
                .map(OwnedResourceSpecifier::Broker)
                .map_err(|_| {
                    KafkaError::AdminOpCreation(format!(
                        "bogus broker ID in kafka response: {}",
                        raw_id
                    ))
                })
        }
        _ => Err(KafkaError::AdminOpCreation(format!(
            "bogus resource type in kafka response: {:?}",
            typ
        ))),
    }
}
/// Maps a native config source value onto [`ConfigSource`].
///
/// # Errors
///
/// Returns `KafkaError::AdminOpCreation` for any value without a
/// corresponding `ConfigSource` variant.
fn extract_config_source(config_source: RDKafkaConfigSource) -> KafkaResult<ConfigSource> {
    let source = match config_source {
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG => ConfigSource::Unknown,
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG => {
            ConfigSource::DynamicTopic
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG => {
            ConfigSource::DynamicBroker
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG => {
            ConfigSource::DynamicDefaultBroker
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG => {
            ConfigSource::StaticBroker
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG => ConfigSource::Default,
        _ => {
            return Err(KafkaError::AdminOpCreation(format!(
                "bogus config source type in kafka response: {:?}",
                config_source,
            )));
        }
    };
    Ok(source)
}
/// Future that resolves once the result event of a describe-configs request
/// arrives on `rx`.
struct DescribeConfigsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DescribeConfigsFuture {
    type Output = KafkaResult<Vec<ConfigResourceResult>>;

    /// Waits for the event, then decodes one result per resource.
    ///
    /// A resource that carries an error code becomes `Err(code)` in the
    /// output; every other resource becomes `Ok` with its specifier and
    /// entries. The future as a whole resolves to an error if the sending
    /// side was dropped (`KafkaError::Canceled`), if the event carries an
    /// error or has the wrong type, or if a resource or entry cannot be
    /// decoded.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_DescribeConfigs_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "describe configs request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let resources = unsafe { rdsys::rd_kafka_DescribeConfigs_result_resources(res, &mut n) };
        let mut out: Vec<ConfigResourceResult> = Vec::with_capacity(n);
        for i in 0..n {
            let resource = unsafe { *resources.add(i) };

            // Surface per-resource failures instead of reporting the resource
            // as successfully described.
            let resource_err = unsafe { rdsys::rd_kafka_ConfigResource_error(resource) };
            if resource_err.is_error() {
                out.push(Err(resource_err.into()));
                continue;
            }

            let specifier = extract_config_specifier(resource)?;
            let mut n_entries = 0;
            let entries =
                unsafe { rdsys::rd_kafka_ConfigResource_configs(resource, &mut n_entries) };
            let mut entries_out = Vec::with_capacity(n_entries);
            for j in 0..n_entries {
                let entry = unsafe { *entries.add(j) };
                let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigEntry_name(entry)) };
                // A null value pointer is represented as `None`.
                let value = unsafe {
                    let value = rdsys::rd_kafka_ConfigEntry_value(entry);
                    if value.is_null() {
                        None
                    } else {
                        Some(cstr_to_owned(value))
                    }
                };
                entries_out.push(ConfigEntry {
                    name,
                    value,
                    source: extract_config_source(unsafe {
                        rdsys::rd_kafka_ConfigEntry_source(entry)
                    })?,
                    is_read_only: unsafe { rdsys::rd_kafka_ConfigEntry_is_read_only(entry) } != 0,
                    is_default: unsafe { rdsys::rd_kafka_ConfigEntry_is_default(entry) } != 0,
                    is_sensitive: unsafe { rdsys::rd_kafka_ConfigEntry_is_sensitive(entry) } != 0,
                });
            }
            out.push(Ok(ConfigResource {
                specifier,
                entries: entries_out,
            }))
        }
        Poll::Ready(Ok(out))
    }
}
/// The result of an individual alter-configs operation.
///
/// `Ok` carries the specifier of the resource; `Err` carries the specifier
/// together with an error code.
pub type AlterConfigsResult =
Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaErrorCode)>;
/// A set of configuration values to apply to one resource.
pub struct AlterConfig<'a> {
/// Identifies the resource whose configuration is altered.
pub specifier: ResourceSpecifier<'a>,
/// The configuration parameters to set, keyed by parameter name.
pub entries: HashMap<&'a str, &'a str>,
}
impl<'a> AlterConfig<'a> {
pub fn new(specifier: ResourceSpecifier<'_>) -> AlterConfig<'_> {
AlterConfig {
specifier,
entries: HashMap::new(),
}
}
pub fn set(mut self, key: &'a str, value: &'a str) -> AlterConfig<'a> {
self.entries.insert(key, value);
self
}
fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeConfigResource> {
let (name, typ) = match self.specifier {
ResourceSpecifier::Topic(name) => (
CString::new(name)?,
RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
),
ResourceSpecifier::Group(name) => (
CString::new(name)?,
RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
),
ResourceSpecifier::Broker(id) => (
CString::new(format!("{}", id))?,
RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
),
};
let config = unsafe {
NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(typ, name.as_ptr()))
.unwrap()
};
for (key, val) in &self.entries {
let key_c = CString::new(*key)?;
let val_c = CString::new(*val)?;
let res = unsafe {
rdsys::rd_kafka_ConfigResource_set_config(
config.ptr(),
key_c.as_ptr(),
val_c.as_ptr(),
)
};
check_rdkafka_invalid_arg(res, err_buf)?;
}
Ok(config)
}
}
/// Future that resolves once the result event of an alter-configs request
/// arrives on `rx`.
struct AlterConfigsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for AlterConfigsFuture {
    type Output = KafkaResult<Vec<AlterConfigsResult>>;

    /// Waits for the event, then decodes one result per resource.
    ///
    /// A resource that carries an error code becomes
    /// `Err((specifier, code))`; every other resource becomes
    /// `Ok(specifier)`. The future as a whole resolves to an error if the
    /// sending side was dropped (`KafkaError::Canceled`), if the event
    /// carries an error or has the wrong type, or if a resource specifier
    /// cannot be decoded.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_AlterConfigs_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "alter configs request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let resources = unsafe { rdsys::rd_kafka_AlterConfigs_result_resources(res, &mut n) };
        let mut out: Vec<AlterConfigsResult> = Vec::with_capacity(n);
        for i in 0..n {
            let resource = unsafe { *resources.add(i) };
            let specifier = extract_config_specifier(resource)?;
            // Surface per-resource failures instead of reporting every
            // resource as successfully altered.
            let resource_err = unsafe { rdsys::rd_kafka_ConfigResource_error(resource) };
            if resource_err.is_error() {
                out.push(Err((specifier, resource_err.into())));
            } else {
                out.push(Ok(specifier));
            }
        }
        Poll::Ready(Ok(out))
    }
}