// mz_sql/kafka_util.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Provides parsing and convenience functions for working with Kafka from the `sql` package.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14
15use mz_kafka_util::client::DEFAULT_TOPIC_METADATA_REFRESH_INTERVAL;
16use mz_ore::task;
17use mz_sql_parser::ast::display::AstDisplay;
18use mz_sql_parser::ast::{
19    Expr, KafkaSinkConfigOption, KafkaSinkConfigOptionName, KafkaSourceConfigOption,
20    KafkaSourceConfigOptionName,
21};
22use mz_storage_types::sinks::KafkaSinkCompressionType;
23use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
24use rdkafka::{Offset, TopicPartitionList};
25use tokio::time::Duration;
26
27use crate::ast::Value;
28use crate::catalog::SessionCatalog;
29use crate::names::Aug;
30use crate::normalize::generate_extracted_config;
31use crate::plan::PlanError;
32use crate::plan::with_options::{ImpliedValue, TryFromValue};
33
// Extracts and validates the `WITH`-style options accepted by Kafka sources
// (`GROUP ID PREFIX`, `TOPIC`, `TOPIC METADATA REFRESH INTERVAL`,
// `START TIMESTAMP`, `START OFFSET`). The macro presumably generates an
// `…Extracted` struct with typed fields — see `generate_extracted_config!`
// in `crate::normalize` for the exact shape.
generate_extracted_config!(
    KafkaSourceConfigOption,
    (GroupIdPrefix, String),
    (Topic, String),
    (
        TopicMetadataRefreshInterval,
        Duration,
        Default(DEFAULT_TOPIC_METADATA_REFRESH_INTERVAL)
    ),
    (StartTimestamp, i64),
    (StartOffset, Vec<i64>)
);
46
// Extracts and validates the `WITH`-style options accepted by Kafka sinks.
// Options without a `Default(..)` are optional (extracted as `Option<T>`);
// `COMPRESSION TYPE` defaults to lz4 and `TOPIC METADATA REFRESH INTERVAL`
// to the shared Kafka client default. See `generate_extracted_config!` in
// `crate::normalize` for the generated plumbing.
generate_extracted_config!(
    KafkaSinkConfigOption,
    (
        CompressionType,
        KafkaSinkCompressionType,
        Default(KafkaSinkCompressionType::Lz4)
    ),
    (PartitionBy, Expr<Aug>),
    (ProgressGroupIdPrefix, String),
    (TransactionalIdPrefix, String),
    (LegacyIds, bool),
    (Topic, String),
    (TopicConfig, BTreeMap<String, String>),
    (
        TopicMetadataRefreshInterval,
        Duration,
        Default(DEFAULT_TOPIC_METADATA_REFRESH_INTERVAL)
    ),
    (TopicPartitionCount, i32),
    (TopicReplicationFactor, i32)
);
68
69impl TryFromValue<Value> for KafkaSinkCompressionType {
70    fn try_from_value(v: Value) -> Result<Self, PlanError> {
71        match v {
72            Value::String(v) => match v.to_lowercase().as_str() {
73                "none" => Ok(KafkaSinkCompressionType::None),
74                "gzip" => Ok(KafkaSinkCompressionType::Gzip),
75                "snappy" => Ok(KafkaSinkCompressionType::Snappy),
76                "lz4" => Ok(KafkaSinkCompressionType::Lz4),
77                "zstd" => Ok(KafkaSinkCompressionType::Zstd),
78                // The caller will add context, resulting in an error like
79                // "invalid COMPRESSION TYPE: <bad-compression-type>".
80                _ => sql_bail!("{}", v),
81            },
82            _ => sql_bail!("compression type must be a string"),
83        }
84    }
85
86    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
87        Some(Value::String(match self {
88            KafkaSinkCompressionType::None => "none".to_string(),
89            KafkaSinkCompressionType::Gzip => "gzip".to_string(),
90            KafkaSinkCompressionType::Snappy => "snappy".to_string(),
91            KafkaSinkCompressionType::Lz4 => "lz4".to_string(),
92            KafkaSinkCompressionType::Zstd => "zstd".to_string(),
93        }))
94    }
95
96    fn name() -> String {
97        "Kafka sink compression type".to_string()
98    }
99}
100
// `COMPRESSION TYPE` has no implied value: writing the option bare
// (without `= <value>`) is an error rather than silently picking a default.
impl ImpliedValue for KafkaSinkCompressionType {
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a compression type value")
    }
}
106
107/// Returns start offsets for the partitions of `topic` and the provided
108/// `START TIMESTAMP` option.
109///
110/// For each partition, the returned offset is the earliest offset whose
111/// timestamp is greater than or equal to the given timestamp for the
112/// partition. If no such message exists (or the Kafka broker is before
113/// 0.10.0), the current end offset is returned for the partition.
114///
115/// The provided `START TIMESTAMP` option must be a non-zero number:
116/// * Non-negative numbers will used as is (e.g. `1622659034343`)
117/// * Negative numbers will be translated to a timestamp in millis
118///   before now (e.g. `-10` means 10 millis ago)
119pub async fn lookup_start_offsets<C>(
120    consumer: Arc<BaseConsumer<C>>,
121    topic: &str,
122    time_offset: i64,
123    now: u64,
124    fetch_metadata_timeout: Duration,
125) -> Result<Vec<i64>, PlanError>
126where
127    C: ConsumerContext + 'static,
128{
129    let time_offset = if time_offset < 0 {
130        let now: i64 = now.try_into()?;
131        let ts = now - time_offset.abs();
132
133        if ts <= 0 {
134            sql_bail!("Relative START TIMESTAMP must be smaller than current system timestamp")
135        }
136        ts
137    } else {
138        time_offset
139    };
140
141    // Lookup offsets
142    // TODO(guswynn): see if we can add broker to this name
143    task::spawn_blocking(|| format!("kafka_lookup_start_offsets:{topic}"), {
144        let topic = topic.to_string();
145        move || {
146            // There cannot be more than i32 partitions
147            let num_partitions = mz_kafka_util::client::get_partitions(
148                consumer.as_ref().client(),
149                &topic,
150                fetch_metadata_timeout,
151            )
152            .map_err(|e| sql_err!("{}", e))?
153            .len();
154
155            let num_partitions_i32 = i32::try_from(num_partitions)
156                .map_err(|_| sql_err!("kafka topic had more than {} partitions", i32::MAX))?;
157
158            let mut tpl = TopicPartitionList::with_capacity(1);
159            tpl.add_partition_range(&topic, 0, num_partitions_i32 - 1);
160            tpl.set_all_offsets(Offset::Offset(time_offset))
161                .map_err(|e| sql_err!("{}", e))?;
162
163            let offsets_for_times = consumer
164                .offsets_for_times(tpl, Duration::from_secs(10))
165                .map_err(|e| sql_err!("{}", e))?;
166
167            // Translate to `start_offsets`
168            let start_offsets = offsets_for_times
169                .elements()
170                .iter()
171                .map(|elem| match elem.offset() {
172                    Offset::Offset(offset) => Ok(offset),
173                    Offset::End => fetch_end_offset(&consumer, &topic, elem.partition()),
174                    _ => sql_bail!(
175                        "Unexpected offset {:?} for partition {}",
176                        elem.offset(),
177                        elem.partition()
178                    ),
179                })
180                .collect::<Result<Vec<_>, _>>()?;
181
182            if start_offsets.len() != num_partitions {
183                sql_bail!(
184                    "Expected offsets for {} partitions, but received {}",
185                    num_partitions,
186                    start_offsets.len(),
187                );
188            }
189
190            Ok(start_offsets)
191        }
192    })
193    .await
194    .map_err(|e| sql_err!("{}", e))?
195}
196
197// Kafka supports bulk lookup of watermarks, but it is not exposed in rdkafka.
198// If that ever changes, we will want to first collect all pids that have no
199// offset for a given timestamp and then do a single request (instead of doing
200// a request for each partition individually).
201fn fetch_end_offset<C>(consumer: &BaseConsumer<C>, topic: &str, pid: i32) -> Result<i64, PlanError>
202where
203    C: ConsumerContext,
204{
205    let (_low, high) = consumer
206        .fetch_watermarks(topic, pid, Duration::from_secs(10))
207        .map_err(|e| sql_err!("{}", e))?;
208    Ok(high)
209}
210
211/// Validates that the provided start offsets are valid for the specified topic.
212/// At present, the validation is merely that there are not more start offsets
213/// than parts in the topic.
214pub async fn validate_start_offsets<C>(
215    consumer: Arc<BaseConsumer<C>>,
216    topic: &str,
217    start_offsets: Vec<i64>,
218    fetch_metadata_timeout: Duration,
219) -> Result<(), PlanError>
220where
221    C: ConsumerContext + 'static,
222{
223    // TODO(guswynn): see if we can add broker to this name
224    task::spawn_blocking(|| format!("kafka_validate_start_offsets:{topic}"), {
225        let topic = topic.to_string();
226        move || {
227            let num_partitions = mz_kafka_util::client::get_partitions(
228                consumer.as_ref().client(),
229                &topic,
230                fetch_metadata_timeout,
231            )
232            .map_err(|e| sql_err!("{}", e))?
233            .len();
234            if start_offsets.len() > num_partitions {
235                sql_bail!(
236                    "START OFFSET specified more partitions ({}) than topic ({}) contains ({})",
237                    start_offsets.len(),
238                    topic,
239                    num_partitions
240                )
241            }
242            Ok(())
243        }
244    })
245    .await
246    .map_err(|e| sql_err!("{}", e))?
247}
248
249/// Validates that we can connect to the broker and obtain metadata about the topic.
250pub async fn ensure_topic_exists<C>(
251    consumer: Arc<BaseConsumer<C>>,
252    topic: &str,
253    fetch_metadata_timeout: Duration,
254) -> Result<(), PlanError>
255where
256    C: ConsumerContext + 'static,
257{
258    task::spawn_blocking(|| format!("kafka_ensure_topic_exists:{topic}"), {
259        let topic = topic.to_string();
260        move || {
261            mz_kafka_util::client::get_partitions(
262                consumer.as_ref().client(),
263                &topic,
264                fetch_metadata_timeout,
265            )
266            .map_err(|e| sql_err!("{}", e))?;
267            Ok(())
268        }
269    })
270    .await
271    .map_err(|e| sql_err!("{}", e))?
272}