// mz_storage_client/sink.rs

use std::collections::BTreeMap;
use std::time::Duration;

use anyhow::{Context, anyhow, bail};
use mz_ccsr::GetSubjectConfigError;
use mz_kafka_util::admin::EnsureTopicConfig;
use mz_kafka_util::client::MzClientContext;
use mz_ore::collections::CollectionExt;
use mz_ore::future::{InTask, OreFutureExt};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::KafkaTopicOptions;
use mz_storage_types::errors::ContextCreationErrorExt;
use mz_storage_types::sinks::KafkaSinkConnection;
use rdkafka::ClientContext;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, ResourceSpecifier, TopicReplication};
use tracing::warn;

pub mod progress_key {
    use std::fmt;

    use mz_repr::GlobalId;
    use rdkafka::message::ToBytes;

    /// A key identifying the progress records of a particular sink.
    #[derive(Debug, Clone)]
    pub struct ProgressKey(String);

    impl ProgressKey {
        /// Constructs the progress key for the sink with the given ID.
        pub fn new(sink_id: GlobalId) -> ProgressKey {
            ProgressKey(format!("mz-sink-{sink_id}"))
        }
    }

    impl ToBytes for ProgressKey {
        fn to_bytes(&self) -> &[u8] {
            self.0.as_bytes()
        }
    }

    impl fmt::Display for ProgressKey {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            self.0.fmt(f)
        }
    }
}
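
// A minimal sketch of how a progress key renders, added for illustration; it
// assumes `GlobalId`'s `Display` impl writes user IDs as `uNN`, which the
// expected string below depends on. This test is hypothetical, not part of
// the original module.
#[cfg(test)]
mod progress_key_tests {
    use mz_repr::GlobalId;

    use super::progress_key::ProgressKey;

    #[test]
    fn renders_sink_id_into_key() {
        let key = ProgressKey::new(GlobalId::User(42));
        assert_eq!(key.to_string(), "mz-sink-u42");
    }
}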

/// Default topic settings discovered from the broker's configuration.
struct TopicConfigs {
    partition_count: i32,
    replication_factor: i32,
}

/// Discovers the broker's default partition count and replication factor by
/// describing the configuration of the first broker in the cluster metadata.
/// A returned value of -1 means the broker did not report a default.
async fn discover_topic_configs<C: ClientContext>(
    client: &AdminClient<C>,
    topic: &str,
    fetch_timeout: Duration,
) -> Result<TopicConfigs, anyhow::Error> {
    let mut partition_count = -1;
    let mut replication_factor = -1;

    let metadata = client
        .inner()
        .fetch_metadata(None, fetch_timeout)
        .with_context(|| {
            format!(
                "error fetching metadata when creating new topic {} for sink",
                topic
            )
        })?;

    if metadata.brokers().is_empty() {
        bail!("zero brokers discovered in metadata request");
    }

    let broker = metadata.brokers()[0].id();
    let configs = client
        .describe_configs(
            &[ResourceSpecifier::Broker(broker)],
            &AdminOptions::new().request_timeout(Some(Duration::from_secs(5))),
        )
        .await
        .with_context(|| {
            format!(
                "error fetching configuration from broker {} when creating new topic {} for sink",
                broker, topic
            )
        })?;

    if configs.len() != 1 {
        bail!(
            "error creating topic {} for sink: broker {} returned {} config results, but one was expected",
            topic,
            broker,
            configs.len()
        );
    }

    let config = configs.into_element().map_err(|e| {
        anyhow!(
            "error reading broker configuration when creating topic {} for sink: {}",
            topic,
            e
        )
    })?;

    if config.entries.is_empty() {
        bail!("read empty cluster configuration; do we have DescribeConfigs permissions?");
    }

    for entry in config.entries {
        if entry.name == "num.partitions" && partition_count == -1 {
            if let Some(s) = entry.value {
                partition_count = s.parse::<i32>().with_context(|| {
                    format!(
                        "default partition count {} cannot be parsed into an integer",
                        s
                    )
                })?;
            }
        } else if entry.name == "default.replication.factor" && replication_factor == -1 {
            if let Some(s) = entry.value {
                replication_factor = s.parse::<i32>().with_context(|| {
                    format!(
                        "default replication factor {} cannot be parsed into an integer",
                        s
                    )
                })?;
            }
        }
    }

    Ok(TopicConfigs {
        partition_count,
        replication_factor,
    })
}
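
// A hypothetical illustration (not part of the original file) of how callers
// consume the discovered defaults, mirroring the fallback in
// `ensure_kafka_topic` below: a value the broker did not report stays at -1,
// which Kafka's CreateTopics API interprets as "use the broker default".
#[allow(dead_code)]
async fn partition_count_or_broker_default<C: ClientContext>(
    client: &AdminClient<C>,
    topic: &str,
) -> Result<i32, anyhow::Error> {
    match discover_topic_configs(client, topic, Duration::from_secs(10)).await {
        Ok(configs) => Ok(configs.partition_count),
        // On discovery failure, fall back to the -1 sentinel.
        Err(_) => Ok(-1),
    }
}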

/// Ensures that the Kafka topic for a sink exists with the desired
/// configuration, creating it if necessary. Any partition count or
/// replication factor left unspecified in `KafkaTopicOptions` is filled in
/// from the broker's defaults.
pub async fn ensure_kafka_topic(
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    topic: &str,
    KafkaTopicOptions {
        partition_count,
        replication_factor,
        topic_config,
    }: &KafkaTopicOptions,
    ensure_topic_config: EnsureTopicConfig,
) -> Result<bool, anyhow::Error> {
    let client: AdminClient<_> = connection
        .connection
        .create_with_context(
            storage_configuration,
            MzClientContext::default(),
            &BTreeMap::new(),
            InTask::Yes,
        )
        .await
        .add_context("creating admin client failed")?;
    let mut partition_count = partition_count.map(|f| *f);
    let mut replication_factor = replication_factor.map(|f| *f);
    // If the user did not specify a partition count or replication factor,
    // discover the broker's defaults.
    if partition_count.is_none() || replication_factor.is_none() {
        let fetch_timeout = storage_configuration
            .parameters
            .kafka_timeout_config
            .fetch_metadata_timeout;
        match discover_topic_configs(&client, topic, fetch_timeout).await {
            Ok(configs) => {
                if partition_count.is_none() {
                    partition_count = Some(configs.partition_count);
                }
                if replication_factor.is_none() {
                    replication_factor = Some(configs.replication_factor);
                }
            }
            Err(e) => {
                warn!("Failed to discover default values for topic configs: {e}");
                // Fall back to -1, which instructs the broker to apply its
                // own default when the topic is created.
                if partition_count.is_none() {
                    partition_count = Some(-1);
                }
                if replication_factor.is_none() {
                    replication_factor = Some(-1);
                }
            }
        }
    }

    let mut kafka_topic = NewTopic::new(
        topic,
        partition_count.expect("always set above"),
        TopicReplication::Fixed(replication_factor.expect("always set above")),
    );

    for (key, value) in topic_config {
        kafka_topic = kafka_topic.set(key, value);
    }

    mz_kafka_util::admin::ensure_topic(
        &client,
        &AdminOptions::new().request_timeout(Some(Duration::from_secs(5))),
        &kafka_topic,
        ensure_topic_config,
    )
    .await
    .with_context(|| format!("Error creating topic {} for sink", topic))
}
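
// A minimal sketch of a call site, assuming the caller already holds a
// `KafkaSinkConnection` and a `StorageConfiguration`, and assuming that
// `KafkaTopicOptions` implements `Default` (all fields unset, deferring to
// broker defaults). The helper name and the `EnsureTopicConfig::Skip` variant
// are illustrative assumptions, not part of this module.
#[allow(dead_code)]
async fn ensure_sink_topic_with_defaults(
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    topic: &str,
) -> Result<bool, anyhow::Error> {
    ensure_kafka_topic(
        connection,
        storage_configuration,
        topic,
        &KafkaTopicOptions::default(),
        EnsureTopicConfig::Skip,
    )
    .await
}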

/// Publishes `schema` under `subject` in the schema registry and returns the
/// ID of the published schema. If a compatibility level is provided, it is
/// applied to the subject first, but only when the subject does not already
/// have a compatibility level set.
pub async fn publish_kafka_schema(
    ccsr: mz_ccsr::Client,
    subject: String,
    schema: String,
    schema_type: mz_ccsr::SchemaType,
    compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
) -> Result<i32, anyhow::Error> {
    if let Some(compatibility_level) = compatibility_level {
        let ccsr = ccsr.clone();
        let subject = subject.clone();
        async move {
            // Never override an existing compatibility level; only set one
            // when the subject has none.
            match ccsr.get_subject_config(&subject).await {
                Ok(config) => {
                    if config.compatibility_level != compatibility_level {
                        tracing::debug!(
                            "compatibility level '{}' does not match intended '{}'",
                            config.compatibility_level,
                            compatibility_level
                        );
                    }
                    Ok(())
                }
                Err(GetSubjectConfigError::SubjectCompatibilityLevelNotSet)
                | Err(GetSubjectConfigError::SubjectNotFound) => ccsr
                    .set_subject_compatibility_level(&subject, compatibility_level)
                    .await
                    .map_err(anyhow::Error::from),
                Err(e) => Err(e.into()),
            }
        }
        .run_in_task(|| "set_compatibility_level".to_string())
        .await
        .context("unable to update schema compatibility level in kafka sink")?;
    }

    let schema_id = async move {
        ccsr.publish_schema(&subject, &schema, schema_type, &[])
            .await
    }
    .run_in_task(|| "publish_kafka_schema".to_string())
    .await
    .context("unable to publish schema to registry in kafka sink")?;

    Ok(schema_id)
}
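
// A minimal sketch of publishing a value schema for a sink topic, assuming a
// configured `mz_ccsr::Client` is already in hand. The subject follows the
// common `<topic>-value` naming convention; the empty Avro record and the
// `Backward` compatibility level are illustrative assumptions.
#[allow(dead_code)]
async fn publish_value_schema_example(ccsr: mz_ccsr::Client) -> Result<i32, anyhow::Error> {
    let schema = r#"{"type": "record", "name": "envelope", "fields": []}"#.to_string();
    publish_kafka_schema(
        ccsr,
        "sink-topic-value".to_string(),
        schema,
        mz_ccsr::SchemaType::Avro,
        Some(mz_ccsr::CompatibilityLevel::Backward),
    )
    .await
}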