use std::collections::BTreeMap;
use std::fmt;
use std::sync::LazyLock;
use std::time::Duration;

use dec::OrderedDecimal;
use mz_dyncfg::ConfigSet;
use mz_kafka_util::client::MzClientContext;
use mz_ore::collections::CollectionExt;
use mz_ore::future::InTask;
use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
use mz_repr::adt::numeric::Numeric;
use mz_repr::{CatalogItemId, ColumnType, Datum, GlobalId, RelationDesc, Row, ScalarType};
use mz_timely_util::order::{Extrema, Partitioned};
use proptest::prelude::any;
use proptest_derive::Arbitrary;
use rdkafka::admin::AdminClient;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;

use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::connections::{ConnectionContext, KafkaConnection};
use crate::controller::AlterError;
use crate::sources::{MzOffset, SourceConnection, SourceTimestamp};

use super::SourceExportDetails;

include!(concat!(
    env!("OUT_DIR"),
    "/mz_storage_types.sources.kafka.rs"
));

pub type KafkaTimestamp = Partitioned<RangeBound<i32>, MzOffset>;
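
// Illustrative note: a `KafkaTimestamp` pairs a range of partition ids with an
// `MzOffset`. A complete frontier for a topic with known partitions 0 and 1
// might contain, for example, one singleton entry per partition ([0] at offset
// 42, [1] at offset 7) plus a range entry ((1, +inf) at offset 0) covering
// partitions that may be created later; `fetch_write_frontier` below assembles
// frontiers of this shape.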

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct KafkaSourceConnection<C: ConnectionAccess = InlinedConnection> {
    pub connection: C::Kafka,
    pub connection_id: CatalogItemId,
    /// The Kafka topic to ingest.
    pub topic: String,
    /// Starting offsets, keyed by partition id.
    #[proptest(strategy = "proptest::collection::btree_map(any::<i32>(), any::<i64>(), 0..4)")]
    pub start_offsets: BTreeMap<i32, i64>,
    /// An optional prefix for the consumer group id; see `group_id` below.
    pub group_id_prefix: Option<String>,
    /// Requested metadata columns, as `(column name, kind)` pairs.
    #[proptest(strategy = "proptest::collection::vec(any::<(String, KafkaMetadataKind)>(), 0..4)")]
    pub metadata_columns: Vec<(String, KafkaMetadataKind)>,
    /// How frequently to refresh topic metadata.
    pub topic_metadata_refresh_interval: Duration,
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSourceConnection, R>
    for KafkaSourceConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSourceConnection {
        let KafkaSourceConnection {
            connection,
            connection_id,
            topic,
            start_offsets,
            group_id_prefix,
            metadata_columns,
            topic_metadata_refresh_interval,
        } = self;
        KafkaSourceConnection {
            connection: r.resolve_connection(connection).unwrap_kafka(),
            connection_id,
            topic,
            start_offsets,
            group_id_prefix,
            metadata_columns,
            topic_metadata_refresh_interval,
        }
    }
}

/// The relation description of the progress collection for a Kafka source, as
/// returned by `SourceConnection::timestamp_desc`.
pub static KAFKA_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        .with_column(
            "partition",
            ScalarType::Range {
                element_type: Box::new(ScalarType::Numeric { max_scale: None }),
            }
            .nullable(false),
        )
        .with_column("offset", ScalarType::UInt64.nullable(true))
        .finish()
});
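
// Sketch (illustrative values): under the description above, a progress row
// pairs a numeric range of partition ids with an upper offset, so a row of
// roughly (`[3,3]`, 42) records that partition 3 has been read up to offset
// 42. `SourceTimestamp::encode_row` below produces rows of this shape.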

impl KafkaSourceConnection {
    /// Constructs the rdkafka client id for this source, derived from the
    /// connection id and source id and optionally enriched via dynamic config.
    pub fn client_id(
        &self,
        configs: &ConfigSet,
        connection_context: &ConnectionContext,
        source_id: GlobalId,
    ) -> String {
        let mut client_id =
            KafkaConnection::id_base(connection_context, self.connection_id, source_id);
        self.connection.enrich_client_id(configs, &mut client_id);
        client_id
    }
}

impl<C: ConnectionAccess> KafkaSourceConnection<C> {
    /// Constructs the consumer group id for this source: the configured
    /// `group_id_prefix` (if any) followed by the standard id base.
    pub fn group_id(&self, connection_context: &ConnectionContext, source_id: GlobalId) -> String {
        format!(
            "{}{}",
            self.group_id_prefix.as_deref().unwrap_or(""),
            KafkaConnection::id_base(connection_context, self.connection_id, source_id),
        )
    }
}

impl KafkaSourceConnection {
    /// Fetches the current write frontier of the topic: one entry per known
    /// partition at its high watermark, plus a range entry recording that any
    /// higher-numbered (not yet created) partition is still at offset 0.
    pub async fn fetch_write_frontier(
        self,
        storage_configuration: &crate::configuration::StorageConfiguration,
    ) -> Result<timely::progress::Antichain<KafkaTimestamp>, anyhow::Error> {
        let (context, _error_rx) = MzClientContext::with_errors();
        let client: AdminClient<_> = self
            .connection
            .create_with_context(storage_configuration, context, &BTreeMap::new(), InTask::No)
            .await?;

        let metadata_timeout = storage_configuration
            .parameters
            .kafka_timeout_config
            .fetch_metadata_timeout;

        mz_ore::task::spawn_blocking(|| "kafka_fetch_write_frontier_fetch_metadata", {
            move || {
                let meta = client
                    .inner()
                    .fetch_metadata(Some(&self.topic), metadata_timeout)?;

                let pids = meta
                    .topics()
                    .into_element()
                    .partitions()
                    .iter()
                    .map(|p| p.id());

                let mut current_upper = Antichain::new();
                let mut max_pid = 0;
                for pid in pids {
                    let (_, high) =
                        client
                            .inner()
                            .fetch_watermarks(&self.topic, pid, metadata_timeout)?;
                    max_pid = std::cmp::max(pid, max_pid);
                    current_upper.insert(Partitioned::new_singleton(
                        RangeBound::Elem(pid, BoundKind::At),
                        MzOffset::from(u64::try_from(high).unwrap()),
                    ));
                }
                current_upper.insert(Partitioned::new_range(
                    RangeBound::Elem(max_pid, BoundKind::After),
                    RangeBound::PosInfinity,
                    MzOffset::from(0),
                ));

                Ok(current_upper)
            }
        })
        .await?
    }
}

impl<C: ConnectionAccess> SourceConnection for KafkaSourceConnection<C> {
    fn name(&self) -> &'static str {
        "kafka"
    }

    fn external_reference(&self) -> Option<&str> {
        Some(self.topic.as_str())
    }

    fn default_key_desc(&self) -> RelationDesc {
        RelationDesc::builder()
            .with_column("key", ScalarType::Bytes.nullable(true))
            .finish()
    }

    fn default_value_desc(&self) -> RelationDesc {
        RelationDesc::builder()
            .with_column("value", ScalarType::Bytes.nullable(true))
            .finish()
    }

    fn timestamp_desc(&self) -> RelationDesc {
        KAFKA_PROGRESS_DESC.clone()
    }

    fn connection_id(&self) -> Option<CatalogItemId> {
        Some(self.connection_id)
    }

    fn primary_export_details(&self) -> SourceExportDetails {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns: self.metadata_columns.clone(),
        })
    }

    fn supports_read_only(&self) -> bool {
        true
    }

    fn prefers_single_replica(&self) -> bool {
        false
    }
}

impl<C: ConnectionAccess> crate::AlterCompatible for KafkaSourceConnection<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }

        let KafkaSourceConnection {
            connection,
            connection_id,
            topic,
            start_offsets,
            group_id_prefix,
            metadata_columns,
            topic_metadata_refresh_interval,
        } = self;

        let compatibility_checks = [
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (connection_id == &other.connection_id, "connection_id"),
            (topic == &other.topic, "topic"),
            (start_offsets == &other.start_offsets, "start_offsets"),
            (group_id_prefix == &other.group_id_prefix, "group_id_prefix"),
            (
                metadata_columns == &other.metadata_columns,
                "metadata_columns",
            ),
            (
                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
                "topic_metadata_refresh_interval",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "KafkaSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl RustType<ProtoKafkaSourceConnection> for KafkaSourceConnection<InlinedConnection> {
    fn into_proto(&self) -> ProtoKafkaSourceConnection {
        ProtoKafkaSourceConnection {
            connection: Some(self.connection.into_proto()),
            connection_id: Some(self.connection_id.into_proto()),
            topic: self.topic.clone(),
            start_offsets: self.start_offsets.clone(),
            group_id_prefix: self.group_id_prefix.clone(),
            metadata_columns: self
                .metadata_columns
                .iter()
                .map(|(name, kind)| ProtoKafkaMetadataColumn {
                    name: name.into_proto(),
                    kind: Some(kind.into_proto()),
                })
                .collect(),
            topic_metadata_refresh_interval: Some(
                self.topic_metadata_refresh_interval.into_proto(),
            ),
        }
    }

    fn from_proto(proto: ProtoKafkaSourceConnection) -> Result<Self, TryFromProtoError> {
        let mut metadata_columns = Vec::with_capacity(proto.metadata_columns.len());
        for c in proto.metadata_columns {
            let kind = c.kind.into_rust_if_some("ProtoKafkaMetadataColumn::kind")?;
            metadata_columns.push((c.name, kind));
        }

        Ok(KafkaSourceConnection {
            connection: proto
                .connection
                .into_rust_if_some("ProtoKafkaSourceConnection::connection")?,
            connection_id: proto
                .connection_id
                .into_rust_if_some("ProtoKafkaSourceConnection::connection_id")?,
            topic: proto.topic,
            start_offsets: proto.start_offsets,
            group_id_prefix: proto.group_id_prefix,
            metadata_columns,
            topic_metadata_refresh_interval: proto
                .topic_metadata_refresh_interval
                .into_rust_if_some("ProtoKafkaSourceConnection::topic_metadata_refresh_interval")?,
        })
    }
}

pub fn kafka_metadata_columns_desc(
    metadata_columns: &Vec<(String, KafkaMetadataKind)>,
) -> Vec<(&str, ColumnType)> {
    metadata_columns
        .iter()
        .map(|(name, kind)| {
            let typ = match kind {
                KafkaMetadataKind::Partition => ScalarType::Int32.nullable(false),
                KafkaMetadataKind::Offset => ScalarType::UInt64.nullable(false),
                KafkaMetadataKind::Timestamp => {
                    ScalarType::Timestamp { precision: None }.nullable(false)
                }
                KafkaMetadataKind::Header {
                    use_bytes: true, ..
                } => ScalarType::Bytes.nullable(true),
                KafkaMetadataKind::Header {
                    use_bytes: false, ..
                } => ScalarType::String.nullable(true),
                KafkaMetadataKind::Headers => ScalarType::List {
                    element_type: Box::new(ScalarType::Record {
                        fields: [
                            (
                                "key".into(),
                                ColumnType {
                                    nullable: false,
                                    scalar_type: ScalarType::String,
                                },
                            ),
                            (
                                "value".into(),
                                ColumnType {
                                    nullable: true,
                                    scalar_type: ScalarType::Bytes,
                                },
                            ),
                        ]
                        .into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
                .nullable(false),
            };
            (&**name, typ)
        })
        .collect()
}
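
// Illustrative usage (hypothetical column names): each requested metadata kind
// maps to one typed output column, e.g.
//
//     let cols = vec![
//         ("partition".to_string(), KafkaMetadataKind::Partition),
//         ("headers".to_string(), KafkaMetadataKind::Headers),
//     ];
//     let desc = kafka_metadata_columns_desc(&cols);
//     // desc[0] == ("partition", Int32 NOT NULL)
//     // desc[1] == ("headers", list of (key text NOT NULL, value bytea NULL) NOT NULL)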

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct KafkaSourceExportDetails {
    #[proptest(strategy = "proptest::collection::vec(any::<(String, KafkaMetadataKind)>(), 0..4)")]
    pub metadata_columns: Vec<(String, KafkaMetadataKind)>,
}

impl crate::AlterCompatible for KafkaSourceExportDetails {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        let Self { metadata_columns } = self;
        let compatibility_checks = [(
            metadata_columns == &other.metadata_columns,
            "metadata_columns",
        )];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "KafkaSourceExportDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }
        Ok(())
    }
}

impl RustType<ProtoKafkaSourceExportDetails> for KafkaSourceExportDetails {
    fn into_proto(&self) -> ProtoKafkaSourceExportDetails {
        ProtoKafkaSourceExportDetails {
            metadata_columns: self
                .metadata_columns
                .iter()
                .map(|(name, kind)| ProtoKafkaMetadataColumn {
                    name: name.into_proto(),
                    kind: Some(kind.into_proto()),
                })
                .collect(),
        }
    }

    fn from_proto(proto: ProtoKafkaSourceExportDetails) -> Result<Self, TryFromProtoError> {
        let mut metadata_columns = Vec::with_capacity(proto.metadata_columns.len());
        for c in proto.metadata_columns {
            let kind = c.kind.into_rust_if_some("ProtoKafkaMetadataColumn::kind")?;
            metadata_columns.push((c.name, kind));
        }

        Ok(KafkaSourceExportDetails { metadata_columns })
    }
}

/// A point in the space of partition ids, used to build the partition ranges
/// of a `KafkaTimestamp`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RangeBound<P> {
    /// Smaller than every other bound.
    NegInfinity,
    /// Just before, exactly at, or just after the given element.
    Elem(P, BoundKind),
    /// Larger than every other bound.
    PosInfinity,
}

/// Whether a `RangeBound::Elem` sits just before, exactly at, or just after
/// its element.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum BoundKind {
    /// Just before the element.
    Before,
    /// Exactly at the element.
    At,
    /// Just after the element.
    After,
}

impl<P: std::fmt::Debug> RangeBound<P> {
    /// Constructs a bound just before `elem`.
    pub fn before(elem: P) -> Self {
        Self::Elem(elem, BoundKind::Before)
    }

    /// Constructs a bound exactly at `elem`.
    pub fn exact(elem: P) -> Self {
        Self::Elem(elem, BoundKind::At)
    }

    /// Constructs a bound just after `elem`.
    pub fn after(elem: P) -> Self {
        Self::Elem(elem, BoundKind::After)
    }

    /// Returns the element of an exact bound; panics for any other variant.
    pub fn unwrap_exact(&self) -> &P {
        match self {
            RangeBound::Elem(p, BoundKind::At) => p,
            _ => panic!("attempt to unwrap_exact {self:?}"),
        }
    }
}

impl<P: fmt::Display> fmt::Display for RangeBound<P> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NegInfinity => f.write_str("-inf"),
            Self::Elem(elem, BoundKind::Before) => write!(f, "<{elem}"),
            Self::Elem(elem, BoundKind::At) => write!(f, "{elem}"),
            Self::Elem(elem, BoundKind::After) => write!(f, "{elem}>"),
            Self::PosInfinity => f.write_str("+inf"),
        }
    }
}

impl<P> Extrema for RangeBound<P> {
    fn minimum() -> Self {
        Self::NegInfinity
    }
    fn maximum() -> Self {
        Self::PosInfinity
    }
}
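
// Note on ordering (follows from the derived `Ord` on `RangeBound` and
// `BoundKind`): for any element `p`,
//
//     NegInfinity < before(p) < exact(p) < after(p) < PosInfinity
//
// which the `Display` impl above renders as `-inf`, `<p`, `p`, `p>`, `+inf`.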

impl SourceTimestamp for KafkaTimestamp {
    fn encode_row(&self) -> Row {
        use mz_repr::adt::range;
        let mut row = Row::with_capacity(2);
        let mut packer = row.packer();

        let to_numeric = |p: i32| Datum::from(OrderedDecimal(Numeric::from(p)));

        let (lower, lower_inclusive) = match self.interval().lower {
            RangeBound::NegInfinity => (Datum::Null, false),
            RangeBound::Elem(pid, BoundKind::After) => (to_numeric(pid), false),
            RangeBound::Elem(pid, BoundKind::At) => (to_numeric(pid), true),
            lower => unreachable!("invalid lower bound {lower:?}"),
        };
        let (upper, upper_inclusive) = match self.interval().upper {
            RangeBound::PosInfinity => (Datum::Null, false),
            RangeBound::Elem(pid, BoundKind::Before) => (to_numeric(pid), false),
            RangeBound::Elem(pid, BoundKind::At) => (to_numeric(pid), true),
            upper => unreachable!("invalid upper bound {upper:?}"),
        };
        assert_eq!(lower_inclusive, upper_inclusive, "invalid range {self}");

        packer
            .push_range(range::Range::new(Some((
                range::RangeBound::new(lower, lower_inclusive),
                range::RangeBound::new(upper, upper_inclusive),
            ))))
            .expect("pushing range must not generate errors");

        packer.push(Datum::UInt64(self.timestamp().offset));
        row
    }

    fn decode_row(row: &Row) -> Self {
        let mut datums = row.iter();

        match (datums.next(), datums.next(), datums.next()) {
            (Some(Datum::Range(range)), Some(Datum::UInt64(offset)), None) => {
                let mut range = range.into_bounds(|b| b.datum());
                range.canonicalize().expect("ranges must be valid");
                let range = range.inner.expect("empty range");

                let lower = range.lower.bound.map(|row| {
                    i32::try_from(row.unwrap_numeric().0)
                        .expect("only i32 values converted to ranges")
                });
                let upper = range.upper.bound.map(|row| {
                    i32::try_from(row.unwrap_numeric().0)
                        .expect("only i32 values converted to ranges")
                });

                match (range.lower.inclusive, range.upper.inclusive) {
                    // Both bounds inclusive: the row encodes a singleton
                    // partition entry.
                    (true, true) => {
                        assert_eq!(lower, upper);
                        Partitioned::new_singleton(
                            RangeBound::exact(lower.unwrap()),
                            MzOffset::from(offset),
                        )
                    }
                    // Both bounds exclusive: the row encodes a range of
                    // partitions, possibly unbounded on either side.
                    (false, false) => {
                        let lower = match lower {
                            Some(pid) => RangeBound::after(pid),
                            None => RangeBound::NegInfinity,
                        };
                        let upper = match upper {
                            Some(pid) => RangeBound::before(pid),
                            None => RangeBound::PosInfinity,
                        };
                        Partitioned::new_range(lower, upper, MzOffset::from(offset))
                    }
                    _ => panic!("invalid timestamp"),
                }
            }
            invalid_binding => unreachable!("invalid binding {:?}", invalid_binding),
        }
    }
}
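
// Editor's sketch (not part of the original source): a minimal round-trip
// check for the `encode_row`/`decode_row` pair above. The module and test
// names are made up for illustration; partition ids and offsets are arbitrary.
#[cfg(test)]
mod kafka_timestamp_roundtrip_sketch {
    use super::*;

    #[test] // plain `#[test]` used here to keep the sketch self-contained
    fn encode_decode_roundtrip() {
        // A singleton entry: partition 3 read up to offset 42.
        let singleton: KafkaTimestamp =
            Partitioned::new_singleton(RangeBound::exact(3), MzOffset::from(42));
        assert_eq!(
            KafkaTimestamp::decode_row(&singleton.encode_row()),
            singleton
        );

        // A range entry: every partition above 3 is still at offset 0.
        let range: KafkaTimestamp = Partitioned::new_range(
            RangeBound::after(3),
            RangeBound::PosInfinity,
            MzOffset::from(0),
        );
        assert_eq!(KafkaTimestamp::decode_row(&range.encode_row()), range);
    }
}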

/// Which piece of Kafka message metadata a metadata column exposes.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaMetadataKind {
    Partition,
    Offset,
    Timestamp,
    Headers,
    Header { key: String, use_bytes: bool },
}

impl RustType<ProtoKafkaMetadataKind> for KafkaMetadataKind {
    fn into_proto(&self) -> ProtoKafkaMetadataKind {
        use proto_kafka_metadata_kind::Kind;
        ProtoKafkaMetadataKind {
            kind: Some(match self {
                KafkaMetadataKind::Partition => Kind::Partition(()),
                KafkaMetadataKind::Offset => Kind::Offset(()),
                KafkaMetadataKind::Timestamp => Kind::Timestamp(()),
                KafkaMetadataKind::Headers => Kind::Headers(()),
                KafkaMetadataKind::Header { key, use_bytes } => Kind::Header(ProtoKafkaHeader {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                }),
            }),
        }
    }

    fn from_proto(proto: ProtoKafkaMetadataKind) -> Result<Self, TryFromProtoError> {
        use proto_kafka_metadata_kind::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoKafkaMetadataKind::kind"))?;
        Ok(match kind {
            Kind::Partition(()) => KafkaMetadataKind::Partition,
            Kind::Offset(()) => KafkaMetadataKind::Offset,
            Kind::Timestamp(()) => KafkaMetadataKind::Timestamp,
            Kind::Headers(()) => KafkaMetadataKind::Headers,
            Kind::Header(ProtoKafkaHeader { key, use_bytes }) => {
                KafkaMetadataKind::Header { key, use_bytes }
            }
        })
    }
}