mysql_common/binlog/
row.rs

1// Copyright (c) 2021 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{
10    borrow::Cow,
11    convert::{TryFrom, TryInto},
12    fmt, io,
13    sync::Arc,
14};
15
16use bitvec::{prelude::BitVec, slice::BitSlice};
17
18use crate::{
19    constants::{ColumnFlags, ColumnType},
20    io::ParseBuf,
21    misc::raw::int::*,
22    packets::Column,
23    proto::MyDeserialize,
24    row::{new_row_raw, Row},
25    value::Value,
26};
27
28use super::{
29    events::{OptionalMetaExtractor, TableMapEvent},
30    value::{BinlogValue, BinlogValueToValueError},
31};
32
/// Binlog rows event row value options.
///
/// The enum is `#[repr(u64)]`; the row deserializer in this file casts a variant to `u64`
/// and tests it against the parsed options value with a bitwise AND, i.e. the discriminants
/// are used as bit flags.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
// Variant names are written in upper snake case (presumably mirroring the server-side
// constant names — confirm), hence the lint override.
#[allow(non_camel_case_types)]
#[repr(u64)]
pub enum BinlogRowValueOptions {
    /// Store JSON updates in partial form
    PARTIAL_JSON_UPDATES = 1,
}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, thiserror::Error)]
43#[error("Unknown binlog version {}", _0)]
44#[repr(transparent)]
45pub struct UnknownBinlogRowValueOptions(pub u64);
46
47impl From<UnknownBinlogRowValueOptions> for u64 {
48    fn from(x: UnknownBinlogRowValueOptions) -> Self {
49        x.0
50    }
51}
52
53impl TryFrom<u64> for BinlogRowValueOptions {
54    type Error = UnknownBinlogRowValueOptions;
55
56    fn try_from(value: u64) -> Result<Self, Self::Error> {
57        match value {
58            1 => Ok(Self::PARTIAL_JSON_UPDATES),
59            x => Err(UnknownBinlogRowValueOptions(x)),
60        }
61    }
62}
63
/// Representation of a binlog row.
///
/// Holds the row's values together with the definitions of the columns they belong to.
#[derive(Clone, PartialEq)]
pub struct BinlogRow {
    /// Row values; a slot is `None` once its value has been moved out with `take`.
    /// The deserializer in this file pushes exactly one entry here per entry in `columns`.
    values: Vec<Option<BinlogValue<'static>>>,
    /// Column definitions for this row, held behind an `Arc`.
    columns: Arc<[Column]>,
}
70
71impl BinlogRow {
72    pub fn new(values: Vec<Option<BinlogValue<'static>>>, columns: Arc<[Column]>) -> Self {
73        Self { values, columns }
74    }
75
76    /// Returns length of a row.
77    pub fn len(&self) -> usize {
78        self.values.len()
79    }
80
81    /// Returns true if the row has a length of 0.
82    pub fn is_empty(&self) -> bool {
83        self.values.is_empty()
84    }
85
86    /// Returns columns of this row.
87    pub fn columns_ref(&self) -> &[Column] {
88        &self.columns
89    }
90
91    /// Returns columns of this row.
92    pub fn columns(&self) -> Arc<[Column]> {
93        self.columns.clone()
94    }
95
96    /// Returns reference to the value of a column with index `index` if it exists and wasn't taken
97    /// by `Row::take` method.
98    ///
99    /// Non panicking version of `row[usize]`.
100    pub fn as_ref(&self, index: usize) -> Option<&BinlogValue> {
101        self.values.get(index).and_then(|x| x.as_ref())
102    }
103
104    /// Will take value of a column with index `index` if it exists and wasn't taken earlier then
105    /// will converts it to `T`.
106    pub fn take(&mut self, index: usize) -> Option<BinlogValue> {
107        self.values.get_mut(index).and_then(|x| x.take())
108    }
109
110    /// Unwraps values of a row.
111    ///
112    /// # Panics
113    ///
114    /// Panics if any of columns was taken by `take` method.
115    pub fn unwrap(self) -> Vec<BinlogValue<'static>> {
116        self.values
117            .into_iter()
118            .map(|x| x.expect("Can't unwrap row if some of columns was taken"))
119            .collect()
120    }
121
122    #[doc(hidden)]
123    pub fn place(&mut self, index: usize, value: BinlogValue<'static>) {
124        self.values[index] = Some(value);
125    }
126}
127
128impl<'de> MyDeserialize<'de> for BinlogRow {
129    const SIZE: Option<usize> = None;
130    /// Content:
131    ///
132    /// * number of columns
133    /// * column bitmap - bit is set if column is in the row
134    /// * have shared image - `true` means, that this is a partial event
135    ///   and this is an after image row. Therefore we need to parse a shared image
136    /// * corresponding table map event
137    type Ctx = (u64, &'de BitSlice<u8>, bool, &'de TableMapEvent<'de>);
138
139    fn deserialize(
140        (num_columns, cols, have_shared_image, table_info): Self::Ctx,
141        buf: &mut ParseBuf<'de>,
142    ) -> io::Result<Self> {
143        let mut values: Vec<Option<BinlogValue<'static>>> = vec![];
144        let mut columns = vec![];
145
146        // read a shared image if needed (see WL#2955)
147        let mut partial_cols = if have_shared_image {
148            let value_options = *buf.parse::<RawInt<LenEnc>>(())?;
149            if value_options & BinlogRowValueOptions::PARTIAL_JSON_UPDATES as u64 > 0 {
150                let json_columns_count = table_info.json_column_count();
151                let partial_columns_len = (json_columns_count + 7) / 8;
152                let partial_columns: &[u8] = buf.parse(partial_columns_len)?;
153                let partial_columns = BitSlice::<u8>::from_slice(partial_columns);
154                Some(partial_columns.into_iter().take(json_columns_count))
155            } else {
156                None
157            }
158        } else {
159            None
160        };
161
162        let num_bits = cols.count_ones();
163        let bitmap_len = (num_bits + 7) / 8;
164        let bitmap_buf: &[u8] = buf.parse(bitmap_len)?;
165        let mut null_bitmap = BitVec::<u8>::from_slice(bitmap_buf);
166        null_bitmap.truncate(num_bits);
167
168        let mut image_idx = 0;
169
170        let opt_meta_extractor = OptionalMetaExtractor::new(table_info.iter_optional_meta())?;
171
172        let mut signedness_iterator = opt_meta_extractor.iter_signedness();
173        let mut charset_iter = opt_meta_extractor.iter_charset();
174        let mut enum_and_set_charset_iter = opt_meta_extractor.iter_enum_and_set_charset();
175        let mut primary_key_iter = opt_meta_extractor.iter_primary_key();
176        let mut column_name_iter = opt_meta_extractor.iter_column_name();
177
178        for i in 0..(num_columns as usize) {
179            // check if column is in columns list
180            if cols.get(i).as_deref().copied().unwrap_or(false) {
181                let column_type = table_info.get_column_type(i);
182
183                // TableMapEvent must define column type for the current column.
184                let column_type = match column_type {
185                    Ok(Some(ty)) => ty,
186                    Ok(None) => {
187                        return Err(io::Error::new(io::ErrorKind::InvalidData, "No column type"))
188                    }
189                    Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidData, e)),
190                };
191
192                let column_meta = table_info.get_column_metadata(i).unwrap_or(&[]);
193
194                let is_partial = column_type == ColumnType::MYSQL_TYPE_JSON
195                    && partial_cols
196                        .as_mut()
197                        .and_then(|bits| bits.next().as_deref().copied())
198                        .unwrap_or(false);
199
200                let is_unsigned = column_type
201                    .is_numeric_type()
202                    .then(|| signedness_iterator.next())
203                    .flatten()
204                    .unwrap_or_default();
205
206                let charset = if column_type.is_character_type() {
207                    charset_iter.next().transpose()?.unwrap_or_default()
208                } else if column_type.is_enum_or_set_type() {
209                    enum_and_set_charset_iter
210                        .next()
211                        .transpose()?
212                        .unwrap_or_default()
213                } else {
214                    Default::default()
215                };
216
217                let column_name_raw = column_name_iter.next().transpose()?;
218                let column_name = column_name_raw
219                    .as_ref()
220                    .map(|x| Cow::Borrowed(x.name_raw()))
221                    .unwrap_or_else(|| {
222                        // default column name is `@<i>` where i is a column offset in a table
223                        Cow::Owned(format!("@{}", i).into())
224                    });
225
226                let mut column_flags = ColumnFlags::empty();
227
228                if is_unsigned {
229                    column_flags |= ColumnFlags::UNSIGNED_FLAG;
230                }
231
232                if primary_key_iter
233                    .next_if(|next| next.is_err() || next.as_ref().ok() == Some(&(i as u64)))
234                    .transpose()?
235                    .is_some()
236                {
237                    column_flags |= ColumnFlags::PRI_KEY_FLAG;
238                }
239
240                let column = Column::new(column_type)
241                    .with_schema(table_info.database_name_raw())
242                    .with_table(table_info.table_name_raw())
243                    .with_name(column_name.as_ref())
244                    .with_flags(column_flags)
245                    .with_schema(table_info.database_name_raw())
246                    .with_org_table(table_info.table_name_raw())
247                    .with_table(table_info.table_name_raw())
248                    .with_character_set(charset);
249
250                columns.push(column);
251
252                // check if column is null
253                if null_bitmap
254                    .get(image_idx)
255                    .as_deref()
256                    .copied()
257                    .unwrap_or(true)
258                {
259                    values.push(Some(BinlogValue::Value(Value::NULL)));
260                } else {
261                    let ctx = (column_type, column_meta, is_unsigned, is_partial);
262                    values.push(Some(buf.parse::<BinlogValue>(ctx)?.into_owned()));
263                }
264
265                image_idx += 1;
266            }
267        }
268
269        Ok(BinlogRow::new(values, columns.into_boxed_slice().into()))
270    }
271}
272
273impl fmt::Debug for BinlogRow {
274    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
275        let mut debug = f.debug_struct("BinlogRow");
276        for (val, column) in self.values.iter().zip(self.columns.iter()) {
277            match *val {
278                Some(ref val) => {
279                    debug.field(column.name_str().as_ref(), val);
280                }
281                None => {
282                    debug.field(column.name_str().as_ref(), &"<taken>");
283                }
284            }
285        }
286        debug.finish()
287    }
288}
289
/// Error of the `BinlogRow` to `Row` conversion (the `TryFrom<BinlogRow> for Row`
/// implementation in this file).
///
/// Identifies the value that failed to convert and carries the underlying
/// value conversion error.
#[derive(Debug, thiserror::Error)]
#[error(
    "Can't convert BinlogRow to Row at column offset {}: {}",
    column_offset,
    error
)]
pub struct BinlogRowToRowError {
    /// Column offset.
    ///
    /// This is the position of the failing value within the row's values.
    pub column_offset: usize,
    /// Value conversion error.
    pub error: BinlogValueToValueError,
}
302
303impl TryFrom<BinlogRow> for Row {
304    type Error = BinlogRowToRowError;
305
306    fn try_from(binlog_row: BinlogRow) -> Result<Self, Self::Error> {
307        let mut values = Vec::with_capacity(binlog_row.values.len());
308        for (column_offset, value) in binlog_row.values.into_iter().enumerate() {
309            match value {
310                Some(x) => {
311                    values.push(Some(x.try_into().map_err(|error| BinlogRowToRowError {
312                        column_offset,
313                        error,
314                    })?))
315                }
316                None => values.push(None),
317            }
318        }
319        Ok(new_row_raw(values, binlog_row.columns))
320    }
321}