mz_storage_types/sources/
envelope.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to source envelopes
11
12use anyhow::{anyhow, bail};
13use itertools::Itertools;
14use mz_repr::{RelationDesc, SqlColumnType, SqlRelationType, SqlScalarType};
15use serde::{Deserialize, Serialize};
16
/// `SourceEnvelope`s describe how to turn a stream of messages from `SourceDesc`s
/// into a _differential stream_, that is, a stream of (data, time, diff)
/// triples.
///
/// PostgreSQL sources skip any explicit envelope handling, effectively
/// asserting that `SourceEnvelope` is `None` with `KeyEnvelope::None`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum SourceEnvelope {
    /// The most trivial version is `None`, which typically produces triples where the diff
    /// is `1`. However, some sources are able to produce values with more exotic diffs,
    /// such as the Postgres source. Currently, this is the only variant usable with
    /// those sources.
    ///
    /// If the `KeyEnvelope` is present,
    /// include the key columns as an output column of the source with the given properties.
    None(NoneEnvelope),
    /// `Upsert` holds onto previously seen values and produces `1` or `-1` diffs depending on
    /// whether or not the required _key_ output by the source has been seen before. This also
    /// supports a `Debezium` mode.
    Upsert(UpsertEnvelope),
    /// `CdcV2` requires sources to output messages in a strict form that requires an
    /// upstream-provided timeline.
    CdcV2,
}
41
/// `UnplannedSourceEnvelope` is a `SourceEnvelope` missing some information (key indices and
/// arities). That information is derived from the decoded relation descriptions in
/// `UnplannedSourceEnvelope::desc`, which then calls
/// `UnplannedSourceEnvelope::into_source_envelope` to create a full `SourceEnvelope`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum UnplannedSourceEnvelope {
    /// No envelope; the `KeyEnvelope` selects whether and how the decoded key is included in
    /// the output relation. Planned into `SourceEnvelope::None`.
    None(KeyEnvelope),
    /// Upsert envelope in the given style. Planned into `SourceEnvelope::Upsert`.
    Upsert { style: UpsertStyle },
    /// Planned into `SourceEnvelope::CdcV2`; needs no additional information.
    CdcV2,
}
52
/// The planned form of `UnplannedSourceEnvelope::None`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct NoneEnvelope {
    /// Whether and how the decoded key is included in the output relation.
    pub key_envelope: KeyEnvelope,
    /// Arity of the decoded key relation; `0` when no key relation was supplied or it was
    /// empty (see `UnplannedSourceEnvelope::desc`).
    pub key_arity: usize,
}
58
/// The planned form of `UnplannedSourceEnvelope::Upsert`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct UpsertEnvelope {
    /// Full arity of the output relation, including the key columns
    pub source_arity: usize,
    /// What style of Upsert we are using
    pub style: UpsertStyle,
    /// The indices of the keys in the full value row, used
    /// to deduplicate data in `upsert_core`
    pub key_indices: Vec<usize>,
}
69
/// The flavor of upsert envelope, which determines how the output relation is derived from
/// the decoded key and value relations.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum UpsertStyle {
    /// `ENVELOPE UPSERT`, where the key shape depends on the independent
    /// `KeyEnvelope`
    Default(KeyEnvelope),
    /// `ENVELOPE DEBEZIUM UPSERT`
    ///
    /// `after_idx` is the index of the column in the decoded value relation whose
    /// `SqlScalarType::Record` fields become the output columns (presumably Debezium's
    /// `after` field — confirm against the planner).
    Debezium { after_idx: usize },
    /// `ENVELOPE UPSERT` where any decode errors will get serialized into a
    /// `SqlScalarType::Record` column named `error_column`, and all value columns are
    /// nullable. The key shape depends on the independent `KeyEnvelope`.
    ValueErrInline {
        key_envelope: KeyEnvelope,
        error_column: String,
    },
}
85
86/// Computes the indices of the value's relation description that appear in the key.
87///
88/// Returns an error if it detects a common columns between the two relations that has the same
89/// name but a different type, if a key column is missing from the value, and if the key relation
90/// has a column with no name.
91fn match_key_indices(
92    key_desc: &RelationDesc,
93    value_desc: &RelationDesc,
94) -> anyhow::Result<Vec<usize>> {
95    let mut indices = Vec::new();
96    for (name, key_type) in key_desc.iter() {
97        let (index, value_type) = value_desc
98            .get_by_name(name)
99            .ok_or_else(|| anyhow!("Value schema missing primary key column: {}", name))?;
100
101        if key_type == value_type {
102            indices.push(index);
103        } else {
104            bail!(
105                "key and value column types do not match: key {:?} vs. value {:?}",
106                key_type,
107                value_type
108            );
109        }
110    }
111    Ok(indices)
112}
113
114impl UnplannedSourceEnvelope {
115    /// Transforms an `UnplannedSourceEnvelope` into a `SourceEnvelope`
116    ///
117    /// Panics if the input envelope is `UnplannedSourceEnvelope::Upsert` and
118    /// key is not passed as `Some`
119    // TODO(petrosagg): This API looks very error prone. Can we statically enforce it somehow?
120    fn into_source_envelope(
121        self,
122        key: Option<Vec<usize>>,
123        key_arity: Option<usize>,
124        source_arity: Option<usize>,
125    ) -> SourceEnvelope {
126        match self {
127            UnplannedSourceEnvelope::Upsert {
128                style: upsert_style,
129            } => SourceEnvelope::Upsert(UpsertEnvelope {
130                style: upsert_style,
131                key_indices: key.expect(
132                    "into_source_envelope to be passed \
133                    correct parameters for UnplannedSourceEnvelope::Upsert",
134                ),
135                source_arity: source_arity.expect(
136                    "into_source_envelope to be passed \
137                    correct parameters for UnplannedSourceEnvelope::Upsert",
138                ),
139            }),
140            UnplannedSourceEnvelope::None(key_envelope) => SourceEnvelope::None(NoneEnvelope {
141                key_envelope,
142                key_arity: key_arity.unwrap_or(0),
143            }),
144            UnplannedSourceEnvelope::CdcV2 => SourceEnvelope::CdcV2,
145        }
146    }
147
148    /// Computes the output relation of this envelope when applied on top of the decoded key and
149    /// value relation desc
150    pub fn desc(
151        self,
152        key_desc: Option<RelationDesc>,
153        value_desc: RelationDesc,
154        metadata_desc: RelationDesc,
155    ) -> anyhow::Result<(SourceEnvelope, RelationDesc)> {
156        Ok(match &self {
157            UnplannedSourceEnvelope::None(key_envelope)
158            | UnplannedSourceEnvelope::Upsert {
159                style: UpsertStyle::Default(key_envelope),
160                ..
161            }
162            | UnplannedSourceEnvelope::Upsert {
163                style:
164                    UpsertStyle::ValueErrInline {
165                        key_envelope,
166                        error_column: _,
167                    },
168            } => {
169                let (key_arity, key_desc) = match key_desc {
170                    Some(desc) if !desc.is_empty() => (Some(desc.arity()), Some(desc)),
171                    _ => (None, None),
172                };
173
174                // Compute any key relation and key indices
175                let (key_desc, key) = match (key_desc, key_arity, key_envelope) {
176                    (_, _, KeyEnvelope::None) => (None, None),
177                    (Some(key_desc), Some(key_arity), KeyEnvelope::Flattened) => {
178                        // Add the key columns as a key.
179                        let key_indices: Vec<usize> = (0..key_arity).collect();
180                        let key_desc = key_desc.with_key(key_indices.clone());
181                        (Some(key_desc), Some(key_indices))
182                    }
183                    (Some(key_desc), Some(key_arity), KeyEnvelope::Named(key_name)) => {
184                        let key_desc = {
185                            // if the key has multiple objects, nest them as a record inside of a single name
186                            if key_arity > 1 {
187                                let key_type = key_desc.typ();
188                                let key_as_record = SqlRelationType::new(vec![SqlColumnType {
189                                    nullable: false,
190                                    scalar_type: SqlScalarType::Record {
191                                        fields: key_desc
192                                            .iter_names()
193                                            .zip_eq(key_type.column_types.iter())
194                                            .map(|(name, ty)| (name.clone(), ty.clone()))
195                                            .collect(),
196                                        custom_id: None,
197                                    },
198                                }]);
199
200                                RelationDesc::new(key_as_record, [key_name.to_string()])
201                            } else {
202                                key_desc.with_names([key_name.to_string()])
203                            }
204                        };
205                        let (key_desc, key) = match self {
206                            UnplannedSourceEnvelope::None(_) => (key_desc, None),
207                            // If we're applying the upsert logic the key column will be unique
208                            UnplannedSourceEnvelope::Upsert { .. } => {
209                                (key_desc.with_key(vec![0]), Some(vec![0]))
210                            }
211                            _ => unreachable!(),
212                        };
213                        (Some(key_desc), key)
214                    }
215                    (None, _, _) => (None, None),
216                    (_, None, _) => (None, None),
217                };
218
219                let value_desc = compute_envelope_value_desc(&self, value_desc);
220                // Add value-related columns and metadata columns after any key columns.
221                let desc = match key_desc {
222                    Some(key_desc) => key_desc.concat(value_desc).concat(metadata_desc),
223                    None => value_desc.concat(metadata_desc),
224                };
225                (
226                    self.into_source_envelope(key, key_arity, Some(desc.arity())),
227                    desc,
228                )
229            }
230            UnplannedSourceEnvelope::Upsert {
231                style: UpsertStyle::Debezium { after_idx },
232                ..
233            } => match &value_desc.typ().column_types[*after_idx].scalar_type {
234                SqlScalarType::Record { fields, .. } => {
235                    let mut desc = RelationDesc::from_names_and_types(fields.clone());
236                    let key = key_desc.map(|k| match_key_indices(&k, &desc)).transpose()?;
237                    if let Some(key) = key.clone() {
238                        desc = desc.with_key(key);
239                    }
240
241                    let desc = match self {
242                        UnplannedSourceEnvelope::Upsert { .. } => desc.concat(metadata_desc),
243                        _ => desc,
244                    };
245
246                    (
247                        self.into_source_envelope(key, None, Some(desc.arity())),
248                        desc,
249                    )
250                }
251                ty => bail!(
252                    "Incorrect type for Debezium value, expected Record, got {:?}",
253                    ty
254                ),
255            },
256            UnplannedSourceEnvelope::CdcV2 => {
257                // the correct types
258
259                // CdcV2 row data are in a record in a record in a list
260                match &value_desc.typ().column_types[0].scalar_type {
261                    SqlScalarType::List { element_type, .. } => match &**element_type {
262                        SqlScalarType::Record { fields, .. } => {
263                            // TODO maybe check this by name
264                            match &fields[0].1.scalar_type {
265                                SqlScalarType::Record { fields, .. } => (
266                                    self.into_source_envelope(None, None, None),
267                                    RelationDesc::from_names_and_types(fields.clone()),
268                                ),
269                                ty => {
270                                    bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty)
271                                }
272                            }
273                        }
274                        ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty),
275                    },
276                    ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty),
277                }
278            }
279        })
280    }
281}
282
/// Whether and how to include the decoded key of a stream in dataflows
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KeyEnvelope {
    /// Never include the key in the output row
    None,
    /// For composite key encodings, pull the fields from the encoding into columns.
    ///
    /// The key columns are placed ahead of the value columns and are declared as a key of
    /// the output relation.
    Flattened,
    /// Always use the given name for the key.
    ///
    /// * For a single-field key, this means that the column will get the given name.
    /// * For a multi-column key, the columns will get packed into a [`SqlScalarType::Record`], and
    ///   that Record will get the given name.
    Named(String),
}
297
298/// Compute the resulting value relation given the decoded value relation and the envelope
299/// style. If the ValueErrInline upsert style is used this will add an error column to the
300/// beginning of the relation and make all value fields nullable.
301fn compute_envelope_value_desc(
302    source_envelope: &UnplannedSourceEnvelope,
303    value_desc: RelationDesc,
304) -> RelationDesc {
305    match &source_envelope {
306        UnplannedSourceEnvelope::Upsert {
307            style:
308                UpsertStyle::ValueErrInline {
309                    key_envelope: _,
310                    error_column,
311                },
312        } => {
313            let mut names = Vec::with_capacity(value_desc.arity() + 1);
314            names.push(error_column.as_str().into());
315            names.extend(value_desc.iter_names().cloned());
316
317            let mut types = Vec::with_capacity(value_desc.arity() + 1);
318            types.push(SqlColumnType {
319                nullable: true,
320                scalar_type: SqlScalarType::Record {
321                    fields: [(
322                        "description".into(),
323                        SqlColumnType {
324                            nullable: true,
325                            scalar_type: SqlScalarType::String,
326                        },
327                    )]
328                    .into(),
329                    custom_id: None,
330                },
331            });
332            types.extend(value_desc.iter_types().map(|t| t.clone().nullable(true)));
333            let relation_type = SqlRelationType::new(types);
334            RelationDesc::new(relation_type, names)
335        }
336        _ => value_desc,
337    }
338}