1use anyhow::{anyhow, bail};
13use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
14use mz_repr::{ColumnType, RelationDesc, RelationType, ScalarType};
15use proptest::prelude::any;
16use proptest_derive::Arbitrary;
17use serde::{Deserialize, Serialize};
18
// Textually includes a Rust file located in the build's `OUT_DIR`.
// NOTE(review): presumably this is the generated code that defines the
// `Proto*` message types and `proto_*` modules used below — confirm against
// the crate's build script.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_storage_types.sources.envelope.rs"
));
23
/// A fully planned source envelope.
///
/// Values of this type are produced from an [`UnplannedSourceEnvelope`] by
/// [`UnplannedSourceEnvelope::desc`], which fills in key indices and arities.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum SourceEnvelope {
    /// The "none" envelope, carrying how the key is exposed and its arity;
    /// see [`NoneEnvelope`].
    None(NoneEnvelope),
    /// The upsert envelope, carrying its style, key indices and source arity;
    /// see [`UpsertEnvelope`].
    Upsert(UpsertEnvelope),
    /// The envelope that this file's error messages call `MATERIALIZE`.
    /// It carries no additional planning data.
    CdcV2,
}
48
49impl RustType<ProtoSourceEnvelope> for SourceEnvelope {
50 fn into_proto(&self) -> ProtoSourceEnvelope {
51 use proto_source_envelope::Kind;
52 ProtoSourceEnvelope {
53 kind: Some(match self {
54 SourceEnvelope::None(e) => Kind::None(e.into_proto()),
55 SourceEnvelope::Upsert(e) => Kind::Upsert(e.into_proto()),
56 SourceEnvelope::CdcV2 => Kind::CdcV2(()),
57 }),
58 }
59 }
60
61 fn from_proto(proto: ProtoSourceEnvelope) -> Result<Self, TryFromProtoError> {
62 use proto_source_envelope::Kind;
63 let kind = proto
64 .kind
65 .ok_or_else(|| TryFromProtoError::missing_field("ProtoSourceEnvelope::kind"))?;
66 Ok(match kind {
67 Kind::None(e) => SourceEnvelope::None(e.into_rust()?),
68 Kind::Upsert(e) => SourceEnvelope::Upsert(e.into_rust()?),
69 Kind::CdcV2(()) => SourceEnvelope::CdcV2,
70 })
71 }
72}
73
/// A source envelope before planning.
///
/// It records only the envelope choice; [`UnplannedSourceEnvelope::desc`]
/// combines it with the key, value and metadata descriptions to produce the
/// corresponding [`SourceEnvelope`].
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum UnplannedSourceEnvelope {
    /// The "none" envelope, with the way the key should be exposed.
    None(KeyEnvelope),
    /// The upsert envelope, with the chosen upsert style.
    Upsert { style: UpsertStyle },
    /// The envelope that this file's error messages call `MATERIALIZE`.
    CdcV2,
}
84
/// Planned parameters of the "none" envelope.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct NoneEnvelope {
    /// How (and whether) the key is exposed in the source's output.
    pub key_envelope: KeyEnvelope,
    /// Arity of the key description handed to
    /// [`UnplannedSourceEnvelope::desc`], or `0` when that description was
    /// absent or empty.
    pub key_arity: usize,
}
90
91impl RustType<ProtoNoneEnvelope> for NoneEnvelope {
92 fn into_proto(&self) -> ProtoNoneEnvelope {
93 ProtoNoneEnvelope {
94 key_envelope: Some(self.key_envelope.into_proto()),
95 key_arity: self.key_arity.into_proto(),
96 }
97 }
98
99 fn from_proto(proto: ProtoNoneEnvelope) -> Result<Self, TryFromProtoError> {
100 Ok(NoneEnvelope {
101 key_envelope: proto
102 .key_envelope
103 .into_rust_if_some("ProtoNoneEnvelope::key_envelope")?,
104 key_arity: proto.key_arity.into_rust()?,
105 })
106 }
107}
108
/// Planned parameters of the upsert envelope.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct UpsertEnvelope {
    /// Arity of the source's output relation; [`UnplannedSourceEnvelope::desc`]
    /// sets it to the arity of the description it returns.
    pub source_arity: usize,
    /// Which flavor of upsert is in use.
    pub style: UpsertStyle,
    /// Column indices that make up the key.
    // For property tests, generate at most three indices.
    #[proptest(strategy = "proptest::collection::vec(any::<usize>(), 0..4)")]
    pub key_indices: Vec<usize>,
}
120
121impl RustType<ProtoUpsertEnvelope> for UpsertEnvelope {
122 fn into_proto(&self) -> ProtoUpsertEnvelope {
123 ProtoUpsertEnvelope {
124 source_arity: self.source_arity.into_proto(),
125 style: Some(self.style.into_proto()),
126 key_indices: self.key_indices.into_proto(),
127 }
128 }
129
130 fn from_proto(proto: ProtoUpsertEnvelope) -> Result<Self, TryFromProtoError> {
131 Ok(UpsertEnvelope {
132 source_arity: proto.source_arity.into_rust()?,
133 style: proto
134 .style
135 .into_rust_if_some("ProtoUpsertEnvelope::style")?,
136 key_indices: proto.key_indices.into_rust()?,
137 })
138 }
139}
140
/// The flavor of upsert applied by an upsert envelope.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum UpsertStyle {
    /// Plain upsert, with the way the key should be exposed.
    Default(KeyEnvelope),
    /// Debezium-style upsert. `after_idx` is the index of the value column
    /// that must be a record; its fields become the output columns.
    Debezium { after_idx: usize },
    /// Upsert whose output gains a leading, nullable error column.
    ValueErrInline {
        /// How (and whether) the key is exposed in the source's output.
        key_envelope: KeyEnvelope,
        /// Name of the error column prepended to the value columns.
        error_column: String,
    },
}
156
157impl RustType<ProtoUpsertStyle> for UpsertStyle {
158 fn into_proto(&self) -> ProtoUpsertStyle {
159 use proto_upsert_style::{Kind, ProtoDebezium, ProtoValueErrInline};
160 ProtoUpsertStyle {
161 kind: Some(match self {
162 UpsertStyle::Default(e) => Kind::Default(e.into_proto()),
163 UpsertStyle::Debezium { after_idx } => Kind::Debezium(ProtoDebezium {
164 after_idx: after_idx.into_proto(),
165 }),
166 UpsertStyle::ValueErrInline {
167 key_envelope,
168 error_column,
169 } => Kind::ValueErrorInline(ProtoValueErrInline {
170 key_envelope: Some(key_envelope.into_proto()),
171 error_column: error_column.clone(),
172 }),
173 }),
174 }
175 }
176
177 fn from_proto(proto: ProtoUpsertStyle) -> Result<Self, TryFromProtoError> {
178 use proto_upsert_style::Kind;
179 let kind = proto
180 .kind
181 .ok_or_else(|| TryFromProtoError::missing_field("ProtoUpsertStyle::kind"))?;
182 Ok(match kind {
183 Kind::Default(e) => UpsertStyle::Default(e.into_rust()?),
184 Kind::Debezium(d) => UpsertStyle::Debezium {
185 after_idx: d.after_idx.into_rust()?,
186 },
187 Kind::ValueErrorInline(e) => UpsertStyle::ValueErrInline {
188 key_envelope: e
189 .key_envelope
190 .ok_or_else(|| {
191 TryFromProtoError::missing_field("ProtoValueErrInline::key_envelope")
192 })?
193 .into_rust()?,
194 error_column: e.error_column.clone(),
195 },
196 })
197 }
198}
199
200fn match_key_indices(
206 key_desc: &RelationDesc,
207 value_desc: &RelationDesc,
208) -> anyhow::Result<Vec<usize>> {
209 let mut indices = Vec::new();
210 for (name, key_type) in key_desc.iter() {
211 let (index, value_type) = value_desc
212 .get_by_name(name)
213 .ok_or_else(|| anyhow!("Value schema missing primary key column: {}", name))?;
214
215 if key_type == value_type {
216 indices.push(index);
217 } else {
218 bail!(
219 "key and value column types do not match: key {:?} vs. value {:?}",
220 key_type,
221 value_type
222 );
223 }
224 }
225 Ok(indices)
226}
227
228impl UnplannedSourceEnvelope {
229 fn into_source_envelope(
235 self,
236 key: Option<Vec<usize>>,
237 key_arity: Option<usize>,
238 source_arity: Option<usize>,
239 ) -> SourceEnvelope {
240 match self {
241 UnplannedSourceEnvelope::Upsert {
242 style: upsert_style,
243 } => SourceEnvelope::Upsert(UpsertEnvelope {
244 style: upsert_style,
245 key_indices: key.expect(
246 "into_source_envelope to be passed \
247 correct parameters for UnplannedSourceEnvelope::Upsert",
248 ),
249 source_arity: source_arity.expect(
250 "into_source_envelope to be passed \
251 correct parameters for UnplannedSourceEnvelope::Upsert",
252 ),
253 }),
254 UnplannedSourceEnvelope::None(key_envelope) => SourceEnvelope::None(NoneEnvelope {
255 key_envelope,
256 key_arity: key_arity.unwrap_or(0),
257 }),
258 UnplannedSourceEnvelope::CdcV2 => SourceEnvelope::CdcV2,
259 }
260 }
261
262 pub fn desc(
265 self,
266 key_desc: Option<RelationDesc>,
267 value_desc: RelationDesc,
268 metadata_desc: RelationDesc,
269 ) -> anyhow::Result<(SourceEnvelope, RelationDesc)> {
270 Ok(match &self {
271 UnplannedSourceEnvelope::None(key_envelope)
272 | UnplannedSourceEnvelope::Upsert {
273 style: UpsertStyle::Default(key_envelope),
274 ..
275 }
276 | UnplannedSourceEnvelope::Upsert {
277 style:
278 UpsertStyle::ValueErrInline {
279 key_envelope,
280 error_column: _,
281 },
282 } => {
283 let (key_arity, key_desc) = match key_desc {
284 Some(desc) if !desc.is_empty() => (Some(desc.arity()), Some(desc)),
285 _ => (None, None),
286 };
287
288 let (key_desc, key) = match (key_desc, key_arity, key_envelope) {
290 (_, _, KeyEnvelope::None) => (None, None),
291 (Some(key_desc), Some(key_arity), KeyEnvelope::Flattened) => {
292 let key_indices: Vec<usize> = (0..key_arity).collect();
294 let key_desc = key_desc.with_key(key_indices.clone());
295 (Some(key_desc), Some(key_indices))
296 }
297 (Some(key_desc), Some(key_arity), KeyEnvelope::Named(key_name)) => {
298 let key_desc = {
299 if key_arity > 1 {
301 let key_type = key_desc.typ();
302 let key_as_record = RelationType::new(vec![ColumnType {
303 nullable: false,
304 scalar_type: ScalarType::Record {
305 fields: key_desc
306 .iter_names()
307 .zip(key_type.column_types.iter())
308 .map(|(name, ty)| (name.clone(), ty.clone()))
309 .collect(),
310 custom_id: None,
311 },
312 }]);
313
314 RelationDesc::new(key_as_record, [key_name.to_string()])
315 } else {
316 key_desc.with_names([key_name.to_string()])
317 }
318 };
319 let (key_desc, key) = match self {
320 UnplannedSourceEnvelope::None(_) => (key_desc, None),
321 UnplannedSourceEnvelope::Upsert { .. } => {
323 (key_desc.with_key(vec![0]), Some(vec![0]))
324 }
325 _ => unreachable!(),
326 };
327 (Some(key_desc), key)
328 }
329 (None, _, _) => (None, None),
330 (_, None, _) => (None, None),
331 };
332
333 let value_desc = compute_envelope_value_desc(&self, value_desc);
334 let desc = match key_desc {
336 Some(key_desc) => key_desc.concat(value_desc).concat(metadata_desc),
337 None => value_desc.concat(metadata_desc),
338 };
339 (
340 self.into_source_envelope(key, key_arity, Some(desc.arity())),
341 desc,
342 )
343 }
344 UnplannedSourceEnvelope::Upsert {
345 style: UpsertStyle::Debezium { after_idx },
346 ..
347 } => match &value_desc.typ().column_types[*after_idx].scalar_type {
348 ScalarType::Record { fields, .. } => {
349 let mut desc = RelationDesc::from_names_and_types(fields.clone());
350 let key = key_desc.map(|k| match_key_indices(&k, &desc)).transpose()?;
351 if let Some(key) = key.clone() {
352 desc = desc.with_key(key);
353 }
354
355 let desc = match self {
356 UnplannedSourceEnvelope::Upsert { .. } => desc.concat(metadata_desc),
357 _ => desc,
358 };
359
360 (
361 self.into_source_envelope(key, None, Some(desc.arity())),
362 desc,
363 )
364 }
365 ty => bail!(
366 "Incorrect type for Debezium value, expected Record, got {:?}",
367 ty
368 ),
369 },
370 UnplannedSourceEnvelope::CdcV2 => {
371 match &value_desc.typ().column_types[0].scalar_type {
375 ScalarType::List { element_type, .. } => match &**element_type {
376 ScalarType::Record { fields, .. } => {
377 match &fields[0].1.scalar_type {
379 ScalarType::Record { fields, .. } => (
380 self.into_source_envelope(None, None, None),
381 RelationDesc::from_names_and_types(fields.clone()),
382 ),
383 ty => {
384 bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty)
385 }
386 }
387 }
388 ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty),
389 },
390 ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty),
391 }
392 }
393 })
394 }
395}
396
/// How the key of a source is exposed in the source's output relation.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KeyEnvelope {
    /// The key columns are not included in the output.
    None,
    /// The key columns are included as-is, ahead of the value columns.
    Flattened,
    /// The key is included as a single column with the given name. A key with
    /// more than one column is wrapped into a record; a single-column key is
    /// just renamed.
    Named(String),
}
411
412impl RustType<ProtoKeyEnvelope> for KeyEnvelope {
413 fn into_proto(&self) -> ProtoKeyEnvelope {
414 use proto_key_envelope::Kind;
415 ProtoKeyEnvelope {
416 kind: Some(match self {
417 KeyEnvelope::None => Kind::None(()),
418 KeyEnvelope::Flattened => Kind::Flattened(()),
419 KeyEnvelope::Named(name) => Kind::Named(name.clone()),
420 }),
421 }
422 }
423
424 fn from_proto(proto: ProtoKeyEnvelope) -> Result<Self, TryFromProtoError> {
425 use proto_key_envelope::Kind;
426 let kind = proto
427 .kind
428 .ok_or_else(|| TryFromProtoError::missing_field("ProtoKeyEnvelope::kind"))?;
429 Ok(match kind {
430 Kind::None(()) => KeyEnvelope::None,
431 Kind::Flattened(()) => KeyEnvelope::Flattened,
432 Kind::Named(name) => KeyEnvelope::Named(name),
433 })
434 }
435}
436
437fn compute_envelope_value_desc(
441 source_envelope: &UnplannedSourceEnvelope,
442 value_desc: RelationDesc,
443) -> RelationDesc {
444 match &source_envelope {
445 UnplannedSourceEnvelope::Upsert {
446 style:
447 UpsertStyle::ValueErrInline {
448 key_envelope: _,
449 error_column,
450 },
451 } => {
452 let mut names = Vec::with_capacity(value_desc.arity() + 1);
453 names.push(error_column.as_str().into());
454 names.extend(value_desc.iter_names().cloned());
455
456 let mut types = Vec::with_capacity(value_desc.arity() + 1);
457 types.push(ColumnType {
458 nullable: true,
459 scalar_type: ScalarType::Record {
460 fields: [(
461 "description".into(),
462 ColumnType {
463 nullable: true,
464 scalar_type: ScalarType::String,
465 },
466 )]
467 .into(),
468 custom_id: None,
469 },
470 });
471 types.extend(value_desc.iter_types().map(|t| t.clone().nullable(true)));
472 let relation_type = RelationType::new(types);
473 RelationDesc::new(relation_type, names)
474 }
475 _ => value_desc,
476 }
477}