1use std::collections::BTreeMap;
13use std::iter;
14use std::time::Duration;
15
16use anyhow::{anyhow, bail};
17use itertools::Itertools;
18use mz_ore::collections::CollectionExt;
19use mz_ore::retry::Retry;
20use mz_ore::str::separated;
21use rdkafka::admin::{
22 AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigResource, ConfigSource, NewTopic,
23 OwnedResourceSpecifier, ResourceSpecifier,
24};
25use rdkafka::client::ClientContext;
26use rdkafka::error::{KafkaError, RDKafkaErrorCode};
27use tracing::{info, warn};
28
29pub async fn get_topic_config<'a, C>(
34 client: &'a AdminClient<C>,
35 admin_opts: &AdminOptions,
36 topic_name: &str,
37) -> anyhow::Result<Vec<ConfigEntry>>
38where
39 C: ClientContext,
40{
41 let ConfigResource { specifier, entries } = client
42 .describe_configs([&ResourceSpecifier::Topic(topic_name)], admin_opts)
43 .await?
44 .into_iter()
45 .exactly_one()??;
46
47 match specifier {
48 OwnedResourceSpecifier::Topic(name) if name.as_str() == topic_name => {}
49 unexpected => {
50 bail!("describe configs returned unexpected resource specifier: {unexpected:?}")
51 }
52 };
53
54 Ok(entries)
55}
56
57pub async fn alter_topic_config<'a, C>(
62 client: &'a AdminClient<C>,
63 admin_opts: &AdminOptions,
64 topic_name: &str,
65 new_config: impl IntoIterator<Item = (&str, &str)>,
66) -> anyhow::Result<()>
67where
68 C: ClientContext,
69{
70 let mut alter_config = AlterConfig::new(ResourceSpecifier::Topic(topic_name));
71 for (key, val) in new_config {
72 alter_config = alter_config.set(key, val);
73 }
74 let result = client
75 .alter_configs([&alter_config], admin_opts)
76 .await?
77 .into_iter()
78 .exactly_one()?;
79
80 let (specifier, result) = match result {
81 Ok(specifier) => (specifier, Ok(())),
82 Err((specifier, err)) => (specifier, Err(KafkaError::AdminOp(err))),
83 };
84
85 match specifier {
86 OwnedResourceSpecifier::Topic(name) if name.as_str() == topic_name => {}
87 unexpected => {
88 bail!("alter configs returned unexpected resource specifier: {unexpected:?}")
89 }
90 };
91
92 Ok(result?)
93}
94
95#[derive(Debug, Copy, Clone)]
97pub enum EnsureTopicConfig {
98 Skip,
100 Check,
102 Alter,
104}
105
106pub async fn ensure_topic_config<'a, C>(
112 client: &'a AdminClient<C>,
113 admin_opts: &AdminOptions,
114 new_topic: &'a NewTopic<'a>,
115 expect: EnsureTopicConfig,
116) -> anyhow::Result<bool>
117where
118 C: ClientContext,
119{
120 let try_alter = match expect {
121 EnsureTopicConfig::Skip => return Ok(true),
122 EnsureTopicConfig::Check => false,
123 EnsureTopicConfig::Alter => true,
124 };
125
126 let mut expected_configs: BTreeMap<_, _> = new_topic.config.iter().copied().collect();
127
128 let actual_configs = get_topic_config(client, admin_opts, new_topic.name).await?;
129 info!(
130 topic = new_topic.name,
131 "got configuration for existing topic: [{}]",
132 separated(
133 ", ",
134 actual_configs.iter().map(|e| {
135 let kv = [&*e.name, e.value.as_ref().map_or("<none>", |v| &*v)];
136 separated(": ", kv)
137 })
138 )
139 );
140
141 let actual_config_values: BTreeMap<_, _> = actual_configs
142 .iter()
143 .filter_map(|e| e.value.as_ref().map(|v| (e.name.as_str(), v.as_str())))
144 .collect();
145 for (config, expected) in &expected_configs {
146 match actual_config_values.get(config) {
147 Some(actual) => {
148 if actual != expected {
149 warn!(
150 topic = new_topic.name,
151 config, expected, actual, "unexpected value for config entry"
152 )
153 }
154 }
155 None => {
156 warn!(
157 topic = new_topic.name,
158 config, expected, "missing expected value for config entry"
159 )
160 }
161 }
162 }
163
164 if try_alter {
165 for entry in &actual_configs {
168 if entry.source != ConfigSource::DynamicTopic {
169 continue;
170 }
171 let Some(value) = entry.value.as_ref() else {
172 continue;
173 };
174 expected_configs.entry(&entry.name).or_insert(value);
175 }
176 alter_topic_config(client, admin_opts, new_topic.name, expected_configs).await?;
177 Ok(true)
178 } else {
179 Ok(false)
180 }
181}
182
183pub async fn ensure_topic<'a, C>(
201 client: &'a AdminClient<C>,
202 admin_opts: &AdminOptions,
203 new_topic: &'a NewTopic<'a>,
204 on_existing: EnsureTopicConfig,
205) -> anyhow::Result<bool>
206where
207 C: ClientContext,
208{
209 let metadata = client
211 .inner()
212 .fetch_metadata(None, Some(Duration::from_secs(10)))?;
218 let already_exists = metadata.topics().iter().any(|t| t.name() == new_topic.name) || {
219 let res = client
221 .create_topics(iter::once(new_topic), admin_opts)
222 .await?;
223
224 match res.as_slice() {
226 &[Ok(_)] => false,
227 &[Err((_, RDKafkaErrorCode::TopicAlreadyExists))] => true,
228 &[Err((_, e))] => bail!(KafkaError::AdminOp(e)),
229 other => bail!(
230 "kafka topic creation returned {} results, but exactly one result was expected",
231 other.len()
232 ),
233 }
234 };
235
236 if already_exists {
238 match ensure_topic_config(client, admin_opts, new_topic, on_existing).await {
239 Ok(true) => {}
240 Ok(false) => {
241 info!(
242 topic = new_topic.name,
243 "did not sync topic config; configs may not match expected values"
244 );
245 }
246 Err(error) => {
247 warn!(
248 topic = new_topic.name,
249 "unable to enforce topic config; configs may not match expected values: {error:#}"
250 )
251 }
252 }
253
254 return Ok(true);
255 }
256
257 Retry::default()
262 .max_duration(Duration::from_secs(30))
263 .retry_async(|_| async {
264 let metadata = client
265 .inner()
266 .fetch_metadata(None, Some(Duration::from_secs(10)))?;
272 let topic = metadata
273 .topics()
274 .iter()
275 .find(|t| t.name() == new_topic.name)
276 .ok_or_else(|| anyhow!("unable to fetch topic metadata after creation"))?;
277 if new_topic.num_partitions != -1 {
281 let actual = i32::try_from(topic.partitions().len())?;
282 if actual != new_topic.num_partitions {
283 bail!(
284 "topic reports {actual} partitions, but expected {} partitions",
285 new_topic.num_partitions
286 );
287 }
288 }
289 Ok(false)
290 })
291 .await
292}
293
294pub async fn delete_existing_topic<'a, C>(
302 client: &'a AdminClient<C>,
303 admin_opts: &AdminOptions,
304 topic: &'a str,
305) -> Result<(), DeleteTopicError>
306where
307 C: ClientContext,
308{
309 delete_topic_helper(client, admin_opts, topic, false).await
310}
311
312pub async fn delete_topic<'a, C>(
314 client: &'a AdminClient<C>,
315 admin_opts: &AdminOptions,
316 topic: &'a str,
317) -> Result<(), DeleteTopicError>
318where
319 C: ClientContext,
320{
321 delete_topic_helper(client, admin_opts, topic, true).await
322}
323
324async fn delete_topic_helper<'a, C>(
325 client: &'a AdminClient<C>,
326 admin_opts: &AdminOptions,
327 topic: &'a str,
328 allow_missing: bool,
329) -> Result<(), DeleteTopicError>
330where
331 C: ClientContext,
332{
333 let res = client.delete_topics(&[topic], admin_opts).await?;
334 if res.len() != 1 {
335 return Err(DeleteTopicError::TopicCountMismatch(res.len()));
336 }
337 let already_missing = match res.into_element() {
338 Ok(_) => Ok(false),
339 Err((_, RDKafkaErrorCode::UnknownTopic)) if allow_missing => Ok(true),
340 Err((_, e)) => Err(DeleteTopicError::Kafka(KafkaError::AdminOp(e))),
341 }?;
342
343 if already_missing {
345 return Ok(());
346 }
347
348 Retry::default()
353 .max_duration(Duration::from_secs(30))
354 .retry_async(|_| async {
355 let metadata = client
356 .inner()
357 .fetch_metadata(None, Some(Duration::from_secs(10)))?;
363 let topic_exists = metadata.topics().iter().any(|t| t.name() == topic);
364 if topic_exists {
365 Err(DeleteTopicError::TopicRessurected)
366 } else {
367 Ok(())
368 }
369 })
370 .await
371}
372
373#[derive(Debug, thiserror::Error)]
375pub enum DeleteTopicError {
376 #[error(transparent)]
378 Kafka(#[from] KafkaError),
379 #[error("kafka topic creation returned {0} results, but exactly one result was expected")]
381 TopicCountMismatch(usize),
382 #[error("topic was recreated after being deleted")]
384 TopicRessurected,
385}