use std::borrow::Cow;
use std::fmt::Debug;
use std::time::Duration;

use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_pgcopy::CopyFormatParams;
use mz_repr::bytes::ByteSize;
use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::frontier::Antichain;

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
use crate::controller::AlterError;

pub mod s3_oneshot_sink;

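/// A description of a storage sink: the collection it reads from, the
/// connection it writes to, and the parameters governing how it emits updates.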
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    pub from: GlobalId,
    pub from_desc: RelationDesc,
    pub connection: StorageSinkConnection,
    pub with_snapshot: bool,
    pub version: u64,
    pub envelope: SinkEnvelope,
    pub as_of: Antichain<T>,
    pub from_storage_metadata: S,
    pub to_storage_metadata: S,
    pub commit_interval: Option<Duration>,
}

impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
    for StorageSinkDesc<S, T>
{
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkDesc<S, T>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let StorageSinkDesc {
            from,
            from_desc,
            connection,
            envelope,
            version: _,
            as_of: _,
            from_storage_metadata,
            with_snapshot,
            to_storage_metadata,
            commit_interval: _,
        } = self;

        let compatibility_checks = [
            (from == &other.from, "from"),
            (from_desc == &other.from_desc, "from_desc"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (envelope == &other.envelope, "envelope"),
            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
            (
                from_storage_metadata == &other.from_storage_metadata,
                "from_storage_metadata",
            ),
            (
                to_storage_metadata == &other.to_storage_metadata,
                "to_storage_metadata",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

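/// The envelope a sink uses to present its stream of updates.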
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    Debezium,
    Upsert,
}

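/// Connection details for the external system a storage sink writes to.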
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSinkConnection<C>),
    Iceberg(IcebergSinkConnection<C>),
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
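    /// Checks whether `self` and `other` are the same kind of sink connection
    /// and are mutually compatible, returning an [`AlterError`] for `id`
    /// otherwise.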
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        match (self, other) {
            (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
                s.alter_compatible(id, o)?
            }
            (StorageSinkConnection::Iceberg(s), StorageSinkConnection::Iceberg(o)) => {
                s.alter_compatible(id, o)?
            }
            _ => {
                tracing::warn!(
                    "StorageSinkConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );
                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
    for StorageSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> StorageSinkConnection {
        match self {
            Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
            Self::Iceberg(conn) => StorageSinkConnection::Iceberg(conn.into_inline_connection(r)),
        }
    }
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
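    /// Returns the [`CatalogItemId`] of the catalog connection backing this
    /// sink connection, if any.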
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        use StorageSinkConnection::*;
        match self {
            Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
            Iceberg(IcebergSinkConnection {
                catalog_connection_id: connection_id,
                ..
            }) => Some(*connection_id),
        }
    }

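    /// Returns a short, human-readable name for this kind of sink connection.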
    pub fn name(&self) -> &'static str {
        use StorageSinkConnection::*;
        match self {
            Kafka(_) => "kafka",
            Iceberg(_) => "iceberg",
        }
    }
}

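/// The compression algorithm applied to messages produced by a Kafka sink.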
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

impl KafkaSinkCompressionType {
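    /// Renders the compression type as the value expected by librdkafka's
    /// compression configuration option.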
    pub fn to_librdkafka_option(&self) -> &'static str {
        match self {
            KafkaSinkCompressionType::None => "none",
            KafkaSinkCompressionType::Gzip => "gzip",
            KafkaSinkCompressionType::Snappy => "snappy",
            KafkaSinkCompressionType::Lz4 => "lz4",
            KafkaSinkCompressionType::Zstd => "zstd",
        }
    }
}

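/// Connection and encoding details for a sink that writes to a Kafka topic.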
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    pub connection_id: CatalogItemId,
    pub connection: C::Kafka,
    pub format: KafkaSinkFormat<C>,
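    /// The indices of the columns that form the relation's key, if it has one.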
    pub relation_key_indices: Option<Vec<usize>>,
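    /// The description of the message key and the indices of the columns it is
    /// derived from, if the sink writes keyed messages.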
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
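    /// The index of the column that supplies Kafka message headers, if any.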
    pub headers_index: Option<usize>,
    pub value_desc: RelationDesc,
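    /// An optional expression used to determine the partition each message is
    /// written to.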
    pub partition_by: Option<MirScalarExpr>,
    pub topic: String,
    pub topic_options: KafkaTopicOptions,
    pub compression_type: KafkaSinkCompressionType,
    pub progress_group_id: KafkaIdStyle,
    pub transactional_id: KafkaIdStyle,
    pub topic_metadata_refresh_interval: Duration,
}

impl KafkaSinkConnection {
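    /// Formats the Kafka client ID for this sink, starting from the connection
    /// context, connection ID, and sink ID, and letting the connection enrich
    /// it further.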
    pub fn client_id(
        &self,
        configs: &ConfigSet,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        let mut client_id =
            KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
        self.connection.enrich_client_id(configs, &mut client_id);
        client_id
    }

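    /// Returns the name of the progress topic to use for this sink.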
    pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
        self.connection
            .progress_topic(connection_context, self.connection_id)
    }

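    /// Formats the group ID the sink uses for progress tracking, according to
    /// the configured [`KafkaIdStyle`].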
    pub fn progress_group_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.progress_group_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
            ),
            KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
        }
    }

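    /// Formats the transactional ID for this sink's Kafka producer, according
    /// to the configured [`KafkaIdStyle`].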
    pub fn transactional_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.transactional_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
            ),
            KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
        }
    }
}

impl<C: ConnectionAccess> KafkaSinkConnection<C> {
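    /// Checks whether `self` is compatible with `other` field by field,
    /// returning an [`AlterError`] referencing `id` at the first incompatible
    /// difference.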
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &KafkaSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;

        let compatibility_checks = [
            (connection_id == &other.connection_id, "connection_id"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (format.alter_compatible(id, &other.format).is_ok(), "format"),
            (
                relation_key_indices == &other.relation_key_indices,
                "relation_key_indices",
            ),
            (
                key_desc_and_indices == &other.key_desc_and_indices,
                "key_desc_and_indices",
            ),
            (headers_index == &other.headers_index, "headers_index"),
            (value_desc == &other.value_desc, "value_desc"),
            (partition_by == &other.partition_by, "partition_by"),
            (topic == &other.topic, "topic"),
            (
                compression_type == &other.compression_type,
                "compression_type",
            ),
            (
                progress_group_id == &other.progress_group_id,
                "progress_group_id",
            ),
            (
                transactional_id == &other.transactional_id,
                "transactional_id",
            ),
            (topic_options == &other.topic_options, "topic_options"),
            (
                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
                "topic_metadata_refresh_interval",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
    for KafkaSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;
        KafkaSinkConnection {
            connection_id,
            connection: r.resolve_connection(connection).unwrap_kafka(),
            format: format.into_inline_connection(r),
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        }
    }
}

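/// Controls how a Kafka identifier (such as the progress group ID or the
/// producer's transactional ID) is constructed: derived from the connection
/// with an optional prefix, or using the legacy format.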
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    Prefix(Option<String>),
    Legacy,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    pub key_format: Option<KafkaSinkFormatType<C>>,
    pub value_format: KafkaSinkFormatType<C>,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    Avro {
        schema: String,
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        csr_connection: C::Csr,
    },
    Json,
    Text,
    Bytes,
}

impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
    pub fn get_format_name(&self) -> &str {
        match self {
            Self::Avro { .. } => "avro",
            Self::Json => "json",
            Self::Text => "text",
            Self::Bytes => "bytes",
        }
    }
}

impl<C: ConnectionAccess> KafkaSinkFormat<C> {
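    /// Returns a human-readable name describing the key and value formats of
    /// the sink.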
    pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
        match &self.key_format {
            None => self.value_format.get_format_name().into(),
            Some(key_format) => match (key_format, &self.value_format) {
                (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
                    "avro".into()
                }
                (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
                (keyf, valuef) => format!(
                    "key-{}-value-{}",
                    keyf.get_format_name(),
                    valuef.get_format_name()
                )
                .into(),
            },
        }
    }

    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }

        match (&self.value_format, &other.value_format) {
            (
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                },
                KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                },
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
                        "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        match (&self.key_format, &other.key_format) {
            (
                Some(KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                }),
                Some(KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                }),
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
                        "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
    for KafkaSinkFormat<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
        KafkaSinkFormat {
            key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
            value_format: self.value_format.into_inline_connection(&r),
        }
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
    for KafkaSinkFormatType<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
        match self {
            KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection,
            } => KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
            },
            KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
            KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
            KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
        }
    }
}

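/// The file format an S3 sink writes its output in.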
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    PgCopy(CopyFormatParams<'static>),
    Parquet,
}

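/// Details needed to upload sink output to S3: the destination URI, a file
/// size limit, and the shape and format of the data.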
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    pub uri: String,
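    /// The maximum size, in bytes, of each file uploaded to S3.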
    pub max_file_size: u64,
    pub desc: RelationDesc,
    pub format: S3SinkFormat,
}

pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);

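/// Connection details for a sink that writes to an Iceberg table: the catalog
/// and AWS connections to use, plus the target namespace and table.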
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
    pub catalog_connection_id: CatalogItemId,
    pub catalog_connection: C::IcebergCatalog,
    pub aws_connection_id: CatalogItemId,
    pub aws_connection: C::Aws,
    pub relation_key_indices: Option<Vec<usize>>,
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    pub namespace: String,
    pub table: String,
}

impl<C: ConnectionAccess> IcebergSinkConnection<C> {
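    /// Checks whether `self` is compatible with `other`, returning an
    /// [`AlterError`] referencing `id` if any field differs incompatibly.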
    pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let IcebergSinkConnection {
            catalog_connection_id: connection_id,
            catalog_connection,
            aws_connection_id,
            aws_connection,
            relation_key_indices,
            key_desc_and_indices,
            namespace,
            table,
        } = self;

        let compatibility_checks = [
            (
                connection_id == &other.catalog_connection_id,
                "connection_id",
            ),
            (
                catalog_connection
                    .alter_compatible(id, &other.catalog_connection)
                    .is_ok(),
                "catalog_connection",
            ),
            (
                aws_connection_id == &other.aws_connection_id,
                "aws_connection_id",
            ),
            (
                aws_connection
                    .alter_compatible(id, &other.aws_connection)
                    .is_ok(),
                "aws_connection",
            ),
            (
                relation_key_indices == &other.relation_key_indices,
                "relation_key_indices",
            ),
            (
                key_desc_and_indices == &other.key_desc_and_indices,
                "key_desc_and_indices",
            ),
            (namespace == &other.namespace, "namespace"),
            (table == &other.table, "table"),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "IcebergSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<IcebergSinkConnection, R>
    for IcebergSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> IcebergSinkConnection {
        let IcebergSinkConnection {
            catalog_connection_id,
            catalog_connection,
            aws_connection_id,
            aws_connection,
            relation_key_indices,
            key_desc_and_indices,
            namespace,
            table,
        } = self;
        IcebergSinkConnection {
            catalog_connection_id,
            catalog_connection: r
                .resolve_connection(catalog_connection)
                .unwrap_iceberg_catalog(),
            aws_connection_id,
            aws_connection: r.resolve_connection(aws_connection).unwrap_aws(),
            relation_key_indices,
            key_desc_and_indices,
            namespace,
            table,
        }
    }
}