mz_storage_types/sources/
encoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits related to the *decoding* of data for sources.
11
12use anyhow::Context;
13use mz_interchange::{avro, protobuf};
14use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
15use mz_repr::adt::regex::any_regex;
16use mz_repr::{ColumnType, GlobalId, RelationDesc, ScalarType};
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize};
19
20use crate::AlterCompatible;
21use crate::connections::inline::{
22    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
23    ReferencedConnection,
24};
25use crate::controller::AlterError;
26
// Pull in the protobuf message types (`ProtoSourceDataEncoding`, `ProtoDataEncoding`,
// `proto_data_encoding::Kind`, …) that the build script writes into `OUT_DIR`. The `RustType`
// impls in this module convert between those messages and the Rust types defined here.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_storage_types.sources.encoding.rs"
));
31
/// A description of how to interpret data from various sources.
///
/// Almost all sources only present values as part of their records, but Kafka allows a key to be
/// associated with each record, which has a possibly independent encoding.
///
/// `C` selects how connections are represented: `ReferencedConnection` holds references that are
/// resolved into an `InlinedConnection` form via `IntoInlineConnection`.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceDataEncoding<C: ConnectionAccess = InlinedConnection> {
    /// Encoding of the record key, or `None` when records carry no separately encoded key.
    pub key: Option<DataEncoding<C>>,
    /// Encoding of the record value.
    pub value: DataEncoding<C>,
}
41
42impl<C: ConnectionAccess> SourceDataEncoding<C> {
43    pub fn desc(&self) -> Result<(Option<RelationDesc>, RelationDesc), anyhow::Error> {
44        Ok(match &self.key {
45            None => (None, self.value.desc()?),
46            Some(key) => (Some(key.desc()?), self.value.desc()?),
47        })
48    }
49}
50
51impl<R: ConnectionResolver> IntoInlineConnection<SourceDataEncoding, R>
52    for SourceDataEncoding<ReferencedConnection>
53{
54    fn into_inline_connection(self, r: R) -> SourceDataEncoding {
55        SourceDataEncoding {
56            key: self.key.map(|enc| enc.into_inline_connection(&r)),
57            value: self.value.into_inline_connection(&r),
58        }
59    }
60}
61
62impl RustType<ProtoSourceDataEncoding> for SourceDataEncoding {
63    fn into_proto(&self) -> ProtoSourceDataEncoding {
64        ProtoSourceDataEncoding {
65            key: self.key.into_proto(),
66            value: Some(self.value.into_proto()),
67        }
68    }
69
70    fn from_proto(proto: ProtoSourceDataEncoding) -> Result<Self, TryFromProtoError> {
71        Ok(SourceDataEncoding {
72            key: proto.key.into_rust()?,
73            value: proto.value.into_rust_if_some("ProtoKeyValue::value")?,
74        })
75    }
76}
77
78impl<C: ConnectionAccess> AlterCompatible for SourceDataEncoding<C> {
79    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
80        if self == other {
81            return Ok(());
82        }
83
84        let SourceDataEncoding { key, value } = self;
85
86        let compatibility_checks = [
87            (
88                match (key, &other.key) {
89                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
90                    (s, o) => s == o,
91                },
92                "key",
93            ),
94            (value.alter_compatible(id, &other.value).is_ok(), "value"),
95        ];
96
97        for (compatible, field) in compatibility_checks {
98            if !compatible {
99                tracing::warn!(
100                    "SourceDataEncoding incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
101                    self,
102                    other
103                );
104
105                return Err(AlterError { id });
106            }
107        }
108
109        Ok(())
110    }
111}
112
/// A description of how each row should be decoded, from a string of bytes to a sequence of
/// Differential updates.
///
/// `C` selects how connections (currently only Avro's schema-registry connection) are
/// represented; see [`AvroEncoding`].
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum DataEncoding<C: ConnectionAccess = InlinedConnection> {
    /// Avro-encoded data described by an Avro schema.
    Avro(AvroEncoding<C>),
    /// Protobuf-encoded data described by serialized descriptors and a message name.
    Protobuf(ProtobufEncoding),
    /// Delimited text; columns are all non-nullable strings.
    Csv(CsvEncoding),
    /// Text parsed with a regular expression; each capture group (other than the whole match)
    /// becomes a nullable string column.
    Regex(RegexEncoding),
    /// Raw bytes, exposed as a single non-nullable `data` column of type bytes.
    Bytes,
    /// JSON, exposed as a single non-nullable `data` column of type jsonb.
    Json,
    /// Text, exposed as a single non-nullable `text` column of type string.
    Text,
}
125
126impl<R: ConnectionResolver> IntoInlineConnection<DataEncoding, R>
127    for DataEncoding<ReferencedConnection>
128{
129    fn into_inline_connection(self, r: R) -> DataEncoding {
130        match self {
131            Self::Avro(conn) => DataEncoding::Avro(conn.into_inline_connection(r)),
132            Self::Protobuf(conn) => DataEncoding::Protobuf(conn),
133            Self::Csv(conn) => DataEncoding::Csv(conn),
134            Self::Regex(conn) => DataEncoding::Regex(conn),
135            Self::Bytes => DataEncoding::Bytes,
136            Self::Json => DataEncoding::Json,
137            Self::Text => DataEncoding::Text,
138        }
139    }
140}
141
142impl RustType<ProtoDataEncoding> for DataEncoding {
143    fn into_proto(&self) -> ProtoDataEncoding {
144        use proto_data_encoding::Kind;
145        ProtoDataEncoding {
146            kind: Some(match self {
147                DataEncoding::Avro(e) => Kind::Avro(e.into_proto()),
148                DataEncoding::Protobuf(e) => Kind::Protobuf(e.into_proto()),
149                DataEncoding::Csv(e) => Kind::Csv(e.into_proto()),
150                DataEncoding::Regex(e) => Kind::Regex(e.into_proto()),
151                DataEncoding::Bytes => Kind::Bytes(()),
152                DataEncoding::Text => Kind::Text(()),
153                DataEncoding::Json => Kind::Json(()),
154            }),
155        }
156    }
157
158    fn from_proto(proto: ProtoDataEncoding) -> Result<Self, TryFromProtoError> {
159        use proto_data_encoding::Kind;
160        let kind = proto
161            .kind
162            .ok_or_else(|| TryFromProtoError::missing_field("ProtoDataEncoding::kind"))?;
163        Ok(match kind {
164            Kind::Avro(e) => DataEncoding::Avro(e.into_rust()?),
165            Kind::Protobuf(e) => DataEncoding::Protobuf(e.into_rust()?),
166            Kind::Csv(e) => DataEncoding::Csv(e.into_rust()?),
167            Kind::Regex(e) => DataEncoding::Regex(e.into_rust()?),
168            Kind::Bytes(()) => DataEncoding::Bytes,
169            Kind::Text(()) => DataEncoding::Text,
170            Kind::Json(()) => DataEncoding::Json,
171        })
172    }
173}
174
175pub fn included_column_desc(included_columns: Vec<(&str, ColumnType)>) -> RelationDesc {
176    let mut desc = RelationDesc::builder();
177    for (name, ty) in included_columns {
178        desc = desc.with_column(name, ty);
179    }
180    desc.finish()
181}
182
183impl<C: ConnectionAccess> DataEncoding<C> {
184    /// A human-readable name for the type of encoding
185    pub fn type_(&self) -> &str {
186        match self {
187            Self::Avro(_) => "avro",
188            Self::Protobuf(_) => "protobuf",
189            Self::Csv(_) => "csv",
190            Self::Regex(_) => "regex",
191            Self::Bytes => "bytes",
192            Self::Json => "json",
193            Self::Text => "text",
194        }
195    }
196
197    /// Computes the [`RelationDesc`] for the relation specified by this
198    /// data encoding.
199    fn desc(&self) -> Result<RelationDesc, anyhow::Error> {
200        // Add columns for the data, based on the encoding format.
201        Ok(match self {
202            Self::Bytes => RelationDesc::builder()
203                .with_column("data", ScalarType::Bytes.nullable(false))
204                .finish(),
205            Self::Json => RelationDesc::builder()
206                .with_column("data", ScalarType::Jsonb.nullable(false))
207                .finish(),
208            Self::Avro(AvroEncoding { schema, .. }) => {
209                let parsed_schema = avro::parse_schema(schema).context("validating avro schema")?;
210                avro::schema_to_relationdesc(parsed_schema).context("validating avro schema")?
211            }
212            Self::Protobuf(ProtobufEncoding {
213                descriptors,
214                message_name,
215                confluent_wire_format: _,
216            }) => protobuf::DecodedDescriptors::from_bytes(descriptors, message_name.to_owned())?
217                .columns()
218                .iter()
219                .fold(RelationDesc::builder(), |desc, (name, ty)| {
220                    desc.with_column(name, ty.clone())
221                })
222                .finish(),
223            Self::Regex(RegexEncoding { regex }) => regex
224                .capture_names()
225                .enumerate()
226                // The first capture is the entire matched string. This will
227                // often not be useful, so skip it. If people want it they can
228                // just surround their entire regex in an explicit capture
229                // group.
230                .skip(1)
231                .fold(RelationDesc::builder(), |desc, (i, name)| {
232                    let name = match name {
233                        None => format!("column{}", i),
234                        Some(name) => name.to_owned(),
235                    };
236                    let ty = ScalarType::String.nullable(true);
237                    desc.with_column(name, ty)
238                })
239                .finish(),
240            Self::Csv(CsvEncoding { columns, .. }) => match columns {
241                ColumnSpec::Count(n) => (1..=*n)
242                    .fold(RelationDesc::builder(), |desc, i| {
243                        desc.with_column(format!("column{}", i), ScalarType::String.nullable(false))
244                    })
245                    .finish(),
246                ColumnSpec::Header { names } => names
247                    .iter()
248                    .map(|s| &**s)
249                    .fold(RelationDesc::builder(), |desc, name| {
250                        desc.with_column(name, ScalarType::String.nullable(false))
251                    })
252                    .finish(),
253            },
254            Self::Text => RelationDesc::builder()
255                .with_column("text", ScalarType::String.nullable(false))
256                .finish(),
257        })
258    }
259
260    pub fn op_name(&self) -> &'static str {
261        match self {
262            Self::Bytes => "Bytes",
263            Self::Json => "Json",
264            Self::Avro(_) => "Avro",
265            Self::Protobuf(_) => "Protobuf",
266            Self::Regex { .. } => "Regex",
267            Self::Csv(_) => "Csv",
268            Self::Text => "Text",
269        }
270    }
271}
272
273impl<C: ConnectionAccess> AlterCompatible for DataEncoding<C> {
274    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
275        if self == other {
276            return Ok(());
277        }
278
279        let compatible = match (self, other) {
280            (DataEncoding::Avro(avro), DataEncoding::Avro(other_avro)) => {
281                avro.alter_compatible(id, other_avro).is_ok()
282            }
283            (s, o) => s == o,
284        };
285
286        if !compatible {
287            tracing::warn!(
288                "DataEncoding incompatible :\nself:\n{:#?}\n\nother\n{:#?}",
289                self,
290                other
291            );
292
293            return Err(AlterError { id });
294        }
295
296        Ok(())
297    }
298}
299
/// Encoding in Avro format.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct AvroEncoding<C: ConnectionAccess = InlinedConnection> {
    /// The Avro schema as text; parsed with `avro::parse_schema` when computing the relation
    /// description.
    pub schema: String,
    /// Optional schema-registry (`Csr`) connection; a reference until connections are inlined.
    pub csr_connection: Option<C::Csr>,
    /// Whether records use the Confluent wire format (presumably the schema-registry framing
    /// header — confirm against the Avro decoder).
    pub confluent_wire_format: bool,
}
307
308impl<R: ConnectionResolver> IntoInlineConnection<AvroEncoding, R>
309    for AvroEncoding<ReferencedConnection>
310{
311    fn into_inline_connection(self, r: R) -> AvroEncoding {
312        let AvroEncoding {
313            schema,
314            csr_connection,
315            confluent_wire_format,
316        } = self;
317        AvroEncoding {
318            schema,
319            csr_connection: csr_connection.map(|csr| r.resolve_connection(csr).unwrap_csr()),
320            confluent_wire_format,
321        }
322    }
323}
324
325impl<C: ConnectionAccess> AlterCompatible for AvroEncoding<C> {
326    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
327        if self == other {
328            return Ok(());
329        }
330
331        let AvroEncoding {
332            schema,
333            csr_connection,
334            confluent_wire_format,
335        } = self;
336
337        let compatibility_checks = [
338            (schema == &other.schema, "schema"),
339            (
340                match (csr_connection, &other.csr_connection) {
341                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
342                    (s, o) => s == o,
343                },
344                "csr_connection",
345            ),
346            (
347                confluent_wire_format == &other.confluent_wire_format,
348                "confluent_wire_format",
349            ),
350        ];
351
352        for (compatible, field) in compatibility_checks {
353            if !compatible {
354                tracing::warn!(
355                    "AvroEncoding incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
356                    self,
357                    other
358                );
359
360                return Err(AlterError { id });
361            }
362        }
363
364        Ok(())
365    }
366}
367
368impl RustType<ProtoAvroEncoding> for AvroEncoding {
369    fn into_proto(&self) -> ProtoAvroEncoding {
370        ProtoAvroEncoding {
371            schema: self.schema.clone(),
372            csr_connection: self.csr_connection.into_proto(),
373            confluent_wire_format: self.confluent_wire_format,
374        }
375    }
376
377    fn from_proto(proto: ProtoAvroEncoding) -> Result<Self, TryFromProtoError> {
378        Ok(AvroEncoding {
379            schema: proto.schema,
380            csr_connection: proto.csr_connection.into_rust()?,
381            confluent_wire_format: proto.confluent_wire_format,
382        })
383    }
384}
385
/// Encoding in Protobuf format.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ProtobufEncoding {
    /// Serialized protobuf descriptors, decoded with `protobuf::DecodedDescriptors::from_bytes`.
    pub descriptors: Vec<u8>,
    /// Name of the message type within `descriptors` to decode (presumably fully qualified —
    /// confirm).
    pub message_name: String,
    /// Whether records use the Confluent wire format; does not affect the relation description.
    pub confluent_wire_format: bool,
}
393
394impl RustType<ProtoProtobufEncoding> for ProtobufEncoding {
395    fn into_proto(&self) -> ProtoProtobufEncoding {
396        ProtoProtobufEncoding {
397            descriptors: self.descriptors.clone(),
398            message_name: self.message_name.clone(),
399            confluent_wire_format: self.confluent_wire_format,
400        }
401    }
402
403    fn from_proto(proto: ProtoProtobufEncoding) -> Result<Self, TryFromProtoError> {
404        Ok(ProtobufEncoding {
405            descriptors: proto.descriptors,
406            message_name: proto.message_name,
407            confluent_wire_format: proto.confluent_wire_format,
408        })
409    }
410}
411
/// Arguments necessary to define how to decode from CSV format.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct CsvEncoding {
    /// How many columns there are and how they are named.
    pub columns: ColumnSpec,
    /// The byte separating fields within a record.
    pub delimiter: u8,
}
418
419impl RustType<ProtoCsvEncoding> for CsvEncoding {
420    fn into_proto(&self) -> ProtoCsvEncoding {
421        ProtoCsvEncoding {
422            columns: Some(self.columns.into_proto()),
423            delimiter: self.delimiter.into_proto(),
424        }
425    }
426
427    fn from_proto(proto: ProtoCsvEncoding) -> Result<Self, TryFromProtoError> {
428        Ok(CsvEncoding {
429            columns: proto
430                .columns
431                .into_rust_if_some("ProtoCsvEncoding::columns")?,
432            delimiter: proto.delimiter.into_rust()?,
433        })
434    }
435}
436
/// Determines the [`RelationDesc`] and decoding of CSV objects.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ColumnSpec {
    /// The first row is not a header row, and all columns get default names like `columnN`
    /// (numbered from 1).
    Count(usize),
    /// The first row is a header row and therefore (presumably — confirm against the CSV
    /// decoder) does not itself become data.
    ///
    /// Each of the values in `names` becomes the default name of a column in the dataflow.
    Header { names: Vec<String> },
}
447
448impl RustType<ProtoColumnSpec> for ColumnSpec {
449    fn into_proto(&self) -> ProtoColumnSpec {
450        use proto_column_spec::{Kind, ProtoHeader};
451        ProtoColumnSpec {
452            kind: Some(match self {
453                ColumnSpec::Count(c) => Kind::Count(c.into_proto()),
454                ColumnSpec::Header { names } => Kind::Header(ProtoHeader {
455                    names: names.clone(),
456                }),
457            }),
458        }
459    }
460
461    fn from_proto(proto: ProtoColumnSpec) -> Result<Self, TryFromProtoError> {
462        use proto_column_spec::{Kind, ProtoHeader};
463        let kind = proto
464            .kind
465            .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnSpec::kind"))?;
466        Ok(match kind {
467            Kind::Count(c) => ColumnSpec::Count(c.into_rust()?),
468            Kind::Header(ProtoHeader { names }) => ColumnSpec::Header { names },
469        })
470    }
471}
472
473impl ColumnSpec {
474    /// The number of columns described by the column spec.
475    pub fn arity(&self) -> usize {
476        match self {
477            ColumnSpec::Count(n) => *n,
478            ColumnSpec::Header { names } => names.len(),
479        }
480    }
481
482    pub fn into_header_names(self) -> Option<Vec<String>> {
483        match self {
484            ColumnSpec::Count(_) => None,
485            ColumnSpec::Header { names } => Some(names),
486        }
487    }
488}
489
/// Arguments for decoding text with a regular expression; each capture group other than the
/// whole match becomes a nullable string column.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct RegexEncoding {
    // proptest needs an explicit strategy for this field; `any_regex()` supplies generated
    // regexes (presumably because the wrapped regex type has no `Arbitrary` impl — confirm).
    #[proptest(strategy = "any_regex()")]
    /// The regular expression applied to each input record.
    pub regex: mz_repr::adt::regex::Regex,
}
495
496impl RustType<ProtoRegexEncoding> for RegexEncoding {
497    fn into_proto(&self) -> ProtoRegexEncoding {
498        ProtoRegexEncoding {
499            regex: Some(self.regex.into_proto()),
500        }
501    }
502
503    fn from_proto(proto: ProtoRegexEncoding) -> Result<Self, TryFromProtoError> {
504        Ok(RegexEncoding {
505            regex: proto.regex.into_rust_if_some("ProtoRegexEncoding::regex")?,
506        })
507    }
508}