mz_mysql_util/
decoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::str::FromStr;
11
12use itertools::{EitherOrBoth, Itertools};
13use mysql_common::value::convert::from_value_opt;
14use mysql_common::{Row as MySqlRow, Value};
15
16use mz_ore::cast::CastFrom;
17use mz_ore::error::ErrorExt;
18use mz_repr::adt::date::Date;
19use mz_repr::adt::jsonb::JsonbPacker;
20use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, Numeric, get_precision, get_scale};
21use mz_repr::adt::timestamp::CheckedTimestamp;
22use mz_repr::{Datum, Row, RowPacker, SqlScalarType};
23
24use crate::desc::MySqlColumnMeta;
25use crate::{MySqlColumnDesc, MySqlError, MySqlTableDesc};
26
27pub fn pack_mysql_row(
28    row_container: &mut Row,
29    row: MySqlRow,
30    table_desc: &MySqlTableDesc,
31) -> Result<Row, MySqlError> {
32    let mut packer = row_container.packer();
33    let row_values = row.unwrap();
34
35    for values in table_desc.columns.iter().zip_longest(row_values) {
36        let (col_desc, value) = match values {
37            EitherOrBoth::Both(col_desc, value) => (col_desc, value),
38            EitherOrBoth::Left(col_desc) => {
39                tracing::error!(
40                    "mysql: extra column description {col_desc:?} for table {}",
41                    table_desc.name
42                );
43                Err(MySqlError::ValueDecodeError {
44                    column_name: col_desc.name.clone(),
45                    qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
46                    error: "extra column description".to_string(),
47                })?
48            }
49            EitherOrBoth::Right(_) => {
50                // If there are extra columns on the upstream table we can safely ignore them
51                break;
52            }
53        };
54        if col_desc.column_type.is_none() {
55            // This column is ignored, so don't decode it.
56            continue;
57        }
58        match pack_val_as_datum(value, col_desc, &mut packer) {
59            Err(err) => Err(MySqlError::ValueDecodeError {
60                column_name: col_desc.name.clone(),
61                qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
62                error: err.to_string(),
63            })?,
64            Ok(()) => (),
65        };
66    }
67
68    Ok(row_container.clone())
69}
70
71// TODO(guswynn|roshan): This function has various `.to_string()` and `format!` calls that should
72// use a shared allocation if possible.
73fn pack_val_as_datum(
74    value: Value,
75    col_desc: &MySqlColumnDesc,
76    packer: &mut RowPacker,
77) -> Result<(), anyhow::Error> {
78    let column_type = match col_desc.column_type {
79        Some(ref column_type) => column_type,
80        None => anyhow::bail!("column type is not set for column: {}", col_desc.name),
81    };
82    match value {
83        Value::NULL => {
84            if column_type.nullable {
85                packer.push(Datum::Null);
86            } else {
87                Err(anyhow::anyhow!(
88                    "received a null value in a non-null column".to_string(),
89                ))?
90            }
91        }
92        value => match &column_type.scalar_type {
93            SqlScalarType::Bool => packer.push(Datum::from(from_value_opt::<bool>(value)?)),
94            SqlScalarType::UInt16 => packer.push(Datum::from(from_value_opt::<u16>(value)?)),
95            SqlScalarType::Int16 => packer.push(Datum::from(from_value_opt::<i16>(value)?)),
96            SqlScalarType::UInt32 => packer.push(Datum::from(from_value_opt::<u32>(value)?)),
97            SqlScalarType::Int32 => packer.push(Datum::from(from_value_opt::<i32>(value)?)),
98            SqlScalarType::UInt64 => {
99                if let Some(MySqlColumnMeta::Bit(precision)) = &col_desc.meta {
100                    let mut value = from_value_opt::<Vec<u8>>(value)?;
101
102                    // Ensure we have the correct number of bytes.
103                    let precision_bytes = (precision + 7) / 8;
104                    if value.len() != usize::cast_from(precision_bytes) {
105                        return Err(anyhow::anyhow!("'bit' column out of range!"));
106                    }
107                    // Be defensive and prune any bits that come over the wire and are
108                    // greater than our precision.
109                    let bit_index = precision % 8;
110                    if bit_index != 0 {
111                        let mask = !(u8::MAX << bit_index);
112                        if value.len() > 0 {
113                            value[0] &= mask;
114                        }
115                    }
116
117                    // Based on experimentation the value coming across the wire is
118                    // encoded in big-endian.
119                    let mut buf = [0u8; 8];
120                    buf[(8 - value.len())..].copy_from_slice(value.as_slice());
121                    let value = u64::from_be_bytes(buf);
122                    packer.push(Datum::from(value))
123                } else {
124                    packer.push(Datum::from(from_value_opt::<u64>(value)?))
125                }
126            }
127            SqlScalarType::Int64 => packer.push(Datum::from(from_value_opt::<i64>(value)?)),
128            SqlScalarType::Float32 => packer.push(Datum::from(from_value_opt::<f32>(value)?)),
129            SqlScalarType::Float64 => packer.push(Datum::from(from_value_opt::<f64>(value)?)),
130            SqlScalarType::Char { length } => {
131                let val = from_value_opt::<String>(value)?;
132                check_char_length(length.map(|l| l.into_u32()), &val, col_desc)?;
133                packer.push(Datum::String(&val));
134            }
135            SqlScalarType::VarChar { max_length } => {
136                let val = from_value_opt::<String>(value)?;
137                check_char_length(max_length.map(|l| l.into_u32()), &val, col_desc)?;
138                packer.push(Datum::String(&val));
139            }
140            SqlScalarType::String => {
141                // Special case for string types, since this is the scalar type used for a column
142                // specified as a 'TEXT COLUMN'. In some cases we need to check the column
143                // metadata to know if the upstream value needs special handling
144                match &col_desc.meta {
145                    Some(MySqlColumnMeta::Enum(e)) => {
146                        match value {
147                            Value::Bytes(data) => {
148                                let data = std::str::from_utf8(&data)?;
149                                packer.push(Datum::String(data));
150                            }
151                            Value::Int(val) => {
152                                let enum_int = usize::try_from(val)?;
153
154                                // If mysql strict mode is disabled when an invalid entry is inserted
155                                // then the entry will be replicated as a 0. Outside the 1 indexed enum.
156                                // https://dev.mysql.com/doc/refman/8.4/en/enum.html#enum-indexes
157                                if enum_int == 0 {
158                                    packer.push(Datum::String(""));
159                                } else {
160                                    // Enum types are provided as 1-indexed integers in the replication
161                                    // stream, so we need to find the string value from the enum meta
162                                    let enum_val = e.values.get(enum_int - 1).ok_or_else(|| {
163                                        anyhow::anyhow!(
164                                            "received invalid enum value: {} for column {}",
165                                            val,
166                                            col_desc.name
167                                        )
168                                    })?;
169
170                                    packer.push(Datum::String(enum_val));
171                                }
172                            }
173                            _ => Err(anyhow::anyhow!(
174                                "received unexpected value for enum type: {:?}",
175                                value
176                            ))?,
177                        }
178                    }
179                    Some(MySqlColumnMeta::Json) => {
180                        // JSON types in a query response are encoded as a string with whitespace,
181                        // but when parsed from the binlog event by mysql-common they are provided
182                        // as an encoded string sans-whitespace.
183                        if let Value::Bytes(data) = value {
184                            let json = serde_json::from_slice::<serde_json::Value>(&data)?;
185                            packer.push(Datum::String(&json.to_string()));
186                        } else {
187                            Err(anyhow::anyhow!(
188                                "received unexpected value for json type: {:?}",
189                                value
190                            ))?;
191                        }
192                    }
193                    Some(MySqlColumnMeta::Year) => {
194                        let val = from_value_opt::<u16>(value)?;
195                        packer.push(Datum::String(&val.to_string()));
196                    }
197                    Some(MySqlColumnMeta::Date) => {
198                        // Some MySQL dates are invalid in chrono/NaiveDate (e.g. 0000-00-00), so
199                        // we need to handle them directly as strings
200                        if let Value::Date(y, m, d, 0, 0, 0, 0) = value {
201                            packer.push(Datum::String(&format!("{:04}-{:02}-{:02}", y, m, d)));
202                        } else {
203                            Err(anyhow::anyhow!(
204                                "received unexpected value for date type: {:?}",
205                                value
206                            ))?;
207                        }
208                    }
209                    Some(MySqlColumnMeta::Timestamp(precision)) => {
210                        // Some MySQL dates are invalid in chrono/NaiveDate (e.g. 0000-00-00), so
211                        // we need to handle them directly as strings
212                        if let Value::Date(y, m, d, h, mm, s, ms) = value {
213                            if *precision > 0 {
214                                let precision: usize = (*precision).try_into()?;
215                                packer.push(Datum::String(&format!(
216                                    "{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:0precision$}",
217                                    y,
218                                    m,
219                                    d,
220                                    h,
221                                    mm,
222                                    s,
223                                    ms,
224                                    precision = precision
225                                )));
226                            } else {
227                                packer.push(Datum::String(&format!(
228                                    "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
229                                    y, m, d, h, mm, s
230                                )));
231                            }
232                        } else {
233                            Err(anyhow::anyhow!(
234                                "received unexpected value for timestamp type: {:?}",
235                                value
236                            ))?;
237                        }
238                    }
239                    Some(MySqlColumnMeta::Bit(_)) => unreachable!("parsed as a u64"),
240                    None => {
241                        packer.push(Datum::String(&from_value_opt::<String>(value)?));
242                    }
243                }
244            }
245            SqlScalarType::Jsonb => {
246                if let Value::Bytes(data) = value {
247                    let packer = JsonbPacker::new(packer);
248                    // TODO(guswynn): This still produces and extract allocation (in the
249                    // `DeserializeSeed` impl used internally), which should be improved,
250                    // for all users of the APIs in that module.
251                    packer.pack_slice(&data).map_err(|e| {
252                        anyhow::anyhow!(
253                            "Failed to decode JSON: {}",
254                            // See if we can output the string that failed to be converted to JSON.
255                            match std::str::from_utf8(&data) {
256                                Ok(str) => str.to_string(),
257                                // Otherwise produce the nominally helpful error.
258                                Err(_) => e.display_with_causes().to_string(),
259                            }
260                        )
261                    })?;
262                } else {
263                    Err(anyhow::anyhow!(
264                        "received unexpected value for json type: {:?}",
265                        value
266                    ))?
267                }
268            }
269            SqlScalarType::Bytes => {
270                let data = from_value_opt::<Vec<u8>>(value)?;
271                packer.push(Datum::Bytes(&data));
272            }
273            SqlScalarType::Date => {
274                let date = Date::try_from(from_value_opt::<chrono::NaiveDate>(value)?)?;
275                packer.push(Datum::from(date));
276            }
277            SqlScalarType::Timestamp { precision: _ } => {
278                // Timestamps are encoded as different mysql_common::Value types depending on
279                // whether they are from a binlog event or a query, and depending on which
280                // mysql timestamp version is used. We handle those cases here
281                // https://github.com/blackbeam/rust_mysql_common/blob/v0.31.0/src/binlog/value.rs#L87-L155
282                // https://github.com/blackbeam/rust_mysql_common/blob/v0.31.0/src/value/mod.rs#L332
283                let chrono_timestamp = match value {
284                    Value::Date(..) => from_value_opt::<chrono::NaiveDateTime>(value)?,
285                    // old temporal format from before MySQL 5.6; didn't support fractional seconds
286                    Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
287                        .ok_or_else(|| {
288                            anyhow::anyhow!("received invalid timestamp value: {}", val)
289                        })?
290                        .naive_utc(),
291                    Value::Bytes(data) => {
292                        let data = std::str::from_utf8(&data)?;
293                        if data.contains('.') {
294                            chrono::NaiveDateTime::parse_from_str(data, "%s%.6f")?
295                        } else {
296                            chrono::NaiveDateTime::parse_from_str(data, "%s")?
297                        }
298                    }
299                    _ => Err(anyhow::anyhow!(
300                        "received unexpected value for timestamp type: {:?}",
301                        value
302                    ))?,
303                };
304                packer.push(Datum::try_from(CheckedTimestamp::try_from(
305                    chrono_timestamp,
306                )?)?);
307            }
308            SqlScalarType::Time => {
309                packer.push(Datum::from(from_value_opt::<chrono::NaiveTime>(value)?));
310            }
311            SqlScalarType::Numeric { max_scale } => {
312                // The wire-format of numeric types is a string when sent in a binary query
313                // response but is represented in a decimal binary format when sent in a binlog
314                // event. However the mysql-common crate abstracts this away and always returns
315                // a string. We parse the string into a numeric type here.
316                let val = from_value_opt::<String>(value)?;
317                let val = Numeric::from_str(&val)?;
318                if get_precision(&val) > NUMERIC_DATUM_MAX_PRECISION.into() {
319                    Err(anyhow::anyhow!(
320                        "received numeric value with precision {} for column {} which has a max precision of {}",
321                        get_precision(&val),
322                        col_desc.name,
323                        NUMERIC_DATUM_MAX_PRECISION
324                    ))?
325                }
326                if let Some(max_scale) = max_scale {
327                    if get_scale(&val) > max_scale.into_u8().into() {
328                        Err(anyhow::anyhow!(
329                            "received numeric value with scale {} for column {} which has a max scale of {}",
330                            get_scale(&val),
331                            col_desc.name,
332                            max_scale.into_u8()
333                        ))?
334                    }
335                }
336                packer.push(Datum::from(val));
337            }
338            // TODO(roshan): IMPLEMENT OTHER TYPES
339            data_type => Err(anyhow::anyhow!(
340                "received unexpected value for type: {:?}: {:?}",
341                data_type,
342                value
343            ))?,
344        },
345    }
346    Ok(())
347}
348
349fn check_char_length(
350    length: Option<u32>,
351    val: &str,
352    col_desc: &MySqlColumnDesc,
353) -> Result<(), anyhow::Error> {
354    if let Some(length) = length {
355        if let Some(_) = val.char_indices().nth(usize::cast_from(length)) {
356            Err(anyhow::anyhow!(
357                "received string value of length {} for column {} which has a max length of {}",
358                val.len(),
359                col_desc.name,
360                length
361            ))?
362        }
363    }
364    Ok(())
365}