use std::collections::BTreeMap;
use std::time::Duration;

use anyhow::{anyhow, bail, Context};
use mz_ccsr::GetSubjectConfigError;
use mz_kafka_util::client::MzClientContext;
use mz_ore::collections::CollectionExt;
use mz_ore::future::{InTask, OreFutureExt};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::KafkaTopicOptions;
use mz_storage_types::errors::ContextCreationErrorExt;
use mz_storage_types::sinks::KafkaSinkConnection;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, ResourceSpecifier, TopicReplication};
use rdkafka::ClientContext;
use tracing::warn;
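
/// Types for the keys that are written to a Kafka sink's progress topic.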
pub mod progress_key {
    use std::fmt;

    use mz_repr::GlobalId;
    use rdkafka::message::ToBytes;

    /// A key that identifies a given sink in the progress topic.
    #[derive(Debug, Clone)]
    pub struct ProgressKey(String);

    impl ProgressKey {
        /// Constructs the progress key for the sink with the specified ID.
        pub fn new(sink_id: GlobalId) -> ProgressKey {
            ProgressKey(format!("mz-sink-{sink_id}"))
        }
    }

    impl ToBytes for ProgressKey {
        fn to_bytes(&self) -> &[u8] {
            self.0.as_bytes()
        }
    }

    impl fmt::Display for ProgressKey {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            self.0.fmt(f)
        }
    }
}
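
/// Topic defaults discovered from a Kafka broker's configuration.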
struct TopicConfigs {
    partition_count: i32,
    replication_factor: i32,
}
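
/// Queries the Kafka cluster for its default `num.partitions` and
/// `default.replication.factor` settings, which are used for topics that do
/// not specify these values explicitly. Values that cannot be discovered are
/// reported as `-1`.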
async fn discover_topic_configs<C: ClientContext>(
    client: &AdminClient<C>,
    topic: &str,
    fetch_timeout: Duration,
) -> Result<TopicConfigs, anyhow::Error> {
    // -1 is a sentinel for "not yet discovered".
    let mut partition_count = -1;
    let mut replication_factor = -1;

    // Fetch cluster-wide metadata. Passing `None` avoids naming `topic` in the
    // metadata request, which could implicitly create it on brokers that have
    // topic auto-creation enabled.
    let metadata = client
        .inner()
        .fetch_metadata(None, fetch_timeout)
        .with_context(|| {
            format!(
                "error fetching metadata when creating new topic {} for sink",
                topic
            )
        })?;

    if metadata.brokers().is_empty() {
        bail!("zero brokers discovered in metadata request");
    }

    // Ask the first broker returned in the metadata for its default topic
    // configuration.
    let broker = metadata.brokers()[0].id();
    let configs = client
        .describe_configs(
            &[ResourceSpecifier::Broker(broker)],
            &AdminOptions::new().request_timeout(Some(Duration::from_secs(5))),
        )
        .await
        .with_context(|| {
            format!(
                "error fetching configuration from broker {} when creating new topic {} for sink",
                broker, topic
            )
        })?;

    if configs.len() != 1 {
        bail!(
            "error creating topic {} for sink: broker {} returned {} config results, but one was expected",
            topic,
            broker,
            configs.len()
        );
    }

    let config = configs.into_element().map_err(|e| {
        anyhow!(
            "error reading broker configuration when creating topic {} for sink: {}",
            topic,
            e
        )
    })?;

    if config.entries.is_empty() {
        bail!("read empty cluster configuration; do we have DescribeConfigs permissions?")
    }

    for entry in config.entries {
        if entry.name == "num.partitions" && partition_count == -1 {
            if let Some(s) = entry.value {
                partition_count = s.parse::<i32>().with_context(|| {
                    format!(
                        "default partition count {} cannot be parsed into an integer",
                        s
                    )
                })?;
            }
        } else if entry.name == "default.replication.factor" && replication_factor == -1 {
            if let Some(s) = entry.value {
                replication_factor = s.parse::<i32>().with_context(|| {
                    format!(
                        "default replication factor {} cannot be parsed into an integer",
                        s
                    )
                })?;
            }
        }
    }

    Ok(TopicConfigs {
        partition_count,
        replication_factor,
    })
}
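
/// Ensures that `topic` exists in the Kafka cluster addressed by `connection`,
/// creating it if necessary with the requested partition count, replication
/// factor, and topic-level configuration. Values not specified in
/// `KafkaTopicOptions` fall back to the broker's defaults.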
pub async fn ensure_kafka_topic(
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    topic: &str,
    KafkaTopicOptions {
        partition_count,
        replication_factor,
        topic_config,
    }: &KafkaTopicOptions,
) -> Result<bool, anyhow::Error> {
    let client: AdminClient<_> = connection
        .connection
        .create_with_context(
            storage_configuration,
            MzClientContext::default(),
            &BTreeMap::new(),
            InTask::Yes,
        )
        .await
        .add_context("creating admin client failed")?;

    let mut partition_count = partition_count.map(|f| *f);
    let mut replication_factor = replication_factor.map(|f| *f);

    // If either value was not specified, try to fill it in from the broker's
    // defaults.
    if partition_count.is_none() || replication_factor.is_none() {
        let fetch_timeout = storage_configuration
            .parameters
            .kafka_timeout_config
            .fetch_metadata_timeout;
        match discover_topic_configs(&client, topic, fetch_timeout).await {
            Ok(configs) => {
                if partition_count.is_none() {
                    partition_count = Some(configs.partition_count);
                }
                if replication_factor.is_none() {
                    replication_factor = Some(configs.replication_factor);
                }
            }
            Err(e) => {
                // Discovery failed; fall back to -1, which the broker
                // interprets as "use the broker's default" when creating the
                // topic.
                warn!("Failed to discover default values for topic configs: {e}");
                if partition_count.is_none() {
                    partition_count = Some(-1);
                }
                if replication_factor.is_none() {
                    replication_factor = Some(-1);
                }
            }
        };
    }

    let mut kafka_topic = NewTopic::new(
        topic,
        partition_count.expect("always set above"),
        TopicReplication::Fixed(replication_factor.expect("always set above")),
    );
    for (key, value) in topic_config {
        kafka_topic = kafka_topic.set(key, value);
    }

    mz_kafka_util::admin::ensure_topic(
        &client,
        &AdminOptions::new().request_timeout(Some(Duration::from_secs(5))),
        &kafka_topic,
    )
    .await
    .with_context(|| format!("Error creating topic {} for sink", topic))
}
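
/// Publishes `schema` under `subject` in the Confluent Schema Registry and
/// returns the ID that the registry assigns to it.
///
/// If `compatibility_level` is provided, it is applied to the subject only
/// when the subject does not already have a compatibility level configured;
/// an existing, mismatched level is logged but left unchanged.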
pub async fn publish_kafka_schema(
    ccsr: mz_ccsr::Client,
    subject: String,
    schema: String,
    schema_type: mz_ccsr::SchemaType,
    compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
) -> Result<i32, anyhow::Error> {
    if let Some(compatibility_level) = compatibility_level {
        let ccsr = ccsr.clone();
        let subject = subject.clone();
        async move {
            match ccsr.get_subject_config(&subject).await {
                Ok(config) => {
                    if config.compatibility_level != compatibility_level {
                        // An existing, different level is surprising but not
                        // fatal; leave it in place and record the mismatch.
                        tracing::debug!(
                            "compatibility level '{}' does not match intended '{}'",
                            config.compatibility_level,
                            compatibility_level
                        );
                    }
                    Ok(())
                }
                Err(GetSubjectConfigError::SubjectCompatibilityLevelNotSet)
                | Err(GetSubjectConfigError::SubjectNotFound) => ccsr
                    .set_subject_compatibility_level(&subject, compatibility_level)
                    .await
                    .map_err(anyhow::Error::from),
                Err(e) => Err(e.into()),
            }
        }
        .run_in_task(|| "set_compatibility_level".to_string())
        .await
        .context("unable to update schema compatibility level in kafka sink")?;
    }

    let schema_id = async move {
        ccsr.publish_schema(&subject, &schema, schema_type, &[])
            .await
    }
    .run_in_task(|| "publish_kafka_schema".to_string())
    .await
    .context("unable to publish schema to registry in kafka sink")?;

    Ok(schema_id)
}