use std::collections::BTreeMap;
use std::sync::Arc;

use mz_kafka_util::client::DEFAULT_TOPIC_METADATA_REFRESH_INTERVAL;
use mz_ore::task;
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    Expr, KafkaSinkConfigOption, KafkaSinkConfigOptionName, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName,
};
use mz_storage_types::sinks::KafkaSinkCompressionType;
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
use rdkafka::{Offset, TopicPartitionList};
use tokio::time::Duration;

use crate::ast::Value;
use crate::catalog::SessionCatalog;
use crate::names::Aug;
use crate::normalize::generate_extracted_config;
use crate::plan::PlanError;
use crate::plan::with_options::{ImpliedValue, TryFromValue};

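// Each `generate_extracted_config!` invocation below is expected to expand
// into a typed `*OptionExtracted` struct (e.g. `KafkaSourceConfigOptionExtracted`)
// with one field per listed option and the given defaults, presumably along
// with a conversion from the parsed option list; see the macro definition in
// `crate::normalize` for the exact contract.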
generate_extracted_config!(
    KafkaSourceConfigOption,
    (GroupIdPrefix, String),
    (Topic, String),
    (
        TopicMetadataRefreshInterval,
        Duration,
        Default(DEFAULT_TOPIC_METADATA_REFRESH_INTERVAL)
    ),
    (StartTimestamp, i64),
    (StartOffset, Vec<i64>)
);

generate_extracted_config!(
    KafkaSinkConfigOption,
    (
        CompressionType,
        KafkaSinkCompressionType,
        Default(KafkaSinkCompressionType::Lz4)
    ),
    (PartitionBy, Expr<Aug>),
    (ProgressGroupIdPrefix, String),
    (TransactionalIdPrefix, String),
    (LegacyIds, bool),
    (Topic, String),
    (TopicConfig, BTreeMap<String, String>),
    (
        TopicMetadataRefreshInterval,
        Duration,
        Default(DEFAULT_TOPIC_METADATA_REFRESH_INTERVAL)
    ),
    (TopicPartitionCount, i32),
    (TopicReplicationFactor, i32)
);

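/// Parses a `COMPRESSION TYPE` option value (a SQL string literal) into a
/// [`KafkaSinkCompressionType`], and converts it back into a string `Value`
/// for display.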
impl TryFromValue<Value> for KafkaSinkCompressionType {
    fn try_from_value(v: Value) -> Result<Self, PlanError> {
        match v {
            Value::String(v) => match v.to_lowercase().as_str() {
                "none" => Ok(KafkaSinkCompressionType::None),
                "gzip" => Ok(KafkaSinkCompressionType::Gzip),
                "snappy" => Ok(KafkaSinkCompressionType::Snappy),
                "lz4" => Ok(KafkaSinkCompressionType::Lz4),
                "zstd" => Ok(KafkaSinkCompressionType::Zstd),
                _ => sql_bail!("{}", v),
            },
            _ => sql_bail!("compression type must be a string"),
        }
    }

    fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option<Value> {
        Some(Value::String(match self {
            KafkaSinkCompressionType::None => "none".to_string(),
            KafkaSinkCompressionType::Gzip => "gzip".to_string(),
            KafkaSinkCompressionType::Snappy => "snappy".to_string(),
            KafkaSinkCompressionType::Lz4 => "lz4".to_string(),
            KafkaSinkCompressionType::Zstd => "zstd".to_string(),
        }))
    }

    fn name() -> String {
        "Kafka sink compression type".to_string()
    }
}

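/// A bare `COMPRESSION TYPE` option (specified without a value) is rejected
/// rather than implying a default.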
impl ImpliedValue for KafkaSinkCompressionType {
    fn implied_value() -> Result<Self, PlanError> {
        sql_bail!("must provide a compression type value")
    }
}

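/// Looks up a start offset for each partition of `topic` based on `time_offset`.
///
/// A negative `time_offset` is interpreted as a timestamp relative to `now`
/// (e.g. `-1000` means one second before `now`). For each partition the broker
/// is asked for the first offset at or after the requested timestamp; any
/// partition that reports `Offset::End` instead falls back to the partition's
/// current high watermark.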
pub async fn lookup_start_offsets<C>(
    consumer: Arc<BaseConsumer<C>>,
    topic: &str,
    time_offset: i64,
    now: u64,
    fetch_metadata_timeout: Duration,
) -> Result<Vec<i64>, PlanError>
where
    C: ConsumerContext + 'static,
{
    let time_offset = if time_offset < 0 {
        let now: i64 = now.try_into()?;
        let ts = now - time_offset.abs();

        if ts <= 0 {
            sql_bail!("Relative START TIMESTAMP must be smaller than current system timestamp")
        }
        ts
    } else {
        time_offset
    };

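    // librdkafka's metadata and offset lookups block, so run them on a
    // dedicated blocking task rather than on the async executor.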
    task::spawn_blocking(|| format!("kafka_lookup_start_offsets:{topic}"), {
        let topic = topic.to_string();
        move || {
            let num_partitions = mz_kafka_util::client::get_partitions(
                consumer.as_ref().client(),
                &topic,
                fetch_metadata_timeout,
            )
            .map_err(|e| sql_err!("{}", e))?
            .len();

            let num_partitions_i32 = i32::try_from(num_partitions)
                .map_err(|_| sql_err!("kafka topic had more than {} partitions", i32::MAX))?;

            let mut tpl = TopicPartitionList::with_capacity(1);
            tpl.add_partition_range(&topic, 0, num_partitions_i32 - 1);
            tpl.set_all_offsets(Offset::Offset(time_offset))
                .map_err(|e| sql_err!("{}", e))?;

            let offsets_for_times = consumer
                .offsets_for_times(tpl, Duration::from_secs(10))
                .map_err(|e| sql_err!("{}", e))?;

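            // Translate the per-partition results, resolving any `Offset::End`
            // responses to the partition's current end offset.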
            let start_offsets = offsets_for_times
                .elements()
                .iter()
                .map(|elem| match elem.offset() {
                    Offset::Offset(offset) => Ok(offset),
                    Offset::End => fetch_end_offset(&consumer, &topic, elem.partition()),
                    _ => sql_bail!(
                        "Unexpected offset {:?} for partition {}",
                        elem.offset(),
                        elem.partition()
                    ),
                })
                .collect::<Result<Vec<_>, _>>()?;

            if start_offsets.len() != num_partitions {
                sql_bail!(
                    "Expected offsets for {} partitions, but received {}",
                    num_partitions,
                    start_offsets.len(),
                );
            }

            Ok(start_offsets)
        }
    })
    .await
    .map_err(|e| sql_err!("{}", e))?
}

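/// Returns the current end offset (high watermark) for partition `pid` of
/// `topic`.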
fn fetch_end_offset<C>(consumer: &BaseConsumer<C>, topic: &str, pid: i32) -> Result<i64, PlanError>
where
    C: ConsumerContext,
{
    let (_low, high) = consumer
        .fetch_watermarks(topic, pid, Duration::from_secs(10))
        .map_err(|e| sql_err!("{}", e))?;
    Ok(high)
}

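/// Validates that the number of `START OFFSET` values provided does not
/// exceed the number of partitions in `topic`.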
pub async fn validate_start_offsets<C>(
    consumer: Arc<BaseConsumer<C>>,
    topic: &str,
    start_offsets: Vec<i64>,
    fetch_metadata_timeout: Duration,
) -> Result<(), PlanError>
where
    C: ConsumerContext + 'static,
{
    task::spawn_blocking(|| format!("kafka_validate_start_offsets:{topic}"), {
        let topic = topic.to_string();
        move || {
            let num_partitions = mz_kafka_util::client::get_partitions(
                consumer.as_ref().client(),
                &topic,
                fetch_metadata_timeout,
            )
            .map_err(|e| sql_err!("{}", e))?
            .len();
            if start_offsets.len() > num_partitions {
                sql_bail!(
                    "START OFFSET specified more partitions ({}) than topic ({}) contains ({})",
                    start_offsets.len(),
                    topic,
                    num_partitions
                )
            }
            Ok(())
        }
    })
    .await
    .map_err(|e| sql_err!("{}", e))?
}

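/// Ensures that `topic` exists by attempting to fetch its partition metadata.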
pub async fn ensure_topic_exists<C>(
    consumer: Arc<BaseConsumer<C>>,
    topic: &str,
    fetch_metadata_timeout: Duration,
) -> Result<(), PlanError>
where
    C: ConsumerContext + 'static,
{
    task::spawn_blocking(|| format!("kafka_ensure_topic_exists:{topic}"), {
        let topic = topic.to_string();
        move || {
            mz_kafka_util::client::get_partitions(
                consumer.as_ref().client(),
                &topic,
                fetch_metadata_timeout,
            )
            .map_err(|e| sql_err!("{}", e))?;
            Ok(())
        }
    })
    .await
    .map_err(|e| sql_err!("{}", e))?
}