// mz_storage_types/sources/encoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits related to the *decoding* of data for sources.
11
12use anyhow::Context;
13use mz_interchange::{avro, protobuf};
14use mz_repr::{GlobalId, RelationDesc, SqlColumnType, SqlScalarType};
15use serde::{Deserialize, Serialize};
16
17use crate::AlterCompatible;
18use crate::connections::inline::{
19    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
20    ReferencedConnection,
21};
22use crate::controller::AlterError;
23
/// A description of how to interpret data from various sources
///
/// Almost all sources only present values as part of their records, but Kafka allows a key to be
/// associated with each record, which has a possibly independent encoding.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceDataEncoding<C: ConnectionAccess = InlinedConnection> {
    // How to decode the key portion of each record; `None` for sources whose
    // records have no key.
    pub key: Option<DataEncoding<C>>,
    // How to decode the value portion of each record; always present.
    pub value: DataEncoding<C>,
}
33
34impl<C: ConnectionAccess> SourceDataEncoding<C> {
35    pub fn desc(&self) -> Result<(Option<RelationDesc>, RelationDesc), anyhow::Error> {
36        Ok(match &self.key {
37            None => (None, self.value.desc()?),
38            Some(key) => (Some(key.desc()?), self.value.desc()?),
39        })
40    }
41}
42
43impl<R: ConnectionResolver> IntoInlineConnection<SourceDataEncoding, R>
44    for SourceDataEncoding<ReferencedConnection>
45{
46    fn into_inline_connection(self, r: R) -> SourceDataEncoding {
47        SourceDataEncoding {
48            key: self.key.map(|enc| enc.into_inline_connection(&r)),
49            value: self.value.into_inline_connection(&r),
50        }
51    }
52}
53
54impl<C: ConnectionAccess> AlterCompatible for SourceDataEncoding<C> {
55    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
56        if self == other {
57            return Ok(());
58        }
59
60        let SourceDataEncoding { key, value } = self;
61
62        let compatibility_checks = [
63            (
64                match (key, &other.key) {
65                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
66                    (s, o) => s == o,
67                },
68                "key",
69            ),
70            (value.alter_compatible(id, &other.value).is_ok(), "value"),
71        ];
72
73        for (compatible, field) in compatibility_checks {
74            if !compatible {
75                tracing::warn!(
76                    "SourceDataEncoding incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
77                    self,
78                    other
79                );
80
81                return Err(AlterError { id });
82            }
83        }
84
85        Ok(())
86    }
87}
88
/// A description of how each row should be decoded, from a string of bytes to a sequence of
/// Differential updates.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum DataEncoding<C: ConnectionAccess = InlinedConnection> {
    /// Avro-encoded records; may reference a schema registry connection.
    Avro(AvroEncoding<C>),
    /// Protobuf-encoded records, decoded via a compiled descriptor set.
    Protobuf(ProtobufEncoding),
    /// Delimited records with a column specification.
    Csv(CsvEncoding),
    /// Records decoded by matching a regex; capture groups become columns.
    Regex(RegexEncoding),
    /// Raw bytes, surfaced as a single `data` column of type bytes.
    Bytes,
    /// Records surfaced as a single `data` column of type jsonb.
    Json,
    /// Records surfaced as a single `text` column of type string.
    Text,
}
101
102impl<R: ConnectionResolver> IntoInlineConnection<DataEncoding, R>
103    for DataEncoding<ReferencedConnection>
104{
105    fn into_inline_connection(self, r: R) -> DataEncoding {
106        match self {
107            Self::Avro(conn) => DataEncoding::Avro(conn.into_inline_connection(r)),
108            Self::Protobuf(conn) => DataEncoding::Protobuf(conn),
109            Self::Csv(conn) => DataEncoding::Csv(conn),
110            Self::Regex(conn) => DataEncoding::Regex(conn),
111            Self::Bytes => DataEncoding::Bytes,
112            Self::Json => DataEncoding::Json,
113            Self::Text => DataEncoding::Text,
114        }
115    }
116}
117
118pub fn included_column_desc(included_columns: Vec<(&str, SqlColumnType)>) -> RelationDesc {
119    let mut desc = RelationDesc::builder();
120    for (name, ty) in included_columns {
121        desc = desc.with_column(name, ty);
122    }
123    desc.finish()
124}
125
126impl<C: ConnectionAccess> DataEncoding<C> {
127    /// A human-readable name for the type of encoding
128    pub fn type_(&self) -> &str {
129        match self {
130            Self::Avro(_) => "avro",
131            Self::Protobuf(_) => "protobuf",
132            Self::Csv(_) => "csv",
133            Self::Regex(_) => "regex",
134            Self::Bytes => "bytes",
135            Self::Json => "json",
136            Self::Text => "text",
137        }
138    }
139
140    /// Computes the [`RelationDesc`] for the relation specified by this
141    /// data encoding.
142    fn desc(&self) -> Result<RelationDesc, anyhow::Error> {
143        // Add columns for the data, based on the encoding format.
144        Ok(match self {
145            Self::Bytes => RelationDesc::builder()
146                .with_column("data", SqlScalarType::Bytes.nullable(false))
147                .finish(),
148            Self::Json => RelationDesc::builder()
149                .with_column("data", SqlScalarType::Jsonb.nullable(false))
150                .finish(),
151            Self::Avro(AvroEncoding { schema, .. }) => {
152                let parsed_schema = avro::parse_schema(schema).context("validating avro schema")?;
153                avro::schema_to_relationdesc(parsed_schema).context("validating avro schema")?
154            }
155            Self::Protobuf(ProtobufEncoding {
156                descriptors,
157                message_name,
158                confluent_wire_format: _,
159            }) => protobuf::DecodedDescriptors::from_bytes(descriptors, message_name.to_owned())?
160                .columns()
161                .iter()
162                .fold(RelationDesc::builder(), |desc, (name, ty)| {
163                    desc.with_column(name, ty.clone())
164                })
165                .finish(),
166            Self::Regex(RegexEncoding { regex }) => regex
167                .capture_names()
168                .enumerate()
169                // The first capture is the entire matched string. This will
170                // often not be useful, so skip it. If people want it they can
171                // just surround their entire regex in an explicit capture
172                // group.
173                .skip(1)
174                .fold(RelationDesc::builder(), |desc, (i, name)| {
175                    let name = match name {
176                        None => format!("column{}", i),
177                        Some(name) => name.to_owned(),
178                    };
179                    let ty = SqlScalarType::String.nullable(true);
180                    desc.with_column(name, ty)
181                })
182                .finish(),
183            Self::Csv(CsvEncoding { columns, .. }) => match columns {
184                ColumnSpec::Count(n) => (1..=*n)
185                    .fold(RelationDesc::builder(), |desc, i| {
186                        desc.with_column(
187                            format!("column{}", i),
188                            SqlScalarType::String.nullable(false),
189                        )
190                    })
191                    .finish(),
192                ColumnSpec::Header { names } => names
193                    .iter()
194                    .map(|s| &**s)
195                    .fold(RelationDesc::builder(), |desc, name| {
196                        desc.with_column(name, SqlScalarType::String.nullable(false))
197                    })
198                    .finish(),
199            },
200            Self::Text => RelationDesc::builder()
201                .with_column("text", SqlScalarType::String.nullable(false))
202                .finish(),
203        })
204    }
205
206    pub fn op_name(&self) -> &'static str {
207        match self {
208            Self::Bytes => "Bytes",
209            Self::Json => "Json",
210            Self::Avro(_) => "Avro",
211            Self::Protobuf(_) => "Protobuf",
212            Self::Regex { .. } => "Regex",
213            Self::Csv(_) => "Csv",
214            Self::Text => "Text",
215        }
216    }
217}
218
219impl<C: ConnectionAccess> AlterCompatible for DataEncoding<C> {
220    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
221        if self == other {
222            return Ok(());
223        }
224
225        let compatible = match (self, other) {
226            (DataEncoding::Avro(avro), DataEncoding::Avro(other_avro)) => {
227                avro.alter_compatible(id, other_avro).is_ok()
228            }
229            (s, o) => s == o,
230        };
231
232        if !compatible {
233            tracing::warn!(
234                "DataEncoding incompatible :\nself:\n{:#?}\n\nother\n{:#?}",
235                self,
236                other
237            );
238
239            return Err(AlterError { id });
240        }
241
242        Ok(())
243    }
244}
245
/// Encoding in Avro format.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct AvroEncoding<C: ConnectionAccess = InlinedConnection> {
    // The Avro schema text; parsed and validated in `DataEncoding::desc`.
    pub schema: String,
    // Optional schema registry (CSR) connection backing the schema.
    pub csr_connection: Option<C::Csr>,
    // Whether records are framed in the Confluent wire format.
    pub confluent_wire_format: bool,
}
253
254impl<R: ConnectionResolver> IntoInlineConnection<AvroEncoding, R>
255    for AvroEncoding<ReferencedConnection>
256{
257    fn into_inline_connection(self, r: R) -> AvroEncoding {
258        let AvroEncoding {
259            schema,
260            csr_connection,
261            confluent_wire_format,
262        } = self;
263        AvroEncoding {
264            schema,
265            csr_connection: csr_connection.map(|csr| r.resolve_connection(csr).unwrap_csr()),
266            confluent_wire_format,
267        }
268    }
269}
270
271impl<C: ConnectionAccess> AlterCompatible for AvroEncoding<C> {
272    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
273        if self == other {
274            return Ok(());
275        }
276
277        let AvroEncoding {
278            schema,
279            csr_connection,
280            confluent_wire_format,
281        } = self;
282
283        let compatibility_checks = [
284            (schema == &other.schema, "schema"),
285            (
286                match (csr_connection, &other.csr_connection) {
287                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
288                    (s, o) => s == o,
289                },
290                "csr_connection",
291            ),
292            (
293                confluent_wire_format == &other.confluent_wire_format,
294                "confluent_wire_format",
295            ),
296        ];
297
298        for (compatible, field) in compatibility_checks {
299            if !compatible {
300                tracing::warn!(
301                    "AvroEncoding incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
302                    self,
303                    other
304                );
305
306                return Err(AlterError { id });
307            }
308        }
309
310        Ok(())
311    }
312}
313
/// Encoding in Protobuf format.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ProtobufEncoding {
    // Serialized file descriptor set; decoded in `DataEncoding::desc`.
    pub descriptors: Vec<u8>,
    // Fully qualified name of the message to decode from `descriptors`.
    pub message_name: String,
    // Whether records are framed in the Confluent wire format.
    pub confluent_wire_format: bool,
}
321
/// Arguments necessary to define how to decode from CSV format
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct CsvEncoding {
    // How many columns there are and how they are named.
    pub columns: ColumnSpec,
    // The field delimiter as a single byte (e.g. `b','`).
    pub delimiter: u8,
}
328
/// Determines the RelationDesc and decoding of CSV objects
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ColumnSpec {
    /// The first row is not a header row, and all columns get default names like `columnN`.
    Count(usize),
    /// The first row is a header row and therefore does not become data
    ///
    /// Each of the values in `names` becomes the default name of a column in the dataflow.
    Header { names: Vec<String> },
}
339
340impl ColumnSpec {
341    /// The number of columns described by the column spec.
342    pub fn arity(&self) -> usize {
343        match self {
344            ColumnSpec::Count(n) => *n,
345            ColumnSpec::Header { names } => names.len(),
346        }
347    }
348
349    pub fn into_header_names(self) -> Option<Vec<String>> {
350        match self {
351            ColumnSpec::Count(_) => None,
352            ColumnSpec::Header { names } => Some(names),
353        }
354    }
355}
356
/// Encoding via a regular expression: each capture group (after the implicit
/// whole-match group) becomes a nullable string column.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct RegexEncoding {
    // The compiled regex applied to each record.
    pub regex: mz_repr::adt::regex::Regex,
}