// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and traits related to reporting changing collections out of `dataflow`.

use std::borrow::Cow;
use std::fmt::Debug;
use std::time::Duration;

use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_pgcopy::CopyFormatParams;
use mz_repr::bytes::ByteSize;
use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::frontier::Antichain;

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
use crate::controller::AlterError;
33
/// Sinks that perform a one-shot copy of data to Amazon S3.
pub mod s3_oneshot_sink;
35
/// A sink for updates to a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    /// The ID of the collection whose updates this sink emits.
    pub from: GlobalId,
    /// The relation description of `from`.
    pub from_desc: RelationDesc,
    /// The connection to the external system the sink writes to.
    pub connection: StorageSinkConnection,
    /// Whether the sink emits an initial snapshot. May legally change from
    /// `true` to `false` once the snapshot has been written out (see
    /// `alter_compatible`).
    pub with_snapshot: bool,
    /// A version number for this sink definition; deliberately ignored by
    /// `alter_compatible`.
    pub version: u64,
    /// The envelope used to encode updates.
    pub envelope: SinkEnvelope,
    /// The as-of frontier of the sink. Two otherwise-identical descriptions
    /// may differ in their as-of (see `alter_compatible`).
    pub as_of: Antichain<T>,
    /// Storage metadata for the collection the sink reads from.
    pub from_storage_metadata: S,
    /// Storage metadata for the collection the sink writes to.
    pub to_storage_metadata: S,
    /// The interval at which to commit data to the sink.
    /// This isn't universally supported by all sinks
    /// yet, so it is optional. Even for sinks that might
    /// support it in the future (ahem, kafka) users might
    /// not want to set it.
    pub commit_interval: Option<Duration>,
}
55
56impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
57    for StorageSinkDesc<S, T>
58{
59    /// Determines if `self` is compatible with another `StorageSinkDesc`, in
60    /// such a way that it is possible to turn `self` into `other` through a
61    /// valid series of transformations.
62    ///
63    /// Currently, the only "valid transformation" is the passage of time such
64    /// that the sink's as ofs may differ. However, this will change once we
65    /// support `ALTER CONNECTION` or `ALTER SINK`.
66    fn alter_compatible(
67        &self,
68        id: GlobalId,
69        other: &StorageSinkDesc<S, T>,
70    ) -> Result<(), AlterError> {
71        if self == other {
72            return Ok(());
73        }
74        let StorageSinkDesc {
75            from,
76            from_desc,
77            connection,
78            envelope,
79            version: _,
80            // The as-of of the descriptions may differ.
81            as_of: _,
82            from_storage_metadata,
83            with_snapshot,
84            to_storage_metadata,
85            commit_interval: _,
86        } = self;
87
88        let compatibility_checks = [
89            (from == &other.from, "from"),
90            (from_desc == &other.from_desc, "from_desc"),
91            (
92                connection.alter_compatible(id, &other.connection).is_ok(),
93                "connection",
94            ),
95            (envelope == &other.envelope, "envelope"),
96            // This can legally change from true to false once the snapshot has been
97            // written out.
98            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
99            (
100                from_storage_metadata == &other.from_storage_metadata,
101                "from_storage_metadata",
102            ),
103            (
104                to_storage_metadata == &other.to_storage_metadata,
105                "to_storage_metadata",
106            ),
107        ];
108
109        for (compatible, field) in compatibility_checks {
110            if !compatible {
111                tracing::warn!(
112                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
113                    self,
114                    other
115                );
116
117                return Err(AlterError { id });
118            }
119        }
120
121        Ok(())
122    }
123}
124
/// The envelope with which a sink encodes its updates.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    /// Only used for Kafka.
    Debezium,
    Upsert,
    /// Only used for Iceberg.
    Append,
}
133
/// A connection to an external system that a sink writes to.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// A sink that writes to a Kafka topic.
    Kafka(KafkaSinkConnection<C>),
    /// A sink that writes to an Iceberg table.
    Iceberg(IcebergSinkConnection<C>),
}
139
140impl<C: ConnectionAccess> StorageSinkConnection<C> {
141    /// Determines if `self` is compatible with another `StorageSinkConnection`,
142    /// in such a way that it is possible to turn `self` into `other` through a
143    /// valid series of transformations (e.g. no transformation or `ALTER
144    /// CONNECTION`).
145    pub fn alter_compatible(
146        &self,
147        id: GlobalId,
148        other: &StorageSinkConnection<C>,
149    ) -> Result<(), AlterError> {
150        if self == other {
151            return Ok(());
152        }
153        match (self, other) {
154            (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
155                s.alter_compatible(id, o)?
156            }
157            (StorageSinkConnection::Iceberg(s), StorageSinkConnection::Iceberg(o)) => {
158                s.alter_compatible(id, o)?
159            }
160            _ => {
161                tracing::warn!(
162                    "StorageSinkConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
163                    self,
164                    other
165                );
166                return Err(AlterError { id });
167            }
168        }
169
170        Ok(())
171    }
172}
173
174impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
175    for StorageSinkConnection<ReferencedConnection>
176{
177    fn into_inline_connection(self, r: R) -> StorageSinkConnection {
178        match self {
179            Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
180            Self::Iceberg(conn) => StorageSinkConnection::Iceberg(conn.into_inline_connection(r)),
181        }
182    }
183}
184
185impl<C: ConnectionAccess> StorageSinkConnection<C> {
186    /// returns an option to not constrain ourselves in the future
187    pub fn connection_id(&self) -> Option<CatalogItemId> {
188        use StorageSinkConnection::*;
189        match self {
190            Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
191            Iceberg(IcebergSinkConnection {
192                catalog_connection_id: connection_id,
193                ..
194            }) => Some(*connection_id),
195        }
196    }
197
198    /// Returns the name of the sink connection.
199    pub fn name(&self) -> &'static str {
200        use StorageSinkConnection::*;
201        match self {
202            Kafka(_) => "kafka",
203            Iceberg(_) => "iceberg",
204        }
205    }
206}
207
/// The compression applied to messages a Kafka sink produces; maps onto
/// librdkafka's `compression.type` setting (see `to_librdkafka_option`).
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
216
217impl KafkaSinkCompressionType {
218    /// Format the compression type as expected by `compression.type` librdkafka
219    /// setting.
220    pub fn to_librdkafka_option(&self) -> &'static str {
221        match self {
222            KafkaSinkCompressionType::None => "none",
223            KafkaSinkCompressionType::Gzip => "gzip",
224            KafkaSinkCompressionType::Snappy => "snappy",
225            KafkaSinkCompressionType::Lz4 => "lz4",
226            KafkaSinkCompressionType::Zstd => "zstd",
227        }
228    }
229}
230
/// A connection to a Kafka topic that a sink writes to.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog ID of the Kafka connection this sink uses.
    pub connection_id: CatalogItemId,
    /// The Kafka connection itself (inlined or referenced, depending on `C`).
    pub connection: C::Kafka,
    /// The key and value formats with which to encode messages.
    pub format: KafkaSinkFormat<C>,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The index of the column containing message headers value, if any.
    pub headers_index: Option<usize>,
    /// The relation description of the sink's values.
    pub value_desc: RelationDesc,
    /// An expression that, if present, computes a hash value that should be
    /// used to determine the partition for each message.
    pub partition_by: Option<MirScalarExpr>,
    /// The Kafka topic to write messages to.
    pub topic: String,
    /// Options to use when creating the topic if it doesn't already exist.
    pub topic_options: KafkaTopicOptions,
    /// The compression to apply to produced messages.
    pub compression_type: KafkaSinkCompressionType,
    /// How to derive the consumer group ID used to read the progress topic;
    /// see `progress_group_id`.
    pub progress_group_id: KafkaIdStyle,
    /// How to derive the producer's transactional ID; see `transactional_id`.
    pub transactional_id: KafkaIdStyle,
    /// Interval between topic metadata refreshes (presumably surfaced to
    /// librdkafka's `topic.metadata.refresh.interval.ms` — confirm at use site).
    pub topic_metadata_refresh_interval: Duration,
}
254
255impl KafkaSinkConnection {
256    /// Returns the client ID to register with librdkafka with.
257    ///
258    /// The caller is responsible for providing the sink ID as it is not known
259    /// to `KafkaSinkConnection`.
260    pub fn client_id(
261        &self,
262        configs: &ConfigSet,
263        connection_context: &ConnectionContext,
264        sink_id: GlobalId,
265    ) -> String {
266        let mut client_id =
267            KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
268        self.connection.enrich_client_id(configs, &mut client_id);
269        client_id
270    }
271
272    /// Returns the name of the progress topic to use for the sink.
273    pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
274        self.connection
275            .progress_topic(connection_context, self.connection_id)
276    }
277
278    /// Returns the ID for the consumer group the sink will use to read the
279    /// progress topic on resumption.
280    ///
281    /// The caller is responsible for providing the sink ID as it is not known
282    /// to `KafkaSinkConnection`.
283    pub fn progress_group_id(
284        &self,
285        connection_context: &ConnectionContext,
286        sink_id: GlobalId,
287    ) -> String {
288        match self.progress_group_id {
289            KafkaIdStyle::Prefix(ref prefix) => format!(
290                "{}{}",
291                prefix.as_deref().unwrap_or(""),
292                KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
293            ),
294            KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
295        }
296    }
297
298    /// Returns the transactional ID to use for the sink.
299    ///
300    /// The caller is responsible for providing the sink ID as it is not known
301    /// to `KafkaSinkConnection`.
302    pub fn transactional_id(
303        &self,
304        connection_context: &ConnectionContext,
305        sink_id: GlobalId,
306    ) -> String {
307        match self.transactional_id {
308            KafkaIdStyle::Prefix(ref prefix) => format!(
309                "{}{}",
310                prefix.as_deref().unwrap_or(""),
311                KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
312            ),
313            KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
314        }
315    }
316}
317
318impl<C: ConnectionAccess> KafkaSinkConnection<C> {
319    /// Determines if `self` is compatible with another `StorageSinkConnection`,
320    /// in such a way that it is possible to turn `self` into `other` through a
321    /// valid series of transformations (e.g. no transformation or `ALTER
322    /// CONNECTION`).
323    pub fn alter_compatible(
324        &self,
325        id: GlobalId,
326        other: &KafkaSinkConnection<C>,
327    ) -> Result<(), AlterError> {
328        if self == other {
329            return Ok(());
330        }
331        let KafkaSinkConnection {
332            connection_id,
333            connection,
334            format,
335            relation_key_indices,
336            key_desc_and_indices,
337            headers_index,
338            value_desc,
339            partition_by,
340            topic,
341            compression_type,
342            progress_group_id,
343            transactional_id,
344            topic_options,
345            topic_metadata_refresh_interval,
346        } = self;
347
348        let compatibility_checks = [
349            (connection_id == &other.connection_id, "connection_id"),
350            (
351                connection.alter_compatible(id, &other.connection).is_ok(),
352                "connection",
353            ),
354            (format.alter_compatible(id, &other.format).is_ok(), "format"),
355            (
356                relation_key_indices == &other.relation_key_indices,
357                "relation_key_indices",
358            ),
359            (
360                key_desc_and_indices == &other.key_desc_and_indices,
361                "key_desc_and_indices",
362            ),
363            (headers_index == &other.headers_index, "headers_index"),
364            (value_desc == &other.value_desc, "value_desc"),
365            (partition_by == &other.partition_by, "partition_by"),
366            (topic == &other.topic, "topic"),
367            (
368                compression_type == &other.compression_type,
369                "compression_type",
370            ),
371            (
372                progress_group_id == &other.progress_group_id,
373                "progress_group_id",
374            ),
375            (
376                transactional_id == &other.transactional_id,
377                "transactional_id",
378            ),
379            (topic_options == &other.topic_options, "topic_config"),
380            (
381                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
382                "topic_metadata_refresh_interval",
383            ),
384        ];
385        for (compatible, field) in compatibility_checks {
386            if !compatible {
387                tracing::warn!(
388                    "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
389                    self,
390                    other
391                );
392
393                return Err(AlterError { id });
394            }
395        }
396
397        Ok(())
398    }
399}
400
401impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
402    for KafkaSinkConnection<ReferencedConnection>
403{
404    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
405        let KafkaSinkConnection {
406            connection_id,
407            connection,
408            format,
409            relation_key_indices,
410            key_desc_and_indices,
411            headers_index,
412            value_desc,
413            partition_by,
414            topic,
415            compression_type,
416            progress_group_id,
417            transactional_id,
418            topic_options,
419            topic_metadata_refresh_interval,
420        } = self;
421        KafkaSinkConnection {
422            connection_id,
423            connection: r.resolve_connection(connection).unwrap_kafka(),
424            format: format.into_inline_connection(r),
425            relation_key_indices,
426            key_desc_and_indices,
427            headers_index,
428            value_desc,
429            partition_by,
430            topic,
431            compression_type,
432            progress_group_id,
433            transactional_id,
434            topic_options,
435            topic_metadata_refresh_interval,
436        }
437    }
438}
439
/// How a Kafka consumer-group or transactional ID is derived for a sink.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// A new-style id that is optionally prefixed.
    Prefix(Option<String>),
    /// A legacy style id.
    Legacy,
}
447
/// The key and value formats for messages a Kafka sink emits.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    /// The format for message keys, if the sink emits keys.
    pub key_format: Option<KafkaSinkFormatType<C>>,
    /// The format for message values.
    pub value_format: KafkaSinkFormatType<C>,
}
453
/// A single encoding format for a Kafka sink's keys or values.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    /// Avro, using a Confluent Schema Registry connection.
    Avro {
        /// The Avro schema text.
        schema: String,
        /// The compatibility level for the schema, if any.
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        /// The schema registry connection (inlined or referenced,
        /// depending on `C`).
        csr_connection: C::Csr,
    },
    /// JSON-encoded.
    Json,
    /// Plain text.
    Text,
    /// Raw bytes.
    Bytes,
}
465
466impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
467    pub fn get_format_name(&self) -> &str {
468        match self {
469            Self::Avro { .. } => "avro",
470            Self::Json => "json",
471            Self::Text => "text",
472            Self::Bytes => "bytes",
473        }
474    }
475}
476
477impl<C: ConnectionAccess> KafkaSinkFormat<C> {
478    pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
479        // For legacy reasons, if the key-format is none or the key & value formats are
480        // both the same (either avro or json), we return the value format name,
481        // otherwise we return a composite name.
482        match &self.key_format {
483            None => self.value_format.get_format_name().into(),
484            Some(key_format) => match (key_format, &self.value_format) {
485                (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
486                    "avro".into()
487                }
488                (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
489                (keyf, valuef) => format!(
490                    "key-{}-value-{}",
491                    keyf.get_format_name(),
492                    valuef.get_format_name()
493                )
494                .into(),
495            },
496        }
497    }
498
499    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
500        if self == other {
501            return Ok(());
502        }
503
504        match (&self.value_format, &other.value_format) {
505            (
506                KafkaSinkFormatType::Avro {
507                    schema,
508                    compatibility_level: _,
509                    csr_connection,
510                },
511                KafkaSinkFormatType::Avro {
512                    schema: other_schema,
513                    compatibility_level: _,
514                    csr_connection: other_csr_connection,
515                },
516            ) => {
517                if schema != other_schema
518                    || csr_connection
519                        .alter_compatible(id, other_csr_connection)
520                        .is_err()
521                {
522                    tracing::warn!(
523                        "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
524                        self,
525                        other
526                    );
527
528                    return Err(AlterError { id });
529                }
530            }
531            (s, o) => {
532                if s != o {
533                    tracing::warn!(
534                        "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
535                        s,
536                        o
537                    );
538                    return Err(AlterError { id });
539                }
540            }
541        }
542
543        match (&self.key_format, &other.key_format) {
544            (
545                Some(KafkaSinkFormatType::Avro {
546                    schema,
547                    compatibility_level: _,
548                    csr_connection,
549                }),
550                Some(KafkaSinkFormatType::Avro {
551                    schema: other_schema,
552                    compatibility_level: _,
553                    csr_connection: other_csr_connection,
554                }),
555            ) => {
556                if schema != other_schema
557                    || csr_connection
558                        .alter_compatible(id, other_csr_connection)
559                        .is_err()
560                {
561                    tracing::warn!(
562                        "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
563                        self,
564                        other
565                    );
566
567                    return Err(AlterError { id });
568                }
569            }
570            (s, o) => {
571                if s != o {
572                    tracing::warn!(
573                        "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
574                        s,
575                        o
576                    );
577                    return Err(AlterError { id });
578                }
579            }
580        }
581
582        Ok(())
583    }
584}
585
586impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
587    for KafkaSinkFormat<ReferencedConnection>
588{
589    fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
590        KafkaSinkFormat {
591            key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
592            value_format: self.value_format.into_inline_connection(&r),
593        }
594    }
595}
596
597impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
598    for KafkaSinkFormatType<ReferencedConnection>
599{
600    fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
601        match self {
602            KafkaSinkFormatType::Avro {
603                schema,
604                compatibility_level,
605                csr_connection,
606            } => KafkaSinkFormatType::Avro {
607                schema,
608                compatibility_level,
609                csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
610            },
611            KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
612            KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
613            KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
614        }
615    }
616}
617
/// The format with which an S3 sink encodes the files it uploads.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Encoded using the PG `COPY` protocol, with one of its supported formats.
    PgCopy(CopyFormatParams<'static>),
    /// Encoded as Parquet.
    Parquet,
}
625
/// Info required to copy the data to s3.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The s3 uri path to write the data to.
    pub uri: String,
    /// The max file size of each file uploaded to S3.
    /// NOTE(review): presumably bounded by `MIN_S3_SINK_FILE_SIZE` and
    /// `MAX_S3_SINK_FILE_SIZE` — confirm at the validation site.
    pub max_file_size: u64,
    /// The relation desc of the data to be uploaded to S3.
    pub desc: RelationDesc,
    /// The selected sink format.
    pub format: S3SinkFormat,
}
638
/// Smallest permitted value for an S3 sink's file size setting
/// (enforced by users of this constant — confirm at the validation site).
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// Largest permitted value for an S3 sink's file size setting.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);

/// Column name appended by MODE APPEND Iceberg sinks to record the diff (+1/-1).
pub const ICEBERG_APPEND_DIFF_COLUMN: &str = "_mz_diff";
/// Column name appended by MODE APPEND Iceberg sinks to record the logical timestamp.
pub const ICEBERG_APPEND_TIMESTAMP_COLUMN: &str = "_mz_timestamp";
646
/// A connection to an Iceberg table that a sink writes to.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog ID of the Iceberg catalog connection this sink uses.
    pub catalog_connection_id: CatalogItemId,
    /// The Iceberg catalog connection itself (inlined or referenced,
    /// depending on `C`).
    pub catalog_connection: C::IcebergCatalog,
    /// The catalog ID of the AWS connection this sink uses.
    pub aws_connection_id: CatalogItemId,
    /// The AWS connection itself (inlined or referenced, depending on `C`).
    pub aws_connection: C::Aws,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The Iceberg namespace containing the target table.
    pub namespace: String,
    /// The name of the target table.
    pub table: String,
}
660
661impl<C: ConnectionAccess> IcebergSinkConnection<C> {
662    /// Determines if `self` is compatible with another `StorageSinkConnection`,
663    /// in such a way that it is possible to turn `self` into `other` through a
664    /// valid series of transformations (e.g. no transformation or `ALTER
665    /// CONNECTION`).
666    pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
667        if self == other {
668            return Ok(());
669        }
670        let IcebergSinkConnection {
671            catalog_connection_id: connection_id,
672            catalog_connection,
673            aws_connection_id,
674            aws_connection,
675            relation_key_indices,
676            key_desc_and_indices,
677            namespace,
678            table,
679        } = self;
680
681        let compatibility_checks = [
682            (
683                connection_id == &other.catalog_connection_id,
684                "connection_id",
685            ),
686            (
687                catalog_connection
688                    .alter_compatible(id, &other.catalog_connection)
689                    .is_ok(),
690                "catalog_connection",
691            ),
692            (
693                aws_connection_id == &other.aws_connection_id,
694                "aws_connection_id",
695            ),
696            (
697                aws_connection
698                    .alter_compatible(id, &other.aws_connection)
699                    .is_ok(),
700                "aws_connection",
701            ),
702            (
703                relation_key_indices == &other.relation_key_indices,
704                "relation_key_indices",
705            ),
706            (
707                key_desc_and_indices == &other.key_desc_and_indices,
708                "key_desc_and_indices",
709            ),
710            (namespace == &other.namespace, "namespace"),
711            (table == &other.table, "table"),
712        ];
713        for (compatible, field) in compatibility_checks {
714            if !compatible {
715                tracing::warn!(
716                    "IcebergSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
717                    self,
718                    other
719                );
720
721                return Err(AlterError { id });
722            }
723        }
724
725        Ok(())
726    }
727}
728
729impl<R: ConnectionResolver> IntoInlineConnection<IcebergSinkConnection, R>
730    for IcebergSinkConnection<ReferencedConnection>
731{
732    fn into_inline_connection(self, r: R) -> IcebergSinkConnection {
733        let IcebergSinkConnection {
734            catalog_connection_id,
735            catalog_connection,
736            aws_connection_id,
737            aws_connection,
738            relation_key_indices,
739            key_desc_and_indices,
740            namespace,
741            table,
742        } = self;
743        IcebergSinkConnection {
744            catalog_connection_id,
745            catalog_connection: r
746                .resolve_connection(catalog_connection)
747                .unwrap_iceberg_catalog(),
748            aws_connection_id,
749            aws_connection: r.resolve_connection(aws_connection).unwrap_aws(),
750            relation_key_indices,
751            key_desc_and_indices,
752            namespace,
753            table,
754        }
755    }
756}