mz_mysql_util/decoding.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt::Write;
11use std::str::FromStr;
12
13use mysql_common::value::convert::from_value_opt;
14use mysql_common::{Row as MySqlRow, Value};
15
16use mz_ore::cast::CastFrom;
17use mz_ore::error::ErrorExt;
18use mz_repr::adt::date::Date;
19use mz_repr::adt::jsonb::JsonbPacker;
20use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, Numeric, get_precision, get_scale};
21use mz_repr::adt::timestamp::CheckedTimestamp;
22use mz_repr::{Datum, Row, RowPacker, SqlScalarType};
23
24use crate::desc::MySqlColumnMeta;
25use crate::{MySqlColumnDesc, MySqlError, MySqlTableDesc};
26
27pub fn pack_mysql_row(
28 row_container: &mut Row,
29 row: MySqlRow,
30 table_desc: &MySqlTableDesc,
31 gtid_set: Option<&str>,
32 binlog_full_metadata: bool,
33) -> Result<Row, MySqlError> {
34 let mut packer = row_container.packer();
35
36 // For each column in `table_desc` (in descriptor order), resolve its wire
37 // index. With binlog_full_metadata=true, columns are matched by name so a reordered upstream
38 // still decodes correctly; without binlog_full_metadata, rows have no column names and must be
39 // matched positionally. A `None` here means the upstream row is missing this column and is
40 // only tolerated for ignored columns, and for binlog_full_metadata = false, is only tolerated
41 // for ignored columns at the end of the table.
42 for (i, col_desc) in table_desc.columns.iter().enumerate() {
43 if col_desc.column_type.is_none() {
44 // This column is ignored, so don't decode it.
45 continue;
46 }
47 let wire_idx = if !binlog_full_metadata {
48 // No column name metadata, so we match by index.
49 (i < row.len()).then_some(i)
50 } else {
51 // This means the row from the binlog has column name included in the metadata,
52 // so we can match on that instead of position.
53 row.columns_ref()
54 .iter()
55 .position(|wc| wc.name_str() == col_desc.name.as_str())
56 };
57
58 let wire_idx = match wire_idx {
59 Some(idx) => idx,
60 None => {
61 // We could not find a column in the incoming row that matches this descriptor column.
62 // This is an error as the column is not ignored (ignored columns have already been skipped).
63 return Err(decode_error(
64 "extra column description",
65 col_desc,
66 table_desc,
67 gtid_set,
68 &row,
69 ));
70 }
71 };
72 let value = row
73 .as_ref(wire_idx)
74 .expect("wire_idx resolved from row")
75 .clone();
76 if let Err(err) = pack_val_as_datum(value, col_desc, &mut packer) {
77 return Err(decode_error(
78 &err.to_string(),
79 col_desc,
80 table_desc,
81 gtid_set,
82 &row,
83 ));
84 }
85 }
86
87 Ok(row_container.clone())
88}
89
90/// Build a `ValueDecodeError`, logging the schema, table, column, source
91/// gtid_set (if any), and a shape description of `row` at the same time.
92/// The shape string is only built here — pack_mysql_row's happy path does no
93/// per-row allocation beyond what decoding requires.
94fn decode_error(
95 err_msg: &str,
96 col_desc: &MySqlColumnDesc,
97 table_desc: &MySqlTableDesc,
98 gtid_set: Option<&str>,
99 row: &MySqlRow,
100) -> MySqlError {
101 let row_shape = describe_row_shape(row, table_desc);
102 tracing::warn!(
103 "mysql decode error for `{}`.`{}` column `{}`: {}; gtid_set={:?}; row_shape={}",
104 table_desc.schema_name,
105 table_desc.name,
106 col_desc.name,
107 err_msg,
108 gtid_set,
109 row_shape,
110 );
111 MySqlError::ValueDecodeError {
112 column_name: col_desc.name.clone(),
113 qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
114 error: err_msg.to_string(),
115 }
116}
117
118/// Describes the structural shape of a row without revealing any data values.
119/// Iterates every wire column. For each, emits the wire name, the binlog
120/// wire type, the character-set id (or `binary`), a classification relative
121/// to `table_desc` (`expected=<scalar>` for active columns, `ignored` for
122/// columns excluded from the source, `extra` for upstream columns with no
123/// descriptor entry), and a value disposition (`null` or `bytes(len=N)` /
124/// primitive kind). Intended for diagnostic logging on decode errors: MySQL
125/// serializes CHAR, VARCHAR, TEXT, JSON, BLOB, etc. all as `Value::Bytes`,
126/// so the wire type tag and the expected scalar type are what distinguish
127/// them.
128fn describe_row_shape(row: &MySqlRow, table_desc: &MySqlTableDesc) -> String {
129 // Binlogs without full row metadata use positional "@N" names, so we
130 // have to match by wire position rather than by name.
131 let fallback_names = row
132 .columns_ref()
133 .first()
134 .is_some_and(|col| col.name_ref().starts_with(b"@"));
135
136 let mut out = String::new();
137 out.push('[');
138 for (i, wire_col) in row.columns_ref().iter().enumerate() {
139 if i > 0 {
140 out.push_str(", ");
141 }
142 let wire_name = wire_col.name_str();
143 let cs = wire_col.character_set();
144 // 63 = binary collation (binary/blob columns).
145 let cs_str = if cs == 63 {
146 "binary".to_string()
147 } else {
148 format!("charset={cs}")
149 };
150 let wire_type = format!("{:?}", wire_col.column_type());
151
152 let matched_col = if fallback_names {
153 table_desc.columns.get(i)
154 } else {
155 table_desc
156 .columns
157 .iter()
158 .find(|c| c.name.as_str() == wire_name)
159 };
160 let match_info = match matched_col {
161 Some(col) => match &col.column_type {
162 Some(ct) => format!("expected={:?}", ct.scalar_type),
163 None => "ignored".to_string(),
164 },
165 None => "extra".to_string(),
166 };
167
168 let val_desc = match row.as_ref(i) {
169 None => "absent".to_string(),
170 Some(Value::NULL) => "null".to_string(),
171 Some(Value::Bytes(b)) => format!("bytes(len={})", b.len()),
172 Some(Value::Int(_)) => "int".to_string(),
173 Some(Value::UInt(_)) => "uint".to_string(),
174 Some(Value::Float(_)) => "float".to_string(),
175 Some(Value::Double(_)) => "double".to_string(),
176 Some(Value::Date(..)) => "date".to_string(),
177 Some(Value::Time(..)) => "time".to_string(),
178 };
179
180 let _ = write!(
181 out,
182 "{{name={wire_name}, wire={wire_type}, {cs_str}, {match_info}, val={val_desc}}}"
183 );
184 }
185 out.push(']');
186 out
187}
188
// TODO(guswynn|roshan): This function has various `.to_string()` and `format!` calls that should
// use a shared allocation if possible.
/// Decodes a single MySQL `value` and pushes the resulting datum onto `packer`.
///
/// The target representation is chosen from `col_desc.column_type.scalar_type`;
/// for some scalar types `col_desc.meta` additionally selects how the wire
/// value is interpreted (`Bit` for `UInt64`; `Enum`, `Json`, `Year`, `Date`
/// and `Timestamp` for `String`).
///
/// # Errors
///
/// Returns an error when:
/// * `col_desc.column_type` is `None`;
/// * `value` is `Value::NULL` but the column is not nullable;
/// * the value cannot be converted to the expected type, or has an unexpected
///   `Value` variant for the column's type/meta;
/// * a `Char`/`VarChar` value exceeds its declared length, or a `Numeric`
///   value exceeds the maximum precision or the column's maximum scale;
/// * the scalar type is not handled by this function.
///
/// # Panics
///
/// Panics via `unreachable!` if a column has scalar type `String` together
/// with `Bit` metadata.
fn pack_val_as_datum(
    value: Value,
    col_desc: &MySqlColumnDesc,
    packer: &mut RowPacker,
) -> Result<(), anyhow::Error> {
    // A column without a type cannot be decoded by this function.
    let column_type = match col_desc.column_type {
        Some(ref column_type) => column_type,
        None => anyhow::bail!("column type is not set for column: {}", col_desc.name),
    };
    match value {
        // NULL is handled before looking at the scalar type: it is either
        // packed as `Datum::Null` or rejected, based only on nullability.
        Value::NULL => {
            if column_type.nullable {
                packer.push(Datum::Null);
            } else {
                Err(anyhow::anyhow!(
                    "received a null value in a non-null column".to_string(),
                ))?
            }
        }
        value => match &column_type.scalar_type {
            SqlScalarType::Bool => packer.push(Datum::from(from_value_opt::<bool>(value)?)),
            SqlScalarType::UInt16 => packer.push(Datum::from(from_value_opt::<u16>(value)?)),
            SqlScalarType::Int16 => packer.push(Datum::from(from_value_opt::<i16>(value)?)),
            SqlScalarType::UInt32 => packer.push(Datum::from(from_value_opt::<u32>(value)?)),
            SqlScalarType::Int32 => packer.push(Datum::from(from_value_opt::<i32>(value)?)),
            SqlScalarType::UInt64 => {
                // Columns with `Bit` metadata arrive as raw bytes and are
                // assembled into a u64 here; any other UInt64 column is
                // converted directly.
                if let Some(MySqlColumnMeta::Bit(precision)) = &col_desc.meta {
                    let mut value = from_value_opt::<Vec<u8>>(value)?;

                    // Ensure we have the correct number of bytes.
                    let precision_bytes = (precision + 7) / 8;
                    if value.len() != usize::cast_from(precision_bytes) {
                        return Err(anyhow::anyhow!("'bit' column out of range!"));
                    }
                    // Be defensive and prune any bits that come over the wire and are
                    // greater than our precision.
                    let bit_index = precision % 8;
                    if bit_index != 0 {
                        // Keep only the low `bit_index` bits of the first
                        // (most significant, given big-endian order) byte.
                        let mask = !(u8::MAX << bit_index);
                        if value.len() > 0 {
                            value[0] &= mask;
                        }
                    }

                    // Based on experimentation the value coming across the wire is
                    // encoded in big-endian.
                    // NOTE(review): `8 - value.len()` assumes at most 8 bytes, i.e. a
                    // precision of at most 64 — confirm the descriptor guarantees this.
                    let mut buf = [0u8; 8];
                    buf[(8 - value.len())..].copy_from_slice(value.as_slice());
                    let value = u64::from_be_bytes(buf);
                    packer.push(Datum::from(value))
                } else {
                    packer.push(Datum::from(from_value_opt::<u64>(value)?))
                }
            }
            SqlScalarType::Int64 => packer.push(Datum::from(from_value_opt::<i64>(value)?)),
            SqlScalarType::Float32 => packer.push(Datum::from(from_value_opt::<f32>(value)?)),
            SqlScalarType::Float64 => packer.push(Datum::from(from_value_opt::<f64>(value)?)),
            SqlScalarType::Char { length } => {
                let val = from_value_opt::<String>(value)?;
                check_char_length(length.map(|l| l.into_u32()), &val, col_desc)?;
                packer.push(Datum::String(&val));
            }
            SqlScalarType::VarChar { max_length } => {
                let val = from_value_opt::<String>(value)?;
                check_char_length(max_length.map(|l| l.into_u32()), &val, col_desc)?;
                packer.push(Datum::String(&val));
            }
            SqlScalarType::String => {
                // Special case for string types, since this is the scalar type used for a column
                // specified as a 'TEXT COLUMN'. In some cases we need to check the column
                // metadata to know if the upstream value needs special handling
                match &col_desc.meta {
                    Some(MySqlColumnMeta::Enum(e)) => {
                        match value {
                            // The enum value arrived as bytes: pack it as-is after
                            // UTF-8 validation.
                            Value::Bytes(data) => {
                                let data = std::str::from_utf8(&data)?;
                                packer.push(Datum::String(data));
                            }
                            Value::Int(val) => {
                                // Negative indexes fail this conversion and surface as an error.
                                let enum_int = usize::try_from(val)?;

                                // If mysql strict mode is disabled when an invalid entry is inserted
                                // then the entry will be replicated as a 0. Outside the 1 indexed enum.
                                // https://dev.mysql.com/doc/refman/8.4/en/enum.html#enum-indexes
                                if enum_int == 0 {
                                    packer.push(Datum::String(""));
                                } else {
                                    // Enum types are provided as 1-indexed integers in the replication
                                    // stream, so we need to find the string value from the enum meta
                                    let enum_val = e.values.get(enum_int - 1).ok_or_else(|| {
                                        anyhow::anyhow!(
                                            "received invalid enum value: {} for column {}",
                                            val,
                                            col_desc.name
                                        )
                                    })?;

                                    packer.push(Datum::String(enum_val));
                                }
                            }
                            _ => Err(anyhow::anyhow!(
                                "received unexpected value for enum type: {:?}",
                                value
                            ))?,
                        }
                    }
                    Some(MySqlColumnMeta::Json) => {
                        // JSON types in a query response are encoded as a string with whitespace,
                        // but when parsed from the binlog event by mysql-common they are provided
                        // as an encoded string sans-whitespace.
                        if let Value::Bytes(data) = value {
                            // Parse and re-serialize so both encodings go through
                            // the same serializer before being packed.
                            let json = serde_json::from_slice::<serde_json::Value>(&data)?;
                            packer.push(Datum::String(&json.to_string()));
                        } else {
                            Err(anyhow::anyhow!(
                                "received unexpected value for json type: {:?}",
                                value
                            ))?;
                        }
                    }
                    Some(MySqlColumnMeta::Year) => {
                        // The year is decoded as a u16 and packed in its decimal string form.
                        let val = from_value_opt::<u16>(value)?;
                        packer.push(Datum::String(&val.to_string()));
                    }
                    Some(MySqlColumnMeta::Date) => {
                        // Some MySQL dates are invalid in chrono/NaiveDate (e.g. 0000-00-00), so
                        // we need to handle them directly as strings
                        // Only values whose time-of-day fields are all zero are accepted.
                        if let Value::Date(y, m, d, 0, 0, 0, 0) = value {
                            packer.push(Datum::String(&format!("{:04}-{:02}-{:02}", y, m, d)));
                        } else {
                            Err(anyhow::anyhow!(
                                "received unexpected value for date type: {:?}",
                                value
                            ))?;
                        }
                    }
                    Some(MySqlColumnMeta::Timestamp(precision)) => {
                        // Some MySQL dates are invalid in chrono/NaiveDate (e.g. 0000-00-00), so
                        // we need to handle them directly as strings
                        if let Value::Date(y, m, d, h, mm, s, ms) = value {
                            if *precision > 0 {
                                // The fractional field is zero-padded to `precision` digits.
                                // NOTE(review): the width only pads and never truncates; if `ms`
                                // is a microsecond count this can print more than `precision`
                                // digits — confirm against mysql_common's `Value::Date`.
                                let precision: usize = (*precision).try_into()?;
                                packer.push(Datum::String(&format!(
                                    "{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:0precision$}",
                                    y,
                                    m,
                                    d,
                                    h,
                                    mm,
                                    s,
                                    ms,
                                    precision = precision
                                )));
                            } else {
                                // Precision 0: the fractional field is omitted entirely.
                                packer.push(Datum::String(&format!(
                                    "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
                                    y, m, d, h, mm, s
                                )));
                            }
                        } else {
                            Err(anyhow::anyhow!(
                                "received unexpected value for timestamp type: {:?}",
                                value
                            ))?;
                        }
                    }
                    Some(MySqlColumnMeta::Bit(_)) => unreachable!("parsed as a u64"),
                    None => {
                        // No special metadata: decode as a plain string.
                        packer.push(Datum::String(&from_value_opt::<String>(value)?));
                    }
                }
            }
            SqlScalarType::Jsonb => {
                if let Value::Bytes(data) = value {
                    // Shadows the row packer with a JSONB packer wrapping it.
                    let packer = JsonbPacker::new(packer);
                    // TODO(guswynn): This still produces an extra allocation (in the
                    // `DeserializeSeed` impl used internally), which should be improved,
                    // for all users of the APIs in that module.
                    packer.pack_slice(&data).map_err(|e| {
                        anyhow::anyhow!(
                            "Failed to decode JSON: {}",
                            // See if we can output the string that failed to be converted to JSON.
                            match std::str::from_utf8(&data) {
                                Ok(str) => str.to_string(),
                                // Otherwise produce the nominally helpful error.
                                Err(_) => e.display_with_causes().to_string(),
                            }
                        )
                    })?;
                } else {
                    Err(anyhow::anyhow!(
                        "received unexpected value for json type: {:?}",
                        value
                    ))?
                }
            }
            SqlScalarType::Bytes => {
                let data = from_value_opt::<Vec<u8>>(value)?;
                packer.push(Datum::Bytes(&data));
            }
            SqlScalarType::Date => {
                // Goes through chrono's `NaiveDate`, so values that chrono cannot
                // represent fail here with a conversion error.
                let date = Date::try_from(from_value_opt::<chrono::NaiveDate>(value)?)?;
                packer.push(Datum::from(date));
            }
            SqlScalarType::Timestamp { precision: _ } => {
                // Timestamps are encoded as different mysql_common::Value types depending on
                // whether they are from a binlog event or a query, and depending on which
                // mysql timestamp version is used. We handle those cases here
                // https://github.com/blackbeam/rust_mysql_common/blob/v0.31.0/src/binlog/value.rs#L87-L155
                // https://github.com/blackbeam/rust_mysql_common/blob/v0.31.0/src/value/mod.rs#L332
                let chrono_timestamp = match value {
                    Value::Date(..) => from_value_opt::<chrono::NaiveDateTime>(value)?,
                    // old temporal format from before MySQL 5.6; didn't support fractional seconds
                    Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
                        .ok_or_else(|| {
                            anyhow::anyhow!("received invalid timestamp value: {}", val)
                        })?
                        .naive_utc(),
                    // Textual form: parsed with chrono's `%s` format, plus `%.6f`
                    // when the text contains a '.'.
                    Value::Bytes(data) => {
                        let data = std::str::from_utf8(&data)?;
                        if data.contains('.') {
                            chrono::NaiveDateTime::parse_from_str(data, "%s%.6f")?
                        } else {
                            chrono::NaiveDateTime::parse_from_str(data, "%s")?
                        }
                    }
                    _ => Err(anyhow::anyhow!(
                        "received unexpected value for timestamp type: {:?}",
                        value
                    ))?,
                };
                packer.push(Datum::try_from(CheckedTimestamp::try_from(
                    chrono_timestamp,
                )?)?);
            }
            SqlScalarType::Time => {
                packer.push(Datum::from(from_value_opt::<chrono::NaiveTime>(value)?));
            }
            SqlScalarType::Numeric { max_scale } => {
                // The wire-format of numeric types is a string when sent in a binary query
                // response but is represented in a decimal binary format when sent in a binlog
                // event. However the mysql-common crate abstracts this away and always returns
                // a string. We parse the string into a numeric type here.
                let val = from_value_opt::<String>(value)?;
                let val = Numeric::from_str(&val)?;
                // Reject values whose precision exceeds what a numeric datum can hold.
                if get_precision(&val) > NUMERIC_DATUM_MAX_PRECISION.into() {
                    Err(anyhow::anyhow!(
                        "received numeric value with precision {} for column {} which has a max precision of {}",
                        get_precision(&val),
                        col_desc.name,
                        NUMERIC_DATUM_MAX_PRECISION
                    ))?
                }
                // The scale is only checked when the column declares a maximum scale.
                if let Some(max_scale) = max_scale {
                    if get_scale(&val) > max_scale.into_u8().into() {
                        Err(anyhow::anyhow!(
                            "received numeric value with scale {} for column {} which has a max scale of {}",
                            get_scale(&val),
                            col_desc.name,
                            max_scale.into_u8()
                        ))?
                    }
                }
                packer.push(Datum::from(val));
            }
            // TODO(roshan): IMPLEMENT OTHER TYPES
            data_type => Err(anyhow::anyhow!(
                "received unexpected value for type: {:?}: {:?}",
                data_type,
                value
            ))?,
        },
    }
    Ok(())
}
466
467fn check_char_length(
468 length: Option<u32>,
469 val: &str,
470 col_desc: &MySqlColumnDesc,
471) -> Result<(), anyhow::Error> {
472 if let Some(length) = length {
473 if let Some(_) = val.char_indices().nth(usize::cast_from(length)) {
474 Err(anyhow::anyhow!(
475 "received string value of length {} for column {} which has a max length of {}",
476 val.len(),
477 col_desc.name,
478 length
479 ))?
480 }
481 }
482 Ok(())
483}