mysql_common/binlog/events/
mod.rs

1// Copyright (c) 2021 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use bitvec::prelude::*;
10use byteorder::{LittleEndian, WriteBytesExt};
11use bytes::BufMut;
12use saturating::Saturating as S;
13
14use crate::{
15    io::ParseBuf,
16    misc::raw::{int::*, RawConst, RawFlags},
17    proto::{MyDeserialize, MySerialize},
18};
19
20pub use self::{
21    anonymous_gtid_event::AnonymousGtidEvent,
22    begin_load_query_event::BeginLoadQueryEvent,
23    delete_rows_event::DeleteRowsEvent,
24    delete_rows_event_v1::DeleteRowsEventV1,
25    execute_load_query_event::ExecuteLoadQueryEvent,
26    format_description_event::FormatDescriptionEvent,
27    gtid_event::GtidEvent,
28    incident_event::IncidentEvent,
29    intvar_event::IntvarEvent,
30    partial_update_rows_event::PartialUpdateRowsEvent,
31    query_event::{QueryEvent, StatusVar, StatusVarVal, StatusVars, StatusVarsIterator},
32    rand_event::RandEvent,
33    rotate_event::RotateEvent,
34    rows_event::{RowsEvent, RowsEventRows},
35    rows_query_event::RowsQueryEvent,
36    table_map_event::*,
37    transaction_payload_event::{TransactionPayloadEvent, TransactionPayloadReader},
38    update_rows_event::UpdateRowsEvent,
39    update_rows_event_v1::UpdateRowsEventV1,
40    user_var_event::UserVarEvent,
41    write_rows_event::WriteRowsEvent,
42    write_rows_event_v1::WriteRowsEventV1,
43    xid_event::XidEvent,
44};
45
46use std::{
47    any::type_name,
48    borrow::Cow,
49    cmp::min,
50    io::{self, Read, Write},
51    u16,
52};
53
54use super::{
55    consts::{
56        BinlogChecksumAlg, BinlogVersion, EventFlags, EventType, RowsEventFlags,
57        UnknownChecksumAlg, UnknownEventType,
58    },
59    misc::LimitWrite,
60    BinlogCtx, BinlogEvent,
61};
62
63mod anonymous_gtid_event;
64mod begin_load_query_event;
65mod delete_rows_event;
66mod delete_rows_event_v1;
67mod execute_load_query_event;
68mod format_description_event;
69mod gtid_event;
70mod incident_event;
71mod intvar_event;
72mod partial_update_rows_event;
73mod query_event;
74mod rand_event;
75mod rotate_event;
76mod rows_event;
77mod rows_query_event;
78mod table_map_event;
79mod transaction_payload_event;
80mod update_rows_event;
81mod update_rows_event_v1;
82mod user_var_event;
83mod write_rows_event;
84mod write_rows_event_v1;
85mod xid_event;
86
87/// Raw binlog event.
88///
89/// A binlog event starts with a Binlog Event header and is followed by a Binlog Event Type
90/// specific data part.
91#[derive(Debug, Clone, Eq, PartialEq)]
92pub struct Event {
93    /// Format description event.
94    fde: FormatDescriptionEvent<'static>,
95    /// Common header of an event.
96    header: BinlogEventHeader,
97    /// An event-type specific data.
98    ///
99    /// Checksum-related suffix is truncated:
100    ///
101    /// *   checksum algorithm description (for fde) will go to `footer`;
102    /// *   checksum will go to `checksum`.
103    data: Vec<u8>,
104    /// Log event footer.
105    footer: BinlogEventFooter,
106    /// Event checksum.
107    ///
108    /// Makes sense only if checksum algorithm is defined in `footer`.
109    checksum: [u8; BinlogEventFooter::BINLOG_CHECKSUM_LEN],
110}
111
112impl Event {
113    /// Reads an event from `input`.
114    pub fn read<'a, T: Read>(
115        fde: &'a FormatDescriptionEvent<'a>,
116        mut input: T,
117    ) -> io::Result<Self> {
118        let binlog_header_len = BinlogEventHeader::LEN;
119        let mut fde = fde.clone().into_owned();
120
121        let mut header_buf = [0u8; BinlogEventHeader::LEN];
122        input.read_exact(&mut header_buf)?;
123        let header = BinlogEventHeader::deserialize((), &mut ParseBuf(&header_buf))?;
124
125        let mut data = vec![0_u8; (S(header.event_size() as usize) - S(binlog_header_len)).0];
126        input.read_exact(&mut data).unwrap();
127
128        let is_fde = header.event_type.0 == EventType::FORMAT_DESCRIPTION_EVENT as u8;
129        let mut bytes_to_truncate = 0;
130        let mut checksum = [0_u8; BinlogEventFooter::BINLOG_CHECKSUM_LEN];
131
132        let footer = if is_fde {
133            let footer = BinlogEventFooter::read(&data)?;
134            if footer.checksum_alg.is_some() {
135                // truncate checksum algorithm description
136                bytes_to_truncate += BinlogEventFooter::BINLOG_CHECKSUM_ALG_DESC_LEN;
137            }
138            // We'll update dummy fde footer
139            fde = fde.with_footer(footer);
140            footer
141        } else {
142            fde.footer()
143        };
144
145        // * fde will always contain checksum (see WL#2540)
146        // * events inside of a Transaction_payload_event are not checksummed (see WL#3549)
147        let contains_checksum = footer.checksum_alg.is_some()
148            && (is_fde || footer.checksum_alg != Some(RawConst::new(0)))
149            && footer.checksum_enabled;
150
151        if contains_checksum {
152            // truncate checksum
153            bytes_to_truncate += BinlogEventFooter::BINLOG_CHECKSUM_LEN;
154            checksum.copy_from_slice(&data[data.len() - BinlogEventFooter::BINLOG_CHECKSUM_LEN..]);
155        }
156
157        data.truncate(data.len() - bytes_to_truncate);
158
159        Ok(Self {
160            fde,
161            header,
162            data,
163            footer,
164            checksum,
165        })
166    }
167
168    /// Writes this event into the `output`.
169    pub fn write<T: Write>(&self, version: BinlogVersion, mut output: T) -> io::Result<()> {
170        let is_fde = self.header.event_type.0 == EventType::FORMAT_DESCRIPTION_EVENT as u8;
171        let mut output = output.limit(S(self.len(version)));
172
173        let mut header_buf = Vec::with_capacity(BinlogEventHeader::LEN);
174        self.header.serialize(&mut header_buf);
175        output.write_all(&header_buf)?;
176        output.write_all(&self.data)?;
177
178        if let Ok(Some(alg)) = self.footer.get_checksum_alg() {
179            if is_fde {
180                output.write_u8(alg as u8)?;
181            }
182            if alg == BinlogChecksumAlg::BINLOG_CHECKSUM_ALG_CRC32 || is_fde {
183                output.write_u32::<LittleEndian>(self.calc_checksum(alg))?;
184            }
185        }
186
187        Ok(())
188    }
189
190    /// Returns a length of a serialized representation of this event.
191    fn len(&self, _version: BinlogVersion) -> usize {
192        let is_fde = self.header.event_type.0 == EventType::FORMAT_DESCRIPTION_EVENT as u8;
193        let mut len = S(0);
194
195        len += S(BinlogEventHeader::LEN);
196        len += S(self.data.len());
197        if let Ok(Some(alg)) = self.footer.get_checksum_alg() {
198            if is_fde {
199                len += S(BinlogEventFooter::BINLOG_CHECKSUM_ALG_DESC_LEN);
200            }
201            if is_fde || alg != BinlogChecksumAlg::BINLOG_CHECKSUM_ALG_OFF {
202                len += S(BinlogEventFooter::BINLOG_CHECKSUM_LEN);
203            }
204        }
205
206        min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
207    }
208
209    /// Returns a reference to the corresponding format description event.
210    pub fn fde(&self) -> &FormatDescriptionEvent<'static> {
211        &self.fde
212    }
213
214    /// Returns a reference to the event header.
215    pub fn header(&self) -> BinlogEventHeader {
216        self.header
217    }
218
219    /// Returns a reference to the event data.
220    pub fn data(&self) -> &[u8] {
221        &self.data
222    }
223
224    /// Returns a reference to the event footer.
225    pub fn footer(&self) -> BinlogEventFooter {
226        self.footer
227    }
228
229    /// Returns the checksum, if it is defined.
230    pub fn checksum(&self) -> Option<[u8; BinlogEventFooter::BINLOG_CHECKSUM_LEN]> {
231        let contains_checksum = self.footer.checksum_alg.is_some()
232            && (self.header.event_type.0 == (EventType::FORMAT_DESCRIPTION_EVENT as u8)
233                || self.footer.checksum_alg != Some(RawConst::new(0)));
234        contains_checksum.then_some(self.checksum)
235    }
236
237    /// Read event-type specific data as a binlog struct.
238    pub fn read_event<'a, T: BinlogEvent<'a>>(&'a self) -> io::Result<T> {
239        // we'll use data.len() here because of truncated event footer
240        let event_size = BinlogEventHeader::LEN + self.data.len();
241        let event_data = &mut ParseBuf(&self.data);
242        let ctx = BinlogCtx::new(event_size, &self.fde);
243
244        let event = event_data.parse(ctx)?;
245
246        // it is an error if the `event_data` isn't fully consumed
247        if !event_data.is_empty() {
248            return Err(io::Error::new(
249                io::ErrorKind::Other,
250                format!(
251                    "bytes remaining on stream while reading {}",
252                    type_name::<T>()
253                ),
254            ));
255        }
256
257        Ok(event)
258    }
259
260    /// Reads event data. Returns `None` if event type is unknown.
261    pub fn read_data(&self) -> io::Result<Option<EventData<'_>>> {
262        use EventType::*;
263
264        let event_type = match self.header.event_type.get() {
265            Ok(event_type) => event_type,
266            _ => return Ok(None),
267        };
268
269        let event_data = match event_type {
270            ENUM_END_EVENT | UNKNOWN_EVENT => EventData::UnknownEvent,
271            START_EVENT_V3 => EventData::StartEventV3(Cow::Borrowed(&*self.data)),
272            QUERY_EVENT => EventData::QueryEvent(self.read_event()?),
273            STOP_EVENT => EventData::StopEvent,
274            ROTATE_EVENT => EventData::RotateEvent(self.read_event()?),
275            INTVAR_EVENT => EventData::IntvarEvent(self.read_event()?),
276            LOAD_EVENT => EventData::LoadEvent(Cow::Borrowed(&*self.data)),
277            SLAVE_EVENT => EventData::SlaveEvent,
278            CREATE_FILE_EVENT => EventData::CreateFileEvent(Cow::Borrowed(&*self.data)),
279            APPEND_BLOCK_EVENT => EventData::AppendBlockEvent(Cow::Borrowed(&*self.data)),
280            EXEC_LOAD_EVENT => EventData::ExecLoadEvent(Cow::Borrowed(&*self.data)),
281            DELETE_FILE_EVENT => EventData::DeleteFileEvent(Cow::Borrowed(&*self.data)),
282            NEW_LOAD_EVENT => EventData::NewLoadEvent(Cow::Borrowed(&*self.data)),
283            RAND_EVENT => EventData::RandEvent(self.read_event()?),
284            USER_VAR_EVENT => EventData::UserVarEvent(self.read_event()?),
285            FORMAT_DESCRIPTION_EVENT => {
286                let fde = self
287                    .read_event::<FormatDescriptionEvent>()?
288                    .with_footer(self.footer);
289                EventData::FormatDescriptionEvent(fde)
290            }
291            XID_EVENT => EventData::XidEvent(self.read_event()?),
292            BEGIN_LOAD_QUERY_EVENT => EventData::BeginLoadQueryEvent(self.read_event()?),
293            EXECUTE_LOAD_QUERY_EVENT => EventData::ExecuteLoadQueryEvent(self.read_event()?),
294            TABLE_MAP_EVENT => EventData::TableMapEvent(self.read_event()?),
295            PRE_GA_WRITE_ROWS_EVENT => EventData::PreGaWriteRowsEvent(Cow::Borrowed(&*self.data)),
296            PRE_GA_UPDATE_ROWS_EVENT => EventData::PreGaUpdateRowsEvent(Cow::Borrowed(&*self.data)),
297            PRE_GA_DELETE_ROWS_EVENT => EventData::PreGaDeleteRowsEvent(Cow::Borrowed(&*self.data)),
298            WRITE_ROWS_EVENT_V1 => {
299                EventData::RowsEvent(RowsEventData::WriteRowsEventV1(self.read_event()?))
300            }
301            UPDATE_ROWS_EVENT_V1 => {
302                EventData::RowsEvent(RowsEventData::UpdateRowsEventV1(self.read_event()?))
303            }
304            DELETE_ROWS_EVENT_V1 => {
305                EventData::RowsEvent(RowsEventData::DeleteRowsEventV1(self.read_event()?))
306            }
307            INCIDENT_EVENT => EventData::IncidentEvent(self.read_event()?),
308            HEARTBEAT_EVENT => EventData::HeartbeatEvent,
309            IGNORABLE_EVENT => EventData::IgnorableEvent(Cow::Borrowed(&*self.data)),
310            ROWS_QUERY_EVENT => EventData::RowsQueryEvent(self.read_event()?),
311            WRITE_ROWS_EVENT => {
312                EventData::RowsEvent(RowsEventData::WriteRowsEvent(self.read_event()?))
313            }
314            UPDATE_ROWS_EVENT => {
315                EventData::RowsEvent(RowsEventData::UpdateRowsEvent(self.read_event()?))
316            }
317            DELETE_ROWS_EVENT => {
318                EventData::RowsEvent(RowsEventData::DeleteRowsEvent(self.read_event()?))
319            }
320            GTID_EVENT => EventData::GtidEvent(self.read_event()?),
321            ANONYMOUS_GTID_EVENT => EventData::AnonymousGtidEvent(self.read_event()?),
322            PREVIOUS_GTIDS_EVENT => EventData::PreviousGtidsEvent(Cow::Borrowed(&*self.data)),
323            TRANSACTION_CONTEXT_EVENT => {
324                EventData::TransactionContextEvent(Cow::Borrowed(&*self.data))
325            }
326            VIEW_CHANGE_EVENT => EventData::ViewChangeEvent(Cow::Borrowed(&*self.data)),
327            XA_PREPARE_LOG_EVENT => EventData::XaPrepareLogEvent(Cow::Borrowed(&*self.data)),
328            PARTIAL_UPDATE_ROWS_EVENT => {
329                EventData::RowsEvent(RowsEventData::PartialUpdateRowsEvent(self.read_event()?))
330            }
331            TRANSACTION_PAYLOAD_EVENT => EventData::TransactionPayloadEvent(self.read_event()?),
332        };
333
334        Ok(Some(event_data))
335    }
336
337    /// Calculates checksum for this event.
338    pub fn calc_checksum(&self, alg: BinlogChecksumAlg) -> u32 {
339        let is_fde = self.header.event_type.0 == EventType::FORMAT_DESCRIPTION_EVENT as u8;
340
341        let mut hasher = crc32fast::Hasher::new();
342        let mut header = Vec::with_capacity(BinlogEventHeader::LEN);
343        let mut header_struct = self.header;
344        if header_struct
345            .flags
346            .get()
347            .contains(EventFlags::LOG_EVENT_BINLOG_IN_USE_F)
348        {
349            // In case this is a Format_description_log_event, we need to clear
350            // the LOG_EVENT_BINLOG_IN_USE_F flag before computing the checksum,
351            // since the flag will be cleared when the binlog is closed.
352            // On verification, the flag is also dropped before computing the checksum.
353            header_struct.flags.0 &= !(EventFlags::LOG_EVENT_BINLOG_IN_USE_F.bits());
354        }
355        header_struct.serialize(&mut header);
356        hasher.update(&header);
357        hasher.update(&self.data);
358        if is_fde {
359            hasher.update(&[alg as u8][..]);
360        }
361        hasher.finalize()
362    }
363}
364
365/// The binlog event header starts each event and is 19 bytes long assuming binlog version >= 4.
366#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
367pub struct BinlogEventHeader {
368    /// Seconds since unix epoch.
369    timestamp: RawInt<LeU32>,
370    /// Raw event Type.
371    event_type: RawConst<u8, EventType>,
372    /// Server-id of the originating mysql-server.
373    ///
374    /// Used to filter out events in circular replication.
375    server_id: RawInt<LeU32>,
376    /// Size of the event (header, post-header, body).
377    event_size: RawInt<LeU32>,
378    /// Position of the next event.
379    log_pos: RawInt<LeU32>,
380    /// Binlog Event Flag.
381    ///
382    /// This field contains the raw value. Use [`Self::flags()`] to get the actual flags.
383    flags: RawFlags<EventFlags, LeU16>,
384}
385
386impl BinlogEventHeader {
387    /// Binlog event header length for version >= 4.
388    pub const LEN: usize = 19;
389
390    /// Creates a new `BinlogEventHeader`.
391    pub fn new(
392        timestamp: u32,
393        event_type: EventType,
394        server_id: u32,
395        event_size: u32,
396        log_pos: u32,
397        flags: EventFlags,
398    ) -> Self {
399        Self {
400            timestamp: RawInt::new(timestamp),
401            event_type: RawConst::new(event_type as u8),
402            server_id: RawInt::new(server_id),
403            event_size: RawInt::new(event_size),
404            log_pos: RawInt::new(log_pos),
405            flags: RawFlags::new(flags.bits()),
406        }
407    }
408
409    /// Returns the `timestamp` value.
410    ///
411    /// `timestamp` is in seconds since unix epoch.
412    pub fn timestamp(&self) -> u32 {
413        self.timestamp.0
414    }
415
416    /// Returns the raw event type.
417    pub fn event_type_raw(&self) -> u8 {
418        self.event_type.0
419    }
420
421    /// Returns the event type, if it's valid.
422    pub fn event_type(&self) -> Result<EventType, UnknownEventType> {
423        self.event_type.get()
424    }
425
426    /// Returns the server Id of the originating mysql-server.
427    ///
428    /// Used to filter out events in circular replication.
429    pub fn server_id(&self) -> u32 {
430        self.server_id.0
431    }
432
433    /// Returns the size of the event (header, post-header, body).
434    pub fn event_size(&self) -> u32 {
435        self.event_size.0
436    }
437
438    /// Returns the position of the next event.
439    pub fn log_pos(&self) -> u32 {
440        self.log_pos.0
441    }
442
443    /// Returns event flags (unknown bits are truncated).
444    pub fn flags(&self) -> EventFlags {
445        self.flags.get()
446    }
447
448    /// Returns raw event flags (unknown bits are preserved).
449    pub fn flags_raw(&self) -> u16 {
450        self.flags.0
451    }
452}
453
454impl<'de> MyDeserialize<'de> for BinlogEventHeader {
455    const SIZE: Option<usize> = Some(Self::LEN);
456    type Ctx = ();
457
458    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
459        let mut buf: ParseBuf = buf.parse_unchecked(Self::LEN)?;
460        Ok(Self {
461            timestamp: buf.parse_unchecked(())?,
462            event_type: buf.parse_unchecked(())?,
463            server_id: buf.parse_unchecked(())?,
464            event_size: buf.parse_unchecked(())?,
465            log_pos: buf.parse_unchecked(())?,
466            flags: buf.parse_unchecked(())?,
467        })
468    }
469}
470
471impl MySerialize for BinlogEventHeader {
472    fn serialize(&self, buf: &mut Vec<u8>) {
473        self.timestamp.serialize(&mut *buf);
474        self.event_type.serialize(&mut *buf);
475        self.server_id.serialize(&mut *buf);
476        self.event_size.serialize(&mut *buf);
477        self.log_pos.serialize(&mut *buf);
478        self.flags.serialize(&mut *buf);
479    }
480}
481
482/// Binlog event footer.
483#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
484pub struct BinlogEventFooter {
485    /// Raw checksum algorithm description.
486    checksum_alg: Option<RawConst<u8, BinlogChecksumAlg>>,
487
488    /// Checksum enabled
489    checksum_enabled: bool,
490}
491
492impl BinlogEventFooter {
493    /// Length of the checksum algorithm description.
494    pub const BINLOG_CHECKSUM_ALG_DESC_LEN: usize = 1;
495    /// Length of the checksum.
496    pub const BINLOG_CHECKSUM_LEN: usize = 4;
497    /// Minimum MySql version that supports checksums.
498    pub const CHECKSUM_VERSION_PRODUCT: (u8, u8, u8) = (5, 6, 1);
499
500    pub fn new(checksum_alg: BinlogChecksumAlg) -> Self {
501        Self {
502            checksum_alg: Some(RawConst::new(checksum_alg as u8)),
503            checksum_enabled: true,
504        }
505    }
506
507    /// Returns parsed checksum algorithm, or raw value if algorithm is unknown.
508    pub fn get_checksum_alg(&self) -> Result<Option<BinlogChecksumAlg>, UnknownChecksumAlg> {
509        self.checksum_alg.as_ref().map(RawConst::get).transpose()
510    }
511
512    /// Returns `true` if checksum is enabled.
513    pub fn get_checksum_enabled(&self) -> bool {
514        self.checksum_enabled
515    }
516
517    /// Set checksum enabled flag.
518    pub fn set_checksum_enabled(&mut self, enabled: bool) {
519        self.checksum_enabled = enabled;
520    }
521
522    /// Reads binlog event footer from the given buffer.
523    ///
524    /// Requires that buf contains `FormatDescriptionEvent` data.
525    pub fn read(buf: &[u8]) -> io::Result<Self> {
526        let checksum_alg = if buf.len()
527            >= FormatDescriptionEvent::SERVER_VER_OFFSET + FormatDescriptionEvent::SERVER_VER_LEN
528        {
529            let mut server_version = vec![0_u8; FormatDescriptionEvent::SERVER_VER_LEN];
530            (&buf[FormatDescriptionEvent::SERVER_VER_OFFSET..]).read_exact(&mut server_version)?;
531            server_version[FormatDescriptionEvent::SERVER_VER_LEN - 1] = 0;
532            let version = crate::misc::split_version(&server_version);
533            if version < Self::CHECKSUM_VERSION_PRODUCT {
534                None
535            } else {
536                let offset = buf.len()
537                    - (BinlogEventFooter::BINLOG_CHECKSUM_ALG_DESC_LEN
538                        + BinlogEventFooter::BINLOG_CHECKSUM_LEN);
539                Some(buf[offset])
540            }
541        } else {
542            None
543        };
544
545        Ok(Self {
546            checksum_alg: checksum_alg.map(RawConst::new),
547            checksum_enabled: true,
548        })
549    }
550}
551
552impl Default for BinlogEventFooter {
553    fn default() -> Self {
554        BinlogEventFooter {
555            checksum_alg: Some(RawConst::new(
556                BinlogChecksumAlg::BINLOG_CHECKSUM_ALG_OFF as u8,
557            )),
558            checksum_enabled: true,
559        }
560    }
561}
562
563/// Parsed event data.
564#[derive(Debug, Clone, Eq, PartialEq, Hash)]
565pub enum EventData<'a> {
566    UnknownEvent,
567    /// Ignored by this implementation
568    StartEventV3(Cow<'a, [u8]>),
569    QueryEvent(QueryEvent<'a>),
570    StopEvent,
571    RotateEvent(RotateEvent<'a>),
572    IntvarEvent(IntvarEvent),
573    /// Ignored by this implementation
574    LoadEvent(Cow<'a, [u8]>),
575    SlaveEvent,
576    CreateFileEvent(Cow<'a, [u8]>),
577    /// Ignored by this implementation
578    AppendBlockEvent(Cow<'a, [u8]>),
579    /// Ignored by this implementation
580    ExecLoadEvent(Cow<'a, [u8]>),
581    /// Ignored by this implementation
582    DeleteFileEvent(Cow<'a, [u8]>),
583    /// Ignored by this implementation
584    NewLoadEvent(Cow<'a, [u8]>),
585    RandEvent(RandEvent),
586    UserVarEvent(UserVarEvent<'a>),
587    FormatDescriptionEvent(FormatDescriptionEvent<'a>),
588    XidEvent(XidEvent),
589    BeginLoadQueryEvent(BeginLoadQueryEvent<'a>),
590    ExecuteLoadQueryEvent(ExecuteLoadQueryEvent<'a>),
591    TableMapEvent(TableMapEvent<'a>),
592    /// Ignored by this implementation
593    PreGaWriteRowsEvent(Cow<'a, [u8]>),
594    /// Ignored by this implementation
595    PreGaUpdateRowsEvent(Cow<'a, [u8]>),
596    /// Ignored by this implementation
597    PreGaDeleteRowsEvent(Cow<'a, [u8]>),
598    IncidentEvent(IncidentEvent<'a>),
599    HeartbeatEvent,
600    IgnorableEvent(Cow<'a, [u8]>),
601    RowsQueryEvent(RowsQueryEvent<'a>),
602    GtidEvent(GtidEvent),
603    /// Not yet implemented.
604    AnonymousGtidEvent(AnonymousGtidEvent),
605    /// Not yet implemented.
606    PreviousGtidsEvent(Cow<'a, [u8]>),
607    /// Not yet implemented.
608    TransactionContextEvent(Cow<'a, [u8]>),
609    /// Not yet implemented.
610    ViewChangeEvent(Cow<'a, [u8]>),
611    /// Not yet implemented.
612    XaPrepareLogEvent(Cow<'a, [u8]>),
613    RowsEvent(RowsEventData<'a>),
614    TransactionPayloadEvent(TransactionPayloadEvent<'a>),
615}
616
617impl<'a> EventData<'a> {
618    pub fn into_owned(self) -> EventData<'static> {
619        match self {
620            EventData::UnknownEvent => EventData::UnknownEvent,
621            EventData::StartEventV3(ev) => EventData::StartEventV3(Cow::Owned(ev.into_owned())),
622            Self::QueryEvent(ev) => EventData::QueryEvent(ev.into_owned()),
623            Self::StopEvent => EventData::StopEvent,
624            Self::RotateEvent(ev) => EventData::RotateEvent(ev.into_owned()),
625            Self::IntvarEvent(ev) => EventData::IntvarEvent(ev),
626            Self::LoadEvent(ev) => EventData::LoadEvent(Cow::Owned(ev.into_owned())),
627            Self::SlaveEvent => EventData::SlaveEvent,
628            Self::CreateFileEvent(ev) => EventData::CreateFileEvent(Cow::Owned(ev.into_owned())),
629            Self::AppendBlockEvent(ev) => EventData::AppendBlockEvent(Cow::Owned(ev.into_owned())),
630            Self::ExecLoadEvent(ev) => EventData::ExecLoadEvent(Cow::Owned(ev.into_owned())),
631            Self::DeleteFileEvent(ev) => EventData::DeleteFileEvent(Cow::Owned(ev.into_owned())),
632            Self::NewLoadEvent(ev) => EventData::NewLoadEvent(Cow::Owned(ev.into_owned())),
633            Self::RandEvent(ev) => EventData::RandEvent(ev),
634            Self::UserVarEvent(ev) => EventData::UserVarEvent(ev.into_owned()),
635            Self::FormatDescriptionEvent(ev) => EventData::FormatDescriptionEvent(ev.into_owned()),
636            Self::XidEvent(ev) => EventData::XidEvent(ev),
637            Self::BeginLoadQueryEvent(ev) => EventData::BeginLoadQueryEvent(ev.into_owned()),
638            Self::ExecuteLoadQueryEvent(ev) => EventData::ExecuteLoadQueryEvent(ev.into_owned()),
639            Self::TableMapEvent(ev) => EventData::TableMapEvent(ev.into_owned()),
640            Self::PreGaWriteRowsEvent(ev) => {
641                EventData::PreGaWriteRowsEvent(Cow::Owned(ev.into_owned()))
642            }
643            Self::PreGaUpdateRowsEvent(ev) => {
644                EventData::PreGaUpdateRowsEvent(Cow::Owned(ev.into_owned()))
645            }
646            Self::PreGaDeleteRowsEvent(ev) => {
647                EventData::PreGaDeleteRowsEvent(Cow::Owned(ev.into_owned()))
648            }
649            Self::IncidentEvent(ev) => EventData::IncidentEvent(ev.into_owned()),
650            Self::HeartbeatEvent => EventData::HeartbeatEvent,
651            Self::IgnorableEvent(ev) => EventData::IgnorableEvent(Cow::Owned(ev.into_owned())),
652            Self::RowsQueryEvent(ev) => EventData::RowsQueryEvent(ev.into_owned()),
653            Self::GtidEvent(ev) => EventData::GtidEvent(ev),
654            Self::AnonymousGtidEvent(ev) => EventData::AnonymousGtidEvent(ev),
655            Self::PreviousGtidsEvent(ev) => {
656                EventData::PreviousGtidsEvent(Cow::Owned(ev.into_owned()))
657            }
658            Self::TransactionContextEvent(ev) => {
659                EventData::TransactionContextEvent(Cow::Owned(ev.into_owned()))
660            }
661            Self::ViewChangeEvent(ev) => EventData::ViewChangeEvent(Cow::Owned(ev.into_owned())),
662            Self::XaPrepareLogEvent(ev) => {
663                EventData::XaPrepareLogEvent(Cow::Owned(ev.into_owned()))
664            }
665            Self::RowsEvent(ev) => EventData::RowsEvent(ev.into_owned()),
666            Self::TransactionPayloadEvent(ev) => {
667                EventData::TransactionPayloadEvent(ev.into_owned())
668            }
669        }
670    }
671}
672
673impl MySerialize for EventData<'_> {
674    fn serialize(&self, buf: &mut Vec<u8>) {
675        match self {
676            EventData::UnknownEvent => (),
677            EventData::StartEventV3(ev) => buf.put_slice(ev),
678            EventData::QueryEvent(ev) => ev.serialize(buf),
679            EventData::StopEvent => (),
680            EventData::RotateEvent(ev) => ev.serialize(buf),
681            EventData::IntvarEvent(ev) => ev.serialize(buf),
682            EventData::LoadEvent(ev) => buf.put_slice(ev),
683            EventData::SlaveEvent => (),
684            EventData::CreateFileEvent(ev) => buf.put_slice(ev),
685            EventData::AppendBlockEvent(ev) => buf.put_slice(ev),
686            EventData::ExecLoadEvent(ev) => buf.put_slice(ev),
687            EventData::DeleteFileEvent(ev) => buf.put_slice(ev),
688            EventData::NewLoadEvent(ev) => buf.put_slice(ev),
689            EventData::RandEvent(ev) => ev.serialize(buf),
690            EventData::UserVarEvent(ev) => ev.serialize(buf),
691            EventData::FormatDescriptionEvent(ev) => ev.serialize(buf),
692            EventData::XidEvent(ev) => ev.serialize(buf),
693            EventData::BeginLoadQueryEvent(ev) => ev.serialize(buf),
694            EventData::ExecuteLoadQueryEvent(ev) => ev.serialize(buf),
695            EventData::TableMapEvent(ev) => ev.serialize(buf),
696            EventData::PreGaWriteRowsEvent(ev) => buf.put_slice(ev),
697            EventData::PreGaUpdateRowsEvent(ev) => buf.put_slice(ev),
698            EventData::PreGaDeleteRowsEvent(ev) => buf.put_slice(ev),
699            EventData::IncidentEvent(ev) => ev.serialize(buf),
700            EventData::HeartbeatEvent => (),
701            EventData::IgnorableEvent(ev) => buf.put_slice(ev),
702            EventData::RowsQueryEvent(ev) => ev.serialize(buf),
703            EventData::GtidEvent(ev) => ev.serialize(buf),
704            EventData::AnonymousGtidEvent(ev) => ev.serialize(buf),
705            EventData::PreviousGtidsEvent(ev) => buf.put_slice(ev),
706            EventData::TransactionContextEvent(ev) => buf.put_slice(ev),
707            EventData::ViewChangeEvent(ev) => buf.put_slice(ev),
708            EventData::XaPrepareLogEvent(ev) => buf.put_slice(ev),
709            EventData::RowsEvent(ev) => ev.serialize(buf),
710            EventData::TransactionPayloadEvent(ev) => ev.serialize(buf),
711        }
712    }
713}
714
715/// Rows events are unified under this enum (see [`EventData`]).
716#[derive(Debug, Clone, Eq, PartialEq, Hash)]
717pub enum RowsEventData<'a> {
718    WriteRowsEventV1(WriteRowsEventV1<'a>),
719    UpdateRowsEventV1(UpdateRowsEventV1<'a>),
720    DeleteRowsEventV1(DeleteRowsEventV1<'a>),
721    WriteRowsEvent(WriteRowsEvent<'a>),
722    UpdateRowsEvent(UpdateRowsEvent<'a>),
723    DeleteRowsEvent(DeleteRowsEvent<'a>),
724    PartialUpdateRowsEvent(PartialUpdateRowsEvent<'a>),
725}
726
727impl<'a> RowsEventData<'a> {
728    /// Returns the number that identifies the table (see `TableMapEvent`).
729    pub fn table_id(&self) -> u64 {
730        match self {
731            RowsEventData::WriteRowsEventV1(ev) => ev.table_id(),
732            RowsEventData::UpdateRowsEventV1(ev) => ev.table_id(),
733            RowsEventData::DeleteRowsEventV1(ev) => ev.table_id(),
734            RowsEventData::WriteRowsEvent(ev) => ev.table_id(),
735            RowsEventData::UpdateRowsEvent(ev) => ev.table_id(),
736            RowsEventData::DeleteRowsEvent(ev) => ev.table_id(),
737            RowsEventData::PartialUpdateRowsEvent(ev) => ev.table_id(),
738        }
739    }
740
741    /// Returns the number of columns in the table.
742    pub fn num_columns(&self) -> u64 {
743        match self {
744            RowsEventData::WriteRowsEventV1(ev) => ev.num_columns(),
745            RowsEventData::UpdateRowsEventV1(ev) => ev.num_columns(),
746            RowsEventData::DeleteRowsEventV1(ev) => ev.num_columns(),
747            RowsEventData::WriteRowsEvent(ev) => ev.num_columns(),
748            RowsEventData::UpdateRowsEvent(ev) => ev.num_columns(),
749            RowsEventData::DeleteRowsEvent(ev) => ev.num_columns(),
750            RowsEventData::PartialUpdateRowsEvent(ev) => ev.num_columns(),
751        }
752    }
753
754    /// Returns columns in the before-image (only for DELETE and UPDATE).
755    ///
756    /// Each bit indicates whether corresponding column is used in the image.
757    pub fn columns_before_image(&'a self) -> Option<&'a BitSlice<u8>> {
758        match self {
759            RowsEventData::WriteRowsEventV1(_) => None,
760            RowsEventData::UpdateRowsEventV1(ev) => Some(ev.columns_before_image()),
761            RowsEventData::DeleteRowsEventV1(ev) => Some(ev.columns_before_image()),
762            RowsEventData::WriteRowsEvent(_) => None,
763            RowsEventData::UpdateRowsEvent(ev) => Some(ev.columns_before_image()),
764            RowsEventData::DeleteRowsEvent(ev) => Some(ev.columns_before_image()),
765            RowsEventData::PartialUpdateRowsEvent(ev) => Some(ev.columns_before_image()),
766        }
767    }
768
769    /// Returns columns in the after-image (only for WRITE and UPDATE).
770    ///
771    /// Each bit indicates whether corresponding column is used in the image.
772    pub fn columns_after_image(&'a self) -> Option<&'a BitSlice<u8>> {
773        match self {
774            RowsEventData::WriteRowsEventV1(ev) => Some(ev.columns_after_image()),
775            RowsEventData::UpdateRowsEventV1(ev) => Some(ev.columns_after_image()),
776            RowsEventData::DeleteRowsEventV1(_) => None,
777            RowsEventData::WriteRowsEvent(ev) => Some(ev.columns_after_image()),
778            RowsEventData::UpdateRowsEvent(ev) => Some(ev.columns_after_image()),
779            RowsEventData::DeleteRowsEvent(_) => None,
780            RowsEventData::PartialUpdateRowsEvent(ev) => Some(ev.columns_after_image()),
781        }
782    }
783
784    /// Returns raw rows data.
785    pub fn rows_data(&'a self) -> &'a [u8] {
786        match self {
787            RowsEventData::WriteRowsEventV1(ev) => ev.rows_data(),
788            RowsEventData::UpdateRowsEventV1(ev) => ev.rows_data(),
789            RowsEventData::DeleteRowsEventV1(ev) => ev.rows_data(),
790            RowsEventData::WriteRowsEvent(ev) => ev.rows_data(),
791            RowsEventData::UpdateRowsEvent(ev) => ev.rows_data(),
792            RowsEventData::DeleteRowsEvent(ev) => ev.rows_data(),
793            RowsEventData::PartialUpdateRowsEvent(ev) => ev.rows_data(),
794        }
795    }
796
797    /// Returns event flags.
798    pub fn flags(&self) -> RowsEventFlags {
799        match self {
800            RowsEventData::WriteRowsEventV1(ev) => ev.flags(),
801            RowsEventData::UpdateRowsEventV1(ev) => ev.flags(),
802            RowsEventData::DeleteRowsEventV1(ev) => ev.flags(),
803            RowsEventData::WriteRowsEvent(ev) => ev.flags(),
804            RowsEventData::UpdateRowsEvent(ev) => ev.flags(),
805            RowsEventData::DeleteRowsEvent(ev) => ev.flags(),
806            RowsEventData::PartialUpdateRowsEvent(ev) => ev.flags(),
807        }
808    }
809
810    /// Returns an iterator over event's rows given the corresponding `TableMapEvent`.
811    pub fn rows(&'a self, table_map_event: &'a TableMapEvent<'a>) -> RowsEventRows<'a> {
812        match self {
813            RowsEventData::WriteRowsEventV1(ev) => ev.rows(table_map_event),
814            RowsEventData::UpdateRowsEventV1(ev) => ev.rows(table_map_event),
815            RowsEventData::DeleteRowsEventV1(ev) => ev.rows(table_map_event),
816            RowsEventData::WriteRowsEvent(ev) => ev.rows(table_map_event),
817            RowsEventData::UpdateRowsEvent(ev) => ev.rows(table_map_event),
818            RowsEventData::DeleteRowsEvent(ev) => ev.rows(table_map_event),
819            RowsEventData::PartialUpdateRowsEvent(ev) => ev.rows(table_map_event),
820        }
821    }
822
823    pub fn into_owned(self) -> RowsEventData<'static> {
824        match self {
825            Self::WriteRowsEventV1(ev) => RowsEventData::WriteRowsEventV1(ev.into_owned()),
826            Self::UpdateRowsEventV1(ev) => RowsEventData::UpdateRowsEventV1(ev.into_owned()),
827            Self::DeleteRowsEventV1(ev) => RowsEventData::DeleteRowsEventV1(ev.into_owned()),
828            Self::WriteRowsEvent(ev) => RowsEventData::WriteRowsEvent(ev.into_owned()),
829            Self::UpdateRowsEvent(ev) => RowsEventData::UpdateRowsEvent(ev.into_owned()),
830            Self::DeleteRowsEvent(ev) => RowsEventData::DeleteRowsEvent(ev.into_owned()),
831            Self::PartialUpdateRowsEvent(ev) => {
832                RowsEventData::PartialUpdateRowsEvent(ev.into_owned())
833            }
834        }
835    }
836}
837
838impl MySerialize for RowsEventData<'_> {
839    fn serialize(&self, buf: &mut Vec<u8>) {
840        match self {
841            RowsEventData::WriteRowsEventV1(ev) => ev.serialize(buf),
842            RowsEventData::UpdateRowsEventV1(ev) => ev.serialize(buf),
843            RowsEventData::DeleteRowsEventV1(ev) => ev.serialize(buf),
844            RowsEventData::WriteRowsEvent(ev) => ev.serialize(buf),
845            RowsEventData::UpdateRowsEvent(ev) => ev.serialize(buf),
846            RowsEventData::DeleteRowsEvent(ev) => ev.serialize(buf),
847            RowsEventData::PartialUpdateRowsEvent(ev) => ev.serialize(buf),
848        }
849    }
850}