Skip to main content

mz_mysql_util/
decoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt::Write;
11use std::str::FromStr;
12
13use mysql_common::value::convert::from_value_opt;
14use mysql_common::{Row as MySqlRow, Value};
15
16use mz_ore::cast::CastFrom;
17use mz_ore::error::ErrorExt;
18use mz_repr::adt::date::Date;
19use mz_repr::adt::jsonb::JsonbPacker;
20use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, Numeric, get_precision, get_scale};
21use mz_repr::adt::timestamp::CheckedTimestamp;
22use mz_repr::{Datum, Row, RowPacker, SqlScalarType};
23
24use crate::desc::MySqlColumnMeta;
25use crate::{MySqlColumnDesc, MySqlError, MySqlTableDesc};
26
/// Canonical text for the MySQL zero-date sentinel ('0000-00-00 00:00:00').
///
/// In the binlog MYSQL_TYPE_TIMESTAMP (pre-5.6) and MYSQL_TYPE_TIMESTAMP2
/// (5.6+) encodings a TIMESTAMP arrives as seconds since the unix epoch, and
/// sec=0 *cannot* represent unix epoch 0: the TIMESTAMP type's supported range
/// starts at '1970-01-01 00:00:01' UTC
/// (<https://dev.mysql.com/doc/refman/8.0/en/datetime.html>), so any sec=0 is
/// unambiguously this sentinel. (DATETIME/DATETIME2 values are decoded as
/// broken-down `Value::Date` components instead, where the zero date shows up
/// as all-zero fields.)
const MYSQL_ZERO_TIMESTAMP: &str = "0000-00-00 00:00:00";

/// Maximum fractional-seconds precision MySQL accepts for DATETIME(p) and
/// TIMESTAMP(p). Values are stored in microseconds, so 6 digits is the upper
/// bound (<https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html>).
const MYSQL_MAX_FRACTIONAL_PRECISION: u32 = 6;
38
39/// Format the zero-date sentinel for a column with the given fractional
40/// precision (matches the Date arm's `{:0precision$}` behavior).
41fn mysql_zero_timestamp(precision: u32) -> String {
42    if precision > 0 {
43        format!(
44            "{}.{}",
45            MYSQL_ZERO_TIMESTAMP,
46            "0".repeat(usize::cast_from(precision))
47        )
48    } else {
49        MYSQL_ZERO_TIMESTAMP.to_string()
50    }
51}
52
53/// Format MySQL DATETIME/TIMESTAMP components as `YYYY-MM-DD HH:MM:SS[.ffff]`.
54/// `micros` is the raw microseconds (0..1_000_000); only the leading
55/// `precision` digits are kept, matching MySQL's DATETIME(p)/TIMESTAMP(p)
56/// display.
57fn format_mysql_timestamp(
58    y: u16,
59    m: u8,
60    d: u8,
61    hr: u8,
62    min: u8,
63    sec: u8,
64    micros: u32,
65    precision: u32,
66) -> String {
67    if precision == 0 {
68        return format!("{y:04}-{m:02}-{d:02} {hr:02}:{min:02}:{sec:02}");
69    }
70    // Clamp defensively: MySQL itself rejects precision > 6, but upstream
71    // metadata is untrusted and a larger value would make `pow()` below
72    // overflow its u32 exponent.
73    let p = precision.min(MYSQL_MAX_FRACTIONAL_PRECISION);
74    let scaled = micros / 10u32.pow(MYSQL_MAX_FRACTIONAL_PRECISION - p);
75    let width = usize::cast_from(p);
76    format!("{y:04}-{m:02}-{d:02} {hr:02}:{min:02}:{sec:02}.{scaled:0width$}")
77}
78
79pub fn pack_mysql_row(
80    row_container: &mut Row,
81    row: MySqlRow,
82    table_desc: &MySqlTableDesc,
83    gtid_set: Option<&str>,
84    binlog_full_metadata: bool,
85) -> Result<Row, MySqlError> {
86    let mut packer = row_container.packer();
87
88    // For each column in `table_desc` (in descriptor order), resolve its wire
89    // index. With binlog_full_metadata=true, columns are matched by name so a reordered upstream
90    // still decodes correctly; without binlog_full_metadata, rows have no column names and must be
91    // matched positionally. A `None` here means the upstream row is missing this column and is
92    // only tolerated for ignored columns, and for binlog_full_metadata = false, is only tolerated
93    // for ignored columns at the end of the table.
94    for (i, col_desc) in table_desc.columns.iter().enumerate() {
95        if col_desc.column_type.is_none() {
96            // This column is ignored, so don't decode it.
97            continue;
98        }
99        let wire_idx = if !binlog_full_metadata {
100            // No column name metadata, so we match by index.
101            (i < row.len()).then_some(i)
102        } else {
103            // This means the row from the binlog has column name included in the metadata,
104            // so we can match on that instead of position.
105            row.columns_ref()
106                .iter()
107                .position(|wc| wc.name_str() == col_desc.name.as_str())
108        };
109
110        let wire_idx = match wire_idx {
111            Some(idx) => idx,
112            None => {
113                // We could not find a column in the incoming row that matches this descriptor column.
114                // This is an error as the column is not ignored (ignored columns have already been skipped).
115                return Err(decode_error(
116                    "extra column description",
117                    col_desc,
118                    table_desc,
119                    gtid_set,
120                    &row,
121                ));
122            }
123        };
124        let value = row
125            .as_ref(wire_idx)
126            .expect("wire_idx resolved from row")
127            .clone();
128        if let Err(err) = pack_val_as_datum(value, col_desc, &mut packer) {
129            return Err(decode_error(
130                &err.to_string(),
131                col_desc,
132                table_desc,
133                gtid_set,
134                &row,
135            ));
136        }
137    }
138
139    Ok(row_container.clone())
140}
141
142/// Build a `ValueDecodeError`, logging the schema, table, column, source
143/// gtid_set (if any), and a shape description of `row` at the same time.
144/// The shape string is only built here — pack_mysql_row's happy path does no
145/// per-row allocation beyond what decoding requires.
146fn decode_error(
147    err_msg: &str,
148    col_desc: &MySqlColumnDesc,
149    table_desc: &MySqlTableDesc,
150    gtid_set: Option<&str>,
151    row: &MySqlRow,
152) -> MySqlError {
153    let row_shape = describe_row_shape(row, table_desc);
154    tracing::warn!(
155        "mysql decode error for `{}`.`{}` column `{}`: {}; gtid_set={:?}; row_shape={}",
156        table_desc.schema_name,
157        table_desc.name,
158        col_desc.name,
159        err_msg,
160        gtid_set,
161        row_shape,
162    );
163    MySqlError::ValueDecodeError {
164        column_name: col_desc.name.clone(),
165        qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
166        error: err_msg.to_string(),
167    }
168}
169
170/// Describes the structural shape of a row without revealing any data values.
171/// Iterates every wire column. For each, emits the wire name, the binlog
172/// wire type, the character-set id (or `binary`), a classification relative
173/// to `table_desc` (`expected=<scalar>` for active columns, `ignored` for
174/// columns excluded from the source, `extra` for upstream columns with no
175/// descriptor entry), and a value disposition (`null` or `bytes(len=N)` /
176/// primitive kind). Intended for diagnostic logging on decode errors: MySQL
177/// serializes CHAR, VARCHAR, TEXT, JSON, BLOB, etc. all as `Value::Bytes`,
178/// so the wire type tag and the expected scalar type are what distinguish
179/// them.
180fn describe_row_shape(row: &MySqlRow, table_desc: &MySqlTableDesc) -> String {
181    // Binlogs without full row metadata use positional "@N" names, so we
182    // have to match by wire position rather than by name.
183    let fallback_names = row
184        .columns_ref()
185        .first()
186        .is_some_and(|col| col.name_ref().starts_with(b"@"));
187
188    let mut out = String::new();
189    out.push('[');
190    for (i, wire_col) in row.columns_ref().iter().enumerate() {
191        if i > 0 {
192            out.push_str(", ");
193        }
194        let wire_name = wire_col.name_str();
195        let cs = wire_col.character_set();
196        // 63 = binary collation (binary/blob columns).
197        let cs_str = if cs == 63 {
198            "binary".to_string()
199        } else {
200            format!("charset={cs}")
201        };
202        let wire_type = format!("{:?}", wire_col.column_type());
203
204        let matched_col = if fallback_names {
205            table_desc.columns.get(i)
206        } else {
207            table_desc
208                .columns
209                .iter()
210                .find(|c| c.name.as_str() == wire_name)
211        };
212        let match_info = match matched_col {
213            Some(col) => match &col.column_type {
214                Some(ct) => format!("expected={:?}", ct.scalar_type),
215                None => "ignored".to_string(),
216            },
217            None => "extra".to_string(),
218        };
219
220        let val_desc = match row.as_ref(i) {
221            None => "absent".to_string(),
222            Some(Value::NULL) => "null".to_string(),
223            Some(Value::Bytes(b)) => format!("bytes(len={})", b.len()),
224            Some(Value::Int(_)) => "int".to_string(),
225            Some(Value::UInt(_)) => "uint".to_string(),
226            Some(Value::Float(_)) => "float".to_string(),
227            Some(Value::Double(_)) => "double".to_string(),
228            Some(Value::Date(..)) => "date".to_string(),
229            Some(Value::Time(..)) => "time".to_string(),
230        };
231
232        let _ = write!(
233            out,
234            "{{name={wire_name}, wire={wire_type}, {cs_str}, {match_info}, val={val_desc}}}"
235        );
236    }
237    out.push(']');
238    out
239}
240
241// TODO(guswynn|roshan): This function has various `.to_string()` and `format!` calls that should
242// use a shared allocation if possible.
243fn pack_val_as_datum(
244    value: Value,
245    col_desc: &MySqlColumnDesc,
246    packer: &mut RowPacker,
247) -> Result<(), anyhow::Error> {
248    let column_type = match col_desc.column_type {
249        Some(ref column_type) => column_type,
250        None => anyhow::bail!("column type is not set for column: {}", col_desc.name),
251    };
252    match value {
253        Value::NULL => {
254            if column_type.nullable {
255                packer.push(Datum::Null);
256            } else {
257                Err(anyhow::anyhow!(
258                    "received a null value in a non-null column".to_string(),
259                ))?
260            }
261        }
262        value => match &column_type.scalar_type {
263            SqlScalarType::Bool => packer.push(Datum::from(from_value_opt::<bool>(value)?)),
264            SqlScalarType::UInt16 => packer.push(Datum::from(from_value_opt::<u16>(value)?)),
265            SqlScalarType::Int16 => packer.push(Datum::from(from_value_opt::<i16>(value)?)),
266            SqlScalarType::UInt32 => packer.push(Datum::from(from_value_opt::<u32>(value)?)),
267            SqlScalarType::Int32 => packer.push(Datum::from(from_value_opt::<i32>(value)?)),
268            SqlScalarType::UInt64 => {
269                if let Some(MySqlColumnMeta::Bit(precision)) = &col_desc.meta {
270                    let mut value = from_value_opt::<Vec<u8>>(value)?;
271
272                    // Ensure we have the correct number of bytes.
273                    let precision_bytes = (precision + 7) / 8;
274                    if value.len() != usize::cast_from(precision_bytes) {
275                        return Err(anyhow::anyhow!("'bit' column out of range!"));
276                    }
277                    // Be defensive and prune any bits that come over the wire and are
278                    // greater than our precision.
279                    let bit_index = precision % 8;
280                    if bit_index != 0 {
281                        let mask = !(u8::MAX << bit_index);
282                        if value.len() > 0 {
283                            value[0] &= mask;
284                        }
285                    }
286
287                    // Based on experimentation the value coming across the wire is
288                    // encoded in big-endian.
289                    let mut buf = [0u8; 8];
290                    buf[(8 - value.len())..].copy_from_slice(value.as_slice());
291                    let value = u64::from_be_bytes(buf);
292                    packer.push(Datum::from(value))
293                } else {
294                    packer.push(Datum::from(from_value_opt::<u64>(value)?))
295                }
296            }
297            SqlScalarType::Int64 => packer.push(Datum::from(from_value_opt::<i64>(value)?)),
298            SqlScalarType::Float32 => packer.push(Datum::from(from_value_opt::<f32>(value)?)),
299            SqlScalarType::Float64 => packer.push(Datum::from(from_value_opt::<f64>(value)?)),
300            SqlScalarType::Char { length } => {
301                let val = from_value_opt::<String>(value)?;
302                check_char_length(length.map(|l| l.into_u32()), &val, col_desc)?;
303                packer.push(Datum::String(&val));
304            }
305            SqlScalarType::VarChar { max_length } => {
306                let val = from_value_opt::<String>(value)?;
307                check_char_length(max_length.map(|l| l.into_u32()), &val, col_desc)?;
308                packer.push(Datum::String(&val));
309            }
310            SqlScalarType::String => {
311                // Special case for string types, since this is the scalar type used for a column
312                // specified as a 'TEXT COLUMN'. In some cases we need to check the column
313                // metadata to know if the upstream value needs special handling
314                match &col_desc.meta {
315                    Some(MySqlColumnMeta::Enum(e)) => {
316                        match value {
317                            Value::Bytes(data) => {
318                                let data = std::str::from_utf8(&data)?;
319                                packer.push(Datum::String(data));
320                            }
321                            Value::Int(val) => {
322                                let enum_int = usize::try_from(val)?;
323
324                                // If mysql strict mode is disabled when an invalid entry is inserted
325                                // then the entry will be replicated as a 0. Outside the 1 indexed enum.
326                                // https://dev.mysql.com/doc/refman/8.4/en/enum.html#enum-indexes
327                                if enum_int == 0 {
328                                    packer.push(Datum::String(""));
329                                } else {
330                                    // Enum types are provided as 1-indexed integers in the replication
331                                    // stream, so we need to find the string value from the enum meta
332                                    let enum_val = e.values.get(enum_int - 1).ok_or_else(|| {
333                                        anyhow::anyhow!(
334                                            "received invalid enum value: {} for column {}",
335                                            val,
336                                            col_desc.name
337                                        )
338                                    })?;
339
340                                    packer.push(Datum::String(enum_val));
341                                }
342                            }
343                            _ => Err(anyhow::anyhow!(
344                                "received unexpected value for enum type: {:?}",
345                                value
346                            ))?,
347                        }
348                    }
349                    Some(MySqlColumnMeta::Json) => {
350                        // JSON types in a query response are encoded as a string with whitespace,
351                        // but when parsed from the binlog event by mysql-common they are provided
352                        // as an encoded string sans-whitespace.
353                        if let Value::Bytes(data) = value {
354                            let json = serde_json::from_slice::<serde_json::Value>(&data)?;
355                            packer.push(Datum::String(&json.to_string()));
356                        } else {
357                            Err(anyhow::anyhow!(
358                                "received unexpected value for json type: {:?}",
359                                value
360                            ))?;
361                        }
362                    }
363                    Some(MySqlColumnMeta::Year) => {
364                        let mut val = from_value_opt::<u16>(value)?;
365                        // mysql_common incorrectly handles MySQL YEAR type, which has a valid range
366                        // of 1901-2155 (https://dev.mysql.com/doc/refman/8.0/en/year.html)
367                        //
368                        // We treat the value 1900 as the zero-value year - "0000"
369                        // https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L124-L129
370                        if val == 1900 {
371                            val = 0;
372                        }
373                        packer.push(Datum::String(&format!("{val:04}")));
374                    }
375                    Some(MySqlColumnMeta::Date) => {
376                        // Some MySQL dates are invalid in chrono/NaiveDate (e.g. 0000-00-00), so
377                        // we need to handle them directly as strings
378                        if let Value::Date(y, m, d, 0, 0, 0, 0) = value {
379                            packer.push(Datum::String(&format!("{:04}-{:02}-{:02}", y, m, d)));
380                        } else {
381                            Err(anyhow::anyhow!(
382                                "received unexpected value for date type: {:?}",
383                                value
384                            ))?;
385                        }
386                    }
387                    Some(MySqlColumnMeta::Timestamp(precision)) => {
388                        // Materialize treats DATETIME and TIMESTAMP as MySqlColumnMeta::Timestamp,
389                        // but they have slightly different semantics as far as the range of dates
390                        // they can represent.
391                        // (see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-types.html).
392                        //
393                        // Three mysql_common::Value variants exist, which are mapped to
394                        // [`MySqlColumnMeta::Timestamp`]
395                        // (see https://github.com/blackbeam/rust_mysql_common/blob/2e6f6696de03c91b9fd95a87356d081285290704/src/binlog/value.rs):
396                        //   Value::Date  — MZ snapshot & binlog MYSQL_TYPE_DATETIME/MYSQL_TYPE_DATETIME2
397                        //                  (value/mod.rs:443-445, binlog/value.rs:109-161)
398                        //   Value::Int   — legacy binlog MYSQL_TYPE_TIMESTAMP, pre-5.6,
399                        //                  4-byte unix epoch (binlog/value.rs:87-90)
400                        //   Value::Bytes — binlog MYSQL_TYPE_TIMESTAMP2, 5.6+,
401                        //                  "<sec>" or "<sec>.<usec>" (binlog/value.rs:145-154)
402                        let str_timestamp = match value {
403                            Value::Date(y, m, d, h, mm, s, ms) => {
404                                format_mysql_timestamp(y, m, d, h, mm, s, ms, *precision)
405                            }
406                            // Pre-5.6 unix epoch, no fractional seconds.
407                            // val == 0 is the zero-date sentinel, not epoch 0.
408                            Value::Int(0) => mysql_zero_timestamp(*precision),
409                            Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
410                                .ok_or_else(|| {
411                                    anyhow::anyhow!("received invalid timestamp value: {}", val)
412                                })?
413                                .naive_utc()
414                                .format("%Y-%m-%d %H:%M:%S")
415                                .to_string(),
416                            // 5.6+ epoch string; parse + reformat so all variants emit the
417                            // same canonical YYYY-MM-DD HH:MM:SS[.ffff] text.
418                            Value::Bytes(data) => {
419                                let s = std::str::from_utf8(&data).map_err(|_| {
420                                    anyhow::anyhow!("received invalid timestamp value: {:?}", data)
421                                })?;
422                                // sec=0 (with or without fractional component) is the
423                                // zero-date sentinel.
424                                if s.split('.').next() == Some("0") {
425                                    mysql_zero_timestamp(*precision)
426                                } else {
427                                    let dt = if s.contains('.') {
428                                        chrono::NaiveDateTime::parse_from_str(s, "%s%.6f")
429                                    } else {
430                                        chrono::NaiveDateTime::parse_from_str(s, "%s")
431                                    }
432                                    .map_err(|_| {
433                                        anyhow::anyhow!("received invalid timestamp value: {:?}", s)
434                                    })?;
435                                    use chrono::{Datelike, Timelike};
436                                    let y = u16::try_from(dt.year()).map_err(|_| {
437                                        anyhow::anyhow!(
438                                            "timestamp year out of range: {}",
439                                            dt.year()
440                                        )
441                                    })?;
442                                    format_mysql_timestamp(
443                                        y,
444                                        u8::try_from(dt.month())?,
445                                        u8::try_from(dt.day())?,
446                                        u8::try_from(dt.hour())?,
447                                        u8::try_from(dt.minute())?,
448                                        u8::try_from(dt.second())?,
449                                        dt.nanosecond() / 1000,
450                                        *precision,
451                                    )
452                                }
453                            }
454                            _ => Err(anyhow::anyhow!(
455                                "received unexpected value for timestamp type: {:?}",
456                                value
457                            ))?,
458                        };
459                        packer.push(Datum::String(&str_timestamp));
460                    }
461                    Some(MySqlColumnMeta::Bit(_)) => unreachable!("parsed as a u64"),
462                    None => {
463                        packer.push(Datum::String(&from_value_opt::<String>(value)?));
464                    }
465                }
466            }
467            SqlScalarType::Jsonb => {
468                if let Value::Bytes(data) = value {
469                    let packer = JsonbPacker::new(packer);
470                    // TODO(guswynn): This still produces and extract allocation (in the
471                    // `DeserializeSeed` impl used internally), which should be improved,
472                    // for all users of the APIs in that module.
473                    packer.pack_slice(&data).map_err(|e| {
474                        anyhow::anyhow!(
475                            "Failed to decode JSON: {}",
476                            // See if we can output the string that failed to be converted to JSON.
477                            match std::str::from_utf8(&data) {
478                                Ok(str) => str.to_string(),
479                                // Otherwise produce the nominally helpful error.
480                                Err(_) => e.display_with_causes().to_string(),
481                            }
482                        )
483                    })?;
484                } else {
485                    Err(anyhow::anyhow!(
486                        "received unexpected value for json type: {:?}",
487                        value
488                    ))?
489                }
490            }
491            SqlScalarType::Bytes => {
492                let data = from_value_opt::<Vec<u8>>(value)?;
493                packer.push(Datum::Bytes(&data));
494            }
495            SqlScalarType::Date => {
496                let date = Date::try_from(from_value_opt::<chrono::NaiveDate>(value)?)?;
497                packer.push(Datum::from(date));
498            }
499            SqlScalarType::Timestamp { precision: _ } => {
500                // Timestamps are encoded as different mysql_common::Value types depending on
501                // whether they are from a binlog event or a query, and depending on which
502                // mysql timestamp version is used. We handle those cases here
503                // https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L87-L155
504                // https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/value/mod.rs#L332
505                let chrono_timestamp = match value {
506                    Value::Date(..) => from_value_opt::<chrono::NaiveDateTime>(value)?,
507                    // old temporal format from before MySQL 5.6; didn't support fractional seconds
508                    Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
509                        .ok_or_else(|| {
510                            anyhow::anyhow!("received invalid timestamp value: {}", val)
511                        })?
512                        .naive_utc(),
513                    Value::Bytes(data) => {
514                        let data = std::str::from_utf8(&data)?;
515                        if data.contains('.') {
516                            chrono::NaiveDateTime::parse_from_str(data, "%s%.6f")?
517                        } else {
518                            chrono::NaiveDateTime::parse_from_str(data, "%s")?
519                        }
520                    }
521                    _ => Err(anyhow::anyhow!(
522                        "received unexpected value for timestamp type: {:?}",
523                        value
524                    ))?,
525                };
526                packer.push(Datum::try_from(CheckedTimestamp::try_from(
527                    chrono_timestamp,
528                )?)?);
529            }
530            SqlScalarType::Time => {
531                packer.push(Datum::from(from_value_opt::<chrono::NaiveTime>(value)?));
532            }
533            SqlScalarType::Numeric { max_scale } => {
534                // The wire-format of numeric types is a string when sent in a binary query
535                // response but is represented in a decimal binary format when sent in a binlog
536                // event. However the mysql-common crate abstracts this away and always returns
537                // a string. We parse the string into a numeric type here.
538                let val = from_value_opt::<String>(value)?;
539                let val = Numeric::from_str(&val)?;
540                if get_precision(&val) > NUMERIC_DATUM_MAX_PRECISION.into() {
541                    Err(anyhow::anyhow!(
542                        "received numeric value with precision {} for column {} which has a max precision of {}",
543                        get_precision(&val),
544                        col_desc.name,
545                        NUMERIC_DATUM_MAX_PRECISION
546                    ))?
547                }
548                if let Some(max_scale) = max_scale {
549                    if get_scale(&val) > max_scale.into_u8().into() {
550                        Err(anyhow::anyhow!(
551                            "received numeric value with scale {} for column {} which has a max scale of {}",
552                            get_scale(&val),
553                            col_desc.name,
554                            max_scale.into_u8()
555                        ))?
556                    }
557                }
558                packer.push(Datum::from(val));
559            }
560            // TODO(roshan): IMPLEMENT OTHER TYPES
561            data_type => Err(anyhow::anyhow!(
562                "received unexpected value for type: {:?}: {:?}",
563                data_type,
564                value
565            ))?,
566        },
567    }
568    Ok(())
569}
570
571fn check_char_length(
572    length: Option<u32>,
573    val: &str,
574    col_desc: &MySqlColumnDesc,
575) -> Result<(), anyhow::Error> {
576    if let Some(length) = length {
577        if let Some(_) = val.char_indices().nth(usize::cast_from(length)) {
578            Err(anyhow::anyhow!(
579                "received string value of length {} for column {} which has a max length of {}",
580                val.len(),
581                col_desc.name,
582                length
583            ))?
584        }
585    }
586    Ok(())
587}
588
589#[cfg(test)]
590mod tests {
591    //! Unit tests for the TEXT-COLUMNS decoding of MySQL TIMESTAMP values.
592    //!
593    //! These cover the regression where a MySQL TIMESTAMP column declared as
594    //! a TEXT COLUMN fails to decode when the wire value arrives as
595    //! `Value::Bytes("<unix-epoch>")` or `Value::Int(<unix-epoch>)` instead
596    //! of `Value::Date(..)`. The integration test in
597    //! `test/mysql-cdc/text-columns-timestamp.td` exercises this through
598    //! a real MySQL container but is non-deterministic: which `Value`
599    //! variant `mysql-async` produces depends on connection-state timing.
600    //! These unit tests pin each variant down directly.
601    //!
602    //! The wire-variant matrix exercised below is derived from mysql_common
603    //! v0.35.5:
604    //!
605    //!   * Value::Int(epoch) — binlog MYSQL_TYPE_TIMESTAMP (pre-5.6):
606    //!     https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L87-L90
607    //!   * Value::Bytes("<sec>"/"<sec>.<usec>") — binlog MYSQL_TYPE_TIMESTAMP2 (5.6+):
608    //!     https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L145-L154
609    //!   * Value::Date(...) — binary query response + binlog DATETIME[2]:
610    //!     https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/value/mod.rs#L443-L445
611    //!     https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L109-L161
612    //!
613    //! MySQL semantics referenced by the zero-date and fractional-precision
614    //! cases:
615    //!
616    //!   * Zero-date allowed when sql_mode disables NO_ZERO_DATE:
617    //!     https://dev.mysql.com/doc/refman/8.0/en/sql-mode.html#sqlmode_no_zero_date
618    //!   * TIMESTAMP(p) / DATETIME(p) fractional seconds:
619    //!     https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
620    use super::*;
621    use mz_repr::{SqlColumnType, SqlScalarType};
622
623    fn timestamp_text_col(precision: u32) -> MySqlColumnDesc {
624        MySqlColumnDesc {
625            name: "created_at".to_string(),
626            column_type: Some(SqlColumnType {
627                scalar_type: SqlScalarType::String,
628                nullable: true,
629            }),
630            meta: Some(MySqlColumnMeta::Timestamp(precision)),
631        }
632    }
633
634    fn pack_one(value: Value, col: &MySqlColumnDesc) -> Result<String, anyhow::Error> {
635        let mut row = Row::default();
636        pack_val_as_datum(value, col, &mut row.packer())?;
637        Ok(row.unpack_first().unwrap_str().to_string())
638    }
639
640    #[mz_ore::test]
641    fn timestamp_value_date_no_precision() {
642        let col = timestamp_text_col(0);
643        let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 0), &col).unwrap();
644        assert_eq!(s, "2024-04-03 10:15:13");
645    }
646
647    #[mz_ore::test]
648    fn timestamp_value_date_with_precision() {
649        let col = timestamp_text_col(6);
650        let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 123456), &col).unwrap();
651        assert_eq!(s, "2024-04-03 10:15:13.123456");
652    }
653
654    #[mz_ore::test]
655    fn timestamp_value_date_zero_date() {
656        // The whole reason TEXT COLUMNS exists for TIMESTAMP: a
657        // zero-date arriving as Value::Date(0,..) should decode to the same
658        // "zero" timestamp value MySQL would display.
659        let col = timestamp_text_col(0);
660        let s = pack_one(Value::Date(0, 0, 0, 0, 0, 0, 0), &col).unwrap();
661        assert_eq!(s, "0000-00-00 00:00:00");
662    }
663
664    /// Regression: Value::Int (pre-5.6 legacy temporal format, unix
665    /// epoch seconds) was previously rejected with
666    /// `received unexpected value for timestamp type: Int(..)`.
667    #[mz_ore::test]
668    fn timestamp_value_int_epoch() {
669        let col = timestamp_text_col(0);
670        // 1743661234 == 2025-04-03 06:20:34 UTC
671        let s = pack_one(Value::Int(1_743_661_234), &col).unwrap();
672        assert_eq!(s, "2025-04-03 06:20:34");
673    }
674
675    /// sec=0 in the legacy TIMESTAMP encoding is the zero-date sentinel,
676    /// not unix epoch 0 — TIMESTAMP's range starts at '1970-01-01 00:00:01'
677    /// UTC so epoch 0 isn't a representable column value.
678    #[mz_ore::test]
679    fn timestamp_value_int_zero_is_sentinel() {
680        let col = timestamp_text_col(0);
681        let s = pack_one(Value::Int(0), &col).unwrap();
682        assert_eq!(s, "0000-00-00 00:00:00");
683    }
684
685    /// Out-of-range epochs must error rather than silently producing
686    /// a zero-timestamp — they aren't the MySQL zero-date marker, just
687    /// garbage chrono can't represent.
688    #[mz_ore::test]
689    fn timestamp_value_int_out_of_range_errors() {
690        let col = timestamp_text_col(0);
691        let err = pack_one(Value::Int(i64::MAX), &col).unwrap_err();
692        assert!(
693            err.to_string().contains("invalid timestamp value"),
694            "unexpected error message: {err}"
695        );
696    }
697
698    /// Regression: Value::Bytes carrying a unix-epoch string is the
699    /// wire variant that triggered the production failure
700    ///   received unexpected value for timestamp type: Bytes("17436613..")
701    #[mz_ore::test]
702    fn timestamp_value_bytes_epoch() {
703        let col = timestamp_text_col(0);
704        let s = pack_one(Value::Bytes(b"1743661234".to_vec()), &col).unwrap();
705        assert_eq!(s, "2025-04-03 06:20:34");
706    }
707
708    /// sec=0 in the TIMESTAMP2 encoding is the zero-date sentinel; same
709    /// reasoning as `timestamp_value_int_zero_is_sentinel`.
710    #[mz_ore::test]
711    fn timestamp_value_bytes_zero_is_sentinel() {
712        let col = timestamp_text_col(0);
713        let s = pack_one(Value::Bytes(b"0".to_vec()), &col).unwrap();
714        assert_eq!(s, "0000-00-00 00:00:00");
715    }
716
717    /// Sentinel detection survives a fractional component ("0.NNNNNN"),
718    /// and the helper pads the output to the column's precision so that
719    /// snapshot and binlog paths produce identical text for the same
720    /// upstream row.
721    #[mz_ore::test]
722    fn timestamp_value_bytes_zero_with_fractional_is_sentinel() {
723        let col = timestamp_text_col(6);
724        let s = pack_one(Value::Bytes(b"0.000000".to_vec()), &col).unwrap();
725        assert_eq!(s, "0000-00-00 00:00:00.000000");
726    }
727
728    /// Fractional form of the TIMESTAMP2 binlog encoding —
729    /// "<sec>.<usec>" wrapped in Value::Bytes (binlog/value.rs:151-153).
730    /// Hits the `s.contains('.')` branch and the precision-aware
731    /// reformat.
732    #[mz_ore::test]
733    fn timestamp_value_bytes_epoch_fractional() {
734        let col = timestamp_text_col(6);
735        let s = pack_one(Value::Bytes(b"1743661234.123456".to_vec()), &col).unwrap();
736        assert_eq!(s, "2025-04-03 06:20:34.123456");
737    }
738
739    /// Bytes that aren't valid UTF-8 should produce a meaningful error,
740    /// not a panic.
741    #[mz_ore::test]
742    fn timestamp_value_bytes_invalid_utf8_errors() {
743        let col = timestamp_text_col(0);
744        // 0xC3 0x28 is an invalid 2-byte UTF-8 sequence.
745        let err = pack_one(Value::Bytes(vec![0xC3, 0x28]), &col).unwrap_err();
746        let msg = err.to_string();
747        assert!(
748            msg.contains("invalid timestamp value"),
749            "unexpected error message: {msg}"
750        );
751    }
752
753    /// Bytes that are valid UTF-8 but not parseable as a unix epoch
754    /// should produce the same structured error as invalid UTF-8 —
755    /// covers the chrono parse failure path that
756    /// `timestamp_value_bytes_invalid_utf8_errors` doesn't reach.
757    #[mz_ore::test]
758    fn timestamp_value_bytes_unparseable_errors() {
759        let col = timestamp_text_col(0);
760        // "2024-04-03 10:15:13" is not valid because Value::Bytes must contain seconds since
761        // epoch, Value::Bytes("<sec>"/"<sec>.<usec>")
762        for payload in [&b""[..], &b"not-an-epoch"[..], &b"2024-04-03 10:15:13"[..]] {
763            let err = pack_one(Value::Bytes(payload.to_vec()), &col).unwrap_err();
764            assert!(
765                err.to_string().contains("invalid timestamp value"),
766                "payload {payload:?}: unexpected error message: {err}"
767            );
768        }
769    }
770
771    /// Variants that have no defined mapping for a TIMESTAMP column
772    /// must still produce the existing structured decode error so the
773    /// source health surface can flag them.
774    #[mz_ore::test]
775    fn timestamp_value_unsupported_variant_errors() {
776        let col = timestamp_text_col(0);
777        let err = pack_one(Value::Float(1.0), &col).unwrap_err();
778        let msg = err.to_string();
779        assert!(
780            msg.contains("unexpected value for timestamp"),
781            "unexpected error message: {msg}"
782        );
783    }
784}