//! Types and traits related to reporting changing collections out of `dataflow`.

use std::borrow::Cow;
use std::fmt::Debug;
use std::time::Duration;

use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_pgcopy::CopyFormatParams;
use mz_repr::bytes::ByteSize;
use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::frontier::Antichain;

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
use crate::controller::AlterError;

pub mod s3_oneshot_sink;

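/// A sink for updates to a relational collection.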
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    pub from: GlobalId,
    pub from_desc: RelationDesc,
    pub connection: StorageSinkConnection,
    pub with_snapshot: bool,
    pub version: u64,
    pub envelope: SinkEnvelope,
    pub as_of: Antichain<T>,
    pub from_storage_metadata: S,
    pub to_storage_metadata: S,
}

impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
    for StorageSinkDesc<S, T>
{
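    /// Determines if `self` is compatible with another `StorageSinkDesc`, in
    /// such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations (e.g. no transformation or `ALTER
    /// SINK`).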
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkDesc<S, T>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let StorageSinkDesc {
            from,
            from_desc,
            connection,
            envelope,
            version: _,
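            // The as-of of the descriptions may legally differ.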
            as_of: _,
            from_storage_metadata,
            with_snapshot,
            to_storage_metadata,
        } = self;

        let compatibility_checks = [
            (from == &other.from, "from"),
            (from_desc == &other.from_desc, "from_desc"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (envelope == &other.envelope, "envelope"),
            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
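            // Note that this check permits `with_snapshot` to change from
            // `true` to `false`, but not from `false` to `true`.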
            (
                from_storage_metadata == &other.from_storage_metadata,
                "from_storage_metadata",
            ),
            (
                to_storage_metadata == &other.to_storage_metadata,
                "to_storage_metadata",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    Debezium,
    Upsert,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSinkConnection<C>),
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
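    /// Determines if `self` is compatible with another `StorageSinkConnection`,
    /// in such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations.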
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        match (self, other) {
            (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
                s.alter_compatible(id, o)?
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
    for StorageSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> StorageSinkConnection {
        match self {
            Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
        }
    }
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
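    /// Returns the ID of the catalog connection backing this sink connection,
    /// if any.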
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        use StorageSinkConnection::*;
        match self {
            Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
        }
    }

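    /// Returns the name of the sink connection type.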
    pub fn name(&self) -> &'static str {
        use StorageSinkConnection::*;
        match self {
            Kafka(_) => "kafka",
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

impl KafkaSinkCompressionType {
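    /// Formats the compression type as expected by the `compression.type`
    /// configuration parameter of librdkafka.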
    pub fn to_librdkafka_option(&self) -> &'static str {
        match self {
            KafkaSinkCompressionType::None => "none",
            KafkaSinkCompressionType::Gzip => "gzip",
            KafkaSinkCompressionType::Snappy => "snappy",
            KafkaSinkCompressionType::Lz4 => "lz4",
            KafkaSinkCompressionType::Zstd => "zstd",
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    pub connection_id: CatalogItemId,
    pub connection: C::Kafka,
    pub format: KafkaSinkFormat<C>,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink, if any.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The index of the column containing message headers, if any.
    pub headers_index: Option<usize>,
    /// The description of the values emitted by the sink.
    pub value_desc: RelationDesc,
    /// An expression that, if present, computes a hash value used to
    /// determine the partition for each message.
    pub partition_by: Option<MirScalarExpr>,
    /// The name of the Kafka topic to write to.
    pub topic: String,
    /// Options to use when creating the topic if it doesn't already exist.
    pub topic_options: KafkaTopicOptions,
    /// The compression type to apply to messages.
    pub compression_type: KafkaSinkCompressionType,
    pub progress_group_id: KafkaIdStyle,
    pub transactional_id: KafkaIdStyle,
    pub topic_metadata_refresh_interval: Duration,
}

impl KafkaSinkConnection {
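    /// Returns the client ID to register with librdkafka.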
    pub fn client_id(
        &self,
        configs: &ConfigSet,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        let mut client_id =
            KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
        self.connection.enrich_client_id(configs, &mut client_id);
        client_id
    }

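    /// Returns the name of the progress topic to use for the sink.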
    pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
        self.connection
            .progress_topic(connection_context, self.connection_id)
    }

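    /// Returns the ID for the consumer group the sink will use to read the
    /// progress topic on resumption.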
    pub fn progress_group_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.progress_group_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
            ),
            KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
        }
    }

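    /// Returns the ID to use for transactions in the sink.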
    pub fn transactional_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.transactional_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
            ),
            KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
        }
    }
}

impl<C: ConnectionAccess> KafkaSinkConnection<C> {
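    /// Determines if `self` is compatible with another `KafkaSinkConnection`,
    /// in such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations (e.g. no transformation or `ALTER
    /// SINK`).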
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &KafkaSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;

        let compatibility_checks = [
            (connection_id == &other.connection_id, "connection_id"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (format.alter_compatible(id, &other.format).is_ok(), "format"),
            (
                relation_key_indices == &other.relation_key_indices,
                "relation_key_indices",
            ),
            (
                key_desc_and_indices == &other.key_desc_and_indices,
                "key_desc_and_indices",
            ),
            (headers_index == &other.headers_index, "headers_index"),
            (value_desc == &other.value_desc, "value_desc"),
            (partition_by == &other.partition_by, "partition_by"),
            (topic == &other.topic, "topic"),
            (
                compression_type == &other.compression_type,
                "compression_type",
            ),
            (
                progress_group_id == &other.progress_group_id,
                "progress_group_id",
            ),
            (
                transactional_id == &other.transactional_id,
                "transactional_id",
            ),
            (topic_options == &other.topic_options, "topic_options"),
            (
                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
                "topic_metadata_refresh_interval",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
    for KafkaSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;
        KafkaSinkConnection {
            connection_id,
            connection: r.resolve_connection(connection).unwrap_kafka(),
            format: format.into_inline_connection(r),
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// A new-style ID that is optionally prefixed.
    Prefix(Option<String>),
    /// A legacy-style ID.
    Legacy,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    pub key_format: Option<KafkaSinkFormatType<C>>,
    pub value_format: KafkaSinkFormatType<C>,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    Avro {
        schema: String,
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        csr_connection: C::Csr,
    },
    Json,
    Text,
    Bytes,
}

impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
    pub fn get_format_name(&self) -> &str {
        match self {
            Self::Avro { .. } => "avro",
            Self::Json => "json",
            Self::Text => "text",
            Self::Bytes => "bytes",
        }
    }
}

impl<C: ConnectionAccess> KafkaSinkFormat<C> {
    pub fn get_format_name(&self) -> Cow<'_, str> {
        match &self.key_format {
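            // With no key format, the sink's format is simply the value
            // format; otherwise the key and value format names are combined.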
            None => self.value_format.get_format_name().into(),
            Some(key_format) => match (key_format, &self.value_format) {
                (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
                    "avro".into()
                }
                (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
                (keyf, valuef) => format!(
                    "key-{}-value-{}",
                    keyf.get_format_name(),
                    valuef.get_format_name()
                )
                .into(),
            },
        }
    }

    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }

        match (&self.value_format, &other.value_format) {
            (
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                },
                KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                },
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
505 "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        match (&self.key_format, &other.key_format) {
            (
                Some(KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                }),
                Some(KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                }),
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
544 "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
    for KafkaSinkFormat<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
        KafkaSinkFormat {
            key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
            value_format: self.value_format.into_inline_connection(&r),
        }
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
    for KafkaSinkFormatType<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
        match self {
            KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection,
            } => KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
            },
            KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
            KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
            KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Encoded using the PgCopy format, with one of its supported formats.
    PgCopy(CopyFormatParams<'static>),
    /// Encoded as Parquet.
    Parquet,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The S3 URI to write the data to.
    pub uri: String,
    /// The maximum size of each file uploaded to S3.
    pub max_file_size: u64,
    /// The relation desc of the data to be uploaded to S3.
    pub desc: RelationDesc,
    /// The selected sink format.
    pub format: S3SinkFormat,
}

pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);
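
#[cfg(test)]
mod tests {
    use super::*;

    // An illustrative sketch (not part of the original source) pinning down
    // how `KafkaSinkFormat::get_format_name` combines the key and value
    // format names; it only uses the types defined in this module.
    #[test]
    fn kafka_sink_format_names() {
        // Matching key and value formats collapse into a single name.
        let matching: KafkaSinkFormat = KafkaSinkFormat {
            key_format: Some(KafkaSinkFormatType::Json),
            value_format: KafkaSinkFormatType::Json,
        };
        assert_eq!(matching.get_format_name(), "json");

        // Mismatched formats are reported as `key-<key>-value-<value>`.
        let mixed: KafkaSinkFormat = KafkaSinkFormat {
            key_format: Some(KafkaSinkFormatType::Text),
            value_format: KafkaSinkFormatType::Json,
        };
        assert_eq!(mixed.get_format_name(), "key-text-value-json");

        // Without a key format, the value format's name is used directly.
        let value_only: KafkaSinkFormat = KafkaSinkFormat {
            key_format: None,
            value_format: KafkaSinkFormatType::Bytes,
        };
        assert_eq!(value_only.get_format_name(), "bytes");
    }
}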