1use std::collections::BTreeMap;
13use std::iter;
14use std::time::Duration;
15
16use anyhow::{anyhow, bail};
17use itertools::Itertools;
18use mz_ore::collections::CollectionExt;
19use mz_ore::retry::Retry;
20use mz_ore::str::separated;
21use rdkafka::admin::{
22 AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigResource, ConfigSource, NewTopic,
23 OwnedResourceSpecifier, ResourceSpecifier,
24};
25use rdkafka::client::ClientContext;
26use rdkafka::error::{KafkaError, RDKafkaErrorCode};
27use tracing::{info, warn};
28
29pub async fn get_topic_config<'a, C>(
34 client: &'a AdminClient<C>,
35 admin_opts: &AdminOptions,
36 topic_name: &str,
37) -> anyhow::Result<Vec<ConfigEntry>>
38where
39 C: ClientContext,
40{
41 let ConfigResource { specifier, entries } = client
42 .describe_configs([&ResourceSpecifier::Topic(topic_name)], admin_opts)
43 .await?
44 .into_iter()
45 .exactly_one()??;
46
47 match specifier {
48 OwnedResourceSpecifier::Topic(name) if name.as_str() == topic_name => {}
49 unexpected => {
50 bail!("describe configs returned unexpected resource specifier: {unexpected:?}")
51 }
52 };
53
54 Ok(entries)
55}
56
57pub async fn alter_topic_config<'a, C>(
62 client: &'a AdminClient<C>,
63 admin_opts: &AdminOptions,
64 topic_name: &str,
65 new_config: impl IntoIterator<Item = (&str, &str)>,
66) -> anyhow::Result<()>
67where
68 C: ClientContext,
69{
70 let mut alter_config = AlterConfig::new(ResourceSpecifier::Topic(topic_name));
71 for (key, val) in new_config {
72 alter_config = alter_config.set(key, val);
73 }
74 let result = client
75 .alter_configs([&alter_config], admin_opts)
76 .await?
77 .into_iter()
78 .exactly_one()?;
79
80 let (specifier, result) = match result {
81 Ok(specifier) => (specifier, Ok(())),
82 Err((specifier, err)) => (specifier, Err(KafkaError::AdminOp(err))),
83 };
84
85 match specifier {
86 OwnedResourceSpecifier::Topic(name) if name.as_str() == topic_name => {}
87 unexpected => {
88 bail!("alter configs returned unexpected resource specifier: {unexpected:?}")
89 }
90 };
91
92 Ok(result?)
93}
94
95#[derive(Debug, Copy, Clone)]
97pub enum EnsureTopicConfig {
98 Skip,
100 Check,
102 Alter,
104}
105
106pub async fn ensure_topic_config<'a, C>(
112 client: &'a AdminClient<C>,
113 admin_opts: &AdminOptions,
114 new_topic: &'a NewTopic<'a>,
115 expect: EnsureTopicConfig,
116) -> anyhow::Result<bool>
117where
118 C: ClientContext,
119{
120 let try_alter = match expect {
121 EnsureTopicConfig::Skip => return Ok(true),
122 EnsureTopicConfig::Check => false,
123 EnsureTopicConfig::Alter => true,
124 };
125
126 let mut expected_configs: BTreeMap<_, _> = new_topic.config.iter().copied().collect();
127
128 let actual_configs = get_topic_config(client, admin_opts, new_topic.name).await?;
129 info!(
130 topic = new_topic.name,
131 "got configuration for existing topic: [{}]",
132 separated(
133 ", ",
134 actual_configs.iter().map(|e| {
135 let kv = [&*e.name, e.value.as_ref().map_or("<none>", |v| &*v)];
136 separated(": ", kv)
137 })
138 )
139 );
140
141 let actual_config_values: BTreeMap<_, _> = actual_configs
142 .iter()
143 .filter_map(|e| e.value.as_ref().map(|v| (e.name.as_str(), v.as_str())))
144 .collect();
145 for (config, expected) in &expected_configs {
146 match actual_config_values.get(config) {
147 Some(actual) => {
148 if actual != expected {
149 warn!(
150 topic = new_topic.name,
151 config, expected, actual, "unexpected value for config entry"
152 )
153 }
154 }
155 None => {
156 warn!(
157 topic = new_topic.name,
158 config, expected, "missing expected value for config entry"
159 )
160 }
161 }
162 }
163
164 if try_alter {
165 for entry in &actual_configs {
168 if entry.source != ConfigSource::DynamicTopic {
169 continue;
170 }
171 let Some(value) = entry.value.as_ref() else {
172 continue;
173 };
174 expected_configs.entry(&entry.name).or_insert(value);
175 }
176 alter_topic_config(client, admin_opts, new_topic.name, expected_configs).await?;
177 Ok(true)
178 } else {
179 Ok(false)
180 }
181}
182
183pub async fn ensure_topic<'a, C>(
201 client: &'a AdminClient<C>,
202 admin_opts: &AdminOptions,
203 new_topic: &'a NewTopic<'a>,
204 on_existing: EnsureTopicConfig,
205) -> anyhow::Result<bool>
206where
207 C: ClientContext,
208{
209 let res = client
210 .create_topics(iter::once(new_topic), admin_opts)
211 .await?;
212
213 let already_exists = match res.as_slice() {
214 &[Ok(_)] => false,
215 &[Err((_, RDKafkaErrorCode::TopicAlreadyExists))] => true,
216 &[Err((_, e))] => bail!(KafkaError::AdminOp(e)),
217 other => bail!(
218 "kafka topic creation returned {} results, but exactly one result was expected",
219 other.len()
220 ),
221 };
222
223 if already_exists {
225 match ensure_topic_config(client, admin_opts, new_topic, on_existing).await {
226 Ok(true) => {}
227 Ok(false) => {
228 info!(
229 topic = new_topic.name,
230 "did not sync topic config; configs may not match expected values"
231 );
232 }
233 Err(error) => {
234 warn!(
235 topic = new_topic.name,
236 "unable to enforce topic config; configs may not match expected values: {error:#}"
237 )
238 }
239 }
240
241 return Ok(true);
242 }
243
244 Retry::default()
249 .max_duration(Duration::from_secs(30))
250 .retry_async(|_| async {
251 let metadata = client
252 .inner()
253 .fetch_metadata(None, Some(Duration::from_secs(10)))?;
259 let topic = metadata
260 .topics()
261 .iter()
262 .find(|t| t.name() == new_topic.name)
263 .ok_or_else(|| anyhow!("unable to fetch topic metadata after creation"))?;
264 if new_topic.num_partitions != -1 {
268 let actual = i32::try_from(topic.partitions().len())?;
269 if actual != new_topic.num_partitions {
270 bail!(
271 "topic reports {actual} partitions, but expected {} partitions",
272 new_topic.num_partitions
273 );
274 }
275 }
276 Ok(false)
277 })
278 .await
279}
280
281pub async fn delete_existing_topic<'a, C>(
289 client: &'a AdminClient<C>,
290 admin_opts: &AdminOptions,
291 topic: &'a str,
292) -> Result<(), DeleteTopicError>
293where
294 C: ClientContext,
295{
296 delete_topic_helper(client, admin_opts, topic, false).await
297}
298
299pub async fn delete_topic<'a, C>(
301 client: &'a AdminClient<C>,
302 admin_opts: &AdminOptions,
303 topic: &'a str,
304) -> Result<(), DeleteTopicError>
305where
306 C: ClientContext,
307{
308 delete_topic_helper(client, admin_opts, topic, true).await
309}
310
311async fn delete_topic_helper<'a, C>(
312 client: &'a AdminClient<C>,
313 admin_opts: &AdminOptions,
314 topic: &'a str,
315 allow_missing: bool,
316) -> Result<(), DeleteTopicError>
317where
318 C: ClientContext,
319{
320 let res = client.delete_topics(&[topic], admin_opts).await?;
321 if res.len() != 1 {
322 return Err(DeleteTopicError::TopicCountMismatch(res.len()));
323 }
324 let already_missing = match res.into_element() {
325 Ok(_) => Ok(false),
326 Err((_, RDKafkaErrorCode::UnknownTopic)) if allow_missing => Ok(true),
327 Err((_, e)) => Err(DeleteTopicError::Kafka(KafkaError::AdminOp(e))),
328 }?;
329
330 if already_missing {
332 return Ok(());
333 }
334
335 Retry::default()
340 .max_duration(Duration::from_secs(30))
341 .retry_async(|_| async {
342 let metadata = client
343 .inner()
344 .fetch_metadata(None, Some(Duration::from_secs(10)))?;
350 let topic_exists = metadata.topics().iter().any(|t| t.name() == topic);
351 if topic_exists {
352 Err(DeleteTopicError::TopicRessurected)
353 } else {
354 Ok(())
355 }
356 })
357 .await
358}
359
360#[derive(Debug, thiserror::Error)]
362pub enum DeleteTopicError {
363 #[error(transparent)]
365 Kafka(#[from] KafkaError),
366 #[error("kafka topic creation returned {0} results, but exactly one result was expected")]
368 TopicCountMismatch(usize),
369 #[error("topic was recreated after being deleted")]
371 TopicRessurected,
372}