mz_storage_types/sources/kafka.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types related to Kafka sources.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::LazyLock;
use std::time::Duration;

use dec::OrderedDecimal;
use mz_dyncfg::ConfigSet;
use mz_kafka_util::client::MzClientContext;
use mz_ore::collections::CollectionExt;
use mz_ore::future::InTask;
use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
use mz_repr::adt::numeric::Numeric;
use mz_repr::{CatalogItemId, ColumnType, Datum, GlobalId, RelationDesc, Row, ScalarType};
use mz_timely_util::order::{Extrema, Partitioned};
use proptest::prelude::any;
use proptest_derive::Arbitrary;
use rdkafka::admin::AdminClient;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;

use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::connections::{ConnectionContext, KafkaConnection};
use crate::controller::AlterError;
use crate::sources::{MzOffset, SourceConnection, SourceTimestamp};

use super::SourceExportDetails;

include!(concat!(
    env!("OUT_DIR"),
    "/mz_storage_types.sources.kafka.rs"
));

/// A "moment in time" perceivable in Kafka: for each partition, the greatest
/// visible offset.
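///
/// A minimal sketch of one such frontier, assuming `Partitioned`, `RangeBound`,
/// and `MzOffset` are in scope (illustrative only, not a doctest):
///
/// ```ignore
/// // Partition 0 has been read up to (but not including) offset 42; any
/// // partition greater than 0 is still at offset 0.
/// let p0 = Partitioned::new_singleton(RangeBound::exact(0), MzOffset::from(42));
/// let rest = Partitioned::new_range(
///     RangeBound::after(0),
///     RangeBound::PosInfinity,
///     MzOffset::from(0),
/// );
/// let frontier = timely::progress::Antichain::from(vec![p0, rest]);
/// ```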
pub type KafkaTimestamp = Partitioned<RangeBound<i32>, MzOffset>;

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct KafkaSourceConnection<C: ConnectionAccess = InlinedConnection> {
    pub connection: C::Kafka,
    pub connection_id: CatalogItemId,
    pub topic: String,
    // Map from partition -> starting offset
    #[proptest(strategy = "proptest::collection::btree_map(any::<i32>(), any::<i64>(), 0..4)")]
    pub start_offsets: BTreeMap<i32, i64>,
    pub group_id_prefix: Option<String>,
    // The metadata_columns for the primary source export from this kafka source
    // TODO: This should be removed once we stop outputting to the primary source collection
    // and instead only output to source_exports
    #[proptest(strategy = "proptest::collection::vec(any::<(String, KafkaMetadataKind)>(), 0..4)")]
    pub metadata_columns: Vec<(String, KafkaMetadataKind)>,
    pub topic_metadata_refresh_interval: Duration,
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSourceConnection, R>
    for KafkaSourceConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSourceConnection {
        let KafkaSourceConnection {
            connection,
            connection_id,
            topic,
            start_offsets,
            group_id_prefix,
            metadata_columns,
            topic_metadata_refresh_interval,
        } = self;
        KafkaSourceConnection {
            connection: r.resolve_connection(connection).unwrap_kafka(),
            connection_id,
            topic,
            start_offsets,
            group_id_prefix,
            metadata_columns,
            topic_metadata_refresh_interval,
        }
    }
}

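/// Relation description for the progress collection of Kafka sources: a
/// "partition" column holding a range of numeric partition IDs and a nullable
/// "offset" column holding the frontier offset for those partitions.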
pub static KAFKA_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        .with_column(
            "partition",
            ScalarType::Range {
                element_type: Box::new(ScalarType::Numeric { max_scale: None }),
            }
            .nullable(false),
        )
        .with_column("offset", ScalarType::UInt64.nullable(true))
        .finish()
});

impl KafkaSourceConnection {
    /// Returns the client ID to register with librdkafka.
108    ///
109    /// The caller is responsible for providing the source ID as it is not known
110    /// to `KafkaSourceConnection`.
111    pub fn client_id(
112        &self,
113        configs: &ConfigSet,
114        connection_context: &ConnectionContext,
115        source_id: GlobalId,
116    ) -> String {
117        let mut client_id =
118            KafkaConnection::id_base(connection_context, self.connection_id, source_id);
119        self.connection.enrich_client_id(configs, &mut client_id);
120        client_id
121    }
122}
123
124impl<C: ConnectionAccess> KafkaSourceConnection<C> {
125    /// Returns the ID for the consumer group the configured source will use.
126    ///
127    /// The caller is responsible for providing the source ID as it is not known
128    /// to `KafkaSourceConnection`.
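    ///
    /// The result is the optional `group_id_prefix` followed by the base ID
    /// derived from the connection and source IDs. A minimal sketch
    /// (illustrative only, not a doctest; `conn`, `connection_context`, and
    /// `source_id` are hypothetical local bindings):
    ///
    /// ```ignore
    /// // With `group_id_prefix: Some("my-prefix-".into())`, the consumer group
    /// // ID is "my-prefix-" followed by
    /// // `KafkaConnection::id_base(connection_context, connection_id, source_id)`.
    /// let group_id = conn.group_id(&connection_context, source_id);
    /// assert!(group_id.starts_with("my-prefix-"));
    /// ```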
    pub fn group_id(&self, connection_context: &ConnectionContext, source_id: GlobalId) -> String {
        format!(
            "{}{}",
            self.group_id_prefix.as_deref().unwrap_or(""),
            KafkaConnection::id_base(connection_context, self.connection_id, source_id),
        )
    }
}

impl KafkaSourceConnection {
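    /// Fetches the write frontier of the source's topic by asking the broker
    /// for the high watermark of every partition.
    ///
    /// The returned frontier has one singleton entry per existing partition at
    /// that partition's high watermark, plus a range entry recording that all
    /// partitions beyond the maximum known partition ID are still at offset 0.
    /// A sketch of the result for a two-partition topic with high watermarks
    /// 10 and 7 (illustrative only, not a doctest):
    ///
    /// ```ignore
    /// Antichain::from(vec![
    ///     Partitioned::new_singleton(RangeBound::exact(0), MzOffset::from(10)),
    ///     Partitioned::new_singleton(RangeBound::exact(1), MzOffset::from(7)),
    ///     Partitioned::new_range(RangeBound::after(1), RangeBound::PosInfinity, MzOffset::from(0)),
    /// ])
    /// ```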
    pub async fn fetch_write_frontier(
        self,
        storage_configuration: &crate::configuration::StorageConfiguration,
    ) -> Result<timely::progress::Antichain<KafkaTimestamp>, anyhow::Error> {
        let (context, _error_rx) = MzClientContext::with_errors();
        let client: AdminClient<_> = self
            .connection
            .create_with_context(storage_configuration, context, &BTreeMap::new(), InTask::No)
            .await?;

        let metadata_timeout = storage_configuration
            .parameters
            .kafka_timeout_config
            .fetch_metadata_timeout;

        mz_ore::task::spawn_blocking(|| "kafka_fetch_write_frontier_fetch_metadata", {
            move || {
                let meta = client
                    .inner()
                    .fetch_metadata(Some(&self.topic), metadata_timeout)?;

                let pids = meta
                    .topics()
                    .into_element()
                    .partitions()
                    .iter()
                    .map(|p| p.id());

                let mut current_upper = Antichain::new();
                let mut max_pid = 0;
                for pid in pids {
                    let (_, high) =
                        client
                            .inner()
                            .fetch_watermarks(&self.topic, pid, metadata_timeout)?;
                    max_pid = std::cmp::max(pid, max_pid);
                    current_upper.insert(Partitioned::new_singleton(
                        RangeBound::Elem(pid, BoundKind::At),
                        MzOffset::from(u64::try_from(high).unwrap()),
                    ));
                }
                current_upper.insert(Partitioned::new_range(
                    RangeBound::Elem(max_pid, BoundKind::After),
                    RangeBound::PosInfinity,
                    MzOffset::from(0),
                ));

                Ok(current_upper)
            }
        })
        .await?
    }
}

impl<C: ConnectionAccess> SourceConnection for KafkaSourceConnection<C> {
    fn name(&self) -> &'static str {
        "kafka"
    }

    fn external_reference(&self) -> Option<&str> {
        Some(self.topic.as_str())
    }

    fn default_key_desc(&self) -> RelationDesc {
        RelationDesc::builder()
            .with_column("key", ScalarType::Bytes.nullable(true))
            .finish()
    }

    fn default_value_desc(&self) -> RelationDesc {
        RelationDesc::builder()
            .with_column("value", ScalarType::Bytes.nullable(true))
            .finish()
    }

    fn timestamp_desc(&self) -> RelationDesc {
        KAFKA_PROGRESS_DESC.clone()
    }

    fn connection_id(&self) -> Option<CatalogItemId> {
        Some(self.connection_id)
    }

    fn primary_export_details(&self) -> SourceExportDetails {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns: self.metadata_columns.clone(),
        })
    }

    fn supports_read_only(&self) -> bool {
        true
    }

    fn prefers_single_replica(&self) -> bool {
        false
    }
}

impl<C: ConnectionAccess> crate::AlterCompatible for KafkaSourceConnection<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }

        let KafkaSourceConnection {
            connection,
            connection_id,
            topic,
            start_offsets,
            group_id_prefix,
            metadata_columns,
            topic_metadata_refresh_interval,
        } = self;

        let compatibility_checks = [
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (connection_id == &other.connection_id, "connection_id"),
            (topic == &other.topic, "topic"),
            (start_offsets == &other.start_offsets, "start_offsets"),
            (group_id_prefix == &other.group_id_prefix, "group_id_prefix"),
            (
                metadata_columns == &other.metadata_columns,
                "metadata_columns",
            ),
            (
                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
                "topic_metadata_refresh_interval",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "KafkaSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl RustType<ProtoKafkaSourceConnection> for KafkaSourceConnection<InlinedConnection> {
    fn into_proto(&self) -> ProtoKafkaSourceConnection {
        ProtoKafkaSourceConnection {
            connection: Some(self.connection.into_proto()),
            connection_id: Some(self.connection_id.into_proto()),
            topic: self.topic.clone(),
            start_offsets: self.start_offsets.clone(),
            group_id_prefix: self.group_id_prefix.clone(),
            metadata_columns: self
                .metadata_columns
                .iter()
                .map(|(name, kind)| ProtoKafkaMetadataColumn {
                    name: name.into_proto(),
                    kind: Some(kind.into_proto()),
                })
                .collect(),
            topic_metadata_refresh_interval: Some(
                self.topic_metadata_refresh_interval.into_proto(),
            ),
        }
    }

    fn from_proto(proto: ProtoKafkaSourceConnection) -> Result<Self, TryFromProtoError> {
        let mut metadata_columns = Vec::with_capacity(proto.metadata_columns.len());
        for c in proto.metadata_columns {
            let kind = c.kind.into_rust_if_some("ProtoKafkaMetadataColumn::kind")?;
            metadata_columns.push((c.name, kind));
        }

        Ok(KafkaSourceConnection {
            connection: proto
                .connection
                .into_rust_if_some("ProtoKafkaSourceConnection::connection")?,
            connection_id: proto
                .connection_id
                .into_rust_if_some("ProtoKafkaSourceConnection::connection_id")?,
            topic: proto.topic,
            start_offsets: proto.start_offsets,
            group_id_prefix: proto.group_id_prefix,
            metadata_columns,
            topic_metadata_refresh_interval: proto
                .topic_metadata_refresh_interval
                .into_rust_if_some("ProtoKafkaSourceConnection::topic_metadata_refresh_interval")?,
        })
    }
}

/// Returns the column types used to describe the metadata columns of a Kafka source export.
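///
/// A minimal sketch of a call (illustrative only, not a doctest):
///
/// ```ignore
/// let metadata_columns = vec![
///     ("partition".to_string(), KafkaMetadataKind::Partition),
///     ("offset".to_string(), KafkaMetadataKind::Offset),
/// ];
/// let desc = kafka_metadata_columns_desc(&metadata_columns);
/// assert_eq!(desc[0], ("partition", ScalarType::Int32.nullable(false)));
/// assert_eq!(desc[1], ("offset", ScalarType::UInt64.nullable(false)));
/// ```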
pub fn kafka_metadata_columns_desc(
    metadata_columns: &Vec<(String, KafkaMetadataKind)>,
) -> Vec<(&str, ColumnType)> {
    metadata_columns
        .iter()
        .map(|(name, kind)| {
            let typ = match kind {
                KafkaMetadataKind::Partition => ScalarType::Int32.nullable(false),
                KafkaMetadataKind::Offset => ScalarType::UInt64.nullable(false),
                KafkaMetadataKind::Timestamp => {
                    ScalarType::Timestamp { precision: None }.nullable(false)
                }
                KafkaMetadataKind::Header {
                    use_bytes: true, ..
                } => ScalarType::Bytes.nullable(true),
                KafkaMetadataKind::Header {
                    use_bytes: false, ..
                } => ScalarType::String.nullable(true),
                KafkaMetadataKind::Headers => ScalarType::List {
                    element_type: Box::new(ScalarType::Record {
                        fields: [
                            (
                                "key".into(),
                                ColumnType {
                                    nullable: false,
                                    scalar_type: ScalarType::String,
                                },
                            ),
                            (
                                "value".into(),
                                ColumnType {
                                    nullable: true,
                                    scalar_type: ScalarType::Bytes,
                                },
                            ),
                        ]
                        .into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
                .nullable(false),
            };
            (&**name, typ)
        })
        .collect()
}

/// The details of a source export from a kafka source.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct KafkaSourceExportDetails {
    #[proptest(strategy = "proptest::collection::vec(any::<(String, KafkaMetadataKind)>(), 0..4)")]
    pub metadata_columns: Vec<(String, KafkaMetadataKind)>,
}

impl crate::AlterCompatible for KafkaSourceExportDetails {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        let Self { metadata_columns } = self;
        let compatibility_checks = [(
            metadata_columns == &other.metadata_columns,
            "metadata_columns",
        )];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "KafkaSourceExportDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }
        Ok(())
    }
}

impl RustType<ProtoKafkaSourceExportDetails> for KafkaSourceExportDetails {
    fn into_proto(&self) -> ProtoKafkaSourceExportDetails {
        ProtoKafkaSourceExportDetails {
            metadata_columns: self
                .metadata_columns
                .iter()
                .map(|(name, kind)| ProtoKafkaMetadataColumn {
                    name: name.into_proto(),
                    kind: Some(kind.into_proto()),
                })
                .collect(),
        }
    }

    fn from_proto(proto: ProtoKafkaSourceExportDetails) -> Result<Self, TryFromProtoError> {
        let mut metadata_columns = Vec::with_capacity(proto.metadata_columns.len());
        for c in proto.metadata_columns {
            let kind = c.kind.into_rust_if_some("ProtoKafkaMetadataColumn::kind")?;
            metadata_columns.push((c.name, kind));
        }

        Ok(KafkaSourceExportDetails { metadata_columns })
    }
}

/// Given an ordered type `P`, it augments each of its values with a point right *before* that
/// value, exactly *at* that value, and right *after* that value. Additionally, it provides two
/// special values for positive and negative infinity that are greater than and less than all the
/// other elements, respectively.
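///
/// For example, with `P = i32` the derived ordering places each bound as
/// follows (illustrative only, not a doctest):
///
/// ```ignore
/// assert!(RangeBound::<i32>::NegInfinity < RangeBound::before(0));
/// assert!(RangeBound::before(0) < RangeBound::exact(0));
/// assert!(RangeBound::exact(0) < RangeBound::after(0));
/// assert!(RangeBound::after(0) < RangeBound::before(1));
/// assert!(RangeBound::before(1) < RangeBound::<i32>::PosInfinity);
/// ```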
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RangeBound<P> {
    /// Negative infinity.
    NegInfinity,
    /// A specific element value with its associated kind.
    Elem(P, BoundKind),
    /// Positive infinity.
    PosInfinity,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum BoundKind {
    /// A bound right before a value. When used as an upper it represents an exclusive range.
    Before,
    /// A bound exactly at a value. When used as a lower or upper it represents an inclusive range.
    At,
    /// A bound right after a value. When used as a lower it represents an exclusive range.
    After,
}

impl<P: std::fmt::Debug> RangeBound<P> {
    /// Constructs a range bound right before `elem`.
    pub fn before(elem: P) -> Self {
        Self::Elem(elem, BoundKind::Before)
    }

    /// Constructs a range bound exactly at `elem`.
    pub fn exact(elem: P) -> Self {
        Self::Elem(elem, BoundKind::At)
    }

    /// Constructs a range bound right after `elem`.
    pub fn after(elem: P) -> Self {
        Self::Elem(elem, BoundKind::After)
    }

    /// Unwraps the element of this bound.
    ///
    /// # Panics
    ///
    /// This method panics if this is not an exact element range bound.
    pub fn unwrap_exact(&self) -> &P {
        match self {
            RangeBound::Elem(p, BoundKind::At) => p,
            _ => panic!("attempt to unwrap_exact {self:?}"),
        }
    }
}

impl<P: fmt::Display> fmt::Display for RangeBound<P> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NegInfinity => f.write_str("-inf"),
            Self::Elem(elem, BoundKind::Before) => write!(f, "<{elem}"),
            Self::Elem(elem, BoundKind::At) => write!(f, "{elem}"),
            Self::Elem(elem, BoundKind::After) => write!(f, "{elem}>"),
            Self::PosInfinity => f.write_str("+inf"),
        }
    }
}

impl<P> Extrema for RangeBound<P> {
    fn minimum() -> Self {
        Self::NegInfinity
    }
    fn maximum() -> Self {
        Self::PosInfinity
    }
}

impl SourceTimestamp for KafkaTimestamp {
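    /// Encodes this timestamp as a row holding a numeric range describing the
    /// set of partitions and a `uint8` offset.
    ///
    /// `decode_row` is the inverse, so a round trip preserves the value. A
    /// minimal sketch (illustrative only, not a doctest):
    ///
    /// ```ignore
    /// let ts: KafkaTimestamp =
    ///     Partitioned::new_singleton(RangeBound::exact(3), MzOffset::from(7));
    /// let row = ts.encode_row();
    /// assert_eq!(KafkaTimestamp::decode_row(&row), ts);
    /// ```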
    fn encode_row(&self) -> Row {
        use mz_repr::adt::range;
        let mut row = Row::with_capacity(2);
        let mut packer = row.packer();

        let to_numeric = |p: i32| Datum::from(OrderedDecimal(Numeric::from(p)));

        let (lower, lower_inclusive) = match self.interval().lower {
            RangeBound::NegInfinity => (Datum::Null, false),
            RangeBound::Elem(pid, BoundKind::After) => (to_numeric(pid), false),
            RangeBound::Elem(pid, BoundKind::At) => (to_numeric(pid), true),
            lower => unreachable!("invalid lower bound {lower:?}"),
        };
        let (upper, upper_inclusive) = match self.interval().upper {
            RangeBound::PosInfinity => (Datum::Null, false),
            RangeBound::Elem(pid, BoundKind::Before) => (to_numeric(pid), false),
            RangeBound::Elem(pid, BoundKind::At) => (to_numeric(pid), true),
            upper => unreachable!("invalid upper bound {upper:?}"),
        };
        assert_eq!(lower_inclusive, upper_inclusive, "invalid range {self}");

        packer
            .push_range(range::Range::new(Some((
                range::RangeBound::new(lower, lower_inclusive),
                range::RangeBound::new(upper, upper_inclusive),
            ))))
            .expect("pushing range must not generate errors");

        packer.push(Datum::UInt64(self.timestamp().offset));
        row
    }

    fn decode_row(row: &Row) -> Self {
        let mut datums = row.iter();

        match (datums.next(), datums.next(), datums.next()) {
            (Some(Datum::Range(range)), Some(Datum::UInt64(offset)), None) => {
                let mut range = range.into_bounds(|b| b.datum());
                //XXX: why do we have to canonicalize on read?
                range.canonicalize().expect("ranges must be valid");
                let range = range.inner.expect("empty range");

                let lower = range.lower.bound.map(|row| {
                    i32::try_from(row.unwrap_numeric().0)
                        .expect("only i32 values converted to ranges")
                });
                let upper = range.upper.bound.map(|row| {
                    i32::try_from(row.unwrap_numeric().0)
                        .expect("only i32 values converted to ranges")
                });

                match (range.lower.inclusive, range.upper.inclusive) {
                    (true, true) => {
                        assert_eq!(lower, upper);
                        Partitioned::new_singleton(
                            RangeBound::exact(lower.unwrap()),
                            MzOffset::from(offset),
                        )
                    }
                    (false, false) => {
                        let lower = match lower {
                            Some(pid) => RangeBound::after(pid),
                            None => RangeBound::NegInfinity,
                        };
                        let upper = match upper {
                            Some(pid) => RangeBound::before(pid),
                            None => RangeBound::PosInfinity,
                        };
                        Partitioned::new_range(lower, upper, MzOffset::from(offset))
                    }
                    _ => panic!("invalid timestamp"),
                }
            }
            invalid_binding => unreachable!("invalid binding {:?}", invalid_binding),
        }
    }
}

/// Which piece of metadata a column corresponds to
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaMetadataKind {
    Partition,
    Offset,
    Timestamp,
    Headers,
    Header { key: String, use_bytes: bool },
}

impl RustType<ProtoKafkaMetadataKind> for KafkaMetadataKind {
    fn into_proto(&self) -> ProtoKafkaMetadataKind {
        use proto_kafka_metadata_kind::Kind;
        ProtoKafkaMetadataKind {
            kind: Some(match self {
                KafkaMetadataKind::Partition => Kind::Partition(()),
                KafkaMetadataKind::Offset => Kind::Offset(()),
                KafkaMetadataKind::Timestamp => Kind::Timestamp(()),
                KafkaMetadataKind::Headers => Kind::Headers(()),
                KafkaMetadataKind::Header { key, use_bytes } => Kind::Header(ProtoKafkaHeader {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                }),
            }),
        }
    }

    fn from_proto(proto: ProtoKafkaMetadataKind) -> Result<Self, TryFromProtoError> {
        use proto_kafka_metadata_kind::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoKafkaMetadataKind::kind"))?;
        Ok(match kind {
            Kind::Partition(()) => KafkaMetadataKind::Partition,
            Kind::Offset(()) => KafkaMetadataKind::Offset,
            Kind::Timestamp(()) => KafkaMetadataKind::Timestamp,
            Kind::Headers(()) => KafkaMetadataKind::Headers,
            Kind::Header(ProtoKafkaHeader { key, use_bytes }) => {
                KafkaMetadataKind::Header { key, use_bytes }
            }
        })
    }
}