mz_storage_client/sink.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::time::Duration;

use anyhow::{Context, anyhow, bail};
use mz_ccsr::GetSubjectConfigError;
use mz_kafka_util::admin::EnsureTopicConfig;
use mz_kafka_util::client::MzClientContext;
use mz_ore::collections::CollectionExt;
use mz_ore::future::{InTask, OreFutureExt};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::KafkaTopicOptions;
use mz_storage_types::errors::ContextCreationErrorExt;
use mz_storage_types::sinks::KafkaSinkConnection;
use rdkafka::ClientContext;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, ResourceSpecifier, TopicReplication};
use tracing::warn;

pub mod progress_key {
    use std::fmt;

    use mz_repr::GlobalId;
    use rdkafka::message::ToBytes;

    /// A key identifying a given sink within a progress topic.
    #[derive(Debug, Clone)]
    pub struct ProgressKey(String);

    impl ProgressKey {
        /// Constructs a progress key for the sink with the specified ID.
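        ///
        /// An illustrative sketch of the resulting key format (marked `ignore`;
        /// it assumes `GlobalId::User(42)` renders as `u42` via its `Display`
        /// impl):
        ///
        /// ```ignore
        /// use mz_repr::GlobalId;
        /// use mz_storage_client::sink::progress_key::ProgressKey;
        ///
        /// // The key embeds the sink's `GlobalId`, yielding "mz-sink-u42".
        /// let key = ProgressKey::new(GlobalId::User(42));
        /// assert_eq!(key.to_string(), "mz-sink-u42");
        /// ```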
        pub fn new(sink_id: GlobalId) -> ProgressKey {
            ProgressKey(format!("mz-sink-{sink_id}"))
        }
    }

    impl ToBytes for ProgressKey {
        fn to_bytes(&self) -> &[u8] {
            self.0.as_bytes()
        }
    }

    impl fmt::Display for ProgressKey {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            self.0.fmt(f)
        }
    }
}

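/// A broker's default partition count and replication factor for new topics.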
struct TopicConfigs {
    partition_count: i32,
    replication_factor: i32,
}

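/// Discovers the broker's default topic settings by reading `num.partitions`
/// and `default.replication.factor` from the first broker returned in the
/// cluster metadata.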
async fn discover_topic_configs<C: ClientContext>(
    client: &AdminClient<C>,
    topic: &str,
    fetch_timeout: Duration,
) -> Result<TopicConfigs, anyhow::Error> {
    let mut partition_count = -1;
    let mut replication_factor = -1;

    let metadata = client
        .inner()
        .fetch_metadata(None, fetch_timeout)
        .with_context(|| {
            format!(
                "error fetching metadata when creating new topic {} for sink",
                topic
            )
        })?;

    if metadata.brokers().len() == 0 {
        Err(anyhow!("zero brokers discovered in metadata request"))?;
    }

    let broker = metadata.brokers()[0].id();
    let configs = client
        .describe_configs(
            &[ResourceSpecifier::Broker(broker)],
            &AdminOptions::new().request_timeout(Some(Duration::from_secs(5))),
        )
        .await
        .with_context(|| {
            format!(
                "error fetching configuration from broker {} when creating new topic {} for sink",
                broker, topic
            )
        })?;

    if configs.len() != 1 {
        Err(anyhow!(
            "error creating topic {} for sink: broker {} returned {} config results, but one was expected",
            topic,
            broker,
            configs.len()
        ))?;
    }

    let config = configs.into_element().map_err(|e| {
        anyhow!(
            "error reading broker configuration when creating topic {} for sink: {}",
            topic,
            e
        )
    })?;

    if config.entries.is_empty() {
        bail!("read empty cluster configuration; do we have DescribeConfigs permissions?")
    }

    for entry in config.entries {
        if entry.name == "num.partitions" && partition_count == -1 {
            if let Some(s) = entry.value {
                partition_count = s.parse::<i32>().with_context(|| {
                    format!(
                        "default partition count {} cannot be parsed into an integer",
                        s
                    )
                })?;
            }
        } else if entry.name == "default.replication.factor" && replication_factor == -1 {
            if let Some(s) = entry.value {
                replication_factor = s.parse::<i32>().with_context(|| {
                    format!(
                        "default replication factor {} cannot be parsed into an integer",
                        s
                    )
                })?;
            }
        }
    }

    Ok(TopicConfigs {
        partition_count,
        replication_factor,
    })
}

/// Ensures that the named Kafka topic exists.
///
/// If the topic does not exist, the function creates it with the provided
/// topic options. Note that if the topic already exists, the function does
/// *not* verify that the topic's configuration matches those options.
///
/// Returns a boolean indicating whether the topic already existed.
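///
/// A hedged usage sketch (marked `ignore`; `connection`, `storage_configuration`,
/// `topic_options`, and `ensure_topic_config` are assumed to be in scope):
///
/// ```ignore
/// let already_existed = ensure_kafka_topic(
///     &connection,
///     &storage_configuration,
///     "my-sink-topic",
///     &topic_options,
///     ensure_topic_config,
/// )
/// .await?;
/// ```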
pub async fn ensure_kafka_topic(
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    topic: &str,
    KafkaTopicOptions {
        partition_count,
        replication_factor,
        topic_config,
    }: &KafkaTopicOptions,
    ensure_topic_config: EnsureTopicConfig,
) -> Result<bool, anyhow::Error> {
    let client: AdminClient<_> = connection
        .connection
        .create_with_context(
            storage_configuration,
            MzClientContext::default(),
            &BTreeMap::new(),
            // Only called from `mz_storage`.
            InTask::Yes,
        )
        .await
        .add_context("creating admin client failed")?;
    let mut partition_count = partition_count.map(|f| *f);
    let mut replication_factor = replication_factor.map(|f| *f);
    // If either the partition count or the replication factor should fall back to the broker's
    // default (signaled by a value of `None`), explicitly poll the broker to discover those
    // defaults. Newer Kafka versions can instead accept -1 in the create-topic request and apply
    // the defaults themselves, but that is unsupported on pre-2.4 Kafka and results in errors.
    if partition_count.is_none() || replication_factor.is_none() {
        let fetch_timeout = storage_configuration
            .parameters
            .kafka_timeout_config
            .fetch_metadata_timeout;
        match discover_topic_configs(&client, topic, fetch_timeout).await {
            Ok(configs) => {
                if partition_count.is_none() {
                    partition_count = Some(configs.partition_count);
                }
                if replication_factor.is_none() {
                    replication_factor = Some(configs.replication_factor);
                }
            }
            Err(e) => {
                // Recent versions of Kafka accept an explicit -1 for these configs, so fall back
                // to that and the request will probably still succeed. Log anyway for visibility.
                warn!("Failed to discover default values for topic configs: {e}");
                if partition_count.is_none() {
                    partition_count = Some(-1);
                }
                if replication_factor.is_none() {
                    replication_factor = Some(-1);
                }
            }
        };
    }

    let mut kafka_topic = NewTopic::new(
        topic,
        partition_count.expect("always set above"),
        TopicReplication::Fixed(replication_factor.expect("always set above")),
    );

    for (key, value) in topic_config {
        kafka_topic = kafka_topic.set(key, value);
    }

    mz_kafka_util::admin::ensure_topic(
        &client,
        &AdminOptions::new().request_timeout(Some(Duration::from_secs(5))),
        &kafka_topic,
        ensure_topic_config,
    )
    .await
    .with_context(|| format!("Error creating topic {} for sink", topic))
}

/// Publishes a schema for the given subject, setting the subject's
/// compatibility level beforehand if one is requested and not already
/// configured.
///
/// TODO(benesch): do we need to delete the Kafka topic if publishing the
/// schema fails?
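///
/// A hedged usage sketch (marked `ignore`; `ccsr_client`, `avro_schema_json`,
/// and `compatibility_level` are placeholders assumed to be in scope):
///
/// ```ignore
/// let schema_id = publish_kafka_schema(
///     ccsr_client,
///     "my-topic-value".to_string(),
///     avro_schema_json,
///     mz_ccsr::SchemaType::Avro,
///     Some(compatibility_level),
/// )
/// .await?;
/// ```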
pub async fn publish_kafka_schema(
    ccsr: mz_ccsr::Client,
    subject: String,
    schema: String,
    schema_type: mz_ccsr::SchemaType,
    compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
) -> Result<i32, anyhow::Error> {
    if let Some(compatibility_level) = compatibility_level {
        let ccsr = ccsr.clone();
        let subject = subject.clone();
        async move {
            // Only set the compatibility level if the subject does not already have one
            // configured; an existing, mismatched level is only logged.
            match ccsr.get_subject_config(&subject).await {
                Ok(config) => {
                    if config.compatibility_level != compatibility_level {
                        tracing::debug!(
                            "compatibility level '{}' does not match intended '{}'",
                            config.compatibility_level,
                            compatibility_level
                        );
                    }
                    Ok(())
                }
                Err(GetSubjectConfigError::SubjectCompatibilityLevelNotSet)
                | Err(GetSubjectConfigError::SubjectNotFound) => ccsr
                    .set_subject_compatibility_level(&subject, compatibility_level)
                    .await
                    .map_err(anyhow::Error::from),
                Err(e) => Err(e.into()),
            }
        }
        .run_in_task(|| "set_compatibility_level".to_string())
        .await
        .context("unable to update schema compatibility level in kafka sink")?;
    }

    let schema_id = async move {
        ccsr.publish_schema(&subject, &schema, schema_type, &[])
            .await
    }
    .run_in_task(|| "publish_kafka_schema".to_string())
    .await
    .context("unable to publish schema to registry in kafka sink")?;

    Ok(schema_id)
}