use std::borrow::Cow;
use std::fmt::Debug;
use std::time::Duration;

use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_pgcopy::CopyFormatParams;
use mz_repr::bytes::ByteSize;
use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::frontier::Antichain;

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
use crate::controller::AlterError;

pub mod s3_oneshot_sink;

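/// A description of a storage sink: where a collection's data goes, and how
/// it gets there.
///
/// `S` is the storage metadata attached to the `from` and `to` collections;
/// `T` is the timestamp type of the sink's `as_of` frontier.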
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    pub from: GlobalId,
    pub from_desc: RelationDesc,
    pub connection: StorageSinkConnection,
    pub with_snapshot: bool,
    pub version: u64,
    pub envelope: SinkEnvelope,
    pub as_of: Antichain<T>,
    pub from_storage_metadata: S,
    pub to_storage_metadata: S,
}

impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
    for StorageSinkDesc<S, T>
{
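    /// Determines whether `self` is compatible with `other`, i.e. whether
    /// `other` may replace `self` in place; an incompatible field produces an
    /// `AlterError`.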
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkDesc<S, T>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let StorageSinkDesc {
            from,
            from_desc,
            connection,
            envelope,
            version: _,
            as_of: _,
            from_storage_metadata,
            with_snapshot,
            to_storage_metadata,
        } = self;

        let compatibility_checks = [
            (from == &other.from, "from"),
            (from_desc == &other.from_desc, "from_desc"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (envelope == &other.envelope, "envelope"),
            // Dropping the snapshot requirement is always fine, but a sink
            // that never emitted a snapshot cannot acquire the requirement
            // retroactively.
            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
            (
                from_storage_metadata == &other.from_storage_metadata,
                "from_storage_metadata",
            ),
            (
                to_storage_metadata == &other.to_storage_metadata,
                "to_storage_metadata",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

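/// The envelope applied to a sink's rows before they are encoded.
///
/// `Debezium` wraps each update in a Debezium-style before/after diff
/// envelope; `Upsert` emits the latest value for each key, using a `NULL`
/// value (tombstone) to indicate deletion.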
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    Debezium,
    Upsert,
}

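/// A connection to the external system a sink writes to, generic over whether
/// nested connections are referenced by catalog ID or fully inlined.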
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSinkConnection<C>),
    Iceberg(IcebergSinkConnection<C>),
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
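    /// Determines whether `self` is compatible with `other`: the two must be
    /// the same kind of connection, and the inner connections must themselves
    /// be alter-compatible.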
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        match (self, other) {
            (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
                s.alter_compatible(id, o)?
            }
            (StorageSinkConnection::Iceberg(s), StorageSinkConnection::Iceberg(o)) => {
                s.alter_compatible(id, o)?
            }
            _ => {
                tracing::warn!(
                    "StorageSinkConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );
                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
    for StorageSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> StorageSinkConnection {
        match self {
            Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
            Self::Iceberg(conn) => StorageSinkConnection::Iceberg(conn.into_inline_connection(r)),
        }
    }
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
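    /// Returns the ID of the catalog connection that this sink connection
    /// references, if any.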
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        use StorageSinkConnection::*;
        match self {
            Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
            Iceberg(IcebergSinkConnection {
                catalog_connection_id: connection_id,
                ..
            }) => Some(*connection_id),
        }
    }

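    /// Returns a human-readable name for this kind of sink connection.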
    pub fn name(&self) -> &'static str {
        use StorageSinkConnection::*;
        match self {
            Kafka(_) => "kafka",
            Iceberg(_) => "iceberg",
        }
    }
}

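/// The compression algorithm to apply to messages before they are sent to
/// Kafka.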
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

impl KafkaSinkCompressionType {
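    /// Formats the compression type as expected by librdkafka's
    /// `compression.type` configuration option.
    ///
    /// A usage sketch (the `mz_storage_types::sinks` module path is assumed
    /// here):
    ///
    /// ```ignore
    /// use mz_storage_types::sinks::KafkaSinkCompressionType;
    ///
    /// assert_eq!(KafkaSinkCompressionType::Lz4.to_librdkafka_option(), "lz4");
    /// ```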
    pub fn to_librdkafka_option(&self) -> &'static str {
        match self {
            KafkaSinkCompressionType::None => "none",
            KafkaSinkCompressionType::Gzip => "gzip",
            KafkaSinkCompressionType::Snappy => "snappy",
            KafkaSinkCompressionType::Lz4 => "lz4",
            KafkaSinkCompressionType::Zstd => "zstd",
        }
    }
}

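/// A Kafka sink connection: everything needed to write a collection's data to
/// a Kafka topic.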
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    pub connection_id: CatalogItemId,
    pub connection: C::Kafka,
    pub format: KafkaSinkFormat<C>,
    /// The indices of the columns that form the relation's key, if any.
    pub relation_key_indices: Option<Vec<usize>>,
    /// The description and column indices of the sink's key, if any.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The index of the column that carries Kafka headers, if any.
    pub headers_index: Option<usize>,
    pub value_desc: RelationDesc,
    /// An optional expression evaluated to choose each row's partition.
    pub partition_by: Option<MirScalarExpr>,
    /// The Kafka topic to write to.
    pub topic: String,
    /// Options applied when creating the topic.
    pub topic_options: KafkaTopicOptions,
    pub compression_type: KafkaSinkCompressionType,
    pub progress_group_id: KafkaIdStyle,
    pub transactional_id: KafkaIdStyle,
    pub topic_metadata_refresh_interval: Duration,
}

impl KafkaSinkConnection {
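    /// Computes the client ID to register with librdkafka for this sink: a
    /// stable base derived from the connection and sink IDs, which the
    /// connection may further enrich based on the given configuration.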
    pub fn client_id(
        &self,
        configs: &ConfigSet,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        let mut client_id =
            KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
        self.connection.enrich_client_id(configs, &mut client_id);
        client_id
    }

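    /// Returns the name of the progress topic to use for the sink.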
    pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
        self.connection
            .progress_topic(connection_context, self.connection_id)
    }

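    /// Returns the ID of the consumer group the sink will use to read the
    /// progress topic on resumption.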
    pub fn progress_group_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.progress_group_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
            ),
            KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
        }
    }

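    /// Returns the transactional ID to use for the sink's Kafka producer.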
    pub fn transactional_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.transactional_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
            ),
            KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
        }
    }
}

impl<C: ConnectionAccess> KafkaSinkConnection<C> {
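    /// Determines whether `self` is compatible with `other`, returning an
    /// `AlterError` naming the first field that cannot be altered in place.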
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &KafkaSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;

        let compatibility_checks = [
            (connection_id == &other.connection_id, "connection_id"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (format.alter_compatible(id, &other.format).is_ok(), "format"),
            (
                relation_key_indices == &other.relation_key_indices,
                "relation_key_indices",
            ),
            (
                key_desc_and_indices == &other.key_desc_and_indices,
                "key_desc_and_indices",
            ),
            (headers_index == &other.headers_index, "headers_index"),
            (value_desc == &other.value_desc, "value_desc"),
            (partition_by == &other.partition_by, "partition_by"),
            (topic == &other.topic, "topic"),
            (
                compression_type == &other.compression_type,
                "compression_type",
            ),
            (
                progress_group_id == &other.progress_group_id,
                "progress_group_id",
            ),
            (
                transactional_id == &other.transactional_id,
                "transactional_id",
            ),
            (topic_options == &other.topic_options, "topic_options"),
            (
                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
                "topic_metadata_refresh_interval",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
    for KafkaSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;
        KafkaSinkConnection {
            connection_id,
            connection: r.resolve_connection(connection).unwrap_kafka(),
            format: format.into_inline_connection(r),
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        }
    }
}

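/// How to derive the group and transactional IDs that a Kafka sink registers
/// with the broker.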
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// A new-style ID, optionally prefixed with the given string.
    Prefix(Option<String>),
    /// The legacy ID format used by earlier versions.
    Legacy,
}

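/// The format to apply to a Kafka sink's keys and values.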
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    pub key_format: Option<KafkaSinkFormatType<C>>,
    pub value_format: KafkaSinkFormatType<C>,
}

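/// The format of a single component (key or value) of a Kafka sink message.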
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    Avro {
        schema: String,
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        csr_connection: C::Csr,
    },
    Json,
    Text,
    Bytes,
}

impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
    pub fn get_format_name(&self) -> &str {
        match self {
            Self::Avro { .. } => "avro",
            Self::Json => "json",
            Self::Text => "text",
            Self::Bytes => "bytes",
        }
    }
}

impl<C: ConnectionAccess> KafkaSinkFormat<C> {
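    /// Returns a name for the combined key/value format, collapsing to a
    /// single name when both halves use the same self-describing format.
    ///
    /// A sketch of the behavior (the module path is assumed):
    ///
    /// ```ignore
    /// use mz_storage_types::sinks::{KafkaSinkFormat, KafkaSinkFormatType};
    ///
    /// let format: KafkaSinkFormat = KafkaSinkFormat {
    ///     key_format: Some(KafkaSinkFormatType::Text),
    ///     value_format: KafkaSinkFormatType::Json,
    /// };
    /// assert_eq!(format.get_format_name(), "key-text-value-json");
    /// ```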
    pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
        match &self.key_format {
            None => self.value_format.get_format_name().into(),
            Some(key_format) => match (key_format, &self.value_format) {
                // When the key and value formats agree, report a single name;
                // otherwise name both halves.
                (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
                    "avro".into()
                }
                (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
                (keyf, valuef) => format!(
                    "key-{}-value-{}",
                    keyf.get_format_name(),
                    valuef.get_format_name()
                )
                .into(),
            },
        }
    }

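    /// Determines whether `self`'s formats are compatible with `other`'s:
    /// Avro formats may differ only in compatibility level (and in ways the
    /// CSR connection itself considers alterable), while all other formats
    /// must match exactly.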
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }

        match (&self.value_format, &other.value_format) {
            (
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                },
                KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                },
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
                        "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        match (&self.key_format, &other.key_format) {
            (
                Some(KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                }),
                Some(KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                }),
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
                        "KafkaSinkFormat incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
    for KafkaSinkFormat<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
        KafkaSinkFormat {
            key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
            value_format: self.value_format.into_inline_connection(&r),
        }
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
    for KafkaSinkFormatType<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
        match self {
            KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection,
            } => KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
            },
            KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
            KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
            KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
        }
    }
}

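/// The file format to use when writing sink output to S3.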
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Encode rows using PostgreSQL `COPY` semantics with the given
    /// parameters.
    PgCopy(CopyFormatParams<'static>),
    /// Encode rows as Apache Parquet.
    Parquet,
}

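/// Details about an upload of a collection's data to S3.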
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The S3 URI to write to.
    pub uri: String,
    /// The maximum size, in bytes, of any single file.
    pub max_file_size: u64,
    /// The relation description of the rows being written.
    pub desc: RelationDesc,
    /// The selected output format.
    pub format: S3SinkFormat,
}

/// The minimum file size an S3 sink may be configured to produce.
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// The maximum file size an S3 sink may be configured to produce.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);

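/// An Iceberg sink connection: the Iceberg catalog that owns the target
/// table, plus the AWS connection used to access its storage.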
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
    pub catalog_connection_id: CatalogItemId,
    pub catalog_connection: C::IcebergCatalog,
    pub aws_connection_id: CatalogItemId,
    pub aws_connection: C::Aws,
    /// The indices of the columns that form the relation's key, if any.
    pub relation_key_indices: Option<Vec<usize>>,
    /// The description and column indices of the sink's key, if any.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The Iceberg namespace containing the target table.
    pub namespace: String,
    /// The name of the target Iceberg table.
    pub table: String,
}

impl<C: ConnectionAccess> IcebergSinkConnection<C> {
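    /// Determines whether `self` is compatible with `other`, returning an
    /// `AlterError` naming the first incompatible field.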
    pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let IcebergSinkConnection {
            catalog_connection_id,
            catalog_connection,
            aws_connection_id,
            aws_connection,
            relation_key_indices,
            key_desc_and_indices,
            namespace,
            table,
        } = self;

        let compatibility_checks = [
            (
                catalog_connection_id == &other.catalog_connection_id,
                "catalog_connection_id",
            ),
            (
                catalog_connection
                    .alter_compatible(id, &other.catalog_connection)
                    .is_ok(),
                "catalog_connection",
            ),
            (
                aws_connection_id == &other.aws_connection_id,
                "aws_connection_id",
            ),
            (
                aws_connection
                    .alter_compatible(id, &other.aws_connection)
                    .is_ok(),
                "aws_connection",
            ),
            (
                relation_key_indices == &other.relation_key_indices,
                "relation_key_indices",
            ),
            (
                key_desc_and_indices == &other.key_desc_and_indices,
                "key_desc_and_indices",
            ),
            (namespace == &other.namespace, "namespace"),
            (table == &other.table, "table"),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "IcebergSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<IcebergSinkConnection, R>
    for IcebergSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> IcebergSinkConnection {
        let IcebergSinkConnection {
            catalog_connection_id,
            catalog_connection,
            aws_connection_id,
            aws_connection,
            relation_key_indices,
            key_desc_and_indices,
            namespace,
            table,
        } = self;
        IcebergSinkConnection {
            catalog_connection_id,
            catalog_connection: r
                .resolve_connection(catalog_connection)
                .unwrap_iceberg_catalog(),
            aws_connection_id,
            aws_connection: r.resolve_connection(aws_connection).unwrap_aws(),
            relation_key_indices,
            key_desc_and_indices,
            namespace,
            table,
        }
    }
}