mz_storage_types/
sinks.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits related to reporting changing collections out of `dataflow`.
11
12use std::borrow::Cow;
13use std::fmt::Debug;
14use std::time::Duration;
15
16use mz_dyncfg::ConfigSet;
17use mz_expr::MirScalarExpr;
18use mz_pgcopy::CopyFormatParams;
19use mz_repr::bytes::ByteSize;
20use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
21use proptest_derive::Arbitrary;
22use serde::{Deserialize, Serialize};
23use timely::PartialOrder;
24use timely::progress::frontier::Antichain;
25
26use crate::AlterCompatible;
27use crate::connections::inline::{
28    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
29    ReferencedConnection,
30};
31use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
32use crate::controller::AlterError;
33
34pub mod s3_oneshot_sink;
35
/// A sink for updates to a relational collection.
///
/// The type parameter `S` is the type of the `from_storage_metadata` and
/// `to_storage_metadata` fields; `T` is the element type of the `as_of`
/// antichain and defaults to `mz_repr::Timestamp`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    /// The ID of the collection the sink reads from.
    /// NOTE(review): inferred from the field name -- confirm against callers.
    pub from: GlobalId,
    /// The relation description that accompanies `from`.
    pub from_desc: RelationDesc,
    /// The connection to the external system the sink targets (Kafka or
    /// Iceberg).
    pub connection: StorageSinkConnection,
    /// Snapshot flag. Per the compatibility check implemented for this type,
    /// it may legally change from `true` to `false` once the snapshot has
    /// been written out, but not from `false` to `true`.
    pub with_snapshot: bool,
    /// A version number. The compatibility check implemented for this type
    /// ignores it, so it may differ between compatible descriptions.
    pub version: u64,
    /// The envelope used by the sink.
    pub envelope: SinkEnvelope,
    /// The as-of frontier of the sink. It may differ between otherwise
    /// compatible descriptions.
    pub as_of: Antichain<T>,
    /// Storage metadata for the `from` side of the sink.
    pub from_storage_metadata: S,
    /// Storage metadata for the `to` side of the sink.
    pub to_storage_metadata: S,
    /// The interval at which to commit data to the sink.
    /// This isn't universally supported by all sinks
    /// yet, so it is optional. Even for sinks that might
    /// support it in the future (ahem, kafka) users might
    /// not want to set it.
    pub commit_interval: Option<Duration>,
}
55
56impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
57    for StorageSinkDesc<S, T>
58{
59    /// Determines if `self` is compatible with another `StorageSinkDesc`, in
60    /// such a way that it is possible to turn `self` into `other` through a
61    /// valid series of transformations.
62    ///
63    /// Currently, the only "valid transformation" is the passage of time such
64    /// that the sink's as ofs may differ. However, this will change once we
65    /// support `ALTER CONNECTION` or `ALTER SINK`.
66    fn alter_compatible(
67        &self,
68        id: GlobalId,
69        other: &StorageSinkDesc<S, T>,
70    ) -> Result<(), AlterError> {
71        if self == other {
72            return Ok(());
73        }
74        let StorageSinkDesc {
75            from,
76            from_desc,
77            connection,
78            envelope,
79            version: _,
80            // The as-of of the descriptions may differ.
81            as_of: _,
82            from_storage_metadata,
83            with_snapshot,
84            to_storage_metadata,
85            commit_interval: _,
86        } = self;
87
88        let compatibility_checks = [
89            (from == &other.from, "from"),
90            (from_desc == &other.from_desc, "from_desc"),
91            (
92                connection.alter_compatible(id, &other.connection).is_ok(),
93                "connection",
94            ),
95            (envelope == &other.envelope, "envelope"),
96            // This can legally change from true to false once the snapshot has been
97            // written out.
98            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
99            (
100                from_storage_metadata == &other.from_storage_metadata,
101                "from_storage_metadata",
102            ),
103            (
104                to_storage_metadata == &other.to_storage_metadata,
105                "to_storage_metadata",
106            ),
107        ];
108
109        for (compatible, field) in compatibility_checks {
110            if !compatible {
111                tracing::warn!(
112                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
113                    self,
114                    other
115                );
116
117                return Err(AlterError { id });
118            }
119        }
120
121        Ok(())
122    }
123}
124
/// The envelope a sink is configured with (see `StorageSinkDesc::envelope`).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    /// The Debezium envelope.
    Debezium,
    /// The upsert envelope.
    Upsert,
}
130
/// The connection of a storage sink, one variant per supported kind of sink.
///
/// `C` selects how the underlying connections are stored and defaults to
/// [`InlinedConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// A Kafka sink connection.
    Kafka(KafkaSinkConnection<C>),
    /// An Iceberg sink connection.
    Iceberg(IcebergSinkConnection<C>),
}
136
137impl<C: ConnectionAccess> StorageSinkConnection<C> {
138    /// Determines if `self` is compatible with another `StorageSinkConnection`,
139    /// in such a way that it is possible to turn `self` into `other` through a
140    /// valid series of transformations (e.g. no transformation or `ALTER
141    /// CONNECTION`).
142    pub fn alter_compatible(
143        &self,
144        id: GlobalId,
145        other: &StorageSinkConnection<C>,
146    ) -> Result<(), AlterError> {
147        if self == other {
148            return Ok(());
149        }
150        match (self, other) {
151            (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
152                s.alter_compatible(id, o)?
153            }
154            (StorageSinkConnection::Iceberg(s), StorageSinkConnection::Iceberg(o)) => {
155                s.alter_compatible(id, o)?
156            }
157            _ => {
158                tracing::warn!(
159                    "StorageSinkConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
160                    self,
161                    other
162                );
163                return Err(AlterError { id });
164            }
165        }
166
167        Ok(())
168    }
169}
170
171impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
172    for StorageSinkConnection<ReferencedConnection>
173{
174    fn into_inline_connection(self, r: R) -> StorageSinkConnection {
175        match self {
176            Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
177            Self::Iceberg(conn) => StorageSinkConnection::Iceberg(conn.into_inline_connection(r)),
178        }
179    }
180}
181
182impl<C: ConnectionAccess> StorageSinkConnection<C> {
183    /// returns an option to not constrain ourselves in the future
184    pub fn connection_id(&self) -> Option<CatalogItemId> {
185        use StorageSinkConnection::*;
186        match self {
187            Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
188            Iceberg(IcebergSinkConnection {
189                catalog_connection_id: connection_id,
190                ..
191            }) => Some(*connection_id),
192        }
193    }
194
195    /// Returns the name of the sink connection.
196    pub fn name(&self) -> &'static str {
197        use StorageSinkConnection::*;
198        match self {
199            Kafka(_) => "kafka",
200            Iceberg(_) => "iceberg",
201        }
202    }
203}
204
/// The compression type of a Kafka sink.
///
/// Each variant maps to a value of librdkafka's `compression.type` setting;
/// see `to_librdkafka_option`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    /// Rendered as `none`.
    None,
    /// Rendered as `gzip`.
    Gzip,
    /// Rendered as `snappy`.
    Snappy,
    /// Rendered as `lz4`.
    Lz4,
    /// Rendered as `zstd`.
    Zstd,
}
213
214impl KafkaSinkCompressionType {
215    /// Format the compression type as expected by `compression.type` librdkafka
216    /// setting.
217    pub fn to_librdkafka_option(&self) -> &'static str {
218        match self {
219            KafkaSinkCompressionType::None => "none",
220            KafkaSinkCompressionType::Gzip => "gzip",
221            KafkaSinkCompressionType::Snappy => "snappy",
222            KafkaSinkCompressionType::Lz4 => "lz4",
223            KafkaSinkCompressionType::Zstd => "zstd",
224        }
225    }
226}
227
/// The connection of a Kafka sink.
///
/// `C` selects how the underlying connections are stored and defaults to
/// [`InlinedConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog ID of the Kafka connection. It is fed into the client ID,
    /// progress topic, progress group ID and transactional ID helpers.
    pub connection_id: CatalogItemId,
    /// The Kafka connection itself, in the representation chosen by `C`.
    pub connection: C::Kafka,
    /// The key and value formats of the sink.
    pub format: KafkaSinkFormat<C>,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The index of the column containing message headers value, if any.
    pub headers_index: Option<usize>,
    /// The relation description of the value.
    /// NOTE(review): presumably the schema of the message values -- confirm.
    pub value_desc: RelationDesc,
    /// An expression that, if present, computes a hash value that should be
    /// used to determine the partition for each message.
    pub partition_by: Option<MirScalarExpr>,
    /// The name of the Kafka topic.
    /// NOTE(review): presumably the topic the sink writes to -- confirm.
    pub topic: String,
    /// Options to use when creating the topic if it doesn't already exist.
    pub topic_options: KafkaTopicOptions,
    /// The compression type, rendered for librdkafka by
    /// `KafkaSinkCompressionType::to_librdkafka_option`.
    pub compression_type: KafkaSinkCompressionType,
    /// The style used by the `progress_group_id()` method to build the ID of
    /// the progress consumer group.
    pub progress_group_id: KafkaIdStyle,
    /// The style used by the `transactional_id()` method to build the
    /// transactional ID.
    pub transactional_id: KafkaIdStyle,
    /// NOTE(review): presumably how often topic metadata is refreshed --
    /// confirm at the use site.
    pub topic_metadata_refresh_interval: Duration,
}
251
252impl KafkaSinkConnection {
253    /// Returns the client ID to register with librdkafka with.
254    ///
255    /// The caller is responsible for providing the sink ID as it is not known
256    /// to `KafkaSinkConnection`.
257    pub fn client_id(
258        &self,
259        configs: &ConfigSet,
260        connection_context: &ConnectionContext,
261        sink_id: GlobalId,
262    ) -> String {
263        let mut client_id =
264            KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
265        self.connection.enrich_client_id(configs, &mut client_id);
266        client_id
267    }
268
269    /// Returns the name of the progress topic to use for the sink.
270    pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
271        self.connection
272            .progress_topic(connection_context, self.connection_id)
273    }
274
275    /// Returns the ID for the consumer group the sink will use to read the
276    /// progress topic on resumption.
277    ///
278    /// The caller is responsible for providing the sink ID as it is not known
279    /// to `KafkaSinkConnection`.
280    pub fn progress_group_id(
281        &self,
282        connection_context: &ConnectionContext,
283        sink_id: GlobalId,
284    ) -> String {
285        match self.progress_group_id {
286            KafkaIdStyle::Prefix(ref prefix) => format!(
287                "{}{}",
288                prefix.as_deref().unwrap_or(""),
289                KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
290            ),
291            KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
292        }
293    }
294
295    /// Returns the transactional ID to use for the sink.
296    ///
297    /// The caller is responsible for providing the sink ID as it is not known
298    /// to `KafkaSinkConnection`.
299    pub fn transactional_id(
300        &self,
301        connection_context: &ConnectionContext,
302        sink_id: GlobalId,
303    ) -> String {
304        match self.transactional_id {
305            KafkaIdStyle::Prefix(ref prefix) => format!(
306                "{}{}",
307                prefix.as_deref().unwrap_or(""),
308                KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
309            ),
310            KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
311        }
312    }
313}
314
315impl<C: ConnectionAccess> KafkaSinkConnection<C> {
316    /// Determines if `self` is compatible with another `StorageSinkConnection`,
317    /// in such a way that it is possible to turn `self` into `other` through a
318    /// valid series of transformations (e.g. no transformation or `ALTER
319    /// CONNECTION`).
320    pub fn alter_compatible(
321        &self,
322        id: GlobalId,
323        other: &KafkaSinkConnection<C>,
324    ) -> Result<(), AlterError> {
325        if self == other {
326            return Ok(());
327        }
328        let KafkaSinkConnection {
329            connection_id,
330            connection,
331            format,
332            relation_key_indices,
333            key_desc_and_indices,
334            headers_index,
335            value_desc,
336            partition_by,
337            topic,
338            compression_type,
339            progress_group_id,
340            transactional_id,
341            topic_options,
342            topic_metadata_refresh_interval,
343        } = self;
344
345        let compatibility_checks = [
346            (connection_id == &other.connection_id, "connection_id"),
347            (
348                connection.alter_compatible(id, &other.connection).is_ok(),
349                "connection",
350            ),
351            (format.alter_compatible(id, &other.format).is_ok(), "format"),
352            (
353                relation_key_indices == &other.relation_key_indices,
354                "relation_key_indices",
355            ),
356            (
357                key_desc_and_indices == &other.key_desc_and_indices,
358                "key_desc_and_indices",
359            ),
360            (headers_index == &other.headers_index, "headers_index"),
361            (value_desc == &other.value_desc, "value_desc"),
362            (partition_by == &other.partition_by, "partition_by"),
363            (topic == &other.topic, "topic"),
364            (
365                compression_type == &other.compression_type,
366                "compression_type",
367            ),
368            (
369                progress_group_id == &other.progress_group_id,
370                "progress_group_id",
371            ),
372            (
373                transactional_id == &other.transactional_id,
374                "transactional_id",
375            ),
376            (topic_options == &other.topic_options, "topic_config"),
377            (
378                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
379                "topic_metadata_refresh_interval",
380            ),
381        ];
382        for (compatible, field) in compatibility_checks {
383            if !compatible {
384                tracing::warn!(
385                    "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
386                    self,
387                    other
388                );
389
390                return Err(AlterError { id });
391            }
392        }
393
394        Ok(())
395    }
396}
397
398impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
399    for KafkaSinkConnection<ReferencedConnection>
400{
401    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
402        let KafkaSinkConnection {
403            connection_id,
404            connection,
405            format,
406            relation_key_indices,
407            key_desc_and_indices,
408            headers_index,
409            value_desc,
410            partition_by,
411            topic,
412            compression_type,
413            progress_group_id,
414            transactional_id,
415            topic_options,
416            topic_metadata_refresh_interval,
417        } = self;
418        KafkaSinkConnection {
419            connection_id,
420            connection: r.resolve_connection(connection).unwrap_kafka(),
421            format: format.into_inline_connection(r),
422            relation_key_indices,
423            key_desc_and_indices,
424            headers_index,
425            value_desc,
426            partition_by,
427            topic,
428            compression_type,
429            progress_group_id,
430            transactional_id,
431            topic_options,
432            topic_metadata_refresh_interval,
433        }
434    }
435}
436
/// The style used to build the IDs a Kafka sink registers, i.e. its progress
/// group ID and its transactional ID.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// A new-style id that is optionally prefixed. The ID is the prefix (or
    /// the empty string when there is none) followed by
    /// `KafkaConnection::id_base`.
    Prefix(Option<String>),
    /// A legacy style id, derived from the sink ID alone.
    Legacy,
}
444
/// The key and value formats of a Kafka sink.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    /// The format of the key, if one is configured.
    pub key_format: Option<KafkaSinkFormatType<C>>,
    /// The format of the value.
    pub value_format: KafkaSinkFormatType<C>,
}
450
/// A single format usable for the key or the value of a Kafka sink.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    /// The Avro format.
    Avro {
        /// The Avro schema, stored as a string.
        schema: String,
        /// The optional schema registry compatibility level. It is ignored
        /// when two formats are checked for alter-compatibility.
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        /// The schema registry connection, in the representation chosen by
        /// `C`.
        csr_connection: C::Csr,
    },
    /// The JSON format.
    Json,
    /// The text format.
    Text,
    /// The bytes format.
    Bytes,
}
462
463impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
464    pub fn get_format_name(&self) -> &str {
465        match self {
466            Self::Avro { .. } => "avro",
467            Self::Json => "json",
468            Self::Text => "text",
469            Self::Bytes => "bytes",
470        }
471    }
472}
473
474impl<C: ConnectionAccess> KafkaSinkFormat<C> {
475    pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
476        // For legacy reasons, if the key-format is none or the key & value formats are
477        // both the same (either avro or json), we return the value format name,
478        // otherwise we return a composite name.
479        match &self.key_format {
480            None => self.value_format.get_format_name().into(),
481            Some(key_format) => match (key_format, &self.value_format) {
482                (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
483                    "avro".into()
484                }
485                (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
486                (keyf, valuef) => format!(
487                    "key-{}-value-{}",
488                    keyf.get_format_name(),
489                    valuef.get_format_name()
490                )
491                .into(),
492            },
493        }
494    }
495
496    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
497        if self == other {
498            return Ok(());
499        }
500
501        match (&self.value_format, &other.value_format) {
502            (
503                KafkaSinkFormatType::Avro {
504                    schema,
505                    compatibility_level: _,
506                    csr_connection,
507                },
508                KafkaSinkFormatType::Avro {
509                    schema: other_schema,
510                    compatibility_level: _,
511                    csr_connection: other_csr_connection,
512                },
513            ) => {
514                if schema != other_schema
515                    || csr_connection
516                        .alter_compatible(id, other_csr_connection)
517                        .is_err()
518                {
519                    tracing::warn!(
520                        "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
521                        self,
522                        other
523                    );
524
525                    return Err(AlterError { id });
526                }
527            }
528            (s, o) => {
529                if s != o {
530                    tracing::warn!(
531                        "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
532                        s,
533                        o
534                    );
535                    return Err(AlterError { id });
536                }
537            }
538        }
539
540        match (&self.key_format, &other.key_format) {
541            (
542                Some(KafkaSinkFormatType::Avro {
543                    schema,
544                    compatibility_level: _,
545                    csr_connection,
546                }),
547                Some(KafkaSinkFormatType::Avro {
548                    schema: other_schema,
549                    compatibility_level: _,
550                    csr_connection: other_csr_connection,
551                }),
552            ) => {
553                if schema != other_schema
554                    || csr_connection
555                        .alter_compatible(id, other_csr_connection)
556                        .is_err()
557                {
558                    tracing::warn!(
559                        "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
560                        self,
561                        other
562                    );
563
564                    return Err(AlterError { id });
565                }
566            }
567            (s, o) => {
568                if s != o {
569                    tracing::warn!(
570                        "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
571                        s,
572                        o
573                    );
574                    return Err(AlterError { id });
575                }
576            }
577        }
578
579        Ok(())
580    }
581}
582
583impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
584    for KafkaSinkFormat<ReferencedConnection>
585{
586    fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
587        KafkaSinkFormat {
588            key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
589            value_format: self.value_format.into_inline_connection(&r),
590        }
591    }
592}
593
594impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
595    for KafkaSinkFormatType<ReferencedConnection>
596{
597    fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
598        match self {
599            KafkaSinkFormatType::Avro {
600                schema,
601                compatibility_level,
602                csr_connection,
603            } => KafkaSinkFormatType::Avro {
604                schema,
605                compatibility_level,
606                csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
607            },
608            KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
609            KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
610            KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
611        }
612    }
613}
614
/// The format selected for data uploaded to S3 (see `S3UploadInfo::format`).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Encoded using the PG `COPY` protocol, with one of its supported formats.
    /// The wrapped parameters describe the chosen `COPY` format.
    PgCopy(CopyFormatParams<'static>),
    /// Encoded as Parquet.
    Parquet,
}
622
/// Info required to copy the data to S3.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The S3 URI path to write the data to.
    pub uri: String,
    /// The max file size of each file uploaded to S3.
    /// NOTE(review): presumably expressed in bytes -- confirm at the use site.
    pub max_file_size: u64,
    /// The relation desc of the data to be uploaded to S3.
    pub desc: RelationDesc,
    /// The selected sink format.
    pub format: S3SinkFormat,
}
635
/// The minimum S3 sink file size, `ByteSize::mb(16)`.
/// NOTE(review): presumably a lower bound for `S3UploadInfo::max_file_size` --
/// confirm at the use site.
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// The maximum S3 sink file size, `ByteSize::gb(4)`.
/// NOTE(review): presumably an upper bound for `S3UploadInfo::max_file_size` --
/// confirm at the use site.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);
638
/// The connection of an Iceberg sink.
///
/// `C` selects how the underlying connections are stored and defaults to
/// [`InlinedConnection`].
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog ID of the Iceberg catalog connection. This is the ID that
    /// `StorageSinkConnection::connection_id` reports for Iceberg sinks.
    pub catalog_connection_id: CatalogItemId,
    /// The Iceberg catalog connection, in the representation chosen by `C`.
    pub catalog_connection: C::IcebergCatalog,
    /// The catalog ID of the AWS connection.
    pub aws_connection_id: CatalogItemId,
    /// The AWS connection, in the representation chosen by `C`.
    pub aws_connection: C::Aws,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The namespace of the table.
    /// NOTE(review): presumably the Iceberg namespace written to -- confirm.
    pub namespace: String,
    /// The name of the table.
    /// NOTE(review): presumably the Iceberg table written to -- confirm.
    pub table: String,
}
652
653impl<C: ConnectionAccess> IcebergSinkConnection<C> {
654    /// Determines if `self` is compatible with another `StorageSinkConnection`,
655    /// in such a way that it is possible to turn `self` into `other` through a
656    /// valid series of transformations (e.g. no transformation or `ALTER
657    /// CONNECTION`).
658    pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
659        if self == other {
660            return Ok(());
661        }
662        let IcebergSinkConnection {
663            catalog_connection_id: connection_id,
664            catalog_connection,
665            aws_connection_id,
666            aws_connection,
667            relation_key_indices,
668            key_desc_and_indices,
669            namespace,
670            table,
671        } = self;
672
673        let compatibility_checks = [
674            (
675                connection_id == &other.catalog_connection_id,
676                "connection_id",
677            ),
678            (
679                catalog_connection
680                    .alter_compatible(id, &other.catalog_connection)
681                    .is_ok(),
682                "catalog_connection",
683            ),
684            (
685                aws_connection_id == &other.aws_connection_id,
686                "aws_connection_id",
687            ),
688            (
689                aws_connection
690                    .alter_compatible(id, &other.aws_connection)
691                    .is_ok(),
692                "aws_connection",
693            ),
694            (
695                relation_key_indices == &other.relation_key_indices,
696                "relation_key_indices",
697            ),
698            (
699                key_desc_and_indices == &other.key_desc_and_indices,
700                "key_desc_and_indices",
701            ),
702            (namespace == &other.namespace, "namespace"),
703            (table == &other.table, "table"),
704        ];
705        for (compatible, field) in compatibility_checks {
706            if !compatible {
707                tracing::warn!(
708                    "IcebergSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
709                    self,
710                    other
711                );
712
713                return Err(AlterError { id });
714            }
715        }
716
717        Ok(())
718    }
719}
720
721impl<R: ConnectionResolver> IntoInlineConnection<IcebergSinkConnection, R>
722    for IcebergSinkConnection<ReferencedConnection>
723{
724    fn into_inline_connection(self, r: R) -> IcebergSinkConnection {
725        let IcebergSinkConnection {
726            catalog_connection_id,
727            catalog_connection,
728            aws_connection_id,
729            aws_connection,
730            relation_key_indices,
731            key_desc_and_indices,
732            namespace,
733            table,
734        } = self;
735        IcebergSinkConnection {
736            catalog_connection_id,
737            catalog_connection: r
738                .resolve_connection(catalog_connection)
739                .unwrap_iceberg_catalog(),
740            aws_connection_id,
741            aws_connection: r.resolve_connection(aws_connection).unwrap_aws(),
742            relation_key_indices,
743            key_desc_and_indices,
744            namespace,
745            table,
746        }
747    }
748}