1use anyhow::Context;
13use mz_interchange::{avro, protobuf};
14use mz_repr::{GlobalId, RelationDesc, SqlColumnType, SqlScalarType};
15use serde::{Deserialize, Serialize};
16
17use crate::AlterCompatible;
18use crate::connections::inline::{
19 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
20 ReferencedConnection,
21};
22use crate::controller::AlterError;
23
/// The encodings used to decode the (optional) key and the value of a record.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceDataEncoding<C: ConnectionAccess = InlinedConnection> {
    /// Encoding of the key, if there is one; `desc` yields no key description when `None`.
    pub key: Option<DataEncoding<C>>,
    /// Encoding of the value.
    pub value: DataEncoding<C>,
}
33
34impl<C: ConnectionAccess> SourceDataEncoding<C> {
35 pub fn desc(&self) -> Result<(Option<RelationDesc>, RelationDesc), anyhow::Error> {
36 Ok(match &self.key {
37 None => (None, self.value.desc()?),
38 Some(key) => (Some(key.desc()?), self.value.desc()?),
39 })
40 }
41}
42
43impl<R: ConnectionResolver> IntoInlineConnection<SourceDataEncoding, R>
44 for SourceDataEncoding<ReferencedConnection>
45{
46 fn into_inline_connection(self, r: R) -> SourceDataEncoding {
47 SourceDataEncoding {
48 key: self.key.map(|enc| enc.into_inline_connection(&r)),
49 value: self.value.into_inline_connection(&r),
50 }
51 }
52}
53
54impl<C: ConnectionAccess> AlterCompatible for SourceDataEncoding<C> {
55 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
56 if self == other {
57 return Ok(());
58 }
59
60 let SourceDataEncoding { key, value } = self;
61
62 let compatibility_checks = [
63 (
64 match (key, &other.key) {
65 (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
66 (s, o) => s == o,
67 },
68 "key",
69 ),
70 (value.alter_compatible(id, &other.value).is_ok(), "value"),
71 ];
72
73 for (compatible, field) in compatibility_checks {
74 if !compatible {
75 tracing::warn!(
76 "SourceDataEncoding incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
77 self,
78 other
79 );
80
81 return Err(AlterError { id });
82 }
83 }
84
85 Ok(())
86 }
87}
88
/// The format used to decode a single key or value into columns.
///
/// The columns each variant produces are computed by `DataEncoding::desc`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum DataEncoding<C: ConnectionAccess = InlinedConnection> {
    /// Avro; columns are derived from the Avro schema. The only variant that
    /// is generic over the connection representation `C`.
    Avro(AvroEncoding<C>),
    /// Protobuf; columns are derived from the message descriptors.
    Protobuf(ProtobufEncoding),
    /// CSV; one non-nullable string column per column in its `ColumnSpec`.
    Csv(CsvEncoding),
    /// Regex; one nullable string column per capture group.
    Regex(RegexEncoding),
    /// Raw bytes, as a single non-nullable `data` column.
    Bytes,
    /// JSON, as a single non-nullable `data` column of jsonb type.
    Json,
    /// Text, as a single non-nullable `text` string column.
    Text,
}
101
102impl<R: ConnectionResolver> IntoInlineConnection<DataEncoding, R>
103 for DataEncoding<ReferencedConnection>
104{
105 fn into_inline_connection(self, r: R) -> DataEncoding {
106 match self {
107 Self::Avro(conn) => DataEncoding::Avro(conn.into_inline_connection(r)),
108 Self::Protobuf(conn) => DataEncoding::Protobuf(conn),
109 Self::Csv(conn) => DataEncoding::Csv(conn),
110 Self::Regex(conn) => DataEncoding::Regex(conn),
111 Self::Bytes => DataEncoding::Bytes,
112 Self::Json => DataEncoding::Json,
113 Self::Text => DataEncoding::Text,
114 }
115 }
116}
117
118pub fn included_column_desc(included_columns: Vec<(&str, SqlColumnType)>) -> RelationDesc {
119 let mut desc = RelationDesc::builder();
120 for (name, ty) in included_columns {
121 desc = desc.with_column(name, ty);
122 }
123 desc.finish()
124}
125
126impl<C: ConnectionAccess> DataEncoding<C> {
127 pub fn type_(&self) -> &str {
129 match self {
130 Self::Avro(_) => "avro",
131 Self::Protobuf(_) => "protobuf",
132 Self::Csv(_) => "csv",
133 Self::Regex(_) => "regex",
134 Self::Bytes => "bytes",
135 Self::Json => "json",
136 Self::Text => "text",
137 }
138 }
139
140 fn desc(&self) -> Result<RelationDesc, anyhow::Error> {
143 Ok(match self {
145 Self::Bytes => RelationDesc::builder()
146 .with_column("data", SqlScalarType::Bytes.nullable(false))
147 .finish(),
148 Self::Json => RelationDesc::builder()
149 .with_column("data", SqlScalarType::Jsonb.nullable(false))
150 .finish(),
151 Self::Avro(AvroEncoding { schema, .. }) => {
152 let parsed_schema = avro::parse_schema(schema).context("validating avro schema")?;
153 avro::schema_to_relationdesc(parsed_schema).context("validating avro schema")?
154 }
155 Self::Protobuf(ProtobufEncoding {
156 descriptors,
157 message_name,
158 confluent_wire_format: _,
159 }) => protobuf::DecodedDescriptors::from_bytes(descriptors, message_name.to_owned())?
160 .columns()
161 .iter()
162 .fold(RelationDesc::builder(), |desc, (name, ty)| {
163 desc.with_column(name, ty.clone())
164 })
165 .finish(),
166 Self::Regex(RegexEncoding { regex }) => regex
167 .capture_names()
168 .enumerate()
169 .skip(1)
174 .fold(RelationDesc::builder(), |desc, (i, name)| {
175 let name = match name {
176 None => format!("column{}", i),
177 Some(name) => name.to_owned(),
178 };
179 let ty = SqlScalarType::String.nullable(true);
180 desc.with_column(name, ty)
181 })
182 .finish(),
183 Self::Csv(CsvEncoding { columns, .. }) => match columns {
184 ColumnSpec::Count(n) => (1..=*n)
185 .fold(RelationDesc::builder(), |desc, i| {
186 desc.with_column(
187 format!("column{}", i),
188 SqlScalarType::String.nullable(false),
189 )
190 })
191 .finish(),
192 ColumnSpec::Header { names } => names
193 .iter()
194 .map(|s| &**s)
195 .fold(RelationDesc::builder(), |desc, name| {
196 desc.with_column(name, SqlScalarType::String.nullable(false))
197 })
198 .finish(),
199 },
200 Self::Text => RelationDesc::builder()
201 .with_column("text", SqlScalarType::String.nullable(false))
202 .finish(),
203 })
204 }
205
206 pub fn op_name(&self) -> &'static str {
207 match self {
208 Self::Bytes => "Bytes",
209 Self::Json => "Json",
210 Self::Avro(_) => "Avro",
211 Self::Protobuf(_) => "Protobuf",
212 Self::Regex { .. } => "Regex",
213 Self::Csv(_) => "Csv",
214 Self::Text => "Text",
215 }
216 }
217}
218
219impl<C: ConnectionAccess> AlterCompatible for DataEncoding<C> {
220 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
221 if self == other {
222 return Ok(());
223 }
224
225 let compatible = match (self, other) {
226 (DataEncoding::Avro(avro), DataEncoding::Avro(other_avro)) => {
227 avro.alter_compatible(id, other_avro).is_ok()
228 }
229 (s, o) => s == o,
230 };
231
232 if !compatible {
233 tracing::warn!(
234 "DataEncoding incompatible :\nself:\n{:#?}\n\nother\n{:#?}",
235 self,
236 other
237 );
238
239 return Err(AlterError { id });
240 }
241
242 Ok(())
243 }
244}
245
/// Avro decoding configuration.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct AvroEncoding<C: ConnectionAccess = InlinedConnection> {
    /// The Avro schema as text; parsed with `avro::parse_schema` when the
    /// relation description is computed.
    pub schema: String,
    /// Optional CSR connection. In the `ReferencedConnection` form it is
    /// resolved via `ConnectionResolver::resolve_connection` when inlined.
    pub csr_connection: Option<C::Csr>,
    /// NOTE(review): presumably whether payloads use the Confluent wire format
    /// (schema-id header) — confirm against the decoder.
    pub confluent_wire_format: bool,
}
253
254impl<R: ConnectionResolver> IntoInlineConnection<AvroEncoding, R>
255 for AvroEncoding<ReferencedConnection>
256{
257 fn into_inline_connection(self, r: R) -> AvroEncoding {
258 let AvroEncoding {
259 schema,
260 csr_connection,
261 confluent_wire_format,
262 } = self;
263 AvroEncoding {
264 schema,
265 csr_connection: csr_connection.map(|csr| r.resolve_connection(csr).unwrap_csr()),
266 confluent_wire_format,
267 }
268 }
269}
270
271impl<C: ConnectionAccess> AlterCompatible for AvroEncoding<C> {
272 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
273 if self == other {
274 return Ok(());
275 }
276
277 let AvroEncoding {
278 schema,
279 csr_connection,
280 confluent_wire_format,
281 } = self;
282
283 let compatibility_checks = [
284 (schema == &other.schema, "schema"),
285 (
286 match (csr_connection, &other.csr_connection) {
287 (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
288 (s, o) => s == o,
289 },
290 "csr_connection",
291 ),
292 (
293 confluent_wire_format == &other.confluent_wire_format,
294 "confluent_wire_format",
295 ),
296 ];
297
298 for (compatible, field) in compatibility_checks {
299 if !compatible {
300 tracing::warn!(
301 "AvroEncoding incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
302 self,
303 other
304 );
305
306 return Err(AlterError { id });
307 }
308 }
309
310 Ok(())
311 }
312}
313
/// Protobuf decoding configuration.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ProtobufEncoding {
    /// Serialized descriptors, decoded with `protobuf::DecodedDescriptors::from_bytes`.
    pub descriptors: Vec<u8>,
    /// Name of the message, passed alongside `descriptors` to
    /// `DecodedDescriptors::from_bytes`; its decoded columns become the
    /// relation's columns.
    pub message_name: String,
    /// NOTE(review): presumably whether payloads use the Confluent wire format —
    /// confirm. Not consulted when computing the relation description.
    pub confluent_wire_format: bool,
}
321
/// CSV decoding configuration.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct CsvEncoding {
    /// How many columns there are and how they are named.
    pub columns: ColumnSpec,
    /// Delimiter byte. NOTE(review): presumably the field separator — it is not
    /// read anywhere in this file.
    pub delimiter: u8,
}
328
/// Describes the columns of a CSV encoding.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ColumnSpec {
    /// A fixed number of columns, given default names `column1` through `columnN`.
    Count(usize),
    /// One column per entry in `names`, named after that entry.
    Header { names: Vec<String> },
}
339
340impl ColumnSpec {
341 pub fn arity(&self) -> usize {
343 match self {
344 ColumnSpec::Count(n) => *n,
345 ColumnSpec::Header { names } => names.len(),
346 }
347 }
348
349 pub fn into_header_names(self) -> Option<Vec<String>> {
350 match self {
351 ColumnSpec::Count(_) => None,
352 ColumnSpec::Header { names } => Some(names),
353 }
354 }
355}
356
/// Regex decoding configuration.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct RegexEncoding {
    /// The regex; each capture group (after skipping the first entry of
    /// `capture_names`) becomes a nullable string column.
    pub regex: mz_repr::adt::regex::Regex,
}