mysql_common/binlog/
value.rs

1// Copyright (c) 2021 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{convert::TryFrom, io};
10
11use crate::{
12    binlog::{decimal, jsonb, jsondiff::JsonDiff, misc::*},
13    constants::{ColumnFlags, ColumnType},
14    io::ParseBuf,
15    misc::raw::int::*,
16    proto::MyDeserialize,
17    value::Value::{self, *},
18};
19
20use super::jsonb::JsonbToJsonError;
21
22/// Value of a binlog event.
23#[derive(Debug, Clone, PartialEq)]
24pub enum BinlogValue<'a> {
25    /// MySql value.
26    Value(Value),
27    /// JSONB value.
28    Jsonb(jsonb::Value<'a>),
29    /// Value of a partial JSON modification event.
30    JsonDiff(Vec<JsonDiff<'a>>),
31}
32
33impl<'a> BinlogValue<'a> {
34    /// Returns a `'static` version of `self`.
35    pub fn into_owned(self) -> BinlogValue<'static> {
36        match self {
37            BinlogValue::Value(x) => BinlogValue::Value(x),
38            BinlogValue::Jsonb(x) => BinlogValue::Jsonb(x.into_owned()),
39            BinlogValue::JsonDiff(x) => {
40                BinlogValue::JsonDiff(x.into_iter().map(|x| x.into_owned()).collect())
41            }
42        }
43    }
44}
45
46impl<'de> MyDeserialize<'de> for BinlogValue<'de> {
47    const SIZE: Option<usize> = None;
48    /// <col_type, col_meta, is_unsigned, is_partial>
49    type Ctx = (ColumnType, &'de [u8], bool, bool);
50
51    fn deserialize(
52        (mut col_type, col_meta, is_unsigned, is_partial): Self::Ctx,
53        buf: &mut ParseBuf<'de>,
54    ) -> io::Result<Self> {
55        use ColumnType::*;
56
57        let mut length = 0_usize;
58
59        if col_type == MYSQL_TYPE_TYPED_ARRAY {
60            let type_byte = col_meta[0];
61            col_type = ColumnType::try_from(type_byte).unwrap_or(col_type);
62        }
63
64        if col_type == MYSQL_TYPE_STRING {
65            if col_meta[0] >= 1 {
66                let byte0 = col_meta[0] as usize;
67                let byte1 = col_meta[1] as usize;
68
69                if (byte0 & 0x30) != 0x30 {
70                    // a long CHAR() field: see #37426
71                    length = byte1 | (((byte0 & 0x30) ^ 0x30) << 4);
72                } else {
73                    length = byte1;
74                }
75            } else {
76                length = (ParseBuf(col_meta)).eat_u16_le() as usize;
77            }
78        }
79
80        match col_type {
81            MYSQL_TYPE_TINY | MYSQL_TYPE_SHORT | MYSQL_TYPE_LONG | MYSQL_TYPE_LONGLONG
82            | MYSQL_TYPE_FLOAT | MYSQL_TYPE_DOUBLE => {
83                let mut flags = ColumnFlags::empty();
84                flags.set(ColumnFlags::UNSIGNED_FLAG, is_unsigned);
85                Value::deserialize_bin((col_type, flags), &mut *buf).map(BinlogValue::Value)
86            }
87            MYSQL_TYPE_TIMESTAMP => {
88                let val: RawInt<LeU32> = buf.parse(())?;
89                Ok(BinlogValue::Value(Int(*val as i64)))
90            }
91            MYSQL_TYPE_INT24 => {
92                if is_unsigned {
93                    let val: RawInt<LeU24> = buf.parse(())?;
94                    Ok(BinlogValue::Value(Int(*val as i64)))
95                } else {
96                    let val: RawInt<LeI24> = buf.parse(())?;
97                    Ok(BinlogValue::Value(Int(*val as i64)))
98                }
99            }
100            MYSQL_TYPE_TIME => {
101                let tmp: RawInt<LeU24> = buf.parse(())?;
102                let h = *tmp / 10000;
103                let m = (*tmp % 10000) / 100;
104                let s = *tmp % 100;
105                Ok(BinlogValue::Value(Time(
106                    false, 0, h as u8, m as u8, s as u8, 0,
107                )))
108            }
109            MYSQL_TYPE_DATETIME => {
110                // read YYYYMMDDHHMMSS representaion
111                let raw: RawInt<LeU64> = buf.parse(())?;
112                let d_part = *raw / 1_000_000;
113                let t_part = *raw % 1_000_000;
114                Ok(BinlogValue::Value(Date(
115                    (d_part / 10000) as u16,
116                    ((d_part % 10000) / 100) as u8,
117                    (d_part % 100) as u8,
118                    (t_part / 10000) as u8,
119                    ((t_part % 10000) / 100) as u8,
120                    (t_part % 100) as u8,
121                    0,
122                )))
123            }
124            MYSQL_TYPE_YEAR => {
125                let y = *buf.parse::<RawInt<u8>>(())? as i32;
126                Ok(BinlogValue::Value(Bytes(
127                    (1900 + y).to_string().into_bytes(),
128                )))
129            }
130            MYSQL_TYPE_NEWDATE => {
131                let tmp = *buf.parse::<RawInt<LeU24>>(())?;
132                let d = tmp & 31;
133                let m = (tmp >> 5) & 15;
134                let y = tmp >> 9;
135                Ok(BinlogValue::Value(Date(
136                    y as u16, m as u8, d as u8, 0, 0, 0, 0,
137                )))
138            }
139            MYSQL_TYPE_BIT => {
140                let nbits = col_meta[1] as usize * 8 + (col_meta[0] as usize);
141                let nbytes = (nbits + 7) / 8;
142                let bytes: &[u8] = buf.parse(nbytes)?;
143                Ok(BinlogValue::Value(Bytes(bytes.into())))
144            }
145            MYSQL_TYPE_TIMESTAMP2 => {
146                let dec = col_meta[0];
147                let (sec, usec) = my_timestamp_from_binary(&mut *buf, dec)?;
148                if usec == 0 {
149                    Ok(BinlogValue::Value(Bytes(sec.to_string().into_bytes())))
150                } else {
151                    Ok(BinlogValue::Value(Bytes(
152                        format!("{}.{:06}", sec, usec).into_bytes(),
153                    )))
154                }
155            }
156            MYSQL_TYPE_DATETIME2 => {
157                let dec = col_meta[0];
158                my_datetime_packed_from_binary(&mut *buf, dec as u32)
159                    .map(datetime_from_packed)
160                    .map(BinlogValue::Value)
161            }
162            MYSQL_TYPE_TIME2 => {
163                let dec = col_meta[0];
164                my_time_packed_from_binary(&mut *buf, dec as u32)
165                    .map(time_from_packed)
166                    .map(BinlogValue::Value)
167            }
168            MYSQL_TYPE_JSON => {
169                length = *buf.parse::<RawInt<LeU32>>(())? as usize;
170                let mut json_value_buf: ParseBuf = buf.parse(length)?;
171                if is_partial {
172                    let mut diffs = Vec::new();
173                    while !json_value_buf.is_empty() {
174                        diffs.push(json_value_buf.parse(())?);
175                    }
176                    Ok(BinlogValue::JsonDiff(diffs))
177                } else {
178                    Ok(BinlogValue::Jsonb(json_value_buf.parse(())?))
179                }
180            }
181            MYSQL_TYPE_NEWDECIMAL => {
182                // precision is the maximum number of decimal digits
183                let precision = col_meta[0] as usize;
184                // scale (aka decimals) is the number of decimal digits after the point
185                let scale = col_meta[1] as usize;
186
187                let dec = decimal::Decimal::read_bin(&mut *buf, precision, scale, false)?;
188
189                Ok(BinlogValue::Value(Bytes(dec.to_string().into_bytes())))
190            }
191            MYSQL_TYPE_ENUM => match col_meta[1] {
192                1 => {
193                    let val = buf.parse::<RawInt<u8>>(())?;
194                    Ok(BinlogValue::Value(Int(*val as i64)))
195                }
196                2 => {
197                    let val = buf.parse::<RawInt<LeU16>>(())?;
198                    Ok(BinlogValue::Value(Int(*val as i64)))
199                }
200                _ => Err(io::Error::new(io::ErrorKind::InvalidData, "Unknown ENUM")),
201            },
202            MYSQL_TYPE_SET => {
203                let nbytes = col_meta[1] as usize;
204                let bytes: &[u8] = buf.parse(nbytes)?;
205                Ok(BinlogValue::Value(Bytes(bytes.into())))
206            }
207            MYSQL_TYPE_TINY_BLOB
208            | MYSQL_TYPE_MEDIUM_BLOB
209            | MYSQL_TYPE_LONG_BLOB
210            | MYSQL_TYPE_BLOB
211            | MYSQL_TYPE_GEOMETRY
212            | MYSQL_TYPE_VECTOR => {
213                let nbytes = match col_meta[0] {
214                    1 => *buf.parse::<RawInt<u8>>(())? as usize,
215                    2 => *buf.parse::<RawInt<LeU16>>(())? as usize,
216                    3 => *buf.parse::<RawInt<LeU24>>(())? as usize,
217                    4 => *buf.parse::<RawInt<LeU32>>(())? as usize,
218                    _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "Unknown BLOB")),
219                };
220                let bytes: &[u8] = buf.parse(nbytes)?;
221                Ok(BinlogValue::Value(Bytes(bytes.into())))
222            }
223            MYSQL_TYPE_VARCHAR | MYSQL_TYPE_VAR_STRING => {
224                let type_len = (col_meta[0] as u16 | ((col_meta[1] as u16) << 8)) as usize;
225                let nbytes = if type_len < 256 {
226                    *buf.parse::<RawInt<u8>>(())? as usize
227                } else {
228                    *buf.parse::<RawInt<LeU16>>(())? as usize
229                };
230                let bytes: &[u8] = buf.parse(nbytes)?;
231                Ok(BinlogValue::Value(Bytes(bytes.into())))
232            }
233            MYSQL_TYPE_STRING => {
234                let nbytes = if length < 256 {
235                    *buf.parse::<RawInt<u8>>(())? as usize
236                } else {
237                    *buf.parse::<RawInt<LeU16>>(())? as usize
238                };
239                let bytes: &[u8] = buf.parse(nbytes)?;
240                Ok(BinlogValue::Value(Bytes(bytes.into())))
241            }
242            _ => Err(io::Error::new(
243                io::ErrorKind::InvalidData,
244                "Don't know how to handle column",
245            )),
246        }
247    }
248}
249
250#[derive(Debug, thiserror::Error)]
251pub enum BinlogValueToValueError {
252    #[error("Can't convert Jsonb to Json: {}", _0)]
253    ToJson(#[from] JsonbToJsonError),
254    #[error("Impossible to convert JsonDiff to Value")]
255    JsonDiff,
256}
257
258impl<'a> TryFrom<BinlogValue<'a>> for Value {
259    type Error = BinlogValueToValueError;
260
261    fn try_from(value: BinlogValue<'a>) -> Result<Self, Self::Error> {
262        match value {
263            BinlogValue::Value(x) => Ok(x),
264            BinlogValue::Jsonb(x) => {
265                let json = serde_json::Value::try_from(x)?;
266                Ok(Value::Bytes(Vec::from(json.to_string())))
267            }
268            BinlogValue::JsonDiff(_) => Err(BinlogValueToValueError::JsonDiff),
269        }
270    }
271}