// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Helpers for working with Kafka's admin API.

use std::collections::BTreeMap;
use std::iter;
use std::time::Duration;

use anyhow::{anyhow, bail};
use itertools::Itertools;
use mz_ore::collections::CollectionExt;
use mz_ore::retry::Retry;
use mz_ore::str::separated;
use rdkafka::admin::{
    AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigResource, ConfigSource, NewTopic,
    OwnedResourceSpecifier, ResourceSpecifier,
};
use rdkafka::client::ClientContext;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use tracing::{info, warn};

/// Get the current configuration for a particular topic.
///
/// Materialize may not have permission to list configs for the topic, so callers of this method
/// should "fail open" if the configs are not available.
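///
/// # Example
///
/// A minimal sketch of fetching and printing a topic's config; the broker
/// address and topic name are illustrative placeholders.
///
/// ```no_run
/// # async fn example() -> anyhow::Result<()> {
/// use rdkafka::ClientConfig;
/// use rdkafka::admin::{AdminClient, AdminOptions};
/// use rdkafka::client::DefaultClientContext;
///
/// // Hypothetical broker address; substitute your own.
/// let client: AdminClient<DefaultClientContext> = ClientConfig::new()
///     .set("bootstrap.servers", "localhost:9092")
///     .create()?;
/// let opts = AdminOptions::new();
/// let entries = mz_kafka_util::admin::get_topic_config(&client, &opts, "my-topic").await?;
/// for entry in &entries {
///     println!("{} = {:?}", entry.name, entry.value);
/// }
/// # Ok(())
/// # }
/// ```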
pub async fn get_topic_config<'a, C>(
    client: &'a AdminClient<C>,
    admin_opts: &AdminOptions,
    topic_name: &str,
) -> anyhow::Result<Vec<ConfigEntry>>
where
    C: ClientContext,
{
    let ConfigResource { specifier, entries } = client
        .describe_configs([&ResourceSpecifier::Topic(topic_name)], admin_opts)
        .await?
        .into_iter()
        .exactly_one()??;

    match specifier {
        OwnedResourceSpecifier::Topic(name) if name.as_str() == topic_name => {}
        unexpected => {
            bail!("describe configs returned unexpected resource specifier: {unexpected:?}")
        }
    };

    Ok(entries)
}

/// Alter the current configuration for a particular topic.
///
/// Materialize may not have permission to alter configs for the topic, so callers of this method
/// should "fail open" if the configs are not available.
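///
/// # Example
///
/// A minimal sketch of overwriting a single config entry (client setup is
/// omitted; the topic name and value are illustrative). Note that the
/// underlying non-incremental `AlterConfigs` API replaces the topic's entire
/// dynamic config, so dynamic configs not passed here may be reset to their
/// defaults.
///
/// ```no_run
/// # use rdkafka::admin::{AdminClient, AdminOptions};
/// # use rdkafka::client::DefaultClientContext;
/// # async fn example(client: &AdminClient<DefaultClientContext>) -> anyhow::Result<()> {
/// let opts = AdminOptions::new();
/// mz_kafka_util::admin::alter_topic_config(
///     client,
///     &opts,
///     "my-topic",
///     [("retention.ms", "86400000")],
/// )
/// .await?;
/// # Ok(())
/// # }
/// ```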
pub async fn alter_topic_config<'a, C>(
    client: &'a AdminClient<C>,
    admin_opts: &AdminOptions,
    topic_name: &str,
    new_config: impl IntoIterator<Item = (&str, &str)>,
) -> anyhow::Result<()>
where
    C: ClientContext,
{
    let mut alter_config = AlterConfig::new(ResourceSpecifier::Topic(topic_name));
    for (key, val) in new_config {
        alter_config = alter_config.set(key, val);
    }
    let result = client
        .alter_configs([&alter_config], admin_opts)
        .await?
        .into_iter()
        .exactly_one()?;

    let (specifier, result) = match result {
        Ok(specifier) => (specifier, Ok(())),
        Err((specifier, err)) => (specifier, Err(KafkaError::AdminOp(err))),
    };

    match specifier {
        OwnedResourceSpecifier::Topic(name) if name.as_str() == topic_name => {}
        unexpected => {
            bail!("alter configs returned unexpected resource specifier: {unexpected:?}")
        }
    };

    Ok(result?)
}

/// What happens when the topic we want to ensure already exists.
#[derive(Debug, Copy, Clone)]
pub enum EnsureTopicConfig {
    /// Do nothing: assume whatever config is there is fine.
    Skip,
    /// Check and log the existing config, but don't try to change it.
    Check,
    /// If the remote config doesn't match, try to change it to match.
    Alter,
}

/// Expect the current configuration for a particular topic to match a provided set of values.
/// Any other configs set on the topic are ignored.
///
/// Materialize may not have permission to alter configs for the topic, so callers of this method
/// should "fail open" if the configs are not available.
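///
/// # Example
///
/// A minimal sketch that compares an existing topic's config against expected
/// values and, with [`EnsureTopicConfig::Alter`], attempts to update it to
/// match (client setup is omitted; names and values are illustrative):
///
/// ```no_run
/// # use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
/// # use rdkafka::client::DefaultClientContext;
/// # async fn example(client: &AdminClient<DefaultClientContext>) -> anyhow::Result<()> {
/// use mz_kafka_util::admin::{EnsureTopicConfig, ensure_topic_config};
///
/// let expected = NewTopic::new("my-topic", 4, TopicReplication::Fixed(3))
///     .set("cleanup.policy", "compact");
/// let opts = AdminOptions::new();
/// let synced = ensure_topic_config(client, &opts, &expected, EnsureTopicConfig::Alter).await?;
/// println!("config synced: {synced}");
/// # Ok(())
/// # }
/// ```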
pub async fn ensure_topic_config<'a, C>(
    client: &'a AdminClient<C>,
    admin_opts: &AdminOptions,
    new_topic: &'a NewTopic<'a>,
    expect: EnsureTopicConfig,
) -> anyhow::Result<bool>
where
    C: ClientContext,
{
    let try_alter = match expect {
        EnsureTopicConfig::Skip => return Ok(true),
        EnsureTopicConfig::Check => false,
        EnsureTopicConfig::Alter => true,
    };

    let mut expected_configs: BTreeMap<_, _> = new_topic.config.iter().copied().collect();

    let actual_configs = get_topic_config(client, admin_opts, new_topic.name).await?;
    info!(
        topic = new_topic.name,
        "got configuration for existing topic: [{}]",
        separated(
            ", ",
            actual_configs.iter().map(|e| {
                let kv = [&*e.name, e.value.as_ref().map_or("<none>", |v| &*v)];
                separated(": ", kv)
            })
        )
    );

    let actual_config_values: BTreeMap<_, _> = actual_configs
        .iter()
        .filter_map(|e| e.value.as_ref().map(|v| (e.name.as_str(), v.as_str())))
        .collect();
    for (config, expected) in &expected_configs {
        match actual_config_values.get(config) {
            Some(actual) => {
                if actual != expected {
                    warn!(
                        topic = new_topic.name,
                        config, expected, actual, "unexpected value for config entry"
                    )
                }
            }
            None => {
                warn!(
                    topic = new_topic.name,
                    config, expected, "missing expected value for config entry"
                )
            }
        }
    }

    if try_alter {
        // rdkafka does not support the newer incremental-alter APIs. Instead, we copy over values
        // from the fetched dynamic config that we don't explicitly want to overwrite.
        for entry in &actual_configs {
            if entry.source != ConfigSource::DynamicTopic {
                continue;
            }
            let Some(value) = entry.value.as_ref() else {
                continue;
            };
            expected_configs.entry(&entry.name).or_insert(value);
        }
        alter_topic_config(client, admin_opts, new_topic.name, expected_configs).await?;
        Ok(true)
    } else {
        Ok(false)
    }
}

/// Creates a Kafka topic if it does not exist, and waits for it to be reported in the broker
/// metadata.
///
/// This function is a wrapper around [`AdminClient::create_topics`] that
/// attempts to ensure the topic creation has propagated throughout the Kafka
/// cluster before returning. Kafka topic creation is asynchronous, so
/// attempting to consume from or produce to a topic immediately after its
/// creation can result in "unknown topic" errors.
///
/// This function does not return successfully unless it can find the metadata
/// for the newly-created topic in a call to [`rdkafka::client::Client::fetch_metadata`] and
/// verify that the metadata reports the topic has the number of partitions
/// requested in `new_topic`. Empirically, this seems to be the condition that
/// guarantees that future attempts to consume from or produce to the topic will
/// succeed.
///
/// Returns a boolean indicating whether the topic already existed.
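///
/// # Example
///
/// A minimal sketch of creating a four-partition topic, or checking its
/// config if it already exists (client setup is omitted; the topic name,
/// partition count, and replication factor are illustrative):
///
/// ```no_run
/// # use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
/// # use rdkafka::client::DefaultClientContext;
/// # async fn example(client: &AdminClient<DefaultClientContext>) -> anyhow::Result<()> {
/// use mz_kafka_util::admin::{EnsureTopicConfig, ensure_topic};
///
/// let new_topic = NewTopic::new("my-topic", 4, TopicReplication::Fixed(3));
/// let opts = AdminOptions::new();
/// let already_existed =
///     ensure_topic(client, &opts, &new_topic, EnsureTopicConfig::Check).await?;
/// println!("topic already existed: {already_existed}");
/// # Ok(())
/// # }
/// ```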
pub async fn ensure_topic<'a, C>(
    client: &'a AdminClient<C>,
    admin_opts: &AdminOptions,
    new_topic: &'a NewTopic<'a>,
    on_existing: EnsureTopicConfig,
) -> anyhow::Result<bool>
where
    C: ClientContext,
{
    let res = client
        .create_topics(iter::once(new_topic), admin_opts)
        .await?;

    let already_exists = match res.as_slice() {
        &[Ok(_)] => false,
        &[Err((_, RDKafkaErrorCode::TopicAlreadyExists))] => true,
        &[Err((_, e))] => bail!(KafkaError::AdminOp(e)),
        other => bail!(
            "kafka topic creation returned {} results, but exactly one result was expected",
            other.len()
        ),
    };

    // We don't need to read in metadata / do any validation if the topic already exists.
    if already_exists {
        match ensure_topic_config(client, admin_opts, new_topic, on_existing).await {
            Ok(true) => {}
            Ok(false) => {
                info!(
                    topic = new_topic.name,
                    "did not sync topic config; configs may not match expected values"
                );
            }
            Err(error) => {
                warn!(
                    topic = new_topic.name,
                    "unable to enforce topic config; configs may not match expected values: {error:#}"
                )
            }
        }

        return Ok(true);
    }

    // Topic creation is asynchronous, and if we don't wait for it to complete,
    // a client might produce to the topic first, causing it to be created
    // automatically with the broker's default number of partitions rather than
    // the number of partitions requested in `new_topic`.
    Retry::default()
        .max_duration(Duration::from_secs(30))
        .retry_async(|_| async {
            let metadata = client
                .inner()
                // N.B. It is extremely important not to ask specifically
                // about the topic here, even though the API supports it!
                // Asking about the topic will create it automatically...
                // with the wrong number of partitions. Yes, this is
                // unbelievably horrible.
                .fetch_metadata(None, Some(Duration::from_secs(10)))?;
            let topic = metadata
                .topics()
                .iter()
                .find(|t| t.name() == new_topic.name)
                .ok_or_else(|| anyhow!("unable to fetch topic metadata after creation"))?;
            // If the desired number of partitions is not "use the broker
            // default", wait for the topic to have the correct number of
            // partitions.
            if new_topic.num_partitions != -1 {
                let actual = i32::try_from(topic.partitions().len())?;
                if actual != new_topic.num_partitions {
                    bail!(
                        "topic reports {actual} partitions, but expected {} partitions",
                        new_topic.num_partitions
                    );
                }
            }
            Ok(false)
        })
        .await
}

/// Deletes a Kafka topic and waits for it to be reported absent in the broker metadata.
///
/// This function is a wrapper around [`AdminClient::delete_topics`] that attempts to ensure the
/// topic deletion has propagated throughout the Kafka cluster before returning.
///
/// This function does not return successfully unless it can observe the metadata no longer
/// containing the deleted topic in a call to [`rdkafka::client::Client::fetch_metadata`].
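///
/// # Example
///
/// A minimal sketch of deleting a topic that is expected to exist (client
/// setup is omitted; the topic name is illustrative). Use [`delete_topic`]
/// instead to tolerate a topic that is already absent.
///
/// ```no_run
/// # use rdkafka::admin::{AdminClient, AdminOptions};
/// # use rdkafka::client::DefaultClientContext;
/// # async fn example(client: &AdminClient<DefaultClientContext>) -> anyhow::Result<()> {
/// let opts = AdminOptions::new();
/// mz_kafka_util::admin::delete_existing_topic(client, &opts, "my-topic").await?;
/// # Ok(())
/// # }
/// ```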
pub async fn delete_existing_topic<'a, C>(
    client: &'a AdminClient<C>,
    admin_opts: &AdminOptions,
    topic: &'a str,
) -> Result<(), DeleteTopicError>
where
    C: ClientContext,
{
    delete_topic_helper(client, admin_opts, topic, false).await
}

/// Like [`delete_existing_topic`], but does not consider it an error if the topic is already
/// absent.
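///
/// # Example
///
/// A minimal sketch of an idempotent delete (client setup is omitted; the
/// topic name is illustrative):
///
/// ```no_run
/// # use rdkafka::admin::{AdminClient, AdminOptions};
/// # use rdkafka::client::DefaultClientContext;
/// # async fn example(client: &AdminClient<DefaultClientContext>) -> anyhow::Result<()> {
/// let opts = AdminOptions::new();
/// // Succeeds even if "my-topic" has already been deleted.
/// mz_kafka_util::admin::delete_topic(client, &opts, "my-topic").await?;
/// # Ok(())
/// # }
/// ```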
pub async fn delete_topic<'a, C>(
    client: &'a AdminClient<C>,
    admin_opts: &AdminOptions,
    topic: &'a str,
) -> Result<(), DeleteTopicError>
where
    C: ClientContext,
{
    delete_topic_helper(client, admin_opts, topic, true).await
}

async fn delete_topic_helper<'a, C>(
    client: &'a AdminClient<C>,
    admin_opts: &AdminOptions,
    topic: &'a str,
    allow_missing: bool,
) -> Result<(), DeleteTopicError>
where
    C: ClientContext,
{
    let res = client.delete_topics(&[topic], admin_opts).await?;
    if res.len() != 1 {
        return Err(DeleteTopicError::TopicCountMismatch(res.len()));
    }
    let already_missing = match res.into_element() {
        Ok(_) => Ok(false),
        Err((_, RDKafkaErrorCode::UnknownTopic)) if allow_missing => Ok(true),
        Err((_, e)) => Err(DeleteTopicError::Kafka(KafkaError::AdminOp(e))),
    }?;

    // We don't need to read in metadata / do any validation if the topic was already missing.
    if already_missing {
        return Ok(());
    }

    // Topic deletion is asynchronous, and if we don't wait for it to complete,
    // subsequent operations might observe the topic as still existing, or the
    // topic might be automatically recreated before the deletion propagates
    // throughout the cluster.
    Retry::default()
        .max_duration(Duration::from_secs(30))
        .retry_async(|_| async {
            let metadata = client
                .inner()
                // N.B. It is extremely important not to ask specifically
                // about the topic here, even though the API supports it!
                // Asking about the topic will create it automatically...
                // with the wrong number of partitions. Yes, this is
                // unbelievably horrible.
                .fetch_metadata(None, Some(Duration::from_secs(10)))?;
            let topic_exists = metadata.topics().iter().any(|t| t.name() == topic);
            if topic_exists {
                Err(DeleteTopicError::TopicResurrected)
            } else {
                Ok(())
            }
        })
        .await
}

/// An error while deleting a Kafka topic.
#[derive(Debug, thiserror::Error)]
pub enum DeleteTopicError {
    /// An error from the underlying Kafka library.
    #[error(transparent)]
    Kafka(#[from] KafkaError),
    /// Topic deletion returned the wrong number of results.
    #[error("kafka topic deletion returned {0} results, but exactly one result was expected")]
    TopicCountMismatch(usize),
    /// The topic remained in metadata after being deleted.
    #[error("topic was recreated after being deleted")]
    TopicResurrected,
}