mysql_common/binlog/
value.rsuse std::{convert::TryFrom, io};
use crate::{
binlog::{decimal, jsonb, jsondiff::JsonDiff, misc::*},
constants::{ColumnFlags, ColumnType},
io::ParseBuf,
misc::raw::int::*,
proto::MyDeserialize,
value::Value::{self, *},
};
use super::jsonb::JsonbToJsonError;
#[derive(Debug, Clone, PartialEq)]
pub enum BinlogValue<'a> {
Value(Value),
Jsonb(jsonb::Value<'a>),
JsonDiff(Vec<JsonDiff<'a>>),
}
impl<'a> BinlogValue<'a> {
pub fn into_owned(self) -> BinlogValue<'static> {
match self {
BinlogValue::Value(x) => BinlogValue::Value(x),
BinlogValue::Jsonb(x) => BinlogValue::Jsonb(x.into_owned()),
BinlogValue::JsonDiff(x) => {
BinlogValue::JsonDiff(x.into_iter().map(|x| x.into_owned()).collect())
}
}
}
}
impl<'de> MyDeserialize<'de> for BinlogValue<'de> {
const SIZE: Option<usize> = None;
type Ctx = (ColumnType, &'de [u8], bool, bool);
fn deserialize(
(mut col_type, col_meta, is_unsigned, is_partial): Self::Ctx,
buf: &mut ParseBuf<'de>,
) -> io::Result<Self> {
use ColumnType::*;
let mut length = 0_usize;
if col_type == MYSQL_TYPE_TYPED_ARRAY {
let type_byte = col_meta[0];
col_type = ColumnType::try_from(type_byte).unwrap_or(col_type);
}
if col_type == MYSQL_TYPE_STRING {
if col_meta[0] >= 1 {
let byte0 = col_meta[0] as usize;
let byte1 = col_meta[1] as usize;
if (byte0 & 0x30) != 0x30 {
length = byte1 | (((byte0 & 0x30) ^ 0x30) << 4);
} else {
length = byte1;
}
} else {
length = (ParseBuf(col_meta)).eat_u16_le() as usize;
}
}
match col_type {
MYSQL_TYPE_TINY | MYSQL_TYPE_SHORT | MYSQL_TYPE_LONG | MYSQL_TYPE_LONGLONG
| MYSQL_TYPE_FLOAT | MYSQL_TYPE_DOUBLE => {
let mut flags = ColumnFlags::empty();
flags.set(ColumnFlags::UNSIGNED_FLAG, is_unsigned);
Value::deserialize_bin((col_type, flags), &mut *buf).map(BinlogValue::Value)
}
MYSQL_TYPE_TIMESTAMP => {
let val: RawInt<LeU32> = buf.parse(())?;
Ok(BinlogValue::Value(Int(*val as i64)))
}
MYSQL_TYPE_INT24 => {
if is_unsigned {
let val: RawInt<LeU24> = buf.parse(())?;
Ok(BinlogValue::Value(Int(*val as i64)))
} else {
let val: RawInt<LeI24> = buf.parse(())?;
Ok(BinlogValue::Value(Int(*val as i64)))
}
}
MYSQL_TYPE_TIME => {
let tmp: RawInt<LeU24> = buf.parse(())?;
let h = *tmp / 10000;
let m = (*tmp % 10000) / 100;
let s = *tmp % 100;
Ok(BinlogValue::Value(Time(
false, 0, h as u8, m as u8, s as u8, 0,
)))
}
MYSQL_TYPE_DATETIME => {
let raw: RawInt<LeU64> = buf.parse(())?;
let d_part = *raw / 1_000_000;
let t_part = *raw % 1_000_000;
Ok(BinlogValue::Value(Date(
(d_part / 10000) as u16,
((d_part % 10000) / 100) as u8,
(d_part % 100) as u8,
(t_part / 10000) as u8,
((t_part % 10000) / 100) as u8,
(t_part % 100) as u8,
0,
)))
}
MYSQL_TYPE_YEAR => {
let y = *buf.parse::<RawInt<u8>>(())? as i32;
Ok(BinlogValue::Value(Bytes(
(1900 + y).to_string().into_bytes(),
)))
}
MYSQL_TYPE_NEWDATE => {
let tmp = *buf.parse::<RawInt<LeU24>>(())?;
let d = tmp & 31;
let m = (tmp >> 5) & 15;
let y = tmp >> 9;
Ok(BinlogValue::Value(Date(
y as u16, m as u8, d as u8, 0, 0, 0, 0,
)))
}
MYSQL_TYPE_BIT => {
let nbits = col_meta[1] as usize * 8 + (col_meta[0] as usize);
let nbytes = (nbits + 7) / 8;
let bytes: &[u8] = buf.parse(nbytes)?;
Ok(BinlogValue::Value(Bytes(bytes.into())))
}
MYSQL_TYPE_TIMESTAMP2 => {
let dec = col_meta[0];
let (sec, usec) = my_timestamp_from_binary(&mut *buf, dec)?;
if usec == 0 {
Ok(BinlogValue::Value(Bytes(sec.to_string().into_bytes())))
} else {
Ok(BinlogValue::Value(Bytes(
format!("{}.{:06}", sec, usec).into_bytes(),
)))
}
}
MYSQL_TYPE_DATETIME2 => {
let dec = col_meta[0];
my_datetime_packed_from_binary(&mut *buf, dec as u32)
.map(datetime_from_packed)
.map(BinlogValue::Value)
}
MYSQL_TYPE_TIME2 => {
let dec = col_meta[0];
my_time_packed_from_binary(&mut *buf, dec as u32)
.map(time_from_packed)
.map(BinlogValue::Value)
}
MYSQL_TYPE_JSON => {
length = *buf.parse::<RawInt<LeU32>>(())? as usize;
let mut json_value_buf: ParseBuf = buf.parse(length)?;
if is_partial {
let mut diffs = Vec::new();
while !json_value_buf.is_empty() {
diffs.push(json_value_buf.parse(())?);
}
Ok(BinlogValue::JsonDiff(diffs))
} else {
Ok(BinlogValue::Jsonb(json_value_buf.parse(())?))
}
}
MYSQL_TYPE_NEWDECIMAL => {
let precision = col_meta[0] as usize;
let scale = col_meta[1] as usize;
let dec = decimal::Decimal::read_bin(&mut *buf, precision, scale, false)?;
Ok(BinlogValue::Value(Bytes(dec.to_string().into_bytes())))
}
MYSQL_TYPE_ENUM => match col_meta[1] {
1 => {
let val = buf.parse::<RawInt<u8>>(())?;
Ok(BinlogValue::Value(Int(*val as i64)))
}
2 => {
let val = buf.parse::<RawInt<LeU16>>(())?;
Ok(BinlogValue::Value(Int(*val as i64)))
}
_ => Err(io::Error::new(io::ErrorKind::InvalidData, "Unknown ENUM")),
},
MYSQL_TYPE_SET => {
let nbytes = col_meta[1] as usize;
let bytes: &[u8] = buf.parse(nbytes)?;
Ok(BinlogValue::Value(Bytes(bytes.into())))
}
MYSQL_TYPE_TINY_BLOB
| MYSQL_TYPE_MEDIUM_BLOB
| MYSQL_TYPE_LONG_BLOB
| MYSQL_TYPE_BLOB
| MYSQL_TYPE_GEOMETRY => {
let nbytes = match col_meta[0] {
1 => *buf.parse::<RawInt<u8>>(())? as usize,
2 => *buf.parse::<RawInt<LeU16>>(())? as usize,
3 => *buf.parse::<RawInt<LeU24>>(())? as usize,
4 => *buf.parse::<RawInt<LeU32>>(())? as usize,
_ => return Err(io::Error::new(io::ErrorKind::InvalidData, "Unknown BLOB")),
};
let bytes: &[u8] = buf.parse(nbytes)?;
Ok(BinlogValue::Value(Bytes(bytes.into())))
}
MYSQL_TYPE_VARCHAR | MYSQL_TYPE_VAR_STRING => {
let type_len = (col_meta[0] as u16 | ((col_meta[1] as u16) << 8)) as usize;
let nbytes = if type_len < 256 {
*buf.parse::<RawInt<u8>>(())? as usize
} else {
*buf.parse::<RawInt<LeU16>>(())? as usize
};
let bytes: &[u8] = buf.parse(nbytes)?;
Ok(BinlogValue::Value(Bytes(bytes.into())))
}
MYSQL_TYPE_STRING => {
let nbytes = if length < 256 {
*buf.parse::<RawInt<u8>>(())? as usize
} else {
*buf.parse::<RawInt<LeU16>>(())? as usize
};
let bytes: &[u8] = buf.parse(nbytes)?;
Ok(BinlogValue::Value(Bytes(bytes.into())))
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
"Don't know how to handle column",
)),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum BinlogValueToValueError {
#[error("Can't convert Jsonb to Json: {}", _0)]
ToJson(#[from] JsonbToJsonError),
#[error("Impossible to convert JsonDiff to Value")]
JsonDiff,
}
impl<'a> TryFrom<BinlogValue<'a>> for Value {
type Error = BinlogValueToValueError;
fn try_from(value: BinlogValue<'a>) -> Result<Self, Self::Error> {
match value {
BinlogValue::Value(x) => Ok(x),
BinlogValue::Jsonb(x) => {
let json = serde_json::Value::try_from(x)?;
Ok(Value::Bytes(Vec::from(json.to_string())))
}
BinlogValue::JsonDiff(_) => Err(BinlogValueToValueError::JsonDiff),
}
}
}