1use bitvec::prelude::*;
10use byteorder::{LittleEndian, WriteBytesExt};
11use bytes::BufMut;
12use saturating::Saturating as S;
13
14use crate::{
15 io::ParseBuf,
16 misc::raw::{int::*, RawConst, RawFlags},
17 proto::{MyDeserialize, MySerialize},
18};
19
20pub use self::{
21 anonymous_gtid_event::AnonymousGtidEvent,
22 begin_load_query_event::BeginLoadQueryEvent,
23 delete_rows_event::DeleteRowsEvent,
24 delete_rows_event_v1::DeleteRowsEventV1,
25 execute_load_query_event::ExecuteLoadQueryEvent,
26 format_description_event::FormatDescriptionEvent,
27 gtid_event::GtidEvent,
28 incident_event::IncidentEvent,
29 intvar_event::IntvarEvent,
30 partial_update_rows_event::PartialUpdateRowsEvent,
31 query_event::{QueryEvent, StatusVar, StatusVarVal, StatusVars, StatusVarsIterator},
32 rand_event::RandEvent,
33 rotate_event::RotateEvent,
34 rows_event::{RowsEvent, RowsEventRows},
35 rows_query_event::RowsQueryEvent,
36 table_map_event::*,
37 transaction_payload_event::{TransactionPayloadEvent, TransactionPayloadReader},
38 update_rows_event::UpdateRowsEvent,
39 update_rows_event_v1::UpdateRowsEventV1,
40 user_var_event::UserVarEvent,
41 write_rows_event::WriteRowsEvent,
42 write_rows_event_v1::WriteRowsEventV1,
43 xid_event::XidEvent,
44};
45
46use std::{
47 any::type_name,
48 borrow::Cow,
49 cmp::min,
50 io::{self, Read, Write},
51 u16,
52};
53
54use super::{
55 consts::{
56 BinlogChecksumAlg, BinlogVersion, EventFlags, EventType, RowsEventFlags,
57 UnknownChecksumAlg, UnknownEventType,
58 },
59 misc::LimitWrite,
60 BinlogCtx, BinlogEvent,
61};
62
63mod anonymous_gtid_event;
64mod begin_load_query_event;
65mod delete_rows_event;
66mod delete_rows_event_v1;
67mod execute_load_query_event;
68mod format_description_event;
69mod gtid_event;
70mod incident_event;
71mod intvar_event;
72mod partial_update_rows_event;
73mod query_event;
74mod rand_event;
75mod rotate_event;
76mod rows_event;
77mod rows_query_event;
78mod table_map_event;
79mod transaction_payload_event;
80mod update_rows_event;
81mod update_rows_event_v1;
82mod user_var_event;
83mod write_rows_event;
84mod write_rows_event_v1;
85mod xid_event;
86
87#[derive(Debug, Clone, Eq, PartialEq)]
92pub struct Event {
93 fde: FormatDescriptionEvent<'static>,
95 header: BinlogEventHeader,
97 data: Vec<u8>,
104 footer: BinlogEventFooter,
106 checksum: [u8; BinlogEventFooter::BINLOG_CHECKSUM_LEN],
110}
111
112impl Event {
113 pub fn read<'a, T: Read>(
115 fde: &'a FormatDescriptionEvent<'a>,
116 mut input: T,
117 ) -> io::Result<Self> {
118 let binlog_header_len = BinlogEventHeader::LEN;
119 let mut fde = fde.clone().into_owned();
120
121 let mut header_buf = [0u8; BinlogEventHeader::LEN];
122 input.read_exact(&mut header_buf)?;
123 let header = BinlogEventHeader::deserialize((), &mut ParseBuf(&header_buf))?;
124
125 let mut data = vec![0_u8; (S(header.event_size() as usize) - S(binlog_header_len)).0];
126 input.read_exact(&mut data).unwrap();
127
128 let is_fde = header.event_type.0 == EventType::FORMAT_DESCRIPTION_EVENT as u8;
129 let mut bytes_to_truncate = 0;
130 let mut checksum = [0_u8; BinlogEventFooter::BINLOG_CHECKSUM_LEN];
131
132 let footer = if is_fde {
133 let footer = BinlogEventFooter::read(&data)?;
134 if footer.checksum_alg.is_some() {
135 bytes_to_truncate += BinlogEventFooter::BINLOG_CHECKSUM_ALG_DESC_LEN;
137 }
138 fde = fde.with_footer(footer);
140 footer
141 } else {
142 fde.footer()
143 };
144
145 let contains_checksum = footer.checksum_alg.is_some()
148 && (is_fde || footer.checksum_alg != Some(RawConst::new(0)))
149 && footer.checksum_enabled;
150
151 if contains_checksum {
152 bytes_to_truncate += BinlogEventFooter::BINLOG_CHECKSUM_LEN;
154 checksum.copy_from_slice(&data[data.len() - BinlogEventFooter::BINLOG_CHECKSUM_LEN..]);
155 }
156
157 data.truncate(data.len() - bytes_to_truncate);
158
159 Ok(Self {
160 fde,
161 header,
162 data,
163 footer,
164 checksum,
165 })
166 }
167
168 pub fn write<T: Write>(&self, version: BinlogVersion, mut output: T) -> io::Result<()> {
170 let is_fde = self.header.event_type.0 == EventType::FORMAT_DESCRIPTION_EVENT as u8;
171 let mut output = output.limit(S(self.len(version)));
172
173 let mut header_buf = Vec::with_capacity(BinlogEventHeader::LEN);
174 self.header.serialize(&mut header_buf);
175 output.write_all(&header_buf)?;
176 output.write_all(&self.data)?;
177
178 if let Ok(Some(alg)) = self.footer.get_checksum_alg() {
179 if is_fde {
180 output.write_u8(alg as u8)?;
181 }
182 if alg == BinlogChecksumAlg::BINLOG_CHECKSUM_ALG_CRC32 || is_fde {
183 output.write_u32::<LittleEndian>(self.calc_checksum(alg))?;
184 }
185 }
186
187 Ok(())
188 }
189
190 fn len(&self, _version: BinlogVersion) -> usize {
192 let is_fde = self.header.event_type.0 == EventType::FORMAT_DESCRIPTION_EVENT as u8;
193 let mut len = S(0);
194
195 len += S(BinlogEventHeader::LEN);
196 len += S(self.data.len());
197 if let Ok(Some(alg)) = self.footer.get_checksum_alg() {
198 if is_fde {
199 len += S(BinlogEventFooter::BINLOG_CHECKSUM_ALG_DESC_LEN);
200 }
201 if is_fde || alg != BinlogChecksumAlg::BINLOG_CHECKSUM_ALG_OFF {
202 len += S(BinlogEventFooter::BINLOG_CHECKSUM_LEN);
203 }
204 }
205
206 min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
207 }
208
209 pub fn fde(&self) -> &FormatDescriptionEvent<'static> {
211 &self.fde
212 }
213
214 pub fn header(&self) -> BinlogEventHeader {
216 self.header
217 }
218
219 pub fn data(&self) -> &[u8] {
221 &self.data
222 }
223
224 pub fn footer(&self) -> BinlogEventFooter {
226 self.footer
227 }
228
229 pub fn checksum(&self) -> Option<[u8; BinlogEventFooter::BINLOG_CHECKSUM_LEN]> {
231 let contains_checksum = self.footer.checksum_alg.is_some()
232 && (self.header.event_type.0 == (EventType::FORMAT_DESCRIPTION_EVENT as u8)
233 || self.footer.checksum_alg != Some(RawConst::new(0)));
234 contains_checksum.then_some(self.checksum)
235 }
236
237 pub fn read_event<'a, T: BinlogEvent<'a>>(&'a self) -> io::Result<T> {
239 let event_size = BinlogEventHeader::LEN + self.data.len();
241 let event_data = &mut ParseBuf(&self.data);
242 let ctx = BinlogCtx::new(event_size, &self.fde);
243
244 let event = event_data.parse(ctx)?;
245
246 if !event_data.is_empty() {
248 return Err(io::Error::new(
249 io::ErrorKind::Other,
250 format!(
251 "bytes remaining on stream while reading {}",
252 type_name::<T>()
253 ),
254 ));
255 }
256
257 Ok(event)
258 }
259
260 pub fn read_data(&self) -> io::Result<Option<EventData<'_>>> {
262 use EventType::*;
263
264 let event_type = match self.header.event_type.get() {
265 Ok(event_type) => event_type,
266 _ => return Ok(None),
267 };
268
269 let event_data = match event_type {
270 ENUM_END_EVENT | UNKNOWN_EVENT => EventData::UnknownEvent,
271 START_EVENT_V3 => EventData::StartEventV3(Cow::Borrowed(&*self.data)),
272 QUERY_EVENT => EventData::QueryEvent(self.read_event()?),
273 STOP_EVENT => EventData::StopEvent,
274 ROTATE_EVENT => EventData::RotateEvent(self.read_event()?),
275 INTVAR_EVENT => EventData::IntvarEvent(self.read_event()?),
276 LOAD_EVENT => EventData::LoadEvent(Cow::Borrowed(&*self.data)),
277 SLAVE_EVENT => EventData::SlaveEvent,
278 CREATE_FILE_EVENT => EventData::CreateFileEvent(Cow::Borrowed(&*self.data)),
279 APPEND_BLOCK_EVENT => EventData::AppendBlockEvent(Cow::Borrowed(&*self.data)),
280 EXEC_LOAD_EVENT => EventData::ExecLoadEvent(Cow::Borrowed(&*self.data)),
281 DELETE_FILE_EVENT => EventData::DeleteFileEvent(Cow::Borrowed(&*self.data)),
282 NEW_LOAD_EVENT => EventData::NewLoadEvent(Cow::Borrowed(&*self.data)),
283 RAND_EVENT => EventData::RandEvent(self.read_event()?),
284 USER_VAR_EVENT => EventData::UserVarEvent(self.read_event()?),
285 FORMAT_DESCRIPTION_EVENT => {
286 let fde = self
287 .read_event::<FormatDescriptionEvent>()?
288 .with_footer(self.footer);
289 EventData::FormatDescriptionEvent(fde)
290 }
291 XID_EVENT => EventData::XidEvent(self.read_event()?),
292 BEGIN_LOAD_QUERY_EVENT => EventData::BeginLoadQueryEvent(self.read_event()?),
293 EXECUTE_LOAD_QUERY_EVENT => EventData::ExecuteLoadQueryEvent(self.read_event()?),
294 TABLE_MAP_EVENT => EventData::TableMapEvent(self.read_event()?),
295 PRE_GA_WRITE_ROWS_EVENT => EventData::PreGaWriteRowsEvent(Cow::Borrowed(&*self.data)),
296 PRE_GA_UPDATE_ROWS_EVENT => EventData::PreGaUpdateRowsEvent(Cow::Borrowed(&*self.data)),
297 PRE_GA_DELETE_ROWS_EVENT => EventData::PreGaDeleteRowsEvent(Cow::Borrowed(&*self.data)),
298 WRITE_ROWS_EVENT_V1 => {
299 EventData::RowsEvent(RowsEventData::WriteRowsEventV1(self.read_event()?))
300 }
301 UPDATE_ROWS_EVENT_V1 => {
302 EventData::RowsEvent(RowsEventData::UpdateRowsEventV1(self.read_event()?))
303 }
304 DELETE_ROWS_EVENT_V1 => {
305 EventData::RowsEvent(RowsEventData::DeleteRowsEventV1(self.read_event()?))
306 }
307 INCIDENT_EVENT => EventData::IncidentEvent(self.read_event()?),
308 HEARTBEAT_EVENT => EventData::HeartbeatEvent,
309 IGNORABLE_EVENT => EventData::IgnorableEvent(Cow::Borrowed(&*self.data)),
310 ROWS_QUERY_EVENT => EventData::RowsQueryEvent(self.read_event()?),
311 WRITE_ROWS_EVENT => {
312 EventData::RowsEvent(RowsEventData::WriteRowsEvent(self.read_event()?))
313 }
314 UPDATE_ROWS_EVENT => {
315 EventData::RowsEvent(RowsEventData::UpdateRowsEvent(self.read_event()?))
316 }
317 DELETE_ROWS_EVENT => {
318 EventData::RowsEvent(RowsEventData::DeleteRowsEvent(self.read_event()?))
319 }
320 GTID_EVENT => EventData::GtidEvent(self.read_event()?),
321 ANONYMOUS_GTID_EVENT => EventData::AnonymousGtidEvent(self.read_event()?),
322 PREVIOUS_GTIDS_EVENT => EventData::PreviousGtidsEvent(Cow::Borrowed(&*self.data)),
323 TRANSACTION_CONTEXT_EVENT => {
324 EventData::TransactionContextEvent(Cow::Borrowed(&*self.data))
325 }
326 VIEW_CHANGE_EVENT => EventData::ViewChangeEvent(Cow::Borrowed(&*self.data)),
327 XA_PREPARE_LOG_EVENT => EventData::XaPrepareLogEvent(Cow::Borrowed(&*self.data)),
328 PARTIAL_UPDATE_ROWS_EVENT => {
329 EventData::RowsEvent(RowsEventData::PartialUpdateRowsEvent(self.read_event()?))
330 }
331 TRANSACTION_PAYLOAD_EVENT => EventData::TransactionPayloadEvent(self.read_event()?),
332 };
333
334 Ok(Some(event_data))
335 }
336
337 pub fn calc_checksum(&self, alg: BinlogChecksumAlg) -> u32 {
339 let is_fde = self.header.event_type.0 == EventType::FORMAT_DESCRIPTION_EVENT as u8;
340
341 let mut hasher = crc32fast::Hasher::new();
342 let mut header = Vec::with_capacity(BinlogEventHeader::LEN);
343 let mut header_struct = self.header;
344 if header_struct
345 .flags
346 .get()
347 .contains(EventFlags::LOG_EVENT_BINLOG_IN_USE_F)
348 {
349 header_struct.flags.0 &= !(EventFlags::LOG_EVENT_BINLOG_IN_USE_F.bits());
354 }
355 header_struct.serialize(&mut header);
356 hasher.update(&header);
357 hasher.update(&self.data);
358 if is_fde {
359 hasher.update(&[alg as u8][..]);
360 }
361 hasher.finalize()
362 }
363}
364
365#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
367pub struct BinlogEventHeader {
368 timestamp: RawInt<LeU32>,
370 event_type: RawConst<u8, EventType>,
372 server_id: RawInt<LeU32>,
376 event_size: RawInt<LeU32>,
378 log_pos: RawInt<LeU32>,
380 flags: RawFlags<EventFlags, LeU16>,
384}
385
386impl BinlogEventHeader {
387 pub const LEN: usize = 19;
389
390 pub fn new(
392 timestamp: u32,
393 event_type: EventType,
394 server_id: u32,
395 event_size: u32,
396 log_pos: u32,
397 flags: EventFlags,
398 ) -> Self {
399 Self {
400 timestamp: RawInt::new(timestamp),
401 event_type: RawConst::new(event_type as u8),
402 server_id: RawInt::new(server_id),
403 event_size: RawInt::new(event_size),
404 log_pos: RawInt::new(log_pos),
405 flags: RawFlags::new(flags.bits()),
406 }
407 }
408
409 pub fn timestamp(&self) -> u32 {
413 self.timestamp.0
414 }
415
416 pub fn event_type_raw(&self) -> u8 {
418 self.event_type.0
419 }
420
421 pub fn event_type(&self) -> Result<EventType, UnknownEventType> {
423 self.event_type.get()
424 }
425
426 pub fn server_id(&self) -> u32 {
430 self.server_id.0
431 }
432
433 pub fn event_size(&self) -> u32 {
435 self.event_size.0
436 }
437
438 pub fn log_pos(&self) -> u32 {
440 self.log_pos.0
441 }
442
443 pub fn flags(&self) -> EventFlags {
445 self.flags.get()
446 }
447
448 pub fn flags_raw(&self) -> u16 {
450 self.flags.0
451 }
452}
453
454impl<'de> MyDeserialize<'de> for BinlogEventHeader {
455 const SIZE: Option<usize> = Some(Self::LEN);
456 type Ctx = ();
457
458 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
459 let mut buf: ParseBuf = buf.parse_unchecked(Self::LEN)?;
460 Ok(Self {
461 timestamp: buf.parse_unchecked(())?,
462 event_type: buf.parse_unchecked(())?,
463 server_id: buf.parse_unchecked(())?,
464 event_size: buf.parse_unchecked(())?,
465 log_pos: buf.parse_unchecked(())?,
466 flags: buf.parse_unchecked(())?,
467 })
468 }
469}
470
471impl MySerialize for BinlogEventHeader {
472 fn serialize(&self, buf: &mut Vec<u8>) {
473 self.timestamp.serialize(&mut *buf);
474 self.event_type.serialize(&mut *buf);
475 self.server_id.serialize(&mut *buf);
476 self.event_size.serialize(&mut *buf);
477 self.log_pos.serialize(&mut *buf);
478 self.flags.serialize(&mut *buf);
479 }
480}
481
482#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
484pub struct BinlogEventFooter {
485 checksum_alg: Option<RawConst<u8, BinlogChecksumAlg>>,
487
488 checksum_enabled: bool,
490}
491
492impl BinlogEventFooter {
493 pub const BINLOG_CHECKSUM_ALG_DESC_LEN: usize = 1;
495 pub const BINLOG_CHECKSUM_LEN: usize = 4;
497 pub const CHECKSUM_VERSION_PRODUCT: (u8, u8, u8) = (5, 6, 1);
499
500 pub fn new(checksum_alg: BinlogChecksumAlg) -> Self {
501 Self {
502 checksum_alg: Some(RawConst::new(checksum_alg as u8)),
503 checksum_enabled: true,
504 }
505 }
506
507 pub fn get_checksum_alg(&self) -> Result<Option<BinlogChecksumAlg>, UnknownChecksumAlg> {
509 self.checksum_alg.as_ref().map(RawConst::get).transpose()
510 }
511
512 pub fn get_checksum_enabled(&self) -> bool {
514 self.checksum_enabled
515 }
516
517 pub fn set_checksum_enabled(&mut self, enabled: bool) {
519 self.checksum_enabled = enabled;
520 }
521
522 pub fn read(buf: &[u8]) -> io::Result<Self> {
526 let checksum_alg = if buf.len()
527 >= FormatDescriptionEvent::SERVER_VER_OFFSET + FormatDescriptionEvent::SERVER_VER_LEN
528 {
529 let mut server_version = vec![0_u8; FormatDescriptionEvent::SERVER_VER_LEN];
530 (&buf[FormatDescriptionEvent::SERVER_VER_OFFSET..]).read_exact(&mut server_version)?;
531 server_version[FormatDescriptionEvent::SERVER_VER_LEN - 1] = 0;
532 let version = crate::misc::split_version(&server_version);
533 if version < Self::CHECKSUM_VERSION_PRODUCT {
534 None
535 } else {
536 let offset = buf.len()
537 - (BinlogEventFooter::BINLOG_CHECKSUM_ALG_DESC_LEN
538 + BinlogEventFooter::BINLOG_CHECKSUM_LEN);
539 Some(buf[offset])
540 }
541 } else {
542 None
543 };
544
545 Ok(Self {
546 checksum_alg: checksum_alg.map(RawConst::new),
547 checksum_enabled: true,
548 })
549 }
550}
551
552impl Default for BinlogEventFooter {
553 fn default() -> Self {
554 BinlogEventFooter {
555 checksum_alg: Some(RawConst::new(
556 BinlogChecksumAlg::BINLOG_CHECKSUM_ALG_OFF as u8,
557 )),
558 checksum_enabled: true,
559 }
560 }
561}
562
563#[derive(Debug, Clone, Eq, PartialEq, Hash)]
565pub enum EventData<'a> {
566 UnknownEvent,
567 StartEventV3(Cow<'a, [u8]>),
569 QueryEvent(QueryEvent<'a>),
570 StopEvent,
571 RotateEvent(RotateEvent<'a>),
572 IntvarEvent(IntvarEvent),
573 LoadEvent(Cow<'a, [u8]>),
575 SlaveEvent,
576 CreateFileEvent(Cow<'a, [u8]>),
577 AppendBlockEvent(Cow<'a, [u8]>),
579 ExecLoadEvent(Cow<'a, [u8]>),
581 DeleteFileEvent(Cow<'a, [u8]>),
583 NewLoadEvent(Cow<'a, [u8]>),
585 RandEvent(RandEvent),
586 UserVarEvent(UserVarEvent<'a>),
587 FormatDescriptionEvent(FormatDescriptionEvent<'a>),
588 XidEvent(XidEvent),
589 BeginLoadQueryEvent(BeginLoadQueryEvent<'a>),
590 ExecuteLoadQueryEvent(ExecuteLoadQueryEvent<'a>),
591 TableMapEvent(TableMapEvent<'a>),
592 PreGaWriteRowsEvent(Cow<'a, [u8]>),
594 PreGaUpdateRowsEvent(Cow<'a, [u8]>),
596 PreGaDeleteRowsEvent(Cow<'a, [u8]>),
598 IncidentEvent(IncidentEvent<'a>),
599 HeartbeatEvent,
600 IgnorableEvent(Cow<'a, [u8]>),
601 RowsQueryEvent(RowsQueryEvent<'a>),
602 GtidEvent(GtidEvent),
603 AnonymousGtidEvent(AnonymousGtidEvent),
605 PreviousGtidsEvent(Cow<'a, [u8]>),
607 TransactionContextEvent(Cow<'a, [u8]>),
609 ViewChangeEvent(Cow<'a, [u8]>),
611 XaPrepareLogEvent(Cow<'a, [u8]>),
613 RowsEvent(RowsEventData<'a>),
614 TransactionPayloadEvent(TransactionPayloadEvent<'a>),
615}
616
617impl<'a> EventData<'a> {
618 pub fn into_owned(self) -> EventData<'static> {
619 match self {
620 EventData::UnknownEvent => EventData::UnknownEvent,
621 EventData::StartEventV3(ev) => EventData::StartEventV3(Cow::Owned(ev.into_owned())),
622 Self::QueryEvent(ev) => EventData::QueryEvent(ev.into_owned()),
623 Self::StopEvent => EventData::StopEvent,
624 Self::RotateEvent(ev) => EventData::RotateEvent(ev.into_owned()),
625 Self::IntvarEvent(ev) => EventData::IntvarEvent(ev),
626 Self::LoadEvent(ev) => EventData::LoadEvent(Cow::Owned(ev.into_owned())),
627 Self::SlaveEvent => EventData::SlaveEvent,
628 Self::CreateFileEvent(ev) => EventData::CreateFileEvent(Cow::Owned(ev.into_owned())),
629 Self::AppendBlockEvent(ev) => EventData::AppendBlockEvent(Cow::Owned(ev.into_owned())),
630 Self::ExecLoadEvent(ev) => EventData::ExecLoadEvent(Cow::Owned(ev.into_owned())),
631 Self::DeleteFileEvent(ev) => EventData::DeleteFileEvent(Cow::Owned(ev.into_owned())),
632 Self::NewLoadEvent(ev) => EventData::NewLoadEvent(Cow::Owned(ev.into_owned())),
633 Self::RandEvent(ev) => EventData::RandEvent(ev),
634 Self::UserVarEvent(ev) => EventData::UserVarEvent(ev.into_owned()),
635 Self::FormatDescriptionEvent(ev) => EventData::FormatDescriptionEvent(ev.into_owned()),
636 Self::XidEvent(ev) => EventData::XidEvent(ev),
637 Self::BeginLoadQueryEvent(ev) => EventData::BeginLoadQueryEvent(ev.into_owned()),
638 Self::ExecuteLoadQueryEvent(ev) => EventData::ExecuteLoadQueryEvent(ev.into_owned()),
639 Self::TableMapEvent(ev) => EventData::TableMapEvent(ev.into_owned()),
640 Self::PreGaWriteRowsEvent(ev) => {
641 EventData::PreGaWriteRowsEvent(Cow::Owned(ev.into_owned()))
642 }
643 Self::PreGaUpdateRowsEvent(ev) => {
644 EventData::PreGaUpdateRowsEvent(Cow::Owned(ev.into_owned()))
645 }
646 Self::PreGaDeleteRowsEvent(ev) => {
647 EventData::PreGaDeleteRowsEvent(Cow::Owned(ev.into_owned()))
648 }
649 Self::IncidentEvent(ev) => EventData::IncidentEvent(ev.into_owned()),
650 Self::HeartbeatEvent => EventData::HeartbeatEvent,
651 Self::IgnorableEvent(ev) => EventData::IgnorableEvent(Cow::Owned(ev.into_owned())),
652 Self::RowsQueryEvent(ev) => EventData::RowsQueryEvent(ev.into_owned()),
653 Self::GtidEvent(ev) => EventData::GtidEvent(ev),
654 Self::AnonymousGtidEvent(ev) => EventData::AnonymousGtidEvent(ev),
655 Self::PreviousGtidsEvent(ev) => {
656 EventData::PreviousGtidsEvent(Cow::Owned(ev.into_owned()))
657 }
658 Self::TransactionContextEvent(ev) => {
659 EventData::TransactionContextEvent(Cow::Owned(ev.into_owned()))
660 }
661 Self::ViewChangeEvent(ev) => EventData::ViewChangeEvent(Cow::Owned(ev.into_owned())),
662 Self::XaPrepareLogEvent(ev) => {
663 EventData::XaPrepareLogEvent(Cow::Owned(ev.into_owned()))
664 }
665 Self::RowsEvent(ev) => EventData::RowsEvent(ev.into_owned()),
666 Self::TransactionPayloadEvent(ev) => {
667 EventData::TransactionPayloadEvent(ev.into_owned())
668 }
669 }
670 }
671}
672
673impl MySerialize for EventData<'_> {
674 fn serialize(&self, buf: &mut Vec<u8>) {
675 match self {
676 EventData::UnknownEvent => (),
677 EventData::StartEventV3(ev) => buf.put_slice(ev),
678 EventData::QueryEvent(ev) => ev.serialize(buf),
679 EventData::StopEvent => (),
680 EventData::RotateEvent(ev) => ev.serialize(buf),
681 EventData::IntvarEvent(ev) => ev.serialize(buf),
682 EventData::LoadEvent(ev) => buf.put_slice(ev),
683 EventData::SlaveEvent => (),
684 EventData::CreateFileEvent(ev) => buf.put_slice(ev),
685 EventData::AppendBlockEvent(ev) => buf.put_slice(ev),
686 EventData::ExecLoadEvent(ev) => buf.put_slice(ev),
687 EventData::DeleteFileEvent(ev) => buf.put_slice(ev),
688 EventData::NewLoadEvent(ev) => buf.put_slice(ev),
689 EventData::RandEvent(ev) => ev.serialize(buf),
690 EventData::UserVarEvent(ev) => ev.serialize(buf),
691 EventData::FormatDescriptionEvent(ev) => ev.serialize(buf),
692 EventData::XidEvent(ev) => ev.serialize(buf),
693 EventData::BeginLoadQueryEvent(ev) => ev.serialize(buf),
694 EventData::ExecuteLoadQueryEvent(ev) => ev.serialize(buf),
695 EventData::TableMapEvent(ev) => ev.serialize(buf),
696 EventData::PreGaWriteRowsEvent(ev) => buf.put_slice(ev),
697 EventData::PreGaUpdateRowsEvent(ev) => buf.put_slice(ev),
698 EventData::PreGaDeleteRowsEvent(ev) => buf.put_slice(ev),
699 EventData::IncidentEvent(ev) => ev.serialize(buf),
700 EventData::HeartbeatEvent => (),
701 EventData::IgnorableEvent(ev) => buf.put_slice(ev),
702 EventData::RowsQueryEvent(ev) => ev.serialize(buf),
703 EventData::GtidEvent(ev) => ev.serialize(buf),
704 EventData::AnonymousGtidEvent(ev) => ev.serialize(buf),
705 EventData::PreviousGtidsEvent(ev) => buf.put_slice(ev),
706 EventData::TransactionContextEvent(ev) => buf.put_slice(ev),
707 EventData::ViewChangeEvent(ev) => buf.put_slice(ev),
708 EventData::XaPrepareLogEvent(ev) => buf.put_slice(ev),
709 EventData::RowsEvent(ev) => ev.serialize(buf),
710 EventData::TransactionPayloadEvent(ev) => ev.serialize(buf),
711 }
712 }
713}
714
715#[derive(Debug, Clone, Eq, PartialEq, Hash)]
717pub enum RowsEventData<'a> {
718 WriteRowsEventV1(WriteRowsEventV1<'a>),
719 UpdateRowsEventV1(UpdateRowsEventV1<'a>),
720 DeleteRowsEventV1(DeleteRowsEventV1<'a>),
721 WriteRowsEvent(WriteRowsEvent<'a>),
722 UpdateRowsEvent(UpdateRowsEvent<'a>),
723 DeleteRowsEvent(DeleteRowsEvent<'a>),
724 PartialUpdateRowsEvent(PartialUpdateRowsEvent<'a>),
725}
726
727impl<'a> RowsEventData<'a> {
728 pub fn table_id(&self) -> u64 {
730 match self {
731 RowsEventData::WriteRowsEventV1(ev) => ev.table_id(),
732 RowsEventData::UpdateRowsEventV1(ev) => ev.table_id(),
733 RowsEventData::DeleteRowsEventV1(ev) => ev.table_id(),
734 RowsEventData::WriteRowsEvent(ev) => ev.table_id(),
735 RowsEventData::UpdateRowsEvent(ev) => ev.table_id(),
736 RowsEventData::DeleteRowsEvent(ev) => ev.table_id(),
737 RowsEventData::PartialUpdateRowsEvent(ev) => ev.table_id(),
738 }
739 }
740
741 pub fn num_columns(&self) -> u64 {
743 match self {
744 RowsEventData::WriteRowsEventV1(ev) => ev.num_columns(),
745 RowsEventData::UpdateRowsEventV1(ev) => ev.num_columns(),
746 RowsEventData::DeleteRowsEventV1(ev) => ev.num_columns(),
747 RowsEventData::WriteRowsEvent(ev) => ev.num_columns(),
748 RowsEventData::UpdateRowsEvent(ev) => ev.num_columns(),
749 RowsEventData::DeleteRowsEvent(ev) => ev.num_columns(),
750 RowsEventData::PartialUpdateRowsEvent(ev) => ev.num_columns(),
751 }
752 }
753
754 pub fn columns_before_image(&'a self) -> Option<&'a BitSlice<u8>> {
758 match self {
759 RowsEventData::WriteRowsEventV1(_) => None,
760 RowsEventData::UpdateRowsEventV1(ev) => Some(ev.columns_before_image()),
761 RowsEventData::DeleteRowsEventV1(ev) => Some(ev.columns_before_image()),
762 RowsEventData::WriteRowsEvent(_) => None,
763 RowsEventData::UpdateRowsEvent(ev) => Some(ev.columns_before_image()),
764 RowsEventData::DeleteRowsEvent(ev) => Some(ev.columns_before_image()),
765 RowsEventData::PartialUpdateRowsEvent(ev) => Some(ev.columns_before_image()),
766 }
767 }
768
769 pub fn columns_after_image(&'a self) -> Option<&'a BitSlice<u8>> {
773 match self {
774 RowsEventData::WriteRowsEventV1(ev) => Some(ev.columns_after_image()),
775 RowsEventData::UpdateRowsEventV1(ev) => Some(ev.columns_after_image()),
776 RowsEventData::DeleteRowsEventV1(_) => None,
777 RowsEventData::WriteRowsEvent(ev) => Some(ev.columns_after_image()),
778 RowsEventData::UpdateRowsEvent(ev) => Some(ev.columns_after_image()),
779 RowsEventData::DeleteRowsEvent(_) => None,
780 RowsEventData::PartialUpdateRowsEvent(ev) => Some(ev.columns_after_image()),
781 }
782 }
783
784 pub fn rows_data(&'a self) -> &'a [u8] {
786 match self {
787 RowsEventData::WriteRowsEventV1(ev) => ev.rows_data(),
788 RowsEventData::UpdateRowsEventV1(ev) => ev.rows_data(),
789 RowsEventData::DeleteRowsEventV1(ev) => ev.rows_data(),
790 RowsEventData::WriteRowsEvent(ev) => ev.rows_data(),
791 RowsEventData::UpdateRowsEvent(ev) => ev.rows_data(),
792 RowsEventData::DeleteRowsEvent(ev) => ev.rows_data(),
793 RowsEventData::PartialUpdateRowsEvent(ev) => ev.rows_data(),
794 }
795 }
796
797 pub fn flags(&self) -> RowsEventFlags {
799 match self {
800 RowsEventData::WriteRowsEventV1(ev) => ev.flags(),
801 RowsEventData::UpdateRowsEventV1(ev) => ev.flags(),
802 RowsEventData::DeleteRowsEventV1(ev) => ev.flags(),
803 RowsEventData::WriteRowsEvent(ev) => ev.flags(),
804 RowsEventData::UpdateRowsEvent(ev) => ev.flags(),
805 RowsEventData::DeleteRowsEvent(ev) => ev.flags(),
806 RowsEventData::PartialUpdateRowsEvent(ev) => ev.flags(),
807 }
808 }
809
810 pub fn rows(&'a self, table_map_event: &'a TableMapEvent<'a>) -> RowsEventRows<'a> {
812 match self {
813 RowsEventData::WriteRowsEventV1(ev) => ev.rows(table_map_event),
814 RowsEventData::UpdateRowsEventV1(ev) => ev.rows(table_map_event),
815 RowsEventData::DeleteRowsEventV1(ev) => ev.rows(table_map_event),
816 RowsEventData::WriteRowsEvent(ev) => ev.rows(table_map_event),
817 RowsEventData::UpdateRowsEvent(ev) => ev.rows(table_map_event),
818 RowsEventData::DeleteRowsEvent(ev) => ev.rows(table_map_event),
819 RowsEventData::PartialUpdateRowsEvent(ev) => ev.rows(table_map_event),
820 }
821 }
822
823 pub fn into_owned(self) -> RowsEventData<'static> {
824 match self {
825 Self::WriteRowsEventV1(ev) => RowsEventData::WriteRowsEventV1(ev.into_owned()),
826 Self::UpdateRowsEventV1(ev) => RowsEventData::UpdateRowsEventV1(ev.into_owned()),
827 Self::DeleteRowsEventV1(ev) => RowsEventData::DeleteRowsEventV1(ev.into_owned()),
828 Self::WriteRowsEvent(ev) => RowsEventData::WriteRowsEvent(ev.into_owned()),
829 Self::UpdateRowsEvent(ev) => RowsEventData::UpdateRowsEvent(ev.into_owned()),
830 Self::DeleteRowsEvent(ev) => RowsEventData::DeleteRowsEvent(ev.into_owned()),
831 Self::PartialUpdateRowsEvent(ev) => {
832 RowsEventData::PartialUpdateRowsEvent(ev.into_owned())
833 }
834 }
835 }
836}
837
838impl MySerialize for RowsEventData<'_> {
839 fn serialize(&self, buf: &mut Vec<u8>) {
840 match self {
841 RowsEventData::WriteRowsEventV1(ev) => ev.serialize(buf),
842 RowsEventData::UpdateRowsEventV1(ev) => ev.serialize(buf),
843 RowsEventData::DeleteRowsEventV1(ev) => ev.serialize(buf),
844 RowsEventData::WriteRowsEvent(ev) => ev.serialize(buf),
845 RowsEventData::UpdateRowsEvent(ev) => ev.serialize(buf),
846 RowsEventData::DeleteRowsEvent(ev) => ev.serialize(buf),
847 RowsEventData::PartialUpdateRowsEvent(ev) => ev.serialize(buf),
848 }
849 }
850}