Skip to main content

mz_mysql_util/
decoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt::Write;
11use std::str::FromStr;
12
13use mysql_common::value::convert::from_value_opt;
14use mysql_common::{Row as MySqlRow, Value};
15
16use mz_ore::cast::CastFrom;
17use mz_ore::error::ErrorExt;
18use mz_repr::adt::date::Date;
19use mz_repr::adt::jsonb::JsonbPacker;
20use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, Numeric, get_precision, get_scale};
21use mz_repr::adt::timestamp::CheckedTimestamp;
22use mz_repr::{Datum, Row, RowPacker, SqlScalarType};
23
24use crate::desc::MySqlColumnMeta;
25use crate::{MySqlColumnDesc, MySqlError, MySqlTableDesc};
26
27pub fn pack_mysql_row(
28    row_container: &mut Row,
29    row: MySqlRow,
30    table_desc: &MySqlTableDesc,
31    gtid_set: Option<&str>,
32    binlog_full_metadata: bool,
33) -> Result<Row, MySqlError> {
34    let mut packer = row_container.packer();
35
36    // For each column in `table_desc` (in descriptor order), resolve its wire
37    // index. With binlog_full_metadata=true, columns are matched by name so a reordered upstream
38    // still decodes correctly; without binlog_full_metadata, rows have no column names and must be
39    // matched positionally. A `None` here means the upstream row is missing this column and is
40    // only tolerated for ignored columns, and for binlog_full_metadata = false, is only tolerated
41    // for ignored columns at the end of the table.
42    for (i, col_desc) in table_desc.columns.iter().enumerate() {
43        if col_desc.column_type.is_none() {
44            // This column is ignored, so don't decode it.
45            continue;
46        }
47        let wire_idx = if !binlog_full_metadata {
48            // No column name metadata, so we match by index.
49            (i < row.len()).then_some(i)
50        } else {
51            // This means the row from the binlog has column name included in the metadata,
52            // so we can match on that instead of position.
53            row.columns_ref()
54                .iter()
55                .position(|wc| wc.name_str() == col_desc.name.as_str())
56        };
57
58        let wire_idx = match wire_idx {
59            Some(idx) => idx,
60            None => {
61                // We could not find a column in the incoming row that matches this descriptor column.
62                // This is an error as the column is not ignored (ignored columns have already been skipped).
63                return Err(decode_error(
64                    "extra column description",
65                    col_desc,
66                    table_desc,
67                    gtid_set,
68                    &row,
69                ));
70            }
71        };
72        let value = row
73            .as_ref(wire_idx)
74            .expect("wire_idx resolved from row")
75            .clone();
76        if let Err(err) = pack_val_as_datum(value, col_desc, &mut packer) {
77            return Err(decode_error(
78                &err.to_string(),
79                col_desc,
80                table_desc,
81                gtid_set,
82                &row,
83            ));
84        }
85    }
86
87    Ok(row_container.clone())
88}
89
90/// Build a `ValueDecodeError`, logging the schema, table, column, source
91/// gtid_set (if any), and a shape description of `row` at the same time.
92/// The shape string is only built here — pack_mysql_row's happy path does no
93/// per-row allocation beyond what decoding requires.
94fn decode_error(
95    err_msg: &str,
96    col_desc: &MySqlColumnDesc,
97    table_desc: &MySqlTableDesc,
98    gtid_set: Option<&str>,
99    row: &MySqlRow,
100) -> MySqlError {
101    let row_shape = describe_row_shape(row, table_desc);
102    tracing::warn!(
103        "mysql decode error for `{}`.`{}` column `{}`: {}; gtid_set={:?}; row_shape={}",
104        table_desc.schema_name,
105        table_desc.name,
106        col_desc.name,
107        err_msg,
108        gtid_set,
109        row_shape,
110    );
111    MySqlError::ValueDecodeError {
112        column_name: col_desc.name.clone(),
113        qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
114        error: err_msg.to_string(),
115    }
116}
117
118/// Describes the structural shape of a row without revealing any data values.
119/// Iterates every wire column. For each, emits the wire name, the binlog
120/// wire type, the character-set id (or `binary`), a classification relative
121/// to `table_desc` (`expected=<scalar>` for active columns, `ignored` for
122/// columns excluded from the source, `extra` for upstream columns with no
123/// descriptor entry), and a value disposition (`null` or `bytes(len=N)` /
124/// primitive kind). Intended for diagnostic logging on decode errors: MySQL
125/// serializes CHAR, VARCHAR, TEXT, JSON, BLOB, etc. all as `Value::Bytes`,
126/// so the wire type tag and the expected scalar type are what distinguish
127/// them.
128fn describe_row_shape(row: &MySqlRow, table_desc: &MySqlTableDesc) -> String {
129    // Binlogs without full row metadata use positional "@N" names, so we
130    // have to match by wire position rather than by name.
131    let fallback_names = row
132        .columns_ref()
133        .first()
134        .is_some_and(|col| col.name_ref().starts_with(b"@"));
135
136    let mut out = String::new();
137    out.push('[');
138    for (i, wire_col) in row.columns_ref().iter().enumerate() {
139        if i > 0 {
140            out.push_str(", ");
141        }
142        let wire_name = wire_col.name_str();
143        let cs = wire_col.character_set();
144        // 63 = binary collation (binary/blob columns).
145        let cs_str = if cs == 63 {
146            "binary".to_string()
147        } else {
148            format!("charset={cs}")
149        };
150        let wire_type = format!("{:?}", wire_col.column_type());
151
152        let matched_col = if fallback_names {
153            table_desc.columns.get(i)
154        } else {
155            table_desc
156                .columns
157                .iter()
158                .find(|c| c.name.as_str() == wire_name)
159        };
160        let match_info = match matched_col {
161            Some(col) => match &col.column_type {
162                Some(ct) => format!("expected={:?}", ct.scalar_type),
163                None => "ignored".to_string(),
164            },
165            None => "extra".to_string(),
166        };
167
168        let val_desc = match row.as_ref(i) {
169            None => "absent".to_string(),
170            Some(Value::NULL) => "null".to_string(),
171            Some(Value::Bytes(b)) => format!("bytes(len={})", b.len()),
172            Some(Value::Int(_)) => "int".to_string(),
173            Some(Value::UInt(_)) => "uint".to_string(),
174            Some(Value::Float(_)) => "float".to_string(),
175            Some(Value::Double(_)) => "double".to_string(),
176            Some(Value::Date(..)) => "date".to_string(),
177            Some(Value::Time(..)) => "time".to_string(),
178        };
179
180        let _ = write!(
181            out,
182            "{{name={wire_name}, wire={wire_type}, {cs_str}, {match_info}, val={val_desc}}}"
183        );
184    }
185    out.push(']');
186    out
187}
188
189// TODO(guswynn|roshan): This function has various `.to_string()` and `format!` calls that should
190// use a shared allocation if possible.
191fn pack_val_as_datum(
192    value: Value,
193    col_desc: &MySqlColumnDesc,
194    packer: &mut RowPacker,
195) -> Result<(), anyhow::Error> {
196    let column_type = match col_desc.column_type {
197        Some(ref column_type) => column_type,
198        None => anyhow::bail!("column type is not set for column: {}", col_desc.name),
199    };
200    match value {
201        Value::NULL => {
202            if column_type.nullable {
203                packer.push(Datum::Null);
204            } else {
205                Err(anyhow::anyhow!(
206                    "received a null value in a non-null column".to_string(),
207                ))?
208            }
209        }
210        value => match &column_type.scalar_type {
211            SqlScalarType::Bool => packer.push(Datum::from(from_value_opt::<bool>(value)?)),
212            SqlScalarType::UInt16 => packer.push(Datum::from(from_value_opt::<u16>(value)?)),
213            SqlScalarType::Int16 => packer.push(Datum::from(from_value_opt::<i16>(value)?)),
214            SqlScalarType::UInt32 => packer.push(Datum::from(from_value_opt::<u32>(value)?)),
215            SqlScalarType::Int32 => packer.push(Datum::from(from_value_opt::<i32>(value)?)),
216            SqlScalarType::UInt64 => {
217                if let Some(MySqlColumnMeta::Bit(precision)) = &col_desc.meta {
218                    let mut value = from_value_opt::<Vec<u8>>(value)?;
219
220                    // Ensure we have the correct number of bytes.
221                    let precision_bytes = (precision + 7) / 8;
222                    if value.len() != usize::cast_from(precision_bytes) {
223                        return Err(anyhow::anyhow!("'bit' column out of range!"));
224                    }
225                    // Be defensive and prune any bits that come over the wire and are
226                    // greater than our precision.
227                    let bit_index = precision % 8;
228                    if bit_index != 0 {
229                        let mask = !(u8::MAX << bit_index);
230                        if value.len() > 0 {
231                            value[0] &= mask;
232                        }
233                    }
234
235                    // Based on experimentation the value coming across the wire is
236                    // encoded in big-endian.
237                    let mut buf = [0u8; 8];
238                    buf[(8 - value.len())..].copy_from_slice(value.as_slice());
239                    let value = u64::from_be_bytes(buf);
240                    packer.push(Datum::from(value))
241                } else {
242                    packer.push(Datum::from(from_value_opt::<u64>(value)?))
243                }
244            }
245            SqlScalarType::Int64 => packer.push(Datum::from(from_value_opt::<i64>(value)?)),
246            SqlScalarType::Float32 => packer.push(Datum::from(from_value_opt::<f32>(value)?)),
247            SqlScalarType::Float64 => packer.push(Datum::from(from_value_opt::<f64>(value)?)),
248            SqlScalarType::Char { length } => {
249                let val = from_value_opt::<String>(value)?;
250                check_char_length(length.map(|l| l.into_u32()), &val, col_desc)?;
251                packer.push(Datum::String(&val));
252            }
253            SqlScalarType::VarChar { max_length } => {
254                let val = from_value_opt::<String>(value)?;
255                check_char_length(max_length.map(|l| l.into_u32()), &val, col_desc)?;
256                packer.push(Datum::String(&val));
257            }
258            SqlScalarType::String => {
259                // Special case for string types, since this is the scalar type used for a column
260                // specified as a 'TEXT COLUMN'. In some cases we need to check the column
261                // metadata to know if the upstream value needs special handling
262                match &col_desc.meta {
263                    Some(MySqlColumnMeta::Enum(e)) => {
264                        match value {
265                            Value::Bytes(data) => {
266                                let data = std::str::from_utf8(&data)?;
267                                packer.push(Datum::String(data));
268                            }
269                            Value::Int(val) => {
270                                let enum_int = usize::try_from(val)?;
271
272                                // If mysql strict mode is disabled when an invalid entry is inserted
273                                // then the entry will be replicated as a 0. Outside the 1 indexed enum.
274                                // https://dev.mysql.com/doc/refman/8.4/en/enum.html#enum-indexes
275                                if enum_int == 0 {
276                                    packer.push(Datum::String(""));
277                                } else {
278                                    // Enum types are provided as 1-indexed integers in the replication
279                                    // stream, so we need to find the string value from the enum meta
280                                    let enum_val = e.values.get(enum_int - 1).ok_or_else(|| {
281                                        anyhow::anyhow!(
282                                            "received invalid enum value: {} for column {}",
283                                            val,
284                                            col_desc.name
285                                        )
286                                    })?;
287
288                                    packer.push(Datum::String(enum_val));
289                                }
290                            }
291                            _ => Err(anyhow::anyhow!(
292                                "received unexpected value for enum type: {:?}",
293                                value
294                            ))?,
295                        }
296                    }
297                    Some(MySqlColumnMeta::Json) => {
298                        // JSON types in a query response are encoded as a string with whitespace,
299                        // but when parsed from the binlog event by mysql-common they are provided
300                        // as an encoded string sans-whitespace.
301                        if let Value::Bytes(data) = value {
302                            let json = serde_json::from_slice::<serde_json::Value>(&data)?;
303                            packer.push(Datum::String(&json.to_string()));
304                        } else {
305                            Err(anyhow::anyhow!(
306                                "received unexpected value for json type: {:?}",
307                                value
308                            ))?;
309                        }
310                    }
311                    Some(MySqlColumnMeta::Year) => {
312                        let val = from_value_opt::<u16>(value)?;
313                        packer.push(Datum::String(&val.to_string()));
314                    }
315                    Some(MySqlColumnMeta::Date) => {
316                        // Some MySQL dates are invalid in chrono/NaiveDate (e.g. 0000-00-00), so
317                        // we need to handle them directly as strings
318                        if let Value::Date(y, m, d, 0, 0, 0, 0) = value {
319                            packer.push(Datum::String(&format!("{:04}-{:02}-{:02}", y, m, d)));
320                        } else {
321                            Err(anyhow::anyhow!(
322                                "received unexpected value for date type: {:?}",
323                                value
324                            ))?;
325                        }
326                    }
327                    Some(MySqlColumnMeta::Timestamp(precision)) => {
328                        // Some MySQL dates are invalid in chrono/NaiveDate (e.g. 0000-00-00), so
329                        // we need to handle them directly as strings
330                        if let Value::Date(y, m, d, h, mm, s, ms) = value {
331                            if *precision > 0 {
332                                let precision: usize = (*precision).try_into()?;
333                                packer.push(Datum::String(&format!(
334                                    "{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:0precision$}",
335                                    y,
336                                    m,
337                                    d,
338                                    h,
339                                    mm,
340                                    s,
341                                    ms,
342                                    precision = precision
343                                )));
344                            } else {
345                                packer.push(Datum::String(&format!(
346                                    "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
347                                    y, m, d, h, mm, s
348                                )));
349                            }
350                        } else {
351                            Err(anyhow::anyhow!(
352                                "received unexpected value for timestamp type: {:?}",
353                                value
354                            ))?;
355                        }
356                    }
357                    Some(MySqlColumnMeta::Bit(_)) => unreachable!("parsed as a u64"),
358                    None => {
359                        packer.push(Datum::String(&from_value_opt::<String>(value)?));
360                    }
361                }
362            }
363            SqlScalarType::Jsonb => {
364                if let Value::Bytes(data) = value {
365                    let packer = JsonbPacker::new(packer);
366                    // TODO(guswynn): This still produces and extract allocation (in the
367                    // `DeserializeSeed` impl used internally), which should be improved,
368                    // for all users of the APIs in that module.
369                    packer.pack_slice(&data).map_err(|e| {
370                        anyhow::anyhow!(
371                            "Failed to decode JSON: {}",
372                            // See if we can output the string that failed to be converted to JSON.
373                            match std::str::from_utf8(&data) {
374                                Ok(str) => str.to_string(),
375                                // Otherwise produce the nominally helpful error.
376                                Err(_) => e.display_with_causes().to_string(),
377                            }
378                        )
379                    })?;
380                } else {
381                    Err(anyhow::anyhow!(
382                        "received unexpected value for json type: {:?}",
383                        value
384                    ))?
385                }
386            }
387            SqlScalarType::Bytes => {
388                let data = from_value_opt::<Vec<u8>>(value)?;
389                packer.push(Datum::Bytes(&data));
390            }
391            SqlScalarType::Date => {
392                let date = Date::try_from(from_value_opt::<chrono::NaiveDate>(value)?)?;
393                packer.push(Datum::from(date));
394            }
395            SqlScalarType::Timestamp { precision: _ } => {
396                // Timestamps are encoded as different mysql_common::Value types depending on
397                // whether they are from a binlog event or a query, and depending on which
398                // mysql timestamp version is used. We handle those cases here
399                // https://github.com/blackbeam/rust_mysql_common/blob/v0.31.0/src/binlog/value.rs#L87-L155
400                // https://github.com/blackbeam/rust_mysql_common/blob/v0.31.0/src/value/mod.rs#L332
401                let chrono_timestamp = match value {
402                    Value::Date(..) => from_value_opt::<chrono::NaiveDateTime>(value)?,
403                    // old temporal format from before MySQL 5.6; didn't support fractional seconds
404                    Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
405                        .ok_or_else(|| {
406                            anyhow::anyhow!("received invalid timestamp value: {}", val)
407                        })?
408                        .naive_utc(),
409                    Value::Bytes(data) => {
410                        let data = std::str::from_utf8(&data)?;
411                        if data.contains('.') {
412                            chrono::NaiveDateTime::parse_from_str(data, "%s%.6f")?
413                        } else {
414                            chrono::NaiveDateTime::parse_from_str(data, "%s")?
415                        }
416                    }
417                    _ => Err(anyhow::anyhow!(
418                        "received unexpected value for timestamp type: {:?}",
419                        value
420                    ))?,
421                };
422                packer.push(Datum::try_from(CheckedTimestamp::try_from(
423                    chrono_timestamp,
424                )?)?);
425            }
426            SqlScalarType::Time => {
427                packer.push(Datum::from(from_value_opt::<chrono::NaiveTime>(value)?));
428            }
429            SqlScalarType::Numeric { max_scale } => {
430                // The wire-format of numeric types is a string when sent in a binary query
431                // response but is represented in a decimal binary format when sent in a binlog
432                // event. However the mysql-common crate abstracts this away and always returns
433                // a string. We parse the string into a numeric type here.
434                let val = from_value_opt::<String>(value)?;
435                let val = Numeric::from_str(&val)?;
436                if get_precision(&val) > NUMERIC_DATUM_MAX_PRECISION.into() {
437                    Err(anyhow::anyhow!(
438                        "received numeric value with precision {} for column {} which has a max precision of {}",
439                        get_precision(&val),
440                        col_desc.name,
441                        NUMERIC_DATUM_MAX_PRECISION
442                    ))?
443                }
444                if let Some(max_scale) = max_scale {
445                    if get_scale(&val) > max_scale.into_u8().into() {
446                        Err(anyhow::anyhow!(
447                            "received numeric value with scale {} for column {} which has a max scale of {}",
448                            get_scale(&val),
449                            col_desc.name,
450                            max_scale.into_u8()
451                        ))?
452                    }
453                }
454                packer.push(Datum::from(val));
455            }
456            // TODO(roshan): IMPLEMENT OTHER TYPES
457            data_type => Err(anyhow::anyhow!(
458                "received unexpected value for type: {:?}: {:?}",
459                data_type,
460                value
461            ))?,
462        },
463    }
464    Ok(())
465}
466
467fn check_char_length(
468    length: Option<u32>,
469    val: &str,
470    col_desc: &MySqlColumnDesc,
471) -> Result<(), anyhow::Error> {
472    if let Some(length) = length {
473        if let Some(_) = val.char_indices().nth(usize::cast_from(length)) {
474            Err(anyhow::anyhow!(
475                "received string value of length {} for column {} which has a max length of {}",
476                val.len(),
477                col_desc.name,
478                length
479            ))?
480        }
481    }
482    Ok(())
483}