1use std::{
10 borrow::Cow,
11 cmp::min,
12 convert::TryFrom,
13 fmt,
14 io::{self, Read},
15};
16
17use byteorder::{LittleEndian, ReadBytesExt};
18use saturating::Saturating as S;
19
20use crate::{
21 binlog::{
22 consts::{BinlogVersion, EventType, StatusVarKey},
23 BinlogCtx, BinlogEvent, BinlogStruct,
24 },
25 constants::{Flags2, SqlMode},
26 io::ParseBuf,
27 misc::{
28 raw::{
29 bytes::{BareU16Bytes, BareU8Bytes, EofBytes, NullBytes, U8Bytes},
30 int::*,
31 RawBytes, RawFlags, Skip,
32 },
33 unexpected_buf_eof,
34 },
35 proto::{MyDeserialize, MySerialize},
36};
37
38use super::BinlogEventHeader;
39
/// Body of a binlog query event.
///
/// Fields are declared in the same order in which `MySerialize::serialize`
/// writes them: five fixed-size integers (4 + 4 + 1 + 2 + 2 = 13 bytes),
/// followed by the variable-length parts.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct QueryEvent<'a> {
    /// Thread identifier (little-endian `u32` on the wire).
    thread_id: RawInt<LeU32>,
    /// Execution time (little-endian `u32` on the wire).
    // NOTE(review): unit is presumably seconds — confirm against the protocol docs.
    execution_time: RawInt<LeU32>,
    /// Length of `schema` in bytes; kept in sync by `new` and `with_schema`.
    schema_len: RawInt<u8>,
    /// Error code (little-endian `u16` on the wire).
    error_code: RawInt<LeU16>,
    /// Length of `status_vars` in bytes; kept in sync by `new` and `with_status_vars`.
    status_vars_len: RawInt<LeU16>,

    /// Raw status variables; `status_vars_len` bytes on the wire.
    status_vars: StatusVars<'a>,
    /// Raw schema name; `schema_len` bytes on the wire.
    schema: RawBytes<'a, BareU8Bytes>,
    /// A single byte between `schema` and `query`.
    // NOTE(review): presumably the NUL terminator of the schema name — confirm.
    __skip: Skip<1>,
    /// Raw query text; extends to the end of the event body.
    query: RawBytes<'a, EofBytes>,
}
69
70impl<'a> QueryEvent<'a> {
71 pub fn new(status_vars: impl Into<Cow<'a, [u8]>>, schema: impl Into<Cow<'a, [u8]>>) -> Self {
73 let status_vars = StatusVars(RawBytes::new(status_vars));
74 let schema = RawBytes::new(schema);
75 Self {
76 thread_id: Default::default(),
77 execution_time: Default::default(),
78 schema_len: RawInt::new(schema.len() as u8),
79 error_code: Default::default(),
80 status_vars_len: RawInt::new(status_vars.0.len() as u16),
81 status_vars,
82 schema,
83 __skip: Default::default(),
84 query: Default::default(),
85 }
86 }
87
88 pub fn with_thread_id(mut self, thread_id: u32) -> Self {
90 self.thread_id = RawInt::new(thread_id);
91 self
92 }
93
94 pub fn with_execution_time(mut self, execution_time: u32) -> Self {
96 self.execution_time = RawInt::new(execution_time);
97 self
98 }
99
100 pub fn with_error_code(mut self, error_code: u16) -> Self {
102 self.error_code = RawInt::new(error_code);
103 self
104 }
105
106 pub fn with_status_vars(mut self, status_vars: impl Into<Cow<'a, [u8]>>) -> Self {
108 self.status_vars = StatusVars(RawBytes::new(status_vars));
109 self.status_vars_len.0 = self.status_vars.0.len() as u16;
110 self
111 }
112
113 pub fn with_schema(mut self, schema: impl Into<Cow<'a, [u8]>>) -> Self {
115 self.schema = RawBytes::new(schema);
116 self.schema_len.0 = self.schema.len() as u8;
117 self
118 }
119
120 pub fn with_query(mut self, query: impl Into<Cow<'a, [u8]>>) -> Self {
122 self.query = RawBytes::new(query);
123 self
124 }
125
126 pub fn thread_id(&self) -> u32 {
131 self.thread_id.0
132 }
133
134 pub fn execution_time(&self) -> u32 {
139 self.execution_time.0
140 }
141
142 pub fn error_code(&self) -> u16 {
147 self.error_code.0
148 }
149
150 pub fn status_vars_raw(&'a self) -> &'a [u8] {
155 self.status_vars.0.as_bytes()
156 }
157
158 pub fn status_vars(&'a self) -> &'a StatusVars<'a> {
160 &self.status_vars
161 }
162
163 pub fn schema_raw(&'a self) -> &'a [u8] {
167 self.schema.as_bytes()
168 }
169
170 pub fn schema(&'a self) -> Cow<'a, str> {
172 self.schema.as_str()
173 }
174
175 pub fn query_raw(&'a self) -> &'a [u8] {
179 self.query.as_bytes()
180 }
181
182 pub fn query(&'a self) -> Cow<'a, str> {
184 self.query.as_str()
185 }
186
187 pub fn into_owned(self) -> QueryEvent<'static> {
188 QueryEvent {
189 thread_id: self.thread_id,
190 execution_time: self.execution_time,
191 schema_len: self.schema_len,
192 error_code: self.error_code,
193 status_vars_len: self.status_vars_len,
194 status_vars: self.status_vars.into_owned(),
195 schema: self.schema.into_owned(),
196 __skip: self.__skip,
197 query: self.query.into_owned(),
198 }
199 }
200}
201
202impl<'de> MyDeserialize<'de> for QueryEvent<'de> {
203 const SIZE: Option<usize> = None;
204 type Ctx = BinlogCtx<'de>;
205
206 fn deserialize(ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
207 let mut sbuf: ParseBuf = buf.parse(13)?;
208 let thread_id = sbuf.parse_unchecked(())?;
209 let execution_time = sbuf.parse_unchecked(())?;
210 let schema_len: RawInt<u8> = sbuf.parse_unchecked(())?;
211 let error_code = sbuf.parse_unchecked(())?;
212 let status_vars_len: RawInt<LeU16> = sbuf.parse_unchecked(())?;
213
214 let post_header_len = ctx.fde.get_event_type_header_length(Self::EVENT_TYPE);
215 if !buf.checked_skip(post_header_len.saturating_sub(13) as usize) {
216 return Err(unexpected_buf_eof());
217 }
218
219 let status_vars = buf.parse(*status_vars_len)?;
220 let schema = buf.parse(*schema_len as usize)?;
221 let __skip = buf.parse(())?;
222 let query = buf.parse(())?;
223
224 Ok(Self {
225 thread_id,
226 execution_time,
227 schema_len,
228 error_code,
229 status_vars_len,
230 status_vars,
231 schema,
232 __skip,
233 query,
234 })
235 }
236}
237
238impl MySerialize for QueryEvent<'_> {
239 fn serialize(&self, buf: &mut Vec<u8>) {
240 self.thread_id.serialize(&mut *buf);
241 self.execution_time.serialize(&mut *buf);
242 self.schema_len.serialize(&mut *buf);
243 self.error_code.serialize(&mut *buf);
244 self.status_vars_len.serialize(&mut *buf);
245 self.status_vars.serialize(&mut *buf);
246 self.schema.serialize(&mut *buf);
247 self.__skip.serialize(&mut *buf);
248 self.query.serialize(&mut *buf);
249 }
250}
251
/// Associates `QueryEvent` with the `QUERY_EVENT` binlog event type.
impl<'a> BinlogEvent<'a> for QueryEvent<'a> {
    /// Event type of this struct; `deserialize` uses it to look up the
    /// post-header length for this event.
    const EVENT_TYPE: EventType = EventType::QUERY_EVENT;
}
255
256impl<'a> BinlogStruct<'a> for QueryEvent<'a> {
257 fn len(&self, _version: BinlogVersion) -> usize {
258 let mut len = S(0);
259
260 len += S(4);
261 len += S(4);
262 len += S(1);
263 len += S(2);
264 len += S(2);
265 len += S(min(self.status_vars.0.len(), u16::MAX as usize));
266 len += S(min(self.schema.0.len(), u8::MAX as usize));
267 len += S(1);
268 len += S(self.query.0.len());
269
270 min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
271 }
272}
273
/// Decoded value of a single status variable, as produced by
/// `StatusVar::get_value`.
///
/// Each variant corresponds to the `StatusVarKey` variant of the same name.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum StatusVarVal<'a> {
    /// `Flags2` bit set, stored as a little-endian `u32`.
    Flags2(RawFlags<Flags2, LeU32>),
    /// `SqlMode` bit set, stored as a little-endian `u64`.
    SqlMode(RawFlags<SqlMode, LeU64>),
    /// Raw value bytes, passed through without decoding.
    Catalog(&'a [u8]),
    /// Pair of little-endian `u16` values, read in this order.
    AutoIncrement {
        increment: u16,
        offset: u16,
    },
    /// Three little-endian `u16` values, read in this order.
    Charset {
        charset_client: u16,
        collation_connection: u16,
        collation_server: u16,
    },
    /// Length-prefixed (one byte) text.
    TimeZone(RawBytes<'a, U8Bytes>),
    /// Length-prefixed (one byte) text.
    CatalogNz(RawBytes<'a, U8Bytes>),
    /// Little-endian `u16`.
    LcTimeNames(u16),
    /// Little-endian `u16`.
    CharsetDatabase(u16),
    /// Little-endian `u64`.
    TableMapForUpdate(u64),
    /// Four raw bytes, not interpreted further.
    MasterDataWritten([u8; 4]),
    /// Two length-prefixed (one byte each) strings, user name first.
    Invoker {
        username: RawBytes<'a, U8Bytes>,
        hostname: RawBytes<'a, U8Bytes>,
    },
    /// List of NUL-terminated names, preceded on the wire by a one-byte count.
    UpdatedDbNames(Vec<RawBytes<'a, NullBytes>>),
    /// Microseconds value.
    Microseconds(u32),
    /// Raw value bytes, passed through without decoding.
    CommitTs(&'a [u8]),
    /// Raw value bytes, passed through without decoding.
    CommitTs2(&'a [u8]),
    /// `true` if the single value byte is non-zero.
    ExplicitDefaultsForTimestamp(bool),
    /// Little-endian `u64`.
    DdlLoggedWithXid(u64),
    /// Little-endian `u16`.
    DefaultCollationForUtf8mb4(u16),
    /// Single byte.
    SqlRequirePrimaryKey(u8),
    /// Single byte.
    DefaultTableEncryption(u8),
}
315
/// A single status variable: its key plus the still-undecoded value bytes.
///
/// `Debug` is implemented by hand so that the decoded value is printed.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct StatusVar<'a> {
    /// Key identifying which variable this is.
    key: StatusVarKey,
    /// Raw value bytes; decode them with `get_value`.
    value: &'a [u8],
}
324
325impl StatusVar<'_> {
326 pub fn get_value(&self) -> Result<StatusVarVal, &[u8]> {
328 match self.key {
329 StatusVarKey::Flags2 => {
330 let mut read = self.value;
331 read.read_u32::<LittleEndian>()
332 .map(RawFlags::new)
333 .map(StatusVarVal::Flags2)
334 .map_err(|_| self.value)
335 }
336 StatusVarKey::SqlMode => {
337 let mut read = self.value;
338 read.read_u64::<LittleEndian>()
339 .map(RawFlags::new)
340 .map(StatusVarVal::SqlMode)
341 .map_err(|_| self.value)
342 }
343 StatusVarKey::Catalog => Ok(StatusVarVal::Catalog(self.value)),
344 StatusVarKey::AutoIncrement => {
345 let mut read = self.value;
346 let increment = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
347 let offset = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
348 Ok(StatusVarVal::AutoIncrement { increment, offset })
349 }
350 StatusVarKey::Charset => {
351 let mut read = self.value;
352 let charset_client = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
353 let collation_connection =
354 read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
355 let collation_server = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
356 Ok(StatusVarVal::Charset {
357 charset_client,
358 collation_connection,
359 collation_server,
360 })
361 }
362 StatusVarKey::TimeZone => {
363 let mut read = self.value;
364 let len = read.read_u8().map_err(|_| self.value)? as usize;
365 let text = read.get(..len).ok_or(self.value)?;
366 Ok(StatusVarVal::TimeZone(RawBytes::new(text)))
367 }
368 StatusVarKey::CatalogNz => {
369 let mut read = self.value;
370 let len = read.read_u8().map_err(|_| self.value)? as usize;
371 let text = read.get(..len).ok_or(self.value)?;
372 Ok(StatusVarVal::CatalogNz(RawBytes::new(text)))
373 }
374 StatusVarKey::LcTimeNames => {
375 let mut read = self.value;
376 let val = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
377 Ok(StatusVarVal::LcTimeNames(val))
378 }
379 StatusVarKey::CharsetDatabase => {
380 let mut read = self.value;
381 let val = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
382 Ok(StatusVarVal::CharsetDatabase(val))
383 }
384 StatusVarKey::TableMapForUpdate => {
385 let mut read = self.value;
386 let val = read.read_u64::<LittleEndian>().map_err(|_| self.value)?;
387 Ok(StatusVarVal::TableMapForUpdate(val))
388 }
389 StatusVarKey::MasterDataWritten => {
390 let mut read = self.value;
391 let mut val = [0u8; 4];
392 read.read_exact(&mut val).map_err(|_| self.value)?;
393 Ok(StatusVarVal::MasterDataWritten(val))
394 }
395 StatusVarKey::Invoker => {
396 let mut read = self.value;
397
398 let len = read.read_u8().map_err(|_| self.value)? as usize;
399 let username = read.get(..len).ok_or(self.value)?;
400 read = &read[len..];
401
402 let len = read.read_u8().map_err(|_| self.value)? as usize;
403 let hostname = read.get(..len).ok_or(self.value)?;
404
405 Ok(StatusVarVal::Invoker {
406 username: RawBytes::new(username),
407 hostname: RawBytes::new(hostname),
408 })
409 }
410 StatusVarKey::UpdatedDbNames => {
411 let mut read = self.value;
412 let count = read.read_u8().map_err(|_| self.value)? as usize;
413 let mut names = Vec::with_capacity(count);
414
415 for _ in 0..count {
416 let index = read.iter().position(|x| *x == 0).ok_or(self.value)?;
417 names.push(RawBytes::new(&read[..index]));
418 read = &read[index..];
419 }
420
421 Ok(StatusVarVal::UpdatedDbNames(names))
422 }
423 StatusVarKey::Microseconds => {
424 let mut read = self.value;
425 let val = read.read_u32::<LittleEndian>().map_err(|_| self.value)?;
426 Ok(StatusVarVal::Microseconds(val))
427 }
428 StatusVarKey::CommitTs => Ok(StatusVarVal::CommitTs(self.value)),
429 StatusVarKey::CommitTs2 => Ok(StatusVarVal::CommitTs2(self.value)),
430 StatusVarKey::ExplicitDefaultsForTimestamp => {
431 let mut read = self.value;
432 let val = read.read_u8().map_err(|_| self.value)?;
433 Ok(StatusVarVal::ExplicitDefaultsForTimestamp(val != 0))
434 }
435 StatusVarKey::DdlLoggedWithXid => {
436 let mut read = self.value;
437 let val = read.read_u64::<LittleEndian>().map_err(|_| self.value)?;
438 Ok(StatusVarVal::DdlLoggedWithXid(val))
439 }
440 StatusVarKey::DefaultCollationForUtf8mb4 => {
441 let mut read = self.value;
442 let val = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
443 Ok(StatusVarVal::DefaultCollationForUtf8mb4(val))
444 }
445 StatusVarKey::SqlRequirePrimaryKey => {
446 let mut read = self.value;
447 let val = read.read_u8().map_err(|_| self.value)?;
448 Ok(StatusVarVal::SqlRequirePrimaryKey(val))
449 }
450 StatusVarKey::DefaultTableEncryption => {
451 let mut read = self.value;
452 let val = read.read_u8().map_err(|_| self.value)?;
453 Ok(StatusVarVal::DefaultTableEncryption(val))
454 }
455 }
456 }
457}
458
459impl fmt::Debug for StatusVar<'_> {
460 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461 f.debug_struct("StatusVar")
462 .field("key", &self.key)
463 .field("value", &self.get_value())
464 .finish()
465 }
466}
467
/// Raw, still-undecoded status variables of a query event.
///
/// Individual variables are obtained through `iter` or `get_status_var`.
/// `Debug` is implemented by hand and prints the decoded variables.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct StatusVars<'a>(pub RawBytes<'a, BareU16Bytes>);
471
472impl<'a> StatusVars<'a> {
473 pub fn iter(&'a self) -> StatusVarsIterator<'a> {
475 StatusVarsIterator::new(self.0.as_bytes())
476 }
477
478 pub fn get_status_var(&'a self, needle: StatusVarKey) -> Option<StatusVar<'a>> {
480 self.iter()
481 .find_map(|var| if var.key == needle { Some(var) } else { None })
482 }
483
484 pub fn into_owned(self) -> StatusVars<'static> {
485 StatusVars(self.0.into_owned())
486 }
487}
488
489impl<'de> MyDeserialize<'de> for StatusVars<'de> {
490 const SIZE: Option<usize> = None;
491 type Ctx = u16;
492
493 fn deserialize(len: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
494 Ok(Self(buf.parse(len as usize)?))
495 }
496}
497
498impl MySerialize for StatusVars<'_> {
499 fn serialize(&self, buf: &mut Vec<u8>) {
500 self.0.serialize(buf);
501 }
502}
503
504impl fmt::Debug for StatusVars<'_> {
505 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
506 self.iter().fmt(f)
507 }
508}
509
/// Iterator over the status variables stored in a raw byte buffer.
///
/// Iteration ends at the end of the buffer, at the first unknown key, or at
/// the first value that does not fit into the remaining bytes.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct StatusVarsIterator<'a> {
    /// Offset in `status_vars` at which the next key is expected.
    pos: usize,
    /// The raw bytes being iterated over.
    status_vars: &'a [u8],
}
518
519impl<'a> StatusVarsIterator<'a> {
520 pub fn new(status_vars: &'a [u8]) -> StatusVarsIterator<'a> {
522 Self {
523 pos: 0,
524 status_vars,
525 }
526 }
527}
528
529impl fmt::Debug for StatusVarsIterator<'_> {
530 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531 f.debug_list().entries(self.clone()).finish()
532 }
533}
534
535impl<'a> Iterator for StatusVarsIterator<'a> {
536 type Item = StatusVar<'a>;
537
538 fn next(&mut self) -> Option<Self::Item> {
539 let key = *self.status_vars.get(self.pos)?;
540 let key = StatusVarKey::try_from(key).ok()?;
541 self.pos += 1;
542
543 macro_rules! get_fixed {
544 ($len:expr) => {{
545 self.pos += $len;
546 self.status_vars.get((self.pos - $len)..self.pos)?
547 }};
548 }
549
550 macro_rules! get_var {
551 ($suffix_len:expr) => {{
552 let len = *self.status_vars.get(self.pos)? as usize;
553 get_fixed!(1 + len + $suffix_len)
554 }};
555 }
556
557 let value = match key {
558 StatusVarKey::Flags2 => get_fixed!(4),
559 StatusVarKey::SqlMode => get_fixed!(8),
560 StatusVarKey::Catalog => get_var!(1),
561 StatusVarKey::AutoIncrement => get_fixed!(4),
562 StatusVarKey::Charset => get_fixed!(6),
563 StatusVarKey::TimeZone => get_var!(0),
564 StatusVarKey::CatalogNz => get_var!(0),
565 StatusVarKey::LcTimeNames => get_fixed!(2),
566 StatusVarKey::CharsetDatabase => get_fixed!(2),
567 StatusVarKey::TableMapForUpdate => get_fixed!(8),
568 StatusVarKey::MasterDataWritten => get_fixed!(4),
569 StatusVarKey::Invoker => {
570 let user_len = *self.status_vars.get(self.pos)? as usize;
571 let host_len = *self.status_vars.get(self.pos + 1 + user_len)? as usize;
572 get_fixed!(1 + user_len + 1 + host_len)
573 }
574 StatusVarKey::UpdatedDbNames => {
575 let mut total = 1;
576 let count = *self.status_vars.get(self.pos)? as usize;
577 for _ in 0..count {
578 while *self.status_vars.get(self.pos + total)? != 0x00 {
579 total += 1;
580 }
581 total += 1;
582 }
583 get_fixed!(total)
584 }
585 StatusVarKey::Microseconds => get_fixed!(3),
586 StatusVarKey::CommitTs => get_fixed!(0),
587 StatusVarKey::CommitTs2 => get_fixed!(0),
588 StatusVarKey::ExplicitDefaultsForTimestamp => get_fixed!(1),
589 StatusVarKey::DdlLoggedWithXid => get_fixed!(8),
590 StatusVarKey::DefaultCollationForUtf8mb4 => get_fixed!(2),
591 StatusVarKey::SqlRequirePrimaryKey => get_fixed!(1),
592 StatusVarKey::DefaultTableEncryption => get_fixed!(1),
593 };
594
595 Some(StatusVar { key, value })
596 }
597}