mz_storage_types/sources/
envelope.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to source envelopes
11
12use anyhow::{anyhow, bail};
13use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
14use mz_repr::{ColumnType, RelationDesc, RelationType, ScalarType};
15use proptest::prelude::any;
16use proptest_derive::Arbitrary;
17use serde::{Deserialize, Serialize};
18
19include!(concat!(
20    env!("OUT_DIR"),
21    "/mz_storage_types.sources.envelope.rs"
22));
23
24/// `SourceEnvelope`s describe how to turn a stream of messages from `SourceDesc`s
25/// into a _differential stream_, that is, a stream of (data, time, diff)
26/// triples.
27///
28/// PostgreSQL sources skip any explicit envelope handling, effectively
29/// asserting that `SourceEnvelope` is `None` with `KeyEnvelope::None`.
30#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
31pub enum SourceEnvelope {
32    /// The most trivial version is `None`, which typically produces triples where the diff
33    /// is `1`. However, some sources are able to produce values with more exotic diff's,
34    /// such as the posgres source. Currently, this is the only variant usable with
35    /// those sources.
36    ///
37    /// If the `KeyEnvelope` is present,
38    /// include the key columns as an output column of the source with the given properties.
39    None(NoneEnvelope),
40    /// `Upsert` holds onto previously seen values and produces `1` or `-1` diffs depending on
41    /// whether or not the required _key_ outputed by the source has been seen before. This also
42    /// supports a `Debezium` mode.
43    Upsert(UpsertEnvelope),
44    /// `CdcV2` requires sources output messages in a strict form that requires a upstream-provided
45    /// timeline.
46    CdcV2,
47}
48
49impl RustType<ProtoSourceEnvelope> for SourceEnvelope {
50    fn into_proto(&self) -> ProtoSourceEnvelope {
51        use proto_source_envelope::Kind;
52        ProtoSourceEnvelope {
53            kind: Some(match self {
54                SourceEnvelope::None(e) => Kind::None(e.into_proto()),
55                SourceEnvelope::Upsert(e) => Kind::Upsert(e.into_proto()),
56                SourceEnvelope::CdcV2 => Kind::CdcV2(()),
57            }),
58        }
59    }
60
61    fn from_proto(proto: ProtoSourceEnvelope) -> Result<Self, TryFromProtoError> {
62        use proto_source_envelope::Kind;
63        let kind = proto
64            .kind
65            .ok_or_else(|| TryFromProtoError::missing_field("ProtoSourceEnvelope::kind"))?;
66        Ok(match kind {
67            Kind::None(e) => SourceEnvelope::None(e.into_rust()?),
68            Kind::Upsert(e) => SourceEnvelope::Upsert(e.into_rust()?),
69            Kind::CdcV2(()) => SourceEnvelope::CdcV2,
70        })
71    }
72}
73
74/// `UnplannedSourceEnvelope` is a `SourceEnvelope` missing some information. This information
75/// is obtained in `UnplannedSourceEnvelope::desc`, where
76/// `UnplannedSourceEnvelope::into_source_envelope`
77/// creates a full `SourceEnvelope`
78#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
79pub enum UnplannedSourceEnvelope {
80    None(KeyEnvelope),
81    Upsert { style: UpsertStyle },
82    CdcV2,
83}
84
85#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
86pub struct NoneEnvelope {
87    pub key_envelope: KeyEnvelope,
88    pub key_arity: usize,
89}
90
91impl RustType<ProtoNoneEnvelope> for NoneEnvelope {
92    fn into_proto(&self) -> ProtoNoneEnvelope {
93        ProtoNoneEnvelope {
94            key_envelope: Some(self.key_envelope.into_proto()),
95            key_arity: self.key_arity.into_proto(),
96        }
97    }
98
99    fn from_proto(proto: ProtoNoneEnvelope) -> Result<Self, TryFromProtoError> {
100        Ok(NoneEnvelope {
101            key_envelope: proto
102                .key_envelope
103                .into_rust_if_some("ProtoNoneEnvelope::key_envelope")?,
104            key_arity: proto.key_arity.into_rust()?,
105        })
106    }
107}
108
109#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
110pub struct UpsertEnvelope {
111    /// Full arity, including the key columns
112    pub source_arity: usize,
113    /// What style of Upsert we are using
114    pub style: UpsertStyle,
115    /// The indices of the keys in the full value row, used
116    /// to deduplicate data in `upsert_core`
117    #[proptest(strategy = "proptest::collection::vec(any::<usize>(), 0..4)")]
118    pub key_indices: Vec<usize>,
119}
120
121impl RustType<ProtoUpsertEnvelope> for UpsertEnvelope {
122    fn into_proto(&self) -> ProtoUpsertEnvelope {
123        ProtoUpsertEnvelope {
124            source_arity: self.source_arity.into_proto(),
125            style: Some(self.style.into_proto()),
126            key_indices: self.key_indices.into_proto(),
127        }
128    }
129
130    fn from_proto(proto: ProtoUpsertEnvelope) -> Result<Self, TryFromProtoError> {
131        Ok(UpsertEnvelope {
132            source_arity: proto.source_arity.into_rust()?,
133            style: proto
134                .style
135                .into_rust_if_some("ProtoUpsertEnvelope::style")?,
136            key_indices: proto.key_indices.into_rust()?,
137        })
138    }
139}
140
141#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
142pub enum UpsertStyle {
143    /// `ENVELOPE UPSERT`, where the key shape depends on the independent
144    /// `KeyEnvelope`
145    Default(KeyEnvelope),
146    /// `ENVELOPE DEBEZIUM UPSERT`
147    Debezium { after_idx: usize },
148    /// `ENVELOPE UPSERT` where any decode errors will get serialized into a
149    /// ScalarType::Record column named `error_column`, and all value columns are
150    /// nullable. The key shape depends on the independent `KeyEnvelope`.
151    ValueErrInline {
152        key_envelope: KeyEnvelope,
153        error_column: String,
154    },
155}
156
157impl RustType<ProtoUpsertStyle> for UpsertStyle {
158    fn into_proto(&self) -> ProtoUpsertStyle {
159        use proto_upsert_style::{Kind, ProtoDebezium, ProtoValueErrInline};
160        ProtoUpsertStyle {
161            kind: Some(match self {
162                UpsertStyle::Default(e) => Kind::Default(e.into_proto()),
163                UpsertStyle::Debezium { after_idx } => Kind::Debezium(ProtoDebezium {
164                    after_idx: after_idx.into_proto(),
165                }),
166                UpsertStyle::ValueErrInline {
167                    key_envelope,
168                    error_column,
169                } => Kind::ValueErrorInline(ProtoValueErrInline {
170                    key_envelope: Some(key_envelope.into_proto()),
171                    error_column: error_column.clone(),
172                }),
173            }),
174        }
175    }
176
177    fn from_proto(proto: ProtoUpsertStyle) -> Result<Self, TryFromProtoError> {
178        use proto_upsert_style::Kind;
179        let kind = proto
180            .kind
181            .ok_or_else(|| TryFromProtoError::missing_field("ProtoUpsertStyle::kind"))?;
182        Ok(match kind {
183            Kind::Default(e) => UpsertStyle::Default(e.into_rust()?),
184            Kind::Debezium(d) => UpsertStyle::Debezium {
185                after_idx: d.after_idx.into_rust()?,
186            },
187            Kind::ValueErrorInline(e) => UpsertStyle::ValueErrInline {
188                key_envelope: e
189                    .key_envelope
190                    .ok_or_else(|| {
191                        TryFromProtoError::missing_field("ProtoValueErrInline::key_envelope")
192                    })?
193                    .into_rust()?,
194                error_column: e.error_column.clone(),
195            },
196        })
197    }
198}
199
200/// Computes the indices of the value's relation description that appear in the key.
201///
202/// Returns an error if it detects a common columns between the two relations that has the same
203/// name but a different type, if a key column is missing from the value, and if the key relation
204/// has a column with no name.
205fn match_key_indices(
206    key_desc: &RelationDesc,
207    value_desc: &RelationDesc,
208) -> anyhow::Result<Vec<usize>> {
209    let mut indices = Vec::new();
210    for (name, key_type) in key_desc.iter() {
211        let (index, value_type) = value_desc
212            .get_by_name(name)
213            .ok_or_else(|| anyhow!("Value schema missing primary key column: {}", name))?;
214
215        if key_type == value_type {
216            indices.push(index);
217        } else {
218            bail!(
219                "key and value column types do not match: key {:?} vs. value {:?}",
220                key_type,
221                value_type
222            );
223        }
224    }
225    Ok(indices)
226}
227
228impl UnplannedSourceEnvelope {
229    /// Transforms an `UnplannedSourceEnvelope` into a `SourceEnvelope`
230    ///
231    /// Panics if the input envelope is `UnplannedSourceEnvelope::Upsert` and
232    /// key is not passed as `Some`
233    // TODO(petrosagg): This API looks very error prone. Can we statically enforce it somehow?
234    fn into_source_envelope(
235        self,
236        key: Option<Vec<usize>>,
237        key_arity: Option<usize>,
238        source_arity: Option<usize>,
239    ) -> SourceEnvelope {
240        match self {
241            UnplannedSourceEnvelope::Upsert {
242                style: upsert_style,
243            } => SourceEnvelope::Upsert(UpsertEnvelope {
244                style: upsert_style,
245                key_indices: key.expect(
246                    "into_source_envelope to be passed \
247                    correct parameters for UnplannedSourceEnvelope::Upsert",
248                ),
249                source_arity: source_arity.expect(
250                    "into_source_envelope to be passed \
251                    correct parameters for UnplannedSourceEnvelope::Upsert",
252                ),
253            }),
254            UnplannedSourceEnvelope::None(key_envelope) => SourceEnvelope::None(NoneEnvelope {
255                key_envelope,
256                key_arity: key_arity.unwrap_or(0),
257            }),
258            UnplannedSourceEnvelope::CdcV2 => SourceEnvelope::CdcV2,
259        }
260    }
261
262    /// Computes the output relation of this envelope when applied on top of the decoded key and
263    /// value relation desc
264    pub fn desc(
265        self,
266        key_desc: Option<RelationDesc>,
267        value_desc: RelationDesc,
268        metadata_desc: RelationDesc,
269    ) -> anyhow::Result<(SourceEnvelope, RelationDesc)> {
270        Ok(match &self {
271            UnplannedSourceEnvelope::None(key_envelope)
272            | UnplannedSourceEnvelope::Upsert {
273                style: UpsertStyle::Default(key_envelope),
274                ..
275            }
276            | UnplannedSourceEnvelope::Upsert {
277                style:
278                    UpsertStyle::ValueErrInline {
279                        key_envelope,
280                        error_column: _,
281                    },
282            } => {
283                let (key_arity, key_desc) = match key_desc {
284                    Some(desc) if !desc.is_empty() => (Some(desc.arity()), Some(desc)),
285                    _ => (None, None),
286                };
287
288                // Compute any key relation and key indices
289                let (key_desc, key) = match (key_desc, key_arity, key_envelope) {
290                    (_, _, KeyEnvelope::None) => (None, None),
291                    (Some(key_desc), Some(key_arity), KeyEnvelope::Flattened) => {
292                        // Add the key columns as a key.
293                        let key_indices: Vec<usize> = (0..key_arity).collect();
294                        let key_desc = key_desc.with_key(key_indices.clone());
295                        (Some(key_desc), Some(key_indices))
296                    }
297                    (Some(key_desc), Some(key_arity), KeyEnvelope::Named(key_name)) => {
298                        let key_desc = {
299                            // if the key has multiple objects, nest them as a record inside of a single name
300                            if key_arity > 1 {
301                                let key_type = key_desc.typ();
302                                let key_as_record = RelationType::new(vec![ColumnType {
303                                    nullable: false,
304                                    scalar_type: ScalarType::Record {
305                                        fields: key_desc
306                                            .iter_names()
307                                            .zip(key_type.column_types.iter())
308                                            .map(|(name, ty)| (name.clone(), ty.clone()))
309                                            .collect(),
310                                        custom_id: None,
311                                    },
312                                }]);
313
314                                RelationDesc::new(key_as_record, [key_name.to_string()])
315                            } else {
316                                key_desc.with_names([key_name.to_string()])
317                            }
318                        };
319                        let (key_desc, key) = match self {
320                            UnplannedSourceEnvelope::None(_) => (key_desc, None),
321                            // If we're applying the upsert logic the key column will be unique
322                            UnplannedSourceEnvelope::Upsert { .. } => {
323                                (key_desc.with_key(vec![0]), Some(vec![0]))
324                            }
325                            _ => unreachable!(),
326                        };
327                        (Some(key_desc), key)
328                    }
329                    (None, _, _) => (None, None),
330                    (_, None, _) => (None, None),
331                };
332
333                let value_desc = compute_envelope_value_desc(&self, value_desc);
334                // Add value-related columns and metadata columns after any key columns.
335                let desc = match key_desc {
336                    Some(key_desc) => key_desc.concat(value_desc).concat(metadata_desc),
337                    None => value_desc.concat(metadata_desc),
338                };
339                (
340                    self.into_source_envelope(key, key_arity, Some(desc.arity())),
341                    desc,
342                )
343            }
344            UnplannedSourceEnvelope::Upsert {
345                style: UpsertStyle::Debezium { after_idx },
346                ..
347            } => match &value_desc.typ().column_types[*after_idx].scalar_type {
348                ScalarType::Record { fields, .. } => {
349                    let mut desc = RelationDesc::from_names_and_types(fields.clone());
350                    let key = key_desc.map(|k| match_key_indices(&k, &desc)).transpose()?;
351                    if let Some(key) = key.clone() {
352                        desc = desc.with_key(key);
353                    }
354
355                    let desc = match self {
356                        UnplannedSourceEnvelope::Upsert { .. } => desc.concat(metadata_desc),
357                        _ => desc,
358                    };
359
360                    (
361                        self.into_source_envelope(key, None, Some(desc.arity())),
362                        desc,
363                    )
364                }
365                ty => bail!(
366                    "Incorrect type for Debezium value, expected Record, got {:?}",
367                    ty
368                ),
369            },
370            UnplannedSourceEnvelope::CdcV2 => {
371                // the correct types
372
373                // CdcV2 row data are in a record in a record in a list
374                match &value_desc.typ().column_types[0].scalar_type {
375                    ScalarType::List { element_type, .. } => match &**element_type {
376                        ScalarType::Record { fields, .. } => {
377                            // TODO maybe check this by name
378                            match &fields[0].1.scalar_type {
379                                ScalarType::Record { fields, .. } => (
380                                    self.into_source_envelope(None, None, None),
381                                    RelationDesc::from_names_and_types(fields.clone()),
382                                ),
383                                ty => {
384                                    bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty)
385                                }
386                            }
387                        }
388                        ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty),
389                    },
390                    ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty),
391                }
392            }
393        })
394    }
395}
396
397/// Whether and how to include the decoded key of a stream in dataflows
398#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
399pub enum KeyEnvelope {
400    /// Never include the key in the output row
401    None,
402    /// For composite key encodings, pull the fields from the encoding into columns.
403    Flattened,
404    /// Always use the given name for the key.
405    ///
406    /// * For a single-field key, this means that the column will get the given name.
407    /// * For a multi-column key, the columns will get packed into a [`ScalarType::Record`], and
408    ///   that Record will get the given name.
409    Named(String),
410}
411
412impl RustType<ProtoKeyEnvelope> for KeyEnvelope {
413    fn into_proto(&self) -> ProtoKeyEnvelope {
414        use proto_key_envelope::Kind;
415        ProtoKeyEnvelope {
416            kind: Some(match self {
417                KeyEnvelope::None => Kind::None(()),
418                KeyEnvelope::Flattened => Kind::Flattened(()),
419                KeyEnvelope::Named(name) => Kind::Named(name.clone()),
420            }),
421        }
422    }
423
424    fn from_proto(proto: ProtoKeyEnvelope) -> Result<Self, TryFromProtoError> {
425        use proto_key_envelope::Kind;
426        let kind = proto
427            .kind
428            .ok_or_else(|| TryFromProtoError::missing_field("ProtoKeyEnvelope::kind"))?;
429        Ok(match kind {
430            Kind::None(()) => KeyEnvelope::None,
431            Kind::Flattened(()) => KeyEnvelope::Flattened,
432            Kind::Named(name) => KeyEnvelope::Named(name),
433        })
434    }
435}
436
437/// Compute the resulting value relation given the decoded value relation and the envelope
438/// style. If the ValueErrInline upsert style is used this will add an error column to the
439/// beginning of the relation and make all value fields nullable.
440fn compute_envelope_value_desc(
441    source_envelope: &UnplannedSourceEnvelope,
442    value_desc: RelationDesc,
443) -> RelationDesc {
444    match &source_envelope {
445        UnplannedSourceEnvelope::Upsert {
446            style:
447                UpsertStyle::ValueErrInline {
448                    key_envelope: _,
449                    error_column,
450                },
451        } => {
452            let mut names = Vec::with_capacity(value_desc.arity() + 1);
453            names.push(error_column.as_str().into());
454            names.extend(value_desc.iter_names().cloned());
455
456            let mut types = Vec::with_capacity(value_desc.arity() + 1);
457            types.push(ColumnType {
458                nullable: true,
459                scalar_type: ScalarType::Record {
460                    fields: [(
461                        "description".into(),
462                        ColumnType {
463                            nullable: true,
464                            scalar_type: ScalarType::String,
465                        },
466                    )]
467                    .into(),
468                    custom_id: None,
469                },
470            });
471            types.extend(value_desc.iter_types().map(|t| t.clone().nullable(true)));
472            let relation_type = RelationType::new(types);
473            RelationDesc::new(relation_type, names)
474        }
475        _ => value_desc,
476    }
477}