Skip to main content

mz_kafka_util/
admin.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Helpers for working with Kafka's admin API.
11
12use std::collections::BTreeMap;
13use std::iter;
14use std::time::Duration;
15
16use anyhow::{anyhow, bail};
17use itertools::Itertools;
18use mz_ore::collections::CollectionExt;
19use mz_ore::retry::Retry;
20use mz_ore::str::separated;
21use rdkafka::admin::{
22    AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigResource, ConfigSource, NewTopic,
23    OwnedResourceSpecifier, ResourceSpecifier,
24};
25use rdkafka::client::ClientContext;
26use rdkafka::error::{KafkaError, RDKafkaErrorCode};
27use tracing::{info, warn};
28
29/// Get the current configuration for a particular topic.
30///
31/// Materialize may not have permission to list configs for the topic, so callers of this method
32/// should "fail open" if the configs are not available.
33pub async fn get_topic_config<'a, C>(
34    client: &'a AdminClient<C>,
35    admin_opts: &AdminOptions,
36    topic_name: &str,
37) -> anyhow::Result<Vec<ConfigEntry>>
38where
39    C: ClientContext,
40{
41    let ConfigResource { specifier, entries } = client
42        .describe_configs([&ResourceSpecifier::Topic(topic_name)], admin_opts)
43        .await?
44        .into_iter()
45        .exactly_one()??;
46
47    match specifier {
48        OwnedResourceSpecifier::Topic(name) if name.as_str() == topic_name => {}
49        unexpected => {
50            bail!("describe configs returned unexpected resource specifier: {unexpected:?}")
51        }
52    };
53
54    Ok(entries)
55}
56
57/// Alter the current configuration for a particular topic.
58///
59/// Materialize may not have permission to alter configs for the topic, so callers of this method
60/// should "fail open" if the configs are not available.
61pub async fn alter_topic_config<'a, C>(
62    client: &'a AdminClient<C>,
63    admin_opts: &AdminOptions,
64    topic_name: &str,
65    new_config: impl IntoIterator<Item = (&str, &str)>,
66) -> anyhow::Result<()>
67where
68    C: ClientContext,
69{
70    let mut alter_config = AlterConfig::new(ResourceSpecifier::Topic(topic_name));
71    for (key, val) in new_config {
72        alter_config = alter_config.set(key, val);
73    }
74    let result = client
75        .alter_configs([&alter_config], admin_opts)
76        .await?
77        .into_iter()
78        .exactly_one()?;
79
80    let (specifier, result) = match result {
81        Ok(specifier) => (specifier, Ok(())),
82        Err((specifier, err)) => (specifier, Err(KafkaError::AdminOp(err))),
83    };
84
85    match specifier {
86        OwnedResourceSpecifier::Topic(name) if name.as_str() == topic_name => {}
87        unexpected => {
88            bail!("alter configs returned unexpected resource specifier: {unexpected:?}")
89        }
90    };
91
92    Ok(result?)
93}
94
/// When the topic we want to ensure already exists, what happens?
///
/// Controls how an existing topic's configuration is reconciled with the
/// desired configuration when ensuring a topic.
#[derive(Debug, Copy, Clone)]
pub enum EnsureTopicConfig {
    /// Do nothing: assume whatever config is there is fine.
    Skip,
    /// Check and log the existing config, but don't try and change it.
    Check,
    /// If the remote config doesn't match, try and change it to match.
    Alter,
}
105
106/// Expect the current configuration for a particular topic to match a provided set of values.
107/// If there are other configs set, we will ignore them.
108///
109/// Materialize may not have permission to alter configs for the topic, so callers of this method
110/// should "fail open" if the configs are not available.
111pub async fn ensure_topic_config<'a, C>(
112    client: &'a AdminClient<C>,
113    admin_opts: &AdminOptions,
114    new_topic: &'a NewTopic<'a>,
115    expect: EnsureTopicConfig,
116) -> anyhow::Result<bool>
117where
118    C: ClientContext,
119{
120    let try_alter = match expect {
121        EnsureTopicConfig::Skip => return Ok(true),
122        EnsureTopicConfig::Check => false,
123        EnsureTopicConfig::Alter => true,
124    };
125
126    let mut expected_configs: BTreeMap<_, _> = new_topic.config.iter().copied().collect();
127
128    let actual_configs = get_topic_config(client, admin_opts, new_topic.name).await?;
129    info!(
130        topic = new_topic.name,
131        "got configuration for existing topic: [{}]",
132        separated(
133            ", ",
134            actual_configs.iter().map(|e| {
135                let kv = [&*e.name, e.value.as_ref().map_or("<none>", |v| &*v)];
136                separated(": ", kv)
137            })
138        )
139    );
140
141    let actual_config_values: BTreeMap<_, _> = actual_configs
142        .iter()
143        .filter_map(|e| e.value.as_ref().map(|v| (e.name.as_str(), v.as_str())))
144        .collect();
145    for (config, expected) in &expected_configs {
146        match actual_config_values.get(config) {
147            Some(actual) => {
148                if actual != expected {
149                    warn!(
150                        topic = new_topic.name,
151                        config, expected, actual, "unexpected value for config entry"
152                    )
153                }
154            }
155            None => {
156                warn!(
157                    topic = new_topic.name,
158                    config, expected, "missing expected value for config entry"
159                )
160            }
161        }
162    }
163
164    if try_alter {
165        // rdkafka does not support the newer incremental-alter APIs. Instead, we copy over values
166        // from the fetched dynamic config that we don't explicitly want to overwrite.
167        for entry in &actual_configs {
168            if entry.source != ConfigSource::DynamicTopic {
169                continue;
170            }
171            let Some(value) = entry.value.as_ref() else {
172                continue;
173            };
174            expected_configs.entry(&entry.name).or_insert(value);
175        }
176        alter_topic_config(client, admin_opts, new_topic.name, expected_configs).await?;
177        Ok(true)
178    } else {
179        Ok(false)
180    }
181}
182
183/// Creates a Kafka topic if it does not exist, and waits for it to be reported in the broker
184/// metadata.
185///
186/// This function is a wrapper around [`AdminClient::create_topics`] that
187/// attempts to ensure the topic creation has propagated throughout the Kafka
188/// cluster before returning. Kafka topic creation is asynchronous, so
189/// attempting to consume from or produce to a topic immediately after its
190/// creation can result in "unknown topic" errors.
191///
192/// This function does not return successfully unless it can find the metadata
193/// for the newly-created topic in a call to [`rdkafka::client::Client::fetch_metadata`] and
194/// verify that the metadata reports the topic has the number of partitions
195/// requested in `new_topic`. Empirically, this seems to be the condition that
196/// guarantees that future attempts to consume from or produce to the topic will
197/// succeed.
198///
199/// Returns a boolean indicating whether the topic already existed.
200pub async fn ensure_topic<'a, C>(
201    client: &'a AdminClient<C>,
202    admin_opts: &AdminOptions,
203    new_topic: &'a NewTopic<'a>,
204    on_existing: EnsureTopicConfig,
205) -> anyhow::Result<bool>
206where
207    C: ClientContext,
208{
209    // First, check whether the topic already exists.
210    let metadata = client
211        .inner()
212        // N.B. It is extremely important not to ask specifically
213        // about the topic here, even though the API supports it!
214        // Asking about the topic will create it automatically...
215        // with the wrong number of partitions. Yes, this is
216        // unbelievably horrible.
217        .fetch_metadata(None, Some(Duration::from_secs(10)))?;
218    let already_exists = metadata.topics().iter().any(|t| t.name() == new_topic.name) || {
219        // If we didn't just see the topic in the metadata, try creating it.
220        let res = client
221            .create_topics(iter::once(new_topic), admin_opts)
222            .await?;
223
224        // We still check for "already exists" in case we're racing against someone else.
225        match res.as_slice() {
226            &[Ok(_)] => false,
227            &[Err((_, RDKafkaErrorCode::TopicAlreadyExists))] => true,
228            &[Err((_, e))] => bail!(KafkaError::AdminOp(e)),
229            other => bail!(
230                "kafka topic creation returned {} results, but exactly one result was expected",
231                other.len()
232            ),
233        }
234    };
235
236    // We don't need to read in metadata / do any validation if the topic already exists.
237    if already_exists {
238        match ensure_topic_config(client, admin_opts, new_topic, on_existing).await {
239            Ok(true) => {}
240            Ok(false) => {
241                info!(
242                    topic = new_topic.name,
243                    "did not sync topic config; configs may not match expected values"
244                );
245            }
246            Err(error) => {
247                warn!(
248                    topic = new_topic.name,
249                    "unable to enforce topic config; configs may not match expected values: {error:#}"
250                )
251            }
252        }
253
254        return Ok(true);
255    }
256
257    // Topic creation is asynchronous, and if we don't wait for it to complete,
258    // we might produce a message (below) that causes it to get automatically
259    // created with the default number partitions, and not the number of
260    // partitions requested in `new_topic`.
261    Retry::default()
262        .max_duration(Duration::from_secs(30))
263        .retry_async(|_| async {
264            let metadata = client
265                .inner()
266                // N.B. It is extremely important not to ask specifically
267                // about the topic here, even though the API supports it!
268                // Asking about the topic will create it automatically...
269                // with the wrong number of partitions. Yes, this is
270                // unbelievably horrible.
271                .fetch_metadata(None, Some(Duration::from_secs(10)))?;
272            let topic = metadata
273                .topics()
274                .iter()
275                .find(|t| t.name() == new_topic.name)
276                .ok_or_else(|| anyhow!("unable to fetch topic metadata after creation"))?;
277            // If the desired number of partitions is not "use the broker
278            // default", wait for the topic to have the correct number of
279            // partitions.
280            if new_topic.num_partitions != -1 {
281                let actual = i32::try_from(topic.partitions().len())?;
282                if actual != new_topic.num_partitions {
283                    bail!(
284                        "topic reports {actual} partitions, but expected {} partitions",
285                        new_topic.num_partitions
286                    );
287                }
288            }
289            Ok(false)
290        })
291        .await
292}
293
294/// Deletes a Kafka topic and waits for it to be reported absent in the broker metadata.
295///
296/// This function is a wrapper around [`AdminClient::delete_topics`] that attempts to ensure the
297/// topic deletion has propagated throughout the Kafka cluster before returning.
298///
299/// This function does not return successfully unless it can observe the metadata not containing
300/// the newly-created topic in a call to [`rdkafka::client::Client::fetch_metadata`]
301pub async fn delete_existing_topic<'a, C>(
302    client: &'a AdminClient<C>,
303    admin_opts: &AdminOptions,
304    topic: &'a str,
305) -> Result<(), DeleteTopicError>
306where
307    C: ClientContext,
308{
309    delete_topic_helper(client, admin_opts, topic, false).await
310}
311
312/// Like `delete_existing_topic` but allow topic to be already deleted
313pub async fn delete_topic<'a, C>(
314    client: &'a AdminClient<C>,
315    admin_opts: &AdminOptions,
316    topic: &'a str,
317) -> Result<(), DeleteTopicError>
318where
319    C: ClientContext,
320{
321    delete_topic_helper(client, admin_opts, topic, true).await
322}
323
/// Shared implementation of [`delete_existing_topic`] and [`delete_topic`].
///
/// Issues the deletion, then polls the broker metadata until the topic is no
/// longer reported. When `allow_missing` is true, an `UnknownTopic` error from
/// the broker is treated as success.
async fn delete_topic_helper<'a, C>(
    client: &'a AdminClient<C>,
    admin_opts: &AdminOptions,
    topic: &'a str,
    allow_missing: bool,
) -> Result<(), DeleteTopicError>
where
    C: ClientContext,
{
    let res = client.delete_topics(&[topic], admin_opts).await?;
    // We asked to delete exactly one topic, so expect exactly one result.
    if res.len() != 1 {
        return Err(DeleteTopicError::TopicCountMismatch(res.len()));
    }
    let already_missing = match res.into_element() {
        Ok(_) => Ok(false),
        Err((_, RDKafkaErrorCode::UnknownTopic)) if allow_missing => Ok(true),
        Err((_, e)) => Err(DeleteTopicError::Kafka(KafkaError::AdminOp(e))),
    }?;

    // If the topic was already gone, there is no pending deletion to wait for.
    if already_missing {
        return Ok(());
    }

    // Topic deletion is asynchronous; wait until the topic disappears from the
    // broker metadata before returning, so that callers don't act on a topic
    // that is still in the process of being deleted.
    Retry::default()
        .max_duration(Duration::from_secs(30))
        .retry_async(|_| async {
            let metadata = client
                .inner()
                // N.B. It is extremely important not to ask specifically
                // about the topic here, even though the API supports it!
                // Asking about the topic will create it automatically...
                // with the wrong number of partitions. Yes, this is
                // unbelievably horrible.
                .fetch_metadata(None, Some(Duration::from_secs(10)))?;
            let topic_exists = metadata.topics().iter().any(|t| t.name() == topic);
            if topic_exists {
                Err(DeleteTopicError::TopicRessurected)
            } else {
                Ok(())
            }
        })
        .await
}
372
373/// An error while creating a Kafka topic.
374#[derive(Debug, thiserror::Error)]
375pub enum DeleteTopicError {
376    /// An error from the underlying Kafka library.
377    #[error(transparent)]
378    Kafka(#[from] KafkaError),
379    /// Topic creation returned the wrong number of results.
380    #[error("kafka topic creation returned {0} results, but exactly one result was expected")]
381    TopicCountMismatch(usize),
382    /// The topic remained in metadata after being deleted.
383    #[error("topic was recreated after being deleted")]
384    TopicRessurected,
385}