1use std::{
18 collections::HashMap,
19 convert::TryFrom,
20 hash::Hash,
21 io::{
22 self, BufRead, Error,
23 ErrorKind::{InvalidData, UnexpectedEof},
24 Read, Write,
25 },
26};
27
28use crate::{
29 constants::ColumnType,
30 proto::{MyDeserialize, MySerialize},
31};
32
33#[allow(unused)]
34use self::events::TransactionPayloadEvent;
35
36use self::{
37 consts::{BinlogVersion, EventType},
38 events::{Event, FormatDescriptionEvent, RotateEvent, TableMapEvent},
39};
40
41pub mod consts;
42pub mod decimal;
43pub mod events;
44pub mod jsonb;
45pub mod jsondiff;
46pub mod misc;
47pub mod row;
48pub mod time;
49pub mod value;
50
51pub struct BinlogCtx<'a> {
52 pub event_size: usize,
53 pub fde: &'a FormatDescriptionEvent<'a>,
54}
55
56impl<'a> BinlogCtx<'a> {
57 pub fn new(event_size: usize, fde: &'a FormatDescriptionEvent<'a>) -> Self {
58 Self { event_size, fde }
59 }
60}
61
/// A binlog structure that can be serialized ([`MySerialize`]) and deserialized
/// ([`MyDeserialize`]) using a [`BinlogCtx`] as the deserialization context.
pub trait BinlogStruct<'a>: MySerialize + MyDeserialize<'a, Ctx = BinlogCtx<'a>> {
    /// Returns the length of this structure for the given binlog version.
    // NOTE(review): presumably the serialized length in bytes — confirm against implementors.
    fn len(&self, version: BinlogVersion) -> usize;
}
69
/// A [`BinlogStruct`] that is associated with a specific binlog event type.
pub trait BinlogEvent<'a>: BinlogStruct<'a> {
    /// Event type associated with the implementing structure.
    const EVENT_TYPE: EventType;
}
74
75#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
77pub struct BinlogFileHeader;
78
79impl BinlogFileHeader {
80 pub const LEN: usize = 4;
82 pub const VALUE: [u8; Self::LEN] = [0xfe, b'b', b'i', b'n'];
84
85 pub fn read<T: Read>(mut input: T) -> io::Result<Self> {
86 let mut buf = [0_u8; Self::LEN];
87 input.read_exact(&mut buf)?;
88
89 if buf != Self::VALUE {
90 return Err(Error::new(InvalidData, "invalid binlog file header"));
91 }
92
93 Ok(Self)
94 }
95
96 pub fn write<T: Write>(&self, _version: BinlogVersion, mut output: T) -> io::Result<()> {
97 output.write_all(&Self::VALUE)
98 }
99
100 pub fn len(&self, _version: BinlogVersion) -> usize {
101 Self::LEN
102 }
103}
104
105#[derive(Debug)]
112pub struct EventStreamReader {
113 fde: FormatDescriptionEvent<'static>,
114 table_map: HashMap<u64, TableMapEvent<'static>>,
115}
116
117impl EventStreamReader {
118 pub fn new(version: BinlogVersion) -> Self {
120 Self {
121 fde: FormatDescriptionEvent::new(version),
122 table_map: Default::default(),
123 }
124 }
125
126 pub fn get_fde(&self) -> &FormatDescriptionEvent<'static> {
130 &self.fde
131 }
132
133 pub(crate) fn set_checksum_enabled(&mut self, enabled: bool) {
137 self.fde.footer_mut().set_checksum_enabled(enabled);
138 }
139
140 pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
144 self.table_map.get(&table_id)
145 }
146
147 pub fn read<T: BufRead>(&mut self, mut input: T) -> io::Result<Option<Event>> {
156 if input.fill_buf().map(|x| x.is_empty())? {
157 return Ok(None);
158 }
159
160 let event = Event::read(&self.fde, input)?;
161
162 self.handle_event(&event)?;
163
164 Ok(Some(event))
165 }
166
167 pub fn read_decompressed<T: BufRead>(&mut self, input: T) -> io::Result<Option<Event>> {
183 self.set_checksum_enabled(false);
184 let result = self.read(input);
185 self.set_checksum_enabled(true);
186 let Some(event) = result? else {
187 return Ok(None);
188 };
189
190 if event.header().event_type_raw() == EventType::TRANSACTION_PAYLOAD_EVENT as u8 {
191 return Err(io::Error::new(
192 io::ErrorKind::Other,
193 "TRANSACTION_PAYLOAD_EVENT encountered",
194 ));
195 }
196
197 self.handle_event(&event)?;
198
199 Ok(Some(event))
200 }
201
202 fn handle_event(&mut self, event: &Event) -> io::Result<()> {
203 let event_type = event.header().event_type_raw();
204
205 if event_type == EventType::FORMAT_DESCRIPTION_EVENT as u8 {
206 let fde = event.read_event::<FormatDescriptionEvent>()?;
208 self.fde = fde.into_owned().with_footer(event.footer());
209 } else if event_type == EventType::TABLE_MAP_EVENT as u8 {
210 let tme = event.read_event::<TableMapEvent>()?;
212 self.table_map.insert(tme.table_id(), tme.into_owned());
213 } else if event_type == EventType::ROTATE_EVENT as u8 {
214 const TABLE_MAP_MAX_SIZE: usize = 64;
218
219 let re = event.read_event::<RotateEvent>()?;
220 if !re.is_fake() {
221 self.table_map.clear();
222 self.table_map.shrink_to(TABLE_MAP_MAX_SIZE);
223 }
224 }
225
226 Ok(())
227 }
228}
229
230#[derive(Debug)]
234pub struct BinlogFile<T> {
235 reader: EventStreamReader,
236 read: T,
237}
238
239impl<T: BufRead> BinlogFile<T> {
240 pub fn new(version: BinlogVersion, mut read: T) -> io::Result<Self> {
244 let reader = EventStreamReader::new(version);
245 BinlogFileHeader::read(&mut read)?;
246 Ok(Self { reader, read })
247 }
248
249 pub fn reader(&self) -> &EventStreamReader {
251 &self.reader
252 }
253
254 pub fn reader_mut(&mut self) -> &mut EventStreamReader {
256 &mut self.reader
257 }
258}
259
260impl<T: BufRead> Iterator for BinlogFile<T> {
261 type Item = io::Result<Event>;
262
263 fn next(&mut self) -> Option<Self::Item> {
264 match self.reader.read(&mut self.read) {
265 Ok(event) => event.map(Ok),
266 Err(err) if err.kind() == UnexpectedEof => None,
267 Err(err) => Some(Err(err)),
268 }
269 }
270}
271
272impl ColumnType {
273 fn get_metadata<'a>(&self, ptr: &'a [u8], is_array: bool) -> Option<(&'a [u8], usize)> {
278 match self {
279 Self::MYSQL_TYPE_TINY_BLOB
280 | Self::MYSQL_TYPE_BLOB
281 | Self::MYSQL_TYPE_MEDIUM_BLOB
282 | Self::MYSQL_TYPE_LONG_BLOB
283 | Self::MYSQL_TYPE_DOUBLE
284 | Self::MYSQL_TYPE_FLOAT
285 | Self::MYSQL_TYPE_GEOMETRY
286 | Self::MYSQL_TYPE_TIME2
287 | Self::MYSQL_TYPE_DATETIME2
288 | Self::MYSQL_TYPE_TIMESTAMP2
289 | Self::MYSQL_TYPE_JSON
290 | Self::MYSQL_TYPE_VECTOR => ptr.get(..1).map(|x| (x, 1)),
291 Self::MYSQL_TYPE_VARCHAR => {
292 if is_array {
293 ptr.get(..3).map(|x| (x, 3))
294 } else {
295 ptr.get(..2).map(|x| (x, 2))
296 }
297 }
298 Self::MYSQL_TYPE_NEWDECIMAL
299 | Self::MYSQL_TYPE_SET
300 | Self::MYSQL_TYPE_ENUM
301 | Self::MYSQL_TYPE_STRING
302 | Self::MYSQL_TYPE_BIT => ptr.get(..2).map(|x| (x, 2)),
303 Self::MYSQL_TYPE_TYPED_ARRAY => Self::try_from(*ptr.first()?)
304 .ok()?
305 .get_metadata(ptr.get(1..)?, true)
306 .map(|(x, n)| (x, n + 1)),
307 Self::MYSQL_TYPE_DECIMAL
308 | Self::MYSQL_TYPE_TINY
309 | Self::MYSQL_TYPE_SHORT
310 | Self::MYSQL_TYPE_LONG
311 | Self::MYSQL_TYPE_NULL
312 | Self::MYSQL_TYPE_TIMESTAMP
313 | Self::MYSQL_TYPE_LONGLONG
314 | Self::MYSQL_TYPE_INT24
315 | Self::MYSQL_TYPE_DATE
316 | Self::MYSQL_TYPE_TIME
317 | Self::MYSQL_TYPE_DATETIME
318 | Self::MYSQL_TYPE_YEAR
319 | Self::MYSQL_TYPE_NEWDATE
320 | Self::MYSQL_TYPE_UNKNOWN
321 | Self::MYSQL_TYPE_VAR_STRING => Some((&[], 0)),
322 }
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use std::{
329 collections::HashMap,
330 convert::TryFrom,
331 io,
332 iter::{once, repeat},
333 };
334
335 use super::{
336 consts::{EventFlags, EventType},
337 events::{BinlogEventHeader, EventData, GtidEvent},
338 BinlogFile, BinlogFileHeader, BinlogVersion,
339 };
340
341 use crate::{
342 binlog::{
343 events::{OptionalMetadataField, RowsEventData},
344 value::BinlogValue,
345 },
346 collations::CollationId,
347 constants::ColumnFlags,
348 proto::MySerialize,
349 row::convert::from_row,
350 value::Value,
351 };
352
/// Raw bytes of a binlog file used as a test fixture.
///
/// The data starts with the 4-byte binlog file header (`0xfe 'b' 'i' 'n'`);
/// `binlog_file_iterator` expects the remainder to contain exactly 20 events.
const BINLOG_FILE: &[u8] = &[
    0xfe, 0x62, 0x69, 0x6e, 0xfc, 0x35, 0xbb, 0x4a, 0x0f, 0x01, 0x00, 0x00, 0x00, 0x5e, 0x00,
    0x00, 0x00, 0x62, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x35, 0x2e, 0x30, 0x2e, 0x38,
    0x36, 0x2d, 0x64, 0x65, 0x62, 0x75, 0x67, 0x2d, 0x6c, 0x6f, 0x67, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0xfc, 0x35, 0xbb, 0x4a, 0x13, 0x38, 0x0d, 0x00, 0x08, 0x00, 0x12, 0x00, 0x04, 0x04, 0x04,
    0x04, 0x12, 0x00, 0x00, 0x4b, 0x00, 0x04, 0x1a, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00,
    0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0xc6, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00,
    0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04,
    0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00, 0x63, 0x72, 0x65, 0x61,
    0x74, 0x65, 0x20, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x31, 0x28, 0x61, 0x20, 0x69,
    0x6e, 0x74, 0x29, 0x20, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d, 0x20, 0x69, 0x6e, 0x6e,
    0x6f, 0x64, 0x62, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x65, 0x00, 0x00,
    0x00, 0x2b, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08,
    0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x20, 0x74,
    0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x32, 0x28, 0x61, 0x20, 0x69, 0x6e, 0x74, 0x29, 0x20,
    0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d, 0x20, 0x69, 0x6e, 0x6e, 0x6f, 0x64, 0x62, 0xfd,
    0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x45, 0x00, 0x00, 0x00, 0x70, 0x01, 0x00,
    0x00, 0x08, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a,
    0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73,
    0x71, 0x6c, 0x00, 0x42, 0x45, 0x47, 0x49, 0x4e, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00,
    0x00, 0x00, 0x5c, 0x00, 0x00, 0x00, 0xcc, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00,
    0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04,
    0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00, 0x69, 0x6e, 0x73, 0x65,
    0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x31, 0x20, 0x28, 0x61, 0x29, 0x20,
    0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x31, 0x29, 0xfd, 0x35, 0xbb, 0x4a, 0x02,
    0x01, 0x00, 0x00, 0x00, 0x5d, 0x00, 0x00, 0x00, 0x29, 0x02, 0x00, 0x00, 0x00, 0x00, 0x01,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40,
    0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74,
    0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00, 0x69,
    0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x32, 0x20, 0x28,
    0x61, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x31, 0x29, 0xfd, 0x35,
    0xbb, 0x4a, 0x10, 0x01, 0x00, 0x00, 0x00, 0x1b, 0x00, 0x00, 0x00, 0x44, 0x02, 0x00, 0x00,
    0x00, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xfd, 0x35, 0xbb, 0x4a, 0x02,
    0x01, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0xa8, 0x02, 0x00, 0x00, 0x00, 0x00, 0x01,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40,
    0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74,
    0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00, 0x63, 0x72,
    0x65, 0x61, 0x74, 0x65, 0x20, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x33, 0x28, 0x61,
    0x20, 0x69, 0x6e, 0x74, 0x29, 0x20, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d, 0x20, 0x69,
    0x6e, 0x6e, 0x6f, 0x64, 0x62, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x65,
    0x00, 0x00, 0x00, 0x0d, 0x03, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08,
    0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65,
    0x20, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x34, 0x28, 0x61, 0x20, 0x69, 0x6e, 0x74,
    0x29, 0x20, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d, 0x20, 0x6d, 0x79, 0x69, 0x73, 0x61,
    0x6d, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x45, 0x00, 0x00, 0x00, 0x52,
    0x03, 0x00, 0x00, 0x08, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00,
    0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d,
    0x79, 0x73, 0x71, 0x6c, 0x00, 0x42, 0x45, 0x47, 0x49, 0x4e, 0xfd, 0x35, 0xbb, 0x4a, 0x02,
    0x01, 0x00, 0x00, 0x00, 0x5c, 0x00, 0x00, 0x00, 0xae, 0x03, 0x00, 0x00, 0x00, 0x00, 0x01,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40,
    0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74,
    0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00, 0x69, 0x6e,
    0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x33, 0x20, 0x28, 0x61,
    0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x32, 0x29, 0xfd, 0x35, 0xbb,
    0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x5d, 0x00, 0x00, 0x00, 0x0b, 0x04, 0x00, 0x00, 0x00,
    0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00,
    0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03,
    0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c,
    0x00, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x34,
    0x20, 0x28, 0x61, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x32, 0x29,
    0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x48, 0x00, 0x00, 0x00, 0x53, 0x04,
    0x00, 0x00, 0x08, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00,
    0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79,
    0x73, 0x71, 0x6c, 0x00, 0x52, 0x4f, 0x4c, 0x4c, 0x42, 0x41, 0x43, 0x4b, 0xfd, 0x35, 0xbb,
    0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x61, 0x00, 0x00, 0x00, 0xb4, 0x04, 0x00, 0x00, 0x00,
    0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00,
    0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03,
    0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00,
    0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x20, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x35,
    0x28, 0x61, 0x20, 0x69, 0x6e, 0x74, 0x29, 0x20, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d,
    0x20, 0x4e, 0x44, 0x42, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x62, 0x00,
    0x00, 0x00, 0x16, 0x05, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00,
    0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x20,
    0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x74, 0x36, 0x28, 0x61, 0x20, 0x69, 0x6e, 0x74, 0x29,
    0x20, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x3d, 0x20, 0x4e, 0x44, 0x42, 0xfd, 0x35, 0xbb,
    0x4a, 0x02, 0x01, 0x00, 0x00, 0x00, 0x45, 0x00, 0x00, 0x00, 0x5b, 0x05, 0x00, 0x00, 0x08,
    0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00,
    0x00, 0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03,
    0x73, 0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c,
    0x00, 0x42, 0x45, 0x47, 0x49, 0x4e, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00, 0x00, 0x00,
    0x5c, 0x00, 0x00, 0x00, 0xb7, 0x05, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x01, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04, 0x08, 0x00,
    0x08, 0x00, 0x08, 0x00, 0x74, 0x65, 0x73, 0x74, 0x00, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74,
    0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x35, 0x20, 0x28, 0x61, 0x29, 0x20, 0x76, 0x61,
    0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x33, 0x29, 0xfd, 0x35, 0xbb, 0x4a, 0x02, 0x01, 0x00,
    0x00, 0x00, 0x5d, 0x00, 0x00, 0x00, 0x14, 0x06, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00,
    0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73, 0x74, 0x64, 0x04,
    0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00, 0x69, 0x6e, 0x73,
    0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x36, 0x20, 0x28, 0x61, 0x29,
    0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x33, 0x29, 0xfd, 0x35, 0xbb, 0x4a,
    0x02, 0x01, 0x00, 0x00, 0x00, 0x46, 0x00, 0x00, 0x00, 0x5a, 0x06, 0x00, 0x00, 0x08, 0x00,
    0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x1a, 0x00, 0x00, 0x00,
    0x40, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x03, 0x73,
    0x74, 0x64, 0x04, 0x08, 0x00, 0x08, 0x00, 0x08, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x00,
    0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0xfd, 0x35, 0xbb, 0x4a, 0x04, 0x01, 0x00, 0x00, 0x00,
    0x2c, 0x00, 0x00, 0x00, 0x86, 0x06, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x2d, 0x62, 0x69, 0x6e, 0x2e, 0x30,
    0x30, 0x30, 0x30, 0x30, 0x32,
];
467
/// Reading the file header from the fixture and writing it back must
/// reproduce the first `BinlogFileHeader::LEN` bytes of the fixture.
#[test]
fn binlog_file_header_roundtrip() -> io::Result<()> {
    let header = BinlogFileHeader::read(BINLOG_FILE)?;

    let mut written = Vec::with_capacity(BinlogFileHeader::LEN);
    header.write(BinlogVersion::Version4, &mut written)?;

    assert_eq!(written.as_slice(), &BINLOG_FILE[..BinlogFileHeader::LEN]);

    Ok(())
}
479
/// Iterates over the fixture and checks, for each of its 20 events, the parsed
/// header and that the event data matches the corresponding fixture bytes.
#[test]
fn binlog_file_iterator() -> io::Result<()> {
    // Every event in the fixture has server id 1, so only the other fields vary.
    let header = |timestamp, event_type, event_size, log_pos, flags| {
        BinlogEventHeader::new(timestamp, event_type, 1, event_size, log_pos, flags)
    };

    // Expected headers, in file order.
    let mut expected_headers = vec![
        header(1253783036, EventType::FORMAT_DESCRIPTION_EVENT, 94, 98, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 100, 198, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 101, 299, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 69, 368, EventFlags::LOG_EVENT_SUPPRESS_USE_F),
        header(1253783037, EventType::QUERY_EVENT, 92, 460, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 93, 553, EventFlags::empty()),
        header(1253783037, EventType::XID_EVENT, 27, 580, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 100, 680, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 101, 781, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 69, 850, EventFlags::LOG_EVENT_SUPPRESS_USE_F),
        header(1253783037, EventType::QUERY_EVENT, 92, 942, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 93, 1035, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 72, 1107, EventFlags::LOG_EVENT_SUPPRESS_USE_F),
        header(1253783037, EventType::QUERY_EVENT, 97, 1204, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 98, 1302, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 69, 1371, EventFlags::LOG_EVENT_SUPPRESS_USE_F),
        header(1253783037, EventType::QUERY_EVENT, 92, 1463, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 93, 1556, EventFlags::empty()),
        header(1253783037, EventType::QUERY_EVENT, 70, 1626, EventFlags::LOG_EVENT_SUPPRESS_USE_F),
        header(1253783037, EventType::ROTATE_EVENT, 44, 1670, EventFlags::empty()),
    ]
    .into_iter();

    let mut total = 0;
    // The first event starts right after the 4-byte file header.
    let mut ev_pos = 4;

    for ev in BinlogFile::new(BinlogVersion::Version4, BINLOG_FILE)? {
        let data_start = ev_pos + BinlogEventHeader::LEN;
        let ev = ev?;

        let Some(expected) = expected_headers.next() else {
            panic!("too many");
        };
        assert_eq!(ev.header(), expected);

        // Event data spans from the end of the header to the end of the event.
        let data_end = data_start + ev.header().event_size() as usize - BinlogEventHeader::LEN;
        assert_eq!(ev.data(), &BINLOG_FILE[data_start..data_end]);

        total += 1;
        ev_pos = ev.header().log_pos() as usize;
    }

    assert_eq!(total, 20);
    Ok(())
}
729
730 #[test]
731 fn binlog_event_roundtrip() -> io::Result<()> {
732 const PATH: &str = "./test-data/binlogs";
733
734 let binlogs = std::fs::read_dir(PATH)?
735 .filter_map(|path| path.ok())
736 .map(|entry| entry.path())
737 .filter(|path| path.file_name().is_some());
738
739 'outer: for file_path in binlogs {
740 let file_data = std::fs::read(dbg!(&file_path))?;
741 let mut binlog_file = BinlogFile::new(BinlogVersion::Version4, &file_data[..])?;
742
743 let mut i = 0;
744 let mut ev_pos = 4;
745 let mut table_map_events = HashMap::new();
746
747 while let Some(ev) = binlog_file.next() {
748 i += 1;
749 let ev = ev?;
750 let _ = dbg!(ev.header().event_type());
751 let ev_end = ev_pos + ev.header().event_size() as usize;
752 let binlog_version = binlog_file.reader.fde.binlog_version();
753
754 let mut output = Vec::new();
755 ev.write(binlog_version, &mut output)?;
756
757 let event = match ev.read_data() {
758 Ok(event) => {
759 let event = match event {
760 Some(e) => e,
761 None => {
762 if file_path.file_name().unwrap() == "mariadb-bin.000001" {
763 continue;
764 } else {
765 dbg!(&ev);
766 panic!();
767 }
768 }
769 };
770 match event {
771 EventData::TableMapEvent(ref ev) => {
772 table_map_events.insert(ev.table_id(), ev.clone().into_owned());
774
775 event
776 }
777 EventData::RowsEvent(ref rows_event) => {
778 let table_map_event =
780 binlog_file.reader().get_tme(rows_event.table_id()).unwrap();
781 for row in rows_event.rows(table_map_event) {
782 let _row = row.unwrap();
783 if file_path.file_name().unwrap() == "mariadb-bin.000001" {
784 let after = _row.1.as_ref().unwrap();
786 let columns = after.columns_ref();
787
788 for col in columns.iter() {
789 assert_eq!(col.schema_ref(), b"toddy_test");
790 assert_eq!(col.table_ref(), b"outbox");
791 assert_eq!(col.org_table_ref(), b"outbox");
792 }
793
794 for (col, col_name) in columns.iter().zip([
795 "id",
796 "topic",
797 "event_type",
798 "event",
799 "created",
800 ]) {
801 assert_eq!(col.name_ref(), col_name.as_bytes());
802 }
803
804 for (col, f) in columns.iter().zip(
805 once(ColumnFlags::PRI_KEY_FLAG)
806 .chain(repeat(ColumnFlags::empty())),
807 ) {
808 assert_eq!(col.flags(), f);
809 }
810
811 for (col, charset) in columns.iter().zip([
812 CollationId::UNKNOWN_COLLATION_ID,
813 CollationId::UTF8MB4_GENERAL_CI,
814 CollationId::UTF8MB4_GENERAL_CI,
815 CollationId::BINARY,
816 CollationId::UNKNOWN_COLLATION_ID,
817 ]) {
818 assert_eq!(col.character_set(), charset as u16);
819 }
820 }
821 }
822
823 event
824 }
825 _ => event,
826 }
827 }
828 Err(err)
829 if err.kind() == std::io::ErrorKind::Other
830 && ev.header().event_type() == Ok(EventType::XID_EVENT)
831 && ev.header().event_size() == 0x26
832 && file_path.file_name().unwrap() == "ver_5_1-wl2325_r.001" =>
833 {
834 continue 'outer;
836 }
837 Err(err)
838 if err.kind() == std::io::ErrorKind::UnexpectedEof
839 && ev.header().event_type() == Ok(EventType::QUERY_EVENT)
840 && ev.header().event_size() == 171
841 && file_path.file_name().unwrap() == "corrupt-relay-bin.000624" =>
842 {
843 continue 'outer;
845 }
846 other => other.transpose().unwrap()?,
847 };
848
849 if file_path.file_name().unwrap() == "binlog-invisible-columns.000001" {
850 if let Some(EventData::TableMapEvent(ev)) = ev.read_data().unwrap() {
851 let optional_meta = ev.iter_optional_meta();
852 for meta in optional_meta {
853 meta.unwrap();
854 }
855 }
856 }
857
858 if file_path.file_name().unwrap() == "json-opaque.binlog" {
859 let event_data = ev.read_data().unwrap();
860
861 macro_rules! extract_cmp {
865 ($row:expr, $expected:tt) => {
866 let mut after = $row.1.unwrap().unwrap();
867 let a = dbg!(after.pop().unwrap());
868 let super::value::BinlogValue::Jsonb(a) = a else {
869 panic!("BinlogValue::Jsonb(_) expected");
870 };
871 assert_eq!(
872 serde_json::json!($expected),
873 serde_json::Value::from(a.parse().unwrap())
874 );
875 };
876 }
877
878 match event_data {
879 Some(EventData::RowsEvent(ev)) if i == 10 => {
880 let table_map_event =
881 binlog_file.reader().get_tme(ev.table_id()).unwrap();
882 let mut rows = ev.rows(table_map_event);
883 extract_cmp!(rows.next().unwrap().unwrap(), {"a": "base64:type15:VQ=="});
884 }
885 Some(EventData::RowsEvent(ev)) if i == 12 => {
886 let table_map_event =
887 binlog_file.reader().get_tme(ev.table_id()).unwrap();
888 let mut rows = ev.rows(table_map_event);
889 extract_cmp!(rows.next().unwrap().unwrap(), {"b": "2012-03-18"});
890 }
891 Some(EventData::RowsEvent(ev)) if i == 14 => {
892 let table_map_event =
893 binlog_file.reader().get_tme(ev.table_id()).unwrap();
894 let mut rows = ev.rows(table_map_event);
895 extract_cmp!(rows.next().unwrap().unwrap(), {"c": "2012-03-18 11:30:45.000000"});
896 }
897 Some(EventData::RowsEvent(ev)) if i == 16 => {
898 let table_map_event =
899 binlog_file.reader().get_tme(ev.table_id()).unwrap();
900 let mut rows = ev.rows(table_map_event);
901 extract_cmp!(rows.next().unwrap().unwrap(), {"c": "87:31:46.654321"});
902 }
903 Some(EventData::RowsEvent(ev)) if i == 18 => {
904 let table_map_event =
905 binlog_file.reader().get_tme(ev.table_id()).unwrap();
906 let mut rows = ev.rows(table_map_event);
907 extract_cmp!(rows.next().unwrap().unwrap(), {"d": "123.456"});
908 }
909 Some(EventData::RowsEvent(ev)) if i == 20 => {
910 let table_map_event =
911 binlog_file.reader().get_tme(ev.table_id()).unwrap();
912 let mut rows = ev.rows(table_map_event);
913 extract_cmp!(rows.next().unwrap().unwrap(), {"e": "9.00"});
914 }
915 Some(EventData::RowsEvent(ev)) if i == 22 => {
916 let table_map_event =
917 binlog_file.reader().get_tme(ev.table_id()).unwrap();
918 let mut rows = ev.rows(table_map_event);
919 extract_cmp!(rows.next().unwrap().unwrap(), {"e": [0, 1, true, false]});
920 }
921 Some(EventData::RowsEvent(ev)) if i == 24 => {
922 let table_map_event =
923 binlog_file.reader().get_tme(ev.table_id()).unwrap();
924 let mut rows = ev.rows(table_map_event);
925 extract_cmp!(rows.next().unwrap().unwrap(), {"e": null});
926 }
927 Some(EventData::RowsEvent(ev)) => {
928 panic!("no more events expected i={}, {:?}", i, ev);
929 }
930 _ => (),
931 }
932 }
933
934 if file_path.file_name().unwrap() == "vector.binlog" {
935 let event_data = ev.read_data().unwrap();
936 match event_data {
937 Some(EventData::TableMapEvent(ev)) => {
938 let optional_meta = ev.iter_optional_meta();
939 match ev.table_name().as_ref() {
940 "foo" => {
941 for meta in optional_meta {
942 match meta.unwrap() {
943 OptionalMetadataField::Dimensionality(x) => assert_eq!(
944 x.iter_dimensionalities()
945 .collect::<Result<Vec<_>, _>>()
946 .unwrap(),
947 vec![3],
948 ),
949 _ => (),
950 }
951 }
952 }
953 "bar" => {
954 for meta in optional_meta {
955 match meta.unwrap() {
956 OptionalMetadataField::Dimensionality(x) => assert_eq!(
957 x.iter_dimensionalities()
958 .collect::<Result<Vec<_>, _>>()
959 .unwrap(),
960 vec![2, 4],
961 ),
962 _ => (),
963 }
964 }
965 }
966 _ => (),
967 }
968 }
969 Some(EventData::RowsEvent(ev)) if i == 12 => {
970 let table_map_event =
971 binlog_file.reader().get_tme(ev.table_id()).unwrap();
972 let mut rows = ev.rows(table_map_event);
973
974 let (None, Some(after)) = rows.next().unwrap().unwrap() else {
975 panic!("Unexpected data");
976 };
977 let (id, vector_column): (u8, Vec<u8>) =
978 from_row(crate::Row::try_from(after).unwrap());
979 assert_eq!(id, 1);
980 assert_eq!(
981 vector_column,
982 vec![205, 204, 140, 63, 205, 204, 12, 64, 51, 51, 83, 64]
983 );
984
985 let (None, Some(after)) = rows.next().unwrap().unwrap() else {
986 panic!("Unexpected data");
987 };
988 let (id, vector_column): (u8, Vec<u8>) =
989 from_row(crate::Row::try_from(after).unwrap());
990 assert_eq!(id, 2);
991 assert_eq!(
992 vector_column,
993 vec![0, 0, 128, 63, 0, 0, 128, 191, 0, 0, 0, 0]
994 );
995 }
996 _ => (),
997 }
998 }
999
1000 if file_path.file_name().unwrap() == "mysql-enum-string-set.000001" {
1001 if let Some(EventData::RowsEvent(data)) = ev.read_data().unwrap() {
1002 let table_map_event =
1003 binlog_file.reader().get_tme(data.table_id()).unwrap();
1004 for row in data.rows(table_map_event) {
1005 let (before, after) = row.unwrap();
1006 match data {
1007 RowsEventData::WriteRowsEvent(_) => {
1008 assert!(before.is_none());
1009 let after = after.unwrap().unwrap();
1010 let mut j = 0;
1011 for v in after {
1012 j += 1;
1013 match j {
1014 1 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789".into())),
1015 2 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789".into())),
1016 3 => assert_eq!(v, BinlogValue::Value(1_i8.into())),
1017 4 => assert_eq!(v, BinlogValue::Value([0b00000101_u8].into())),
1018 5 => assert_eq!(v, BinlogValue::Value("0123456789".into())),
1019
1020 _ => panic!(),
1021 }
1022 }
1023 assert_eq!(j, 5);
1024 }
1025 RowsEventData::UpdateRowsEvent(_) => {
1026 let before = before.unwrap().unwrap();
1027 let mut j = 0;
1028 for v in before {
1029 j += 1;
1030 match j {
1031 1 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789".into())),
1032 2 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789".into())),
1033 3 => assert_eq!(v, BinlogValue::Value(1_i8.into())),
1034 4 => assert_eq!(v, BinlogValue::Value([0b00000101_u8].into())),
1035 5 => assert_eq!(v, BinlogValue::Value("0123456789".into())),
1036
1037 _ => panic!(),
1038 }
1039 }
1040 assert_eq!(j, 5);
1041
1042 let after = after.unwrap().unwrap();
1043 let mut j = 0;
1044 for v in after {
1045 j += 1;
1046 match j {
1047 1 => assert_eq!(v, BinlogValue::Value("field1".into())),
1048 2 => assert_eq!(v, BinlogValue::Value("field_2".into())),
1049 3 => assert_eq!(v, BinlogValue::Value(2_i8.into())),
1050 4 => assert_eq!(v, BinlogValue::Value([0b00001010_u8].into())),
1051 5 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789".into())),
1052 _ => panic!(),
1053 }
1054 }
1055 assert_eq!(j, 5);
1056 }
1057 RowsEventData::DeleteRowsEvent(_) => {
1058 assert!(after.is_none());
1059
1060 let before = before.unwrap().unwrap();
1061 let mut j = 0;
1062 for v in before {
1063 j += 1;
1064 match j {
1065 1 => assert_eq!(v, BinlogValue::Value("field1".into())),
1066 2 => assert_eq!(v, BinlogValue::Value("field_2".into())),
1067 3 => assert_eq!(v, BinlogValue::Value(2_i8.into())),
1068 4 => assert_eq!(v, BinlogValue::Value([0b00001010_u8].into())),
1069 5 => assert_eq!(v, BinlogValue::Value("0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456780123456789012345678901234567890123456789".into())),
1070 _ => panic!(),
1071 }
1072 }
1073 assert_eq!(j, 5);
1074 }
1075 _ => panic!(),
1076 }
1077 }
1078 }
1079 }
1080
1081 if file_path.file_name().unwrap() == "mariadb-bin.000001" {
1082 if let Some(EventData::RotateEvent(ev)) = ev.read_data().unwrap() {
1085 assert_ne!(ev.name_raw(), b"mariadb-bin.000001");
1086 }
1087 }
1088
1089 if file_path.file_name().unwrap() == "mysql_type_bit.000001" {
1090 if let Some(EventData::RowsEvent(ev)) = ev.read_data().unwrap() {
1091 let table_map_event = binlog_file.reader().get_tme(ev.table_id()).unwrap();
1092 for row in ev.rows(table_map_event) {
1093 let (before, after) = row.unwrap();
1094 assert_eq!(before, None);
1095 assert_eq!(
1096 after.unwrap().unwrap(),
1097 vec![
1098 BinlogValue::Value(Value::Bytes(vec![0b100])),
1099 BinlogValue::Value(Value::Bytes(b"foo".to_vec())),
1100 BinlogValue::Value(Value::Bytes(vec![0b100000])),
1101 ],
1102 );
1103 }
1104 }
1105 }
1106
1107 if file_path.file_name().unwrap() != "mariadb-bin.000001" {
1108 assert_eq!(output, &file_data[ev_pos..ev_end]);
1109 }
1110
1111 if file_path.file_name().unwrap() == "transaction_compression.000001" {
1112 if let Some(EventData::TransactionPayloadEvent(data)) = ev.read_data().unwrap()
1113 {
1114 let mut payload = data.decompressed()?;
1115 let reader = binlog_file.reader_mut();
1116
1117 let mut binlog_ev = reader.read_decompressed(&mut payload)?.unwrap();
1118 assert_eq!(binlog_ev.header().event_type(), Ok(EventType::QUERY_EVENT));
1119
1120 binlog_ev = reader.read_decompressed(&mut payload)?.unwrap();
1121 assert_eq!(
1122 binlog_ev.header().event_type(),
1123 Ok(EventType::TABLE_MAP_EVENT)
1124 );
1125
1126 binlog_ev = reader.read_decompressed(&mut payload)?.unwrap();
1127 assert_eq!(
1128 binlog_ev.header().event_type(),
1129 Ok(EventType::WRITE_ROWS_EVENT)
1130 );
1131
1132 binlog_ev = reader.read_decompressed(&mut payload)?.unwrap();
1133 assert_eq!(binlog_ev.header().event_type(), Ok(EventType::XID_EVENT));
1134 assert!(reader.read_decompressed(&mut payload)?.is_none());
1135 }
1136 }
1137 output = Vec::new();
1138 event.serialize(&mut output);
1139
1140 if matches!(event, EventData::UserVarEvent(_)) {
1141 assert_eq!(&output[..ev.data().len()], ev.data());
1143 assert!(output.len() == ev.data().len() || output.len() == ev.data().len() + 1);
1144 } else if (matches!(event, EventData::GtidEvent(_))
1145 || matches!(event, EventData::AnonymousGtidEvent(_)))
1146 && ev.fde().split_version() < (5, 7, 0)
1147 {
1148 assert_eq!(&output[..GtidEvent::POST_HEADER_LENGTH - 1 - 16], ev.data());
1150 } else if (matches!(event, EventData::GtidEvent(_))
1151 || matches!(event, EventData::AnonymousGtidEvent(_)))
1152 && ev.fde().split_version() < (5, 8, 0)
1153 {
1154 assert_eq!(&output[..GtidEvent::POST_HEADER_LENGTH], ev.data());
1156 } else {
1157 assert_eq!(output, ev.data());
1158 }
1159
1160 ev_pos = ev_end;
1161 }
1162 }
1163
1164 Ok(())
1165 }
1166}