// mysql_common/binlog/events/rows_event.rs

// Copyright (c) 2021 Anatoly Ikorsky
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.

9use std::{cmp::min, fmt, io};
10
11use bitvec::prelude::*;
12use bytes::BufMut;
13use saturating::Saturating as S;
14
15use crate::{
16    binlog::{
17        consts::{BinlogVersion, EventType, RowsEventFlags},
18        row::BinlogRow,
19        BinlogCtx,
20    },
21    io::ParseBuf,
22    misc::{
23        raw::{
24            bytes::{BareBytes, EofBytes},
25            int::*,
26            RawBytes, RawFlags,
27        },
28        unexpected_buf_eof,
29    },
30    proto::{MyDeserialize, MySerialize},
31};
32
33use super::{BinlogEventHeader, TableMapEvent};
34
/// Common base structure for all row-containing binary log events.
///
/// The same structure backs the write, update, partial-update and delete rows events;
/// `event_type` records which one a particular value represents.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct RowsEvent<'a> {
    /// The actual `EventType` of this wrapped object.
    event_type: EventType,
    /// Table identifier.
    ///
    /// If the table id is `0x00ffffff`, this is a dummy event that should have
    /// the end-of-statement flag set, declaring that all table maps can be freed.
    /// Otherwise it refers to a table defined by a `TABLE_MAP_EVENT`.
    table_id: RawInt<LeU48>,
    /// Raw rows event flags (see `RowsEventFlags`).
    flags: RawFlags<RowsEventFlags, LeU16>,
    /// Raw extra data.
    ///
    /// Empty when the post-header has no extra-data section.
    /// Length is at most `u16::MAX - 2` bytes.
    extra_data: RawBytes<'a, BareBytes<{ u16::MAX as usize - 2 }>>,
    /// Number of columns.
    num_columns: RawInt<LenEnc>,
    /// For DELETE and UPDATE events only: a bit-field with one bit per column, indicating
    /// whether each column is present in the before-image.
    ///
    /// `None` for WRITE events.
    columns_before_image: Option<RawBytes<'a, BareBytes<0x2000000000000000>>>,
    /// For WRITE and UPDATE events only: a bit-field with one bit per column, indicating
    /// whether each column is present in the `UPDATE_ROWS_EVENT` / `WRITE_ROWS_EVENT`
    /// after-image.
    ///
    /// `None` for DELETE events.
    columns_after_image: Option<RawBytes<'a, BareBytes<0x2000000000000000>>>,
    /// A sequence of zero or more rows. The end is determined by the size of the event.
    ///
    /// Each row has the following format:
    ///
    /// *   A bit-field indicating whether each field in the row is NULL. Only columns that
    ///     are "used" according to the corresponding column bitmap are listed here.
    ///     If that bitmap has N one-bits, the amount of storage required for this field
    ///     is INT((N + 7) / 8) bytes.
    /// *   The row image, containing the values of the table fields. It only lists fields
    ///     that are used (according to the column bitmap) and non-NULL (according to the
    ///     previous bit-field). In other words, the number of values listed here equals the
    ///     number of zero bits in the previous bit-field (not counting padding bits in the
    ///     last byte).
    rows_data: RawBytes<'a, EofBytes>,
}

80impl<'a> RowsEvent<'a> {
81    /// Returns an actual event type of this rows event.
82    pub fn event_type(&self) -> EventType {
83        self.event_type
84    }
85
86    /// Returns the number that identifies the table (see `TableMapEvent`).
87    pub fn table_id(&self) -> u64 {
88        self.table_id.0
89    }
90
91    /// Returns the number of columns in the table.
92    pub fn num_columns(&self) -> u64 {
93        self.num_columns.0
94    }
95
96    /// Returns columns in the before-image (only for DELETE and UPDATE).
97    ///
98    /// Each bit indicates whether corresponding column is used in the image.
99    pub fn columns_before_image(&'a self) -> Option<&'a BitSlice<u8>> {
100        match self.columns_before_image {
101            Some(ref bytes) => {
102                let slice = BitSlice::from_slice(bytes.as_bytes());
103                Some(&slice[..self.num_columns() as usize])
104            }
105            None => None,
106        }
107    }
108
109    /// Returns columns in the after-image (only for WRITE and UPDATE).
110    ///
111    /// Each bit indicates whether corresponding column is used in the image.
112    pub fn columns_after_image(&'a self) -> Option<&'a BitSlice<u8>> {
113        match self.columns_after_image {
114            Some(ref bytes) => {
115                let slice = BitSlice::from_slice(bytes.as_bytes());
116                Some(&slice[..self.num_columns() as usize])
117            }
118            None => None,
119        }
120    }
121
122    /// Returns raw rows data.
123    pub fn rows_data(&'a self) -> &'a [u8] {
124        self.rows_data.as_bytes()
125    }
126
127    /// Returns event flags (unknown bits are truncated).
128    pub fn flags(&self) -> RowsEventFlags {
129        self.flags.get()
130    }
131
132    /// Returns raw event flags (unknown bits are preserved).
133    pub fn flags_raw(&self) -> u16 {
134        self.flags.0
135    }
136
137    /// Returns length of this event in bytes.
138    ///
139    /// This function will be used in `BinlogStruct` implementations for derived events.
140    pub fn len(&self, _version: BinlogVersion) -> usize {
141        let mut len = S(0);
142
143        len += S(6); // table_id
144        len += S(2); // flags
145        len += S(2); // extra-data len
146        len += S(min(self.extra_data.len(), u16::MAX as usize - 2)); // extra data
147        len += S(crate::misc::lenenc_int_len(self.num_columns()) as usize); // number of columns
148        let bitmap_len = (self.num_columns() as usize + 7) / 8;
149        if self.columns_before_image.is_some() {
150            len += S(bitmap_len); // columns present bitmap 1
151        }
152        if self.columns_after_image.is_some() {
153            len += S(bitmap_len); // columns present bitmap 2
154        }
155        len += S(self.rows_data.len());
156
157        min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
158    }
159
160    /// Returns an iterator over event's rows given the corresponding `TableMapEvent`.
161    pub fn rows<'b>(&'b self, table_map_event: &'b TableMapEvent<'b>) -> RowsEventRows<'b> {
162        RowsEventRows {
163            rows_event: self,
164            table_map_event,
165            rows_data: ParseBuf(self.rows_data.as_bytes()),
166        }
167    }
168
169    pub fn into_owned(self) -> RowsEvent<'static> {
170        RowsEvent {
171            event_type: self.event_type,
172            table_id: self.table_id,
173            flags: self.flags,
174            extra_data: self.extra_data.into_owned(),
175            num_columns: self.num_columns,
176            columns_before_image: self.columns_before_image.map(|x| x.into_owned()),
177            columns_after_image: self.columns_after_image.map(|x| x.into_owned()),
178            rows_data: self.rows_data.into_owned(),
179        }
180    }
181}
182
/// Deserialization context for [`RowsEvent`].
pub struct RowsEventCtx<'a> {
    /// The actual event type; it decides which column bitmaps are parsed.
    pub event_type: EventType,
    /// Additional context data; its `fde` supplies the post-header lengths that
    /// determine the table id width and whether an extra-data section is present.
    pub binlog_ctx: BinlogCtx<'a>,
}

191impl<'de> MyDeserialize<'de> for RowsEvent<'de> {
192    const SIZE: Option<usize> = None;
193    type Ctx = RowsEventCtx<'de>;
194
195    fn deserialize(ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
196        let post_header_len = ctx
197            .binlog_ctx
198            .fde
199            .get_event_type_header_length(ctx.event_type);
200
201        let is_delete_event = ctx.event_type == EventType::DELETE_ROWS_EVENT
202            || ctx.event_type == EventType::DELETE_ROWS_EVENT_V1;
203
204        let is_update_event = ctx.event_type == EventType::UPDATE_ROWS_EVENT
205            || ctx.event_type == EventType::UPDATE_ROWS_EVENT_V1
206            || ctx.event_type == EventType::PARTIAL_UPDATE_ROWS_EVENT;
207
208        let table_id = if post_header_len == 6 {
209            // old server
210            let value = buf.parse::<RawInt<LeU32>>(())?;
211            RawInt::new(value.0 as u64)
212        } else {
213            buf.parse(())?
214        };
215
216        let flags = buf.parse(())?;
217
218        let extra_data = if post_header_len
219            == ctx
220                .binlog_ctx
221                .fde
222                .get_event_type_header_length(EventType::WRITE_ROWS_EVENT)
223        {
224            // variable-length post header containing extra data
225            let extra_data_len = buf.checked_eat_u16_le().ok_or_else(unexpected_buf_eof)? as usize;
226            buf.parse(extra_data_len.saturating_sub(2))?
227        } else {
228            RawBytes::new(&[][..])
229        };
230
231        let num_columns: RawInt<LenEnc> = buf.parse(())?;
232        let bitmap_len = (num_columns.0 as usize + 7) / 8;
233
234        let mut columns_before_image = None;
235        let mut columns_after_image = None;
236
237        if is_update_event {
238            columns_before_image = Some(buf.parse(bitmap_len)?);
239            columns_after_image = Some(buf.parse(bitmap_len)?);
240        } else if is_delete_event {
241            columns_before_image = Some(buf.parse(bitmap_len)?);
242        } else {
243            columns_after_image = Some(buf.parse(bitmap_len)?);
244        }
245
246        let rows_data = buf.parse(())?;
247
248        Ok(Self {
249            event_type: ctx.event_type,
250            table_id,
251            flags,
252            extra_data,
253            num_columns,
254            columns_before_image,
255            columns_after_image,
256            rows_data,
257        })
258    }
259}
260
261impl MySerialize for RowsEvent<'_> {
262    fn serialize(&self, buf: &mut Vec<u8>) {
263        self.table_id.serialize(&mut *buf);
264        self.flags.serialize(&mut *buf);
265
266        if self.event_type == EventType::WRITE_ROWS_EVENT
267            || self.event_type == EventType::UPDATE_ROWS_EVENT
268            || self.event_type == EventType::DELETE_ROWS_EVENT
269            || self.event_type == EventType::PARTIAL_UPDATE_ROWS_EVENT
270        {
271            let len = min(self.extra_data.len().saturating_add(2), u16::MAX as usize) as u16;
272            buf.put_u16_le(len);
273            self.extra_data.serialize(&mut *buf);
274        }
275
276        self.num_columns.serialize(&mut *buf);
277
278        if let Some(bitmap) = &self.columns_before_image {
279            bitmap.serialize(&mut *buf);
280        }
281        if let Some(bitmap) = &self.columns_after_image {
282            bitmap.serialize(&mut *buf);
283        }
284
285        self.rows_data.serialize(buf);
286    }
287}
288
/// Iterator over rows of a `RowsEvent`.
#[derive(Clone, Eq, PartialEq)]
pub struct RowsEventRows<'a> {
    // Event whose column bitmaps say which images each row contains.
    rows_event: &'a RowsEvent<'a>,
    // Table map handed to the row parser for every decoded image.
    table_map_event: &'a TableMapEvent<'a>,
    // Remaining, not yet decoded row bytes.
    rows_data: ParseBuf<'a>,
}

297impl<'a> RowsEventRows<'a> {
298    pub(crate) fn new(
299        rows_event: &'a RowsEvent<'a>,
300        table_map_event: &'a TableMapEvent<'a>,
301        rows_data: ParseBuf<'a>,
302    ) -> Self {
303        Self {
304            rows_event,
305            table_map_event,
306            rows_data,
307        }
308    }
309}
310
311impl<'a> Iterator for RowsEventRows<'a> {
312    type Item = io::Result<(Option<BinlogRow>, Option<BinlogRow>)>;
313
314    fn next(&mut self) -> Option<Self::Item> {
315        let mut row_before = None;
316        let mut row_after = None;
317
318        if self.rows_data.is_empty() {
319            return None;
320        }
321
322        if let Some(cols) = self.rows_event.columns_before_image() {
323            let ctx = (
324                self.rows_event.num_columns(),
325                cols,
326                false,
327                self.table_map_event,
328            );
329            row_before = match self.rows_data.parse(ctx) {
330                Ok(row_before) => Some(row_before),
331                Err(err) => return Some(Err(err)),
332            };
333        }
334
335        if let Some(cols) = self.rows_event.columns_after_image() {
336            let ctx = (
337                self.rows_event.num_columns(),
338                cols,
339                self.rows_event.event_type == EventType::PARTIAL_UPDATE_ROWS_EVENT,
340                self.table_map_event,
341            );
342            row_after = match self.rows_data.parse(ctx) {
343                Ok(row_after) => Some(row_after),
344                Err(err) => return Some(Err(err)),
345            };
346        }
347
348        Some(Ok((row_before, row_after)))
349    }
350}
351
352impl fmt::Debug for RowsEventRows<'_> {
353    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354        f.debug_list().entries(self.clone()).finish()
355    }
356}