mysql_common/binlog/
value.rs
1use std::{convert::TryFrom, io};
10
11use crate::{
12 binlog::{decimal, jsonb, jsondiff::JsonDiff, misc::*},
13 constants::{ColumnFlags, ColumnType},
14 io::ParseBuf,
15 misc::raw::int::*,
16 proto::MyDeserialize,
17 value::Value::{self, *},
18};
19
20use super::jsonb::JsonbToJsonError;
21
/// A single column value decoded from a binlog row.
///
/// Produced by the [`MyDeserialize`] implementation for this type.
#[derive(Debug, Clone, PartialEq)]
pub enum BinlogValue<'a> {
    /// An ordinary [`Value`]; used for every column type except `MYSQL_TYPE_JSON`.
    Value(Value),
    /// A JSONB document; used for `MYSQL_TYPE_JSON` columns of a non-partial row.
    Jsonb(jsonb::Value<'a>),
    /// A sequence of JSON diffs; used for `MYSQL_TYPE_JSON` columns of a partial row.
    JsonDiff(Vec<JsonDiff<'a>>),
}
32
33impl<'a> BinlogValue<'a> {
34 pub fn into_owned(self) -> BinlogValue<'static> {
36 match self {
37 BinlogValue::Value(x) => BinlogValue::Value(x),
38 BinlogValue::Jsonb(x) => BinlogValue::Jsonb(x.into_owned()),
39 BinlogValue::JsonDiff(x) => {
40 BinlogValue::JsonDiff(x.into_iter().map(|x| x.into_owned()).collect())
41 }
42 }
43 }
44}
45
46impl<'de> MyDeserialize<'de> for BinlogValue<'de> {
47 const SIZE: Option<usize> = None;
48 type Ctx = (ColumnType, &'de [u8], bool, bool);
50
51 fn deserialize(
52 (mut col_type, col_meta, is_unsigned, is_partial): Self::Ctx,
53 buf: &mut ParseBuf<'de>,
54 ) -> io::Result<Self> {
55 use ColumnType::*;
56
57 let mut length = 0_usize;
58
59 if col_type == MYSQL_TYPE_TYPED_ARRAY {
60 let type_byte = col_meta[0];
61 col_type = ColumnType::try_from(type_byte).unwrap_or(col_type);
62 }
63
64 if col_type == MYSQL_TYPE_STRING {
65 if col_meta[0] >= 1 {
66 let byte0 = col_meta[0] as usize;
67 let byte1 = col_meta[1] as usize;
68
69 if (byte0 & 0x30) != 0x30 {
70 length = byte1 | (((byte0 & 0x30) ^ 0x30) << 4);
72 } else {
73 length = byte1;
74 }
75 } else {
76 length = (ParseBuf(col_meta)).eat_u16_le() as usize;
77 }
78 }
79
80 match col_type {
81 MYSQL_TYPE_TINY | MYSQL_TYPE_SHORT | MYSQL_TYPE_LONG | MYSQL_TYPE_LONGLONG
82 | MYSQL_TYPE_FLOAT | MYSQL_TYPE_DOUBLE => {
83 let mut flags = ColumnFlags::empty();
84 flags.set(ColumnFlags::UNSIGNED_FLAG, is_unsigned);
85 Value::deserialize_bin((col_type, flags), &mut *buf).map(BinlogValue::Value)
86 }
87 MYSQL_TYPE_TIMESTAMP => {
88 let val: RawInt<LeU32> = buf.parse(())?;
89 Ok(BinlogValue::Value(Int(*val as i64)))
90 }
91 MYSQL_TYPE_INT24 => {
92 if is_unsigned {
93 let val: RawInt<LeU24> = buf.parse(())?;
94 Ok(BinlogValue::Value(Int(*val as i64)))
95 } else {
96 let val: RawInt<LeI24> = buf.parse(())?;
97 Ok(BinlogValue::Value(Int(*val as i64)))
98 }
99 }
100 MYSQL_TYPE_TIME => {
101 let tmp: RawInt<LeU24> = buf.parse(())?;
102 let h = *tmp / 10000;
103 let m = (*tmp % 10000) / 100;
104 let s = *tmp % 100;
105 Ok(BinlogValue::Value(Time(
106 false, 0, h as u8, m as u8, s as u8, 0,
107 )))
108 }
109 MYSQL_TYPE_DATETIME => {
110 let raw: RawInt<LeU64> = buf.parse(())?;
112 let d_part = *raw / 1_000_000;
113 let t_part = *raw % 1_000_000;
114 Ok(BinlogValue::Value(Date(
115 (d_part / 10000) as u16,
116 ((d_part % 10000) / 100) as u8,
117 (d_part % 100) as u8,
118 (t_part / 10000) as u8,
119 ((t_part % 10000) / 100) as u8,
120 (t_part % 100) as u8,
121 0,
122 )))
123 }
124 MYSQL_TYPE_YEAR => {
125 let y = *buf.parse::<RawInt<u8>>(())? as i32;
126 Ok(BinlogValue::Value(Bytes(
127 (1900 + y).to_string().into_bytes(),
128 )))
129 }
130 MYSQL_TYPE_NEWDATE => {
131 let tmp = *buf.parse::<RawInt<LeU24>>(())?;
132 let d = tmp & 31;
133 let m = (tmp >> 5) & 15;
134 let y = tmp >> 9;
135 Ok(BinlogValue::Value(Date(
136 y as u16, m as u8, d as u8, 0, 0, 0, 0,
137 )))
138 }
139 MYSQL_TYPE_BIT => {
140 let nbits = col_meta[1] as usize * 8 + (col_meta[0] as usize);
141 let nbytes = (nbits + 7) / 8;
142 let bytes: &[u8] = buf.parse(nbytes)?;
143 Ok(BinlogValue::Value(Bytes(bytes.into())))
144 }
145 MYSQL_TYPE_TIMESTAMP2 => {
146 let dec = col_meta[0];
147 let (sec, usec) = my_timestamp_from_binary(&mut *buf, dec)?;
148 if usec == 0 {
149 Ok(BinlogValue::Value(Bytes(sec.to_string().into_bytes())))
150 } else {
151 Ok(BinlogValue::Value(Bytes(
152 format!("{}.{:06}", sec, usec).into_bytes(),
153 )))
154 }
155 }
156 MYSQL_TYPE_DATETIME2 => {
157 let dec = col_meta[0];
158 my_datetime_packed_from_binary(&mut *buf, dec as u32)
159 .map(datetime_from_packed)
160 .map(BinlogValue::Value)
161 }
162 MYSQL_TYPE_TIME2 => {
163 let dec = col_meta[0];
164 my_time_packed_from_binary(&mut *buf, dec as u32)
165 .map(time_from_packed)
166 .map(BinlogValue::Value)
167 }
168 MYSQL_TYPE_JSON => {
169 length = *buf.parse::<RawInt<LeU32>>(())? as usize;
170 let mut json_value_buf: ParseBuf = buf.parse(length)?;
171 if is_partial {
172 let mut diffs = Vec::new();
173 while !json_value_buf.is_empty() {
174 diffs.push(json_value_buf.parse(())?);
175 }
176 Ok(BinlogValue::JsonDiff(diffs))
177 } else {
178 Ok(BinlogValue::Jsonb(json_value_buf.parse(())?))
179 }
180 }
181 MYSQL_TYPE_NEWDECIMAL => {
182 let precision = col_meta[0] as usize;
184 let scale = col_meta[1] as usize;
186
187 let dec = decimal::Decimal::read_bin(&mut *buf, precision, scale, false)?;
188
189 Ok(BinlogValue::Value(Bytes(dec.to_string().into_bytes())))
190 }
191 MYSQL_TYPE_ENUM => match col_meta[1] {
192 1 => {
193 let val = buf.parse::<RawInt<u8>>(())?;
194 Ok(BinlogValue::Value(Int(*val as i64)))
195 }
196 2 => {
197 let val = buf.parse::<RawInt<LeU16>>(())?;
198 Ok(BinlogValue::Value(Int(*val as i64)))
199 }
200 _ => Err(io::Error::new(io::ErrorKind::InvalidData, "Unknown ENUM")),
201 },
202 MYSQL_TYPE_SET => {
203 let nbytes = col_meta[1] as usize;
204 let bytes: &[u8] = buf.parse(nbytes)?;
205 Ok(BinlogValue::Value(Bytes(bytes.into())))
206 }
207 MYSQL_TYPE_TINY_BLOB
208 | MYSQL_TYPE_MEDIUM_BLOB
209 | MYSQL_TYPE_LONG_BLOB
210 | MYSQL_TYPE_BLOB
211 | MYSQL_TYPE_GEOMETRY
212 | MYSQL_TYPE_VECTOR => {
213 let nbytes = match col_meta[0] {
214 1 => *buf.parse::<RawInt<u8>>(())? as usize,
215 2 => *buf.parse::<RawInt<LeU16>>(())? as usize,
216 3 => *buf.parse::<RawInt<LeU24>>(())? as usize,
217 4 => *buf.parse::<RawInt<LeU32>>(())? as usize,
218 _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "Unknown BLOB")),
219 };
220 let bytes: &[u8] = buf.parse(nbytes)?;
221 Ok(BinlogValue::Value(Bytes(bytes.into())))
222 }
223 MYSQL_TYPE_VARCHAR | MYSQL_TYPE_VAR_STRING => {
224 let type_len = (col_meta[0] as u16 | ((col_meta[1] as u16) << 8)) as usize;
225 let nbytes = if type_len < 256 {
226 *buf.parse::<RawInt<u8>>(())? as usize
227 } else {
228 *buf.parse::<RawInt<LeU16>>(())? as usize
229 };
230 let bytes: &[u8] = buf.parse(nbytes)?;
231 Ok(BinlogValue::Value(Bytes(bytes.into())))
232 }
233 MYSQL_TYPE_STRING => {
234 let nbytes = if length < 256 {
235 *buf.parse::<RawInt<u8>>(())? as usize
236 } else {
237 *buf.parse::<RawInt<LeU16>>(())? as usize
238 };
239 let bytes: &[u8] = buf.parse(nbytes)?;
240 Ok(BinlogValue::Value(Bytes(bytes.into())))
241 }
242 _ => Err(io::Error::new(
243 io::ErrorKind::InvalidData,
244 "Don't know how to handle column",
245 )),
246 }
247 }
248}
249
/// Error returned when a [`BinlogValue`] cannot be converted into a [`Value`].
#[derive(Debug, thiserror::Error)]
pub enum BinlogValueToValueError {
    /// The JSONB payload could not be converted into JSON.
    #[error("Can't convert Jsonb to Json: {}", _0)]
    ToJson(#[from] JsonbToJsonError),
    /// The value is a `BinlogValue::JsonDiff`, which has no `Value` representation.
    #[error("Impossible to convert JsonDiff to Value")]
    JsonDiff,
}
257
258impl<'a> TryFrom<BinlogValue<'a>> for Value {
259 type Error = BinlogValueToValueError;
260
261 fn try_from(value: BinlogValue<'a>) -> Result<Self, Self::Error> {
262 match value {
263 BinlogValue::Value(x) => Ok(x),
264 BinlogValue::Jsonb(x) => {
265 let json = serde_json::Value::try_from(x)?;
266 Ok(Value::Bytes(Vec::from(json.to_string())))
267 }
268 BinlogValue::JsonDiff(_) => Err(BinlogValueToValueError::JsonDiff),
269 }
270 }
271}