mz_storage_types/sinks.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and traits related to reporting changing collections out of `dataflow`.

use std::borrow::Cow;
use std::fmt::Debug;
use std::time::Duration;

use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_pgcopy::CopyFormatParams;
use mz_repr::bytes::ByteSize;
use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::frontier::Antichain;

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
use crate::controller::AlterError;

pub mod s3_oneshot_sink;

/// A sink for updates to a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    pub from: GlobalId,
    pub from_desc: RelationDesc,
    pub connection: StorageSinkConnection,
    pub with_snapshot: bool,
    pub version: u64,
    pub envelope: SinkEnvelope,
    pub as_of: Antichain<T>,
    pub from_storage_metadata: S,
    pub to_storage_metadata: S,
}

impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
    for StorageSinkDesc<S, T>
{
    /// Determines if `self` is compatible with another `StorageSinkDesc`, in
    /// such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations.
    ///
    /// Currently, the only "valid transformation" is the passage of time, such
    /// that the sinks' as-ofs may differ. However, this will change once we
    /// support `ALTER CONNECTION` or `ALTER SINK`.
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkDesc<S, T>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
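        // Destructure `self` exhaustively (no `..`) so that adding a field to
        // `StorageSinkDesc` forces this compatibility check to be revisited.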
        let StorageSinkDesc {
            from,
            from_desc,
            connection,
            envelope,
            version: _,
            // The as-of of the descriptions may differ.
            as_of: _,
            from_storage_metadata,
            with_snapshot,
            to_storage_metadata,
        } = self;

        let compatibility_checks = [
            (from == &other.from, "from"),
            (from_desc == &other.from_desc, "from_desc"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (envelope == &other.envelope, "envelope"),
            // This can legally change from true to false once the snapshot has been
            // written out.
            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
            (
                from_storage_metadata == &other.from_storage_metadata,
                "from_storage_metadata",
            ),
            (
                to_storage_metadata == &other.to_storage_metadata,
                "to_storage_metadata",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    Debezium,
    Upsert,
}
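
// Rough intuition (illustrative, not normative): with `Debezium`, each emitted
// value wraps the old and new row in `before`/`after` fields; with `Upsert`,
// messages are keyed and carry the latest value for their key, with deletions
// emitted as null-valued (tombstone) messages.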

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSinkConnection<C>),
    Iceberg(IcebergSinkConnection<C>),
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
    /// Determines if `self` is compatible with another `StorageSinkConnection`,
    /// in such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations (e.g. no transformation or `ALTER
    /// CONNECTION`).
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        match (self, other) {
            (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
                s.alter_compatible(id, o)?
            }
            (StorageSinkConnection::Iceberg(s), StorageSinkConnection::Iceberg(o)) => {
                s.alter_compatible(id, o)?
            }
            _ => {
                tracing::warn!(
                    "StorageSinkConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );
                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
    for StorageSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> StorageSinkConnection {
        match self {
            Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
            Self::Iceberg(conn) => StorageSinkConnection::Iceberg(conn.into_inline_connection(r)),
        }
    }
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
    /// Returns the ID of the connection used by the sink, if any.
    ///
    /// This returns an `Option` so as not to constrain ourselves in the future,
    /// even though every current sink type has an associated connection.
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        use StorageSinkConnection::*;
        match self {
            Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
            Iceberg(IcebergSinkConnection {
                catalog_connection_id: connection_id,
                ..
            }) => Some(*connection_id),
        }
    }

    /// Returns the name of the sink connection.
    pub fn name(&self) -> &'static str {
        use StorageSinkConnection::*;
        match self {
            Kafka(_) => "kafka",
            Iceberg(_) => "iceberg",
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

impl KafkaSinkCompressionType {
    /// Formats the compression type as expected by the `compression.type`
    /// librdkafka setting.
    pub fn to_librdkafka_option(&self) -> &'static str {
        match self {
            KafkaSinkCompressionType::None => "none",
            KafkaSinkCompressionType::Gzip => "gzip",
            KafkaSinkCompressionType::Snappy => "snappy",
            KafkaSinkCompressionType::Lz4 => "lz4",
            KafkaSinkCompressionType::Zstd => "zstd",
        }
    }
}
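
// Illustrative sketch (hypothetical variable name, not from this file): the
// returned string can be handed directly to an rdkafka producer config, e.g.
//
//     let mut config = rdkafka::ClientConfig::new();
//     config.set("compression.type", compression_type.to_librdkafka_option());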

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    pub connection_id: CatalogItemId,
    pub connection: C::Kafka,
    pub format: KafkaSinkFormat<C>,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The index of the column containing the message headers, if any.
    pub headers_index: Option<usize>,
    pub value_desc: RelationDesc,
    /// An expression that, if present, computes a hash value that should be
    /// used to determine the partition for each message.
    pub partition_by: Option<MirScalarExpr>,
    pub topic: String,
    /// Options to use when creating the topic if it doesn't already exist.
    pub topic_options: KafkaTopicOptions,
    pub compression_type: KafkaSinkCompressionType,
    pub progress_group_id: KafkaIdStyle,
    pub transactional_id: KafkaIdStyle,
    pub topic_metadata_refresh_interval: Duration,
}

impl KafkaSinkConnection {
    /// Returns the client ID to register with librdkafka.
    ///
    /// The caller is responsible for providing the sink ID as it is not known
    /// to `KafkaSinkConnection`.
    pub fn client_id(
        &self,
        configs: &ConfigSet,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        let mut client_id =
            KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
        self.connection.enrich_client_id(configs, &mut client_id);
        client_id
    }

    /// Returns the name of the progress topic to use for the sink.
    pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
        self.connection
            .progress_topic(connection_context, self.connection_id)
    }

    /// Returns the ID for the consumer group the sink will use to read the
    /// progress topic on resumption.
    ///
    /// The caller is responsible for providing the sink ID as it is not known
    /// to `KafkaSinkConnection`.
    pub fn progress_group_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.progress_group_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
            ),
            KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
        }
    }

    /// Returns the transactional ID to use for the sink.
    ///
    /// The caller is responsible for providing the sink ID as it is not known
    /// to `KafkaSinkConnection`.
    pub fn transactional_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.transactional_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
            ),
            KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
        }
    }
}
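
// For orientation, derived from the methods above: with
// `KafkaIdStyle::Prefix(Some("my-prefix-".into()))` both IDs are "my-prefix-"
// followed by `KafkaConnection::id_base(..)`, while with `KafkaIdStyle::Legacy`
// a sink whose `GlobalId` renders as `u42` gets the progress group ID
// `materialize-bootstrap-sink-u42` and the transactional ID `mz-producer-u42-0`.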

impl<C: ConnectionAccess> KafkaSinkConnection<C> {
    /// Determines if `self` is compatible with another `KafkaSinkConnection`,
    /// in such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations (e.g. no transformation or `ALTER
    /// CONNECTION`).
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &KafkaSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;

        let compatibility_checks = [
            (connection_id == &other.connection_id, "connection_id"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (format.alter_compatible(id, &other.format).is_ok(), "format"),
            (
                relation_key_indices == &other.relation_key_indices,
                "relation_key_indices",
            ),
            (
                key_desc_and_indices == &other.key_desc_and_indices,
                "key_desc_and_indices",
            ),
            (headers_index == &other.headers_index, "headers_index"),
            (value_desc == &other.value_desc, "value_desc"),
            (partition_by == &other.partition_by, "partition_by"),
            (topic == &other.topic, "topic"),
            (
                compression_type == &other.compression_type,
                "compression_type",
            ),
            (
                progress_group_id == &other.progress_group_id,
                "progress_group_id",
            ),
            (
                transactional_id == &other.transactional_id,
                "transactional_id",
            ),
            (topic_options == &other.topic_options, "topic_options"),
            (
                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
                "topic_metadata_refresh_interval",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
    for KafkaSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;
        KafkaSinkConnection {
            connection_id,
            connection: r.resolve_connection(connection).unwrap_kafka(),
            format: format.into_inline_connection(r),
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// A new-style id that is optionally prefixed.
    Prefix(Option<String>),
    /// A legacy style id.
    Legacy,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    pub key_format: Option<KafkaSinkFormatType<C>>,
    pub value_format: KafkaSinkFormatType<C>,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    Avro {
        schema: String,
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        csr_connection: C::Csr,
    },
    Json,
    Text,
    Bytes,
}

impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
    pub fn get_format_name(&self) -> &str {
        match self {
            Self::Avro { .. } => "avro",
            Self::Json => "json",
            Self::Text => "text",
            Self::Bytes => "bytes",
        }
    }
}

impl<C: ConnectionAccess> KafkaSinkFormat<C> {
    pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
        // For legacy reasons, if the key format is none or the key and value
        // formats are the same (either avro or json), we return the value
        // format name; otherwise we return a composite name.
        match &self.key_format {
            None => self.value_format.get_format_name().into(),
            Some(key_format) => match (key_format, &self.value_format) {
                (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
                    "avro".into()
                }
                (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
                (keyf, valuef) => format!(
                    "key-{}-value-{}",
                    keyf.get_format_name(),
                    valuef.get_format_name()
                )
                .into(),
            },
        }
    }
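
    // E.g., per the rules above: a `Text` key with a `Bytes` value yields
    // "key-text-value-bytes", while no key format with a `Json` value yields
    // just "json".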

    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }

        match (&self.value_format, &other.value_format) {
            (
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                },
                KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                },
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
                        "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        match (&self.key_format, &other.key_format) {
            (
                Some(KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                }),
                Some(KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                }),
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
                        "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
    for KafkaSinkFormat<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
        KafkaSinkFormat {
            key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
            value_format: self.value_format.into_inline_connection(&r),
        }
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
    for KafkaSinkFormatType<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
        match self {
            KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection,
            } => KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
            },
            KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
            KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
            KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Encoded using the PG `COPY` protocol, with one of its supported formats.
    PgCopy(CopyFormatParams<'static>),
    /// Encoded as Parquet.
    Parquet,
}

/// Info required to copy the data to S3.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The S3 URI to write the data to.
    pub uri: String,
    /// The maximum size of each file uploaded to S3.
    pub max_file_size: u64,
    /// The relation desc of the data to be uploaded to S3.
    pub desc: RelationDesc,
    /// The selected sink format.
    pub format: S3SinkFormat,
}

pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);
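
// Per the constant names, these are the smallest and largest permissible file
// sizes for the S3 sink: 16 MB and 4 GB respectively.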

#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
    pub catalog_connection_id: CatalogItemId,
    pub catalog_connection: C::IcebergCatalog,
    pub aws_connection_id: CatalogItemId,
    pub aws_connection: C::Aws,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    pub namespace: String,
    pub table: String,
}

impl<C: ConnectionAccess> IcebergSinkConnection<C> {
    /// Determines if `self` is compatible with another `IcebergSinkConnection`,
    /// in such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations (e.g. no transformation or `ALTER
    /// CONNECTION`).
    pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let IcebergSinkConnection {
            catalog_connection_id: connection_id,
            catalog_connection,
            aws_connection_id,
            aws_connection,
            relation_key_indices,
            key_desc_and_indices,
            namespace,
            table,
        } = self;

        let compatibility_checks = [
            (
                connection_id == &other.catalog_connection_id,
                "connection_id",
            ),
            (
                catalog_connection
                    .alter_compatible(id, &other.catalog_connection)
                    .is_ok(),
                "catalog_connection",
            ),
            (
                aws_connection_id == &other.aws_connection_id,
                "aws_connection_id",
            ),
            (
                aws_connection
                    .alter_compatible(id, &other.aws_connection)
                    .is_ok(),
                "aws_connection",
            ),
            (
                relation_key_indices == &other.relation_key_indices,
                "relation_key_indices",
            ),
            (
                key_desc_and_indices == &other.key_desc_and_indices,
                "key_desc_and_indices",
            ),
            (namespace == &other.namespace, "namespace"),
            (table == &other.table, "table"),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "IcebergSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<IcebergSinkConnection, R>
    for IcebergSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> IcebergSinkConnection {
        let IcebergSinkConnection {
            catalog_connection_id,
            catalog_connection,
            aws_connection_id,
            aws_connection,
            relation_key_indices,
            key_desc_and_indices,
            namespace,
            table,
        } = self;
        IcebergSinkConnection {
            catalog_connection_id,
            catalog_connection: r
                .resolve_connection(catalog_connection)
                .unwrap_iceberg_catalog(),
            aws_connection_id,
            aws_connection: r.resolve_connection(aws_connection).unwrap_aws(),
            relation_key_indices,
            key_desc_and_indices,
            namespace,
            table,
        }
    }
}
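
// A minimal test sketch added for illustration (not part of the original file):
// it spot-checks the legacy format-name rules and the librdkafka compression
// strings defined above.
#[cfg(test)]
mod format_name_sketch {
    use super::*;

    #[test]
    fn format_name_and_compression() {
        assert_eq!(KafkaSinkCompressionType::Lz4.to_librdkafka_option(), "lz4");

        // No key format: the value format's name is used directly.
        let value_only: KafkaSinkFormat = KafkaSinkFormat {
            key_format: None,
            value_format: KafkaSinkFormatType::Json,
        };
        assert_eq!(value_only.get_format_name(), "json");

        // Differing key and value formats: a composite name is produced.
        let composite: KafkaSinkFormat = KafkaSinkFormat {
            key_format: Some(KafkaSinkFormatType::Text),
            value_format: KafkaSinkFormatType::Bytes,
        };
        assert_eq!(composite.get_format_name(), "key-text-value-bytes");
    }
}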