1use std::borrow::Cow;
13use std::fmt::Debug;
14use std::time::Duration;
15
16use mz_dyncfg::ConfigSet;
17use mz_expr::MirScalarExpr;
18use mz_pgcopy::CopyFormatParams;
19use mz_repr::bytes::ByteSize;
20use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
21use proptest_derive::Arbitrary;
22use serde::{Deserialize, Serialize};
23use timely::PartialOrder;
24use timely::progress::frontier::Antichain;
25
26use crate::AlterCompatible;
27use crate::connections::inline::{
28 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
29 ReferencedConnection,
30};
31use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
32use crate::controller::AlterError;
33
34pub mod s3_oneshot_sink;
35
/// A description of a sink that writes the contents of a storage collection
/// to an external system.
///
/// `S` is the storage-metadata type for the involved collections; `T` is the
/// timestamp type (defaults to [`mz_repr::Timestamp`]).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    /// The ID of the collection the sink reads from.
    pub from: GlobalId,
    /// The relation description of the sinked collection.
    pub from_desc: RelationDesc,
    /// The connection to the external system the sink writes to.
    pub connection: StorageSinkConnection,
    /// Whether the sink emits an initial snapshot before streaming changes.
    pub with_snapshot: bool,
    /// Version of the sink definition; ignored by `alter_compatible`.
    pub version: u64,
    /// The envelope used to encode updates.
    pub envelope: SinkEnvelope,
    /// The frontier at which the sink starts emitting data.
    pub as_of: Antichain<T>,
    /// Storage metadata for the collection the sink reads from.
    pub from_storage_metadata: S,
    /// Storage metadata for the collection backing the sink itself.
    pub to_storage_metadata: S,
    /// Optional commit interval.
    // NOTE(review): presumably the interval between output commits; `None`
    // likely selects a default — confirm with the sink implementation.
    pub commit_interval: Option<Duration>,
}
55
56impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
57 for StorageSinkDesc<S, T>
58{
59 fn alter_compatible(
67 &self,
68 id: GlobalId,
69 other: &StorageSinkDesc<S, T>,
70 ) -> Result<(), AlterError> {
71 if self == other {
72 return Ok(());
73 }
74 let StorageSinkDesc {
75 from,
76 from_desc,
77 connection,
78 envelope,
79 version: _,
80 as_of: _,
82 from_storage_metadata,
83 with_snapshot,
84 to_storage_metadata,
85 commit_interval: _,
86 } = self;
87
88 let compatibility_checks = [
89 (from == &other.from, "from"),
90 (from_desc == &other.from_desc, "from_desc"),
91 (
92 connection.alter_compatible(id, &other.connection).is_ok(),
93 "connection",
94 ),
95 (envelope == &other.envelope, "envelope"),
96 (*with_snapshot || !other.with_snapshot, "with_snapshot"),
99 (
100 from_storage_metadata == &other.from_storage_metadata,
101 "from_storage_metadata",
102 ),
103 (
104 to_storage_metadata == &other.to_storage_metadata,
105 "to_storage_metadata",
106 ),
107 ];
108
109 for (compatible, field) in compatibility_checks {
110 if !compatible {
111 tracing::warn!(
112 "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
113 self,
114 other
115 );
116
117 return Err(AlterError { id });
118 }
119 }
120
121 Ok(())
122 }
123}
124
/// The envelope a sink applies to the updates it emits.
///
/// Variant semantics are implemented by the sink rendering code, which is not
/// visible in this file.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    /// Debezium-style envelope.
    Debezium,
    /// Upsert envelope.
    Upsert,
    /// Append-only envelope.
    Append,
}
133
/// The external system a storage sink writes to.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// A sink that writes to a Kafka topic.
    Kafka(KafkaSinkConnection<C>),
    /// A sink that writes to an Iceberg table.
    Iceberg(IcebergSinkConnection<C>),
}
139
140impl<C: ConnectionAccess> StorageSinkConnection<C> {
141 pub fn alter_compatible(
146 &self,
147 id: GlobalId,
148 other: &StorageSinkConnection<C>,
149 ) -> Result<(), AlterError> {
150 if self == other {
151 return Ok(());
152 }
153 match (self, other) {
154 (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
155 s.alter_compatible(id, o)?
156 }
157 (StorageSinkConnection::Iceberg(s), StorageSinkConnection::Iceberg(o)) => {
158 s.alter_compatible(id, o)?
159 }
160 _ => {
161 tracing::warn!(
162 "StorageSinkConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
163 self,
164 other
165 );
166 return Err(AlterError { id });
167 }
168 }
169
170 Ok(())
171 }
172}
173
174impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
175 for StorageSinkConnection<ReferencedConnection>
176{
177 fn into_inline_connection(self, r: R) -> StorageSinkConnection {
178 match self {
179 Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
180 Self::Iceberg(conn) => StorageSinkConnection::Iceberg(conn.into_inline_connection(r)),
181 }
182 }
183}
184
185impl<C: ConnectionAccess> StorageSinkConnection<C> {
186 pub fn connection_id(&self) -> Option<CatalogItemId> {
188 use StorageSinkConnection::*;
189 match self {
190 Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
191 Iceberg(IcebergSinkConnection {
192 catalog_connection_id: connection_id,
193 ..
194 }) => Some(*connection_id),
195 }
196 }
197
198 pub fn name(&self) -> &'static str {
200 use StorageSinkConnection::*;
201 match self {
202 Kafka(_) => "kafka",
203 Iceberg(_) => "iceberg",
204 }
205 }
206}
207
/// The compression applied to messages produced by a Kafka sink.
///
/// See [`KafkaSinkCompressionType::to_librdkafka_option`] for the mapping to
/// librdkafka's `compression.type` option values.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Snappy compression.
    Snappy,
    /// LZ4 compression.
    Lz4,
    /// Zstandard compression.
    Zstd,
}
216
217impl KafkaSinkCompressionType {
218 pub fn to_librdkafka_option(&self) -> &'static str {
221 match self {
222 KafkaSinkCompressionType::None => "none",
223 KafkaSinkCompressionType::Gzip => "gzip",
224 KafkaSinkCompressionType::Snappy => "snappy",
225 KafkaSinkCompressionType::Lz4 => "lz4",
226 KafkaSinkCompressionType::Zstd => "zstd",
227 }
228 }
229}
230
/// A connection to a Kafka cluster plus everything needed to write a sink's
/// output to a topic there.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// Catalog ID of the Kafka connection this sink uses.
    pub connection_id: CatalogItemId,
    /// The Kafka connection itself (inlined or referenced, per `C`).
    pub connection: C::Kafka,
    /// The key/value encoding for emitted messages.
    pub format: KafkaSinkFormat<C>,
    /// Column indices of the sinked relation's key, if it has one.
    // NOTE(review): semantics inferred from the name — confirm with callers.
    pub relation_key_indices: Option<Vec<usize>>,
    /// Description and column indices of the message key, when the sink is
    /// keyed.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// Column index supplying Kafka message headers, if any.
    pub headers_index: Option<usize>,
    /// Relation description of the message value.
    pub value_desc: RelationDesc,
    /// Expression used to assign messages to partitions, if specified.
    // NOTE(review): evaluation semantics live in the sink rendering code.
    pub partition_by: Option<MirScalarExpr>,
    /// Name of the topic the sink writes to.
    pub topic: String,
    /// Options applied to the topic.
    pub topic_options: KafkaTopicOptions,
    /// Compression applied to produced messages.
    pub compression_type: KafkaSinkCompressionType,
    /// How the consumer group ID for reading the progress topic is derived;
    /// see [`KafkaSinkConnection::progress_group_id`].
    pub progress_group_id: KafkaIdStyle,
    /// How the producer's transactional ID is derived; see
    /// [`KafkaSinkConnection::transactional_id`].
    pub transactional_id: KafkaIdStyle,
    /// How often the client refreshes topic metadata.
    // NOTE(review): presumably maps to librdkafka's
    // `topic.metadata.refresh.interval.ms` — confirm.
    pub topic_metadata_refresh_interval: Duration,
}
254
255impl KafkaSinkConnection {
256 pub fn client_id(
261 &self,
262 configs: &ConfigSet,
263 connection_context: &ConnectionContext,
264 sink_id: GlobalId,
265 ) -> String {
266 let mut client_id =
267 KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
268 self.connection.enrich_client_id(configs, &mut client_id);
269 client_id
270 }
271
272 pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
274 self.connection
275 .progress_topic(connection_context, self.connection_id)
276 }
277
278 pub fn progress_group_id(
284 &self,
285 connection_context: &ConnectionContext,
286 sink_id: GlobalId,
287 ) -> String {
288 match self.progress_group_id {
289 KafkaIdStyle::Prefix(ref prefix) => format!(
290 "{}{}",
291 prefix.as_deref().unwrap_or(""),
292 KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
293 ),
294 KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
295 }
296 }
297
298 pub fn transactional_id(
303 &self,
304 connection_context: &ConnectionContext,
305 sink_id: GlobalId,
306 ) -> String {
307 match self.transactional_id {
308 KafkaIdStyle::Prefix(ref prefix) => format!(
309 "{}{}",
310 prefix.as_deref().unwrap_or(""),
311 KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
312 ),
313 KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
314 }
315 }
316}
317
318impl<C: ConnectionAccess> KafkaSinkConnection<C> {
319 pub fn alter_compatible(
324 &self,
325 id: GlobalId,
326 other: &KafkaSinkConnection<C>,
327 ) -> Result<(), AlterError> {
328 if self == other {
329 return Ok(());
330 }
331 let KafkaSinkConnection {
332 connection_id,
333 connection,
334 format,
335 relation_key_indices,
336 key_desc_and_indices,
337 headers_index,
338 value_desc,
339 partition_by,
340 topic,
341 compression_type,
342 progress_group_id,
343 transactional_id,
344 topic_options,
345 topic_metadata_refresh_interval,
346 } = self;
347
348 let compatibility_checks = [
349 (connection_id == &other.connection_id, "connection_id"),
350 (
351 connection.alter_compatible(id, &other.connection).is_ok(),
352 "connection",
353 ),
354 (format.alter_compatible(id, &other.format).is_ok(), "format"),
355 (
356 relation_key_indices == &other.relation_key_indices,
357 "relation_key_indices",
358 ),
359 (
360 key_desc_and_indices == &other.key_desc_and_indices,
361 "key_desc_and_indices",
362 ),
363 (headers_index == &other.headers_index, "headers_index"),
364 (value_desc == &other.value_desc, "value_desc"),
365 (partition_by == &other.partition_by, "partition_by"),
366 (topic == &other.topic, "topic"),
367 (
368 compression_type == &other.compression_type,
369 "compression_type",
370 ),
371 (
372 progress_group_id == &other.progress_group_id,
373 "progress_group_id",
374 ),
375 (
376 transactional_id == &other.transactional_id,
377 "transactional_id",
378 ),
379 (topic_options == &other.topic_options, "topic_config"),
380 (
381 topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
382 "topic_metadata_refresh_interval",
383 ),
384 ];
385 for (compatible, field) in compatibility_checks {
386 if !compatible {
387 tracing::warn!(
388 "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
389 self,
390 other
391 );
392
393 return Err(AlterError { id });
394 }
395 }
396
397 Ok(())
398 }
399}
400
401impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
402 for KafkaSinkConnection<ReferencedConnection>
403{
404 fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
405 let KafkaSinkConnection {
406 connection_id,
407 connection,
408 format,
409 relation_key_indices,
410 key_desc_and_indices,
411 headers_index,
412 value_desc,
413 partition_by,
414 topic,
415 compression_type,
416 progress_group_id,
417 transactional_id,
418 topic_options,
419 topic_metadata_refresh_interval,
420 } = self;
421 KafkaSinkConnection {
422 connection_id,
423 connection: r.resolve_connection(connection).unwrap_kafka(),
424 format: format.into_inline_connection(r),
425 relation_key_indices,
426 key_desc_and_indices,
427 headers_index,
428 value_desc,
429 partition_by,
430 topic,
431 compression_type,
432 progress_group_id,
433 transactional_id,
434 topic_options,
435 topic_metadata_refresh_interval,
436 }
437 }
438}
439
/// How a Kafka ID (consumer group or transactional ID) is derived for a sink.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// An optional user-provided prefix followed by the connection's base ID.
    Prefix(Option<String>),
    /// A legacy fixed-pattern ID; see the `progress_group_id` and
    /// `transactional_id` methods on [`KafkaSinkConnection`] for the exact
    /// strings.
    Legacy,
}
447
/// The key and value formats of messages emitted by a Kafka sink.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    /// The format of the message key, if the sink emits keyed messages.
    pub key_format: Option<KafkaSinkFormatType<C>>,
    /// The format of the message value.
    pub value_format: KafkaSinkFormatType<C>,
}
453
/// A single message format supported by Kafka sinks.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    /// Avro, with its schema managed via a Confluent Schema Registry.
    Avro {
        /// The Avro schema as a string.
        schema: String,
        /// Compatibility level to apply to the registered schema, if any.
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        /// The schema registry connection (inlined or referenced, per `C`).
        csr_connection: C::Csr,
    },
    /// JSON-encoded messages.
    Json,
    /// Plain-text messages.
    Text,
    /// Raw bytes.
    Bytes,
}
465
466impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
467 pub fn get_format_name(&self) -> &str {
468 match self {
469 Self::Avro { .. } => "avro",
470 Self::Json => "json",
471 Self::Text => "text",
472 Self::Bytes => "bytes",
473 }
474 }
475}
476
477impl<C: ConnectionAccess> KafkaSinkFormat<C> {
478 pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
479 match &self.key_format {
483 None => self.value_format.get_format_name().into(),
484 Some(key_format) => match (key_format, &self.value_format) {
485 (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
486 "avro".into()
487 }
488 (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
489 (keyf, valuef) => format!(
490 "key-{}-value-{}",
491 keyf.get_format_name(),
492 valuef.get_format_name()
493 )
494 .into(),
495 },
496 }
497 }
498
499 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
500 if self == other {
501 return Ok(());
502 }
503
504 match (&self.value_format, &other.value_format) {
505 (
506 KafkaSinkFormatType::Avro {
507 schema,
508 compatibility_level: _,
509 csr_connection,
510 },
511 KafkaSinkFormatType::Avro {
512 schema: other_schema,
513 compatibility_level: _,
514 csr_connection: other_csr_connection,
515 },
516 ) => {
517 if schema != other_schema
518 || csr_connection
519 .alter_compatible(id, other_csr_connection)
520 .is_err()
521 {
522 tracing::warn!(
523 "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
524 self,
525 other
526 );
527
528 return Err(AlterError { id });
529 }
530 }
531 (s, o) => {
532 if s != o {
533 tracing::warn!(
534 "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
535 s,
536 o
537 );
538 return Err(AlterError { id });
539 }
540 }
541 }
542
543 match (&self.key_format, &other.key_format) {
544 (
545 Some(KafkaSinkFormatType::Avro {
546 schema,
547 compatibility_level: _,
548 csr_connection,
549 }),
550 Some(KafkaSinkFormatType::Avro {
551 schema: other_schema,
552 compatibility_level: _,
553 csr_connection: other_csr_connection,
554 }),
555 ) => {
556 if schema != other_schema
557 || csr_connection
558 .alter_compatible(id, other_csr_connection)
559 .is_err()
560 {
561 tracing::warn!(
562 "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
563 self,
564 other
565 );
566
567 return Err(AlterError { id });
568 }
569 }
570 (s, o) => {
571 if s != o {
572 tracing::warn!(
573 "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
574 s,
575 o
576 );
577 return Err(AlterError { id });
578 }
579 }
580 }
581
582 Ok(())
583 }
584}
585
586impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
587 for KafkaSinkFormat<ReferencedConnection>
588{
589 fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
590 KafkaSinkFormat {
591 key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
592 value_format: self.value_format.into_inline_connection(&r),
593 }
594 }
595}
596
597impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
598 for KafkaSinkFormatType<ReferencedConnection>
599{
600 fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
601 match self {
602 KafkaSinkFormatType::Avro {
603 schema,
604 compatibility_level,
605 csr_connection,
606 } => KafkaSinkFormatType::Avro {
607 schema,
608 compatibility_level,
609 csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
610 },
611 KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
612 KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
613 KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
614 }
615 }
616}
617
/// The format in which an S3 sink writes its files.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Text files using PostgreSQL `COPY` formatting, per the embedded
    /// parameters.
    PgCopy(CopyFormatParams<'static>),
    /// Parquet files.
    Parquet,
}
625
/// Describes what and how to upload for an S3 sink.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The S3 URI to write to.
    pub uri: String,
    /// Maximum size of each uploaded file.
    // NOTE(review): presumably in bytes, bounded by MIN/MAX_S3_SINK_FILE_SIZE
    // below — confirm where it is enforced.
    pub max_file_size: u64,
    /// Description of the relation being uploaded.
    pub desc: RelationDesc,
    /// File format to emit.
    pub format: S3SinkFormat,
}
638
/// Smallest file size an S3 sink may be configured to produce.
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// Largest file size an S3 sink may be configured to produce.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);

/// Name of the diff metadata column used by append-style Iceberg sinks.
// NOTE(review): exact usage lives in the Iceberg sink implementation, which
// is not visible here.
pub const ICEBERG_APPEND_DIFF_COLUMN: &str = "_mz_diff";
/// Name of the timestamp metadata column used by append-style Iceberg sinks.
pub const ICEBERG_APPEND_TIMESTAMP_COLUMN: &str = "_mz_timestamp";
646
/// A connection to an Iceberg catalog plus everything needed to write a
/// sink's output to a table there.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// Catalog ID of the Iceberg-catalog connection this sink uses.
    pub catalog_connection_id: CatalogItemId,
    /// The Iceberg catalog connection itself.
    pub catalog_connection: C::IcebergCatalog,
    /// Catalog ID of the AWS connection this sink uses.
    pub aws_connection_id: CatalogItemId,
    /// The AWS connection itself.
    pub aws_connection: C::Aws,
    /// Column indices of the sinked relation's key, if it has one.
    // NOTE(review): semantics inferred from the name — confirm with callers.
    pub relation_key_indices: Option<Vec<usize>>,
    /// Description and column indices of the key, when the sink is keyed.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The Iceberg namespace containing the target table.
    pub namespace: String,
    /// The name of the target table.
    pub table: String,
}
660
661impl<C: ConnectionAccess> IcebergSinkConnection<C> {
662 pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
667 if self == other {
668 return Ok(());
669 }
670 let IcebergSinkConnection {
671 catalog_connection_id: connection_id,
672 catalog_connection,
673 aws_connection_id,
674 aws_connection,
675 relation_key_indices,
676 key_desc_and_indices,
677 namespace,
678 table,
679 } = self;
680
681 let compatibility_checks = [
682 (
683 connection_id == &other.catalog_connection_id,
684 "connection_id",
685 ),
686 (
687 catalog_connection
688 .alter_compatible(id, &other.catalog_connection)
689 .is_ok(),
690 "catalog_connection",
691 ),
692 (
693 aws_connection_id == &other.aws_connection_id,
694 "aws_connection_id",
695 ),
696 (
697 aws_connection
698 .alter_compatible(id, &other.aws_connection)
699 .is_ok(),
700 "aws_connection",
701 ),
702 (
703 relation_key_indices == &other.relation_key_indices,
704 "relation_key_indices",
705 ),
706 (
707 key_desc_and_indices == &other.key_desc_and_indices,
708 "key_desc_and_indices",
709 ),
710 (namespace == &other.namespace, "namespace"),
711 (table == &other.table, "table"),
712 ];
713 for (compatible, field) in compatibility_checks {
714 if !compatible {
715 tracing::warn!(
716 "IcebergSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
717 self,
718 other
719 );
720
721 return Err(AlterError { id });
722 }
723 }
724
725 Ok(())
726 }
727}
728
729impl<R: ConnectionResolver> IntoInlineConnection<IcebergSinkConnection, R>
730 for IcebergSinkConnection<ReferencedConnection>
731{
732 fn into_inline_connection(self, r: R) -> IcebergSinkConnection {
733 let IcebergSinkConnection {
734 catalog_connection_id,
735 catalog_connection,
736 aws_connection_id,
737 aws_connection,
738 relation_key_indices,
739 key_desc_and_indices,
740 namespace,
741 table,
742 } = self;
743 IcebergSinkConnection {
744 catalog_connection_id,
745 catalog_connection: r
746 .resolve_connection(catalog_connection)
747 .unwrap_iceberg_catalog(),
748 aws_connection_id,
749 aws_connection: r.resolve_connection(aws_connection).unwrap_aws(),
750 relation_key_indices,
751 key_desc_and_indices,
752 namespace,
753 table,
754 }
755 }
756}