1use std::borrow::Cow;
13use std::fmt::Debug;
14use std::time::Duration;
15
16use mz_dyncfg::ConfigSet;
17use mz_expr::MirScalarExpr;
18use mz_pgcopy::CopyFormatParams;
19use mz_repr::bytes::ByteSize;
20use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
21#[cfg(any(test, feature = "proptest"))]
22use proptest_derive::Arbitrary;
23use serde::{Deserialize, Serialize};
24use timely::PartialOrder;
25use timely::progress::frontier::Antichain;
26
27use crate::AlterCompatible;
28use crate::connections::inline::{
29 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
30 ReferencedConnection,
31};
32use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
33use crate::controller::AlterError;
34
35pub mod s3_oneshot_sink;
36
37#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
39pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
40 pub from: GlobalId,
41 pub from_desc: RelationDesc,
42 pub connection: StorageSinkConnection,
43 pub with_snapshot: bool,
44 pub version: u64,
45 pub envelope: SinkEnvelope,
46 pub as_of: Antichain<T>,
47 pub from_storage_metadata: S,
48 pub to_storage_metadata: S,
49 pub commit_interval: Option<Duration>,
55}
56
57impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
58 for StorageSinkDesc<S, T>
59{
60 fn alter_compatible(
68 &self,
69 id: GlobalId,
70 other: &StorageSinkDesc<S, T>,
71 ) -> Result<(), AlterError> {
72 if self == other {
73 return Ok(());
74 }
75 let StorageSinkDesc {
76 from,
77 from_desc,
78 connection,
79 envelope,
80 version: _,
81 as_of: _,
83 from_storage_metadata,
84 with_snapshot,
85 to_storage_metadata,
86 commit_interval: _,
87 } = self;
88
89 let compatibility_checks = [
90 (from == &other.from, "from"),
91 (from_desc == &other.from_desc, "from_desc"),
92 (
93 connection.alter_compatible(id, &other.connection).is_ok(),
94 "connection",
95 ),
96 (envelope == &other.envelope, "envelope"),
97 (*with_snapshot || !other.with_snapshot, "with_snapshot"),
100 (
101 from_storage_metadata == &other.from_storage_metadata,
102 "from_storage_metadata",
103 ),
104 (
105 to_storage_metadata == &other.to_storage_metadata,
106 "to_storage_metadata",
107 ),
108 ];
109
110 for (compatible, field) in compatibility_checks {
111 if !compatible {
112 tracing::warn!(
113 "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
114 self,
115 other
116 );
117
118 return Err(AlterError { id });
119 }
120 }
121
122 Ok(())
123 }
124}
125
126#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
127pub enum SinkEnvelope {
128 Debezium,
130 Upsert,
131 Append,
133}
134
135#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
136pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
137 Kafka(KafkaSinkConnection<C>),
138 Iceberg(IcebergSinkConnection<C>),
139}
140
141impl<C: ConnectionAccess> StorageSinkConnection<C> {
142 pub fn alter_compatible(
147 &self,
148 id: GlobalId,
149 other: &StorageSinkConnection<C>,
150 ) -> Result<(), AlterError> {
151 if self == other {
152 return Ok(());
153 }
154 match (self, other) {
155 (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
156 s.alter_compatible(id, o)?
157 }
158 (StorageSinkConnection::Iceberg(s), StorageSinkConnection::Iceberg(o)) => {
159 s.alter_compatible(id, o)?
160 }
161 _ => {
162 tracing::warn!(
163 "StorageSinkConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
164 self,
165 other
166 );
167 return Err(AlterError { id });
168 }
169 }
170
171 Ok(())
172 }
173}
174
175impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
176 for StorageSinkConnection<ReferencedConnection>
177{
178 fn into_inline_connection(self, r: R) -> StorageSinkConnection {
179 match self {
180 Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
181 Self::Iceberg(conn) => StorageSinkConnection::Iceberg(conn.into_inline_connection(r)),
182 }
183 }
184}
185
186impl<C: ConnectionAccess> StorageSinkConnection<C> {
187 pub fn connection_id(&self) -> Option<CatalogItemId> {
189 use StorageSinkConnection::*;
190 match self {
191 Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
192 Iceberg(IcebergSinkConnection {
193 catalog_connection_id: connection_id,
194 ..
195 }) => Some(*connection_id),
196 }
197 }
198
199 pub fn name(&self) -> &'static str {
201 use StorageSinkConnection::*;
202 match self {
203 Kafka(_) => "kafka",
204 Iceberg(_) => "iceberg",
205 }
206 }
207}
208
209#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
210pub enum KafkaSinkCompressionType {
211 None,
212 Gzip,
213 Snappy,
214 Lz4,
215 Zstd,
216}
217
218impl KafkaSinkCompressionType {
219 pub fn to_librdkafka_option(&self) -> &'static str {
222 match self {
223 KafkaSinkCompressionType::None => "none",
224 KafkaSinkCompressionType::Gzip => "gzip",
225 KafkaSinkCompressionType::Snappy => "snappy",
226 KafkaSinkCompressionType::Lz4 => "lz4",
227 KafkaSinkCompressionType::Zstd => "zstd",
228 }
229 }
230}
231
232#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
233pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
234 pub connection_id: CatalogItemId,
235 pub connection: C::Kafka,
236 pub format: KafkaSinkFormat<C>,
237 pub relation_key_indices: Option<Vec<usize>>,
239 pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
241 pub headers_index: Option<usize>,
243 pub value_desc: RelationDesc,
244 pub partition_by: Option<MirScalarExpr>,
247 pub topic: String,
248 pub topic_options: KafkaTopicOptions,
250 pub compression_type: KafkaSinkCompressionType,
251 pub progress_group_id: KafkaIdStyle,
252 pub transactional_id: KafkaIdStyle,
253 pub topic_metadata_refresh_interval: Duration,
254}
255
256impl KafkaSinkConnection {
257 pub fn client_id(
262 &self,
263 configs: &ConfigSet,
264 connection_context: &ConnectionContext,
265 sink_id: GlobalId,
266 ) -> String {
267 let mut client_id =
268 KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
269 self.connection.enrich_client_id(configs, &mut client_id);
270 client_id
271 }
272
273 pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
275 self.connection
276 .progress_topic(connection_context, self.connection_id)
277 }
278
279 pub fn progress_group_id(
285 &self,
286 connection_context: &ConnectionContext,
287 sink_id: GlobalId,
288 ) -> String {
289 match self.progress_group_id {
290 KafkaIdStyle::Prefix(ref prefix) => format!(
291 "{}{}",
292 prefix.as_deref().unwrap_or(""),
293 KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
294 ),
295 KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
296 }
297 }
298
299 pub fn transactional_id(
304 &self,
305 connection_context: &ConnectionContext,
306 sink_id: GlobalId,
307 ) -> String {
308 match self.transactional_id {
309 KafkaIdStyle::Prefix(ref prefix) => format!(
310 "{}{}",
311 prefix.as_deref().unwrap_or(""),
312 KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
313 ),
314 KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
315 }
316 }
317}
318
319impl<C: ConnectionAccess> KafkaSinkConnection<C> {
320 pub fn alter_compatible(
325 &self,
326 id: GlobalId,
327 other: &KafkaSinkConnection<C>,
328 ) -> Result<(), AlterError> {
329 if self == other {
330 return Ok(());
331 }
332 let KafkaSinkConnection {
333 connection_id,
334 connection,
335 format,
336 relation_key_indices,
337 key_desc_and_indices,
338 headers_index,
339 value_desc,
340 partition_by,
341 topic,
342 compression_type,
343 progress_group_id,
344 transactional_id,
345 topic_options,
346 topic_metadata_refresh_interval,
347 } = self;
348
349 let compatibility_checks = [
350 (connection_id == &other.connection_id, "connection_id"),
351 (
352 connection.alter_compatible(id, &other.connection).is_ok(),
353 "connection",
354 ),
355 (format.alter_compatible(id, &other.format).is_ok(), "format"),
356 (
357 relation_key_indices == &other.relation_key_indices,
358 "relation_key_indices",
359 ),
360 (
361 key_desc_and_indices == &other.key_desc_and_indices,
362 "key_desc_and_indices",
363 ),
364 (headers_index == &other.headers_index, "headers_index"),
365 (value_desc == &other.value_desc, "value_desc"),
366 (partition_by == &other.partition_by, "partition_by"),
367 (topic == &other.topic, "topic"),
368 (
369 compression_type == &other.compression_type,
370 "compression_type",
371 ),
372 (
373 progress_group_id == &other.progress_group_id,
374 "progress_group_id",
375 ),
376 (
377 transactional_id == &other.transactional_id,
378 "transactional_id",
379 ),
380 (topic_options == &other.topic_options, "topic_config"),
381 (
382 topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
383 "topic_metadata_refresh_interval",
384 ),
385 ];
386 for (compatible, field) in compatibility_checks {
387 if !compatible {
388 tracing::warn!(
389 "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
390 self,
391 other
392 );
393
394 return Err(AlterError { id });
395 }
396 }
397
398 Ok(())
399 }
400}
401
402impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
403 for KafkaSinkConnection<ReferencedConnection>
404{
405 fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
406 let KafkaSinkConnection {
407 connection_id,
408 connection,
409 format,
410 relation_key_indices,
411 key_desc_and_indices,
412 headers_index,
413 value_desc,
414 partition_by,
415 topic,
416 compression_type,
417 progress_group_id,
418 transactional_id,
419 topic_options,
420 topic_metadata_refresh_interval,
421 } = self;
422 KafkaSinkConnection {
423 connection_id,
424 connection: r.resolve_connection(connection).unwrap_kafka(),
425 format: format.into_inline_connection(r),
426 relation_key_indices,
427 key_desc_and_indices,
428 headers_index,
429 value_desc,
430 partition_by,
431 topic,
432 compression_type,
433 progress_group_id,
434 transactional_id,
435 topic_options,
436 topic_metadata_refresh_interval,
437 }
438 }
439}
440
441#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
442pub enum KafkaIdStyle {
443 Prefix(Option<String>),
445 Legacy,
447}
448
449#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
450pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
451 pub key_format: Option<KafkaSinkFormatType<C>>,
452 pub value_format: KafkaSinkFormatType<C>,
453}
454
455#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
456pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
457 Avro {
458 schema: String,
459 compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
460 csr_connection: C::Csr,
461 },
462 Json,
463 Text,
464 Bytes,
465}
466
467impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
468 pub fn get_format_name(&self) -> &str {
469 match self {
470 Self::Avro { .. } => "avro",
471 Self::Json => "json",
472 Self::Text => "text",
473 Self::Bytes => "bytes",
474 }
475 }
476}
477
478impl<C: ConnectionAccess> KafkaSinkFormat<C> {
479 pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
480 match &self.key_format {
484 None => self.value_format.get_format_name().into(),
485 Some(key_format) => match (key_format, &self.value_format) {
486 (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
487 "avro".into()
488 }
489 (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
490 (keyf, valuef) => format!(
491 "key-{}-value-{}",
492 keyf.get_format_name(),
493 valuef.get_format_name()
494 )
495 .into(),
496 },
497 }
498 }
499
500 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
501 if self == other {
502 return Ok(());
503 }
504
505 match (&self.value_format, &other.value_format) {
506 (
507 KafkaSinkFormatType::Avro {
508 schema,
509 compatibility_level: _,
510 csr_connection,
511 },
512 KafkaSinkFormatType::Avro {
513 schema: other_schema,
514 compatibility_level: _,
515 csr_connection: other_csr_connection,
516 },
517 ) => {
518 if schema != other_schema
519 || csr_connection
520 .alter_compatible(id, other_csr_connection)
521 .is_err()
522 {
523 tracing::warn!(
524 "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
525 self,
526 other
527 );
528
529 return Err(AlterError { id });
530 }
531 }
532 (s, o) => {
533 if s != o {
534 tracing::warn!(
535 "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
536 s,
537 o
538 );
539 return Err(AlterError { id });
540 }
541 }
542 }
543
544 match (&self.key_format, &other.key_format) {
545 (
546 Some(KafkaSinkFormatType::Avro {
547 schema,
548 compatibility_level: _,
549 csr_connection,
550 }),
551 Some(KafkaSinkFormatType::Avro {
552 schema: other_schema,
553 compatibility_level: _,
554 csr_connection: other_csr_connection,
555 }),
556 ) => {
557 if schema != other_schema
558 || csr_connection
559 .alter_compatible(id, other_csr_connection)
560 .is_err()
561 {
562 tracing::warn!(
563 "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
564 self,
565 other
566 );
567
568 return Err(AlterError { id });
569 }
570 }
571 (s, o) => {
572 if s != o {
573 tracing::warn!(
574 "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
575 s,
576 o
577 );
578 return Err(AlterError { id });
579 }
580 }
581 }
582
583 Ok(())
584 }
585}
586
587impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
588 for KafkaSinkFormat<ReferencedConnection>
589{
590 fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
591 KafkaSinkFormat {
592 key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
593 value_format: self.value_format.into_inline_connection(&r),
594 }
595 }
596}
597
598impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
599 for KafkaSinkFormatType<ReferencedConnection>
600{
601 fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
602 match self {
603 KafkaSinkFormatType::Avro {
604 schema,
605 compatibility_level,
606 csr_connection,
607 } => KafkaSinkFormatType::Avro {
608 schema,
609 compatibility_level,
610 csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
611 },
612 KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
613 KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
614 KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
615 }
616 }
617}
618
619#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
620pub enum S3SinkFormat {
621 PgCopy(CopyFormatParams<'static>),
623 Parquet,
625}
626
627#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
629pub struct S3UploadInfo {
630 pub uri: String,
632 pub max_file_size: u64,
634 pub desc: RelationDesc,
636 pub format: S3SinkFormat,
638}
639
/// Minimum S3 sink file size (per the constant's name): `ByteSize::mb(16)`.
// NOTE(review): where this bound is enforced is not visible here — confirm.
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// Maximum S3 sink file size (per the constant's name): `ByteSize::gb(4)`.
// NOTE(review): where this bound is enforced is not visible here — confirm.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);

/// Column name `_mz_diff`.
// NOTE(review): presumably the diff column added to Iceberg tables written
// with the append envelope — confirm against the Iceberg sink.
pub const ICEBERG_APPEND_DIFF_COLUMN: &str = "_mz_diff";
/// Column name `_mz_timestamp`.
// NOTE(review): presumably the timestamp column added to Iceberg tables
// written with the append envelope — confirm against the Iceberg sink.
pub const ICEBERG_APPEND_TIMESTAMP_COLUMN: &str = "_mz_timestamp";

/// Decimal precision that `iceberg_type_overrides` uses for the
/// `Decimal128` it returns for `UInt64` and `MzTimestamp`. `u64::MAX` has
/// 20 decimal digits.
pub const ICEBERG_UINT64_DECIMAL_PRECISION: u8 = 20;
651
652pub fn iceberg_type_overrides(
667 scalar_type: &mz_repr::SqlScalarType,
668) -> Option<(arrow::datatypes::DataType, String)> {
669 use arrow::datatypes::DataType;
670 use mz_repr::SqlScalarType;
671 match scalar_type {
672 SqlScalarType::UInt16 => Some((DataType::Int32, "uint2".to_string())),
673 SqlScalarType::UInt32 => Some((DataType::Int64, "uint4".to_string())),
674 SqlScalarType::UInt64 => Some((
675 DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
676 "uint8".to_string(),
677 )),
678 SqlScalarType::MzTimestamp => Some((
679 DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
680 "mz_timestamp".to_string(),
681 )),
682 SqlScalarType::Interval => Some((DataType::LargeUtf8, "interval".to_string())),
683 _ => None,
684 }
685}
686
687#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
688#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
689pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
690 pub catalog_connection_id: CatalogItemId,
691 pub catalog_connection: C::IcebergCatalog,
692 pub aws_connection_id: CatalogItemId,
693 pub aws_connection: C::Aws,
694 pub relation_key_indices: Option<Vec<usize>>,
696 pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
698 pub namespace: String,
699 pub table: String,
700}
701
702impl<C: ConnectionAccess> IcebergSinkConnection<C> {
703 pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
708 if self == other {
709 return Ok(());
710 }
711 let IcebergSinkConnection {
712 catalog_connection_id: connection_id,
713 catalog_connection,
714 aws_connection_id,
715 aws_connection,
716 relation_key_indices,
717 key_desc_and_indices,
718 namespace,
719 table,
720 } = self;
721
722 let compatibility_checks = [
723 (
724 connection_id == &other.catalog_connection_id,
725 "connection_id",
726 ),
727 (
728 catalog_connection
729 .alter_compatible(id, &other.catalog_connection)
730 .is_ok(),
731 "catalog_connection",
732 ),
733 (
734 aws_connection_id == &other.aws_connection_id,
735 "aws_connection_id",
736 ),
737 (
738 aws_connection
739 .alter_compatible(id, &other.aws_connection)
740 .is_ok(),
741 "aws_connection",
742 ),
743 (
744 relation_key_indices == &other.relation_key_indices,
745 "relation_key_indices",
746 ),
747 (
748 key_desc_and_indices == &other.key_desc_and_indices,
749 "key_desc_and_indices",
750 ),
751 (namespace == &other.namespace, "namespace"),
752 (table == &other.table, "table"),
753 ];
754 for (compatible, field) in compatibility_checks {
755 if !compatible {
756 tracing::warn!(
757 "IcebergSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
758 self,
759 other
760 );
761
762 return Err(AlterError { id });
763 }
764 }
765
766 Ok(())
767 }
768}
769
770impl<R: ConnectionResolver> IntoInlineConnection<IcebergSinkConnection, R>
771 for IcebergSinkConnection<ReferencedConnection>
772{
773 fn into_inline_connection(self, r: R) -> IcebergSinkConnection {
774 let IcebergSinkConnection {
775 catalog_connection_id,
776 catalog_connection,
777 aws_connection_id,
778 aws_connection,
779 relation_key_indices,
780 key_desc_and_indices,
781 namespace,
782 table,
783 } = self;
784 IcebergSinkConnection {
785 catalog_connection_id,
786 catalog_connection: r
787 .resolve_connection(catalog_connection)
788 .unwrap_iceberg_catalog(),
789 aws_connection_id,
790 aws_connection: r.resolve_connection(aws_connection).unwrap_aws(),
791 relation_key_indices,
792 key_desc_and_indices,
793 namespace,
794 table,
795 }
796 }
797}