1use std::borrow::Cow;
13use std::fmt::Debug;
14use std::time::Duration;
15
16use mz_dyncfg::ConfigSet;
17use mz_expr::MirScalarExpr;
18use mz_pgcopy::CopyFormatParams;
19use mz_repr::bytes::ByteSize;
20use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
21use proptest_derive::Arbitrary;
22use serde::{Deserialize, Serialize};
23use timely::PartialOrder;
24use timely::progress::frontier::Antichain;
25
26use crate::AlterCompatible;
27use crate::connections::inline::{
28 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
29 ReferencedConnection,
30};
31use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
32use crate::controller::AlterError;
33
34pub mod s3_oneshot_sink;
35
/// A description of a sink: the collection it reads from, how its updates are
/// framed and encoded, and where they are written.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    /// The ID of the collection this sink reads from.
    pub from: GlobalId,
    /// The relation description of the `from` collection.
    pub from_desc: RelationDesc,
    /// Details of the external system the sink writes to (Kafka or Iceberg).
    pub connection: StorageSinkConnection,
    /// Whether the sink emits a snapshot of `from` before subsequent changes.
    // NOTE(review): snapshot semantics inferred from the name — confirm with
    // the sink rendering code.
    pub with_snapshot: bool,
    /// A version counter for this sink description. Ignored by
    /// `alter_compatible`, so it may differ across `ALTER`s.
    pub version: u64,
    /// How updates are framed for output.
    pub envelope: SinkEnvelope,
    /// The as-of frontier for the sink. Ignored by `alter_compatible`.
    pub as_of: Antichain<T>,
    /// Storage metadata for the collection the sink reads from.
    pub from_storage_metadata: S,
    /// Storage metadata for the collection the sink writes to.
    pub to_storage_metadata: S,
    /// An optional interval at which output is committed. Ignored by
    /// `alter_compatible`, so it may be changed on an existing sink.
    pub commit_interval: Option<Duration>,
}
55
impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
    for StorageSinkDesc<S, T>
{
    /// Determines whether `other` may replace `self` as part of an `ALTER`.
    ///
    /// Fields destructured as `_` below (`version`, `as_of`,
    /// `commit_interval`) are allowed to differ; every other field must be
    /// equal, except `connection` (which must itself be alter-compatible) and
    /// `with_snapshot` (see inline comment). On the first mismatch a warning
    /// naming the offending field is logged and `Err(AlterError { id })` is
    /// returned.
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkDesc<S, T>,
    ) -> Result<(), AlterError> {
        // Fast path: identical descriptions are trivially compatible.
        if self == other {
            return Ok(());
        }
        // Exhaustive destructuring forces this check to be revisited whenever
        // a field is added to `StorageSinkDesc`.
        let StorageSinkDesc {
            from,
            from_desc,
            connection,
            envelope,
            version: _,
            as_of: _,
            from_storage_metadata,
            with_snapshot,
            to_storage_metadata,
            commit_interval: _,
        } = self;

        let compatibility_checks = [
            (from == &other.from, "from"),
            (from_desc == &other.from_desc, "from_desc"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (envelope == &other.envelope, "envelope"),
            // Incompatible only when `self` did not request a snapshot but
            // `other` does; dropping the snapshot requirement is allowed.
            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
            (
                from_storage_metadata == &other.from_storage_metadata,
                "from_storage_metadata",
            ),
            (
                to_storage_metadata == &other.to_storage_metadata,
                "to_storage_metadata",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}
124
/// The envelope with which a sink frames its updates.
// NOTE(review): per-variant semantics inferred from names — confirm with the
// sink rendering code.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    /// Debezium-style before/after change records.
    Debezium,
    /// Key-based upserts.
    Upsert,
    /// Plain appends without key semantics.
    Append,
}
133
/// A connection to an external system that a sink writes to, generic over
/// whether the contained connections are inlined or referenced by ID.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// A sink that writes to a Kafka topic.
    Kafka(KafkaSinkConnection<C>),
    /// A sink that writes to an Iceberg table.
    Iceberg(IcebergSinkConnection<C>),
}
139
140impl<C: ConnectionAccess> StorageSinkConnection<C> {
141 pub fn alter_compatible(
146 &self,
147 id: GlobalId,
148 other: &StorageSinkConnection<C>,
149 ) -> Result<(), AlterError> {
150 if self == other {
151 return Ok(());
152 }
153 match (self, other) {
154 (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
155 s.alter_compatible(id, o)?
156 }
157 (StorageSinkConnection::Iceberg(s), StorageSinkConnection::Iceberg(o)) => {
158 s.alter_compatible(id, o)?
159 }
160 _ => {
161 tracing::warn!(
162 "StorageSinkConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
163 self,
164 other
165 );
166 return Err(AlterError { id });
167 }
168 }
169
170 Ok(())
171 }
172}
173
174impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
175 for StorageSinkConnection<ReferencedConnection>
176{
177 fn into_inline_connection(self, r: R) -> StorageSinkConnection {
178 match self {
179 Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
180 Self::Iceberg(conn) => StorageSinkConnection::Iceberg(conn.into_inline_connection(r)),
181 }
182 }
183}
184
185impl<C: ConnectionAccess> StorageSinkConnection<C> {
186 pub fn connection_id(&self) -> Option<CatalogItemId> {
188 use StorageSinkConnection::*;
189 match self {
190 Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
191 Iceberg(IcebergSinkConnection {
192 catalog_connection_id: connection_id,
193 ..
194 }) => Some(*connection_id),
195 }
196 }
197
198 pub fn name(&self) -> &'static str {
200 use StorageSinkConnection::*;
201 match self {
202 Kafka(_) => "kafka",
203 Iceberg(_) => "iceberg",
204 }
205 }
206}
207
/// The compression algorithm applied to messages produced by a Kafka sink.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
216
217impl KafkaSinkCompressionType {
218 pub fn to_librdkafka_option(&self) -> &'static str {
221 match self {
222 KafkaSinkCompressionType::None => "none",
223 KafkaSinkCompressionType::Gzip => "gzip",
224 KafkaSinkCompressionType::Snappy => "snappy",
225 KafkaSinkCompressionType::Lz4 => "lz4",
226 KafkaSinkCompressionType::Zstd => "zstd",
227 }
228 }
229}
230
/// A sink connection that writes to a Kafka topic.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog ID of the Kafka connection this sink uses.
    pub connection_id: CatalogItemId,
    /// The Kafka connection itself (inlined or referenced, per `C`).
    pub connection: C::Kafka,
    /// The key and value encodings for produced messages.
    pub format: KafkaSinkFormat<C>,
    /// Column indices forming the relation's key, if any.
    // NOTE(review): how this relates to `key_desc_and_indices` is not visible
    // here — confirm against the planner.
    pub relation_key_indices: Option<Vec<usize>>,
    /// Description and column indices of the message key, if the sink is
    /// keyed.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// Index of the column carrying Kafka headers, if any.
    pub headers_index: Option<usize>,
    /// The relation description of the message value.
    pub value_desc: RelationDesc,
    /// Expression used to assign messages to partitions, if any.
    pub partition_by: Option<MirScalarExpr>,
    /// The topic the sink writes to.
    pub topic: String,
    /// Options applied to the output topic.
    pub topic_options: KafkaTopicOptions,
    /// Compression applied to produced messages.
    pub compression_type: KafkaSinkCompressionType,
    /// How the progress consumer group ID is derived (see
    /// [`KafkaSinkConnection::progress_group_id`]).
    pub progress_group_id: KafkaIdStyle,
    /// How the transactional producer ID is derived (see
    /// [`KafkaSinkConnection::transactional_id`]).
    pub transactional_id: KafkaIdStyle,
    /// How often to refresh topic metadata.
    pub topic_metadata_refresh_interval: Duration,
}
254
255impl KafkaSinkConnection {
256 pub fn client_id(
261 &self,
262 configs: &ConfigSet,
263 connection_context: &ConnectionContext,
264 sink_id: GlobalId,
265 ) -> String {
266 let mut client_id =
267 KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
268 self.connection.enrich_client_id(configs, &mut client_id);
269 client_id
270 }
271
272 pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
274 self.connection
275 .progress_topic(connection_context, self.connection_id)
276 }
277
278 pub fn progress_group_id(
284 &self,
285 connection_context: &ConnectionContext,
286 sink_id: GlobalId,
287 ) -> String {
288 match self.progress_group_id {
289 KafkaIdStyle::Prefix(ref prefix) => format!(
290 "{}{}",
291 prefix.as_deref().unwrap_or(""),
292 KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
293 ),
294 KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
295 }
296 }
297
298 pub fn transactional_id(
303 &self,
304 connection_context: &ConnectionContext,
305 sink_id: GlobalId,
306 ) -> String {
307 match self.transactional_id {
308 KafkaIdStyle::Prefix(ref prefix) => format!(
309 "{}{}",
310 prefix.as_deref().unwrap_or(""),
311 KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
312 ),
313 KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
314 }
315 }
316}
317
318impl<C: ConnectionAccess> KafkaSinkConnection<C> {
319 pub fn alter_compatible(
324 &self,
325 id: GlobalId,
326 other: &KafkaSinkConnection<C>,
327 ) -> Result<(), AlterError> {
328 if self == other {
329 return Ok(());
330 }
331 let KafkaSinkConnection {
332 connection_id,
333 connection,
334 format,
335 relation_key_indices,
336 key_desc_and_indices,
337 headers_index,
338 value_desc,
339 partition_by,
340 topic,
341 compression_type,
342 progress_group_id,
343 transactional_id,
344 topic_options,
345 topic_metadata_refresh_interval,
346 } = self;
347
348 let compatibility_checks = [
349 (connection_id == &other.connection_id, "connection_id"),
350 (
351 connection.alter_compatible(id, &other.connection).is_ok(),
352 "connection",
353 ),
354 (format.alter_compatible(id, &other.format).is_ok(), "format"),
355 (
356 relation_key_indices == &other.relation_key_indices,
357 "relation_key_indices",
358 ),
359 (
360 key_desc_and_indices == &other.key_desc_and_indices,
361 "key_desc_and_indices",
362 ),
363 (headers_index == &other.headers_index, "headers_index"),
364 (value_desc == &other.value_desc, "value_desc"),
365 (partition_by == &other.partition_by, "partition_by"),
366 (topic == &other.topic, "topic"),
367 (
368 compression_type == &other.compression_type,
369 "compression_type",
370 ),
371 (
372 progress_group_id == &other.progress_group_id,
373 "progress_group_id",
374 ),
375 (
376 transactional_id == &other.transactional_id,
377 "transactional_id",
378 ),
379 (topic_options == &other.topic_options, "topic_config"),
380 (
381 topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
382 "topic_metadata_refresh_interval",
383 ),
384 ];
385 for (compatible, field) in compatibility_checks {
386 if !compatible {
387 tracing::warn!(
388 "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
389 self,
390 other
391 );
392
393 return Err(AlterError { id });
394 }
395 }
396
397 Ok(())
398 }
399}
400
impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
    for KafkaSinkConnection<ReferencedConnection>
{
    /// Resolves the referenced Kafka connection (and the CSR connection
    /// nested inside `format`) to their inlined forms via the resolver `r`;
    /// all other fields carry over unchanged.
    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
        // Exhaustive destructuring forces this impl to be revisited whenever
        // a field is added to `KafkaSinkConnection`.
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;
        KafkaSinkConnection {
            connection_id,
            connection: r.resolve_connection(connection).unwrap_kafka(),
            format: format.into_inline_connection(r),
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        }
    }
}
439
/// How a Kafka ID (progress group ID or transactional ID) is derived for a
/// sink.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// Derive the ID from the connection's ID base, prepending the given
    /// prefix when present.
    Prefix(Option<String>),
    /// Use the historical fixed naming scheme.
    Legacy,
}
447
/// The key and value encodings of messages a Kafka sink produces.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    /// The encoding of the message key, if the sink emits keys.
    pub key_format: Option<KafkaSinkFormatType<C>>,
    /// The encoding of the message value.
    pub value_format: KafkaSinkFormatType<C>,
}
453
/// A single key or value encoding used by a Kafka sink.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    /// Avro encoding, with the schema managed in a Confluent Schema
    /// Registry.
    Avro {
        // The Avro schema, as a string.
        schema: String,
        // The compatibility level to apply in the schema registry, if any.
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        // The schema registry connection (inlined or referenced, per `C`).
        csr_connection: C::Csr,
    },
    /// JSON encoding.
    Json,
    /// Plain-text encoding.
    Text,
    /// Raw bytes.
    Bytes,
}
465
466impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
467 pub fn get_format_name(&self) -> &str {
468 match self {
469 Self::Avro { .. } => "avro",
470 Self::Json => "json",
471 Self::Text => "text",
472 Self::Bytes => "bytes",
473 }
474 }
475}
476
impl<C: ConnectionAccess> KafkaSinkFormat<C> {
    /// Returns a display name for this key/value format combination.
    ///
    /// With no key format, this is just the value format's name. When key and
    /// value are both Avro (or both JSON), the single shared name is
    /// returned; otherwise the name is `key-<key name>-value-<value name>`.
    pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
        match &self.key_format {
            None => self.value_format.get_format_name().into(),
            Some(key_format) => match (key_format, &self.value_format) {
                (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
                    "avro".into()
                }
                (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
                (keyf, valuef) => format!(
                    "key-{}-value-{}",
                    keyf.get_format_name(),
                    valuef.get_format_name()
                )
                .into(),
            },
        }
    }

    /// Determines whether `other` may replace `self` as part of an `ALTER`.
    ///
    /// Two Avro formats are compatible when their schemas are equal and
    /// their CSR connections are themselves alter-compatible; the
    /// `compatibility_level` is allowed to differ. Any other pair of formats
    /// must be exactly equal. On a mismatch a warning is logged and
    /// `Err(AlterError { id })` is returned.
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        // Fast path: identical formats are trivially compatible.
        if self == other {
            return Ok(());
        }

        // Check the value formats.
        match (&self.value_format, &other.value_format) {
            (
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                },
                KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                },
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            // Non-Avro (or mixed) formats must be exactly equal.
            (s, o) => {
                if s != o {
                    tracing::warn!(
                        "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        // Check the key formats, with the same rules as the value formats;
        // `(None, None)` falls into the equality arm and passes.
        match (&self.key_format, &other.key_format) {
            (
                Some(KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                }),
                Some(KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                }),
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
                        "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        Ok(())
    }
}
585
586impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
587 for KafkaSinkFormat<ReferencedConnection>
588{
589 fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
590 KafkaSinkFormat {
591 key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
592 value_format: self.value_format.into_inline_connection(&r),
593 }
594 }
595}
596
597impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
598 for KafkaSinkFormatType<ReferencedConnection>
599{
600 fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
601 match self {
602 KafkaSinkFormatType::Avro {
603 schema,
604 compatibility_level,
605 csr_connection,
606 } => KafkaSinkFormatType::Avro {
607 schema,
608 compatibility_level,
609 csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
610 },
611 KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
612 KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
613 KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
614 }
615 }
616}
617
/// The encoding of output produced by an S3 sink.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Output in PostgreSQL `COPY` format, per the given parameters.
    PgCopy(CopyFormatParams<'static>),
    /// Parquet output.
    Parquet,
}
625
/// Details about an upload of a relation's data to S3.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The destination URI.
    pub uri: String,
    /// The maximum size of each uploaded file.
    // NOTE(review): presumably bytes, bounded by MIN/MAX_S3_SINK_FILE_SIZE —
    // confirm with the uploader.
    pub max_file_size: u64,
    /// The description of the relation being uploaded.
    pub desc: RelationDesc,
    /// The output encoding.
    pub format: S3SinkFormat,
}
638
/// The smallest file size an S3 sink may be configured to produce.
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// The largest file size an S3 sink may be configured to produce.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);

// NOTE(review): column semantics inferred from names — confirm with the
// Iceberg sink rendering code.
/// Name of the synthetic diff column emitted by Iceberg append sinks.
pub const ICEBERG_APPEND_DIFF_COLUMN: &str = "_mz_diff";
/// Name of the synthetic timestamp column emitted by Iceberg append sinks.
pub const ICEBERG_APPEND_TIMESTAMP_COLUMN: &str = "_mz_timestamp";

/// Decimal precision used to represent 64-bit unsigned values in Iceberg;
/// `u64::MAX` has 20 decimal digits, so precision 20 covers the full range.
pub const ICEBERG_UINT64_DECIMAL_PRECISION: u8 = 20;
650
/// Maps Materialize scalar types that have no directly matching Arrow/Iceberg
/// representation to a substitute Arrow [`arrow::datatypes::DataType`] plus a
/// string tag recording the original type; returns `None` for types that need
/// no override.
///
/// Unsigned integers are widened to the next larger signed type, and 64-bit
/// unsigned values (including `mz_timestamp`) become 20-digit decimals so the
/// full value range remains representable.
pub fn iceberg_type_overrides(
    scalar_type: &mz_repr::SqlScalarType,
) -> Option<(arrow::datatypes::DataType, String)> {
    use arrow::datatypes::DataType;
    use mz_repr::SqlScalarType;
    match scalar_type {
        SqlScalarType::UInt16 => Some((DataType::Int32, "uint2".to_string())),
        SqlScalarType::UInt32 => Some((DataType::Int64, "uint4".to_string())),
        SqlScalarType::UInt64 => Some((
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
            "uint8".to_string(),
        )),
        SqlScalarType::MzTimestamp => Some((
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
            "mz_timestamp".to_string(),
        )),
        // Intervals are encoded as text (LargeUtf8).
        SqlScalarType::Interval => Some((DataType::LargeUtf8, "interval".to_string())),
        _ => None,
    }
}
685
/// A sink connection that writes to an Iceberg table.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog ID of the Iceberg catalog connection this sink uses.
    pub catalog_connection_id: CatalogItemId,
    /// The Iceberg catalog connection (inlined or referenced, per `C`).
    pub catalog_connection: C::IcebergCatalog,
    /// The catalog ID of the AWS connection this sink uses.
    pub aws_connection_id: CatalogItemId,
    /// The AWS connection (inlined or referenced, per `C`).
    pub aws_connection: C::Aws,
    /// Column indices forming the relation's key, if any.
    pub relation_key_indices: Option<Vec<usize>>,
    /// Description and column indices of the key, if the sink is keyed.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The Iceberg namespace containing the target table.
    pub namespace: String,
    /// The name of the target table.
    pub table: String,
}
699
700impl<C: ConnectionAccess> IcebergSinkConnection<C> {
701 pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
706 if self == other {
707 return Ok(());
708 }
709 let IcebergSinkConnection {
710 catalog_connection_id: connection_id,
711 catalog_connection,
712 aws_connection_id,
713 aws_connection,
714 relation_key_indices,
715 key_desc_and_indices,
716 namespace,
717 table,
718 } = self;
719
720 let compatibility_checks = [
721 (
722 connection_id == &other.catalog_connection_id,
723 "connection_id",
724 ),
725 (
726 catalog_connection
727 .alter_compatible(id, &other.catalog_connection)
728 .is_ok(),
729 "catalog_connection",
730 ),
731 (
732 aws_connection_id == &other.aws_connection_id,
733 "aws_connection_id",
734 ),
735 (
736 aws_connection
737 .alter_compatible(id, &other.aws_connection)
738 .is_ok(),
739 "aws_connection",
740 ),
741 (
742 relation_key_indices == &other.relation_key_indices,
743 "relation_key_indices",
744 ),
745 (
746 key_desc_and_indices == &other.key_desc_and_indices,
747 "key_desc_and_indices",
748 ),
749 (namespace == &other.namespace, "namespace"),
750 (table == &other.table, "table"),
751 ];
752 for (compatible, field) in compatibility_checks {
753 if !compatible {
754 tracing::warn!(
755 "IcebergSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
756 self,
757 other
758 );
759
760 return Err(AlterError { id });
761 }
762 }
763
764 Ok(())
765 }
766}
767
impl<R: ConnectionResolver> IntoInlineConnection<IcebergSinkConnection, R>
    for IcebergSinkConnection<ReferencedConnection>
{
    /// Resolves the referenced Iceberg catalog and AWS connections to their
    /// inlined forms via the resolver `r`; all other fields carry over
    /// unchanged.
    fn into_inline_connection(self, r: R) -> IcebergSinkConnection {
        // Exhaustive destructuring forces this impl to be revisited whenever
        // a field is added to `IcebergSinkConnection`.
        let IcebergSinkConnection {
            catalog_connection_id,
            catalog_connection,
            aws_connection_id,
            aws_connection,
            relation_key_indices,
            key_desc_and_indices,
            namespace,
            table,
        } = self;
        IcebergSinkConnection {
            catalog_connection_id,
            catalog_connection: r
                .resolve_connection(catalog_connection)
                .unwrap_iceberg_catalog(),
            aws_connection_id,
            aws_connection: r.resolve_connection(aws_connection).unwrap_aws(),
            relation_key_indices,
            key_desc_and_indices,
            namespace,
            table,
        }
    }
}