mysql_common/binlog/
mod.rs

1// Copyright (c) 2020 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9//! Binlog-related structures and functions. This implementation assumes
10//! binlog version >= 4 (MySql >= 5.0.0).
11//!
12//! All structures of this module contain raw data that may not necessarily be valid.
13//! Please consult the MySql documentation.
14
15// #![cfg(features = "binlog")]
16
17use std::{
18    collections::HashMap,
19    convert::TryFrom,
20    hash::Hash,
21    io::{
22        self, BufRead, Error,
23        ErrorKind::{InvalidData, UnexpectedEof},
24        Read, Write,
25    },
26};
27
28use crate::{
29    constants::ColumnType,
30    proto::{MyDeserialize, MySerialize},
31};
32
33#[allow(unused)]
34use self::events::TransactionPayloadEvent;
35
36use self::{
37    consts::{BinlogVersion, EventType},
38    events::{Event, FormatDescriptionEvent, RotateEvent, TableMapEvent},
39};
40
41pub mod consts;
42pub mod decimal;
43pub mod events;
44pub mod jsonb;
45pub mod jsondiff;
46pub mod misc;
47pub mod row;
48pub mod time;
49pub mod value;
50
51pub struct BinlogCtx<'a> {
52    pub event_size: usize,
53    pub fde: &'a FormatDescriptionEvent<'a>,
54}
55
56impl<'a> BinlogCtx<'a> {
57    pub fn new(event_size: usize, fde: &'a FormatDescriptionEvent<'a>) -> Self {
58        Self { event_size, fde }
59    }
60}
61
/// A structure found in binlog data.
///
/// Implementors are serializable via [`MySerialize`] and deserializable via
/// [`MyDeserialize`] using a [`BinlogCtx`] as the deserialization context.
pub trait BinlogStruct<'a>: MySerialize + MyDeserialize<'a, Ctx = BinlogCtx<'a>> {
    /// Returns serialized length of this struct in bytes.
    ///
    /// *   implementation must truncate each field to its maximum length.
    fn len(&self, version: BinlogVersion) -> usize;
}
69
/// A [`BinlogStruct`] that is tied to one particular [`EventType`].
pub trait BinlogEvent<'a>: BinlogStruct<'a> {
    /// The event type associated with this struct.
    const EVENT_TYPE: EventType;
}
74
75/// A binlog file starts with a Binlog File Header `[ fe 'bin' ]`.
76#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
77pub struct BinlogFileHeader;
78
79impl BinlogFileHeader {
80    /// Length of a binlog file header.
81    pub const LEN: usize = 4;
82    /// Value of a binlog file header.
83    pub const VALUE: [u8; Self::LEN] = [0xfe, b'b', b'i', b'n'];
84
85    pub fn read<T: Read>(mut input: T) -> io::Result<Self> {
86        let mut buf = [0_u8; Self::LEN];
87        input.read_exact(&mut buf)?;
88
89        if buf != Self::VALUE {
90            return Err(Error::new(InvalidData, "invalid binlog file header"));
91        }
92
93        Ok(Self)
94    }
95
96    pub fn write<T: Write>(&self, _version: BinlogVersion, mut output: T) -> io::Result<()> {
97        output.write_all(&Self::VALUE)
98    }
99
100    pub fn len(&self, _version: BinlogVersion) -> usize {
101        Self::LEN
102    }
103}
104
105/// Reader for binlog events.
106///
107/// # Note
108///
109/// It's a low-level stream reader and only maintains actual fde and table map,
110/// so one should properly handle encountered events (see docs on [`EventStreamReader::read`]).
111#[derive(Debug)]
112pub struct EventStreamReader {
113    fde: FormatDescriptionEvent<'static>,
114    table_map: HashMap<u64, TableMapEvent<'static>>,
115}
116
117impl EventStreamReader {
118    /// Creates a new instance.
119    pub fn new(version: BinlogVersion) -> Self {
120        Self {
121            fde: FormatDescriptionEvent::new(version),
122            table_map: Default::default(),
123        }
124    }
125
126    /// Returns the format description event.
127    ///
128    /// Returns the default placeholder if there was no FDE yet.
129    pub fn get_fde(&self) -> &FormatDescriptionEvent<'static> {
130        &self.fde
131    }
132
133    /// Disable/Enable checksum verification without changing the original algorithm.
134    ///
135    /// See [`EventStreamReader::read_decompressed`].
136    pub(crate) fn set_checksum_enabled(&mut self, enabled: bool) {
137        self.fde.footer_mut().set_checksum_enabled(enabled);
138    }
139
140    /// Returns the table map event for the given table id.
141    ///
142    /// Should be availeble if rows event with this table id encountered in the stream.
143    pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
144        self.table_map.get(&table_id)
145    }
146
147    /// Will read next event from the given stream (Returns None if stream is exhausted).
148    ///
149    /// # Note
150    ///
151    /// Since MySql 8.0.20 it is possible for an event stream to contain an embedded
152    /// stream of events in form of a [`TransactionPayloadEvent`]. This means that
153    /// to properly handle table maps it is necessary to read the embedded stream
154    /// as soon as it is encountered (see [`EventStreamReader::read_decompressed`]).
155    pub fn read<T: BufRead>(&mut self, mut input: T) -> io::Result<Option<Event>> {
156        if input.fill_buf().map(|x| x.is_empty())? {
157            return Ok(None);
158        }
159
160        let event = Event::read(&self.fde, input)?;
161
162        self.handle_event(&event)?;
163
164        Ok(Some(event))
165    }
166
167    /// This function reads decompressed payload of a Transaction_payload_event
168    /// (see [`TransactionPayloadEvent::decompressed`]).
169    ///
170    /// The difference is that checksum verification will be disabled according to the WL#3549.
171    ///
172    /// # Warning
173    ///
174    /// This function can't be used to skip checksum verification for regular events.
175    ///
176    /// # Errors
177    ///
178    /// There is a list of events that should never be a part of a transaction payload and the list
179    /// includes the [`TransactionPayloadEvent`] itself.
180    /// This function will emit an [`io::ErrorKind::Other`] if [`TransactionPayloadEvent`]
181    /// is encountered within the compressed payload.
182    pub fn read_decompressed<T: BufRead>(&mut self, input: T) -> io::Result<Option<Event>> {
183        self.set_checksum_enabled(false);
184        let result = self.read(input);
185        self.set_checksum_enabled(true);
186        let Some(event) = result? else {
187            return Ok(None);
188        };
189
190        if event.header().event_type_raw() == EventType::TRANSACTION_PAYLOAD_EVENT as u8 {
191            return Err(io::Error::new(
192                io::ErrorKind::Other,
193                "TRANSACTION_PAYLOAD_EVENT encountered",
194            ));
195        }
196
197        self.handle_event(&event)?;
198
199        Ok(Some(event))
200    }
201
202    fn handle_event(&mut self, event: &Event) -> io::Result<()> {
203        let event_type = event.header().event_type_raw();
204
205        if event_type == EventType::FORMAT_DESCRIPTION_EVENT as u8 {
206            // we'll redefine fde with an actual one
207            let fde = event.read_event::<FormatDescriptionEvent>()?;
208            self.fde = fde.into_owned().with_footer(event.footer());
209        } else if event_type == EventType::TABLE_MAP_EVENT as u8 {
210            // we'll maintain known table maps
211            let tme = event.read_event::<TableMapEvent>()?;
212            self.table_map.insert(tme.table_id(), tme.into_owned());
213        } else if event_type == EventType::ROTATE_EVENT as u8 {
214            // we'll keep table map size within reasonlable bounds
215
216            // TODO: This value is arbitrary
217            const TABLE_MAP_MAX_SIZE: usize = 64;
218
219            let re = event.read_event::<RotateEvent>()?;
220            if !re.is_fake() {
221                self.table_map.clear();
222                self.table_map.shrink_to(TABLE_MAP_MAX_SIZE);
223            }
224        }
225
226        Ok(())
227    }
228}
229
230/// Binlog file.
231///
232/// It's an iterator over events in a binlog file.
233#[derive(Debug)]
234pub struct BinlogFile<T> {
235    reader: EventStreamReader,
236    read: T,
237}
238
239impl<T: BufRead> BinlogFile<T> {
240    /// Creates a new instance.
241    ///
242    /// It'll try to read binlog file header.
243    pub fn new(version: BinlogVersion, mut read: T) -> io::Result<Self> {
244        let reader = EventStreamReader::new(version);
245        BinlogFileHeader::read(&mut read)?;
246        Ok(Self { reader, read })
247    }
248
249    /// Returns a reference to the binlog stream reader.
250    pub fn reader(&self) -> &EventStreamReader {
251        &self.reader
252    }
253
254    /// Returns a mutable reference to the binlog stream reader.
255    pub fn reader_mut(&mut self) -> &mut EventStreamReader {
256        &mut self.reader
257    }
258}
259
260impl<T: BufRead> Iterator for BinlogFile<T> {
261    type Item = io::Result<Event>;
262
263    fn next(&mut self) -> Option<Self::Item> {
264        match self.reader.read(&mut self.read) {
265            Ok(event) => event.map(Ok),
266            Err(err) if err.kind() == UnexpectedEof => None,
267            Err(err) => Some(Err(err)),
268        }
269    }
270}
271
272impl ColumnType {
273    /// Returns type-specific metadata for this column type,
274    /// as well as the total number of occupied bytes.
275    ///
276    /// `is_array` must be true if `self` is from `MYSQL_TYPE_TYPED_ARRAY` metadata.
277    fn get_metadata<'a>(&self, ptr: &'a [u8], is_array: bool) -> Option<(&'a [u8], usize)> {
278        match self {
279            Self::MYSQL_TYPE_TINY_BLOB
280            | Self::MYSQL_TYPE_BLOB
281            | Self::MYSQL_TYPE_MEDIUM_BLOB
282            | Self::MYSQL_TYPE_LONG_BLOB
283            | Self::MYSQL_TYPE_DOUBLE
284            | Self::MYSQL_TYPE_FLOAT
285            | Self::MYSQL_TYPE_GEOMETRY
286            | Self::MYSQL_TYPE_TIME2
287            | Self::MYSQL_TYPE_DATETIME2
288            | Self::MYSQL_TYPE_TIMESTAMP2
289            | Self::MYSQL_TYPE_JSON
290            | Self::MYSQL_TYPE_VECTOR => ptr.get(..1).map(|x| (x, 1)),
291            Self::MYSQL_TYPE_VARCHAR => {
292                if is_array {
293                    ptr.get(..3).map(|x| (x, 3))
294                } else {
295                    ptr.get(..2).map(|x| (x, 2))
296                }
297            }
298            Self::MYSQL_TYPE_NEWDECIMAL
299            | Self::MYSQL_TYPE_SET
300            | Self::MYSQL_TYPE_ENUM
301            | Self::MYSQL_TYPE_STRING
302            | Self::MYSQL_TYPE_BIT => ptr.get(..2).map(|x| (x, 2)),
303            Self::MYSQL_TYPE_TYPED_ARRAY => Self::try_from(*ptr.first()?)
304                .ok()?
305                .get_metadata(ptr.get(1..)?, true)
306                .map(|(x, n)| (x, n + 1)),
307            Self::MYSQL_TYPE_DECIMAL
308            | Self::MYSQL_TYPE_TINY
309            | Self::MYSQL_TYPE_SHORT
310            | Self::MYSQL_TYPE_LONG
311            | Self::MYSQL_TYPE_NULL
312            | Self::MYSQL_TYPE_TIMESTAMP
313            | Self::MYSQL_TYPE_LONGLONG
314            | Self::MYSQL_TYPE_INT24
315            | Self::MYSQL_TYPE_DATE
316            | Self::MYSQL_TYPE_TIME
317            | Self::MYSQL_TYPE_DATETIME
318            | Self::MYSQL_TYPE_YEAR
319            | Self::MYSQL_TYPE_NEWDATE
320            | Self::MYSQL_TYPE_UNKNOWN
321            | Self::MYSQL_TYPE_VAR_STRING => Some((&[], 0)),
322        }
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use std::{
329        collections::HashMap,
330        convert::TryFrom,
331        io,
332        iter::{once, repeat},
333    };
334
335    use super::{
336        consts::{EventFlags, EventType},
337        events::{BinlogEventHeader, EventData, GtidEvent},
338        BinlogFile, BinlogFileHeader, BinlogVersion,
339    };
340
341    use crate::{
342        binlog::{
343            events::{OptionalMetadataField, RowsEventData},
344            value::BinlogValue,
345        },
346        collations::CollationId,
347        constants::ColumnFlags,
348        proto::MySerialize,
349        row::convert::from_row,
350        value::Value,
351    };
352
353    const BINLOG_FILE: &[u8] = &[
354        0xfe, 0x62, 0x69, 0x6e, 0xfc, 0x35, 0xbb, 0x4a, 0x0f, 0x01, 0x00, 0x00, 0x00, 0x5e, 0x00,
355        0x00, 0x00, 0x62, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x35, 0x2e, 0x30, 0x2e, 0x38,
356        0x36, 0x2d, 0x64, 0x65, 0x62, 0x75, 0x67, 0x2d, 0x6c, 0x6f, 0x67, 0x00, 0x00, 0x00, 0x00,
357        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
358        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
359        0xfc, 0x35, 0xbb, 0x4a, 0x13, 0x38, 0x0d, 0x00, 0x08, 0x00, 0x12, 0x00, 0x04, 0x04, 0x04,
360        0x04, 0x12, 0x00, 0x00, 0x4b, 0x00, 0x04, 0x1a, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00,
361        0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0xc6, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
362        0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00,
363        0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04,
364        0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00, 0x63, 0x72, 0x65, 0x61,
365        0x74, 0x65, 0x20, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x31, 0x28, 0x61, 0x20, 0x69,
366        0x6e, 0x74, 0x29, 0x20, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d, 0x20, 0x69, 0x6e, 0x6e,
367        0x6f, 0x64, 0x62, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x65, 0x00, 0x00,
368        0x00, 0x2b, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
369        0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
370        0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08,
371        0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x20, 0x74,
372        0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x32, 0x28, 0x61, 0x20, 0x69, 0x6e, 0x74, 0x29, 0x20,
373        0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d, 0x20, 0x69, 0x6e, 0x6e, 0x6f, 0x64, 0x62, 0xfd,
374        0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x45, 0x00, 0x00, 0x00, 0x70, 0x01, 0x00,
375        0x00, 0x08, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a,
376        0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
377        0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73,
378        0x71, 0x6c, 0x00, 0x42, 0x45, 0x47, 0x49, 0x4e, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00,
379        0x00, 0x00, 0x5c, 0x00, 0x00, 0x00, 0xcc, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
380        0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00,
381        0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04,
382        0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00, 0x69, 0x6e, 0x73, 0x65,
383        0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x31, 0x20, 0x28, 0x61, 0x29, 0x20,
384        0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x31, 0x29, 0xfd, 0x35, 0xbb, 0x4a, 0x02,
385        0x01, 0x00, 0x00, 0x00, 0x5d, 0x00, 0x00, 0x00, 0x29, 0x02, 0x00, 0x00, 0x00, 0x00, 0x01,
386        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40,
387        0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74,
388        0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00, 0x69,
389        0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x32, 0x20, 0x28,
390        0x61, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x31, 0x29, 0xfd, 0x35,
391        0xbb, 0x4a, 0x10, 0x01, 0x00, 0x00, 0x00, 0x1b, 0x00, 0x00, 0x00, 0x44, 0x02, 0x00, 0x00,
392        0x00, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xfd, 0x35, 0xbb, 0x4a, 0x02,
393        0x01, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0xa8, 0x02, 0x00, 0x00, 0x00, 0x00, 0x01,
394        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40,
395        0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74,
396        0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00, 0x63, 0x72,
397        0x65, 0x61, 0x74, 0x65, 0x20, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x33, 0x28, 0x61,
398        0x20, 0x69, 0x6e, 0x74, 0x29, 0x20, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d, 0x20, 0x69,
399        0x6e, 0x6e, 0x6f, 0x64, 0x62, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x65,
400        0x00, 0x00, 0x00, 0x0d, 0x03, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00,
401        0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00,
402        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08,
403        0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65,
404        0x20, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x34, 0x28, 0x61, 0x20, 0x69, 0x6e, 0x74,
405        0x29, 0x20, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d, 0x20, 0x6d, 0x79, 0x69, 0x73, 0x61,
406        0x6d, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x45, 0x00, 0x00, 0x00, 0x52,
407        0x03, 0x00, 0x00, 0x08, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00,
408        0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
409        0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d,
410        0x79, 0x73, 0x71, 0x6c, 0x00, 0x42, 0x45, 0x47, 0x49, 0x4e, 0xfd, 0x35, 0xbb, 0x4a, 0x02,
411        0x01, 0x00, 0x00, 0x00, 0x5c, 0x00, 0x00, 0x00, 0xae, 0x03, 0x00, 0x00, 0x00, 0x00, 0x01,
412        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40,
413        0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74,
414        0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00, 0x69, 0x6e,
415        0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x33, 0x20, 0x28, 0x61,
416        0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x32, 0x29, 0xfd, 0x35, 0xbb,
417        0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x5d, 0x00, 0x00, 0x00, 0x0b, 0x04, 0x00, 0x00, 0x00,
418        0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00,
419        0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03,
420        0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c,
421        0x00, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x34,
422        0x20, 0x28, 0x61, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x32, 0x29,
423        0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x48, 0x00, 0x00, 0x00, 0x53, 0x04,
424        0x00, 0x00, 0x08, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00,
425        0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
426        0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79,
427        0x73, 0x71, 0x6c, 0x00, 0x52, 0x4f, 0x4c, 0x4c, 0x42, 0x41, 0x43, 0x4b, 0xfd, 0x35, 0xbb,
428        0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x61, 0x00, 0x00, 0x00, 0xb4, 0x04, 0x00, 0x00, 0x00,
429        0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00,
430        0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03,
431        0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00,
432        0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x20, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x35,
433        0x28, 0x61, 0x20, 0x69, 0x6e, 0x74, 0x29, 0x20, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d,
434        0x20, 0x4e, 0x44, 0x42, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x62, 0x00,
435        0x00, 0x00, 0x16, 0x05, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
436        0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
437        0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00,
438        0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x20,
439        0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x36, 0x28, 0x61, 0x20, 0x69, 0x6e, 0x74, 0x29,
440        0x20, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d, 0x20, 0x4e, 0x44, 0x42, 0xfd, 0x35, 0xbb,
441        0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x45, 0x00, 0x00, 0x00, 0x5b, 0x05, 0x00, 0x00, 0x08,
442        0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00,
443        0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03,
444        0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c,
445        0x00, 0x42, 0x45, 0x47, 0x49, 0x4e, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00,
446        0x5c, 0x00, 0x00, 0x00, 0xb7, 0x05, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
447        0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00,
448        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00,
449        0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74,
450        0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x35, 0x20, 0x28, 0x61, 0x29, 0x20, 0x76, 0x61,
451        0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x33, 0x29, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00,
452        0x00, 0x00, 0x5d, 0x00, 0x00, 0x00, 0x14, 0x06, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
453        0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00,
454        0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04,
455        0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00, 0x69, 0x6e, 0x73,
456        0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x36, 0x20, 0x28, 0x61, 0x29,
457        0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x33, 0x29, 0xfd, 0x35, 0xbb, 0x4a,
458        0x02, 0x01, 0x00, 0x00, 0x00, 0x46, 0x00, 0x00, 0x00, 0x5a, 0x06, 0x00, 0x00, 0x08, 0x00,
459        0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00,
460        0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73,
461        0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00,
462        0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0xfd, 0x35, 0xbb, 0x4a, 0x04, 0x01, 0x00, 0x00, 0x00,
463        0x2c, 0x00, 0x00, 0x00, 0x86, 0x06, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00,
464        0x00, 0x00, 0x00, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x2d, 0x62, 0x69, 0x6e, 0x2e, 0x30,
465        0x30, 0x30, 0x30, 0x30, 0x32,
466    ];
467
468    #[test]
469    fn binlog_file_header_roundtrip() -> io::Result<()> {
470        let mut output = Vec::new();
471
472        let binlog_file_header = BinlogFileHeader::read(BINLOG_FILE)?;
473        binlog_file_header.write(BinlogVersion::Version4, &mut output)?;
474
475        assert_eq!(&output[..], &BINLOG_FILE[..BinlogFileHeader::LEN]);
476
477        Ok(())
478    }
479
480    #[test]
481    fn binlog_file_iterator() -> io::Result<()> {
482        let binlog_file = BinlogFile::new(BinlogVersion::Version4, BINLOG_FILE)?;
483
484        let mut total = 0;
485        let mut ev_pos = 4;
486
487        for (i, ev) in binlog_file.enumerate() {
488            let data_start = ev_pos + BinlogEventHeader::LEN;
489            let ev = ev?;
490            match i {
491                0 => {
492                    assert_eq!(
493                        ev.header(),
494                        BinlogEventHeader::new(
495                            1253783036,
496                            EventType::FORMAT_DESCRIPTION_EVENT,
497                            1,
498                            94,
499                            98,
500                            EventFlags::empty()
501                        )
502                    )
503                }
504                1 => assert_eq!(
505                    ev.header(),
506                    BinlogEventHeader::new(
507                        1253783037,
508                        EventType::QUERY_EVENT,
509                        1,
510                        100,
511                        198,
512                        EventFlags::empty()
513                    )
514                ),
515                2 => assert_eq!(
516                    ev.header(),
517                    BinlogEventHeader::new(
518                        1253783037,
519                        EventType::QUERY_EVENT,
520                        1,
521                        101,
522                        299,
523                        EventFlags::empty()
524                    )
525                ),
526                3 => assert_eq!(
527                    ev.header(),
528                    BinlogEventHeader::new(
529                        1253783037,
530                        EventType::QUERY_EVENT,
531                        1,
532                        69,
533                        368,
534                        EventFlags::LOG_EVENT_SUPPRESS_USE_F
535                    )
536                ),
537                4 => assert_eq!(
538                    ev.header(),
539                    BinlogEventHeader::new(
540                        1253783037,
541                        EventType::QUERY_EVENT,
542                        1,
543                        92,
544                        460,
545                        EventFlags::empty()
546                    )
547                ),
548                5 => assert_eq!(
549                    ev.header(),
550                    BinlogEventHeader::new(
551                        1253783037,
552                        EventType::QUERY_EVENT,
553                        1,
554                        93,
555                        553,
556                        EventFlags::empty()
557                    )
558                ),
559                6 => assert_eq!(
560                    ev.header(),
561                    BinlogEventHeader::new(
562                        1253783037,
563                        EventType::XID_EVENT,
564                        1,
565                        27,
566                        580,
567                        EventFlags::empty()
568                    )
569                ),
570                7 => assert_eq!(
571                    ev.header(),
572                    BinlogEventHeader::new(
573                        1253783037,
574                        EventType::QUERY_EVENT,
575                        1,
576                        100,
577                        680,
578                        EventFlags::empty()
579                    )
580                ),
581                8 => assert_eq!(
582                    ev.header(),
583                    BinlogEventHeader::new(
584                        1253783037,
585                        EventType::QUERY_EVENT,
586                        1,
587                        101,
588                        781,
589                        EventFlags::empty()
590                    )
591                ),
592                9 => assert_eq!(
593                    ev.header(),
594                    BinlogEventHeader::new(
595                        1253783037,
596                        EventType::QUERY_EVENT,
597                        1,
598                        69,
599                        850,
600                        EventFlags::LOG_EVENT_SUPPRESS_USE_F
601                    )
602                ),
603                10 => assert_eq!(
604                    ev.header(),
605                    BinlogEventHeader::new(
606                        1253783037,
607                        EventType::QUERY_EVENT,
608                        1,
609                        92,
610                        942,
611                        EventFlags::empty()
612                    )
613                ),
614                11 => assert_eq!(
615                    ev.header(),
616                    BinlogEventHeader::new(
617                        1253783037,
618                        EventType::QUERY_EVENT,
619                        1,
620                        93,
621                        1035,
622                        EventFlags::empty()
623                    )
624                ),
625                12 => assert_eq!(
626                    ev.header(),
627                    BinlogEventHeader::new(
628                        1253783037,
629                        EventType::QUERY_EVENT,
630                        1,
631                        72,
632                        1107,
633                        EventFlags::LOG_EVENT_SUPPRESS_USE_F
634                    )
635                ),
636                13 => assert_eq!(
637                    ev.header(),
638                    BinlogEventHeader::new(
639                        1253783037,
640                        EventType::QUERY_EVENT,
641                        1,
642                        97,
643                        1204,
644                        EventFlags::empty()
645                    )
646                ),
647                14 => assert_eq!(
648                    ev.header(),
649                    BinlogEventHeader::new(
650                        1253783037,
651                        EventType::QUERY_EVENT,
652                        1,
653                        98,
654                        1302,
655                        EventFlags::empty()
656                    )
657                ),
658                15 => assert_eq!(
659                    ev.header(),
660                    BinlogEventHeader::new(
661                        1253783037,
662                        EventType::QUERY_EVENT,
663                        1,
664                        69,
665                        1371,
666                        EventFlags::LOG_EVENT_SUPPRESS_USE_F
667                    )
668                ),
669                16 => assert_eq!(
670                    ev.header(),
671                    BinlogEventHeader::new(
672                        1253783037,
673                        EventType::QUERY_EVENT,
674                        1,
675                        92,
676                        1463,
677                        EventFlags::empty()
678                    )
679                ),
680                17 => assert_eq!(
681                    ev.header(),
682                    BinlogEventHeader::new(
683                        1253783037,
684                        EventType::QUERY_EVENT,
685                        1,
686                        93,
687                        1556,
688                        EventFlags::empty()
689                    )
690                ),
691                18 => assert_eq!(
692                    ev.header(),
693                    BinlogEventHeader::new(
694                        1253783037,
695                        EventType::QUERY_EVENT,
696                        1,
697                        70,
698                        1626,
699                        EventFlags::LOG_EVENT_SUPPRESS_USE_F
700                    )
701                ),
702                19 => assert_eq!(
703                    ev.header(),
704                    BinlogEventHeader::new(
705                        1253783037,
706                        EventType::ROTATE_EVENT,
707                        1,
708                        44,
709                        1670,
710                        EventFlags::empty()
711                    )
712                ),
713                _ => panic!("too many"),
714            }
715
716            assert_eq!(
717                ev.data(),
718                &BINLOG_FILE[data_start
719                    ..(data_start + ev.header().event_size() as usize - BinlogEventHeader::LEN)],
720            );
721
722            total += 1;
723            ev_pos = ev.header().log_pos() as usize;
724        }
725
726        assert_eq!(total, 20);
727        Ok(())
728    }
729
730    #[test]
731    fn binlog_event_roundtrip() -> io::Result<()> {
732        const PATH: &str = "./test-data/binlogs";
733
734        let binlogs = std::fs::read_dir(PATH)?
735            .filter_map(|path| path.ok())
736            .map(|entry| entry.path())
737            .filter(|path| path.file_name().is_some());
738
739        'outer: for file_path in binlogs {
740            let file_data = std::fs::read(dbg!(&file_path))?;
741            let mut binlog_file = BinlogFile::new(BinlogVersion::Version4, &file_data[..])?;
742
743            let mut i = 0;
744            let mut ev_pos = 4;
745            let mut table_map_events = HashMap::new();
746
747            while let Some(ev) = binlog_file.next() {
748                i += 1;
749                let ev = ev?;
750                let _ = dbg!(ev.header().event_type());
751                let ev_end = ev_pos + ev.header().event_size() as usize;
752                let binlog_version = binlog_file.reader.fde.binlog_version();
753
754                let mut output = Vec::new();
755                ev.write(binlog_version, &mut output)?;
756
757                let event = match ev.read_data() {
758                    Ok(event) => {
759                        let event = match event {
760                            Some(e) => e,
761                            None => {
762                                if file_path.file_name().unwrap() == "mariadb-bin.000001" {
763                                    continue;
764                                } else {
765                                    dbg!(&ev);
766                                    panic!();
767                                }
768                            }
769                        };
770                        match event {
771                            EventData::TableMapEvent(ref ev) => {
772                                // store table maps for later use
773                                table_map_events.insert(ev.table_id(), ev.clone().into_owned());
774
775                                event
776                            }
777                            EventData::RowsEvent(ref rows_event) => {
778                                // iterate rows in a rows event
779                                let table_map_event =
780                                    binlog_file.reader().get_tme(rows_event.table_id()).unwrap();
781                                for row in rows_event.rows(table_map_event) {
782                                    let _row = row.unwrap();
783                                    if file_path.file_name().unwrap() == "mariadb-bin.000001" {
784                                        // should parse metadata for `binlog_row_metadata=FULL`
785                                        let after = _row.1.as_ref().unwrap();
786                                        let columns = after.columns_ref();
787
788                                        for col in columns.iter() {
789                                            assert_eq!(col.schema_ref(), b"toddy_test");
790                                            assert_eq!(col.table_ref(), b"outbox");
791                                            assert_eq!(col.org_table_ref(), b"outbox");
792                                        }
793
794                                        for (col, col_name) in columns.iter().zip([
795                                            "id",
796                                            "topic",
797                                            "event_type",
798                                            "event",
799                                            "created",
800                                        ]) {
801                                            assert_eq!(col.name_ref(), col_name.as_bytes());
802                                        }
803
804                                        for (col, f) in columns.iter().zip(
805                                            once(ColumnFlags::PRI_KEY_FLAG)
806                                                .chain(repeat(ColumnFlags::empty())),
807                                        ) {
808                                            assert_eq!(col.flags(), f);
809                                        }
810
811                                        for (col, charset) in columns.iter().zip([
812                                            CollationId::UNKNOWN_COLLATION_ID,
813                                            CollationId::UTF8MB4_GENERAL_CI,
814                                            CollationId::UTF8MB4_GENERAL_CI,
815                                            CollationId::BINARY,
816                                            CollationId::UNKNOWN_COLLATION_ID,
817                                        ]) {
818                                            assert_eq!(col.character_set(), charset as u16);
819                                        }
820                                    }
821                                }
822
823                                event
824                            }
825                            _ => event,
826                        }
827                    }
828                    Err(err)
829                        if err.kind() == std::io::ErrorKind::Other
830                            && ev.header().event_type() == Ok(EventType::XID_EVENT)
831                            && ev.header().event_size() == 0x26
832                            && file_path.file_name().unwrap() == "ver_5_1-wl2325_r.001" =>
833                    {
834                        // ver_5_1-wl2325_r.001 testfile contains broken xid event.
835                        continue 'outer;
836                    }
837                    Err(err)
838                        if err.kind() == std::io::ErrorKind::UnexpectedEof
839                            && ev.header().event_type() == Ok(EventType::QUERY_EVENT)
840                            && ev.header().event_size() == 171
841                            && file_path.file_name().unwrap() == "corrupt-relay-bin.000624" =>
842                    {
843                        // corrupt-relay-bin.000624 testfile contains broken query event.
844                        continue 'outer;
845                    }
846                    other => other.transpose().unwrap()?,
847                };
848
849                if file_path.file_name().unwrap() == "binlog-invisible-columns.000001" {
850                    if let Some(EventData::TableMapEvent(ev)) = ev.read_data().unwrap() {
851                        let optional_meta = ev.iter_optional_meta();
852                        for meta in optional_meta {
853                            meta.unwrap();
854                        }
855                    }
856                }
857
858                if file_path.file_name().unwrap() == "json-opaque.binlog" {
859                    let event_data = ev.read_data().unwrap();
860
861                    /// Extracts first column of the binlog row after-image as a Jsonb::Value
862                    /// then parses it into the structured representation and compares with
863                    /// the expected value.
864                    macro_rules! extract_cmp {
865                        ($row:expr, $expected:tt) => {
866                            let mut after = $row.1.unwrap().unwrap();
867                            let a = dbg!(after.pop().unwrap());
868                            let super::value::BinlogValue::Jsonb(a) = a else {
869                                panic!("BinlogValue::Jsonb(_) expected");
870                            };
871                            assert_eq!(
872                                serde_json::json!($expected),
873                                serde_json::Value::from(a.parse().unwrap())
874                            );
875                        };
876                    }
877
878                    match event_data {
879                        Some(EventData::RowsEvent(ev)) if i == 10 => {
880                            let table_map_event =
881                                binlog_file.reader().get_tme(ev.table_id()).unwrap();
882                            let mut rows = ev.rows(table_map_event);
883                            extract_cmp!(rows.next().unwrap().unwrap(), {"a": "base64:type15:VQ=="});
884                        }
885                        Some(EventData::RowsEvent(ev)) if i == 12 => {
886                            let table_map_event =
887                                binlog_file.reader().get_tme(ev.table_id()).unwrap();
888                            let mut rows = ev.rows(table_map_event);
889                            extract_cmp!(rows.next().unwrap().unwrap(), {"b": "2012-03-18"});
890                        }
891                        Some(EventData::RowsEvent(ev)) if i == 14 => {
892                            let table_map_event =
893                                binlog_file.reader().get_tme(ev.table_id()).unwrap();
894                            let mut rows = ev.rows(table_map_event);
895                            extract_cmp!(rows.next().unwrap().unwrap(), {"c": "2012-03-18 11:30:45.000000"});
896                        }
897                        Some(EventData::RowsEvent(ev)) if i == 16 => {
898                            let table_map_event =
899                                binlog_file.reader().get_tme(ev.table_id()).unwrap();
900                            let mut rows = ev.rows(table_map_event);
901                            extract_cmp!(rows.next().unwrap().unwrap(), {"c": "87:31:46.654321"});
902                        }
903                        Some(EventData::RowsEvent(ev)) if i == 18 => {
904                            let table_map_event =
905                                binlog_file.reader().get_tme(ev.table_id()).unwrap();
906                            let mut rows = ev.rows(table_map_event);
907                            extract_cmp!(rows.next().unwrap().unwrap(), {"d": "123.456"});
908                        }
909                        Some(EventData::RowsEvent(ev)) if i == 20 => {
910                            let table_map_event =
911                                binlog_file.reader().get_tme(ev.table_id()).unwrap();
912                            let mut rows = ev.rows(table_map_event);
913                            extract_cmp!(rows.next().unwrap().unwrap(), {"e": "9.00"});
914                        }
915                        Some(EventData::RowsEvent(ev)) if i == 22 => {
916                            let table_map_event =
917                                binlog_file.reader().get_tme(ev.table_id()).unwrap();
918                            let mut rows = ev.rows(table_map_event);
919                            extract_cmp!(rows.next().unwrap().unwrap(), {"e": [0, 1, true, false]});
920                        }
921                        Some(EventData::RowsEvent(ev)) if i == 24 => {
922                            let table_map_event =
923                                binlog_file.reader().get_tme(ev.table_id()).unwrap();
924                            let mut rows = ev.rows(table_map_event);
925                            extract_cmp!(rows.next().unwrap().unwrap(), {"e": null});
926                        }
927                        Some(EventData::RowsEvent(ev)) => {
928                            panic!("no more events expected i={}, {:?}", i, ev);
929                        }
930                        _ => (),
931                    }
932                }
933
934                if file_path.file_name().unwrap() == "vector.binlog" {
935                    let event_data = ev.read_data().unwrap();
936                    match event_data {
937                        Some(EventData::TableMapEvent(ev)) => {
938                            let optional_meta = ev.iter_optional_meta();
939                            match ev.table_name().as_ref() {
940                                "foo" => {
941                                    for meta in optional_meta {
942                                        match meta.unwrap() {
943                                            OptionalMetadataField::Dimensionality(x) => assert_eq!(
944                                                x.iter_dimensionalities()
945                                                    .collect::<Result<Vec<_>, _>>()
946                                                    .unwrap(),
947                                                vec![3],
948                                            ),
949                                            _ => (),
950                                        }
951                                    }
952                                }
953                                "bar" => {
954                                    for meta in optional_meta {
955                                        match meta.unwrap() {
956                                            OptionalMetadataField::Dimensionality(x) => assert_eq!(
957                                                x.iter_dimensionalities()
958                                                    .collect::<Result<Vec<_>, _>>()
959                                                    .unwrap(),
960                                                vec![2, 4],
961                                            ),
962                                            _ => (),
963                                        }
964                                    }
965                                }
966                                _ => (),
967                            }
968                        }
969                        Some(EventData::RowsEvent(ev)) if i == 12 => {
970                            let table_map_event =
971                                binlog_file.reader().get_tme(ev.table_id()).unwrap();
972                            let mut rows = ev.rows(table_map_event);
973
974                            let (None, Some(after)) = rows.next().unwrap().unwrap() else {
975                                panic!("Unexpected data");
976                            };
977                            let (id, vector_column): (u8, Vec<u8>) =
978                                from_row(crate::Row::try_from(after).unwrap());
979                            assert_eq!(id, 1);
980                            assert_eq!(
981                                vector_column,
982                                vec![205, 204, 140, 63, 205, 204, 12, 64, 51, 51, 83, 64]
983                            );
984
985                            let (None, Some(after)) = rows.next().unwrap().unwrap() else {
986                                panic!("Unexpected data");
987                            };
988                            let (id, vector_column): (u8, Vec<u8>) =
989                                from_row(crate::Row::try_from(after).unwrap());
990                            assert_eq!(id, 2);
991                            assert_eq!(
992                                vector_column,
993                                vec![0, 0, 128, 63, 0, 0, 128, 191, 0, 0, 0, 0]
994                            );
995                        }
996                        _ => (),
997                    }
998                }
999
1000                if file_path.file_name().unwrap() == "mysql-enum-string-set.000001" {
1001                    if let Some(EventData::RowsEvent(data)) = ev.read_data().unwrap() {
1002                        let table_map_event =
1003                            binlog_file.reader().get_tme(data.table_id()).unwrap();
1004                        for row in data.rows(table_map_event) {
1005                            let (before, after) = row.unwrap();
1006                            match data {
1007                                RowsEventData::WriteRowsEvent(_) => {
1008                                    assert!(before.is_none());
1009                                    let after = after.unwrap().unwrap();
1010                                    let mut j = 0;
1011                                    for v in after {
1012                                        j += 1;
1013                                        match j {
1014                                            1 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789".into())),
1015                                            2 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789".into())),
1016                                            3 => assert_eq!(v, BinlogValue::Value(1_i8.into())),
1017                                            4 => assert_eq!(v, BinlogValue::Value([0b00000101_u8].into())),
1018                                            5 => assert_eq!(v, BinlogValue::Value("0123456789".into())),
1019
1020                                            _ => panic!(),
1021                                        }
1022                                    }
1023                                    assert_eq!(j, 5);
1024                                }
1025                                RowsEventData::UpdateRowsEvent(_) => {
1026                                    let before = before.unwrap().unwrap();
1027                                    let mut j = 0;
1028                                    for v in before {
1029                                        j += 1;
1030                                        match j {
1031                                            1 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789".into())),
1032                                            2 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789".into())),
1033                                            3 => assert_eq!(v, BinlogValue::Value(1_i8.into())),
1034                                            4 => assert_eq!(v, BinlogValue::Value([0b00000101_u8].into())),
1035                                            5 => assert_eq!(v, BinlogValue::Value("0123456789".into())),
1036
1037                                            _ => panic!(),
1038                                        }
1039                                    }
1040                                    assert_eq!(j, 5);
1041
1042                                    let after = after.unwrap().unwrap();
1043                                    let mut j = 0;
1044                                    for v in after {
1045                                        j += 1;
1046                                        match j {
1047                                            1 => assert_eq!(v, BinlogValue::Value("field1".into())),
1048                                            2 => assert_eq!(v, BinlogValue::Value("field_2".into())),
1049                                            3 => assert_eq!(v, BinlogValue::Value(2_i8.into())),
1050                                            4 => assert_eq!(v, BinlogValue::Value([0b00001010_u8].into())),
1051                                            5 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789".into())),
1052                                            _ => panic!(),
1053                                        }
1054                                    }
1055                                    assert_eq!(j, 5);
1056                                }
1057                                RowsEventData::DeleteRowsEvent(_) => {
1058                                    assert!(after.is_none());
1059
1060                                    let before = before.unwrap().unwrap();
1061                                    let mut j = 0;
1062                                    for v in before {
1063                                        j += 1;
1064                                        match j {
1065                                            1 => assert_eq!(v, BinlogValue::Value("field1".into())),
1066                                            2 => assert_eq!(v, BinlogValue::Value("field_2".into())),
1067                                            3 => assert_eq!(v, BinlogValue::Value(2_i8.into())),
1068                                            4 => assert_eq!(v, BinlogValue::Value([0b00001010_u8].into())),
1069                                            5 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789".into())),
1070                                            _ => panic!(),
1071                                        }
1072                                    }
1073                                    assert_eq!(j, 5);
1074                                }
1075                                _ => panic!(),
1076                            }
1077                        }
1078                    }
1079                }
1080
1081                if file_path.file_name().unwrap() == "mariadb-bin.000001" {
1082                    // Extraneous bytes in RotateEvent file name
1083                    // https://github.com/blackbeam/mysql_async/issues/189
1084                    if let Some(EventData::RotateEvent(ev)) = ev.read_data().unwrap() {
1085                        assert_ne!(ev.name_raw(), b"mariadb-bin.000001");
1086                    }
1087                }
1088
1089                if file_path.file_name().unwrap() == "mysql_type_bit.000001" {
1090                    if let Some(EventData::RowsEvent(ev)) = ev.read_data().unwrap() {
1091                        let table_map_event = binlog_file.reader().get_tme(ev.table_id()).unwrap();
1092                        for row in ev.rows(table_map_event) {
1093                            let (before, after) = row.unwrap();
1094                            assert_eq!(before, None);
1095                            assert_eq!(
1096                                after.unwrap().unwrap(),
1097                                vec![
1098                                    BinlogValue::Value(Value::Bytes(vec![0b100])),
1099                                    BinlogValue::Value(Value::Bytes(b"foo".to_vec())),
1100                                    BinlogValue::Value(Value::Bytes(vec![0b100000])),
1101                                ],
1102                            );
1103                        }
1104                    }
1105                }
1106
1107                if file_path.file_name().unwrap() != "mariadb-bin.000001" {
1108                    assert_eq!(output, &file_data[ev_pos..ev_end]);
1109                }
1110
1111                if file_path.file_name().unwrap() == "transaction_compression.000001" {
1112                    if let Some(EventData::TransactionPayloadEvent(data)) = ev.read_data().unwrap()
1113                    {
1114                        let mut payload = data.decompressed()?;
1115                        let reader = binlog_file.reader_mut();
1116
1117                        let mut binlog_ev = reader.read_decompressed(&mut payload)?.unwrap();
1118                        assert_eq!(binlog_ev.header().event_type(), Ok(EventType::QUERY_EVENT));
1119
1120                        binlog_ev = reader.read_decompressed(&mut payload)?.unwrap();
1121                        assert_eq!(
1122                            binlog_ev.header().event_type(),
1123                            Ok(EventType::TABLE_MAP_EVENT)
1124                        );
1125
1126                        binlog_ev = reader.read_decompressed(&mut payload)?.unwrap();
1127                        assert_eq!(
1128                            binlog_ev.header().event_type(),
1129                            Ok(EventType::WRITE_ROWS_EVENT)
1130                        );
1131
1132                        binlog_ev = reader.read_decompressed(&mut payload)?.unwrap();
1133                        assert_eq!(binlog_ev.header().event_type(), Ok(EventType::XID_EVENT));
1134                        assert!(reader.read_decompressed(&mut payload)?.is_none());
1135                    }
1136                }
1137                output = Vec::new();
1138                event.serialize(&mut output);
1139
1140                if matches!(event, EventData::UserVarEvent(_)) {
1141                    // Server may or may not write the flags field, but we will always write it.
1142                    assert_eq!(&output[..ev.data().len()], ev.data());
1143                    assert!(output.len() == ev.data().len() || output.len() == ev.data().len() + 1);
1144                } else if (matches!(event, EventData::GtidEvent(_))
1145                    || matches!(event, EventData::AnonymousGtidEvent(_)))
1146                    && ev.fde().split_version() < (5, 7, 0)
1147                {
1148                    // MySql 5.6 does not write TS_TYPE and following post-header fields
1149                    assert_eq!(&output[..GtidEvent::POST_HEADER_LENGTH - 1 - 16], ev.data());
1150                } else if (matches!(event, EventData::GtidEvent(_))
1151                    || matches!(event, EventData::AnonymousGtidEvent(_)))
1152                    && ev.fde().split_version() < (5, 8, 0)
1153                {
1154                    // MySql 5.7 contains only post-header in this event
1155                    assert_eq!(&output[..GtidEvent::POST_HEADER_LENGTH], ev.data());
1156                } else {
1157                    assert_eq!(output, ev.data());
1158                }
1159
1160                ev_pos = ev_end;
1161            }
1162        }
1163
1164        Ok(())
1165    }
1166}