mz_storage_types/sources/envelope.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to source envelopes
11
12use anyhow::{anyhow, bail};
13use itertools::Itertools;
14use mz_repr::{RelationDesc, SqlColumnType, SqlRelationType, SqlScalarType};
15use serde::{Deserialize, Serialize};
16
/// `SourceEnvelope`s describe how to turn a stream of messages from `SourceDesc`s
/// into a _differential stream_, that is, a stream of (data, time, diff)
/// triples.
///
/// PostgreSQL sources skip any explicit envelope handling, effectively
/// asserting that `SourceEnvelope` is `None` with `KeyEnvelope::None`.
///
/// See `UnplannedSourceEnvelope::desc` for how an envelope is computed from the
/// decoded key and value relations.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum SourceEnvelope {
    /// The most trivial version is `None`, which typically produces triples where the diff
    /// is `1`. However, some sources are able to produce values with more exotic diffs,
    /// such as the Postgres source. Currently, this is the only variant usable with
    /// those sources.
    ///
    /// If the envelope's `KeyEnvelope` is not `KeyEnvelope::None`, the key columns are
    /// included as output columns of the source with the given properties.
    None(NoneEnvelope),
    /// `Upsert` holds onto previously seen values and produces `1` or `-1` diffs depending on
    /// whether or not the required _key_ output by the source has been seen before. This also
    /// supports a `Debezium` mode.
    Upsert(UpsertEnvelope),
    /// `CdcV2` requires sources to output messages in a strict form that relies on an
    /// upstream-provided timeline.
    CdcV2,
}
41
/// `UnplannedSourceEnvelope` is a `SourceEnvelope` missing some information: the key
/// indices, the key arity and the arity of the output relation. This information is computed
/// by `UnplannedSourceEnvelope::desc` from the decoded key and value relations, which then
/// uses `UnplannedSourceEnvelope::into_source_envelope` to create a full `SourceEnvelope`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum UnplannedSourceEnvelope {
    /// Plans into `SourceEnvelope::None`. The `KeyEnvelope` controls whether and how the
    /// decoded key is included in the output.
    None(KeyEnvelope),
    /// Plans into `SourceEnvelope::Upsert` with the given style.
    Upsert { style: UpsertStyle },
    /// Plans into `SourceEnvelope::CdcV2`.
    CdcV2,
}
52
/// The planned form of `SourceEnvelope::None`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct NoneEnvelope {
    /// Whether and how the decoded key is included in the output row.
    pub key_envelope: KeyEnvelope,
    /// Number of columns in the decoded key relation, or `0` if there was no key or the key
    /// relation was empty. This is recorded regardless of `key_envelope`.
    pub key_arity: usize,
}
58
/// The planned form of `SourceEnvelope::Upsert`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct UpsertEnvelope {
    /// Full arity of the output relation, including the key columns and any metadata columns
    pub source_arity: usize,
    /// What style of Upsert we are using
    pub style: UpsertStyle,
    /// The indices of the keys in the full value row, used
    /// to deduplicate data in `upsert_core`
    pub key_indices: Vec<usize>,
}
69
/// The flavor of upsert processing to apply.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum UpsertStyle {
    /// `ENVELOPE UPSERT`, where the key shape depends on the independent
    /// `KeyEnvelope`
    Default(KeyEnvelope),
    /// `ENVELOPE DEBEZIUM UPSERT`
    ///
    /// `after_idx` is the index, within the decoded value relation, of the record column
    /// whose fields become the output columns of the source.
    Debezium { after_idx: usize },
    /// `ENVELOPE UPSERT` where any decode errors will get serialized into a
    /// SqlScalarType::Record column named `error_column`, and all value columns are
    /// nullable. The key shape depends on the independent `KeyEnvelope`.
    ValueErrInline {
        key_envelope: KeyEnvelope,
        error_column: String,
    },
}
85
86/// Computes the indices of the value's relation description that appear in the key.
87///
88/// Returns an error if it detects a common columns between the two relations that has the same
89/// name but a different type, if a key column is missing from the value, and if the key relation
90/// has a column with no name.
91fn match_key_indices(
92 key_desc: &RelationDesc,
93 value_desc: &RelationDesc,
94) -> anyhow::Result<Vec<usize>> {
95 let mut indices = Vec::new();
96 for (name, key_type) in key_desc.iter() {
97 let (index, value_type) = value_desc
98 .get_by_name(name)
99 .ok_or_else(|| anyhow!("Value schema missing primary key column: {}", name))?;
100
101 if key_type == value_type {
102 indices.push(index);
103 } else {
104 bail!(
105 "key and value column types do not match: key {:?} vs. value {:?}",
106 key_type,
107 value_type
108 );
109 }
110 }
111 Ok(indices)
112}
113
114impl UnplannedSourceEnvelope {
115 /// Transforms an `UnplannedSourceEnvelope` into a `SourceEnvelope`
116 ///
117 /// Panics if the input envelope is `UnplannedSourceEnvelope::Upsert` and
118 /// key is not passed as `Some`
119 // TODO(petrosagg): This API looks very error prone. Can we statically enforce it somehow?
120 fn into_source_envelope(
121 self,
122 key: Option<Vec<usize>>,
123 key_arity: Option<usize>,
124 source_arity: Option<usize>,
125 ) -> SourceEnvelope {
126 match self {
127 UnplannedSourceEnvelope::Upsert {
128 style: upsert_style,
129 } => SourceEnvelope::Upsert(UpsertEnvelope {
130 style: upsert_style,
131 key_indices: key.expect(
132 "into_source_envelope to be passed \
133 correct parameters for UnplannedSourceEnvelope::Upsert",
134 ),
135 source_arity: source_arity.expect(
136 "into_source_envelope to be passed \
137 correct parameters for UnplannedSourceEnvelope::Upsert",
138 ),
139 }),
140 UnplannedSourceEnvelope::None(key_envelope) => SourceEnvelope::None(NoneEnvelope {
141 key_envelope,
142 key_arity: key_arity.unwrap_or(0),
143 }),
144 UnplannedSourceEnvelope::CdcV2 => SourceEnvelope::CdcV2,
145 }
146 }
147
148 /// Computes the output relation of this envelope when applied on top of the decoded key and
149 /// value relation desc
150 pub fn desc(
151 self,
152 key_desc: Option<RelationDesc>,
153 value_desc: RelationDesc,
154 metadata_desc: RelationDesc,
155 ) -> anyhow::Result<(SourceEnvelope, RelationDesc)> {
156 Ok(match &self {
157 UnplannedSourceEnvelope::None(key_envelope)
158 | UnplannedSourceEnvelope::Upsert {
159 style: UpsertStyle::Default(key_envelope),
160 ..
161 }
162 | UnplannedSourceEnvelope::Upsert {
163 style:
164 UpsertStyle::ValueErrInline {
165 key_envelope,
166 error_column: _,
167 },
168 } => {
169 let (key_arity, key_desc) = match key_desc {
170 Some(desc) if !desc.is_empty() => (Some(desc.arity()), Some(desc)),
171 _ => (None, None),
172 };
173
174 // Compute any key relation and key indices
175 let (key_desc, key) = match (key_desc, key_arity, key_envelope) {
176 (_, _, KeyEnvelope::None) => (None, None),
177 (Some(key_desc), Some(key_arity), KeyEnvelope::Flattened) => {
178 // Add the key columns as a key.
179 let key_indices: Vec<usize> = (0..key_arity).collect();
180 let key_desc = key_desc.with_key(key_indices.clone());
181 (Some(key_desc), Some(key_indices))
182 }
183 (Some(key_desc), Some(key_arity), KeyEnvelope::Named(key_name)) => {
184 let key_desc = {
185 // if the key has multiple objects, nest them as a record inside of a single name
186 if key_arity > 1 {
187 let key_type = key_desc.typ();
188 let key_as_record = SqlRelationType::new(vec![SqlColumnType {
189 nullable: false,
190 scalar_type: SqlScalarType::Record {
191 fields: key_desc
192 .iter_names()
193 .zip_eq(key_type.column_types.iter())
194 .map(|(name, ty)| (name.clone(), ty.clone()))
195 .collect(),
196 custom_id: None,
197 },
198 }]);
199
200 RelationDesc::new(key_as_record, [key_name.to_string()])
201 } else {
202 key_desc.with_names([key_name.to_string()])
203 }
204 };
205 let (key_desc, key) = match self {
206 UnplannedSourceEnvelope::None(_) => (key_desc, None),
207 // If we're applying the upsert logic the key column will be unique
208 UnplannedSourceEnvelope::Upsert { .. } => {
209 (key_desc.with_key(vec![0]), Some(vec![0]))
210 }
211 _ => unreachable!(),
212 };
213 (Some(key_desc), key)
214 }
215 (None, _, _) => (None, None),
216 (_, None, _) => (None, None),
217 };
218
219 let value_desc = compute_envelope_value_desc(&self, value_desc);
220 // Add value-related columns and metadata columns after any key columns.
221 let desc = match key_desc {
222 Some(key_desc) => key_desc.concat(value_desc).concat(metadata_desc),
223 None => value_desc.concat(metadata_desc),
224 };
225 (
226 self.into_source_envelope(key, key_arity, Some(desc.arity())),
227 desc,
228 )
229 }
230 UnplannedSourceEnvelope::Upsert {
231 style: UpsertStyle::Debezium { after_idx },
232 ..
233 } => match &value_desc.typ().column_types[*after_idx].scalar_type {
234 SqlScalarType::Record { fields, .. } => {
235 let mut desc = RelationDesc::from_names_and_types(fields.clone());
236 let key = key_desc.map(|k| match_key_indices(&k, &desc)).transpose()?;
237 if let Some(key) = key.clone() {
238 desc = desc.with_key(key);
239 }
240
241 let desc = match self {
242 UnplannedSourceEnvelope::Upsert { .. } => desc.concat(metadata_desc),
243 _ => desc,
244 };
245
246 (
247 self.into_source_envelope(key, None, Some(desc.arity())),
248 desc,
249 )
250 }
251 ty => bail!(
252 "Incorrect type for Debezium value, expected Record, got {:?}",
253 ty
254 ),
255 },
256 UnplannedSourceEnvelope::CdcV2 => {
257 // the correct types
258
259 // CdcV2 row data are in a record in a record in a list
260 match &value_desc.typ().column_types[0].scalar_type {
261 SqlScalarType::List { element_type, .. } => match &**element_type {
262 SqlScalarType::Record { fields, .. } => {
263 // TODO maybe check this by name
264 match &fields[0].1.scalar_type {
265 SqlScalarType::Record { fields, .. } => (
266 self.into_source_envelope(None, None, None),
267 RelationDesc::from_names_and_types(fields.clone()),
268 ),
269 ty => {
270 bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty)
271 }
272 }
273 }
274 ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty),
275 },
276 ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty),
277 }
278 }
279 })
280 }
281}
282
/// Whether and how to include the decoded key of a stream in dataflows
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KeyEnvelope {
    /// Never include the key in the output row
    None,
    /// For composite key encodings, pull the fields from the encoding into columns.
    ///
    /// The key columns keep their decoded names and are placed before the value columns.
    Flattened,
    /// Always use the given name for the key.
    ///
    /// * For a single-field key, this means that the column will get the given name.
    /// * For a multi-column key, the columns will get packed into a (non-nullable)
    ///   [`SqlScalarType::Record`], and that Record will get the given name.
    Named(String),
}
297
298/// Compute the resulting value relation given the decoded value relation and the envelope
299/// style. If the ValueErrInline upsert style is used this will add an error column to the
300/// beginning of the relation and make all value fields nullable.
301fn compute_envelope_value_desc(
302 source_envelope: &UnplannedSourceEnvelope,
303 value_desc: RelationDesc,
304) -> RelationDesc {
305 match &source_envelope {
306 UnplannedSourceEnvelope::Upsert {
307 style:
308 UpsertStyle::ValueErrInline {
309 key_envelope: _,
310 error_column,
311 },
312 } => {
313 let mut names = Vec::with_capacity(value_desc.arity() + 1);
314 names.push(error_column.as_str().into());
315 names.extend(value_desc.iter_names().cloned());
316
317 let mut types = Vec::with_capacity(value_desc.arity() + 1);
318 types.push(SqlColumnType {
319 nullable: true,
320 scalar_type: SqlScalarType::Record {
321 fields: [(
322 "description".into(),
323 SqlColumnType {
324 nullable: true,
325 scalar_type: SqlScalarType::String,
326 },
327 )]
328 .into(),
329 custom_id: None,
330 },
331 });
332 types.extend(value_desc.iter_types().map(|t| t.clone().nullable(true)));
333 let relation_type = SqlRelationType::new(types);
334 RelationDesc::new(relation_type, names)
335 }
336 _ => value_desc,
337 }
338}