mz_mysql_util/decoding.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt::Write;
11use std::str::FromStr;
12
13use mysql_common::value::convert::from_value_opt;
14use mysql_common::{Row as MySqlRow, Value};
15
16use mz_ore::cast::CastFrom;
17use mz_ore::error::ErrorExt;
18use mz_repr::adt::date::Date;
19use mz_repr::adt::jsonb::JsonbPacker;
20use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, Numeric, get_precision, get_scale};
21use mz_repr::adt::timestamp::CheckedTimestamp;
22use mz_repr::{Datum, Row, RowPacker, SqlScalarType};
23
24use crate::desc::MySqlColumnMeta;
25use crate::{MySqlColumnDesc, MySqlError, MySqlTableDesc};
26
/// Canonical text for the MySQL zero-date sentinel (`'0000-00-00 00:00:00'`).
///
/// In the binlog MYSQL_TYPE_TIMESTAMP / MYSQL_TYPE_TIMESTAMP2 encodings a
/// TIMESTAMP travels as seconds since the unix epoch, and sec=0 *cannot*
/// represent unix epoch 0: the TIMESTAMP type's supported range starts at
/// '1970-01-01 00:00:01' UTC
/// (<https://dev.mysql.com/doc/refman/8.0/en/datetime.html>), so sec=0 is
/// unambiguously this sentinel. (DATETIME/DATETIME2 values are decoded into
/// broken-down date/time components instead, where the zero date shows up as
/// all-zero fields.)
const MYSQL_ZERO_TIMESTAMP: &str = "0000-00-00 00:00:00";

/// Maximum fractional-seconds precision MySQL accepts for DATETIME(p) and
/// TIMESTAMP(p) — values are stored in microseconds, so 6 digits is the
/// upper bound (<https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html>).
const MYSQL_MAX_FRACTIONAL_PRECISION: u32 = 6;
38
39/// Format the zero-date sentinel for a column with the given fractional
40/// precision (matches the Date arm's `{:0precision$}` behavior).
41fn mysql_zero_timestamp(precision: u32) -> String {
42 if precision > 0 {
43 format!(
44 "{}.{}",
45 MYSQL_ZERO_TIMESTAMP,
46 "0".repeat(usize::cast_from(precision))
47 )
48 } else {
49 MYSQL_ZERO_TIMESTAMP.to_string()
50 }
51}
52
53/// Format MySQL DATETIME/TIMESTAMP components as `YYYY-MM-DD HH:MM:SS[.ffff]`.
54/// `micros` is the raw microseconds (0..1_000_000); only the leading
55/// `precision` digits are kept, matching MySQL's DATETIME(p)/TIMESTAMP(p)
56/// display.
57fn format_mysql_timestamp(
58 y: u16,
59 m: u8,
60 d: u8,
61 hr: u8,
62 min: u8,
63 sec: u8,
64 micros: u32,
65 precision: u32,
66) -> String {
67 if precision == 0 {
68 return format!("{y:04}-{m:02}-{d:02} {hr:02}:{min:02}:{sec:02}");
69 }
70 // Clamp defensively: MySQL itself rejects precision > 6, but upstream
71 // metadata is untrusted and a larger value would make `pow()` below
72 // overflow its u32 exponent.
73 let p = precision.min(MYSQL_MAX_FRACTIONAL_PRECISION);
74 let scaled = micros / 10u32.pow(MYSQL_MAX_FRACTIONAL_PRECISION - p);
75 let width = usize::cast_from(p);
76 format!("{y:04}-{m:02}-{d:02} {hr:02}:{min:02}:{sec:02}.{scaled:0width$}")
77}
78
79pub fn pack_mysql_row(
80 row_container: &mut Row,
81 row: MySqlRow,
82 table_desc: &MySqlTableDesc,
83 gtid_set: Option<&str>,
84 binlog_full_metadata: bool,
85) -> Result<Row, MySqlError> {
86 let mut packer = row_container.packer();
87
88 // For each column in `table_desc` (in descriptor order), resolve its wire
89 // index. With binlog_full_metadata=true, columns are matched by name so a reordered upstream
90 // still decodes correctly; without binlog_full_metadata, rows have no column names and must be
91 // matched positionally. A `None` here means the upstream row is missing this column and is
92 // only tolerated for ignored columns, and for binlog_full_metadata = false, is only tolerated
93 // for ignored columns at the end of the table.
94 for (i, col_desc) in table_desc.columns.iter().enumerate() {
95 if col_desc.column_type.is_none() {
96 // This column is ignored, so don't decode it.
97 continue;
98 }
99 let wire_idx = if !binlog_full_metadata {
100 // No column name metadata, so we match by index.
101 (i < row.len()).then_some(i)
102 } else {
103 // This means the row from the binlog has column name included in the metadata,
104 // so we can match on that instead of position.
105 row.columns_ref()
106 .iter()
107 .position(|wc| wc.name_str() == col_desc.name.as_str())
108 };
109
110 let wire_idx = match wire_idx {
111 Some(idx) => idx,
112 None => {
113 // We could not find a column in the incoming row that matches this descriptor column.
114 // This is an error as the column is not ignored (ignored columns have already been skipped).
115 return Err(decode_error(
116 "extra column description",
117 col_desc,
118 table_desc,
119 gtid_set,
120 &row,
121 ));
122 }
123 };
124 let value = row
125 .as_ref(wire_idx)
126 .expect("wire_idx resolved from row")
127 .clone();
128 if let Err(err) = pack_val_as_datum(value, col_desc, &mut packer) {
129 return Err(decode_error(
130 &err.to_string(),
131 col_desc,
132 table_desc,
133 gtid_set,
134 &row,
135 ));
136 }
137 }
138
139 Ok(row_container.clone())
140}
141
142/// Build a `ValueDecodeError`, logging the schema, table, column, source
143/// gtid_set (if any), and a shape description of `row` at the same time.
144/// The shape string is only built here — pack_mysql_row's happy path does no
145/// per-row allocation beyond what decoding requires.
146fn decode_error(
147 err_msg: &str,
148 col_desc: &MySqlColumnDesc,
149 table_desc: &MySqlTableDesc,
150 gtid_set: Option<&str>,
151 row: &MySqlRow,
152) -> MySqlError {
153 let row_shape = describe_row_shape(row, table_desc);
154 tracing::warn!(
155 "mysql decode error for `{}`.`{}` column `{}`: {}; gtid_set={:?}; row_shape={}",
156 table_desc.schema_name,
157 table_desc.name,
158 col_desc.name,
159 err_msg,
160 gtid_set,
161 row_shape,
162 );
163 MySqlError::ValueDecodeError {
164 column_name: col_desc.name.clone(),
165 qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
166 error: err_msg.to_string(),
167 }
168}
169
170/// Describes the structural shape of a row without revealing any data values.
171/// Iterates every wire column. For each, emits the wire name, the binlog
172/// wire type, the character-set id (or `binary`), a classification relative
173/// to `table_desc` (`expected=<scalar>` for active columns, `ignored` for
174/// columns excluded from the source, `extra` for upstream columns with no
175/// descriptor entry), and a value disposition (`null` or `bytes(len=N)` /
176/// primitive kind). Intended for diagnostic logging on decode errors: MySQL
177/// serializes CHAR, VARCHAR, TEXT, JSON, BLOB, etc. all as `Value::Bytes`,
178/// so the wire type tag and the expected scalar type are what distinguish
179/// them.
180fn describe_row_shape(row: &MySqlRow, table_desc: &MySqlTableDesc) -> String {
181 // Binlogs without full row metadata use positional "@N" names, so we
182 // have to match by wire position rather than by name.
183 let fallback_names = row
184 .columns_ref()
185 .first()
186 .is_some_and(|col| col.name_ref().starts_with(b"@"));
187
188 let mut out = String::new();
189 out.push('[');
190 for (i, wire_col) in row.columns_ref().iter().enumerate() {
191 if i > 0 {
192 out.push_str(", ");
193 }
194 let wire_name = wire_col.name_str();
195 let cs = wire_col.character_set();
196 // 63 = binary collation (binary/blob columns).
197 let cs_str = if cs == 63 {
198 "binary".to_string()
199 } else {
200 format!("charset={cs}")
201 };
202 let wire_type = format!("{:?}", wire_col.column_type());
203
204 let matched_col = if fallback_names {
205 table_desc.columns.get(i)
206 } else {
207 table_desc
208 .columns
209 .iter()
210 .find(|c| c.name.as_str() == wire_name)
211 };
212 let match_info = match matched_col {
213 Some(col) => match &col.column_type {
214 Some(ct) => format!("expected={:?}", ct.scalar_type),
215 None => "ignored".to_string(),
216 },
217 None => "extra".to_string(),
218 };
219
220 let val_desc = match row.as_ref(i) {
221 None => "absent".to_string(),
222 Some(Value::NULL) => "null".to_string(),
223 Some(Value::Bytes(b)) => format!("bytes(len={})", b.len()),
224 Some(Value::Int(_)) => "int".to_string(),
225 Some(Value::UInt(_)) => "uint".to_string(),
226 Some(Value::Float(_)) => "float".to_string(),
227 Some(Value::Double(_)) => "double".to_string(),
228 Some(Value::Date(..)) => "date".to_string(),
229 Some(Value::Time(..)) => "time".to_string(),
230 };
231
232 let _ = write!(
233 out,
234 "{{name={wire_name}, wire={wire_type}, {cs_str}, {match_info}, val={val_desc}}}"
235 );
236 }
237 out.push(']');
238 out
239}
240
241// TODO(guswynn|roshan): This function has various `.to_string()` and `format!` calls that should
242// use a shared allocation if possible.
243fn pack_val_as_datum(
244 value: Value,
245 col_desc: &MySqlColumnDesc,
246 packer: &mut RowPacker,
247) -> Result<(), anyhow::Error> {
248 let column_type = match col_desc.column_type {
249 Some(ref column_type) => column_type,
250 None => anyhow::bail!("column type is not set for column: {}", col_desc.name),
251 };
252 match value {
253 Value::NULL => {
254 if column_type.nullable {
255 packer.push(Datum::Null);
256 } else {
257 Err(anyhow::anyhow!(
258 "received a null value in a non-null column".to_string(),
259 ))?
260 }
261 }
262 value => match &column_type.scalar_type {
263 SqlScalarType::Bool => packer.push(Datum::from(from_value_opt::<bool>(value)?)),
264 SqlScalarType::UInt16 => packer.push(Datum::from(from_value_opt::<u16>(value)?)),
265 SqlScalarType::Int16 => packer.push(Datum::from(from_value_opt::<i16>(value)?)),
266 SqlScalarType::UInt32 => packer.push(Datum::from(from_value_opt::<u32>(value)?)),
267 SqlScalarType::Int32 => packer.push(Datum::from(from_value_opt::<i32>(value)?)),
268 SqlScalarType::UInt64 => {
269 if let Some(MySqlColumnMeta::Bit(precision)) = &col_desc.meta {
270 let mut value = from_value_opt::<Vec<u8>>(value)?;
271
272 // Ensure we have the correct number of bytes.
273 let precision_bytes = (precision + 7) / 8;
274 if value.len() != usize::cast_from(precision_bytes) {
275 return Err(anyhow::anyhow!("'bit' column out of range!"));
276 }
277 // Be defensive and prune any bits that come over the wire and are
278 // greater than our precision.
279 let bit_index = precision % 8;
280 if bit_index != 0 {
281 let mask = !(u8::MAX << bit_index);
282 if value.len() > 0 {
283 value[0] &= mask;
284 }
285 }
286
287 // Based on experimentation the value coming across the wire is
288 // encoded in big-endian.
289 let mut buf = [0u8; 8];
290 buf[(8 - value.len())..].copy_from_slice(value.as_slice());
291 let value = u64::from_be_bytes(buf);
292 packer.push(Datum::from(value))
293 } else {
294 packer.push(Datum::from(from_value_opt::<u64>(value)?))
295 }
296 }
297 SqlScalarType::Int64 => packer.push(Datum::from(from_value_opt::<i64>(value)?)),
298 SqlScalarType::Float32 => packer.push(Datum::from(from_value_opt::<f32>(value)?)),
299 SqlScalarType::Float64 => packer.push(Datum::from(from_value_opt::<f64>(value)?)),
300 SqlScalarType::Char { length } => {
301 let val = from_value_opt::<String>(value)?;
302 check_char_length(length.map(|l| l.into_u32()), &val, col_desc)?;
303 packer.push(Datum::String(&val));
304 }
305 SqlScalarType::VarChar { max_length } => {
306 let val = from_value_opt::<String>(value)?;
307 check_char_length(max_length.map(|l| l.into_u32()), &val, col_desc)?;
308 packer.push(Datum::String(&val));
309 }
310 SqlScalarType::String => {
311 // Special case for string types, since this is the scalar type used for a column
312 // specified as a 'TEXT COLUMN'. In some cases we need to check the column
313 // metadata to know if the upstream value needs special handling
314 match &col_desc.meta {
315 Some(MySqlColumnMeta::Enum(e)) => {
316 match value {
317 Value::Bytes(data) => {
318 let data = std::str::from_utf8(&data)?;
319 packer.push(Datum::String(data));
320 }
321 Value::Int(val) => {
322 let enum_int = usize::try_from(val)?;
323
324 // If mysql strict mode is disabled when an invalid entry is inserted
325 // then the entry will be replicated as a 0. Outside the 1 indexed enum.
326 // https://dev.mysql.com/doc/refman/8.4/en/enum.html#enum-indexes
327 if enum_int == 0 {
328 packer.push(Datum::String(""));
329 } else {
330 // Enum types are provided as 1-indexed integers in the replication
331 // stream, so we need to find the string value from the enum meta
332 let enum_val = e.values.get(enum_int - 1).ok_or_else(|| {
333 anyhow::anyhow!(
334 "received invalid enum value: {} for column {}",
335 val,
336 col_desc.name
337 )
338 })?;
339
340 packer.push(Datum::String(enum_val));
341 }
342 }
343 _ => Err(anyhow::anyhow!(
344 "received unexpected value for enum type: {:?}",
345 value
346 ))?,
347 }
348 }
349 Some(MySqlColumnMeta::Json) => {
350 // JSON types in a query response are encoded as a string with whitespace,
351 // but when parsed from the binlog event by mysql-common they are provided
352 // as an encoded string sans-whitespace.
353 if let Value::Bytes(data) = value {
354 let json = serde_json::from_slice::<serde_json::Value>(&data)?;
355 packer.push(Datum::String(&json.to_string()));
356 } else {
357 Err(anyhow::anyhow!(
358 "received unexpected value for json type: {:?}",
359 value
360 ))?;
361 }
362 }
363 Some(MySqlColumnMeta::Year) => {
364 let val = from_value_opt::<u16>(value)?;
365 packer.push(Datum::String(&val.to_string()));
366 }
367 Some(MySqlColumnMeta::Date) => {
368 // Some MySQL dates are invalid in chrono/NaiveDate (e.g. 0000-00-00), so
369 // we need to handle them directly as strings
370 if let Value::Date(y, m, d, 0, 0, 0, 0) = value {
371 packer.push(Datum::String(&format!("{:04}-{:02}-{:02}", y, m, d)));
372 } else {
373 Err(anyhow::anyhow!(
374 "received unexpected value for date type: {:?}",
375 value
376 ))?;
377 }
378 }
379 Some(MySqlColumnMeta::Timestamp(precision)) => {
380 // Materialize treats DATETIME and TIMESTAMP as MySqlColumnMeta::Timestamp,
381 // but they have slightly different semantics as far as the range of dates
382 // they can represent.
383 // (see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-types.html).
384 //
385 // Three mysql_common::Value variants exist, which are mapped to
386 // [`MySqlColumnMeta::Timestamp`]
387 // (see https://github.com/blackbeam/rust_mysql_common/blob/2e6f6696de03c91b9fd95a87356d081285290704/src/binlog/value.rs):
388 // Value::Date — MZ snapshot & binlog MYSQL_TYPE_DATETIME/MYSQL_TYPE_DATETIME2
389 // (value/mod.rs:443-445, binlog/value.rs:109-161)
390 // Value::Int — legacy binlog MYSQL_TYPE_TIMESTAMP, pre-5.6,
391 // 4-byte unix epoch (binlog/value.rs:87-90)
392 // Value::Bytes — binlog MYSQL_TYPE_TIMESTAMP2, 5.6+,
393 // "<sec>" or "<sec>.<usec>" (binlog/value.rs:145-154)
394 let str_timestamp = match value {
395 Value::Date(y, m, d, h, mm, s, ms) => {
396 format_mysql_timestamp(y, m, d, h, mm, s, ms, *precision)
397 }
398 // Pre-5.6 unix epoch, no fractional seconds.
399 // val == 0 is the zero-date sentinel, not epoch 0.
400 Value::Int(0) => mysql_zero_timestamp(*precision),
401 Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
402 .ok_or_else(|| {
403 anyhow::anyhow!("received invalid timestamp value: {}", val)
404 })?
405 .naive_utc()
406 .format("%Y-%m-%d %H:%M:%S")
407 .to_string(),
408 // 5.6+ epoch string; parse + reformat so all variants emit the
409 // same canonical YYYY-MM-DD HH:MM:SS[.ffff] text.
410 Value::Bytes(data) => {
411 let s = std::str::from_utf8(&data).map_err(|_| {
412 anyhow::anyhow!("received invalid timestamp value: {:?}", data)
413 })?;
414 // sec=0 (with or without fractional component) is the
415 // zero-date sentinel.
416 if s.split('.').next() == Some("0") {
417 mysql_zero_timestamp(*precision)
418 } else {
419 let dt = if s.contains('.') {
420 chrono::NaiveDateTime::parse_from_str(s, "%s%.6f")
421 } else {
422 chrono::NaiveDateTime::parse_from_str(s, "%s")
423 }
424 .map_err(|_| {
425 anyhow::anyhow!("received invalid timestamp value: {:?}", s)
426 })?;
427 use chrono::{Datelike, Timelike};
428 let y = u16::try_from(dt.year()).map_err(|_| {
429 anyhow::anyhow!(
430 "timestamp year out of range: {}",
431 dt.year()
432 )
433 })?;
434 format_mysql_timestamp(
435 y,
436 u8::try_from(dt.month())?,
437 u8::try_from(dt.day())?,
438 u8::try_from(dt.hour())?,
439 u8::try_from(dt.minute())?,
440 u8::try_from(dt.second())?,
441 dt.nanosecond() / 1000,
442 *precision,
443 )
444 }
445 }
446 _ => Err(anyhow::anyhow!(
447 "received unexpected value for timestamp type: {:?}",
448 value
449 ))?,
450 };
451 packer.push(Datum::String(&str_timestamp));
452 }
453 Some(MySqlColumnMeta::Bit(_)) => unreachable!("parsed as a u64"),
454 None => {
455 packer.push(Datum::String(&from_value_opt::<String>(value)?));
456 }
457 }
458 }
459 SqlScalarType::Jsonb => {
460 if let Value::Bytes(data) = value {
461 let packer = JsonbPacker::new(packer);
462 // TODO(guswynn): This still produces and extract allocation (in the
463 // `DeserializeSeed` impl used internally), which should be improved,
464 // for all users of the APIs in that module.
465 packer.pack_slice(&data).map_err(|e| {
466 anyhow::anyhow!(
467 "Failed to decode JSON: {}",
468 // See if we can output the string that failed to be converted to JSON.
469 match std::str::from_utf8(&data) {
470 Ok(str) => str.to_string(),
471 // Otherwise produce the nominally helpful error.
472 Err(_) => e.display_with_causes().to_string(),
473 }
474 )
475 })?;
476 } else {
477 Err(anyhow::anyhow!(
478 "received unexpected value for json type: {:?}",
479 value
480 ))?
481 }
482 }
483 SqlScalarType::Bytes => {
484 let data = from_value_opt::<Vec<u8>>(value)?;
485 packer.push(Datum::Bytes(&data));
486 }
487 SqlScalarType::Date => {
488 let date = Date::try_from(from_value_opt::<chrono::NaiveDate>(value)?)?;
489 packer.push(Datum::from(date));
490 }
491 SqlScalarType::Timestamp { precision: _ } => {
492 // Timestamps are encoded as different mysql_common::Value types depending on
493 // whether they are from a binlog event or a query, and depending on which
494 // mysql timestamp version is used. We handle those cases here
495 // https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L87-L155
496 // https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/value/mod.rs#L332
497 let chrono_timestamp = match value {
498 Value::Date(..) => from_value_opt::<chrono::NaiveDateTime>(value)?,
499 // old temporal format from before MySQL 5.6; didn't support fractional seconds
500 Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
501 .ok_or_else(|| {
502 anyhow::anyhow!("received invalid timestamp value: {}", val)
503 })?
504 .naive_utc(),
505 Value::Bytes(data) => {
506 let data = std::str::from_utf8(&data)?;
507 if data.contains('.') {
508 chrono::NaiveDateTime::parse_from_str(data, "%s%.6f")?
509 } else {
510 chrono::NaiveDateTime::parse_from_str(data, "%s")?
511 }
512 }
513 _ => Err(anyhow::anyhow!(
514 "received unexpected value for timestamp type: {:?}",
515 value
516 ))?,
517 };
518 packer.push(Datum::try_from(CheckedTimestamp::try_from(
519 chrono_timestamp,
520 )?)?);
521 }
522 SqlScalarType::Time => {
523 packer.push(Datum::from(from_value_opt::<chrono::NaiveTime>(value)?));
524 }
525 SqlScalarType::Numeric { max_scale } => {
526 // The wire-format of numeric types is a string when sent in a binary query
527 // response but is represented in a decimal binary format when sent in a binlog
528 // event. However the mysql-common crate abstracts this away and always returns
529 // a string. We parse the string into a numeric type here.
530 let val = from_value_opt::<String>(value)?;
531 let val = Numeric::from_str(&val)?;
532 if get_precision(&val) > NUMERIC_DATUM_MAX_PRECISION.into() {
533 Err(anyhow::anyhow!(
534 "received numeric value with precision {} for column {} which has a max precision of {}",
535 get_precision(&val),
536 col_desc.name,
537 NUMERIC_DATUM_MAX_PRECISION
538 ))?
539 }
540 if let Some(max_scale) = max_scale {
541 if get_scale(&val) > max_scale.into_u8().into() {
542 Err(anyhow::anyhow!(
543 "received numeric value with scale {} for column {} which has a max scale of {}",
544 get_scale(&val),
545 col_desc.name,
546 max_scale.into_u8()
547 ))?
548 }
549 }
550 packer.push(Datum::from(val));
551 }
552 // TODO(roshan): IMPLEMENT OTHER TYPES
553 data_type => Err(anyhow::anyhow!(
554 "received unexpected value for type: {:?}: {:?}",
555 data_type,
556 value
557 ))?,
558 },
559 }
560 Ok(())
561}
562
563fn check_char_length(
564 length: Option<u32>,
565 val: &str,
566 col_desc: &MySqlColumnDesc,
567) -> Result<(), anyhow::Error> {
568 if let Some(length) = length {
569 if let Some(_) = val.char_indices().nth(usize::cast_from(length)) {
570 Err(anyhow::anyhow!(
571 "received string value of length {} for column {} which has a max length of {}",
572 val.len(),
573 col_desc.name,
574 length
575 ))?
576 }
577 }
578 Ok(())
579}
580
#[cfg(test)]
mod tests {
    //! Unit tests for the TEXT-COLUMNS decoding of MySQL TIMESTAMP values.
    //!
    //! These cover the regression where a MySQL TIMESTAMP column declared as
    //! a TEXT COLUMN fails to decode when the wire value arrives as
    //! `Value::Bytes("<unix-epoch>")` or `Value::Int(<unix-epoch>)` instead
    //! of `Value::Date(..)`. The integration test in
    //! `test/mysql-cdc/text-columns-timestamp.td` exercises this through
    //! a real MySQL container but is non-deterministic: which `Value`
    //! variant `mysql-async` produces depends on connection-state timing.
    //! These unit tests pin each variant down directly.
    //!
    //! The wire-variant matrix exercised below is derived from mysql_common
    //! v0.35.5:
    //!
    //! * Value::Int(epoch) — binlog MYSQL_TYPE_TIMESTAMP (pre-5.6):
    //!   https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L87-L90
    //! * Value::Bytes("<sec>"/"<sec>.<usec>") — binlog MYSQL_TYPE_TIMESTAMP2 (5.6+):
    //!   https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L145-L154
    //! * Value::Date(...) — binary query response + binlog DATETIME[2]:
    //!   https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/value/mod.rs#L443-L445
    //!   https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L109-L161
    //!
    //! MySQL semantics referenced by the zero-date and fractional-precision
    //! cases:
    //!
    //! * Zero-date allowed when sql_mode disables NO_ZERO_DATE:
    //!   https://dev.mysql.com/doc/refman/8.0/en/sql-mode.html#sqlmode_no_zero_date
    //! * TIMESTAMP(p) / DATETIME(p) fractional seconds:
    //!   https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
    use super::*;
    use mz_repr::{SqlColumnType, SqlScalarType};

    /// A nullable TEXT COLUMN backed by a MySQL TIMESTAMP(`precision`).
    fn timestamp_text_col(precision: u32) -> MySqlColumnDesc {
        MySqlColumnDesc {
            name: "created_at".to_string(),
            column_type: Some(SqlColumnType {
                scalar_type: SqlScalarType::String,
                nullable: true,
            }),
            meta: Some(MySqlColumnMeta::Timestamp(precision)),
        }
    }

    /// Decode `value` for `col` and return the packed string datum.
    fn pack_one(value: Value, col: &MySqlColumnDesc) -> Result<String, anyhow::Error> {
        let mut row = Row::default();
        pack_val_as_datum(value, col, &mut row.packer())?;
        Ok(row.unpack_first().unwrap_str().to_string())
    }

    #[mz_ore::test]
    fn timestamp_value_date_no_precision() {
        let col = timestamp_text_col(0);
        let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 0), &col).unwrap();
        assert_eq!(s, "2024-04-03 10:15:13");
    }

    #[mz_ore::test]
    fn timestamp_value_date_with_precision() {
        let col = timestamp_text_col(6);
        let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 123456), &col).unwrap();
        assert_eq!(s, "2024-04-03 10:15:13.123456");
    }

    /// Precision below 6 keeps only the leading digits of the microsecond
    /// field: they are truncated, not rounded.
    #[mz_ore::test]
    fn timestamp_value_date_truncates_to_precision() {
        let col = timestamp_text_col(3);
        let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 123456), &col).unwrap();
        assert_eq!(s, "2024-04-03 10:15:13.123");
    }

    /// Metadata claiming more than 6 fractional digits is clamped to
    /// microsecond resolution rather than panicking or over-padding.
    #[mz_ore::test]
    fn timestamp_value_date_precision_is_clamped() {
        let col = timestamp_text_col(9);
        let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 123456), &col).unwrap();
        assert_eq!(s, "2024-04-03 10:15:13.123456");
    }

    #[mz_ore::test]
    fn timestamp_value_date_zero_date() {
        // The whole reason TEXT COLUMNS exists for TIMESTAMP: a
        // zero-date arriving as Value::Date(0,..) should decode to the same
        // "zero" timestamp value MySQL would display.
        let col = timestamp_text_col(0);
        let s = pack_one(Value::Date(0, 0, 0, 0, 0, 0, 0), &col).unwrap();
        assert_eq!(s, "0000-00-00 00:00:00");
    }

    /// Regression: Value::Int (pre-5.6 legacy temporal format, unix
    /// epoch seconds) was previously rejected with
    /// `received unexpected value for timestamp type: Int(..)`.
    #[mz_ore::test]
    fn timestamp_value_int_epoch() {
        let col = timestamp_text_col(0);
        // 1743661234 == 2025-04-03 06:20:34 UTC
        let s = pack_one(Value::Int(1_743_661_234), &col).unwrap();
        assert_eq!(s, "2025-04-03 06:20:34");
    }

    /// sec=0 in the legacy TIMESTAMP encoding is the zero-date sentinel,
    /// not unix epoch 0 — TIMESTAMP's range starts at '1970-01-01 00:00:01'
    /// UTC so epoch 0 isn't a representable column value.
    #[mz_ore::test]
    fn timestamp_value_int_zero_is_sentinel() {
        let col = timestamp_text_col(0);
        let s = pack_one(Value::Int(0), &col).unwrap();
        assert_eq!(s, "0000-00-00 00:00:00");
    }

    /// The zero-date sentinel is padded to the column's precision.
    #[mz_ore::test]
    fn timestamp_value_int_zero_sentinel_with_precision() {
        let col = timestamp_text_col(3);
        let s = pack_one(Value::Int(0), &col).unwrap();
        assert_eq!(s, "0000-00-00 00:00:00.000");
    }

    /// Out-of-range epochs must error rather than silently producing
    /// a zero-timestamp — they aren't the MySQL zero-date marker, just
    /// garbage chrono can't represent.
    #[mz_ore::test]
    fn timestamp_value_int_out_of_range_errors() {
        let col = timestamp_text_col(0);
        let err = pack_one(Value::Int(i64::MAX), &col).unwrap_err();
        assert!(
            err.to_string().contains("invalid timestamp value"),
            "unexpected error message: {err}"
        );
    }

    /// Regression: Value::Bytes carrying a unix-epoch string is the
    /// wire variant that triggered the production failure
    /// received unexpected value for timestamp type: Bytes("17436613..")
    #[mz_ore::test]
    fn timestamp_value_bytes_epoch() {
        let col = timestamp_text_col(0);
        let s = pack_one(Value::Bytes(b"1743661234".to_vec()), &col).unwrap();
        assert_eq!(s, "2025-04-03 06:20:34");
    }

    /// A whole-second TIMESTAMP2 epoch in a TIMESTAMP(6) column is padded to
    /// the column precision, matching the Value::Date rendering.
    #[mz_ore::test]
    fn timestamp_value_bytes_epoch_padded_to_precision() {
        let col = timestamp_text_col(6);
        let s = pack_one(Value::Bytes(b"1743661234".to_vec()), &col).unwrap();
        assert_eq!(s, "2025-04-03 06:20:34.000000");
    }

    /// sec=0 in the TIMESTAMP2 encoding is the zero-date sentinel; same
    /// reasoning as `timestamp_value_int_zero_is_sentinel`.
    #[mz_ore::test]
    fn timestamp_value_bytes_zero_is_sentinel() {
        let col = timestamp_text_col(0);
        let s = pack_one(Value::Bytes(b"0".to_vec()), &col).unwrap();
        assert_eq!(s, "0000-00-00 00:00:00");
    }

    /// Sentinel detection survives a fractional component ("0.NNNNNN"),
    /// and the helper pads the output to the column's precision so that
    /// snapshot and binlog paths produce identical text for the same
    /// upstream row.
    #[mz_ore::test]
    fn timestamp_value_bytes_zero_with_fractional_is_sentinel() {
        let col = timestamp_text_col(6);
        let s = pack_one(Value::Bytes(b"0.000000".to_vec()), &col).unwrap();
        assert_eq!(s, "0000-00-00 00:00:00.000000");
    }

    /// Fractional form of the TIMESTAMP2 binlog encoding —
    /// "<sec>.<usec>" wrapped in Value::Bytes (binlog/value.rs:151-153).
    /// Hits the `s.contains('.')` branch and the precision-aware
    /// reformat.
    #[mz_ore::test]
    fn timestamp_value_bytes_epoch_fractional() {
        let col = timestamp_text_col(6);
        let s = pack_one(Value::Bytes(b"1743661234.123456".to_vec()), &col).unwrap();
        assert_eq!(s, "2025-04-03 06:20:34.123456");
    }

    /// The fractional TIMESTAMP2 form is truncated to a precision below 6.
    #[mz_ore::test]
    fn timestamp_value_bytes_epoch_fractional_truncated() {
        let col = timestamp_text_col(3);
        let s = pack_one(Value::Bytes(b"1743661234.123456".to_vec()), &col).unwrap();
        assert_eq!(s, "2025-04-03 06:20:34.123");
    }

    /// Bytes that aren't valid UTF-8 should produce a meaningful error,
    /// not a panic.
    #[mz_ore::test]
    fn timestamp_value_bytes_invalid_utf8_errors() {
        let col = timestamp_text_col(0);
        // 0xC3 0x28 is an invalid 2-byte UTF-8 sequence.
        let err = pack_one(Value::Bytes(vec![0xC3, 0x28]), &col).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("invalid timestamp value"),
            "unexpected error message: {msg}"
        );
    }

    /// Bytes that are valid UTF-8 but not parseable as a unix epoch
    /// should produce the same structured error as invalid UTF-8 —
    /// covers the chrono parse failure path that
    /// `timestamp_value_bytes_invalid_utf8_errors` doesn't reach.
    #[mz_ore::test]
    fn timestamp_value_bytes_unparseable_errors() {
        let col = timestamp_text_col(0);
        // "2024-04-03 10:15:13" is not valid because Value::Bytes must contain seconds since
        // epoch, Value::Bytes("<sec>"/"<sec>.<usec>")
        for payload in [&b""[..], &b"not-an-epoch"[..], &b"2024-04-03 10:15:13"[..]] {
            let err = pack_one(Value::Bytes(payload.to_vec()), &col).unwrap_err();
            assert!(
                err.to_string().contains("invalid timestamp value"),
                "payload {payload:?}: unexpected error message: {err}"
            );
        }
    }

    /// Variants that have no defined mapping for a TIMESTAMP column
    /// must still produce the existing structured decode error so the
    /// source health surface can flag them.
    #[mz_ore::test]
    fn timestamp_value_unsupported_variant_errors() {
        let col = timestamp_text_col(0);
        let err = pack_one(Value::Float(1.0), &col).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("unexpected value for timestamp"),
            "unexpected error message: {msg}"
        );
    }
}