1use std::str::FromStr;
11
12use itertools::{EitherOrBoth, Itertools};
13use mysql_common::value::convert::from_value_opt;
14use mysql_common::{Row as MySqlRow, Value};
15
16use mz_ore::cast::CastFrom;
17use mz_ore::error::ErrorExt;
18use mz_repr::adt::date::Date;
19use mz_repr::adt::jsonb::JsonbPacker;
20use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, Numeric, get_precision, get_scale};
21use mz_repr::adt::timestamp::CheckedTimestamp;
22use mz_repr::{Datum, Row, RowPacker, SqlScalarType};
23
24use crate::desc::MySqlColumnMeta;
25use crate::{MySqlColumnDesc, MySqlError, MySqlTableDesc};
26
27pub fn pack_mysql_row(
28 row_container: &mut Row,
29 row: MySqlRow,
30 table_desc: &MySqlTableDesc,
31) -> Result<Row, MySqlError> {
32 let mut packer = row_container.packer();
33 let row_values = row.unwrap();
34
35 for values in table_desc.columns.iter().zip_longest(row_values) {
36 let (col_desc, value) = match values {
37 EitherOrBoth::Both(col_desc, value) => (col_desc, value),
38 EitherOrBoth::Left(col_desc) => {
39 tracing::error!(
40 "mysql: extra column description {col_desc:?} for table {}",
41 table_desc.name
42 );
43 Err(MySqlError::ValueDecodeError {
44 column_name: col_desc.name.clone(),
45 qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
46 error: "extra column description".to_string(),
47 })?
48 }
49 EitherOrBoth::Right(_) => {
50 break;
52 }
53 };
54 if col_desc.column_type.is_none() {
55 continue;
57 }
58 match pack_val_as_datum(value, col_desc, &mut packer) {
59 Err(err) => Err(MySqlError::ValueDecodeError {
60 column_name: col_desc.name.clone(),
61 qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
62 error: err.to_string(),
63 })?,
64 Ok(()) => (),
65 };
66 }
67
68 Ok(row_container.clone())
69}
70
71fn pack_val_as_datum(
74 value: Value,
75 col_desc: &MySqlColumnDesc,
76 packer: &mut RowPacker,
77) -> Result<(), anyhow::Error> {
78 let column_type = match col_desc.column_type {
79 Some(ref column_type) => column_type,
80 None => anyhow::bail!("column type is not set for column: {}", col_desc.name),
81 };
82 match value {
83 Value::NULL => {
84 if column_type.nullable {
85 packer.push(Datum::Null);
86 } else {
87 Err(anyhow::anyhow!(
88 "received a null value in a non-null column".to_string(),
89 ))?
90 }
91 }
92 value => match &column_type.scalar_type {
93 SqlScalarType::Bool => packer.push(Datum::from(from_value_opt::<bool>(value)?)),
94 SqlScalarType::UInt16 => packer.push(Datum::from(from_value_opt::<u16>(value)?)),
95 SqlScalarType::Int16 => packer.push(Datum::from(from_value_opt::<i16>(value)?)),
96 SqlScalarType::UInt32 => packer.push(Datum::from(from_value_opt::<u32>(value)?)),
97 SqlScalarType::Int32 => packer.push(Datum::from(from_value_opt::<i32>(value)?)),
98 SqlScalarType::UInt64 => {
99 if let Some(MySqlColumnMeta::Bit(precision)) = &col_desc.meta {
100 let mut value = from_value_opt::<Vec<u8>>(value)?;
101
102 let precision_bytes = (precision + 7) / 8;
104 if value.len() != usize::cast_from(precision_bytes) {
105 return Err(anyhow::anyhow!("'bit' column out of range!"));
106 }
107 let bit_index = precision % 8;
110 if bit_index != 0 {
111 let mask = !(u8::MAX << bit_index);
112 if value.len() > 0 {
113 value[0] &= mask;
114 }
115 }
116
117 let mut buf = [0u8; 8];
120 buf[(8 - value.len())..].copy_from_slice(value.as_slice());
121 let value = u64::from_be_bytes(buf);
122 packer.push(Datum::from(value))
123 } else {
124 packer.push(Datum::from(from_value_opt::<u64>(value)?))
125 }
126 }
127 SqlScalarType::Int64 => packer.push(Datum::from(from_value_opt::<i64>(value)?)),
128 SqlScalarType::Float32 => packer.push(Datum::from(from_value_opt::<f32>(value)?)),
129 SqlScalarType::Float64 => packer.push(Datum::from(from_value_opt::<f64>(value)?)),
130 SqlScalarType::Char { length } => {
131 let val = from_value_opt::<String>(value)?;
132 check_char_length(length.map(|l| l.into_u32()), &val, col_desc)?;
133 packer.push(Datum::String(&val));
134 }
135 SqlScalarType::VarChar { max_length } => {
136 let val = from_value_opt::<String>(value)?;
137 check_char_length(max_length.map(|l| l.into_u32()), &val, col_desc)?;
138 packer.push(Datum::String(&val));
139 }
140 SqlScalarType::String => {
141 match &col_desc.meta {
145 Some(MySqlColumnMeta::Enum(e)) => {
146 match value {
147 Value::Bytes(data) => {
148 let data = std::str::from_utf8(&data)?;
149 packer.push(Datum::String(data));
150 }
151 Value::Int(val) => {
152 let enum_int = usize::try_from(val)?;
153
154 if enum_int == 0 {
158 packer.push(Datum::String(""));
159 } else {
160 let enum_val = e.values.get(enum_int - 1).ok_or_else(|| {
163 anyhow::anyhow!(
164 "received invalid enum value: {} for column {}",
165 val,
166 col_desc.name
167 )
168 })?;
169
170 packer.push(Datum::String(enum_val));
171 }
172 }
173 _ => Err(anyhow::anyhow!(
174 "received unexpected value for enum type: {:?}",
175 value
176 ))?,
177 }
178 }
179 Some(MySqlColumnMeta::Json) => {
180 if let Value::Bytes(data) = value {
184 let json = serde_json::from_slice::<serde_json::Value>(&data)?;
185 packer.push(Datum::String(&json.to_string()));
186 } else {
187 Err(anyhow::anyhow!(
188 "received unexpected value for json type: {:?}",
189 value
190 ))?;
191 }
192 }
193 Some(MySqlColumnMeta::Year) => {
194 let val = from_value_opt::<u16>(value)?;
195 packer.push(Datum::String(&val.to_string()));
196 }
197 Some(MySqlColumnMeta::Date) => {
198 if let Value::Date(y, m, d, 0, 0, 0, 0) = value {
201 packer.push(Datum::String(&format!("{:04}-{:02}-{:02}", y, m, d)));
202 } else {
203 Err(anyhow::anyhow!(
204 "received unexpected value for date type: {:?}",
205 value
206 ))?;
207 }
208 }
209 Some(MySqlColumnMeta::Timestamp(precision)) => {
210 if let Value::Date(y, m, d, h, mm, s, ms) = value {
213 if *precision > 0 {
214 let precision: usize = (*precision).try_into()?;
215 packer.push(Datum::String(&format!(
216 "{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:0precision$}",
217 y,
218 m,
219 d,
220 h,
221 mm,
222 s,
223 ms,
224 precision = precision
225 )));
226 } else {
227 packer.push(Datum::String(&format!(
228 "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
229 y, m, d, h, mm, s
230 )));
231 }
232 } else {
233 Err(anyhow::anyhow!(
234 "received unexpected value for timestamp type: {:?}",
235 value
236 ))?;
237 }
238 }
239 Some(MySqlColumnMeta::Bit(_)) => unreachable!("parsed as a u64"),
240 None => {
241 packer.push(Datum::String(&from_value_opt::<String>(value)?));
242 }
243 }
244 }
245 SqlScalarType::Jsonb => {
246 if let Value::Bytes(data) = value {
247 let packer = JsonbPacker::new(packer);
248 packer.pack_slice(&data).map_err(|e| {
252 anyhow::anyhow!(
253 "Failed to decode JSON: {}",
254 match std::str::from_utf8(&data) {
256 Ok(str) => str.to_string(),
257 Err(_) => e.display_with_causes().to_string(),
259 }
260 )
261 })?;
262 } else {
263 Err(anyhow::anyhow!(
264 "received unexpected value for json type: {:?}",
265 value
266 ))?
267 }
268 }
269 SqlScalarType::Bytes => {
270 let data = from_value_opt::<Vec<u8>>(value)?;
271 packer.push(Datum::Bytes(&data));
272 }
273 SqlScalarType::Date => {
274 let date = Date::try_from(from_value_opt::<chrono::NaiveDate>(value)?)?;
275 packer.push(Datum::from(date));
276 }
277 SqlScalarType::Timestamp { precision: _ } => {
278 let chrono_timestamp = match value {
284 Value::Date(..) => from_value_opt::<chrono::NaiveDateTime>(value)?,
285 Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
287 .ok_or_else(|| {
288 anyhow::anyhow!("received invalid timestamp value: {}", val)
289 })?
290 .naive_utc(),
291 Value::Bytes(data) => {
292 let data = std::str::from_utf8(&data)?;
293 if data.contains('.') {
294 chrono::NaiveDateTime::parse_from_str(data, "%s%.6f")?
295 } else {
296 chrono::NaiveDateTime::parse_from_str(data, "%s")?
297 }
298 }
299 _ => Err(anyhow::anyhow!(
300 "received unexpected value for timestamp type: {:?}",
301 value
302 ))?,
303 };
304 packer.push(Datum::try_from(CheckedTimestamp::try_from(
305 chrono_timestamp,
306 )?)?);
307 }
308 SqlScalarType::Time => {
309 packer.push(Datum::from(from_value_opt::<chrono::NaiveTime>(value)?));
310 }
311 SqlScalarType::Numeric { max_scale } => {
312 let val = from_value_opt::<String>(value)?;
317 let val = Numeric::from_str(&val)?;
318 if get_precision(&val) > NUMERIC_DATUM_MAX_PRECISION.into() {
319 Err(anyhow::anyhow!(
320 "received numeric value with precision {} for column {} which has a max precision of {}",
321 get_precision(&val),
322 col_desc.name,
323 NUMERIC_DATUM_MAX_PRECISION
324 ))?
325 }
326 if let Some(max_scale) = max_scale {
327 if get_scale(&val) > max_scale.into_u8().into() {
328 Err(anyhow::anyhow!(
329 "received numeric value with scale {} for column {} which has a max scale of {}",
330 get_scale(&val),
331 col_desc.name,
332 max_scale.into_u8()
333 ))?
334 }
335 }
336 packer.push(Datum::from(val));
337 }
338 data_type => Err(anyhow::anyhow!(
340 "received unexpected value for type: {:?}: {:?}",
341 data_type,
342 value
343 ))?,
344 },
345 }
346 Ok(())
347}
348
349fn check_char_length(
350 length: Option<u32>,
351 val: &str,
352 col_desc: &MySqlColumnDesc,
353) -> Result<(), anyhow::Error> {
354 if let Some(length) = length {
355 if let Some(_) = val.char_indices().nth(usize::cast_from(length)) {
356 Err(anyhow::anyhow!(
357 "received string value of length {} for column {} which has a max length of {}",
358 val.len(),
359 col_desc.name,
360 length
361 ))?
362 }
363 }
364 Ok(())
365}