1use anyhow::Context;
13use mz_interchange::{avro, protobuf};
14use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
15use mz_repr::adt::regex::any_regex;
16use mz_repr::{ColumnType, GlobalId, RelationDesc, ScalarType};
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize};
19
20use crate::AlterCompatible;
21use crate::connections::inline::{
22 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
23 ReferencedConnection,
24};
25use crate::controller::AlterError;
26
27include!(concat!(
28 env!("OUT_DIR"),
29 "/mz_storage_types.sources.encoding.rs"
30));
31
/// The encodings of a source's data: an optional key encoding plus a
/// mandatory value encoding.
///
/// The type parameter `C` selects how connections inside the encodings are
/// represented; it defaults to [`InlinedConnection`].
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceDataEncoding<C: ConnectionAccess = InlinedConnection> {
    /// Encoding of the key, or `None` when no key encoding is specified.
    pub key: Option<DataEncoding<C>>,
    /// Encoding of the value.
    pub value: DataEncoding<C>,
}
41
42impl<C: ConnectionAccess> SourceDataEncoding<C> {
43 pub fn desc(&self) -> Result<(Option<RelationDesc>, RelationDesc), anyhow::Error> {
44 Ok(match &self.key {
45 None => (None, self.value.desc()?),
46 Some(key) => (Some(key.desc()?), self.value.desc()?),
47 })
48 }
49}
50
51impl<R: ConnectionResolver> IntoInlineConnection<SourceDataEncoding, R>
52 for SourceDataEncoding<ReferencedConnection>
53{
54 fn into_inline_connection(self, r: R) -> SourceDataEncoding {
55 SourceDataEncoding {
56 key: self.key.map(|enc| enc.into_inline_connection(&r)),
57 value: self.value.into_inline_connection(&r),
58 }
59 }
60}
61
62impl RustType<ProtoSourceDataEncoding> for SourceDataEncoding {
63 fn into_proto(&self) -> ProtoSourceDataEncoding {
64 ProtoSourceDataEncoding {
65 key: self.key.into_proto(),
66 value: Some(self.value.into_proto()),
67 }
68 }
69
70 fn from_proto(proto: ProtoSourceDataEncoding) -> Result<Self, TryFromProtoError> {
71 Ok(SourceDataEncoding {
72 key: proto.key.into_rust()?,
73 value: proto.value.into_rust_if_some("ProtoKeyValue::value")?,
74 })
75 }
76}
77
78impl<C: ConnectionAccess> AlterCompatible for SourceDataEncoding<C> {
79 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
80 if self == other {
81 return Ok(());
82 }
83
84 let SourceDataEncoding { key, value } = self;
85
86 let compatibility_checks = [
87 (
88 match (key, &other.key) {
89 (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
90 (s, o) => s == o,
91 },
92 "key",
93 ),
94 (value.alter_compatible(id, &other.value).is_ok(), "value"),
95 ];
96
97 for (compatible, field) in compatibility_checks {
98 if !compatible {
99 tracing::warn!(
100 "SourceDataEncoding incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
101 self,
102 other
103 );
104
105 return Err(AlterError { id });
106 }
107 }
108
109 Ok(())
110 }
111}
112
/// A single data encoding format.
///
/// The type parameter `C` selects how connections are represented; only the
/// `Avro` variant carries connection-dependent data.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum DataEncoding<C: ConnectionAccess = InlinedConnection> {
    /// Avro-encoded data; see [`AvroEncoding`].
    Avro(AvroEncoding<C>),
    /// Protobuf-encoded data; see [`ProtobufEncoding`].
    Protobuf(ProtobufEncoding),
    /// CSV-encoded data; see [`CsvEncoding`].
    Csv(CsvEncoding),
    /// Data split into columns by a regular expression; see [`RegexEncoding`].
    Regex(RegexEncoding),
    /// Described as a single non-nullable bytes column named `data`.
    Bytes,
    /// Described as a single non-nullable JSONB column named `data`.
    Json,
    /// Described as a single non-nullable string column named `text`.
    Text,
}
125
126impl<R: ConnectionResolver> IntoInlineConnection<DataEncoding, R>
127 for DataEncoding<ReferencedConnection>
128{
129 fn into_inline_connection(self, r: R) -> DataEncoding {
130 match self {
131 Self::Avro(conn) => DataEncoding::Avro(conn.into_inline_connection(r)),
132 Self::Protobuf(conn) => DataEncoding::Protobuf(conn),
133 Self::Csv(conn) => DataEncoding::Csv(conn),
134 Self::Regex(conn) => DataEncoding::Regex(conn),
135 Self::Bytes => DataEncoding::Bytes,
136 Self::Json => DataEncoding::Json,
137 Self::Text => DataEncoding::Text,
138 }
139 }
140}
141
142impl RustType<ProtoDataEncoding> for DataEncoding {
143 fn into_proto(&self) -> ProtoDataEncoding {
144 use proto_data_encoding::Kind;
145 ProtoDataEncoding {
146 kind: Some(match self {
147 DataEncoding::Avro(e) => Kind::Avro(e.into_proto()),
148 DataEncoding::Protobuf(e) => Kind::Protobuf(e.into_proto()),
149 DataEncoding::Csv(e) => Kind::Csv(e.into_proto()),
150 DataEncoding::Regex(e) => Kind::Regex(e.into_proto()),
151 DataEncoding::Bytes => Kind::Bytes(()),
152 DataEncoding::Text => Kind::Text(()),
153 DataEncoding::Json => Kind::Json(()),
154 }),
155 }
156 }
157
158 fn from_proto(proto: ProtoDataEncoding) -> Result<Self, TryFromProtoError> {
159 use proto_data_encoding::Kind;
160 let kind = proto
161 .kind
162 .ok_or_else(|| TryFromProtoError::missing_field("ProtoDataEncoding::kind"))?;
163 Ok(match kind {
164 Kind::Avro(e) => DataEncoding::Avro(e.into_rust()?),
165 Kind::Protobuf(e) => DataEncoding::Protobuf(e.into_rust()?),
166 Kind::Csv(e) => DataEncoding::Csv(e.into_rust()?),
167 Kind::Regex(e) => DataEncoding::Regex(e.into_rust()?),
168 Kind::Bytes(()) => DataEncoding::Bytes,
169 Kind::Text(()) => DataEncoding::Text,
170 Kind::Json(()) => DataEncoding::Json,
171 })
172 }
173}
174
175pub fn included_column_desc(included_columns: Vec<(&str, ColumnType)>) -> RelationDesc {
176 let mut desc = RelationDesc::builder();
177 for (name, ty) in included_columns {
178 desc = desc.with_column(name, ty);
179 }
180 desc.finish()
181}
182
183impl<C: ConnectionAccess> DataEncoding<C> {
184 pub fn type_(&self) -> &str {
186 match self {
187 Self::Avro(_) => "avro",
188 Self::Protobuf(_) => "protobuf",
189 Self::Csv(_) => "csv",
190 Self::Regex(_) => "regex",
191 Self::Bytes => "bytes",
192 Self::Json => "json",
193 Self::Text => "text",
194 }
195 }
196
197 fn desc(&self) -> Result<RelationDesc, anyhow::Error> {
200 Ok(match self {
202 Self::Bytes => RelationDesc::builder()
203 .with_column("data", ScalarType::Bytes.nullable(false))
204 .finish(),
205 Self::Json => RelationDesc::builder()
206 .with_column("data", ScalarType::Jsonb.nullable(false))
207 .finish(),
208 Self::Avro(AvroEncoding { schema, .. }) => {
209 let parsed_schema = avro::parse_schema(schema).context("validating avro schema")?;
210 avro::schema_to_relationdesc(parsed_schema).context("validating avro schema")?
211 }
212 Self::Protobuf(ProtobufEncoding {
213 descriptors,
214 message_name,
215 confluent_wire_format: _,
216 }) => protobuf::DecodedDescriptors::from_bytes(descriptors, message_name.to_owned())?
217 .columns()
218 .iter()
219 .fold(RelationDesc::builder(), |desc, (name, ty)| {
220 desc.with_column(name, ty.clone())
221 })
222 .finish(),
223 Self::Regex(RegexEncoding { regex }) => regex
224 .capture_names()
225 .enumerate()
226 .skip(1)
231 .fold(RelationDesc::builder(), |desc, (i, name)| {
232 let name = match name {
233 None => format!("column{}", i),
234 Some(name) => name.to_owned(),
235 };
236 let ty = ScalarType::String.nullable(true);
237 desc.with_column(name, ty)
238 })
239 .finish(),
240 Self::Csv(CsvEncoding { columns, .. }) => match columns {
241 ColumnSpec::Count(n) => (1..=*n)
242 .fold(RelationDesc::builder(), |desc, i| {
243 desc.with_column(format!("column{}", i), ScalarType::String.nullable(false))
244 })
245 .finish(),
246 ColumnSpec::Header { names } => names
247 .iter()
248 .map(|s| &**s)
249 .fold(RelationDesc::builder(), |desc, name| {
250 desc.with_column(name, ScalarType::String.nullable(false))
251 })
252 .finish(),
253 },
254 Self::Text => RelationDesc::builder()
255 .with_column("text", ScalarType::String.nullable(false))
256 .finish(),
257 })
258 }
259
260 pub fn op_name(&self) -> &'static str {
261 match self {
262 Self::Bytes => "Bytes",
263 Self::Json => "Json",
264 Self::Avro(_) => "Avro",
265 Self::Protobuf(_) => "Protobuf",
266 Self::Regex { .. } => "Regex",
267 Self::Csv(_) => "Csv",
268 Self::Text => "Text",
269 }
270 }
271}
272
273impl<C: ConnectionAccess> AlterCompatible for DataEncoding<C> {
274 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
275 if self == other {
276 return Ok(());
277 }
278
279 let compatible = match (self, other) {
280 (DataEncoding::Avro(avro), DataEncoding::Avro(other_avro)) => {
281 avro.alter_compatible(id, other_avro).is_ok()
282 }
283 (s, o) => s == o,
284 };
285
286 if !compatible {
287 tracing::warn!(
288 "DataEncoding incompatible :\nself:\n{:#?}\n\nother\n{:#?}",
289 self,
290 other
291 );
292
293 return Err(AlterError { id });
294 }
295
296 Ok(())
297 }
298}
299
/// Settings for an Avro encoding.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct AvroEncoding<C: ConnectionAccess = InlinedConnection> {
    /// The Avro schema text; `DataEncoding::desc` parses it with
    /// `avro::parse_schema`.
    pub schema: String,
    /// Optional connection of type `C::Csr`.
    // NOTE(review): presumably a Confluent Schema Registry connection — confirm.
    pub csr_connection: Option<C::Csr>,
    /// Flag stored alongside the schema and round-tripped through protobuf.
    // NOTE(review): presumably whether messages use the Confluent wire
    // format — confirm against the decoder.
    pub confluent_wire_format: bool,
}
307
308impl<R: ConnectionResolver> IntoInlineConnection<AvroEncoding, R>
309 for AvroEncoding<ReferencedConnection>
310{
311 fn into_inline_connection(self, r: R) -> AvroEncoding {
312 let AvroEncoding {
313 schema,
314 csr_connection,
315 confluent_wire_format,
316 } = self;
317 AvroEncoding {
318 schema,
319 csr_connection: csr_connection.map(|csr| r.resolve_connection(csr).unwrap_csr()),
320 confluent_wire_format,
321 }
322 }
323}
324
325impl<C: ConnectionAccess> AlterCompatible for AvroEncoding<C> {
326 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
327 if self == other {
328 return Ok(());
329 }
330
331 let AvroEncoding {
332 schema,
333 csr_connection,
334 confluent_wire_format,
335 } = self;
336
337 let compatibility_checks = [
338 (schema == &other.schema, "schema"),
339 (
340 match (csr_connection, &other.csr_connection) {
341 (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
342 (s, o) => s == o,
343 },
344 "csr_connection",
345 ),
346 (
347 confluent_wire_format == &other.confluent_wire_format,
348 "confluent_wire_format",
349 ),
350 ];
351
352 for (compatible, field) in compatibility_checks {
353 if !compatible {
354 tracing::warn!(
355 "AvroEncoding incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
356 self,
357 other
358 );
359
360 return Err(AlterError { id });
361 }
362 }
363
364 Ok(())
365 }
366}
367
368impl RustType<ProtoAvroEncoding> for AvroEncoding {
369 fn into_proto(&self) -> ProtoAvroEncoding {
370 ProtoAvroEncoding {
371 schema: self.schema.clone(),
372 csr_connection: self.csr_connection.into_proto(),
373 confluent_wire_format: self.confluent_wire_format,
374 }
375 }
376
377 fn from_proto(proto: ProtoAvroEncoding) -> Result<Self, TryFromProtoError> {
378 Ok(AvroEncoding {
379 schema: proto.schema,
380 csr_connection: proto.csr_connection.into_rust()?,
381 confluent_wire_format: proto.confluent_wire_format,
382 })
383 }
384}
385
/// Settings for a Protobuf encoding.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ProtobufEncoding {
    /// Raw descriptor bytes; `DataEncoding::desc` hands them to
    /// `protobuf::DecodedDescriptors::from_bytes`.
    pub descriptors: Vec<u8>,
    /// Name of the message, passed together with `descriptors` when the
    /// descriptors are decoded.
    pub message_name: String,
    /// Flag stored alongside the descriptors; ignored when computing the
    /// relation description.
    // NOTE(review): presumably whether messages use the Confluent wire
    // format — confirm against the decoder.
    pub confluent_wire_format: bool,
}
393
394impl RustType<ProtoProtobufEncoding> for ProtobufEncoding {
395 fn into_proto(&self) -> ProtoProtobufEncoding {
396 ProtoProtobufEncoding {
397 descriptors: self.descriptors.clone(),
398 message_name: self.message_name.clone(),
399 confluent_wire_format: self.confluent_wire_format,
400 }
401 }
402
403 fn from_proto(proto: ProtoProtobufEncoding) -> Result<Self, TryFromProtoError> {
404 Ok(ProtobufEncoding {
405 descriptors: proto.descriptors,
406 message_name: proto.message_name,
407 confluent_wire_format: proto.confluent_wire_format,
408 })
409 }
410}
411
/// Settings for a CSV encoding.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct CsvEncoding {
    /// How the columns are determined; see [`ColumnSpec`].
    pub columns: ColumnSpec,
    /// A single byte stored with the encoding.
    // NOTE(review): presumably the field separator — confirm against the
    // CSV decoder.
    pub delimiter: u8,
}
418
419impl RustType<ProtoCsvEncoding> for CsvEncoding {
420 fn into_proto(&self) -> ProtoCsvEncoding {
421 ProtoCsvEncoding {
422 columns: Some(self.columns.into_proto()),
423 delimiter: self.delimiter.into_proto(),
424 }
425 }
426
427 fn from_proto(proto: ProtoCsvEncoding) -> Result<Self, TryFromProtoError> {
428 Ok(CsvEncoding {
429 columns: proto
430 .columns
431 .into_rust_if_some("ProtoCsvEncoding::columns")?,
432 delimiter: proto.delimiter.into_rust()?,
433 })
434 }
435}
436
/// Describes the columns of a CSV-encoded relation.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ColumnSpec {
    /// A fixed number of columns; `DataEncoding::desc` names them `column1`
    /// through `columnN`.
    Count(usize),
    /// Explicit column names, used in the order given.
    Header { names: Vec<String> },
}
447
448impl RustType<ProtoColumnSpec> for ColumnSpec {
449 fn into_proto(&self) -> ProtoColumnSpec {
450 use proto_column_spec::{Kind, ProtoHeader};
451 ProtoColumnSpec {
452 kind: Some(match self {
453 ColumnSpec::Count(c) => Kind::Count(c.into_proto()),
454 ColumnSpec::Header { names } => Kind::Header(ProtoHeader {
455 names: names.clone(),
456 }),
457 }),
458 }
459 }
460
461 fn from_proto(proto: ProtoColumnSpec) -> Result<Self, TryFromProtoError> {
462 use proto_column_spec::{Kind, ProtoHeader};
463 let kind = proto
464 .kind
465 .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnSpec::kind"))?;
466 Ok(match kind {
467 Kind::Count(c) => ColumnSpec::Count(c.into_rust()?),
468 Kind::Header(ProtoHeader { names }) => ColumnSpec::Header { names },
469 })
470 }
471}
472
473impl ColumnSpec {
474 pub fn arity(&self) -> usize {
476 match self {
477 ColumnSpec::Count(n) => *n,
478 ColumnSpec::Header { names } => names.len(),
479 }
480 }
481
482 pub fn into_header_names(self) -> Option<Vec<String>> {
483 match self {
484 ColumnSpec::Count(_) => None,
485 ColumnSpec::Header { names } => Some(names),
486 }
487 }
488}
489
/// Settings for a regex encoding.
///
/// `DataEncoding::desc` turns every capture group after the first into one
/// nullable string column, naming unnamed groups `column<index>`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct RegexEncoding {
    /// The regular expression whose capture groups define the columns.
    // Property tests generate values for this field via `any_regex()`.
    #[proptest(strategy = "any_regex()")]
    pub regex: mz_repr::adt::regex::Regex,
}
495
496impl RustType<ProtoRegexEncoding> for RegexEncoding {
497 fn into_proto(&self) -> ProtoRegexEncoding {
498 ProtoRegexEncoding {
499 regex: Some(self.regex.into_proto()),
500 }
501 }
502
503 fn from_proto(proto: ProtoRegexEncoding) -> Result<Self, TryFromProtoError> {
504 Ok(RegexEncoding {
505 regex: proto.regex.into_rust_if_some("ProtoRegexEncoding::regex")?,
506 })
507 }
508}