1use std::borrow::Cow;
13use std::fmt::Debug;
14use std::time::Duration;
15
16use mz_dyncfg::ConfigSet;
17use mz_expr::MirScalarExpr;
18use mz_pgcopy::CopyFormatParams;
19use mz_repr::bytes::ByteSize;
20use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
21#[cfg(any(test, feature = "proptest"))]
22use proptest_derive::Arbitrary;
23use serde::{Deserialize, Serialize};
24use timely::PartialOrder;
25use timely::progress::frontier::Antichain;
26
27use crate::AlterCompatible;
28use crate::connections::inline::{
29 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
30 ReferencedConnection,
31};
32use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
33use crate::controller::AlterError;
34use crate::wire_format::WireFormat;
35
36pub mod s3_oneshot_sink;
37
/// Description of a storage sink, generic over the storage-metadata type `S`
/// and the timestamp type `T` (defaulting to `mz_repr::Timestamp`).
///
/// Which fields may differ between two descriptions of the same sink is decided
/// by this type's [`AlterCompatible`] impl.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    /// Id of the collection the sink reads from (inferred from the name — confirm).
    /// Must be unchanged across an alter.
    pub from: GlobalId,
    /// Relation description associated with `from` (presumably its schema —
    /// confirm). Must be unchanged across an alter.
    pub from_desc: RelationDesc,
    /// External system the sink writes to. Must be alter-compatible across an alter.
    pub connection: StorageSinkConnection,
    /// NOTE(review): presumably whether an initial snapshot is emitted — confirm.
    /// An alter may change this from `true` to `false`, but not the reverse.
    pub with_snapshot: bool,
    /// Version number; ignored when checking alter compatibility.
    pub version: u64,
    /// Envelope used for the sink's output. Must be unchanged across an alter.
    pub envelope: SinkEnvelope,
    /// Frontier the sink starts from (presumably — confirm); ignored when
    /// checking alter compatibility.
    pub as_of: Antichain<T>,
    /// Storage metadata for the input side (by name — confirm). Must be
    /// unchanged across an alter.
    pub from_storage_metadata: S,
    /// Storage metadata for the sink's own side (by name — confirm). Must be
    /// unchanged across an alter.
    pub to_storage_metadata: S,
    /// Optional commit interval; ignored when checking alter compatibility.
    pub commit_interval: Option<Duration>,
}
57
58impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
59 for StorageSinkDesc<S, T>
60{
61 fn alter_compatible(
69 &self,
70 id: GlobalId,
71 other: &StorageSinkDesc<S, T>,
72 ) -> Result<(), AlterError> {
73 if self == other {
74 return Ok(());
75 }
76 let StorageSinkDesc {
77 from,
78 from_desc,
79 connection,
80 envelope,
81 version: _,
82 as_of: _,
84 from_storage_metadata,
85 with_snapshot,
86 to_storage_metadata,
87 commit_interval: _,
88 } = self;
89
90 let compatibility_checks = [
91 (from == &other.from, "from"),
92 (from_desc == &other.from_desc, "from_desc"),
93 (
94 connection.alter_compatible(id, &other.connection).is_ok(),
95 "connection",
96 ),
97 (envelope == &other.envelope, "envelope"),
98 (*with_snapshot || !other.with_snapshot, "with_snapshot"),
101 (
102 from_storage_metadata == &other.from_storage_metadata,
103 "from_storage_metadata",
104 ),
105 (
106 to_storage_metadata == &other.to_storage_metadata,
107 "to_storage_metadata",
108 ),
109 ];
110
111 for (compatible, field) in compatibility_checks {
112 if !compatible {
113 tracing::warn!(
114 "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
115 self,
116 other
117 );
118
119 return Err(AlterError { id });
120 }
121 }
122
123 Ok(())
124 }
125}
126
/// The envelope a sink uses to shape its output.
///
/// NOTE(review): per-variant semantics are not visible in this file; the names
/// suggest Debezium-style before/after records, keyed upsert records and an
/// append-only stream respectively — confirm against the sink implementations.
/// Must be unchanged across an alter of a `StorageSinkDesc`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    Debezium,
    Upsert,
    Append,
}
135
/// The external system a storage sink writes to.
///
/// `C` selects the connection representation: `InlinedConnection` by default,
/// or `ReferencedConnection`, which the `IntoInlineConnection` impl for this
/// type converts into the inlined form.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// A sink writing to a Kafka topic.
    Kafka(KafkaSinkConnection<C>),
    /// A sink writing to an Iceberg table.
    Iceberg(IcebergSinkConnection<C>),
}
141
142impl<C: ConnectionAccess> StorageSinkConnection<C> {
143 pub fn alter_compatible(
148 &self,
149 id: GlobalId,
150 other: &StorageSinkConnection<C>,
151 ) -> Result<(), AlterError> {
152 if self == other {
153 return Ok(());
154 }
155 match (self, other) {
156 (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
157 s.alter_compatible(id, o)?
158 }
159 (StorageSinkConnection::Iceberg(s), StorageSinkConnection::Iceberg(o)) => {
160 s.alter_compatible(id, o)?
161 }
162 _ => {
163 tracing::warn!(
164 "StorageSinkConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
165 self,
166 other
167 );
168 return Err(AlterError { id });
169 }
170 }
171
172 Ok(())
173 }
174}
175
176impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
177 for StorageSinkConnection<ReferencedConnection>
178{
179 fn into_inline_connection(self, r: R) -> StorageSinkConnection {
180 match self {
181 Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
182 Self::Iceberg(conn) => StorageSinkConnection::Iceberg(conn.into_inline_connection(r)),
183 }
184 }
185}
186
187impl<C: ConnectionAccess> StorageSinkConnection<C> {
188 pub fn connection_id(&self) -> Option<CatalogItemId> {
190 use StorageSinkConnection::*;
191 match self {
192 Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
193 Iceberg(IcebergSinkConnection {
194 catalog_connection_id: connection_id,
195 ..
196 }) => Some(*connection_id),
197 }
198 }
199
200 pub fn name(&self) -> &'static str {
202 use StorageSinkConnection::*;
203 match self {
204 Kafka(_) => "kafka",
205 Iceberg(_) => "iceberg",
206 }
207 }
208}
209
/// Compression codec applied by a Kafka sink.
///
/// `to_librdkafka_option` maps each variant to the string librdkafka uses for it.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
218
219impl KafkaSinkCompressionType {
220 pub fn to_librdkafka_option(&self) -> &'static str {
223 match self {
224 KafkaSinkCompressionType::None => "none",
225 KafkaSinkCompressionType::Gzip => "gzip",
226 KafkaSinkCompressionType::Snappy => "snappy",
227 KafkaSinkCompressionType::Lz4 => "lz4",
228 KafkaSinkCompressionType::Zstd => "zstd",
229 }
230 }
231}
232
/// Configuration of a Kafka sink, generic over the connection representation `C`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// Catalog id of the Kafka connection. Also used to derive the client id,
    /// the progress topic and the prefix-style progress group / transactional ids.
    pub connection_id: CatalogItemId,
    /// The Kafka connection itself (reference or inlined details, per `C`).
    pub connection: C::Kafka,
    /// Key and value formats of the produced messages.
    pub format: KafkaSinkFormat<C>,
    /// NOTE(review): presumably indices of the relation's key columns — confirm.
    pub relation_key_indices: Option<Vec<usize>>,
    /// NOTE(review): presumably the message key's description and the column
    /// indices forming it — confirm.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// NOTE(review): presumably the index of the column supplying message
    /// headers — confirm.
    pub headers_index: Option<usize>,
    /// Relation description of the message values (presumably — confirm).
    pub value_desc: RelationDesc,
    /// Optional expression, presumably used to choose each message's
    /// partition — confirm.
    pub partition_by: Option<MirScalarExpr>,
    /// Topic name.
    pub topic: String,
    /// Options for the topic (presumably applied when the topic is created — confirm).
    pub topic_options: KafkaTopicOptions,
    /// Compression codec for produced messages.
    pub compression_type: KafkaSinkCompressionType,
    /// How the progress group id is derived; see `progress_group_id`.
    pub progress_group_id: KafkaIdStyle,
    /// How the transactional id is derived; see `transactional_id`.
    pub transactional_id: KafkaIdStyle,
    /// Interval at which topic metadata is refreshed (by name — confirm).
    pub topic_metadata_refresh_interval: Duration,
}
256
257impl KafkaSinkConnection {
258 pub fn client_id(
263 &self,
264 configs: &ConfigSet,
265 connection_context: &ConnectionContext,
266 sink_id: GlobalId,
267 ) -> String {
268 let mut client_id =
269 KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
270 self.connection.enrich_client_id(configs, &mut client_id);
271 client_id
272 }
273
274 pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
276 self.connection
277 .progress_topic(connection_context, self.connection_id)
278 }
279
280 pub fn progress_group_id(
286 &self,
287 connection_context: &ConnectionContext,
288 sink_id: GlobalId,
289 ) -> String {
290 match self.progress_group_id {
291 KafkaIdStyle::Prefix(ref prefix) => format!(
292 "{}{}",
293 prefix.as_deref().unwrap_or(""),
294 KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
295 ),
296 KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
297 }
298 }
299
300 pub fn transactional_id(
305 &self,
306 connection_context: &ConnectionContext,
307 sink_id: GlobalId,
308 ) -> String {
309 match self.transactional_id {
310 KafkaIdStyle::Prefix(ref prefix) => format!(
311 "{}{}",
312 prefix.as_deref().unwrap_or(""),
313 KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
314 ),
315 KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
316 }
317 }
318}
319
320impl<C: ConnectionAccess> KafkaSinkConnection<C> {
321 pub fn alter_compatible(
326 &self,
327 id: GlobalId,
328 other: &KafkaSinkConnection<C>,
329 ) -> Result<(), AlterError> {
330 if self == other {
331 return Ok(());
332 }
333 let KafkaSinkConnection {
334 connection_id,
335 connection,
336 format,
337 relation_key_indices,
338 key_desc_and_indices,
339 headers_index,
340 value_desc,
341 partition_by,
342 topic,
343 compression_type,
344 progress_group_id,
345 transactional_id,
346 topic_options,
347 topic_metadata_refresh_interval,
348 } = self;
349
350 let compatibility_checks = [
351 (connection_id == &other.connection_id, "connection_id"),
352 (
353 connection.alter_compatible(id, &other.connection).is_ok(),
354 "connection",
355 ),
356 (format.alter_compatible(id, &other.format).is_ok(), "format"),
357 (
358 relation_key_indices == &other.relation_key_indices,
359 "relation_key_indices",
360 ),
361 (
362 key_desc_and_indices == &other.key_desc_and_indices,
363 "key_desc_and_indices",
364 ),
365 (headers_index == &other.headers_index, "headers_index"),
366 (value_desc == &other.value_desc, "value_desc"),
367 (partition_by == &other.partition_by, "partition_by"),
368 (topic == &other.topic, "topic"),
369 (
370 compression_type == &other.compression_type,
371 "compression_type",
372 ),
373 (
374 progress_group_id == &other.progress_group_id,
375 "progress_group_id",
376 ),
377 (
378 transactional_id == &other.transactional_id,
379 "transactional_id",
380 ),
381 (topic_options == &other.topic_options, "topic_config"),
382 (
383 topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
384 "topic_metadata_refresh_interval",
385 ),
386 ];
387 for (compatible, field) in compatibility_checks {
388 if !compatible {
389 tracing::warn!(
390 "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
391 self,
392 other
393 );
394
395 return Err(AlterError { id });
396 }
397 }
398
399 Ok(())
400 }
401}
402
403impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
404 for KafkaSinkConnection<ReferencedConnection>
405{
406 fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
407 let KafkaSinkConnection {
408 connection_id,
409 connection,
410 format,
411 relation_key_indices,
412 key_desc_and_indices,
413 headers_index,
414 value_desc,
415 partition_by,
416 topic,
417 compression_type,
418 progress_group_id,
419 transactional_id,
420 topic_options,
421 topic_metadata_refresh_interval,
422 } = self;
423 KafkaSinkConnection {
424 connection_id,
425 connection: r.resolve_connection(connection).unwrap_kafka(),
426 format: format.into_inline_connection(r),
427 relation_key_indices,
428 key_desc_and_indices,
429 headers_index,
430 value_desc,
431 partition_by,
432 topic,
433 compression_type,
434 progress_group_id,
435 transactional_id,
436 topic_options,
437 topic_metadata_refresh_interval,
438 }
439 }
440}
441
/// How a Kafka sink derives its progress group id and transactional id.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// The connection-level id base (`KafkaConnection::id_base`), preceded by
    /// the given prefix when one is present.
    Prefix(Option<String>),
    /// Fixed legacy formats: `materialize-bootstrap-sink-{sink_id}` for the
    /// progress group id and `mz-producer-{sink_id}-0` for the transactional id.
    Legacy,
}
449
/// Serialization formats for the key (if any) and the value of Kafka sink messages.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    /// Format of message keys, or `None` when no key format is configured.
    pub key_format: Option<KafkaSinkFormatType<C>>,
    /// Format of message values.
    pub value_format: KafkaSinkFormatType<C>,
}
455
/// A single serialization format for a Kafka message key or value.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    /// Avro encoding.
    Avro {
        /// The Avro schema (presumably its JSON text — confirm). Must be
        /// unchanged across an alter.
        schema: String,
        /// Optional compatibility level (presumably for the schema registry
        /// subject — confirm). Ignored when checking alter compatibility.
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        /// Wire format of the encoded messages; must be alter-compatible
        /// across an alter.
        wire_format: WireFormat<C>,
    },
    /// JSON encoding.
    Json,
    /// Text encoding.
    Text,
    /// Raw bytes.
    Bytes,
}
469
470impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
471 pub fn get_format_name(&self) -> &str {
472 match self {
473 Self::Avro { .. } => "avro",
474 Self::Json => "json",
475 Self::Text => "text",
476 Self::Bytes => "bytes",
477 }
478 }
479}
480
481impl<C: ConnectionAccess> KafkaSinkFormat<C> {
482 pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
483 match &self.key_format {
487 None => self.value_format.get_format_name().into(),
488 Some(key_format) => match (key_format, &self.value_format) {
489 (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
490 "avro".into()
491 }
492 (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
493 (keyf, valuef) => format!(
494 "key-{}-value-{}",
495 keyf.get_format_name(),
496 valuef.get_format_name()
497 )
498 .into(),
499 },
500 }
501 }
502
503 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
504 if self == other {
505 return Ok(());
506 }
507
508 match (&self.value_format, &other.value_format) {
509 (
510 KafkaSinkFormatType::Avro {
511 schema,
512 compatibility_level: _,
513 wire_format,
514 },
515 KafkaSinkFormatType::Avro {
516 schema: other_schema,
517 compatibility_level: _,
518 wire_format: other_wire_format,
519 },
520 ) => {
521 if schema != other_schema
522 || wire_format.alter_compatible(id, other_wire_format).is_err()
523 {
524 tracing::warn!(
525 "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
526 self,
527 other
528 );
529
530 return Err(AlterError { id });
531 }
532 }
533 (s, o) => {
534 if s != o {
535 tracing::warn!(
536 "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
537 s,
538 o
539 );
540 return Err(AlterError { id });
541 }
542 }
543 }
544
545 match (&self.key_format, &other.key_format) {
546 (
547 Some(KafkaSinkFormatType::Avro {
548 schema,
549 compatibility_level: _,
550 wire_format,
551 }),
552 Some(KafkaSinkFormatType::Avro {
553 schema: other_schema,
554 compatibility_level: _,
555 wire_format: other_wire_format,
556 }),
557 ) => {
558 if schema != other_schema
559 || wire_format.alter_compatible(id, other_wire_format).is_err()
560 {
561 tracing::warn!(
562 "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
563 self,
564 other
565 );
566
567 return Err(AlterError { id });
568 }
569 }
570 (s, o) => {
571 if s != o {
572 tracing::warn!(
573 "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
574 s,
575 o
576 );
577 return Err(AlterError { id });
578 }
579 }
580 }
581
582 Ok(())
583 }
584}
585
586impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
587 for KafkaSinkFormat<ReferencedConnection>
588{
589 fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
590 KafkaSinkFormat {
591 key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
592 value_format: self.value_format.into_inline_connection(&r),
593 }
594 }
595}
596
597impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
598 for KafkaSinkFormatType<ReferencedConnection>
599{
600 fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
601 match self {
602 KafkaSinkFormatType::Avro {
603 schema,
604 compatibility_level,
605 wire_format,
606 } => KafkaSinkFormatType::Avro {
607 schema,
608 compatibility_level,
609 wire_format: wire_format.into_inline_connection(r),
610 },
611 KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
612 KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
613 KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
614 }
615 }
616}
617
/// File format produced by an S3 sink.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// PostgreSQL `COPY`-style output with the given format parameters
    /// (presumably — confirm).
    PgCopy(CopyFormatParams<'static>),
    /// Parquet files.
    Parquet,
}
625
/// Parameters of an upload performed by an S3 sink (by the type name — confirm).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// Destination URI (presumably an S3 path prefix — confirm).
    pub uri: String,
    /// Maximum size of each written file, presumably in bytes. NOTE(review):
    /// likely bounded by `MIN_S3_SINK_FILE_SIZE` / `MAX_S3_SINK_FILE_SIZE` — confirm.
    pub max_file_size: u64,
    /// Relation description of the uploaded data.
    pub desc: RelationDesc,
    /// Output file format.
    pub format: S3SinkFormat,
}
638
/// Minimum S3 sink file size: `ByteSize::mb(16)`.
/// NOTE(review): presumably used to validate `S3UploadInfo::max_file_size` — confirm.
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// Maximum S3 sink file size: `ByteSize::gb(4)`.
/// NOTE(review): presumably used to validate `S3UploadInfo::max_file_size` — confirm.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);

/// Name of the `_mz_diff` column, presumably added by append-mode Iceberg sinks
/// to carry each row's diff — confirm.
pub const ICEBERG_APPEND_DIFF_COLUMN: &str = "_mz_diff";
/// Name of the `_mz_timestamp` column, presumably added by append-mode Iceberg
/// sinks to carry each row's timestamp — confirm.
pub const ICEBERG_APPEND_TIMESTAMP_COLUMN: &str = "_mz_timestamp";

/// Decimal precision that `iceberg_type_overrides` uses for unsigned 64-bit
/// values (`uint8` and `mz_timestamp`). `u64::MAX` has 20 decimal digits, so
/// `Decimal128(20, 0)` can represent every `u64`.
pub const ICEBERG_UINT64_DECIMAL_PRECISION: u8 = 20;
650
651pub fn iceberg_type_overrides(
666 scalar_type: &mz_repr::SqlScalarType,
667) -> Option<(arrow::datatypes::DataType, String)> {
668 use arrow::datatypes::DataType;
669 use mz_repr::SqlScalarType;
670 match scalar_type {
671 SqlScalarType::UInt16 => Some((DataType::Int32, "uint2".to_string())),
672 SqlScalarType::UInt32 => Some((DataType::Int64, "uint4".to_string())),
673 SqlScalarType::UInt64 => Some((
674 DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
675 "uint8".to_string(),
676 )),
677 SqlScalarType::MzTimestamp => Some((
678 DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
679 "mz_timestamp".to_string(),
680 )),
681 SqlScalarType::Interval => Some((DataType::LargeUtf8, "interval".to_string())),
682 _ => None,
683 }
684}
685
/// Configuration of an Iceberg sink, generic over the connection representation `C`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// Catalog id of the Iceberg catalog connection.
    pub catalog_connection_id: CatalogItemId,
    /// The Iceberg catalog connection (reference or inlined details, per `C`).
    pub catalog_connection: C::IcebergCatalog,

    /// Catalog id of the optional AWS storage connection. An alter may drop
    /// it, but if the new description keeps one it must be the same id.
    pub storage_connection_id: Option<CatalogItemId>,
    /// Optional AWS connection, presumably used for storage access — confirm.
    /// An alter may remove it or replace it with an alter-compatible one, but
    /// may not add one where there was none.
    pub storage_connection: Option<C::Aws>,

    /// NOTE(review): presumably indices of the relation's key columns — confirm.
    pub relation_key_indices: Option<Vec<usize>>,
    /// NOTE(review): presumably the key's description and column indices — confirm.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// Iceberg namespace of the target table.
    pub namespace: String,
    /// Name of the target Iceberg table.
    pub table: String,
}
710
711impl<C: ConnectionAccess> IcebergSinkConnection<C> {
712 pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
717 if self == other {
718 return Ok(());
719 }
720 let IcebergSinkConnection {
721 catalog_connection_id: connection_id,
722 catalog_connection,
723 storage_connection_id,
724 storage_connection,
725 relation_key_indices,
726 key_desc_and_indices,
727 namespace,
728 table,
729 } = self;
730
731 let compatibility_checks = [
732 (
733 connection_id == &other.catalog_connection_id,
734 "connection_id",
735 ),
736 (
737 catalog_connection
738 .alter_compatible(id, &other.catalog_connection)
739 .is_ok(),
740 "catalog_connection",
741 ),
742 (
745 other.storage_connection_id.is_none()
746 || storage_connection_id == &other.storage_connection_id,
747 "storage_connection_id",
748 ),
749 (
750 match &other.storage_connection {
751 None => true, Some(after) => {
753 match storage_connection {
754 None => false, Some(before) => before.alter_compatible(id, after).is_ok(),
756 }
757 }
758 },
759 "storage_connection",
760 ),
761 (
762 relation_key_indices == &other.relation_key_indices,
763 "relation_key_indices",
764 ),
765 (
766 key_desc_and_indices == &other.key_desc_and_indices,
767 "key_desc_and_indices",
768 ),
769 (namespace == &other.namespace, "namespace"),
770 (table == &other.table, "table"),
771 ];
772 for (compatible, field) in compatibility_checks {
773 if !compatible {
774 tracing::warn!(
775 "IcebergSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
776 self,
777 other
778 );
779
780 return Err(AlterError { id });
781 }
782 }
783
784 Ok(())
785 }
786}
787
788impl<R: ConnectionResolver> IntoInlineConnection<IcebergSinkConnection, R>
789 for IcebergSinkConnection<ReferencedConnection>
790{
791 fn into_inline_connection(self, r: R) -> IcebergSinkConnection {
792 let IcebergSinkConnection {
793 catalog_connection_id,
794 catalog_connection,
795 storage_connection_id,
796 storage_connection,
797 relation_key_indices,
798 key_desc_and_indices,
799 namespace,
800 table,
801 } = self;
802 IcebergSinkConnection {
803 catalog_connection_id,
804 catalog_connection: r
805 .resolve_connection(catalog_connection)
806 .unwrap_iceberg_catalog(),
807 storage_connection_id,
808 storage_connection: storage_connection.map(|c| r.resolve_connection(c).unwrap_aws()),
809 relation_key_indices,
810 key_desc_and_indices,
811 namespace,
812 table,
813 }
814 }
815}