Skip to main content

mz_storage_types/
sinks.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits related to reporting changing collections out of `dataflow`.
11
12use std::borrow::Cow;
13use std::fmt::Debug;
14use std::time::Duration;
15
16use mz_dyncfg::ConfigSet;
17use mz_expr::MirScalarExpr;
18use mz_pgcopy::CopyFormatParams;
19use mz_repr::bytes::ByteSize;
20use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
21#[cfg(any(test, feature = "proptest"))]
22use proptest_derive::Arbitrary;
23use serde::{Deserialize, Serialize};
24use timely::PartialOrder;
25use timely::progress::frontier::Antichain;
26
27use crate::AlterCompatible;
28use crate::connections::inline::{
29    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
30    ReferencedConnection,
31};
32use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
33use crate::controller::AlterError;
34
35pub mod s3_oneshot_sink;
36
/// A sink for updates to a relational collection.
///
/// `S` is the type used for both `from_storage_metadata` and
/// `to_storage_metadata`; `T` is the timestamp type of the `as_of` frontier
/// and defaults to `mz_repr::Timestamp`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    /// The ID of the `from` collection (presumably the collection whose
    /// updates are written out -- confirm against the caller).
    pub from: GlobalId,
    /// The relation description that accompanies `from`.
    pub from_desc: RelationDesc,
    /// The external system the sink writes to, and how.
    pub connection: StorageSinkConnection,
    /// Whether the sink writes a snapshot. Compatibility checks allow this
    /// to go from `true` to `false`, but not from `false` to `true`.
    pub with_snapshot: bool,
    /// A version number; it is ignored when checking alter-compatibility.
    pub version: u64,
    /// The envelope used by the sink.
    pub envelope: SinkEnvelope,
    /// The as-of frontier of the sink; it may differ between two otherwise
    /// compatible descriptions.
    pub as_of: Antichain<T>,
    /// Storage metadata for the `from` side of the sink.
    pub from_storage_metadata: S,
    /// Storage metadata for the `to` side of the sink.
    pub to_storage_metadata: S,
    /// The interval at which to commit data to the sink.
    /// This isn't universally supported by all sinks
    /// yet, so it is optional. Even for sinks that might
    /// support it in the future (ahem, kafka) users might
    /// not want to set it.
    pub commit_interval: Option<Duration>,
}
56
57impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
58    for StorageSinkDesc<S, T>
59{
60    /// Determines if `self` is compatible with another `StorageSinkDesc`, in
61    /// such a way that it is possible to turn `self` into `other` through a
62    /// valid series of transformations.
63    ///
64    /// Currently, the only "valid transformation" is the passage of time such
65    /// that the sink's as ofs may differ. However, this will change once we
66    /// support `ALTER CONNECTION` or `ALTER SINK`.
67    fn alter_compatible(
68        &self,
69        id: GlobalId,
70        other: &StorageSinkDesc<S, T>,
71    ) -> Result<(), AlterError> {
72        if self == other {
73            return Ok(());
74        }
75        let StorageSinkDesc {
76            from,
77            from_desc,
78            connection,
79            envelope,
80            version: _,
81            // The as-of of the descriptions may differ.
82            as_of: _,
83            from_storage_metadata,
84            with_snapshot,
85            to_storage_metadata,
86            commit_interval: _,
87        } = self;
88
89        let compatibility_checks = [
90            (from == &other.from, "from"),
91            (from_desc == &other.from_desc, "from_desc"),
92            (
93                connection.alter_compatible(id, &other.connection).is_ok(),
94                "connection",
95            ),
96            (envelope == &other.envelope, "envelope"),
97            // This can legally change from true to false once the snapshot has been
98            // written out.
99            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
100            (
101                from_storage_metadata == &other.from_storage_metadata,
102                "from_storage_metadata",
103            ),
104            (
105                to_storage_metadata == &other.to_storage_metadata,
106                "to_storage_metadata",
107            ),
108        ];
109
110        for (compatible, field) in compatibility_checks {
111            if !compatible {
112                tracing::warn!(
113                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
114                    self,
115                    other
116                );
117
118                return Err(AlterError { id });
119            }
120        }
121
122        Ok(())
123    }
124}
125
/// The envelope a sink uses for the updates it writes.
// NOTE(review): the exact output shape of each envelope is defined by the
// sink implementations, not in this file.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    /// Only used for Kafka.
    Debezium,
    /// Not restricted to a single sink type by the comments in this file.
    Upsert,
    /// Only used for Iceberg.
    Append,
}
134
/// The connection details of a storage sink, one variant per supported
/// external system.
///
/// `C` selects how the connections inside the variants are represented;
/// it defaults to `InlinedConnection`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// A sink that writes to Kafka.
    Kafka(KafkaSinkConnection<C>),
    /// A sink that writes to Iceberg.
    Iceberg(IcebergSinkConnection<C>),
}
140
141impl<C: ConnectionAccess> StorageSinkConnection<C> {
142    /// Determines if `self` is compatible with another `StorageSinkConnection`,
143    /// in such a way that it is possible to turn `self` into `other` through a
144    /// valid series of transformations (e.g. no transformation or `ALTER
145    /// CONNECTION`).
146    pub fn alter_compatible(
147        &self,
148        id: GlobalId,
149        other: &StorageSinkConnection<C>,
150    ) -> Result<(), AlterError> {
151        if self == other {
152            return Ok(());
153        }
154        match (self, other) {
155            (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
156                s.alter_compatible(id, o)?
157            }
158            (StorageSinkConnection::Iceberg(s), StorageSinkConnection::Iceberg(o)) => {
159                s.alter_compatible(id, o)?
160            }
161            _ => {
162                tracing::warn!(
163                    "StorageSinkConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
164                    self,
165                    other
166                );
167                return Err(AlterError { id });
168            }
169        }
170
171        Ok(())
172    }
173}
174
175impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
176    for StorageSinkConnection<ReferencedConnection>
177{
178    fn into_inline_connection(self, r: R) -> StorageSinkConnection {
179        match self {
180            Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
181            Self::Iceberg(conn) => StorageSinkConnection::Iceberg(conn.into_inline_connection(r)),
182        }
183    }
184}
185
186impl<C: ConnectionAccess> StorageSinkConnection<C> {
187    /// returns an option to not constrain ourselves in the future
188    pub fn connection_id(&self) -> Option<CatalogItemId> {
189        use StorageSinkConnection::*;
190        match self {
191            Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
192            Iceberg(IcebergSinkConnection {
193                catalog_connection_id: connection_id,
194                ..
195            }) => Some(*connection_id),
196        }
197    }
198
199    /// Returns the name of the sink connection.
200    pub fn name(&self) -> &'static str {
201        use StorageSinkConnection::*;
202        match self {
203            Kafka(_) => "kafka",
204            Iceberg(_) => "iceberg",
205        }
206    }
207}
208
/// The compression type of a Kafka sink.
///
/// Each variant maps to one value of librdkafka's `compression.type`
/// setting; see `KafkaSinkCompressionType::to_librdkafka_option`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    /// Rendered as `"none"`.
    None,
    /// Rendered as `"gzip"`.
    Gzip,
    /// Rendered as `"snappy"`.
    Snappy,
    /// Rendered as `"lz4"`.
    Lz4,
    /// Rendered as `"zstd"`.
    Zstd,
}
217
218impl KafkaSinkCompressionType {
219    /// Format the compression type as expected by `compression.type` librdkafka
220    /// setting.
221    pub fn to_librdkafka_option(&self) -> &'static str {
222        match self {
223            KafkaSinkCompressionType::None => "none",
224            KafkaSinkCompressionType::Gzip => "gzip",
225            KafkaSinkCompressionType::Snappy => "snappy",
226            KafkaSinkCompressionType::Lz4 => "lz4",
227            KafkaSinkCompressionType::Zstd => "zstd",
228        }
229    }
230}
231
/// The description of a sink that writes to Kafka.
///
/// `C` selects how the Kafka connection (and any connection nested in
/// `format`) is represented; it defaults to `InlinedConnection`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog ID of the Kafka connection.
    pub connection_id: CatalogItemId,
    /// The Kafka connection itself, in the representation chosen by `C`.
    pub connection: C::Kafka,
    /// The key and value formats of the messages written by the sink.
    pub format: KafkaSinkFormat<C>,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The index of the column containing message headers value, if any.
    pub headers_index: Option<usize>,
    /// The relation description of the message value.
    pub value_desc: RelationDesc,
    /// An expression that, if present, computes a hash value that should be
    /// used to determine the partition for each message.
    pub partition_by: Option<MirScalarExpr>,
    /// The name of the Kafka topic.
    pub topic: String,
    /// Options to use when creating the topic if it doesn't already exist.
    pub topic_options: KafkaTopicOptions,
    /// The compression type, rendered into librdkafka's `compression.type`.
    pub compression_type: KafkaSinkCompressionType,
    /// How the ID of the progress consumer group is derived; see
    /// `KafkaSinkConnection::progress_group_id`.
    pub progress_group_id: KafkaIdStyle,
    /// How the transactional ID is derived; see
    /// `KafkaSinkConnection::transactional_id`.
    pub transactional_id: KafkaIdStyle,
    /// NOTE(review): presumably the interval at which topic metadata is
    /// refreshed -- confirm against the sink implementation.
    pub topic_metadata_refresh_interval: Duration,
}
255
256impl KafkaSinkConnection {
257    /// Returns the client ID to register with librdkafka with.
258    ///
259    /// The caller is responsible for providing the sink ID as it is not known
260    /// to `KafkaSinkConnection`.
261    pub fn client_id(
262        &self,
263        configs: &ConfigSet,
264        connection_context: &ConnectionContext,
265        sink_id: GlobalId,
266    ) -> String {
267        let mut client_id =
268            KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
269        self.connection.enrich_client_id(configs, &mut client_id);
270        client_id
271    }
272
273    /// Returns the name of the progress topic to use for the sink.
274    pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
275        self.connection
276            .progress_topic(connection_context, self.connection_id)
277    }
278
279    /// Returns the ID for the consumer group the sink will use to read the
280    /// progress topic on resumption.
281    ///
282    /// The caller is responsible for providing the sink ID as it is not known
283    /// to `KafkaSinkConnection`.
284    pub fn progress_group_id(
285        &self,
286        connection_context: &ConnectionContext,
287        sink_id: GlobalId,
288    ) -> String {
289        match self.progress_group_id {
290            KafkaIdStyle::Prefix(ref prefix) => format!(
291                "{}{}",
292                prefix.as_deref().unwrap_or(""),
293                KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
294            ),
295            KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
296        }
297    }
298
299    /// Returns the transactional ID to use for the sink.
300    ///
301    /// The caller is responsible for providing the sink ID as it is not known
302    /// to `KafkaSinkConnection`.
303    pub fn transactional_id(
304        &self,
305        connection_context: &ConnectionContext,
306        sink_id: GlobalId,
307    ) -> String {
308        match self.transactional_id {
309            KafkaIdStyle::Prefix(ref prefix) => format!(
310                "{}{}",
311                prefix.as_deref().unwrap_or(""),
312                KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
313            ),
314            KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
315        }
316    }
317}
318
319impl<C: ConnectionAccess> KafkaSinkConnection<C> {
320    /// Determines if `self` is compatible with another `StorageSinkConnection`,
321    /// in such a way that it is possible to turn `self` into `other` through a
322    /// valid series of transformations (e.g. no transformation or `ALTER
323    /// CONNECTION`).
324    pub fn alter_compatible(
325        &self,
326        id: GlobalId,
327        other: &KafkaSinkConnection<C>,
328    ) -> Result<(), AlterError> {
329        if self == other {
330            return Ok(());
331        }
332        let KafkaSinkConnection {
333            connection_id,
334            connection,
335            format,
336            relation_key_indices,
337            key_desc_and_indices,
338            headers_index,
339            value_desc,
340            partition_by,
341            topic,
342            compression_type,
343            progress_group_id,
344            transactional_id,
345            topic_options,
346            topic_metadata_refresh_interval,
347        } = self;
348
349        let compatibility_checks = [
350            (connection_id == &other.connection_id, "connection_id"),
351            (
352                connection.alter_compatible(id, &other.connection).is_ok(),
353                "connection",
354            ),
355            (format.alter_compatible(id, &other.format).is_ok(), "format"),
356            (
357                relation_key_indices == &other.relation_key_indices,
358                "relation_key_indices",
359            ),
360            (
361                key_desc_and_indices == &other.key_desc_and_indices,
362                "key_desc_and_indices",
363            ),
364            (headers_index == &other.headers_index, "headers_index"),
365            (value_desc == &other.value_desc, "value_desc"),
366            (partition_by == &other.partition_by, "partition_by"),
367            (topic == &other.topic, "topic"),
368            (
369                compression_type == &other.compression_type,
370                "compression_type",
371            ),
372            (
373                progress_group_id == &other.progress_group_id,
374                "progress_group_id",
375            ),
376            (
377                transactional_id == &other.transactional_id,
378                "transactional_id",
379            ),
380            (topic_options == &other.topic_options, "topic_config"),
381            (
382                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
383                "topic_metadata_refresh_interval",
384            ),
385        ];
386        for (compatible, field) in compatibility_checks {
387            if !compatible {
388                tracing::warn!(
389                    "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
390                    self,
391                    other
392                );
393
394                return Err(AlterError { id });
395            }
396        }
397
398        Ok(())
399    }
400}
401
402impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
403    for KafkaSinkConnection<ReferencedConnection>
404{
405    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
406        let KafkaSinkConnection {
407            connection_id,
408            connection,
409            format,
410            relation_key_indices,
411            key_desc_and_indices,
412            headers_index,
413            value_desc,
414            partition_by,
415            topic,
416            compression_type,
417            progress_group_id,
418            transactional_id,
419            topic_options,
420            topic_metadata_refresh_interval,
421        } = self;
422        KafkaSinkConnection {
423            connection_id,
424            connection: r.resolve_connection(connection).unwrap_kafka(),
425            format: format.into_inline_connection(r),
426            relation_key_indices,
427            key_desc_and_indices,
428            headers_index,
429            value_desc,
430            partition_by,
431            topic,
432            compression_type,
433            progress_group_id,
434            transactional_id,
435            topic_options,
436            topic_metadata_refresh_interval,
437        }
438    }
439}
440
/// How a Kafka sink derives an identifier (its progress group ID or its
/// transactional ID).
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// A new-style id that is optionally prefixed.
    ///
    /// The prefix (or the empty string when `None`) is prepended to
    /// `KafkaConnection::id_base`.
    Prefix(Option<String>),
    /// A legacy style id.
    ///
    /// Rendered as `materialize-bootstrap-sink-{sink_id}` for the progress
    /// group and `mz-producer-{sink_id}-0` for the transactional ID.
    Legacy,
}
448
/// The formats of the messages written by a Kafka sink.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    /// The format of the message key, if one is configured.
    pub key_format: Option<KafkaSinkFormatType<C>>,
    /// The format of the message value.
    pub value_format: KafkaSinkFormatType<C>,
}
454
/// A single format (for either the key or the value) of a Kafka sink.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    /// Avro; reported as `"avro"` by `get_format_name`.
    Avro {
        /// The schema, as a string.
        schema: String,
        /// An optional compatibility level. It is ignored when checking
        /// alter-compatibility of two formats.
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        /// The CSR connection, in the representation chosen by `C`
        /// (presumably a Confluent Schema Registry connection -- confirm).
        csr_connection: C::Csr,
    },
    /// Reported as `"json"` by `get_format_name`.
    Json,
    /// Reported as `"text"` by `get_format_name`.
    Text,
    /// Reported as `"bytes"` by `get_format_name`.
    Bytes,
}
466
467impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
468    pub fn get_format_name(&self) -> &str {
469        match self {
470            Self::Avro { .. } => "avro",
471            Self::Json => "json",
472            Self::Text => "text",
473            Self::Bytes => "bytes",
474        }
475    }
476}
477
478impl<C: ConnectionAccess> KafkaSinkFormat<C> {
479    pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
480        // For legacy reasons, if the key-format is none or the key & value formats are
481        // both the same (either avro or json), we return the value format name,
482        // otherwise we return a composite name.
483        match &self.key_format {
484            None => self.value_format.get_format_name().into(),
485            Some(key_format) => match (key_format, &self.value_format) {
486                (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
487                    "avro".into()
488                }
489                (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
490                (keyf, valuef) => format!(
491                    "key-{}-value-{}",
492                    keyf.get_format_name(),
493                    valuef.get_format_name()
494                )
495                .into(),
496            },
497        }
498    }
499
500    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
501        if self == other {
502            return Ok(());
503        }
504
505        match (&self.value_format, &other.value_format) {
506            (
507                KafkaSinkFormatType::Avro {
508                    schema,
509                    compatibility_level: _,
510                    csr_connection,
511                },
512                KafkaSinkFormatType::Avro {
513                    schema: other_schema,
514                    compatibility_level: _,
515                    csr_connection: other_csr_connection,
516                },
517            ) => {
518                if schema != other_schema
519                    || csr_connection
520                        .alter_compatible(id, other_csr_connection)
521                        .is_err()
522                {
523                    tracing::warn!(
524                        "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
525                        self,
526                        other
527                    );
528
529                    return Err(AlterError { id });
530                }
531            }
532            (s, o) => {
533                if s != o {
534                    tracing::warn!(
535                        "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
536                        s,
537                        o
538                    );
539                    return Err(AlterError { id });
540                }
541            }
542        }
543
544        match (&self.key_format, &other.key_format) {
545            (
546                Some(KafkaSinkFormatType::Avro {
547                    schema,
548                    compatibility_level: _,
549                    csr_connection,
550                }),
551                Some(KafkaSinkFormatType::Avro {
552                    schema: other_schema,
553                    compatibility_level: _,
554                    csr_connection: other_csr_connection,
555                }),
556            ) => {
557                if schema != other_schema
558                    || csr_connection
559                        .alter_compatible(id, other_csr_connection)
560                        .is_err()
561                {
562                    tracing::warn!(
563                        "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
564                        self,
565                        other
566                    );
567
568                    return Err(AlterError { id });
569                }
570            }
571            (s, o) => {
572                if s != o {
573                    tracing::warn!(
574                        "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
575                        s,
576                        o
577                    );
578                    return Err(AlterError { id });
579                }
580            }
581        }
582
583        Ok(())
584    }
585}
586
587impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
588    for KafkaSinkFormat<ReferencedConnection>
589{
590    fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
591        KafkaSinkFormat {
592            key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
593            value_format: self.value_format.into_inline_connection(&r),
594        }
595    }
596}
597
598impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
599    for KafkaSinkFormatType<ReferencedConnection>
600{
601    fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
602        match self {
603            KafkaSinkFormatType::Avro {
604                schema,
605                compatibility_level,
606                csr_connection,
607            } => KafkaSinkFormatType::Avro {
608                schema,
609                compatibility_level,
610                csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
611            },
612            KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
613            KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
614            KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
615        }
616    }
617}
618
/// The output format of an S3 sink.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Encoded using the PG `COPY` protocol, with one of its supported formats.
    /// The wrapped parameters describe the chosen `COPY` format.
    PgCopy(CopyFormatParams<'static>),
    /// Encoded as Parquet.
    Parquet,
}
626
/// Info required to copy the data to S3.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The S3 URI path to write the data to.
    pub uri: String,
    /// The max file size of each file uploaded to S3.
    // NOTE(review): presumably in bytes and bounded by
    // `MIN_S3_SINK_FILE_SIZE`/`MAX_S3_SINK_FILE_SIZE` -- confirm at the call site.
    pub max_file_size: u64,
    /// The relation desc of the data to be uploaded to S3.
    pub desc: RelationDesc,
    /// The selected sink format.
    pub format: S3SinkFormat,
}
639
/// 16 MB. NOTE(review): presumably the smallest file size an S3 sink accepts
/// -- confirm where this constant is enforced.
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// 4 GB. NOTE(review): presumably the largest file size an S3 sink accepts
/// -- confirm where this constant is enforced.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);

/// Column name appended by MODE APPEND Iceberg sinks to record the diff (+1/-1).
pub const ICEBERG_APPEND_DIFF_COLUMN: &str = "_mz_diff";
/// Column name appended by MODE APPEND Iceberg sinks to record the logical timestamp.
pub const ICEBERG_APPEND_TIMESTAMP_COLUMN: &str = "_mz_timestamp";

/// The precision needed to store all UInt64 values in a Decimal128.
/// UInt64 max value is 18,446,744,073,709,551,615 which has 20 digits.
pub const ICEBERG_UINT64_DECIMAL_PRECISION: u8 = 20;
651
652/// Type overrides for Iceberg-compatible Arrow schemas.
653///
654/// Iceberg doesn't support unsigned integer types or interval natively, so we
655/// map them to compatible types:
656/// - `UInt8`, `UInt16` -> `Int32`
657/// - `UInt32` -> `Int64`
658/// - `UInt64` -> `Decimal128(20, 0)`
659/// - `MzTimestamp` (which uses UInt64) -> `Decimal128(20, 0)`
660/// - `Interval` -> string (`LargeUtf8`)
661///
662/// Pass this to `mz_arrow_util::builder::desc_to_schema_with_overrides`
663/// when producing the Arrow schema for an iceberg sink, and to
664/// `mz_arrow_util::builder::ArrowBuilder::validate_desc_for_parquet` to
665/// validate the desc before sink creation.
666pub fn iceberg_type_overrides(
667    scalar_type: &mz_repr::SqlScalarType,
668) -> Option<(arrow::datatypes::DataType, String)> {
669    use arrow::datatypes::DataType;
670    use mz_repr::SqlScalarType;
671    match scalar_type {
672        SqlScalarType::UInt16 => Some((DataType::Int32, "uint2".to_string())),
673        SqlScalarType::UInt32 => Some((DataType::Int64, "uint4".to_string())),
674        SqlScalarType::UInt64 => Some((
675            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
676            "uint8".to_string(),
677        )),
678        SqlScalarType::MzTimestamp => Some((
679            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
680            "mz_timestamp".to_string(),
681        )),
682        SqlScalarType::Interval => Some((DataType::LargeUtf8, "interval".to_string())),
683        _ => None,
684    }
685}
686
/// The description of a sink that writes to Iceberg.
///
/// `C` selects how the catalog and AWS connections are represented; it
/// defaults to `InlinedConnection`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog ID of the Iceberg catalog connection. This is the ID
    /// reported by `StorageSinkConnection::connection_id`.
    pub catalog_connection_id: CatalogItemId,
    /// The Iceberg catalog connection, in the representation chosen by `C`.
    pub catalog_connection: C::IcebergCatalog,
    /// The catalog ID of the AWS connection.
    pub aws_connection_id: CatalogItemId,
    /// The AWS connection, in the representation chosen by `C`.
    pub aws_connection: C::Aws,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The namespace part of the sink's target (presumably the Iceberg
    /// namespace containing `table` -- confirm).
    pub namespace: String,
    /// The table part of the sink's target.
    pub table: String,
}
701
702impl<C: ConnectionAccess> IcebergSinkConnection<C> {
703    /// Determines if `self` is compatible with another `StorageSinkConnection`,
704    /// in such a way that it is possible to turn `self` into `other` through a
705    /// valid series of transformations (e.g. no transformation or `ALTER
706    /// CONNECTION`).
707    pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
708        if self == other {
709            return Ok(());
710        }
711        let IcebergSinkConnection {
712            catalog_connection_id: connection_id,
713            catalog_connection,
714            aws_connection_id,
715            aws_connection,
716            relation_key_indices,
717            key_desc_and_indices,
718            namespace,
719            table,
720        } = self;
721
722        let compatibility_checks = [
723            (
724                connection_id == &other.catalog_connection_id,
725                "connection_id",
726            ),
727            (
728                catalog_connection
729                    .alter_compatible(id, &other.catalog_connection)
730                    .is_ok(),
731                "catalog_connection",
732            ),
733            (
734                aws_connection_id == &other.aws_connection_id,
735                "aws_connection_id",
736            ),
737            (
738                aws_connection
739                    .alter_compatible(id, &other.aws_connection)
740                    .is_ok(),
741                "aws_connection",
742            ),
743            (
744                relation_key_indices == &other.relation_key_indices,
745                "relation_key_indices",
746            ),
747            (
748                key_desc_and_indices == &other.key_desc_and_indices,
749                "key_desc_and_indices",
750            ),
751            (namespace == &other.namespace, "namespace"),
752            (table == &other.table, "table"),
753        ];
754        for (compatible, field) in compatibility_checks {
755            if !compatible {
756                tracing::warn!(
757                    "IcebergSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
758                    self,
759                    other
760                );
761
762                return Err(AlterError { id });
763            }
764        }
765
766        Ok(())
767    }
768}
769
770impl<R: ConnectionResolver> IntoInlineConnection<IcebergSinkConnection, R>
771    for IcebergSinkConnection<ReferencedConnection>
772{
773    fn into_inline_connection(self, r: R) -> IcebergSinkConnection {
774        let IcebergSinkConnection {
775            catalog_connection_id,
776            catalog_connection,
777            aws_connection_id,
778            aws_connection,
779            relation_key_indices,
780            key_desc_and_indices,
781            namespace,
782            table,
783        } = self;
784        IcebergSinkConnection {
785            catalog_connection_id,
786            catalog_connection: r
787                .resolve_connection(catalog_connection)
788                .unwrap_iceberg_catalog(),
789            aws_connection_id,
790            aws_connection: r.resolve_connection(aws_connection).unwrap_aws(),
791            relation_key_indices,
792            key_desc_and_indices,
793            namespace,
794            table,
795        }
796    }
797}