mz_storage_types/sinks.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and traits related to reporting changing collections out of `dataflow`.

use std::borrow::Cow;
use std::fmt::Debug;
use std::time::Duration;

use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_pgcopy::CopyFormatParams;
use mz_repr::bytes::ByteSize;
use mz_repr::{CatalogItemId, GlobalId, RelationDesc};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::frontier::Antichain;

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::connections::{ConnectionContext, KafkaConnection, KafkaTopicOptions};
use crate::controller::AlterError;

pub mod s3_oneshot_sink;

/// A sink for updates to a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StorageSinkDesc<S, T = mz_repr::Timestamp> {
    pub from: GlobalId,
    pub from_desc: RelationDesc,
    pub connection: StorageSinkConnection,
    pub with_snapshot: bool,
    pub version: u64,
    pub envelope: SinkEnvelope,
    pub as_of: Antichain<T>,
    pub from_storage_metadata: S,
    pub to_storage_metadata: S,
}

impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible
    for StorageSinkDesc<S, T>
{
    /// Determines if `self` is compatible with another `StorageSinkDesc`, in
    /// such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations.
    ///
    /// Currently, the only "valid transformations" are the passage of time
    /// (such that the sinks' as-ofs may differ), version changes, and dropping
    /// `with_snapshot` once the snapshot has been written out. However, this
    /// will change once we support `ALTER CONNECTION` or `ALTER SINK`.
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkDesc<S, T>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let StorageSinkDesc {
            from,
            from_desc,
            connection,
            envelope,
            version: _,
            // The as-of of the descriptions may differ.
            as_of: _,
            from_storage_metadata,
            with_snapshot,
            to_storage_metadata,
        } = self;

        let compatibility_checks = [
            (from == &other.from, "from"),
            (from_desc == &other.from_desc, "from_desc"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (envelope == &other.envelope, "envelope"),
            // `with_snapshot` may legally change from `true` to `false` once
            // the snapshot has been written out, but never from `false` to
            // `true`.
            (*with_snapshot || !other.with_snapshot, "with_snapshot"),
            (
                from_storage_metadata == &other.from_storage_metadata,
                "from_storage_metadata",
            ),
            (
                to_storage_metadata == &other.to_storage_metadata,
                "to_storage_metadata",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "StorageSinkDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}
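
// To illustrate the checks above (a sketch of intent, not a doctest): given
// two descriptions `a` and `b` for the same sink `id`,
//
//     a.alter_compatible(id, &b)
//
// succeeds when they differ only in `as_of`, `version`, or by `b` dropping a
// `with_snapshot` requirement that `a` still had, and fails (logging a warning
// and returning `AlterError { id }`) when any other field differs.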

#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
    Debezium,
    Upsert,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum StorageSinkConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSinkConnection<C>),
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
    /// Determines if `self` is compatible with another `StorageSinkConnection`,
    /// in such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations (e.g. no transformation or `ALTER
    /// CONNECTION`).
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &StorageSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        match (self, other) {
            (StorageSinkConnection::Kafka(s), StorageSinkConnection::Kafka(o)) => {
                s.alter_compatible(id, o)?
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<StorageSinkConnection, R>
    for StorageSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> StorageSinkConnection {
        match self {
            Self::Kafka(conn) => StorageSinkConnection::Kafka(conn.into_inline_connection(r)),
        }
    }
}

impl<C: ConnectionAccess> StorageSinkConnection<C> {
    /// Returns the ID of the connection backing this sink connection.
    ///
    /// Returns an `Option` so as not to constrain ourselves in the future,
    /// even though every current variant carries a connection ID.
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        use StorageSinkConnection::*;
        match self {
            Kafka(KafkaSinkConnection { connection_id, .. }) => Some(*connection_id),
        }
    }

    /// Returns the name of the sink connection.
    pub fn name(&self) -> &'static str {
        use StorageSinkConnection::*;
        match self {
            Kafka(_) => "kafka",
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkCompressionType {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

impl KafkaSinkCompressionType {
    /// Formats the compression type as expected by the librdkafka
    /// `compression.type` setting.
    pub fn to_librdkafka_option(&self) -> &'static str {
        match self {
            KafkaSinkCompressionType::None => "none",
            KafkaSinkCompressionType::Gzip => "gzip",
            KafkaSinkCompressionType::Snappy => "snappy",
            KafkaSinkCompressionType::Lz4 => "lz4",
            KafkaSinkCompressionType::Zstd => "zstd",
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkConnection<C: ConnectionAccess = InlinedConnection> {
    pub connection_id: CatalogItemId,
    pub connection: C::Kafka,
    pub format: KafkaSinkFormat<C>,
    /// A natural key of the sinked relation (view or source).
    pub relation_key_indices: Option<Vec<usize>>,
    /// The user-specified key for the sink.
    pub key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    /// The index of the column containing the message headers, if any.
    pub headers_index: Option<usize>,
    pub value_desc: RelationDesc,
    /// An expression that, if present, computes a hash value that should be
    /// used to determine the partition for each message.
    pub partition_by: Option<MirScalarExpr>,
    pub topic: String,
    /// Options to use when creating the topic if it doesn't already exist.
    pub topic_options: KafkaTopicOptions,
    pub compression_type: KafkaSinkCompressionType,
    pub progress_group_id: KafkaIdStyle,
    pub transactional_id: KafkaIdStyle,
    pub topic_metadata_refresh_interval: Duration,
}

impl KafkaSinkConnection {
    /// Returns the client ID to register with librdkafka.
    ///
    /// The caller is responsible for providing the sink ID as it is not known
    /// to `KafkaSinkConnection`.
    pub fn client_id(
        &self,
        configs: &ConfigSet,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        let mut client_id =
            KafkaConnection::id_base(connection_context, self.connection_id, sink_id);
        self.connection.enrich_client_id(configs, &mut client_id);
        client_id
    }

    /// Returns the name of the progress topic to use for the sink.
    pub fn progress_topic(&self, connection_context: &ConnectionContext) -> Cow<'_, str> {
        self.connection
            .progress_topic(connection_context, self.connection_id)
    }

    /// Returns the ID for the consumer group the sink will use to read the
    /// progress topic on resumption.
    ///
    /// The caller is responsible for providing the sink ID as it is not known
    /// to `KafkaSinkConnection`.
    pub fn progress_group_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.progress_group_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id),
            ),
            KafkaIdStyle::Legacy => format!("materialize-bootstrap-sink-{sink_id}"),
        }
    }

    /// Returns the transactional ID to use for the sink.
    ///
    /// The caller is responsible for providing the sink ID as it is not known
    /// to `KafkaSinkConnection`.
    pub fn transactional_id(
        &self,
        connection_context: &ConnectionContext,
        sink_id: GlobalId,
    ) -> String {
        match self.transactional_id {
            KafkaIdStyle::Prefix(ref prefix) => format!(
                "{}{}",
                prefix.as_deref().unwrap_or(""),
                KafkaConnection::id_base(connection_context, self.connection_id, sink_id)
            ),
            KafkaIdStyle::Legacy => format!("mz-producer-{sink_id}-0"),
        }
    }
}

impl<C: ConnectionAccess> KafkaSinkConnection<C> {
    /// Determines if `self` is compatible with another `KafkaSinkConnection`,
    /// in such a way that it is possible to turn `self` into `other` through a
    /// valid series of transformations (e.g. no transformation or `ALTER
    /// CONNECTION`).
    pub fn alter_compatible(
        &self,
        id: GlobalId,
        other: &KafkaSinkConnection<C>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;

        let compatibility_checks = [
            (connection_id == &other.connection_id, "connection_id"),
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (format.alter_compatible(id, &other.format).is_ok(), "format"),
            (
                relation_key_indices == &other.relation_key_indices,
                "relation_key_indices",
            ),
            (
                key_desc_and_indices == &other.key_desc_and_indices,
                "key_desc_and_indices",
            ),
            (headers_index == &other.headers_index, "headers_index"),
            (value_desc == &other.value_desc, "value_desc"),
            (partition_by == &other.partition_by, "partition_by"),
            (topic == &other.topic, "topic"),
            (
                compression_type == &other.compression_type,
                "compression_type",
            ),
            (
                progress_group_id == &other.progress_group_id,
                "progress_group_id",
            ),
            (
                transactional_id == &other.transactional_id,
                "transactional_id",
            ),
            (topic_options == &other.topic_options, "topic_options"),
            (
                topic_metadata_refresh_interval == &other.topic_metadata_refresh_interval,
                "topic_metadata_refresh_interval",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "KafkaSinkConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkConnection, R>
    for KafkaSinkConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkConnection {
        let KafkaSinkConnection {
            connection_id,
            connection,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        } = self;
        KafkaSinkConnection {
            connection_id,
            connection: r.resolve_connection(connection).unwrap_kafka(),
            format: format.into_inline_connection(r),
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            value_desc,
            partition_by,
            topic,
            compression_type,
            progress_group_id,
            transactional_id,
            topic_options,
            topic_metadata_refresh_interval,
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaIdStyle {
    /// A new-style id that is optionally prefixed.
    Prefix(Option<String>),
    /// A legacy style id.
    Legacy,
}
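
// To illustrate how the two styles are rendered (a sketch based on
// `progress_group_id` and `transactional_id` above; `<id_base>` stands for
// whatever `KafkaConnection::id_base` computes from the connection context):
//
//     KafkaIdStyle::Prefix(None)               => "<id_base>"
//     KafkaIdStyle::Prefix(Some("my-prefix-")) => "my-prefix-<id_base>"
//     KafkaIdStyle::Legacy                     => "materialize-bootstrap-sink-<sink_id>" (progress group ID)
//                                                 "mz-producer-<sink_id>-0" (transactional ID)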

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSinkFormat<C: ConnectionAccess = InlinedConnection> {
    pub key_format: Option<KafkaSinkFormatType<C>>,
    pub value_format: KafkaSinkFormatType<C>,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KafkaSinkFormatType<C: ConnectionAccess = InlinedConnection> {
    Avro {
        schema: String,
        compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
        csr_connection: C::Csr,
    },
    Json,
    Text,
    Bytes,
}

impl<C: ConnectionAccess> KafkaSinkFormatType<C> {
    pub fn get_format_name(&self) -> &str {
        match self {
            Self::Avro { .. } => "avro",
            Self::Json => "json",
            Self::Text => "text",
            Self::Bytes => "bytes",
        }
    }
}

impl<C: ConnectionAccess> KafkaSinkFormat<C> {
    pub fn get_format_name<'a>(&'a self) -> Cow<'a, str> {
        // For legacy reasons, if there is no key format, or the key and value
        // formats are the same (both Avro or both JSON), we return the value
        // format's name; otherwise we return a composite name.
        match &self.key_format {
            None => self.value_format.get_format_name().into(),
            Some(key_format) => match (key_format, &self.value_format) {
                (KafkaSinkFormatType::Avro { .. }, KafkaSinkFormatType::Avro { .. }) => {
                    "avro".into()
                }
                (KafkaSinkFormatType::Json, KafkaSinkFormatType::Json) => "json".into(),
                (keyf, valuef) => format!(
                    "key-{}-value-{}",
                    keyf.get_format_name(),
                    valuef.get_format_name()
                )
                .into(),
            },
        }
    }
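
    // To illustrate the naming rules above (a sketch, not a doctest; the Avro
    // cases elide the variant's fields):
    //
    //     key_format: None,              value_format: Json        => "json"
    //     key_format: Some(Avro { .. }), value_format: Avro { .. } => "avro"
    //     key_format: Some(Text),        value_format: Json        => "key-text-value-json"
    //     key_format: Some(Bytes),       value_format: Avro { .. } => "key-bytes-value-avro"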

    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }

        match (&self.value_format, &other.value_format) {
            (
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                },
                KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                },
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at value_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
                        "KafkaSinkFormat incompatible at value_format:\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        match (&self.key_format, &other.key_format) {
            (
                Some(KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level: _,
                    csr_connection,
                }),
                Some(KafkaSinkFormatType::Avro {
                    schema: other_schema,
                    compatibility_level: _,
                    csr_connection: other_csr_connection,
                }),
            ) => {
                if schema != other_schema
                    || csr_connection
                        .alter_compatible(id, other_csr_connection)
                        .is_err()
                {
                    tracing::warn!(
                        "KafkaSinkFormat::Avro incompatible at key_format:\nself:\n{:#?}\n\nother\n{:#?}",
                        self,
                        other
                    );

                    return Err(AlterError { id });
                }
            }
            (s, o) => {
                if s != o {
                    tracing::warn!(
                        "KafkaSinkFormat incompatible at key_format\nself:\n{:#?}\n\nother:{:#?}",
                        s,
                        o
                    );
                    return Err(AlterError { id });
                }
            }
        }

        Ok(())
    }
}
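
// A note on the compatibility rules above (an observation about the current
// implementation, not additional behavior): two Avro formats are considered
// compatible as long as their schemas match and their Schema Registry
// connections are alter-compatible; `compatibility_level` is deliberately
// ignored, so it may differ between `self` and `other`. Non-Avro formats must
// be strictly equal.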

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormat, R>
    for KafkaSinkFormat<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormat {
        KafkaSinkFormat {
            key_format: self.key_format.map(|f| f.into_inline_connection(&r)),
            value_format: self.value_format.into_inline_connection(&r),
        }
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSinkFormatType, R>
    for KafkaSinkFormatType<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSinkFormatType {
        match self {
            KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection,
            } => KafkaSinkFormatType::Avro {
                schema,
                compatibility_level,
                csr_connection: r.resolve_connection(csr_connection).unwrap_csr(),
            },
            KafkaSinkFormatType::Json => KafkaSinkFormatType::Json,
            KafkaSinkFormatType::Text => KafkaSinkFormatType::Text,
            KafkaSinkFormatType::Bytes => KafkaSinkFormatType::Bytes,
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum S3SinkFormat {
    /// Encoded using the PG `COPY` protocol, with one of its supported formats.
    PgCopy(CopyFormatParams<'static>),
    /// Encoded as Parquet.
    Parquet,
}

/// Info required to copy the data to S3.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct S3UploadInfo {
    /// The S3 URI to write the data to.
    pub uri: String,
    /// The max file size of each file uploaded to S3.
    pub max_file_size: u64,
    /// The relation desc of the data to be uploaded to S3.
    pub desc: RelationDesc,
    /// The selected sink format.
    pub format: S3SinkFormat,
}

/// The minimum file size for files written to S3 by a copy-to-S3 sink.
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
/// The maximum file size for files written to S3 by a copy-to-S3 sink.
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);
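
// A minimal sketch of constructing an `S3UploadInfo` for a Parquet copy-to-S3
// operation (the URI, file size, and empty relation description are
// illustrative placeholders, not recommended values):
//
//     let info = S3UploadInfo {
//         uri: "s3://my-bucket/my-prefix".to_string(),
//         max_file_size: 1024 * 1024 * 100, // e.g. 100 MiB per file
//         desc: mz_repr::RelationDesc::empty(),
//         format: S3SinkFormat::Parquet,
//     };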