1use std::borrow::Cow;
13use std::fmt::Debug;
14use std::time::Duration;
15
16use mz_dyncfg::ConfigSet;
17use mz_expr::MirScalarExpr;
18use mz_pgcopy::CopyFormatParams;
19use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
20use mz_repr::bytes::ByteSize;
21use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
22use proptest::prelude::{Arbitrary, BoxedStrategy, Strategy, any};
23use proptest_derive::Arbitrary;
24use serde::{Deserialize, Serialize};
25use timely::PartialOrder;
26use timely::progress::frontier::Antichain;
27
28use crate::AlterCompatible;
29use crate::connections::inline::{
30 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
31 ReferencedConnection,
32};
33use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
34use crate::controller::{AlterError, CollectionMetadata};
35
36include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sinks.rs"));
37
38pub mod s3_oneshot_sink;
39
/// A description of a sink to install in a storage cluster: the collection it
/// reads from, the external connection it writes to, and the frontier from
/// which it should start emitting updates.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    /// The collection this sink reads from.
    pub from: GlobalId,
    /// The schema of the collection being sunk.
    pub from_desc: RelationDesc,
    /// The connection to the external system (currently always Kafka).
    pub connection: StorageSinkConnection,
    /// Whether to emit an initial snapshot before streaming updates.
    pub with_snapshot: bool,
    /// Version of this sink definition; ignored by `alter_compatible`.
    pub version: u64,
    /// The envelope in which to encode emitted records.
    pub envelope: SinkEnvelope,
    /// The frontier at which the sink starts reading; ignored by
    /// `alter_compatible`.
    pub as_of: Antichain<T>,
    /// Storage metadata for the collection being read.
    pub from_storage_metadata: S,
    /// Storage metadata for the sink's own (output) collection.
    pub to_storage_metadata: S,
}
53
impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
    for StorageSinkDesc<S, T>
{
    /// Determines whether `self` can be altered in place into `other`;
    /// returns `Err(AlterError)` naming `id` if any incompatible field
    /// differs.
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkDesc<S, T>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        // Destructure so that adding a field to `StorageSinkDesc` forces a
        // compile error here, prompting a decision about its compatibility.
        let StorageSinkDesc {
            from,
            from_desc,
            connection,
            envelope,
            // Deliberately ignored: these fields may change across an ALTER.
            version: _,
            as_of: _,
            from_storage_metadata,
            with_snapshot,
            to_storage_metadata,
        } = self;

        let compatibility_checks = [
            (from == &other.from, "from"),
            (from_desc == &other.from_desc, "from_desc"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (envelope == &other.envelope, "envelope"),
            // Compatible unless `other` newly requires a snapshot.
            // NOTE(review): assumes `self` is the existing desc and `other`
            // the replacement — confirm orientation at call sites.
            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
            (
                from_storage_metadata == &other.from_storage_metadata,
                "from_storage_metadata",
            ),
            (
                to_storage_metadata == &other.to_storage_metadata,
                "to_storage_metadata",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}
121
impl Arbitrary for StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp> {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    /// Generates arbitrary sink descriptions for property testing, rejecting
    /// descriptions whose input and output collections coincide.
    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            any::<GlobalId>(),
            any::<RelationDesc>(),
            any::<StorageSinkConnection>(),
            any::<SinkEnvelope>(),
            any::<Option<mz_repr::Timestamp>>(),
            any::<CollectionMetadata>(),
            any::<bool>(),
            any::<u64>(),
            any::<CollectionMetadata>(),
        )
            .prop_map(
                |(
                    from,
                    from_desc,
                    connection,
                    envelope,
                    as_of,
                    from_storage_metadata,
                    with_snapshot,
                    version,
                    to_storage_metadata,
                )| {
                    StorageSinkDesc {
                        from,
                        from_desc,
                        connection,
                        envelope,
                        version,
                        // `None` produces the empty antichain; `Some(t)` a
                        // single-element one.
                        as_of: Antichain::from_iter(as_of),
                        from_storage_metadata,
                        with_snapshot,
                        to_storage_metadata,
                    }
                },
            )
            // A sink must not write back to the collection it reads from.
            .prop_filter("identical source and sink", |desc| {
                desc.from_storage_metadata != desc.to_storage_metadata
            })
            .boxed()
    }
}
169
impl RustType<ProtoStorageSinkDesc> for StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp> {
    /// Converts the sink description to its protobuf representation.
    fn into_proto(&self) -> ProtoStorageSinkDesc {
        ProtoStorageSinkDesc {
            connection: Some(self.connection.into_proto()),
            from: Some(self.from.into_proto()),
            from_desc: Some(self.from_desc.into_proto()),
            envelope: Some(self.envelope.into_proto()),
            as_of: Some(self.as_of.into_proto()),
            from_storage_metadata: Some(self.from_storage_metadata.into_proto()),
            to_storage_metadata: Some(self.to_storage_metadata.into_proto()),
            with_snapshot: self.with_snapshot,
            version: self.version,
        }
    }

    /// Reconstructs the sink description from protobuf, erroring on any
    /// missing required message field.
    fn from_proto(proto: ProtoStorageSinkDesc) -> Result<Self, TryFromProtoError> {
        Ok(StorageSinkDesc {
            from: proto.from.into_rust_if_some("ProtoStorageSinkDesc::from")?,
            from_desc: proto
                .from_desc
                .into_rust_if_some("ProtoStorageSinkDesc::from_desc")?,
            connection: proto
                .connection
                .into_rust_if_some("ProtoStorageSinkDesc::connection")?,
            envelope: proto
                .envelope
                .into_rust_if_some("ProtoStorageSinkDesc::envelope")?,
            as_of: proto
                .as_of
                .into_rust_if_some("ProtoStorageSinkDesc::as_of")?,
            from_storage_metadata: proto
                .from_storage_metadata
                .into_rust_if_some("ProtoStorageSinkDesc::from_storage_metadata")?,
            with_snapshot: proto.with_snapshot,
            version: proto.version,
            to_storage_metadata: proto
                .to_storage_metadata
                .into_rust_if_some("ProtoStorageSinkDesc::to_storage_metadata")?,
        })
    }
}
211
/// The envelope in which a sink encodes its output records.
#[derive(Arbitrary, Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    // Presumably a Debezium-style change-data envelope; semantics are
    // defined where the envelope is rendered, not here.
    Debezium,
    // Presumably an upsert envelope keyed on the message key.
    Upsert,
}
217
impl RustType<ProtoSinkEnvelope> for SinkEnvelope {
    /// Converts the envelope to its protobuf oneof representation.
    fn into_proto(&self) -> ProtoSinkEnvelope {
        use proto_sink_envelope::Kind;
        ProtoSinkEnvelope {
            kind: Some(match self {
                SinkEnvelope::Debezium => Kind::Debezium(()),
                SinkEnvelope::Upsert => Kind::Upsert(()),
            }),
        }
    }

    /// Reconstructs the envelope from protobuf, erroring if the oneof is
    /// unset.
    fn from_proto(proto: ProtoSinkEnvelope) -> Result<Self, TryFromProtoError> {
        use proto_sink_envelope::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoSinkEnvelope::kind"))?;
        Ok(match kind {
            Kind::Debezium(()) => SinkEnvelope::Debezium,
            Kind::Upsert(()) => SinkEnvelope::Upsert,
        })
    }
}
240
/// A connection to an external system to which a storage sink writes.
/// Kafka is currently the only supported kind.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSinkConnection<C>),
}
245
246impl<C: ConnectionAccess> StorageSinkConnection<C> {
247 pub fn alter_compatible(
252 &self,
253 id: GlobalId,
254 other: &StorageSinkConnection<C>,
255 ) -> Result<(), AlterError> {
256 if self == other {
257 return Ok(());
258 }
259 match (self, other) {
260 (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
261 s.alter_compatible(id, o)?
262 }
263 }
264
265 Ok(())
266 }
267}
268
269impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
270 for StorageSinkConnection<ReferencedConnection>
271{
272 fn into_inline_connection(self, r: R) -> StorageSinkConnection {
273 match self {
274 Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
275 }
276 }
277}
278
impl RustType<ProtoStorageSinkConnection> for StorageSinkConnection {
    /// Converts the connection to its protobuf oneof representation.
    /// The proto variant name is versioned (`KafkaV2`).
    fn into_proto(&self) -> ProtoStorageSinkConnection {
        use proto_storage_sink_connection::Kind::*;

        ProtoStorageSinkConnection {
            kind: Some(match self {
                Self::Kafka(conn) => KafkaV2(conn.into_proto()),
            }),
        }
    }
    /// Reconstructs the connection from protobuf, erroring if the oneof is
    /// unset.
    fn from_proto(proto: ProtoStorageSinkConnection) -> Result<Self, TryFromProtoError> {
        use proto_storage_sink_connection::Kind::*;

        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoStorageSinkConnection::kind"))?;

        Ok(match kind {
            KafkaV2(proto) => Self::Kafka(proto.into_rust()?),
        })
    }
}
301
302impl<C: ConnectionAccess> StorageSinkConnection<C> {
303 pub fn connection_id(&self) -> Option<CatalogItemId> {
305 use StorageSinkConnection::*;
306 match self {
307 Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
308 }
309 }
310
311 pub fn name(&self) -> &'static str {
313 use StorageSinkConnection::*;
314 match self {
315 Kafka(_) => "kafka",
316 }
317 }
318}
319
impl RustType<proto_kafka_sink_connection_v2::ProtoKeyDescAndIndices>
    for (RelationDesc, Vec<usize>)
{
    /// Converts a (key description, key column indices) pair to protobuf.
    fn into_proto(&self) -> proto_kafka_sink_connection_v2::ProtoKeyDescAndIndices {
        proto_kafka_sink_connection_v2::ProtoKeyDescAndIndices {
            desc: Some(self.0.into_proto()),
            indices: self.1.into_proto(),
        }
    }

    /// Reconstructs the pair from protobuf, erroring if the description is
    /// missing.
    fn from_proto(
        proto: proto_kafka_sink_connection_v2::ProtoKeyDescAndIndices,
    ) -> Result<Self, TryFromProtoError> {
        Ok((
            proto
                .desc
                .into_rust_if_some("ProtoKeyDescAndIndices::desc")?,
            proto.indices.into_rust()?,
        ))
    }
}
341
// NOTE(review): the wrapper message presumably exists so the index vector can
// sit inside an optional proto field — confirm against the .proto definition.
impl RustType<proto_kafka_sink_connection_v2::ProtoRelationKeyIndicesVec> for Vec<usize> {
    /// Wraps the index vector in its dedicated protobuf message.
    fn into_proto(&self) -> proto_kafka_sink_connection_v2::ProtoRelationKeyIndicesVec {
        proto_kafka_sink_connection_v2::ProtoRelationKeyIndicesVec {
            relation_key_indices: self.into_proto(),
        }
    }

    /// Unwraps the index vector from its protobuf message.
    fn from_proto(
        proto: proto_kafka_sink_connection_v2::ProtoRelationKeyIndicesVec,
    ) -> Result<Self, TryFromProtoError> {
        proto.relation_key_indices.into_rust()
    }
}
355
/// The compression codec a Kafka sink applies to produced messages.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
364
365impl KafkaSinkCompressionType {
366 pub fn to_librdkafka_option(&self) -> &'static str {
369 match self {
370 KafkaSinkCompressionType::None => "none",
371 KafkaSinkCompressionType::Gzip => "gzip",
372 KafkaSinkCompressionType::Snappy => "snappy",
373 KafkaSinkCompressionType::Lz4 => "lz4",
374 KafkaSinkCompressionType::Zstd => "zstd",
375 }
376 }
377}
378
/// A Kafka connection together with all sink-specific configuration needed
/// to write a collection to a topic.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog ID of the Kafka connection this sink uses.
    pub connection_id: CatalogItemId,
    /// The Kafka connection itself (inlined or referenced, per `C`).
    pub connection: C::Kafka,
    /// The key/value encodings to use when writing records.
    pub format: KafkaSinkFormat<C>,
    // NOTE(review): the relationship between `relation_key_indices` and
    // `key_desc_and_indices` is not visible here — confirm with callers.
    pub relation_key_indices: Option<Vec<usize>>,
    /// The description and column indices of the message key, if any.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// Index of the column carrying Kafka headers, if any.
    pub headers_index: Option<usize>,
    /// The description of the message value.
    pub value_desc: RelationDesc,
    /// Optional expression used for partition assignment.
    pub partition_by: Option<MirScalarExpr>,
    /// The topic to write to.
    pub topic: String,
    /// Options applied to the output topic.
    pub topic_options: KafkaTopicOptions,
    /// The compression codec for produced messages.
    pub compression_type: KafkaSinkCompressionType,
    /// Naming style for the progress-topic consumer group ID.
    pub progress_group_id: KafkaIdStyle,
    /// Naming style for the producer's transactional ID.
    pub transactional_id: KafkaIdStyle,
    /// How often to refresh topic metadata.
    pub topic_metadata_refresh_interval: Duration,
}
402
impl KafkaSinkConnection {
    /// Returns the Kafka client ID for this sink: the base ID derived from
    /// the connection and sink IDs, possibly modified by the connection's
    /// `enrich_client_id` hook.
    pub fn client_id(
        &self,
        configs: &ConfigSet,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        let mut client_id =
            KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
        self.connection.enrich_client_id(configs, &mut client_id);
        client_id
    }

    /// Returns the name of the progress topic for this sink's connection.
    pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<str> {
        self.connection
            .progress_topic(connection_context, self.connection_id)
    }

    /// Returns the consumer group ID the sink uses when reading the
    /// progress topic, according to the configured `KafkaIdStyle`.
    pub fn progress_group_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.progress_group_id {
            // An absent prefix means "no prefix", not "no group ID".
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
            ),
            // Legacy naming scheme — presumably retained for sinks created
            // before the prefix style existed.
            KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
        }
    }

    /// Returns the transactional ID for the sink's Kafka producer,
    /// according to the configured `KafkaIdStyle`.
    pub fn transactional_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.transactional_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
            ),
            KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
        }
    }
}
465
466impl<C: ConnectionAccess> KafkaSinkConnection<C> {
467 pub fn alter_compatible(
472 &self,
473 id: GlobalId,
474 other: &KafkaSinkConnection<C>,
475 ) -> Result<(), AlterError> {
476 if self == other {
477 return Ok(());
478 }
479 let KafkaSinkConnection {
480 connection_id,
481 connection,
482 format,
483 relation_key_indices,
484 key_desc_and_indices,
485 headers_index,
486 value_desc,
487 partition_by,
488 topic,
489 compression_type,
490 progress_group_id,
491 transactional_id,
492 topic_options,
493 topic_metadata_refresh_interval,
494 } = self;
495
496 let compatibility_checks = [
497 (connection_id == &other.connection_id, "connection_id"),
498 (
499 connection.alter_compatible(id, &other.connection).is_ok(),
500 "connection",
501 ),
502 (format.alter_compatible(id, &other.format).is_ok(), "format"),
503 (
504 relation_key_indices == &other.relation_key_indices,
505 "relation_key_indices",
506 ),
507 (
508 key_desc_and_indices == &other.key_desc_and_indices,
509 "key_desc_and_indices",
510 ),
511 (headers_index == &other.headers_index, "headers_index"),
512 (value_desc == &other.value_desc, "value_desc"),
513 (partition_by == &other.partition_by, "partition_by"),
514 (topic == &other.topic, "topic"),
515 (
516 compression_type == &other.compression_type,
517 "compression_type",
518 ),
519 (
520 progress_group_id == &other.progress_group_id,
521 "progress_group_id",
522 ),
523 (
524 transactional_id == &other.transactional_id,
525 "transactional_id",
526 ),
527 (topic_options == &other.topic_options, "topic_config"),
528 (
529 topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
530 "topic_metadata_refresh_interval",
531 ),
532 ];
533 for (compatible, field) in compatibility_checks {
534 if !compatible {
535 tracing::warn!(
536 "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
537 self,
538 other
539 );
540
541 return Err(AlterError { id });
542 }
543 }
544
545 Ok(())
546 }
547}
548
impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
    for KafkaSinkConnection<ReferencedConnection>
{
    /// Resolves the referenced Kafka and CSR connections into their inlined
    /// forms; all other fields are carried over unchanged.
    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;
        KafkaSinkConnection {
            connection_id,
            connection: r.resolve_connection(connection).unwrap_kafka(),
            format: format.into_inline_connection(r),
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        }
    }
}
587
/// How to derive Kafka group/transactional IDs for a sink.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// Use the given prefix (treated as empty if `None`) followed by an ID
    /// derived from the connection and sink IDs.
    Prefix(Option<String>),
    /// Use the legacy hard-coded naming scheme.
    Legacy,
}
595
impl RustType<ProtoKafkaIdStyle> for KafkaIdStyle {
    /// Converts the ID style to its protobuf oneof representation.
    fn into_proto(&self) -> ProtoKafkaIdStyle {
        use crate::sinks::proto_kafka_id_style::Kind::*;
        use crate::sinks::proto_kafka_id_style::ProtoKafkaIdStylePrefix;

        ProtoKafkaIdStyle {
            kind: Some(match self {
                Self::Prefix(prefix) => Prefix(ProtoKafkaIdStylePrefix {
                    prefix: prefix.into_proto(),
                }),
                Self::Legacy => Legacy(()),
            }),
        }
    }
    /// Reconstructs the ID style from protobuf, erroring if the oneof is
    /// unset.
    fn from_proto(proto: ProtoKafkaIdStyle) -> Result<Self, TryFromProtoError> {
        use crate::sinks::proto_kafka_id_style::Kind::*;

        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoKafkaIdStyle::kind"))?;

        Ok(match kind {
            Prefix(prefix) => Self::Prefix(prefix.prefix.into_rust()?),
            Legacy(()) => Self::Legacy,
        })
    }
}
623
impl RustType<ProtoKafkaSinkConnectionV2> for KafkaSinkConnection {
    /// Converts the Kafka sink connection to its protobuf representation.
    fn into_proto(&self) -> ProtoKafkaSinkConnectionV2 {
        use crate::sinks::proto_kafka_sink_connection_v2::CompressionType;
        ProtoKafkaSinkConnectionV2 {
            connection_id: Some(self.connection_id.into_proto()),
            connection: Some(self.connection.into_proto()),
            format: Some(self.format.into_proto()),
            key_desc_and_indices: self.key_desc_and_indices.into_proto(),
            relation_key_indices: self.relation_key_indices.into_proto(),
            headers_index: self.headers_index.into_proto(),
            value_desc: Some(self.value_desc.into_proto()),
            partition_by: self.partition_by.into_proto(),
            topic: self.topic.clone(),
            compression_type: Some(match self.compression_type {
                KafkaSinkCompressionType::None => CompressionType::None(()),
                KafkaSinkCompressionType::Gzip => CompressionType::Gzip(()),
                KafkaSinkCompressionType::Snappy => CompressionType::Snappy(()),
                KafkaSinkCompressionType::Lz4 => CompressionType::Lz4(()),
                KafkaSinkCompressionType::Zstd => CompressionType::Zstd(()),
            }),
            progress_group_id: Some(self.progress_group_id.into_proto()),
            transactional_id: Some(self.transactional_id.into_proto()),
            topic_options: Some(self.topic_options.into_proto()),
            topic_metadata_refresh_interval: Some(
                self.topic_metadata_refresh_interval.into_proto(),
            ),
        }
    }

    /// Reconstructs the Kafka sink connection from protobuf, erroring on
    /// missing required fields.
    fn from_proto(proto: ProtoKafkaSinkConnectionV2) -> Result<Self, TryFromProtoError> {
        use crate::sinks::proto_kafka_sink_connection_v2::CompressionType;
        Ok(KafkaSinkConnection {
            connection_id: proto
                .connection_id
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::connection_id")?,
            connection: proto
                .connection
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::connection")?,
            format: proto
                .format
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::format")?,
            key_desc_and_indices: proto.key_desc_and_indices.into_rust()?,
            relation_key_indices: proto.relation_key_indices.into_rust()?,
            headers_index: proto.headers_index.into_rust()?,
            value_desc: proto
                .value_desc
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::value_desc")?,
            partition_by: proto.partition_by.into_rust()?,
            topic: proto.topic,
            compression_type: match proto.compression_type {
                Some(CompressionType::None(())) => KafkaSinkCompressionType::None,
                Some(CompressionType::Gzip(())) => KafkaSinkCompressionType::Gzip,
                Some(CompressionType::Snappy(())) => KafkaSinkCompressionType::Snappy,
                Some(CompressionType::Lz4(())) => KafkaSinkCompressionType::Lz4,
                Some(CompressionType::Zstd(())) => KafkaSinkCompressionType::Zstd,
                None => {
                    return Err(TryFromProtoError::missing_field(
                        "ProtoKafkaSinkConnectionV2::compression_type",
                    ));
                }
            },
            progress_group_id: proto
                .progress_group_id
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::progress_group_id")?,
            transactional_id: proto
                .transactional_id
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::transactional_id")?,
            // A missing topic-options message falls back to the defaults
            // rather than erroring, unlike the required fields above.
            topic_options: match proto.topic_options {
                Some(topic_options) => topic_options.into_rust()?,
                None => Default::default(),
            },
            topic_metadata_refresh_interval: proto
                .topic_metadata_refresh_interval
                .into_rust_if_some("ProtoKafkaSinkConnectionV2::topic_metadata_refresh_interval")?,
        })
    }
}
701
/// The key and value encodings a Kafka sink uses for its records.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    /// The encoding of the message key, if the sink emits keys.
    pub key_format: Option<KafkaSinkFormatType<C>>,
    /// The encoding of the message value.
    pub value_format: KafkaSinkFormatType<C>,
}
707
/// The encoding of an individual key or value written by a Kafka sink.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    /// Avro-encoded, with the schema text and a Confluent Schema Registry
    /// connection (plus an optional registry compatibility level).
    Avro {
        schema: String,
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        csr_connection: C::Csr,
    },
    Json,
    Text,
    Bytes,
}
719
720impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
721 pub fn get_format_name(&self) -> &str {
722 match self {
723 Self::Avro { .. } => "avro",
724 Self::Json => "json",
725 Self::Text => "text",
726 Self::Bytes => "bytes",
727 }
728 }
729}
730
731impl<C: ConnectionAccess> KafkaSinkFormat<C> {
732 pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
733 match &self.key_format {
737 None => self.value_format.get_format_name().into(),
738 Some(key_format) => match (key_format, &self.value_format) {
739 (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
740 "avro".into()
741 }
742 (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
743 (keyf, valuef) => format!(
744 "key-{}-value-{}",
745 keyf.get_format_name(),
746 valuef.get_format_name()
747 )
748 .into(),
749 },
750 }
751 }
752
753 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
754 if self == other {
755 return Ok(());
756 }
757
758 match (&self.value_format, &other.value_format) {
759 (
760 KafkaSinkFormatType::Avro {
761 schema,
762 compatibility_level: _,
763 csr_connection,
764 },
765 KafkaSinkFormatType::Avro {
766 schema: other_schema,
767 compatibility_level: _,
768 csr_connection: other_csr_connection,
769 },
770 ) => {
771 if schema != other_schema
772 || csr_connection
773 .alter_compatible(id, other_csr_connection)
774 .is_err()
775 {
776 tracing::warn!(
777 "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
778 self,
779 other
780 );
781
782 return Err(AlterError { id });
783 }
784 }
785 (s, o) => {
786 if s != o {
787 tracing::warn!(
788 "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
789 s,
790 o
791 );
792 return Err(AlterError { id });
793 }
794 }
795 }
796
797 match (&self.key_format, &other.key_format) {
798 (
799 Some(KafkaSinkFormatType::Avro {
800 schema,
801 compatibility_level: _,
802 csr_connection,
803 }),
804 Some(KafkaSinkFormatType::Avro {
805 schema: other_schema,
806 compatibility_level: _,
807 csr_connection: other_csr_connection,
808 }),
809 ) => {
810 if schema != other_schema
811 || csr_connection
812 .alter_compatible(id, other_csr_connection)
813 .is_err()
814 {
815 tracing::warn!(
816 "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
817 self,
818 other
819 );
820
821 return Err(AlterError { id });
822 }
823 }
824 (s, o) => {
825 if s != o {
826 tracing::warn!(
827 "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
828 s,
829 o
830 );
831 return Err(AlterError { id });
832 }
833 }
834 }
835
836 Ok(())
837 }
838}
839
840impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
841 for KafkaSinkFormat<ReferencedConnection>
842{
843 fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
844 KafkaSinkFormat {
845 key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
846 value_format: self.value_format.into_inline_connection(&r),
847 }
848 }
849}
850
impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
    for KafkaSinkFormatType<ReferencedConnection>
{
    /// Inlines the referenced CSR connection of an Avro format; the other
    /// variants carry no connection and are rebuilt unchanged.
    fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
        match self {
            KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection,
            } => KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
            },
            KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
            KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
            KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
        }
    }
}
871
impl RustType<ProtoKafkaSinkFormatType> for KafkaSinkFormatType {
    /// Converts the format type to its protobuf oneof representation.
    fn into_proto(&self) -> ProtoKafkaSinkFormatType {
        use proto_kafka_sink_format_type::Type;
        ProtoKafkaSinkFormatType {
            r#type: Some(match self {
                Self::Avro {
                    schema,
                    compatibility_level,
                    csr_connection,
                } => Type::Avro(proto_kafka_sink_format_type::ProtoKafkaSinkAvroFormat {
                    schema: schema.clone(),
                    compatibility_level: csr_compat_level_to_proto(compatibility_level),
                    csr_connection: Some(csr_connection.into_proto()),
                }),
                Self::Json => Type::Json(()),
                Self::Text => Type::Text(()),
                Self::Bytes => Type::Bytes(()),
            }),
        }
    }

    /// Reconstructs the format type from protobuf, erroring if the oneof or
    /// the Avro CSR connection is unset.
    fn from_proto(proto: ProtoKafkaSinkFormatType) -> Result<Self, TryFromProtoError> {
        use proto_kafka_sink_format_type::Type;
        let r#type = proto
            .r#type
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoKafkaSinkFormatType::type"))?;

        Ok(match r#type {
            Type::Avro(proto) => Self::Avro {
                schema: proto.schema,
                compatibility_level: csr_compat_level_from_proto(proto.compatibility_level),
                csr_connection: proto
                    .csr_connection
                    .into_rust_if_some("ProtoKafkaSinkFormatType::csr_connection")?,
            },
            Type::Json(()) => Self::Json,
            Type::Text(()) => Self::Text,
            Type::Bytes(()) => Self::Bytes,
        })
    }
}
913
impl RustType<ProtoKafkaSinkFormat> for KafkaSinkFormat {
    /// Converts the key/value format pair to protobuf.
    fn into_proto(&self) -> ProtoKafkaSinkFormat {
        ProtoKafkaSinkFormat {
            key_format: self.key_format.as_ref().map(|f| f.into_proto()),
            value_format: Some(self.value_format.into_proto()),
        }
    }

    /// Reconstructs the key/value format pair from protobuf, erroring if the
    /// (required) value format is unset.
    fn from_proto(proto: ProtoKafkaSinkFormat) -> Result<Self, TryFromProtoError> {
        Ok(KafkaSinkFormat {
            key_format: proto.key_format.into_rust()?,
            value_format: proto
                .value_format
                .into_rust_if_some("ProtoKafkaSinkFormat::value_format")?,
        })
    }
}
931
932fn csr_compat_level_to_proto(compatibility_level: &Option<mz_ccsr::CompatibilityLevel>) -> i32 {
933 use proto_kafka_sink_format_type::proto_kafka_sink_avro_format::CompatibilityLevel as ProtoCompatLevel;
934 match compatibility_level {
935 Some(level) => match level {
936 mz_ccsr::CompatibilityLevel::Backward => ProtoCompatLevel::Backward,
937 mz_ccsr::CompatibilityLevel::BackwardTransitive => ProtoCompatLevel::BackwardTransitive,
938 mz_ccsr::CompatibilityLevel::Forward => ProtoCompatLevel::Forward,
939 mz_ccsr::CompatibilityLevel::ForwardTransitive => ProtoCompatLevel::ForwardTransitive,
940 mz_ccsr::CompatibilityLevel::Full => ProtoCompatLevel::Full,
941 mz_ccsr::CompatibilityLevel::FullTransitive => ProtoCompatLevel::FullTransitive,
942 mz_ccsr::CompatibilityLevel::None => ProtoCompatLevel::None,
943 },
944 None => ProtoCompatLevel::Unset,
945 }
946 .into()
947}
948
949fn csr_compat_level_from_proto(val: i32) -> Option<mz_ccsr::CompatibilityLevel> {
950 use proto_kafka_sink_format_type::proto_kafka_sink_avro_format::CompatibilityLevel as ProtoCompatLevel;
951 match ProtoCompatLevel::try_from(val) {
952 Ok(ProtoCompatLevel::Backward) => Some(mz_ccsr::CompatibilityLevel::Backward),
953 Ok(ProtoCompatLevel::BackwardTransitive) => {
954 Some(mz_ccsr::CompatibilityLevel::BackwardTransitive)
955 }
956 Ok(ProtoCompatLevel::Forward) => Some(mz_ccsr::CompatibilityLevel::Forward),
957 Ok(ProtoCompatLevel::ForwardTransitive) => {
958 Some(mz_ccsr::CompatibilityLevel::ForwardTransitive)
959 }
960 Ok(ProtoCompatLevel::Full) => Some(mz_ccsr::CompatibilityLevel::Full),
961 Ok(ProtoCompatLevel::FullTransitive) => Some(mz_ccsr::CompatibilityLevel::FullTransitive),
962 Ok(ProtoCompatLevel::None) => Some(mz_ccsr::CompatibilityLevel::None),
963 Ok(ProtoCompatLevel::Unset) => None,
964 Err(_) => None,
965 }
966}
967
/// The format in which an S3 one-shot sink writes its output files.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Output in PostgreSQL `COPY` format with the given parameters.
    PgCopy(CopyFormatParams<'static>),
    /// Parquet output.
    Parquet,
}
975
impl RustType<ProtoS3SinkFormat> for S3SinkFormat {
    /// Converts the S3 format to its protobuf oneof representation.
    fn into_proto(&self) -> ProtoS3SinkFormat {
        use proto_s3_sink_format::Kind;
        ProtoS3SinkFormat {
            kind: Some(match self {
                Self::PgCopy(params) => Kind::PgCopy(params.into_proto()),
                Self::Parquet => Kind::Parquet(()),
            }),
        }
    }

    /// Reconstructs the S3 format from protobuf, erroring if the oneof is
    /// unset.
    fn from_proto(proto: ProtoS3SinkFormat) -> Result<Self, TryFromProtoError> {
        use proto_s3_sink_format::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoS3SinkFormat::kind"))?;

        Ok(match kind {
            Kind::PgCopy(proto) => Self::PgCopy(proto.into_rust()?),
            Kind::Parquet(_) => Self::Parquet,
        })
    }
}
999
/// Everything needed to upload a collection's data to S3.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The destination URI.
    pub uri: String,
    /// The maximum size, in bytes, of an individual output file.
    pub max_file_size: u64,
    /// The schema of the data being uploaded.
    pub desc: RelationDesc,
    /// The file format to write.
    pub format: S3SinkFormat,
}
1012
impl RustType<ProtoS3UploadInfo> for S3UploadInfo {
    /// Converts the upload info to its protobuf representation.
    fn into_proto(&self) -> ProtoS3UploadInfo {
        ProtoS3UploadInfo {
            uri: self.uri.clone(),
            max_file_size: self.max_file_size,
            desc: Some(self.desc.into_proto()),
            format: Some(self.format.into_proto()),
        }
    }

    /// Reconstructs the upload info from protobuf, erroring on missing
    /// required fields.
    fn from_proto(proto: ProtoS3UploadInfo) -> Result<Self, TryFromProtoError> {
        Ok(S3UploadInfo {
            uri: proto.uri,
            max_file_size: proto.max_file_size,
            desc: proto.desc.into_rust_if_some("ProtoS3UploadInfo::desc")?,
            format: proto
                .format
                .into_rust_if_some("ProtoS3UploadInfo::format")?,
        })
    }
}
1034
/// Lower bound on the configurable S3 sink output file size.
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// Upper bound on the configurable S3 sink output file size.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);