Skip to main content

mz_mysql_util/
decoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt::Write;
11use std::str::FromStr;
12
13use mysql_common::value::convert::from_value_opt;
14use mysql_common::{Row as MySqlRow, Value};
15
16use mz_ore::cast::CastFrom;
17use mz_ore::error::ErrorExt;
18use mz_repr::adt::date::Date;
19use mz_repr::adt::jsonb::JsonbPacker;
20use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, Numeric, get_precision, get_scale};
21use mz_repr::adt::timestamp::CheckedTimestamp;
22use mz_repr::{Datum, Row, RowPacker, SqlScalarType};
23
24use crate::desc::MySqlColumnMeta;
25use crate::{MySqlColumnDesc, MySqlError, MySqlTableDesc};
26
27/// Canonical text for the MySQL zero-date sentinel ('0000-00-00 00:00:00').
28/// In binlog MYSQL_TYPE_DATETIME/MYSQL_TYPE_DATETIME2 encodings, sec=0 *cannot* represent unix
29/// epoch 0.  The TIMESTAMP type's supported range starts at '1970-01-01 00:00:01' UTC
30/// (<https://dev.mysql.com/doc/refman/8.0/en/datetime.html>), so any sec=0 is
31/// unambiguously this sentinel.
32const MYSQL_ZERO_TIMESTAMP: &str = "0000-00-00 00:00:00";
33
34/// Maximum fractional-seconds precision MySQL accepts for DATETIME(p) and
35/// TIMESTAMP(p) — values are stored in microseconds, so 6 digits is the
36/// upper bound (<https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html>).
37const MYSQL_MAX_FRACTIONAL_PRECISION: u32 = 6;
38
39/// Format the zero-date sentinel for a column with the given fractional
40/// precision (matches the Date arm's `{:0precision$}` behavior).
41fn mysql_zero_timestamp(precision: u32) -> String {
42    if precision > 0 {
43        format!(
44            "{}.{}",
45            MYSQL_ZERO_TIMESTAMP,
46            "0".repeat(usize::cast_from(precision))
47        )
48    } else {
49        MYSQL_ZERO_TIMESTAMP.to_string()
50    }
51}
52
53/// Format MySQL DATETIME/TIMESTAMP components as `YYYY-MM-DD HH:MM:SS[.ffff]`.
54/// `micros` is the raw microseconds (0..1_000_000); only the leading
55/// `precision` digits are kept, matching MySQL's DATETIME(p)/TIMESTAMP(p)
56/// display.
57fn format_mysql_timestamp(
58    y: u16,
59    m: u8,
60    d: u8,
61    hr: u8,
62    min: u8,
63    sec: u8,
64    micros: u32,
65    precision: u32,
66) -> String {
67    if precision == 0 {
68        return format!("{y:04}-{m:02}-{d:02} {hr:02}:{min:02}:{sec:02}");
69    }
70    // Clamp defensively: MySQL itself rejects precision > 6, but upstream
71    // metadata is untrusted and a larger value would make `pow()` below
72    // overflow its u32 exponent.
73    let p = precision.min(MYSQL_MAX_FRACTIONAL_PRECISION);
74    let scaled = micros / 10u32.pow(MYSQL_MAX_FRACTIONAL_PRECISION - p);
75    let width = usize::cast_from(p);
76    format!("{y:04}-{m:02}-{d:02} {hr:02}:{min:02}:{sec:02}.{scaled:0width$}")
77}
78
79pub fn pack_mysql_row(
80    row_container: &mut Row,
81    row: MySqlRow,
82    table_desc: &MySqlTableDesc,
83    gtid_set: Option<&str>,
84    binlog_full_metadata: bool,
85) -> Result<Row, MySqlError> {
86    let mut packer = row_container.packer();
87
88    // For each column in `table_desc` (in descriptor order), resolve its wire
89    // index. With binlog_full_metadata=true, columns are matched by name so a reordered upstream
90    // still decodes correctly; without binlog_full_metadata, rows have no column names and must be
91    // matched positionally. A `None` here means the upstream row is missing this column and is
92    // only tolerated for ignored columns, and for binlog_full_metadata = false, is only tolerated
93    // for ignored columns at the end of the table.
94    for (i, col_desc) in table_desc.columns.iter().enumerate() {
95        if col_desc.column_type.is_none() {
96            // This column is ignored, so don't decode it.
97            continue;
98        }
99        let wire_idx = if !binlog_full_metadata {
100            // No column name metadata, so we match by index.
101            (i < row.len()).then_some(i)
102        } else {
103            // This means the row from the binlog has column name included in the metadata,
104            // so we can match on that instead of position.
105            row.columns_ref()
106                .iter()
107                .position(|wc| wc.name_str() == col_desc.name.as_str())
108        };
109
110        let wire_idx = match wire_idx {
111            Some(idx) => idx,
112            None => {
113                // We could not find a column in the incoming row that matches this descriptor column.
114                // This is an error as the column is not ignored (ignored columns have already been skipped).
115                return Err(decode_error(
116                    "extra column description",
117                    col_desc,
118                    table_desc,
119                    gtid_set,
120                    &row,
121                ));
122            }
123        };
124        let value = row
125            .as_ref(wire_idx)
126            .expect("wire_idx resolved from row")
127            .clone();
128        if let Err(err) = pack_val_as_datum(value, col_desc, &mut packer) {
129            return Err(decode_error(
130                &err.to_string(),
131                col_desc,
132                table_desc,
133                gtid_set,
134                &row,
135            ));
136        }
137    }
138
139    Ok(row_container.clone())
140}
141
142/// Build a `ValueDecodeError`, logging the schema, table, column, source
143/// gtid_set (if any), and a shape description of `row` at the same time.
144/// The shape string is only built here — pack_mysql_row's happy path does no
145/// per-row allocation beyond what decoding requires.
146fn decode_error(
147    err_msg: &str,
148    col_desc: &MySqlColumnDesc,
149    table_desc: &MySqlTableDesc,
150    gtid_set: Option<&str>,
151    row: &MySqlRow,
152) -> MySqlError {
153    let row_shape = describe_row_shape(row, table_desc);
154    tracing::warn!(
155        "mysql decode error for `{}`.`{}` column `{}`: {}; gtid_set={:?}; row_shape={}",
156        table_desc.schema_name,
157        table_desc.name,
158        col_desc.name,
159        err_msg,
160        gtid_set,
161        row_shape,
162    );
163    MySqlError::ValueDecodeError {
164        column_name: col_desc.name.clone(),
165        qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
166        error: err_msg.to_string(),
167    }
168}
169
170/// Describes the structural shape of a row without revealing any data values.
171/// Iterates every wire column. For each, emits the wire name, the binlog
172/// wire type, the character-set id (or `binary`), a classification relative
173/// to `table_desc` (`expected=<scalar>` for active columns, `ignored` for
174/// columns excluded from the source, `extra` for upstream columns with no
175/// descriptor entry), and a value disposition (`null` or `bytes(len=N)` /
176/// primitive kind). Intended for diagnostic logging on decode errors: MySQL
177/// serializes CHAR, VARCHAR, TEXT, JSON, BLOB, etc. all as `Value::Bytes`,
178/// so the wire type tag and the expected scalar type are what distinguish
179/// them.
180fn describe_row_shape(row: &MySqlRow, table_desc: &MySqlTableDesc) -> String {
181    // Binlogs without full row metadata use positional "@N" names, so we
182    // have to match by wire position rather than by name.
183    let fallback_names = row
184        .columns_ref()
185        .first()
186        .is_some_and(|col| col.name_ref().starts_with(b"@"));
187
188    let mut out = String::new();
189    out.push('[');
190    for (i, wire_col) in row.columns_ref().iter().enumerate() {
191        if i > 0 {
192            out.push_str(", ");
193        }
194        let wire_name = wire_col.name_str();
195        let cs = wire_col.character_set();
196        // 63 = binary collation (binary/blob columns).
197        let cs_str = if cs == 63 {
198            "binary".to_string()
199        } else {
200            format!("charset={cs}")
201        };
202        let wire_type = format!("{:?}", wire_col.column_type());
203
204        let matched_col = if fallback_names {
205            table_desc.columns.get(i)
206        } else {
207            table_desc
208                .columns
209                .iter()
210                .find(|c| c.name.as_str() == wire_name)
211        };
212        let match_info = match matched_col {
213            Some(col) => match &col.column_type {
214                Some(ct) => format!("expected={:?}", ct.scalar_type),
215                None => "ignored".to_string(),
216            },
217            None => "extra".to_string(),
218        };
219
220        let val_desc = match row.as_ref(i) {
221            None => "absent".to_string(),
222            Some(Value::NULL) => "null".to_string(),
223            Some(Value::Bytes(b)) => format!("bytes(len={})", b.len()),
224            Some(Value::Int(_)) => "int".to_string(),
225            Some(Value::UInt(_)) => "uint".to_string(),
226            Some(Value::Float(_)) => "float".to_string(),
227            Some(Value::Double(_)) => "double".to_string(),
228            Some(Value::Date(..)) => "date".to_string(),
229            Some(Value::Time(..)) => "time".to_string(),
230        };
231
232        let _ = write!(
233            out,
234            "{{name={wire_name}, wire={wire_type}, {cs_str}, {match_info}, val={val_desc}}}"
235        );
236    }
237    out.push(']');
238    out
239}
240
241// TODO(guswynn|roshan): This function has various `.to_string()` and `format!` calls that should
242// use a shared allocation if possible.
243fn pack_val_as_datum(
244    value: Value,
245    col_desc: &MySqlColumnDesc,
246    packer: &mut RowPacker,
247) -> Result<(), anyhow::Error> {
248    let column_type = match col_desc.column_type {
249        Some(ref column_type) => column_type,
250        None => anyhow::bail!("column type is not set for column: {}", col_desc.name),
251    };
252    match value {
253        Value::NULL => {
254            if column_type.nullable {
255                packer.push(Datum::Null);
256            } else {
257                Err(anyhow::anyhow!(
258                    "received a null value in a non-null column".to_string(),
259                ))?
260            }
261        }
262        value => match &column_type.scalar_type {
263            SqlScalarType::Bool => packer.push(Datum::from(from_value_opt::<bool>(value)?)),
264            SqlScalarType::UInt16 => packer.push(Datum::from(from_value_opt::<u16>(value)?)),
265            SqlScalarType::Int16 => packer.push(Datum::from(from_value_opt::<i16>(value)?)),
266            SqlScalarType::UInt32 => packer.push(Datum::from(from_value_opt::<u32>(value)?)),
267            SqlScalarType::Int32 => packer.push(Datum::from(from_value_opt::<i32>(value)?)),
268            SqlScalarType::UInt64 => {
269                if let Some(MySqlColumnMeta::Bit(precision)) = &col_desc.meta {
270                    let mut value = from_value_opt::<Vec<u8>>(value)?;
271
272                    // Ensure we have the correct number of bytes.
273                    let precision_bytes = (precision + 7) / 8;
274                    if value.len() != usize::cast_from(precision_bytes) {
275                        return Err(anyhow::anyhow!("'bit' column out of range!"));
276                    }
277                    // Be defensive and prune any bits that come over the wire and are
278                    // greater than our precision.
279                    let bit_index = precision % 8;
280                    if bit_index != 0 {
281                        let mask = !(u8::MAX << bit_index);
282                        if value.len() > 0 {
283                            value[0] &= mask;
284                        }
285                    }
286
287                    // Based on experimentation the value coming across the wire is
288                    // encoded in big-endian.
289                    let mut buf = [0u8; 8];
290                    buf[(8 - value.len())..].copy_from_slice(value.as_slice());
291                    let value = u64::from_be_bytes(buf);
292                    packer.push(Datum::from(value))
293                } else {
294                    packer.push(Datum::from(from_value_opt::<u64>(value)?))
295                }
296            }
297            SqlScalarType::Int64 => packer.push(Datum::from(from_value_opt::<i64>(value)?)),
298            SqlScalarType::Float32 => packer.push(Datum::from(from_value_opt::<f32>(value)?)),
299            SqlScalarType::Float64 => packer.push(Datum::from(from_value_opt::<f64>(value)?)),
300            SqlScalarType::Char { length } => {
301                let val = from_value_opt::<String>(value)?;
302                check_char_length(length.map(|l| l.into_u32()), &val, col_desc)?;
303                packer.push(Datum::String(&val));
304            }
305            SqlScalarType::VarChar { max_length } => {
306                let val = from_value_opt::<String>(value)?;
307                check_char_length(max_length.map(|l| l.into_u32()), &val, col_desc)?;
308                packer.push(Datum::String(&val));
309            }
310            SqlScalarType::String => {
311                // Special case for string types, since this is the scalar type used for a column
312                // specified as a 'TEXT COLUMN'. In some cases we need to check the column
313                // metadata to know if the upstream value needs special handling
314                match &col_desc.meta {
315                    Some(MySqlColumnMeta::Enum(e)) => {
316                        match value {
317                            Value::Bytes(data) => {
318                                let data = std::str::from_utf8(&data)?;
319                                packer.push(Datum::String(data));
320                            }
321                            Value::Int(val) => {
322                                let enum_int = usize::try_from(val)?;
323
324                                // If mysql strict mode is disabled when an invalid entry is inserted
325                                // then the entry will be replicated as a 0. Outside the 1 indexed enum.
326                                // https://dev.mysql.com/doc/refman/8.4/en/enum.html#enum-indexes
327                                if enum_int == 0 {
328                                    packer.push(Datum::String(""));
329                                } else {
330                                    // Enum types are provided as 1-indexed integers in the replication
331                                    // stream, so we need to find the string value from the enum meta
332                                    let enum_val = e.values.get(enum_int - 1).ok_or_else(|| {
333                                        anyhow::anyhow!(
334                                            "received invalid enum value: {} for column {}",
335                                            val,
336                                            col_desc.name
337                                        )
338                                    })?;
339
340                                    packer.push(Datum::String(enum_val));
341                                }
342                            }
343                            _ => Err(anyhow::anyhow!(
344                                "received unexpected value for enum type: {:?}",
345                                value
346                            ))?,
347                        }
348                    }
349                    Some(MySqlColumnMeta::Json) => {
350                        // JSON types in a query response are encoded as a string with whitespace,
351                        // but when parsed from the binlog event by mysql-common they are provided
352                        // as an encoded string sans-whitespace.
353                        if let Value::Bytes(data) = value {
354                            let json = serde_json::from_slice::<serde_json::Value>(&data)?;
355                            packer.push(Datum::String(&json.to_string()));
356                        } else {
357                            Err(anyhow::anyhow!(
358                                "received unexpected value for json type: {:?}",
359                                value
360                            ))?;
361                        }
362                    }
363                    Some(MySqlColumnMeta::Year) => {
364                        let val = from_value_opt::<u16>(value)?;
365                        packer.push(Datum::String(&val.to_string()));
366                    }
367                    Some(MySqlColumnMeta::Date) => {
368                        // Some MySQL dates are invalid in chrono/NaiveDate (e.g. 0000-00-00), so
369                        // we need to handle them directly as strings
370                        if let Value::Date(y, m, d, 0, 0, 0, 0) = value {
371                            packer.push(Datum::String(&format!("{:04}-{:02}-{:02}", y, m, d)));
372                        } else {
373                            Err(anyhow::anyhow!(
374                                "received unexpected value for date type: {:?}",
375                                value
376                            ))?;
377                        }
378                    }
379                    Some(MySqlColumnMeta::Timestamp(precision)) => {
380                        // Materialize treats DATETIME and TIMESTAMP as MySqlColumnMeta::Timestamp,
381                        // but they have slightly different semantics as far as the range of dates
382                        // they can represent.
383                        // (see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-types.html).
384                        //
385                        // Three mysql_common::Value variants exist, which are mapped to
386                        // [`MySqlColumnMeta::Timestamp`]
387                        // (see https://github.com/blackbeam/rust_mysql_common/blob/2e6f6696de03c91b9fd95a87356d081285290704/src/binlog/value.rs):
388                        //   Value::Date  — MZ snapshot & binlog MYSQL_TYPE_DATETIME/MYSQL_TYPE_DATETIME2
389                        //                  (value/mod.rs:443-445, binlog/value.rs:109-161)
390                        //   Value::Int   — legacy binlog MYSQL_TYPE_TIMESTAMP, pre-5.6,
391                        //                  4-byte unix epoch (binlog/value.rs:87-90)
392                        //   Value::Bytes — binlog MYSQL_TYPE_TIMESTAMP2, 5.6+,
393                        //                  "<sec>" or "<sec>.<usec>" (binlog/value.rs:145-154)
394                        let str_timestamp = match value {
395                            Value::Date(y, m, d, h, mm, s, ms) => {
396                                format_mysql_timestamp(y, m, d, h, mm, s, ms, *precision)
397                            }
398                            // Pre-5.6 unix epoch, no fractional seconds.
399                            // val == 0 is the zero-date sentinel, not epoch 0.
400                            Value::Int(0) => mysql_zero_timestamp(*precision),
401                            Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
402                                .ok_or_else(|| {
403                                    anyhow::anyhow!("received invalid timestamp value: {}", val)
404                                })?
405                                .naive_utc()
406                                .format("%Y-%m-%d %H:%M:%S")
407                                .to_string(),
408                            // 5.6+ epoch string; parse + reformat so all variants emit the
409                            // same canonical YYYY-MM-DD HH:MM:SS[.ffff] text.
410                            Value::Bytes(data) => {
411                                let s = std::str::from_utf8(&data).map_err(|_| {
412                                    anyhow::anyhow!("received invalid timestamp value: {:?}", data)
413                                })?;
414                                // sec=0 (with or without fractional component) is the
415                                // zero-date sentinel.
416                                if s.split('.').next() == Some("0") {
417                                    mysql_zero_timestamp(*precision)
418                                } else {
419                                    let dt = if s.contains('.') {
420                                        chrono::NaiveDateTime::parse_from_str(s, "%s%.6f")
421                                    } else {
422                                        chrono::NaiveDateTime::parse_from_str(s, "%s")
423                                    }
424                                    .map_err(|_| {
425                                        anyhow::anyhow!("received invalid timestamp value: {:?}", s)
426                                    })?;
427                                    use chrono::{Datelike, Timelike};
428                                    let y = u16::try_from(dt.year()).map_err(|_| {
429                                        anyhow::anyhow!(
430                                            "timestamp year out of range: {}",
431                                            dt.year()
432                                        )
433                                    })?;
434                                    format_mysql_timestamp(
435                                        y,
436                                        u8::try_from(dt.month())?,
437                                        u8::try_from(dt.day())?,
438                                        u8::try_from(dt.hour())?,
439                                        u8::try_from(dt.minute())?,
440                                        u8::try_from(dt.second())?,
441                                        dt.nanosecond() / 1000,
442                                        *precision,
443                                    )
444                                }
445                            }
446                            _ => Err(anyhow::anyhow!(
447                                "received unexpected value for timestamp type: {:?}",
448                                value
449                            ))?,
450                        };
451                        packer.push(Datum::String(&str_timestamp));
452                    }
453                    Some(MySqlColumnMeta::Bit(_)) => unreachable!("parsed as a u64"),
454                    None => {
455                        packer.push(Datum::String(&from_value_opt::<String>(value)?));
456                    }
457                }
458            }
459            SqlScalarType::Jsonb => {
460                if let Value::Bytes(data) = value {
461                    let packer = JsonbPacker::new(packer);
462                    // TODO(guswynn): This still produces and extract allocation (in the
463                    // `DeserializeSeed` impl used internally), which should be improved,
464                    // for all users of the APIs in that module.
465                    packer.pack_slice(&data).map_err(|e| {
466                        anyhow::anyhow!(
467                            "Failed to decode JSON: {}",
468                            // See if we can output the string that failed to be converted to JSON.
469                            match std::str::from_utf8(&data) {
470                                Ok(str) => str.to_string(),
471                                // Otherwise produce the nominally helpful error.
472                                Err(_) => e.display_with_causes().to_string(),
473                            }
474                        )
475                    })?;
476                } else {
477                    Err(anyhow::anyhow!(
478                        "received unexpected value for json type: {:?}",
479                        value
480                    ))?
481                }
482            }
483            SqlScalarType::Bytes => {
484                let data = from_value_opt::<Vec<u8>>(value)?;
485                packer.push(Datum::Bytes(&data));
486            }
487            SqlScalarType::Date => {
488                let date = Date::try_from(from_value_opt::<chrono::NaiveDate>(value)?)?;
489                packer.push(Datum::from(date));
490            }
491            SqlScalarType::Timestamp { precision: _ } => {
492                // Timestamps are encoded as different mysql_common::Value types depending on
493                // whether they are from a binlog event or a query, and depending on which
494                // mysql timestamp version is used. We handle those cases here
495                // https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L87-L155
496                // https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/value/mod.rs#L332
497                let chrono_timestamp = match value {
498                    Value::Date(..) => from_value_opt::<chrono::NaiveDateTime>(value)?,
499                    // old temporal format from before MySQL 5.6; didn't support fractional seconds
500                    Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
501                        .ok_or_else(|| {
502                            anyhow::anyhow!("received invalid timestamp value: {}", val)
503                        })?
504                        .naive_utc(),
505                    Value::Bytes(data) => {
506                        let data = std::str::from_utf8(&data)?;
507                        if data.contains('.') {
508                            chrono::NaiveDateTime::parse_from_str(data, "%s%.6f")?
509                        } else {
510                            chrono::NaiveDateTime::parse_from_str(data, "%s")?
511                        }
512                    }
513                    _ => Err(anyhow::anyhow!(
514                        "received unexpected value for timestamp type: {:?}",
515                        value
516                    ))?,
517                };
518                packer.push(Datum::try_from(CheckedTimestamp::try_from(
519                    chrono_timestamp,
520                )?)?);
521            }
522            SqlScalarType::Time => {
523                packer.push(Datum::from(from_value_opt::<chrono::NaiveTime>(value)?));
524            }
525            SqlScalarType::Numeric { max_scale } => {
526                // The wire-format of numeric types is a string when sent in a binary query
527                // response but is represented in a decimal binary format when sent in a binlog
528                // event. However the mysql-common crate abstracts this away and always returns
529                // a string. We parse the string into a numeric type here.
530                let val = from_value_opt::<String>(value)?;
531                let val = Numeric::from_str(&val)?;
532                if get_precision(&val) > NUMERIC_DATUM_MAX_PRECISION.into() {
533                    Err(anyhow::anyhow!(
534                        "received numeric value with precision {} for column {} which has a max precision of {}",
535                        get_precision(&val),
536                        col_desc.name,
537                        NUMERIC_DATUM_MAX_PRECISION
538                    ))?
539                }
540                if let Some(max_scale) = max_scale {
541                    if get_scale(&val) > max_scale.into_u8().into() {
542                        Err(anyhow::anyhow!(
543                            "received numeric value with scale {} for column {} which has a max scale of {}",
544                            get_scale(&val),
545                            col_desc.name,
546                            max_scale.into_u8()
547                        ))?
548                    }
549                }
550                packer.push(Datum::from(val));
551            }
552            // TODO(roshan): IMPLEMENT OTHER TYPES
553            data_type => Err(anyhow::anyhow!(
554                "received unexpected value for type: {:?}: {:?}",
555                data_type,
556                value
557            ))?,
558        },
559    }
560    Ok(())
561}
562
563fn check_char_length(
564    length: Option<u32>,
565    val: &str,
566    col_desc: &MySqlColumnDesc,
567) -> Result<(), anyhow::Error> {
568    if let Some(length) = length {
569        if let Some(_) = val.char_indices().nth(usize::cast_from(length)) {
570            Err(anyhow::anyhow!(
571                "received string value of length {} for column {} which has a max length of {}",
572                val.len(),
573                col_desc.name,
574                length
575            ))?
576        }
577    }
578    Ok(())
579}
580
581#[cfg(test)]
582mod tests {
583    //! Unit tests for the TEXT-COLUMNS decoding of MySQL TIMESTAMP values.
584    //!
585    //! These cover the regression where a MySQL TIMESTAMP column declared as
586    //! a TEXT COLUMN fails to decode when the wire value arrives as
587    //! `Value::Bytes("<unix-epoch>")` or `Value::Int(<unix-epoch>)` instead
588    //! of `Value::Date(..)`. The integration test in
589    //! `test/mysql-cdc/text-columns-timestamp.td` exercises this through
590    //! a real MySQL container but is non-deterministic: which `Value`
591    //! variant `mysql-async` produces depends on connection-state timing.
592    //! These unit tests pin each variant down directly.
593    //!
594    //! The wire-variant matrix exercised below is derived from mysql_common
595    //! v0.35.5:
596    //!
597    //!   * Value::Int(epoch) — binlog MYSQL_TYPE_TIMESTAMP (pre-5.6):
598    //!     https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L87-L90
599    //!   * Value::Bytes("<sec>"/"<sec>.<usec>") — binlog MYSQL_TYPE_TIMESTAMP2 (5.6+):
600    //!     https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L145-L154
601    //!   * Value::Date(...) — binary query response + binlog DATETIME[2]:
602    //!     https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/value/mod.rs#L443-L445
603    //!     https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L109-L161
604    //!
605    //! MySQL semantics referenced by the zero-date and fractional-precision
606    //! cases:
607    //!
608    //!   * Zero-date allowed when sql_mode disables NO_ZERO_DATE:
609    //!     https://dev.mysql.com/doc/refman/8.0/en/sql-mode.html#sqlmode_no_zero_date
610    //!   * TIMESTAMP(p) / DATETIME(p) fractional seconds:
611    //!     https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
612    use super::*;
613    use mz_repr::{SqlColumnType, SqlScalarType};
614
615    fn timestamp_text_col(precision: u32) -> MySqlColumnDesc {
616        MySqlColumnDesc {
617            name: "created_at".to_string(),
618            column_type: Some(SqlColumnType {
619                scalar_type: SqlScalarType::String,
620                nullable: true,
621            }),
622            meta: Some(MySqlColumnMeta::Timestamp(precision)),
623        }
624    }
625
626    fn pack_one(value: Value, col: &MySqlColumnDesc) -> Result<String, anyhow::Error> {
627        let mut row = Row::default();
628        pack_val_as_datum(value, col, &mut row.packer())?;
629        Ok(row.unpack_first().unwrap_str().to_string())
630    }
631
632    #[mz_ore::test]
633    fn timestamp_value_date_no_precision() {
634        let col = timestamp_text_col(0);
635        let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 0), &col).unwrap();
636        assert_eq!(s, "2024-04-03 10:15:13");
637    }
638
639    #[mz_ore::test]
640    fn timestamp_value_date_with_precision() {
641        let col = timestamp_text_col(6);
642        let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 123456), &col).unwrap();
643        assert_eq!(s, "2024-04-03 10:15:13.123456");
644    }
645
646    #[mz_ore::test]
647    fn timestamp_value_date_zero_date() {
648        // The whole reason TEXT COLUMNS exists for TIMESTAMP: a
649        // zero-date arriving as Value::Date(0,..) should decode to the same
650        // "zero" timestamp value MySQL would display.
651        let col = timestamp_text_col(0);
652        let s = pack_one(Value::Date(0, 0, 0, 0, 0, 0, 0), &col).unwrap();
653        assert_eq!(s, "0000-00-00 00:00:00");
654    }
655
656    /// Regression: Value::Int (pre-5.6 legacy temporal format, unix
657    /// epoch seconds) was previously rejected with
658    /// `received unexpected value for timestamp type: Int(..)`.
659    #[mz_ore::test]
660    fn timestamp_value_int_epoch() {
661        let col = timestamp_text_col(0);
662        // 1743661234 == 2025-04-03 06:20:34 UTC
663        let s = pack_one(Value::Int(1_743_661_234), &col).unwrap();
664        assert_eq!(s, "2025-04-03 06:20:34");
665    }
666
667    /// sec=0 in the legacy TIMESTAMP encoding is the zero-date sentinel,
668    /// not unix epoch 0 — TIMESTAMP's range starts at '1970-01-01 00:00:01'
669    /// UTC so epoch 0 isn't a representable column value.
670    #[mz_ore::test]
671    fn timestamp_value_int_zero_is_sentinel() {
672        let col = timestamp_text_col(0);
673        let s = pack_one(Value::Int(0), &col).unwrap();
674        assert_eq!(s, "0000-00-00 00:00:00");
675    }
676
677    /// Out-of-range epochs must error rather than silently producing
678    /// a zero-timestamp — they aren't the MySQL zero-date marker, just
679    /// garbage chrono can't represent.
680    #[mz_ore::test]
681    fn timestamp_value_int_out_of_range_errors() {
682        let col = timestamp_text_col(0);
683        let err = pack_one(Value::Int(i64::MAX), &col).unwrap_err();
684        assert!(
685            err.to_string().contains("invalid timestamp value"),
686            "unexpected error message: {err}"
687        );
688    }
689
690    /// Regression: Value::Bytes carrying a unix-epoch string is the
691    /// wire variant that triggered the production failure
692    ///   received unexpected value for timestamp type: Bytes("17436613..")
693    #[mz_ore::test]
694    fn timestamp_value_bytes_epoch() {
695        let col = timestamp_text_col(0);
696        let s = pack_one(Value::Bytes(b"1743661234".to_vec()), &col).unwrap();
697        assert_eq!(s, "2025-04-03 06:20:34");
698    }
699
700    /// sec=0 in the TIMESTAMP2 encoding is the zero-date sentinel; same
701    /// reasoning as `timestamp_value_int_zero_is_sentinel`.
702    #[mz_ore::test]
703    fn timestamp_value_bytes_zero_is_sentinel() {
704        let col = timestamp_text_col(0);
705        let s = pack_one(Value::Bytes(b"0".to_vec()), &col).unwrap();
706        assert_eq!(s, "0000-00-00 00:00:00");
707    }
708
709    /// Sentinel detection survives a fractional component ("0.NNNNNN"),
710    /// and the helper pads the output to the column's precision so that
711    /// snapshot and binlog paths produce identical text for the same
712    /// upstream row.
713    #[mz_ore::test]
714    fn timestamp_value_bytes_zero_with_fractional_is_sentinel() {
715        let col = timestamp_text_col(6);
716        let s = pack_one(Value::Bytes(b"0.000000".to_vec()), &col).unwrap();
717        assert_eq!(s, "0000-00-00 00:00:00.000000");
718    }
719
720    /// Fractional form of the TIMESTAMP2 binlog encoding —
721    /// "<sec>.<usec>" wrapped in Value::Bytes (binlog/value.rs:151-153).
722    /// Hits the `s.contains('.')` branch and the precision-aware
723    /// reformat.
724    #[mz_ore::test]
725    fn timestamp_value_bytes_epoch_fractional() {
726        let col = timestamp_text_col(6);
727        let s = pack_one(Value::Bytes(b"1743661234.123456".to_vec()), &col).unwrap();
728        assert_eq!(s, "2025-04-03 06:20:34.123456");
729    }
730
731    /// Bytes that aren't valid UTF-8 should produce a meaningful error,
732    /// not a panic.
733    #[mz_ore::test]
734    fn timestamp_value_bytes_invalid_utf8_errors() {
735        let col = timestamp_text_col(0);
736        // 0xC3 0x28 is an invalid 2-byte UTF-8 sequence.
737        let err = pack_one(Value::Bytes(vec![0xC3, 0x28]), &col).unwrap_err();
738        let msg = err.to_string();
739        assert!(
740            msg.contains("invalid timestamp value"),
741            "unexpected error message: {msg}"
742        );
743    }
744
745    /// Bytes that are valid UTF-8 but not parseable as a unix epoch
746    /// should produce the same structured error as invalid UTF-8 —
747    /// covers the chrono parse failure path that
748    /// `timestamp_value_bytes_invalid_utf8_errors` doesn't reach.
749    #[mz_ore::test]
750    fn timestamp_value_bytes_unparseable_errors() {
751        let col = timestamp_text_col(0);
752        // "2024-04-03 10:15:13" is not valid because Value::Bytes must contain seconds since
753        // epoch, Value::Bytes("<sec>"/"<sec>.<usec>")
754        for payload in [&b""[..], &b"not-an-epoch"[..], &b"2024-04-03 10:15:13"[..]] {
755            let err = pack_one(Value::Bytes(payload.to_vec()), &col).unwrap_err();
756            assert!(
757                err.to_string().contains("invalid timestamp value"),
758                "payload {payload:?}: unexpected error message: {err}"
759            );
760        }
761    }
762
763    /// Variants that have no defined mapping for a TIMESTAMP column
764    /// must still produce the existing structured decode error so the
765    /// source health surface can flag them.
766    #[mz_ore::test]
767    fn timestamp_value_unsupported_variant_errors() {
768        let col = timestamp_text_col(0);
769        let err = pack_one(Value::Float(1.0), &col).unwrap_err();
770        let msg = err.to_string();
771        assert!(
772            msg.contains("unexpected value for timestamp"),
773            "unexpected error message: {msg}"
774        );
775    }
776}