use std::collections::{btree_map, BTreeMap};
use std::error::Error;
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::{anyhow, bail, Context};
use aws_config::SdkConfig;
use crossbeam::channel::{unbounded, Receiver, Sender};
use fancy_regex::Regex;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ssh_util::tunnel::{SshTimeoutConfig, SshTunnelConfig, SshTunnelStatus};
use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
use rdkafka::client::{Client, NativeClient, OAuthToken};
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::{ConsumerContext, Rebalance};
use rdkafka::error::{KafkaError, KafkaResult, RDKafkaErrorCode};
use rdkafka::producer::{DefaultProducerContext, DeliveryResult, ProducerContext};
use rdkafka::types::RDKafkaRespErr;
use rdkafka::util::Timeout;
use rdkafka::{ClientContext, Statistics, TopicPartitionList};
use serde::{Deserialize, Serialize};
use tokio::runtime::Handle;
use tokio::sync::watch;
use tracing::{debug, error, info, trace, warn, Level};

use crate::aws;
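/// A reasonable default interval at which to refresh topic metadata
/// (librdkafka's `topic.metadata.refresh.interval.ms`).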
pub const DEFAULT_TOPIC_METADATA_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
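/// A `ClientContext` that routes librdkafka logs through `tracing`, decodes
/// librdkafka error messages into [`MzKafkaError`]s for a channel of
/// subscribers, and broadcasts [`Statistics`] via a `watch` channel.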
pub struct MzClientContext {
error_tx: Sender<MzKafkaError>,
statistics_tx: watch::Sender<Statistics>,
}
impl Default for MzClientContext {
fn default() -> Self {
Self::with_errors().0
}
}
impl MzClientContext {
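    /// Constructs a new client context, along with a crossbeam channel that
    /// receives librdkafka errors decoded into [`MzKafkaError`]s.
    ///
    /// A minimal sketch of wiring the context into a consumer (marked
    /// `ignore`: it assumes a reachable broker at `localhost:9092`):
    ///
    /// ```ignore
    /// let (ctx, error_rx) = MzClientContext::with_errors();
    /// let mut config = create_new_client_config_simple();
    /// config.set("bootstrap.servers", "localhost:9092");
    /// let consumer: BaseConsumer<MzClientContext> = config.create_with_context(ctx)?;
    /// while let Ok(err) = error_rx.try_recv() {
    ///     println!("kafka error: {err}");
    /// }
    /// ```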
pub fn with_errors() -> (Self, Receiver<MzKafkaError>) {
let (error_tx, error_rx) = unbounded();
let (statistics_tx, _) = watch::channel(Default::default());
let ctx = Self {
error_tx,
statistics_tx,
};
(ctx, error_rx)
}
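    /// Returns a receiver that observes the most recent [`Statistics`]
    /// payload reported by librdkafka (statistics are only emitted if
    /// `statistics.interval.ms` is configured).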
pub fn subscribe_statistics(&self) -> watch::Receiver<Statistics> {
self.statistics_tx.subscribe()
}
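    /// Decodes `msg` into an [`MzKafkaError`] (falling back to
    /// [`MzKafkaError::Internal`]) and forwards it to any error receiver.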
fn record_error(&self, msg: &str) {
let err = match MzKafkaError::from_str(msg) {
Ok(err) => err,
Err(()) => {
warn!(original_error = msg, "failed to parse kafka error");
MzKafkaError::Internal(msg.to_owned())
}
};
let _ = self.error_tx.send(err);
}
}
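/// A structured error decoded from librdkafka's free-form log and error
/// messages.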
#[derive(Clone, Debug, Eq, PartialEq, thiserror::Error)]
pub enum MzKafkaError {
#[error("Invalid username or password")]
InvalidCredentials,
#[error("Invalid CA certificate")]
InvalidCACertificate,
#[error("Disconnected during handshake; broker might require SSL encryption")]
SSLEncryptionMaybeRequired,
#[error("Broker does not support SSL connections")]
SSLUnsupported,
#[error("Broker did not provide a certificate")]
BrokerCertificateMissing,
#[error("Failed to verify broker certificate")]
InvalidBrokerCertificate,
#[error("Connection reset: {0}")]
ConnectionReset(String),
#[error("Connection timeout")]
ConnectionTimeout,
#[error("Failed to resolve hostname")]
HostnameResolutionFailed,
#[error("Unsupported SASL mechanism")]
UnsupportedSASLMechanism,
#[error("Unsupported broker version")]
UnsupportedBrokerVersion,
#[error("Broker transport failure")]
BrokerTransportFailure,
#[error("All brokers down")]
AllBrokersDown,
#[error("SASL authentication required")]
SaslAuthenticationRequired,
#[error("SASL authentication failed")]
SaslAuthenticationFailed,
#[error("SSL authentication required")]
SslAuthenticationRequired,
#[error("Unknown topic or partition")]
UnknownTopicOrPartition,
#[error("Internal kafka error: {0}")]
Internal(String),
}
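// Best-effort decoding of librdkafka's free-form error strings. These
// patterns track messages emitted by particular librdkafka versions and may
// need updating across upgrades; unmatched messages yield `Err(())`, which
// `record_error` maps to `MzKafkaError::Internal`.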
impl FromStr for MzKafkaError {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.contains("Authentication failed: Invalid username or password") {
Ok(Self::InvalidCredentials)
} else if s.contains("broker certificate could not be verified") {
Ok(Self::InvalidCACertificate)
} else if s.contains("connecting to a SSL listener?") {
Ok(Self::SSLEncryptionMaybeRequired)
} else if s.contains("client SSL authentication might be required") {
Ok(Self::SslAuthenticationRequired)
} else if s.contains("connecting to a PLAINTEXT broker listener") {
Ok(Self::SSLUnsupported)
} else if s.contains("Broker did not provide a certificate") {
Ok(Self::BrokerCertificateMissing)
} else if s.contains("Failed to verify broker certificate: ") {
Ok(Self::InvalidBrokerCertificate)
} else if let Some((_prefix, inner)) = s.split_once("Send failed: ") {
Ok(Self::ConnectionReset(inner.to_owned()))
} else if let Some((_prefix, inner)) = s.split_once("Receive failed: ") {
Ok(Self::ConnectionReset(inner.to_owned()))
} else if s.contains("request(s) timed out: disconnect") {
Ok(Self::ConnectionTimeout)
} else if s.contains("Failed to resolve") {
Ok(Self::HostnameResolutionFailed)
} else if s.contains("mechanism handshake failed:") {
Ok(Self::UnsupportedSASLMechanism)
} else if s.contains(
"verify that security.protocol is correctly configured, \
broker might require SASL authentication",
) {
Ok(Self::SaslAuthenticationRequired)
} else if s.contains("SASL authentication error: Authentication failed") {
Ok(Self::SaslAuthenticationFailed)
} else if s
.contains("incorrect security.protocol configuration (connecting to a SSL listener?)")
{
Ok(Self::SslAuthenticationRequired)
} else if s.contains("probably due to broker version < 0.10") {
Ok(Self::UnsupportedBrokerVersion)
} else if s.contains("Disconnected while requesting ApiVersion")
|| s.contains("Broker transport failure")
|| s.contains("Connection refused")
{
Ok(Self::BrokerTransportFailure)
        } else if Regex::new(r"(\d+)/\1 brokers are down")
            .expect("valid regex")
            .is_match(s)
            .unwrap_or_default()
        {
            // librdkafka logs "N/N brokers are down" when every configured
            // broker is unreachable; the backreference requires the two
            // counts to match.
            Ok(Self::AllBrokersDown)
} else if s.contains("Unknown topic or partition") || s.contains("Unknown partition") {
Ok(Self::UnknownTopicOrPartition)
} else {
Err(())
}
}
}
impl ClientContext for MzClientContext {
fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
use rdkafka::config::RDKafkaLogLevel::*;
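        // librdkafka also reports failures as log messages with the "FAIL"
        // facility, so sniff those out alongside error-level logs.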
if matches!(level, Emerg | Alert | Critical | Error) || fac == "FAIL" {
self.record_error(log_message);
}
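        // Error-level librdkafka logs are downgraded to `warn!` here; the
        // decoded errors recorded above are the structured error channel.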
match level {
Emerg | Alert | Critical | Error => {
warn!(target: "librdkafka", "error: {} {}", fac, log_message);
}
Warning => warn!(target: "librdkafka", "warning: {} {}", fac, log_message),
Notice => info!(target: "librdkafka", "{} {}", fac, log_message),
Info => info!(target: "librdkafka", "{} {}", fac, log_message),
Debug => debug!(target: "librdkafka", "{} {}", fac, log_message),
}
}
fn stats(&self, statistics: Statistics) {
self.statistics_tx.send_replace(statistics);
}
fn error(&self, error: KafkaError, reason: &str) {
self.record_error(reason);
warn!(target: "librdkafka", "error: {}: {}", error, reason);
}
}
impl ConsumerContext for MzClientContext {}
impl ProducerContext for MzClientContext {
type DeliveryOpaque = <DefaultProducerContext as ProducerContext>::DeliveryOpaque;
fn delivery(
&self,
delivery_result: &DeliveryResult<'_>,
delivery_opaque: Self::DeliveryOpaque,
) {
DefaultProducerContext.delivery(delivery_result, delivery_opaque);
}
}
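/// The host and port of a Kafka broker.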
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct BrokerAddr {
pub host: String,
pub port: u16,
}
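/// A rewrite of a broker address, replacing its host and (optionally) its
/// port. When `port` is `None`, the broker's original port is kept.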
#[derive(Debug, Clone)]
pub struct BrokerRewrite {
pub host: String,
pub port: Option<u16>,
}
#[derive(Clone)]
enum BrokerRewriteHandle {
    /// A static rewrite.
    Simple(BrokerRewrite),
    /// A rewrite through an SSH tunnel. The tunnel stays open for as long as
    /// this handle is alive.
    SshTunnel(ManagedSshTunnelHandle),
    /// The error string from a default SSH tunnel that failed to connect,
    /// retained so `tunnel_status` can report it.
    FailedDefaultSshTunnel(String),
}
/// How to reach brokers that have no explicit rewrite.
#[derive(Clone)]
pub enum TunnelConfig {
    /// Tunnel connections through the given SSH bastion.
    Ssh(SshTunnelConfig),
    /// Resolve this static host instead of the broker's, keeping the
    /// broker's port.
    StaticHost(String),
    /// Connect to brokers directly.
    None,
}
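/// A `ClientContext` wrapper that rewrites broker addresses at resolution
/// time, optionally routing connections through SSH tunnels, and that can
/// mint OAuth tokens from an AWS configuration.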
#[derive(Clone)]
pub struct TunnelingClientContext<C> {
inner: C,
rewrites: Arc<Mutex<BTreeMap<BrokerAddr, BrokerRewriteHandle>>>,
default_tunnel: TunnelConfig,
in_task: InTask,
ssh_tunnel_manager: SshTunnelManager,
ssh_timeout_config: SshTimeoutConfig,
aws_config: Option<SdkConfig>,
runtime: Handle,
}
impl<C> TunnelingClientContext<C> {
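    /// Constructs a new context wrapping `inner`, with no broker rewrites
    /// and no default tunnel.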
pub fn new(
inner: C,
runtime: Handle,
ssh_tunnel_manager: SshTunnelManager,
ssh_timeout_config: SshTimeoutConfig,
aws_config: Option<SdkConfig>,
in_task: InTask,
) -> TunnelingClientContext<C> {
TunnelingClientContext {
inner,
rewrites: Arc::new(Mutex::new(BTreeMap::new())),
default_tunnel: TunnelConfig::None,
in_task,
ssh_tunnel_manager,
ssh_timeout_config,
aws_config,
runtime,
}
}
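    /// Sets the tunnel configuration applied to brokers without an explicit
    /// rewrite. Intended to be called before the client starts connecting.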
pub fn set_default_tunnel(&mut self, tunnel: TunnelConfig) {
self.default_tunnel = tunnel;
}
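    /// Establishes an SSH tunnel to `broker` and installs it as a rewrite,
    /// replacing any existing rewrite for that broker.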
pub async fn add_ssh_tunnel(
&self,
broker: BrokerAddr,
tunnel: SshTunnelConfig,
) -> Result<(), anyhow::Error> {
let ssh_tunnel = self
.ssh_tunnel_manager
.connect(
tunnel,
&broker.host,
broker.port,
self.ssh_timeout_config,
self.in_task,
)
.await
.context("creating ssh tunnel")?;
let mut rewrites = self.rewrites.lock().expect("poisoned");
rewrites.insert(broker, BrokerRewriteHandle::SshTunnel(ssh_tunnel));
Ok(())
}
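    /// Installs a static rewrite for `broker`, replacing any existing
    /// rewrite for that broker.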
pub fn add_broker_rewrite(&self, broker: BrokerAddr, rewrite: BrokerRewrite) {
let mut rewrites = self.rewrites.lock().expect("poisoned");
rewrites.insert(broker, BrokerRewriteHandle::Simple(rewrite));
}
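    /// Returns a reference to the wrapped context.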
pub fn inner(&self) -> &C {
&self.inner
}
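    /// Reports the combined status of all SSH tunnels: `Running` if every
    /// tunnel is healthy, otherwise `Errored` with the concatenated errors.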
pub fn tunnel_status(&self) -> SshTunnelStatus {
self.rewrites
.lock()
.expect("poisoned")
.values()
.map(|handle| match handle {
BrokerRewriteHandle::SshTunnel(s) => s.check_status(),
BrokerRewriteHandle::FailedDefaultSshTunnel(e) => {
SshTunnelStatus::Errored(e.clone())
}
BrokerRewriteHandle::Simple(_) => SshTunnelStatus::Running,
})
.fold(SshTunnelStatus::Running, |acc, status| {
match (acc, status) {
(SshTunnelStatus::Running, SshTunnelStatus::Errored(e))
| (SshTunnelStatus::Errored(e), SshTunnelStatus::Running) => {
SshTunnelStatus::Errored(e)
}
(SshTunnelStatus::Errored(err), SshTunnelStatus::Errored(e)) => {
SshTunnelStatus::Errored(format!("{}, {}", err, e))
}
(SshTunnelStatus::Running, SshTunnelStatus::Running) => {
SshTunnelStatus::Running
}
}
})
}
}
impl<C> ClientContext for TunnelingClientContext<C>
where
C: ClientContext,
{
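    // Opt in to librdkafka's OAUTHBEARER token refresh mechanism, which
    // calls `generate_oauth_token` below when a new token is needed.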
const ENABLE_REFRESH_OAUTH_TOKEN: bool = true;
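    // Generates an OAuth token from the AWS configuration supplied at
    // construction, failing if none was provided. The callback runs
    // synchronously on the thread librdkafka invokes it from, which is why
    // it blocks on the Tokio runtime handle.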
fn generate_oauth_token(
&self,
_oauthbearer_config: Option<&str>,
) -> Result<OAuthToken, Box<dyn Error>> {
info!(target: "librdkafka", "generating OAuth token");
let generate = || {
let Some(sdk_config) = &self.aws_config else {
bail!("internal error: AWS configuration missing");
};
self.runtime.block_on(aws::generate_auth_token(sdk_config))
};
match generate() {
Ok((token, lifetime_ms)) => {
info!(target: "librdkafka", %lifetime_ms, "successfully generated OAuth token");
trace!(target: "librdkafka", %token);
Ok(OAuthToken {
token,
lifetime_ms,
principal_name: "".to_string(),
})
}
Err(e) => {
warn!(target: "librdkafka", "failed to generate OAuth token: {e:#}");
Err(e.into())
}
}
}
fn resolve_broker_addr(&self, host: &str, port: u16) -> Result<Vec<SocketAddr>, io::Error> {
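        // Invoked by librdkafka on one of its own threads whenever it needs
        // to resolve a broker address. Explicit rewrites take precedence;
        // otherwise the default tunnel configuration applies.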
let return_rewrite = |rewrite: &BrokerRewriteHandle| -> Result<Vec<SocketAddr>, io::Error> {
let rewrite = match rewrite {
BrokerRewriteHandle::Simple(rewrite) => rewrite.clone(),
BrokerRewriteHandle::SshTunnel(ssh_tunnel) => {
let addr = ssh_tunnel.local_addr();
BrokerRewrite {
host: addr.ip().to_string(),
port: Some(addr.port()),
}
}
                BrokerRewriteHandle::FailedDefaultSshTunnel(_) => {
                    // Failed default tunnels are handled (and retried) by the
                    // `match` below, so they never reach this closure.
                    unreachable!()
                }
};
let rewrite_port = rewrite.port.unwrap_or(port);
info!(
"rewriting broker {}:{} to {}:{}",
host, port, rewrite.host, rewrite_port
);
(rewrite.host, rewrite_port)
.to_socket_addrs()
.map(|addrs| addrs.collect())
};
let addr = BrokerAddr {
host: host.into(),
port,
};
let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned();
match rewrite {
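            // No rewrite exists, or a default SSH tunnel previously failed:
            // fall back to the default tunnel config, retrying failed
            // tunnels.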
None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => {
match &self.default_tunnel {
TunnelConfig::Ssh(default_tunnel) => {
let ssh_tunnel = self.runtime.block_on(async {
self.ssh_tunnel_manager
.connect(
default_tunnel.clone(),
host,
port,
self.ssh_timeout_config,
self.in_task,
)
.await
});
match ssh_tunnel {
Ok(ssh_tunnel) => {
let mut rewrites = self.rewrites.lock().expect("poisoned");
let rewrite = match rewrites.entry(addr.clone()) {
btree_map::Entry::Occupied(mut o)
if matches!(
o.get(),
BrokerRewriteHandle::FailedDefaultSshTunnel(_)
) =>
{
o.insert(BrokerRewriteHandle::SshTunnel(
ssh_tunnel.clone(),
));
o.into_mut()
}
btree_map::Entry::Occupied(o) => o.into_mut(),
btree_map::Entry::Vacant(v) => {
v.insert(BrokerRewriteHandle::SshTunnel(ssh_tunnel.clone()))
}
};
return_rewrite(rewrite)
}
Err(e) => {
warn!(
"failed to create ssh tunnel for {:?}: {}",
addr,
e.display_with_causes()
);
let mut rewrites = self.rewrites.lock().expect("poisoned");
rewrites.entry(addr.clone()).or_insert_with(|| {
BrokerRewriteHandle::FailedDefaultSshTunnel(
e.to_string_with_causes(),
)
});
Err(io::Error::new(
io::ErrorKind::Other,
"creating SSH tunnel failed",
))
}
}
}
TunnelConfig::StaticHost(host) => (host.as_str(), port)
.to_socket_addrs()
.map(|addrs| addrs.collect()),
TunnelConfig::None => {
(host, port).to_socket_addrs().map(|addrs| addrs.collect())
}
}
}
Some(rewrite) => return_rewrite(&rewrite),
}
}
fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
self.inner.log(level, fac, log_message)
}
fn error(&self, error: KafkaError, reason: &str) {
self.inner.error(error, reason)
}
fn stats(&self, statistics: Statistics) {
self.inner.stats(statistics)
}
fn stats_raw(&self, statistics: &[u8]) {
self.inner.stats_raw(statistics)
}
}
impl<C> ConsumerContext for TunnelingClientContext<C>
where
C: ConsumerContext,
{
fn rebalance(
&self,
native_client: &NativeClient,
err: RDKafkaRespErr,
tpl: &mut TopicPartitionList,
) {
self.inner.rebalance(native_client, err, tpl)
}
fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {
self.inner.pre_rebalance(rebalance)
}
fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {
self.inner.post_rebalance(rebalance)
}
fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {
self.inner.commit_callback(result, offsets)
}
fn main_queue_min_poll_interval(&self) -> Timeout {
self.inner.main_queue_min_poll_interval()
}
}
impl<C> ProducerContext for TunnelingClientContext<C>
where
C: ProducerContext,
{
type DeliveryOpaque = C::DeliveryOpaque;
fn delivery(
&self,
delivery_result: &DeliveryResult<'_>,
delivery_opaque: Self::DeliveryOpaque,
) {
self.inner.delivery(delivery_result, delivery_opaque)
}
}
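/// The ID of a Kafka partition (librdkafka represents these as `i32`).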
pub type PartitionId = i32;
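/// An error returned by [`get_partitions`].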
#[derive(Debug, thiserror::Error)]
pub enum GetPartitionsError {
#[error("Topic does not exist")]
TopicDoesNotExist,
#[error(transparent)]
Kafka(#[from] KafkaError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
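/// Fetches the IDs of the partitions of `topic` using the given `client`,
/// failing if the topic does not exist or its metadata reports an error.
///
/// A sketch of typical usage, assuming an already-configured
/// `BaseConsumer<MzClientContext>` named `consumer` (marked `ignore`: it
/// requires a live broker):
///
/// ```ignore
/// let ids = get_partitions(consumer.client(), "my-topic", Duration::from_secs(10))?;
/// println!("topic has {} partitions", ids.len());
/// ```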
pub fn get_partitions<C: ClientContext>(
client: &Client<C>,
topic: &str,
timeout: Duration,
) -> Result<Vec<PartitionId>, GetPartitionsError> {
let meta = client.fetch_metadata(Some(topic), timeout)?;
if meta.topics().len() != 1 {
Err(anyhow!(
"topic {} has {} metadata entries; expected 1",
topic,
meta.topics().len()
))?;
}
fn check_err(err: Option<RDKafkaRespErr>) -> Result<(), GetPartitionsError> {
match err.map(RDKafkaErrorCode::from) {
Some(RDKafkaErrorCode::UnknownTopic | RDKafkaErrorCode::UnknownTopicOrPartition) => {
Err(GetPartitionsError::TopicDoesNotExist)
}
Some(code) => Err(anyhow!(code))?,
None => Ok(()),
}
}
let meta_topic = meta.topics().into_element();
check_err(meta_topic.error())?;
if meta_topic.name() != topic {
Err(anyhow!(
"got results for wrong topic {} (expected {})",
meta_topic.name(),
topic
))?;
}
let mut partition_ids = Vec::with_capacity(meta_topic.partitions().len());
for partition_meta in meta_topic.partitions() {
check_err(partition_meta.error())?;
partition_ids.push(partition_meta.id());
}
    if partition_ids.is_empty() {
        // Metadata can report a topic with zero partitions (e.g. while the
        // topic is still being created); treat that as nonexistent.
        Err(GetPartitionsError::TopicDoesNotExist)?;
    }
Ok(partition_ids)
}
/// The default for `socket.keepalive.enable`.
pub const DEFAULT_KEEPALIVE: bool = true;
/// The default for `socket.timeout.ms`.
pub const DEFAULT_SOCKET_TIMEOUT: Duration = Duration::from_secs(60);
/// The default for `transaction.timeout.ms`.
pub const DEFAULT_TRANSACTION_TIMEOUT: Duration = Duration::from_secs(600);
/// The default for `socket.connection.setup.timeout.ms`.
pub const DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT: Duration = Duration::from_secs(30);
/// The default timeout to use when fetching topic metadata.
pub const DEFAULT_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(10);
/// The default timeout to use when fetching progress records.
pub const DEFAULT_PROGRESS_RECORD_FETCH_TIMEOUT: Duration = Duration::from_secs(90);
/// The default interval at which to refresh topic metadata.
pub const DEFAULT_METADATA_FETCH_INTERVAL: Duration = Duration::from_secs(60);
/// Timeouts and related settings for Kafka clients; the librdkafka-level
/// settings are applied by [`create_new_client_config`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TimeoutConfig {
    /// Whether to enable TCP keepalives (`socket.keepalive.enable`).
    pub keepalive: bool,
    /// The `socket.timeout.ms` value.
    pub socket_timeout: Duration,
    /// The `transaction.timeout.ms` value.
    pub transaction_timeout: Duration,
    /// The `socket.connection.setup.timeout.ms` value.
    pub socket_connection_setup_timeout: Duration,
    /// The timeout to use when fetching topic metadata.
    pub fetch_metadata_timeout: Duration,
    /// The timeout to use when fetching progress records.
    pub progress_record_fetch_timeout: Duration,
    /// The default interval at which to refresh topic metadata.
    pub default_metadata_fetch_interval: Duration,
}
impl Default for TimeoutConfig {
fn default() -> Self {
TimeoutConfig {
keepalive: DEFAULT_KEEPALIVE,
socket_timeout: DEFAULT_SOCKET_TIMEOUT,
transaction_timeout: DEFAULT_TRANSACTION_TIMEOUT,
socket_connection_setup_timeout: DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT,
fetch_metadata_timeout: DEFAULT_FETCH_METADATA_TIMEOUT,
progress_record_fetch_timeout: DEFAULT_PROGRESS_RECORD_FETCH_TIMEOUT,
default_metadata_fetch_interval: DEFAULT_METADATA_FETCH_INTERVAL,
}
}
}
impl TimeoutConfig {
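    /// Builds a `TimeoutConfig`, replacing out-of-range parameters with
    /// defaults (or the nearest legal bound) and logging an error for each:
    ///
    /// - `transaction_timeout` must be between 1s and `i32::MAX` ms;
    /// - `progress_record_fetch_timeout` defaults to at least the
    ///   transaction timeout, and is raised to it if set smaller;
    /// - `socket_timeout` must be between 10ms and
    ///   min(`transaction_timeout` + 100ms, 300s);
    /// - `socket_connection_setup_timeout` must be between 10ms and
    ///   `i32::MAX` ms.
    ///
    /// A sketch of constructing a config that derives the optional
    /// parameters:
    ///
    /// ```ignore
    /// let timeouts = TimeoutConfig::build(
    ///     true,                     // keepalive
    ///     None,                     // socket_timeout: derived
    ///     Duration::from_secs(600), // transaction_timeout
    ///     Duration::from_secs(30),  // socket_connection_setup_timeout
    ///     Duration::from_secs(10),  // fetch_metadata_timeout
    ///     None,                     // progress_record_fetch_timeout: derived
    ///     Duration::from_secs(60),  // default_metadata_fetch_interval
    /// );
    /// ```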
pub fn build(
keepalive: bool,
socket_timeout: Option<Duration>,
transaction_timeout: Duration,
socket_connection_setup_timeout: Duration,
fetch_metadata_timeout: Duration,
progress_record_fetch_timeout: Option<Duration>,
default_metadata_fetch_interval: Duration,
) -> TimeoutConfig {
        let transaction_timeout = if transaction_timeout.as_millis() > i32::MAX.try_into().unwrap()
        {
            error!(
                "transaction_timeout ({transaction_timeout:?}) greater than max \
                of {}ms, defaulting to the default of {DEFAULT_TRANSACTION_TIMEOUT:?}",
                i32::MAX
            );
            DEFAULT_TRANSACTION_TIMEOUT
        } else if transaction_timeout.as_millis() < 1000 {
            error!(
                "transaction_timeout ({transaction_timeout:?}) less than min \
                of 1000ms, defaulting to the default of {DEFAULT_TRANSACTION_TIMEOUT:?}"
            );
            DEFAULT_TRANSACTION_TIMEOUT
        } else {
            transaction_timeout
        };
let progress_record_fetch_timeout_derived_default =
std::cmp::max(transaction_timeout, DEFAULT_PROGRESS_RECORD_FETCH_TIMEOUT);
let progress_record_fetch_timeout =
progress_record_fetch_timeout.unwrap_or(progress_record_fetch_timeout_derived_default);
let progress_record_fetch_timeout = if progress_record_fetch_timeout < transaction_timeout {
error!(
"progress record fetch ({progress_record_fetch_timeout:?}) less than transaction \
timeout ({transaction_timeout:?}), defaulting to transaction timeout {transaction_timeout:?}",
);
transaction_timeout
} else {
progress_record_fetch_timeout
};
let max_socket_timeout = std::cmp::min(
transaction_timeout + Duration::from_millis(100),
Duration::from_secs(300),
);
let socket_timeout_derived_default =
std::cmp::min(max_socket_timeout, DEFAULT_SOCKET_TIMEOUT);
let socket_timeout = socket_timeout.unwrap_or(socket_timeout_derived_default);
        let socket_timeout = if socket_timeout > max_socket_timeout {
            error!(
                "socket_timeout ({socket_timeout:?}) greater than max \
                of min(300000, transaction.timeout.ms + 100 ({})), \
                defaulting to the maximum of {max_socket_timeout:?}",
                transaction_timeout.as_millis() + 100
            );
            max_socket_timeout
} else if socket_timeout.as_millis() < 10 {
error!(
"socket_timeout ({socket_timeout:?}) less than min \
of 10ms, defaulting to the default of {socket_timeout_derived_default:?}"
);
socket_timeout_derived_default
} else {
socket_timeout
};
let socket_connection_setup_timeout =
if socket_connection_setup_timeout.as_millis() > i32::MAX.try_into().unwrap() {
error!(
"socket_connection_setup_timeout ({socket_connection_setup_timeout:?}) \
greater than max of {}ms, defaulting to the default \
of {DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT:?}",
i32::MAX,
);
DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT
            } else if socket_connection_setup_timeout.as_millis() < 10 {
                error!(
                    "socket_connection_setup_timeout ({socket_connection_setup_timeout:?}) \
                    less than min of 10ms, defaulting to the default of \
                    {DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT:?}"
                );
DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT
} else {
socket_connection_setup_timeout
};
TimeoutConfig {
keepalive,
socket_timeout,
transaction_timeout,
socket_connection_setup_timeout,
fetch_metadata_timeout,
progress_record_fetch_timeout,
default_metadata_fetch_interval,
}
}
}
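/// Builds a [`ClientConfig`] with `INFO`-level logging and default timeouts.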
pub fn create_new_client_config_simple() -> ClientConfig {
create_new_client_config(tracing::Level::INFO, Default::default())
}
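/// Builds a [`ClientConfig`] with its logging level and timeouts tailored to
/// the given `tracing_level` and `timeout_config`.
///
/// A sketch of building a producer from the resulting config, assuming a
/// broker at `localhost:9092` (marked `ignore`: it requires a live broker):
///
/// ```ignore
/// let mut config = create_new_client_config(Level::INFO, TimeoutConfig::default());
/// config.set("bootstrap.servers", "localhost:9092");
/// let producer: BaseProducer<MzClientContext> =
///     config.create_with_context(MzClientContext::default())?;
/// ```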
pub fn create_new_client_config(
tracing_level: Level,
timeout_config: TimeoutConfig,
) -> ClientConfig {
#[allow(clippy::disallowed_methods)]
let mut config = ClientConfig::new();
let level = if tracing_level >= Level::DEBUG {
RDKafkaLogLevel::Debug
} else if tracing_level >= Level::INFO {
RDKafkaLogLevel::Info
} else if tracing_level >= Level::WARN {
RDKafkaLogLevel::Warning
} else {
RDKafkaLogLevel::Error
};
tracing::debug!(target: "librdkafka", level = ?level, "Determined log level for librdkafka");
config.set_log_level(level);
if tracing_level >= Level::DEBUG {
tracing::debug!(target: "librdkafka", "Enabling debug logs for rdkafka");
config.set("debug", "all");
}
if timeout_config.keepalive {
config.set("socket.keepalive.enable", "true");
}
config.set(
"socket.timeout.ms",
timeout_config.socket_timeout.as_millis().to_string(),
);
config.set(
"transaction.timeout.ms",
timeout_config.transaction_timeout.as_millis().to_string(),
);
config.set(
"socket.connection.setup.timeout.ms",
timeout_config
.socket_connection_setup_timeout
.as_millis()
.to_string(),
);
config
}