mz_storage_types/sinks.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and traits related to reporting changing collections out of `dataflow`.

use std::borrow::Cow;
use std::fmt::Debug;
use std::time::Duration;

use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_pgcopy::CopyFormatParams;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::bytes::ByteSize;
use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
use proptest::prelude::{Arbitrary, BoxedStrategy, Strategy, any};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::frontier::Antichain;

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
use crate::controller::{AlterError, CollectionMetadata};

include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sinks.rs"));

pub mod s3_oneshot_sink;

/// A sink for updates to a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    pub from: GlobalId,
    pub from_desc: RelationDesc,
    pub connection: StorageSinkConnection,
    pub with_snapshot: bool,
    pub version: u64,
    pub envelope: SinkEnvelope,
    pub as_of: Antichain<T>,
    pub from_storage_metadata: S,
    pub to_storage_metadata: S,
}

impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
    for StorageSinkDesc<S, T>
{
    /// Determines if `self` is compatible with another `StorageSinkDesc`, in
    /// such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations.
    ///
    /// Currently, the only valid transformations are the passage of time
    /// (such that the sinks' as-ofs may differ), changes to `version`, and
    /// disabling `with_snapshot` once the snapshot has been written out.
    /// However, this will change once we support `ALTER CONNECTION` or
    /// `ALTER SINK`.
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkDesc<S, T>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let StorageSinkDesc {
            from,
            from_desc,
            connection,
            envelope,
            version: _,
            // The as-of of the descriptions may differ.
            as_of: _,
            from_storage_metadata,
            with_snapshot,
            to_storage_metadata,
        } = self;

        let compatibility_checks = [
            (from == &other.from, "from"),
            (from_desc == &other.from_desc, "from_desc"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (envelope == &other.envelope, "envelope"),
            // This can legally change from true to false once the snapshot has been
            // written out.
            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
            (
                from_storage_metadata == &other.from_storage_metadata,
                "from_storage_metadata",
            ),
            (
                to_storage_metadata == &other.to_storage_metadata,
                "to_storage_metadata",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
109                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl Arbitrary for StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp> {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            any::<GlobalId>(),
            any::<RelationDesc>(),
            any::<StorageSinkConnection>(),
            any::<SinkEnvelope>(),
            any::<Option<mz_repr::Timestamp>>(),
            any::<CollectionMetadata>(),
            any::<bool>(),
            any::<u64>(),
            any::<CollectionMetadata>(),
        )
            .prop_map(
                |(
                    from,
                    from_desc,
                    connection,
                    envelope,
                    as_of,
                    from_storage_metadata,
                    with_snapshot,
                    version,
                    to_storage_metadata,
                )| {
                    StorageSinkDesc {
                        from,
                        from_desc,
                        connection,
                        envelope,
                        version,
                        as_of: Antichain::from_iter(as_of),
                        from_storage_metadata,
                        with_snapshot,
                        to_storage_metadata,
                    }
                },
            )
            .prop_filter("identical source and sink", |desc| {
                desc.from_storage_metadata != desc.to_storage_metadata
            })
            .boxed()
    }
}

impl RustType<ProtoStorageSinkDesc> for StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoStorageSinkDesc {
        ProtoStorageSinkDesc {
            connection: Some(self.connection.into_proto()),
            from: Some(self.from.into_proto()),
            from_desc: Some(self.from_desc.into_proto()),
            envelope: Some(self.envelope.into_proto()),
            as_of: Some(self.as_of.into_proto()),
            from_storage_metadata: Some(self.from_storage_metadata.into_proto()),
            to_storage_metadata: Some(self.to_storage_metadata.into_proto()),
            with_snapshot: self.with_snapshot,
            version: self.version,
        }
    }

    fn from_proto(proto: ProtoStorageSinkDesc) -> Result<Self, TryFromProtoError> {
        Ok(StorageSinkDesc {
            from: proto.from.into_rust_if_some("ProtoStorageSinkDesc::from")?,
            from_desc: proto
                .from_desc
                .into_rust_if_some("ProtoStorageSinkDesc::from_desc")?,
            connection: proto
                .connection
                .into_rust_if_some("ProtoStorageSinkDesc::connection")?,
            envelope: proto
                .envelope
                .into_rust_if_some("ProtoStorageSinkDesc::envelope")?,
            as_of: proto
                .as_of
                .into_rust_if_some("ProtoStorageSinkDesc::as_of")?,
            from_storage_metadata: proto
                .from_storage_metadata
                .into_rust_if_some("ProtoStorageSinkDesc::from_storage_metadata")?,
            with_snapshot: proto.with_snapshot,
            version: proto.version,
            to_storage_metadata: proto
                .to_storage_metadata
                .into_rust_if_some("ProtoStorageSinkDesc::to_storage_metadata")?,
        })
    }
}

/// The envelope determining how changes to the sinked collection are mapped
/// to output records.
#[derive(Arbitrary, Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    /// The Debezium envelope, which wraps each update in a record with
    /// `before` and `after` fields.
    Debezium,
    /// The upsert envelope, which emits the most recent value for each key,
    /// using a `NULL` value to indicate a deletion.
    Upsert,
}

impl RustType<ProtoSinkEnvelope> for SinkEnvelope {
    fn into_proto(&self) -> ProtoSinkEnvelope {
        use proto_sink_envelope::Kind;
        ProtoSinkEnvelope {
            kind: Some(match self {
                SinkEnvelope::Debezium => Kind::Debezium(()),
                SinkEnvelope::Upsert => Kind::Upsert(()),
            }),
        }
    }

    fn from_proto(proto: ProtoSinkEnvelope) -> Result<Self, TryFromProtoError> {
        use proto_sink_envelope::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoSinkEnvelope::kind"))?;
        Ok(match kind {
            Kind::Debezium(()) => SinkEnvelope::Debezium,
            Kind::Upsert(()) => SinkEnvelope::Upsert,
        })
    }
}

#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSinkConnection<C>),
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
    /// Determines if `self` is compatible with another `StorageSinkConnection`,
    /// in such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations (e.g. no transformation or `ALTER
    /// CONNECTION`).
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        match (self, other) {
            (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
                s.alter_compatible(id, o)?
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
    for StorageSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> StorageSinkConnection {
        match self {
            Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
        }
    }
}

impl RustType<ProtoStorageSinkConnection> for StorageSinkConnection {
    fn into_proto(&self) -> ProtoStorageSinkConnection {
        use proto_storage_sink_connection::Kind::*;

        ProtoStorageSinkConnection {
            kind: Some(match self {
                Self::Kafka(conn) => KafkaV2(conn.into_proto()),
            }),
        }
    }
    fn from_proto(proto: ProtoStorageSinkConnection) -> Result<Self, TryFromProtoError> {
        use proto_storage_sink_connection::Kind::*;

        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoStorageSinkConnection::kind"))?;

        Ok(match kind {
            KafkaV2(proto) => Self::Kafka(proto.into_rust()?),
        })
    }
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
    /// Returns the ID of the backing connection, if any. (This returns an
    /// `Option` so as not to constrain ourselves in the future.)
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        use StorageSinkConnection::*;
        match self {
            Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
        }
    }

    /// Returns the name of the sink connection.
    pub fn name(&self) -> &'static str {
        use StorageSinkConnection::*;
        match self {
            Kafka(_) => "kafka",
        }
    }
}

impl RustType<proto_kafka_sink_connection_v2::ProtoKeyDescAndIndices>
    for (RelationDesc, Vec<usize>)
{
    fn into_proto(&self) -> proto_kafka_sink_connection_v2::ProtoKeyDescAndIndices {
        proto_kafka_sink_connection_v2::ProtoKeyDescAndIndices {
            desc: Some(self.0.into_proto()),
            indices: self.1.into_proto(),
        }
    }

    fn from_proto(
        proto: proto_kafka_sink_connection_v2::ProtoKeyDescAndIndices,
    ) -> Result<Self, TryFromProtoError> {
        Ok((
            proto
                .desc
                .into_rust_if_some("ProtoKeyDescAndIndices::desc")?,
            proto.indices.into_rust()?,
        ))
    }
}

impl RustType<proto_kafka_sink_connection_v2::ProtoRelationKeyIndicesVec> for Vec<usize> {
    fn into_proto(&self) -> proto_kafka_sink_connection_v2::ProtoRelationKeyIndicesVec {
        proto_kafka_sink_connection_v2::ProtoRelationKeyIndicesVec {
            relation_key_indices: self.into_proto(),
        }
    }

    fn from_proto(
        proto: proto_kafka_sink_connection_v2::ProtoRelationKeyIndicesVec,
    ) -> Result<Self, TryFromProtoError> {
        proto.relation_key_indices.into_rust()
    }
}

#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

impl KafkaSinkCompressionType {
    /// Formats the compression type as expected by the `compression.type`
    /// librdkafka setting.
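    ///
    /// # Example
    ///
    /// An illustrative sketch of the mapping:
    ///
    /// ```
    /// use mz_storage_types::sinks::KafkaSinkCompressionType;
    ///
    /// assert_eq!(KafkaSinkCompressionType::Lz4.to_librdkafka_option(), "lz4");
    /// assert_eq!(KafkaSinkCompressionType::None.to_librdkafka_option(), "none");
    /// ```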
    pub fn to_librdkafka_option(&self) -> &'static str {
        match self {
            KafkaSinkCompressionType::None => "none",
            KafkaSinkCompressionType::Gzip => "gzip",
            KafkaSinkCompressionType::Snappy => "snappy",
            KafkaSinkCompressionType::Lz4 => "lz4",
            KafkaSinkCompressionType::Zstd => "zstd",
        }
    }
}

#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    pub connection_id: CatalogItemId,
    pub connection: C::Kafka,
    pub format: KafkaSinkFormat<C>,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The index of the column containing the message headers, if any.
    pub headers_index: Option<usize>,
    pub value_desc: RelationDesc,
    /// An expression that, if present, computes a hash value that should be
    /// used to determine the partition for each message.
    pub partition_by: Option<MirScalarExpr>,
    pub topic: String,
    /// Options to use when creating the topic if it doesn't already exist.
    pub topic_options: KafkaTopicOptions,
    pub compression_type: KafkaSinkCompressionType,
    pub progress_group_id: KafkaIdStyle,
    pub transactional_id: KafkaIdStyle,
    pub topic_metadata_refresh_interval: Duration,
}

impl KafkaSinkConnection {
    /// Returns the client ID to register with librdkafka.
    ///
    /// The caller is responsible for providing the sink ID as it is not known
    /// to `KafkaSinkConnection`.
    pub fn client_id(
        &self,
        configs: &ConfigSet,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        let mut client_id =
            KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
        self.connection.enrich_client_id(configs, &mut client_id);
        client_id
    }

    /// Returns the name of the progress topic to use for the sink.
    pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<str> {
        self.connection
            .progress_topic(connection_context, self.connection_id)
    }

    /// Returns the ID for the consumer group the sink will use to read the
    /// progress topic on resumption.
    ///
    /// The caller is responsible for providing the sink ID as it is not known
    /// to `KafkaSinkConnection`.
    pub fn progress_group_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.progress_group_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
            ),
            KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
        }
    }

    /// Returns the transactional ID to use for the sink.
    ///
    /// The caller is responsible for providing the sink ID as it is not known
    /// to `KafkaSinkConnection`.
    pub fn transactional_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.transactional_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
            ),
            KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
        }
    }
}

impl<C: ConnectionAccess> KafkaSinkConnection<C> {
    /// Determines if `self` is compatible with another `KafkaSinkConnection`,
    /// in such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations (e.g. no transformation or `ALTER
    /// CONNECTION`).
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &KafkaSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;

        let compatibility_checks = [
            (connection_id == &other.connection_id, "connection_id"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (format.alter_compatible(id, &other.format).is_ok(), "format"),
            (
                relation_key_indices == &other.relation_key_indices,
                "relation_key_indices",
            ),
            (
                key_desc_and_indices == &other.key_desc_and_indices,
                "key_desc_and_indices",
            ),
            (headers_index == &other.headers_index, "headers_index"),
            (value_desc == &other.value_desc, "value_desc"),
            (partition_by == &other.partition_by, "partition_by"),
            (topic == &other.topic, "topic"),
            (
                compression_type == &other.compression_type,
                "compression_type",
            ),
            (
                progress_group_id == &other.progress_group_id,
                "progress_group_id",
            ),
            (
                transactional_id == &other.transactional_id,
                "transactional_id",
            ),
            (topic_options == &other.topic_options, "topic_options"),
            (
                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
                "topic_metadata_refresh_interval",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
536                    "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
    for KafkaSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;
        KafkaSinkConnection {
            connection_id,
            connection: r.resolve_connection(connection).unwrap_kafka(),
            format: format.into_inline_connection(r),
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        }
    }
}

#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// A new-style id that is optionally prefixed.
    Prefix(Option<String>),
    /// A legacy-style id.
    Legacy,
}

impl RustType<ProtoKafkaIdStyle> for KafkaIdStyle {
    fn into_proto(&self) -> ProtoKafkaIdStyle {
        use crate::sinks::proto_kafka_id_style::Kind::*;
        use crate::sinks::proto_kafka_id_style::ProtoKafkaIdStylePrefix;

        ProtoKafkaIdStyle {
            kind: Some(match self {
                Self::Prefix(prefix) => Prefix(ProtoKafkaIdStylePrefix {
                    prefix: prefix.into_proto(),
                }),
                Self::Legacy => Legacy(()),
            }),
        }
    }
    fn from_proto(proto: ProtoKafkaIdStyle) -> Result<Self, TryFromProtoError> {
        use crate::sinks::proto_kafka_id_style::Kind::*;

        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoKafkaIdStyle::kind"))?;

        Ok(match kind {
            Prefix(prefix) => Self::Prefix(prefix.prefix.into_rust()?),
            Legacy(()) => Self::Legacy,
        })
    }
}

impl RustType<ProtoKafkaSinkConnectionV2> for KafkaSinkConnection {
    fn into_proto(&self) -> ProtoKafkaSinkConnectionV2 {
        use crate::sinks::proto_kafka_sink_connection_v2::CompressionType;
        ProtoKafkaSinkConnectionV2 {
            connection_id: Some(self.connection_id.into_proto()),
            connection: Some(self.connection.into_proto()),
            format: Some(self.format.into_proto()),
            key_desc_and_indices: self.key_desc_and_indices.into_proto(),
            relation_key_indices: self.relation_key_indices.into_proto(),
            headers_index: self.headers_index.into_proto(),
            value_desc: Some(self.value_desc.into_proto()),
            partition_by: self.partition_by.into_proto(),
            topic: self.topic.clone(),
            compression_type: Some(match self.compression_type {
                KafkaSinkCompressionType::None => CompressionType::None(()),
                KafkaSinkCompressionType::Gzip => CompressionType::Gzip(()),
                KafkaSinkCompressionType::Snappy => CompressionType::Snappy(()),
                KafkaSinkCompressionType::Lz4 => CompressionType::Lz4(()),
                KafkaSinkCompressionType::Zstd => CompressionType::Zstd(()),
            }),
            progress_group_id: Some(self.progress_group_id.into_proto()),
            transactional_id: Some(self.transactional_id.into_proto()),
            topic_options: Some(self.topic_options.into_proto()),
            topic_metadata_refresh_interval: Some(
                self.topic_metadata_refresh_interval.into_proto(),
            ),
        }
    }

    fn from_proto(proto: ProtoKafkaSinkConnectionV2) -> Result<Self, TryFromProtoError> {
        use crate::sinks::proto_kafka_sink_connection_v2::CompressionType;
        Ok(KafkaSinkConnection {
            connection_id: proto
                .connection_id
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::connection_id")?,
            connection: proto
                .connection
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::connection")?,
            format: proto
                .format
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::format")?,
            key_desc_and_indices: proto.key_desc_and_indices.into_rust()?,
            relation_key_indices: proto.relation_key_indices.into_rust()?,
            headers_index: proto.headers_index.into_rust()?,
            value_desc: proto
                .value_desc
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::value_desc")?,
            partition_by: proto.partition_by.into_rust()?,
            topic: proto.topic,
            compression_type: match proto.compression_type {
                Some(CompressionType::None(())) => KafkaSinkCompressionType::None,
                Some(CompressionType::Gzip(())) => KafkaSinkCompressionType::Gzip,
                Some(CompressionType::Snappy(())) => KafkaSinkCompressionType::Snappy,
                Some(CompressionType::Lz4(())) => KafkaSinkCompressionType::Lz4,
                Some(CompressionType::Zstd(())) => KafkaSinkCompressionType::Zstd,
                None => {
                    return Err(TryFromProtoError::missing_field(
                        "ProtoKafkaSinkConnectionV2::compression_type",
                    ));
                }
            },
            progress_group_id: proto
                .progress_group_id
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::progress_group_id")?,
            transactional_id: proto
                .transactional_id
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::transactional_id")?,
            topic_options: match proto.topic_options {
                Some(topic_options) => topic_options.into_rust()?,
                None => Default::default(),
            },
            topic_metadata_refresh_interval: proto
                .topic_metadata_refresh_interval
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::topic_metadata_refresh_interval")?,
        })
    }
}

#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    pub key_format: Option<KafkaSinkFormatType<C>>,
    pub value_format: KafkaSinkFormatType<C>,
}

#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    Avro {
        schema: String,
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        csr_connection: C::Csr,
    },
    Json,
    Text,
    Bytes,
}

impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
    pub fn get_format_name(&self) -> &str {
        match self {
            Self::Avro { .. } => "avro",
            Self::Json => "json",
            Self::Text => "text",
            Self::Bytes => "bytes",
        }
    }
}

impl<C: ConnectionAccess> KafkaSinkFormat<C> {
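    /// Returns a name summarizing the key and value formats.
    ///
    /// # Example
    ///
    /// An illustrative sketch using connection-free format types:
    ///
    /// ```
    /// use mz_storage_types::sinks::{KafkaSinkFormat, KafkaSinkFormatType};
    ///
    /// let format: KafkaSinkFormat = KafkaSinkFormat {
    ///     key_format: Some(KafkaSinkFormatType::Text),
    ///     value_format: KafkaSinkFormatType::Json,
    /// };
    /// assert_eq!(format.get_format_name(), "key-text-value-json");
    /// ```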
    pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
        // For legacy reasons, if there is no key format, or if the key and
        // value formats are the same (either both Avro or both JSON), we
        // return the value format's name; otherwise we return a composite
        // name.
        match &self.key_format {
            None => self.value_format.get_format_name().into(),
            Some(key_format) => match (key_format, &self.value_format) {
                (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
                    "avro".into()
                }
                (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
                (keyf, valuef) => format!(
                    "key-{}-value-{}",
                    keyf.get_format_name(),
                    valuef.get_format_name()
                )
                .into(),
            },
        }
    }

    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }

        match (&self.value_format, &other.value_format) {
            (
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                },
                KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                },
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
777                        "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
788                        "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        match (&self.key_format, &other.key_format) {
            (
                Some(KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                }),
                Some(KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                }),
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
816                        "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
827                        "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
    for KafkaSinkFormat<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
        KafkaSinkFormat {
            key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
            value_format: self.value_format.into_inline_connection(&r),
        }
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
    for KafkaSinkFormatType<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
        match self {
            KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection,
            } => KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
            },
            KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
            KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
            KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
        }
    }
}

impl RustType<ProtoKafkaSinkFormatType> for KafkaSinkFormatType {
    fn into_proto(&self) -> ProtoKafkaSinkFormatType {
        use proto_kafka_sink_format_type::Type;
        ProtoKafkaSinkFormatType {
            r#type: Some(match self {
                Self::Avro {
                    schema,
                    compatibility_level,
                    csr_connection,
                } => Type::Avro(proto_kafka_sink_format_type::ProtoKafkaSinkAvroFormat {
                    schema: schema.clone(),
                    compatibility_level: csr_compat_level_to_proto(compatibility_level),
                    csr_connection: Some(csr_connection.into_proto()),
                }),
                Self::Json => Type::Json(()),
                Self::Text => Type::Text(()),
                Self::Bytes => Type::Bytes(()),
            }),
        }
    }

    fn from_proto(proto: ProtoKafkaSinkFormatType) -> Result<Self, TryFromProtoError> {
        use proto_kafka_sink_format_type::Type;
        let r#type = proto
            .r#type
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoKafkaSinkFormatType::type"))?;

        Ok(match r#type {
            Type::Avro(proto) => Self::Avro {
                schema: proto.schema,
                compatibility_level: csr_compat_level_from_proto(proto.compatibility_level),
                csr_connection: proto
                    .csr_connection
                    .into_rust_if_some("ProtoKafkaSinkFormatType::csr_connection")?,
            },
            Type::Json(()) => Self::Json,
            Type::Text(()) => Self::Text,
            Type::Bytes(()) => Self::Bytes,
        })
    }
}

impl RustType<ProtoKafkaSinkFormat> for KafkaSinkFormat {
    fn into_proto(&self) -> ProtoKafkaSinkFormat {
        ProtoKafkaSinkFormat {
            key_format: self.key_format.as_ref().map(|f| f.into_proto()),
            value_format: Some(self.value_format.into_proto()),
        }
    }

    fn from_proto(proto: ProtoKafkaSinkFormat) -> Result<Self, TryFromProtoError> {
        Ok(KafkaSinkFormat {
            key_format: proto.key_format.into_rust()?,
            value_format: proto
                .value_format
                .into_rust_if_some("ProtoKafkaSinkFormat::value_format")?,
        })
    }
}

fn csr_compat_level_to_proto(compatibility_level: &Option<mz_ccsr::CompatibilityLevel>) -> i32 {
    use proto_kafka_sink_format_type::proto_kafka_sink_avro_format::CompatibilityLevel as ProtoCompatLevel;
    match compatibility_level {
        Some(level) => match level {
            mz_ccsr::CompatibilityLevel::Backward => ProtoCompatLevel::Backward,
            mz_ccsr::CompatibilityLevel::BackwardTransitive => ProtoCompatLevel::BackwardTransitive,
            mz_ccsr::CompatibilityLevel::Forward => ProtoCompatLevel::Forward,
            mz_ccsr::CompatibilityLevel::ForwardTransitive => ProtoCompatLevel::ForwardTransitive,
            mz_ccsr::CompatibilityLevel::Full => ProtoCompatLevel::Full,
            mz_ccsr::CompatibilityLevel::FullTransitive => ProtoCompatLevel::FullTransitive,
            mz_ccsr::CompatibilityLevel::None => ProtoCompatLevel::None,
        },
        None => ProtoCompatLevel::Unset,
    }
    .into()
}

fn csr_compat_level_from_proto(val: i32) -> Option<mz_ccsr::CompatibilityLevel> {
    use proto_kafka_sink_format_type::proto_kafka_sink_avro_format::CompatibilityLevel as ProtoCompatLevel;
    match ProtoCompatLevel::try_from(val) {
        Ok(ProtoCompatLevel::Backward) => Some(mz_ccsr::CompatibilityLevel::Backward),
        Ok(ProtoCompatLevel::BackwardTransitive) => {
            Some(mz_ccsr::CompatibilityLevel::BackwardTransitive)
        }
        Ok(ProtoCompatLevel::Forward) => Some(mz_ccsr::CompatibilityLevel::Forward),
        Ok(ProtoCompatLevel::ForwardTransitive) => {
            Some(mz_ccsr::CompatibilityLevel::ForwardTransitive)
        }
        Ok(ProtoCompatLevel::Full) => Some(mz_ccsr::CompatibilityLevel::Full),
        Ok(ProtoCompatLevel::FullTransitive) => Some(mz_ccsr::CompatibilityLevel::FullTransitive),
        Ok(ProtoCompatLevel::None) => Some(mz_ccsr::CompatibilityLevel::None),
        Ok(ProtoCompatLevel::Unset) => None,
        Err(_) => None,
    }
}

#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Encoded using the PG `COPY` protocol, with one of its supported formats.
    PgCopy(CopyFormatParams<'static>),
    /// Encoded as Parquet.
    Parquet,
}

impl RustType<ProtoS3SinkFormat> for S3SinkFormat {
    fn into_proto(&self) -> ProtoS3SinkFormat {
        use proto_s3_sink_format::Kind;
        ProtoS3SinkFormat {
            kind: Some(match self {
                Self::PgCopy(params) => Kind::PgCopy(params.into_proto()),
                Self::Parquet => Kind::Parquet(()),
            }),
        }
    }

    fn from_proto(proto: ProtoS3SinkFormat) -> Result<Self, TryFromProtoError> {
        use proto_s3_sink_format::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoS3SinkFormat::kind"))?;

        Ok(match kind {
            Kind::PgCopy(proto) => Self::PgCopy(proto.into_rust()?),
            Kind::Parquet(_) => Self::Parquet,
        })
    }
}

/// Info required to copy data to S3.
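///
/// # Example
///
/// An illustrative sketch; `RelationDesc::empty` stands in for a real
/// description of the sinked relation:
///
/// ```
/// use mz_repr::RelationDesc;
/// use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
///
/// let info = S3UploadInfo {
///     uri: "s3://example-bucket/path".to_string(),
///     max_file_size: 16 * 1024 * 1024,
///     desc: RelationDesc::empty(),
///     format: S3SinkFormat::Parquet,
/// };
/// assert_eq!(info.max_file_size, 16 * 1024 * 1024);
/// ```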
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The S3 URI to write the data to.
    pub uri: String,
    /// The maximum size of each file uploaded to S3.
    pub max_file_size: u64,
    /// The relation desc of the data to be uploaded to S3.
    pub desc: RelationDesc,
    /// The selected sink format.
    pub format: S3SinkFormat,
}

impl RustType<ProtoS3UploadInfo> for S3UploadInfo {
    fn into_proto(&self) -> ProtoS3UploadInfo {
        ProtoS3UploadInfo {
            uri: self.uri.clone(),
            max_file_size: self.max_file_size,
            desc: Some(self.desc.into_proto()),
            format: Some(self.format.into_proto()),
        }
    }

    fn from_proto(proto: ProtoS3UploadInfo) -> Result<Self, TryFromProtoError> {
        Ok(S3UploadInfo {
            uri: proto.uri,
            max_file_size: proto.max_file_size,
            desc: proto.desc.into_rust_if_some("ProtoS3UploadInfo::desc")?,
            format: proto
                .format
                .into_rust_if_some("ProtoS3UploadInfo::format")?,
        })
    }
}

/// The lower bound on the configurable max file size for an S3 sink.
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// The upper bound on the configurable max file size for an S3 sink.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);
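
// A minimal test sketch (not part of the original file) exercising a few of
// the conversions above: the `SinkEnvelope` proto round trip, the CSR
// compatibility-level codec, and `KafkaSinkFormat::alter_compatible` on
// connection-free formats.
#[cfg(test)]
mod sketch_tests {
    use super::*;

    #[test]
    fn sink_envelope_proto_roundtrip() {
        for envelope in [SinkEnvelope::Debezium, SinkEnvelope::Upsert] {
            let proto = envelope.into_proto();
            assert_eq!(SinkEnvelope::from_proto(proto).unwrap(), envelope);
        }
    }

    #[test]
    fn csr_compat_level_roundtrip() {
        // Each level, and the unset case, must survive a proto round trip.
        for level in [
            None,
            Some(mz_ccsr::CompatibilityLevel::Backward),
            Some(mz_ccsr::CompatibilityLevel::Full),
        ] {
            let proto = csr_compat_level_to_proto(&level);
            assert_eq!(csr_compat_level_from_proto(proto), level);
        }
    }

    #[test]
    fn kafka_sink_format_alter_compatible() {
        let json: KafkaSinkFormat = KafkaSinkFormat {
            key_format: None,
            value_format: KafkaSinkFormatType::Json,
        };
        let text: KafkaSinkFormat = KafkaSinkFormat {
            key_format: None,
            value_format: KafkaSinkFormatType::Text,
        };
        let id = GlobalId::User(1);
        // A format is always compatible with itself...
        assert!(json.alter_compatible(id, &json).is_ok());
        // ...but changing the value format is not a compatible alteration.
        assert!(json.alter_compatible(id, &text).is_err());
    }
}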