Skip to main content

mz_storage_types/
sinks.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits related to reporting changing collections out of `dataflow`.
11
12use std::borrow::Cow;
13use std::fmt::Debug;
14use std::time::Duration;
15
16use mz_dyncfg::ConfigSet;
17use mz_expr::MirScalarExpr;
18use mz_pgcopy::CopyFormatParams;
19use mz_repr::bytes::ByteSize;
20use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
21#[cfg(any(test, feature = "proptest"))]
22use proptest_derive::Arbitrary;
23use serde::{Deserialize, Serialize};
24use timely::PartialOrder;
25use timely::progress::frontier::Antichain;
26
27use crate::AlterCompatible;
28use crate::connections::inline::{
29    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
30    ReferencedConnection,
31};
32use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
33use crate::controller::AlterError;
34use crate::wire_format::WireFormat;
35
36pub mod s3_oneshot_sink;
37
/// A sink for updates to a relational collection.
///
/// `S` is the type of the storage metadata attached to the sink, and `T` is
/// the timestamp type of `as_of`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    /// The ID of the collection whose updates are sinked.
    pub from: GlobalId,
    /// The relation description of the `from` collection.
    pub from_desc: RelationDesc,
    /// The external system the sink writes to, with its sink-specific options.
    pub connection: StorageSinkConnection,
    /// Whether the sink emits a snapshot. `alter_compatible` allows this to
    /// change from `true` to `false` (once the snapshot has been written out),
    /// but not the reverse.
    pub with_snapshot: bool,
    /// A version number for this description; `alter_compatible` ignores it.
    pub version: u64,
    /// The envelope used to encode changes written to the sink.
    pub envelope: SinkEnvelope,
    /// The as-of frontier of the sink; allowed to differ between
    /// alter-compatible descriptions.
    pub as_of: Antichain<T>,
    /// Storage metadata for the `from` collection.
    pub from_storage_metadata: S,
    /// Storage metadata for the sink's own side. NOTE(review): confirm
    /// exactly which collection "to" refers to.
    pub to_storage_metadata: S,
    /// The interval at which to commit data to the sink.
    /// This isn't universally supported by all sinks
    /// yet, so it is optional. Even for sinks that might
    /// support it in the future (ahem, kafka) users might
    /// not want to set it.
    pub commit_interval: Option<Duration>,
}
57
58impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
59    for StorageSinkDesc<S, T>
60{
61    /// Determines if `self` is compatible with another `StorageSinkDesc`, in
62    /// such a way that it is possible to turn `self` into `other` through a
63    /// valid series of transformations.
64    ///
65    /// Currently, the only "valid transformation" is the passage of time such
66    /// that the sink's as ofs may differ. However, this will change once we
67    /// support `ALTER CONNECTION` or `ALTER SINK`.
68    fn alter_compatible(
69        &self,
70        id: GlobalId,
71        other: &StorageSinkDesc<S, T>,
72    ) -> Result<(), AlterError> {
73        if self == other {
74            return Ok(());
75        }
76        let StorageSinkDesc {
77            from,
78            from_desc,
79            connection,
80            envelope,
81            version: _,
82            // The as-of of the descriptions may differ.
83            as_of: _,
84            from_storage_metadata,
85            with_snapshot,
86            to_storage_metadata,
87            commit_interval: _,
88        } = self;
89
90        let compatibility_checks = [
91            (from == &other.from, "from"),
92            (from_desc == &other.from_desc, "from_desc"),
93            (
94                connection.alter_compatible(id, &other.connection).is_ok(),
95                "connection",
96            ),
97            (envelope == &other.envelope, "envelope"),
98            // This can legally change from true to false once the snapshot has been
99            // written out.
100            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
101            (
102                from_storage_metadata == &other.from_storage_metadata,
103                "from_storage_metadata",
104            ),
105            (
106                to_storage_metadata == &other.to_storage_metadata,
107                "to_storage_metadata",
108            ),
109        ];
110
111        for (compatible, field) in compatibility_checks {
112            if !compatible {
113                tracing::warn!(
114                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
115                    self,
116                    other
117                );
118
119                return Err(AlterError { id });
120            }
121        }
122
123        Ok(())
124    }
125}
126
/// The envelope with which a sink encodes the changes to its collection.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    /// Only used for Kafka.
    Debezium,
    Upsert,
    /// Only used for Iceberg.
    Append,
}
135
/// The external system a sink writes to, together with its sink-specific
/// configuration.
///
/// `C` selects whether connection details are stored inline
/// ([`InlinedConnection`], the default) or as references
/// ([`ReferencedConnection`]) that are resolved via [`IntoInlineConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// A sink that writes to a Kafka topic.
    Kafka(KafkaSinkConnection<C>),
    /// A sink that writes to an Iceberg table.
    Iceberg(IcebergSinkConnection<C>),
}
141
142impl<C: ConnectionAccess> StorageSinkConnection<C> {
143    /// Determines if `self` is compatible with another `StorageSinkConnection`,
144    /// in such a way that it is possible to turn `self` into `other` through a
145    /// valid series of transformations (e.g. no transformation or `ALTER
146    /// CONNECTION`).
147    pub fn alter_compatible(
148        &self,
149        id: GlobalId,
150        other: &StorageSinkConnection<C>,
151    ) -> Result<(), AlterError> {
152        if self == other {
153            return Ok(());
154        }
155        match (self, other) {
156            (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
157                s.alter_compatible(id, o)?
158            }
159            (StorageSinkConnection::Iceberg(s), StorageSinkConnection::Iceberg(o)) => {
160                s.alter_compatible(id, o)?
161            }
162            _ => {
163                tracing::warn!(
164                    "StorageSinkConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
165                    self,
166                    other
167                );
168                return Err(AlterError { id });
169            }
170        }
171
172        Ok(())
173    }
174}
175
176impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
177    for StorageSinkConnection<ReferencedConnection>
178{
179    fn into_inline_connection(self, r: R) -> StorageSinkConnection {
180        match self {
181            Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
182            Self::Iceberg(conn) => StorageSinkConnection::Iceberg(conn.into_inline_connection(r)),
183        }
184    }
185}
186
187impl<C: ConnectionAccess> StorageSinkConnection<C> {
188    /// returns an option to not constrain ourselves in the future
189    pub fn connection_id(&self) -> Option<CatalogItemId> {
190        use StorageSinkConnection::*;
191        match self {
192            Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
193            Iceberg(IcebergSinkConnection {
194                catalog_connection_id: connection_id,
195                ..
196            }) => Some(*connection_id),
197        }
198    }
199
200    /// Returns the name of the sink connection.
201    pub fn name(&self) -> &'static str {
202        use StorageSinkConnection::*;
203        match self {
204            Kafka(_) => "kafka",
205            Iceberg(_) => "iceberg",
206        }
207    }
208}
209
/// The compression codec a Kafka sink configures for the messages it
/// produces (see `to_librdkafka_option`).
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
218
219impl KafkaSinkCompressionType {
220    /// Format the compression type as expected by `compression.type` librdkafka
221    /// setting.
222    pub fn to_librdkafka_option(&self) -> &'static str {
223        match self {
224            KafkaSinkCompressionType::None => "none",
225            KafkaSinkCompressionType::Gzip => "gzip",
226            KafkaSinkCompressionType::Snappy => "snappy",
227            KafkaSinkCompressionType::Lz4 => "lz4",
228            KafkaSinkCompressionType::Zstd => "zstd",
229        }
230    }
231}
232
/// Configuration for a sink that writes to a Kafka topic.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog ID of the Kafka connection.
    pub connection_id: CatalogItemId,
    /// The Kafka connection (inlined or referenced, depending on `C`).
    pub connection: C::Kafka,
    /// The key and value formats of the messages the sink produces.
    pub format: KafkaSinkFormat<C>,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The index of the column containing message headers value, if any.
    pub headers_index: Option<usize>,
    /// The relation description of the message values.
    pub value_desc: RelationDesc,
    /// An expression that, if present, computes a hash value that should be
    /// used to determine the partition for each message.
    pub partition_by: Option<MirScalarExpr>,
    /// The name of the topic the sink writes to.
    pub topic: String,
    /// Options to use when creating the topic if it doesn't already exist.
    pub topic_options: KafkaTopicOptions,
    /// The compression codec for produced messages.
    pub compression_type: KafkaSinkCompressionType,
    /// How the progress consumer group ID is derived; see
    /// `KafkaSinkConnection::progress_group_id`.
    pub progress_group_id: KafkaIdStyle,
    /// How the transactional ID is derived; see
    /// `KafkaSinkConnection::transactional_id`.
    pub transactional_id: KafkaIdStyle,
    /// How often topic metadata is refreshed. NOTE(review): presumably
    /// forwarded to the Kafka client configuration; confirm at the use site.
    pub topic_metadata_refresh_interval: Duration,
}
256
257impl KafkaSinkConnection {
258    /// Returns the client ID to register with librdkafka with.
259    ///
260    /// The caller is responsible for providing the sink ID as it is not known
261    /// to `KafkaSinkConnection`.
262    pub fn client_id(
263        &self,
264        configs: &ConfigSet,
265        connection_context: &ConnectionContext,
266        sink_id: GlobalId,
267    ) -> String {
268        let mut client_id =
269            KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
270        self.connection.enrich_client_id(configs, &mut client_id);
271        client_id
272    }
273
274    /// Returns the name of the progress topic to use for the sink.
275    pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
276        self.connection
277            .progress_topic(connection_context, self.connection_id)
278    }
279
280    /// Returns the ID for the consumer group the sink will use to read the
281    /// progress topic on resumption.
282    ///
283    /// The caller is responsible for providing the sink ID as it is not known
284    /// to `KafkaSinkConnection`.
285    pub fn progress_group_id(
286        &self,
287        connection_context: &ConnectionContext,
288        sink_id: GlobalId,
289    ) -> String {
290        match self.progress_group_id {
291            KafkaIdStyle::Prefix(ref prefix) => format!(
292                "{}{}",
293                prefix.as_deref().unwrap_or(""),
294                KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
295            ),
296            KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
297        }
298    }
299
300    /// Returns the transactional ID to use for the sink.
301    ///
302    /// The caller is responsible for providing the sink ID as it is not known
303    /// to `KafkaSinkConnection`.
304    pub fn transactional_id(
305        &self,
306        connection_context: &ConnectionContext,
307        sink_id: GlobalId,
308    ) -> String {
309        match self.transactional_id {
310            KafkaIdStyle::Prefix(ref prefix) => format!(
311                "{}{}",
312                prefix.as_deref().unwrap_or(""),
313                KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
314            ),
315            KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
316        }
317    }
318}
319
320impl<C: ConnectionAccess> KafkaSinkConnection<C> {
321    /// Determines if `self` is compatible with another `StorageSinkConnection`,
322    /// in such a way that it is possible to turn `self` into `other` through a
323    /// valid series of transformations (e.g. no transformation or `ALTER
324    /// CONNECTION`).
325    pub fn alter_compatible(
326        &self,
327        id: GlobalId,
328        other: &KafkaSinkConnection<C>,
329    ) -> Result<(), AlterError> {
330        if self == other {
331            return Ok(());
332        }
333        let KafkaSinkConnection {
334            connection_id,
335            connection,
336            format,
337            relation_key_indices,
338            key_desc_and_indices,
339            headers_index,
340            value_desc,
341            partition_by,
342            topic,
343            compression_type,
344            progress_group_id,
345            transactional_id,
346            topic_options,
347            topic_metadata_refresh_interval,
348        } = self;
349
350        let compatibility_checks = [
351            (connection_id == &other.connection_id, "connection_id"),
352            (
353                connection.alter_compatible(id, &other.connection).is_ok(),
354                "connection",
355            ),
356            (format.alter_compatible(id, &other.format).is_ok(), "format"),
357            (
358                relation_key_indices == &other.relation_key_indices,
359                "relation_key_indices",
360            ),
361            (
362                key_desc_and_indices == &other.key_desc_and_indices,
363                "key_desc_and_indices",
364            ),
365            (headers_index == &other.headers_index, "headers_index"),
366            (value_desc == &other.value_desc, "value_desc"),
367            (partition_by == &other.partition_by, "partition_by"),
368            (topic == &other.topic, "topic"),
369            (
370                compression_type == &other.compression_type,
371                "compression_type",
372            ),
373            (
374                progress_group_id == &other.progress_group_id,
375                "progress_group_id",
376            ),
377            (
378                transactional_id == &other.transactional_id,
379                "transactional_id",
380            ),
381            (topic_options == &other.topic_options, "topic_config"),
382            (
383                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
384                "topic_metadata_refresh_interval",
385            ),
386        ];
387        for (compatible, field) in compatibility_checks {
388            if !compatible {
389                tracing::warn!(
390                    "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
391                    self,
392                    other
393                );
394
395                return Err(AlterError { id });
396            }
397        }
398
399        Ok(())
400    }
401}
402
403impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
404    for KafkaSinkConnection<ReferencedConnection>
405{
406    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
407        let KafkaSinkConnection {
408            connection_id,
409            connection,
410            format,
411            relation_key_indices,
412            key_desc_and_indices,
413            headers_index,
414            value_desc,
415            partition_by,
416            topic,
417            compression_type,
418            progress_group_id,
419            transactional_id,
420            topic_options,
421            topic_metadata_refresh_interval,
422        } = self;
423        KafkaSinkConnection {
424            connection_id,
425            connection: r.resolve_connection(connection).unwrap_kafka(),
426            format: format.into_inline_connection(r),
427            relation_key_indices,
428            key_desc_and_indices,
429            headers_index,
430            value_desc,
431            partition_by,
432            topic,
433            compression_type,
434            progress_group_id,
435            transactional_id,
436            topic_options,
437            topic_metadata_refresh_interval,
438        }
439    }
440}
441
/// How a Kafka sink derives one of its IDs (the progress consumer group ID or
/// the transactional ID).
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// A new-style id that is optionally prefixed: the prefix (if any)
    /// followed by a base ID built from the connection and sink IDs.
    Prefix(Option<String>),
    /// A legacy style id, which embeds only the sink ID (e.g.
    /// `mz-producer-{sink_id}-0` for transactional IDs).
    Legacy,
}
449
/// The formats a Kafka sink uses to encode message keys and values.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    /// The key format, or `None` if no key format is specified.
    pub key_format: Option<KafkaSinkFormatType<C>>,
    /// The value format.
    pub value_format: KafkaSinkFormatType<C>,
}
455
/// The encoding used for a Kafka message key or value.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    Avro {
        /// The Avro schema, as a string.
        schema: String,
        /// An optional schema-registry compatibility level. It is ignored by
        /// `KafkaSinkFormat::alter_compatible`, so it may change freely.
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        /// Wire-format dispatch and the registry to publish to. Sinks
        /// require a registry.
        wire_format: WireFormat<C>,
    },
    Json,
    Text,
    Bytes,
}
469
470impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
471    pub fn get_format_name(&self) -> &str {
472        match self {
473            Self::Avro { .. } => "avro",
474            Self::Json => "json",
475            Self::Text => "text",
476            Self::Bytes => "bytes",
477        }
478    }
479}
480
481impl<C: ConnectionAccess> KafkaSinkFormat<C> {
482    pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
483        // For legacy reasons, if the key-format is none or the key & value formats are
484        // both the same (either avro or json), we return the value format name,
485        // otherwise we return a composite name.
486        match &self.key_format {
487            None => self.value_format.get_format_name().into(),
488            Some(key_format) => match (key_format, &self.value_format) {
489                (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
490                    "avro".into()
491                }
492                (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
493                (keyf, valuef) => format!(
494                    "key-{}-value-{}",
495                    keyf.get_format_name(),
496                    valuef.get_format_name()
497                )
498                .into(),
499            },
500        }
501    }
502
503    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
504        if self == other {
505            return Ok(());
506        }
507
508        match (&self.value_format, &other.value_format) {
509            (
510                KafkaSinkFormatType::Avro {
511                    schema,
512                    compatibility_level: _,
513                    wire_format,
514                },
515                KafkaSinkFormatType::Avro {
516                    schema: other_schema,
517                    compatibility_level: _,
518                    wire_format: other_wire_format,
519                },
520            ) => {
521                if schema != other_schema
522                    || wire_format.alter_compatible(id, other_wire_format).is_err()
523                {
524                    tracing::warn!(
525                        "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
526                        self,
527                        other
528                    );
529
530                    return Err(AlterError { id });
531                }
532            }
533            (s, o) => {
534                if s != o {
535                    tracing::warn!(
536                        "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
537                        s,
538                        o
539                    );
540                    return Err(AlterError { id });
541                }
542            }
543        }
544
545        match (&self.key_format, &other.key_format) {
546            (
547                Some(KafkaSinkFormatType::Avro {
548                    schema,
549                    compatibility_level: _,
550                    wire_format,
551                }),
552                Some(KafkaSinkFormatType::Avro {
553                    schema: other_schema,
554                    compatibility_level: _,
555                    wire_format: other_wire_format,
556                }),
557            ) => {
558                if schema != other_schema
559                    || wire_format.alter_compatible(id, other_wire_format).is_err()
560                {
561                    tracing::warn!(
562                        "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
563                        self,
564                        other
565                    );
566
567                    return Err(AlterError { id });
568                }
569            }
570            (s, o) => {
571                if s != o {
572                    tracing::warn!(
573                        "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
574                        s,
575                        o
576                    );
577                    return Err(AlterError { id });
578                }
579            }
580        }
581
582        Ok(())
583    }
584}
585
586impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
587    for KafkaSinkFormat<ReferencedConnection>
588{
589    fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
590        KafkaSinkFormat {
591            key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
592            value_format: self.value_format.into_inline_connection(&r),
593        }
594    }
595}
596
597impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
598    for KafkaSinkFormatType<ReferencedConnection>
599{
600    fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
601        match self {
602            KafkaSinkFormatType::Avro {
603                schema,
604                compatibility_level,
605                wire_format,
606            } => KafkaSinkFormatType::Avro {
607                schema,
608                compatibility_level,
609                wire_format: wire_format.into_inline_connection(r),
610            },
611            KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
612            KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
613            KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
614        }
615    }
616}
617
/// The file format used when copying data to S3.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Encoded using the PG `COPY` protocol, with one of its supported formats.
    PgCopy(CopyFormatParams<'static>),
    /// Encoded as Parquet.
    Parquet,
}
625
/// Info required to copy the data to S3.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The S3 URI path to write the data to.
    pub uri: String,
    /// The max file size of each file uploaded to S3. NOTE(review): presumably
    /// in bytes and bounded by `MIN_S3_SINK_FILE_SIZE`/`MAX_S3_SINK_FILE_SIZE`;
    /// confirm at the call site.
    pub max_file_size: u64,
    /// The relation desc of the data to be uploaded to S3.
    pub desc: RelationDesc,
    /// The selected sink format.
    pub format: S3SinkFormat,
}
638
/// Lower bound on S3 sink file size. NOTE(review): presumably the smallest
/// accepted `S3UploadInfo::max_file_size`; confirm where it is enforced.
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// Upper bound on S3 sink file size. NOTE(review): presumably the largest
/// accepted `S3UploadInfo::max_file_size`; confirm where it is enforced.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);
641
/// Column name appended by MODE APPEND Iceberg sinks to record the diff (+1/-1).
pub const ICEBERG_APPEND_DIFF_COLUMN: &str = "_mz_diff";
/// Column name appended by MODE APPEND Iceberg sinks to record the logical timestamp.
pub const ICEBERG_APPEND_TIMESTAMP_COLUMN: &str = "_mz_timestamp";

/// The precision needed to store all UInt64 values in a Decimal128.
/// UInt64 max value is 18,446,744,073,709,551,615 which has 20 digits.
///
/// `iceberg_type_overrides` uses this as the precision (with scale 0) for the
/// decimal type it maps `UInt64` and `MzTimestamp` to.
pub const ICEBERG_UINT64_DECIMAL_PRECISION: u8 = 20;
650
651/// Type overrides for Iceberg-compatible Arrow schemas.
652///
653/// Iceberg doesn't support unsigned integer types or interval natively, so we
654/// map them to compatible types:
655/// - `UInt8`, `UInt16` -> `Int32`
656/// - `UInt32` -> `Int64`
657/// - `UInt64` -> `Decimal128(20, 0)`
658/// - `MzTimestamp` (which uses UInt64) -> `Decimal128(20, 0)`
659/// - `Interval` -> string (`LargeUtf8`)
660///
661/// Pass this to `mz_arrow_util::builder::desc_to_schema_with_overrides`
662/// when producing the Arrow schema for an iceberg sink, and to
663/// `mz_arrow_util::builder::ArrowBuilder::validate_desc_for_parquet` to
664/// validate the desc before sink creation.
665pub fn iceberg_type_overrides(
666    scalar_type: &mz_repr::SqlScalarType,
667) -> Option<(arrow::datatypes::DataType, String)> {
668    use arrow::datatypes::DataType;
669    use mz_repr::SqlScalarType;
670    match scalar_type {
671        SqlScalarType::UInt16 => Some((DataType::Int32, "uint2".to_string())),
672        SqlScalarType::UInt32 => Some((DataType::Int64, "uint4".to_string())),
673        SqlScalarType::UInt64 => Some((
674            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
675            "uint8".to_string(),
676        )),
677        SqlScalarType::MzTimestamp => Some((
678            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
679            "mz_timestamp".to_string(),
680        )),
681        SqlScalarType::Interval => Some((DataType::LargeUtf8, "interval".to_string())),
682        _ => None,
683    }
684}
685
/// Configuration for a sink that writes to an Iceberg table.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog ID of the Iceberg catalog connection.
    pub catalog_connection_id: CatalogItemId,
    /// The Iceberg catalog connection (inlined or referenced, depending on `C`).
    pub catalog_connection: C::IcebergCatalog,

    /// We allow users to specify a separate (from the catalog) connection
    /// for the storage layer, but we currently ignore it.
    /// S3 Tables uses the same AWS connection for catalog and storage.
    /// BigLake/Lakehouse uses the same GCP connection for catalog and storage.
    ///
    /// TODO(kynan): Once we need separate storage creds, make this generic.
    ///   And check that the [`IcebergSinkConnection::alter_compatible`]
    ///   implementation still handles `storage_connection` acceptably.
    pub storage_connection_id: Option<CatalogItemId>,
    /// The (currently ignored) storage connection; see `storage_connection_id`.
    pub storage_connection: Option<C::Aws>,

    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The Iceberg namespace containing `table`.
    pub namespace: String,
    /// The name of the Iceberg table to write to.
    pub table: String,
}
710
711impl<C: ConnectionAccess> IcebergSinkConnection<C> {
712    /// Determines if `self` is compatible with another `StorageSinkConnection`,
713    /// in such a way that it is possible to turn `self` into `other` through a
714    /// valid series of transformations (e.g. no transformation or `ALTER
715    /// CONNECTION`).
716    pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
717        if self == other {
718            return Ok(());
719        }
720        let IcebergSinkConnection {
721            catalog_connection_id: connection_id,
722            catalog_connection,
723            storage_connection_id,
724            storage_connection,
725            relation_key_indices,
726            key_desc_and_indices,
727            namespace,
728            table,
729        } = self;
730
731        let compatibility_checks = [
732            (
733                connection_id == &other.catalog_connection_id,
734                "connection_id",
735            ),
736            (
737                catalog_connection
738                    .alter_compatible(id, &other.catalog_connection)
739                    .is_ok(),
740                "catalog_connection",
741            ),
742            // We don't use `storage_connection_id` and `storage_connection`,
743            // so allow them to be removed.
744            (
745                other.storage_connection_id.is_none()
746                    || storage_connection_id == &other.storage_connection_id,
747                "storage_connection_id",
748            ),
749            (
750                match &other.storage_connection {
751                    None => true, // Removing a storage connection OR not adding a storage connection.
752                    Some(after) => {
753                        match storage_connection {
754                            None => false, // Adding a storage connection where there wasn't one before.
755                            Some(before) => before.alter_compatible(id, after).is_ok(),
756                        }
757                    }
758                },
759                "storage_connection",
760            ),
761            (
762                relation_key_indices == &other.relation_key_indices,
763                "relation_key_indices",
764            ),
765            (
766                key_desc_and_indices == &other.key_desc_and_indices,
767                "key_desc_and_indices",
768            ),
769            (namespace == &other.namespace, "namespace"),
770            (table == &other.table, "table"),
771        ];
772        for (compatible, field) in compatibility_checks {
773            if !compatible {
774                tracing::warn!(
775                    "IcebergSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
776                    self,
777                    other
778                );
779
780                return Err(AlterError { id });
781            }
782        }
783
784        Ok(())
785    }
786}
787
788impl<R: ConnectionResolver> IntoInlineConnection<IcebergSinkConnection, R>
789    for IcebergSinkConnection<ReferencedConnection>
790{
791    fn into_inline_connection(self, r: R) -> IcebergSinkConnection {
792        let IcebergSinkConnection {
793            catalog_connection_id,
794            catalog_connection,
795            storage_connection_id,
796            storage_connection,
797            relation_key_indices,
798            key_desc_and_indices,
799            namespace,
800            table,
801        } = self;
802        IcebergSinkConnection {
803            catalog_connection_id,
804            catalog_connection: r
805                .resolve_connection(catalog_connection)
806                .unwrap_iceberg_catalog(),
807            storage_connection_id,
808            storage_connection: storage_connection.map(|c| r.resolve_connection(c).unwrap_aws()),
809            relation_key_indices,
810            key_desc_and_indices,
811            namespace,
812            table,
813        }
814    }
815}