mysql_common/binlog/events/
query_event.rs

1// Copyright (c) 2021 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{
10    borrow::Cow,
11    cmp::min,
12    convert::TryFrom,
13    fmt,
14    io::{self, Read},
15};
16
17use byteorder::{LittleEndian, ReadBytesExt};
18use saturating::Saturating as S;
19
20use crate::{
21    binlog::{
22        consts::{BinlogVersion, EventType, StatusVarKey},
23        BinlogCtx, BinlogEvent, BinlogStruct,
24    },
25    constants::{Flags2, SqlMode},
26    io::ParseBuf,
27    misc::{
28        raw::{
29            bytes::{BareU16Bytes, BareU8Bytes, EofBytes, NullBytes, U8Bytes},
30            int::*,
31            RawBytes, RawFlags, Skip,
32        },
33        unexpected_buf_eof,
34    },
35    proto::{MyDeserialize, MySerialize},
36};
37
38use super::BinlogEventHeader;
39
40/// A query event is created for each query that modifies the database, unless the query
41/// is logged row-based.
42#[derive(Debug, Clone, Eq, PartialEq, Hash)]
43pub struct QueryEvent<'a> {
44    // post-header fields
45    /// The ID of the thread that issued this statement. It is needed for temporary tables.
46    thread_id: RawInt<LeU32>,
47    /// The time from when the query started to when it was logged in the binlog, in seconds.
48    execution_time: RawInt<LeU32>,
49    schema_len: RawInt<u8>,
50    /// Error code generated by the master. If the master fails, the slave will fail with
51    /// the same error code.
52    error_code: RawInt<LeU16>,
53    status_vars_len: RawInt<LeU16>,
54
55    // payload
56    /// Zero or more status variables (`status_vars_length` bytes).
57    ///
58    /// Each status variable consists of one byte identifying the variable stored, followed
59    /// by the value of the variable. Please consult the MySql documentation.
60    ///
61    /// Only available if binlog version >= 4 (empty otherwise).
62    status_vars: StatusVars<'a>,
63    /// The currently selected database name (`schema-length` bytes).
64    schema: RawBytes<'a, BareU8Bytes>,
65    __skip: Skip<1>,
66    /// The SQL query.
67    query: RawBytes<'a, EofBytes>,
68}
69
70impl<'a> QueryEvent<'a> {
71    /// Creates a new instance.
72    pub fn new(status_vars: impl Into<Cow<'a, [u8]>>, schema: impl Into<Cow<'a, [u8]>>) -> Self {
73        let status_vars = StatusVars(RawBytes::new(status_vars));
74        let schema = RawBytes::new(schema);
75        Self {
76            thread_id: Default::default(),
77            execution_time: Default::default(),
78            schema_len: RawInt::new(schema.len() as u8),
79            error_code: Default::default(),
80            status_vars_len: RawInt::new(status_vars.0.len() as u16),
81            status_vars,
82            schema,
83            __skip: Default::default(),
84            query: Default::default(),
85        }
86    }
87
88    /// Sets the `thread_id` value.
89    pub fn with_thread_id(mut self, thread_id: u32) -> Self {
90        self.thread_id = RawInt::new(thread_id);
91        self
92    }
93
94    /// Sets the `execution_time` value.
95    pub fn with_execution_time(mut self, execution_time: u32) -> Self {
96        self.execution_time = RawInt::new(execution_time);
97        self
98    }
99
100    /// Sets the `error_code` value.
101    pub fn with_error_code(mut self, error_code: u16) -> Self {
102        self.error_code = RawInt::new(error_code);
103        self
104    }
105
106    /// Sets the `status_vars` value (max length is `u16::MAX).
107    pub fn with_status_vars(mut self, status_vars: impl Into<Cow<'a, [u8]>>) -> Self {
108        self.status_vars = StatusVars(RawBytes::new(status_vars));
109        self.status_vars_len.0 = self.status_vars.0.len() as u16;
110        self
111    }
112
113    /// Sets the `schema` value (max length is `u8::MAX).
114    pub fn with_schema(mut self, schema: impl Into<Cow<'a, [u8]>>) -> Self {
115        self.schema = RawBytes::new(schema);
116        self.schema_len.0 = self.schema.len() as u8;
117        self
118    }
119
120    /// Sets the `query` value.
121    pub fn with_query(mut self, query: impl Into<Cow<'a, [u8]>>) -> Self {
122        self.query = RawBytes::new(query);
123        self
124    }
125
126    /// Returns the `thread_id` value.
127    ///
128    /// `thread_id` is the ID of the thread that issued this statement.
129    /// It is needed for temporary tables.
130    pub fn thread_id(&self) -> u32 {
131        self.thread_id.0
132    }
133
134    /// Returns the `execution_time` value.
135    ///
136    /// `execution_time` is the time from when the query started to when it was logged
137    /// in the binlog, in seconds.
138    pub fn execution_time(&self) -> u32 {
139        self.execution_time.0
140    }
141
142    /// Returns the `error_code` value.
143    ///
144    /// `error_code` is the error code generated by the master. If the master fails, the slave will
145    /// fail with the same error code.
146    pub fn error_code(&self) -> u16 {
147        self.error_code.0
148    }
149
150    /// Returns the `status_vars` value.
151    ///
152    /// `status_vars` contains zero or more status variables. Each status variable consists of one
153    /// byte identifying the variable stored, followed by the value of the variable.
154    pub fn status_vars_raw(&'a self) -> &'a [u8] {
155        self.status_vars.0.as_bytes()
156    }
157
158    /// Returns an iterator over status variables.
159    pub fn status_vars(&'a self) -> &'a StatusVars<'a> {
160        &self.status_vars
161    }
162
163    /// Returns the `schema` value.
164    ///
165    /// `schema` is schema name.
166    pub fn schema_raw(&'a self) -> &'a [u8] {
167        self.schema.as_bytes()
168    }
169
170    /// Returns the `schema` value as a string (lossy converted).
171    pub fn schema(&'a self) -> Cow<'a, str> {
172        self.schema.as_str()
173    }
174
175    /// Returns the `query` value.
176    ///
177    /// `query` is the corresponding LOAD DATA INFILE statement.
178    pub fn query_raw(&'a self) -> &'a [u8] {
179        self.query.as_bytes()
180    }
181
182    /// Returns the `query` value as a string (lossy converted).
183    pub fn query(&'a self) -> Cow<'a, str> {
184        self.query.as_str()
185    }
186
187    pub fn into_owned(self) -> QueryEvent<'static> {
188        QueryEvent {
189            thread_id: self.thread_id,
190            execution_time: self.execution_time,
191            schema_len: self.schema_len,
192            error_code: self.error_code,
193            status_vars_len: self.status_vars_len,
194            status_vars: self.status_vars.into_owned(),
195            schema: self.schema.into_owned(),
196            __skip: self.__skip,
197            query: self.query.into_owned(),
198        }
199    }
200}
201
202impl<'de> MyDeserialize<'de> for QueryEvent<'de> {
203    const SIZE: Option<usize> = None;
204    type Ctx = BinlogCtx<'de>;
205
206    fn deserialize(ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
207        let mut sbuf: ParseBuf = buf.parse(13)?;
208        let thread_id = sbuf.parse_unchecked(())?;
209        let execution_time = sbuf.parse_unchecked(())?;
210        let schema_len: RawInt<u8> = sbuf.parse_unchecked(())?;
211        let error_code = sbuf.parse_unchecked(())?;
212        let status_vars_len: RawInt<LeU16> = sbuf.parse_unchecked(())?;
213
214        let post_header_len = ctx.fde.get_event_type_header_length(Self::EVENT_TYPE);
215        if !buf.checked_skip(post_header_len.saturating_sub(13) as usize) {
216            return Err(unexpected_buf_eof());
217        }
218
219        let status_vars = buf.parse(*status_vars_len)?;
220        let schema = buf.parse(*schema_len as usize)?;
221        let __skip = buf.parse(())?;
222        let query = buf.parse(())?;
223
224        Ok(Self {
225            thread_id,
226            execution_time,
227            schema_len,
228            error_code,
229            status_vars_len,
230            status_vars,
231            schema,
232            __skip,
233            query,
234        })
235    }
236}
237
238impl MySerialize for QueryEvent<'_> {
239    fn serialize(&self, buf: &mut Vec<u8>) {
240        self.thread_id.serialize(&mut *buf);
241        self.execution_time.serialize(&mut *buf);
242        self.schema_len.serialize(&mut *buf);
243        self.error_code.serialize(&mut *buf);
244        self.status_vars_len.serialize(&mut *buf);
245        self.status_vars.serialize(&mut *buf);
246        self.schema.serialize(&mut *buf);
247        self.__skip.serialize(&mut *buf);
248        self.query.serialize(&mut *buf);
249    }
250}
251
252impl<'a> BinlogEvent<'a> for QueryEvent<'a> {
253    const EVENT_TYPE: EventType = EventType::QUERY_EVENT;
254}
255
256impl<'a> BinlogStruct<'a> for QueryEvent<'a> {
257    fn len(&self, _version: BinlogVersion) -> usize {
258        let mut len = S(0);
259
260        len += S(4);
261        len += S(4);
262        len += S(1);
263        len += S(2);
264        len += S(2);
265        len += S(min(self.status_vars.0.len(), u16::MAX as usize));
266        len += S(min(self.schema.0.len(), u8::MAX as usize));
267        len += S(1);
268        len += S(self.query.0.len());
269
270        min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
271    }
272}
273
274/// Status variable value.
275#[derive(Debug, Clone, Eq, PartialEq, Hash)]
276pub enum StatusVarVal<'a> {
277    Flags2(RawFlags<Flags2, LeU32>),
278    SqlMode(RawFlags<SqlMode, LeU64>),
279    /// Ignored by this implementation.
280    Catalog(&'a [u8]),
281    AutoIncrement {
282        increment: u16,
283        offset: u16,
284    },
285    Charset {
286        charset_client: u16,
287        collation_connection: u16,
288        collation_server: u16,
289    },
290    /// Will be empty if timezone length is `0`.
291    TimeZone(RawBytes<'a, U8Bytes>),
292    /// Will be empty if timezone length is `0`.
293    CatalogNz(RawBytes<'a, U8Bytes>),
294    LcTimeNames(u16),
295    CharsetDatabase(u16),
296    TableMapForUpdate(u64),
297    MasterDataWritten([u8; 4]),
298    Invoker {
299        username: RawBytes<'a, U8Bytes>,
300        hostname: RawBytes<'a, U8Bytes>,
301    },
302    UpdatedDbNames(Vec<RawBytes<'a, NullBytes>>),
303    Microseconds(u32),
304    /// Ignored.
305    CommitTs(&'a [u8]),
306    /// Ignored.
307    CommitTs2(&'a [u8]),
308    /// `0` is interpreted as `false` and everything else as `true`.
309    ExplicitDefaultsForTimestamp(bool),
310    DdlLoggedWithXid(u64),
311    DefaultCollationForUtf8mb4(u16),
312    SqlRequirePrimaryKey(u8),
313    DefaultTableEncryption(u8),
314}
315
316/// Raw status variable.
317#[derive(Clone, Eq, PartialEq, Hash)]
318pub struct StatusVar<'a> {
319    /// Status variable key.
320    key: StatusVarKey,
321    /// Raw value of a status variable. Use `Self::get_value`.
322    value: &'a [u8],
323}
324
325impl StatusVar<'_> {
326    /// Returns parsed value of this status variable, or raw value in case of error.
327    pub fn get_value(&self) -> Result<StatusVarVal, &[u8]> {
328        match self.key {
329            StatusVarKey::Flags2 => {
330                let mut read = self.value;
331                read.read_u32::<LittleEndian>()
332                    .map(RawFlags::new)
333                    .map(StatusVarVal::Flags2)
334                    .map_err(|_| self.value)
335            }
336            StatusVarKey::SqlMode => {
337                let mut read = self.value;
338                read.read_u64::<LittleEndian>()
339                    .map(RawFlags::new)
340                    .map(StatusVarVal::SqlMode)
341                    .map_err(|_| self.value)
342            }
343            StatusVarKey::Catalog => Ok(StatusVarVal::Catalog(self.value)),
344            StatusVarKey::AutoIncrement => {
345                let mut read = self.value;
346                let increment = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
347                let offset = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
348                Ok(StatusVarVal::AutoIncrement { increment, offset })
349            }
350            StatusVarKey::Charset => {
351                let mut read = self.value;
352                let charset_client = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
353                let collation_connection =
354                    read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
355                let collation_server = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
356                Ok(StatusVarVal::Charset {
357                    charset_client,
358                    collation_connection,
359                    collation_server,
360                })
361            }
362            StatusVarKey::TimeZone => {
363                let mut read = self.value;
364                let len = read.read_u8().map_err(|_| self.value)? as usize;
365                let text = read.get(..len).ok_or(self.value)?;
366                Ok(StatusVarVal::TimeZone(RawBytes::new(text)))
367            }
368            StatusVarKey::CatalogNz => {
369                let mut read = self.value;
370                let len = read.read_u8().map_err(|_| self.value)? as usize;
371                let text = read.get(..len).ok_or(self.value)?;
372                Ok(StatusVarVal::CatalogNz(RawBytes::new(text)))
373            }
374            StatusVarKey::LcTimeNames => {
375                let mut read = self.value;
376                let val = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
377                Ok(StatusVarVal::LcTimeNames(val))
378            }
379            StatusVarKey::CharsetDatabase => {
380                let mut read = self.value;
381                let val = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
382                Ok(StatusVarVal::CharsetDatabase(val))
383            }
384            StatusVarKey::TableMapForUpdate => {
385                let mut read = self.value;
386                let val = read.read_u64::<LittleEndian>().map_err(|_| self.value)?;
387                Ok(StatusVarVal::TableMapForUpdate(val))
388            }
389            StatusVarKey::MasterDataWritten => {
390                let mut read = self.value;
391                let mut val = [0u8; 4];
392                read.read_exact(&mut val).map_err(|_| self.value)?;
393                Ok(StatusVarVal::MasterDataWritten(val))
394            }
395            StatusVarKey::Invoker => {
396                let mut read = self.value;
397
398                let len = read.read_u8().map_err(|_| self.value)? as usize;
399                let username = read.get(..len).ok_or(self.value)?;
400                read = &read[len..];
401
402                let len = read.read_u8().map_err(|_| self.value)? as usize;
403                let hostname = read.get(..len).ok_or(self.value)?;
404
405                Ok(StatusVarVal::Invoker {
406                    username: RawBytes::new(username),
407                    hostname: RawBytes::new(hostname),
408                })
409            }
410            StatusVarKey::UpdatedDbNames => {
411                let mut read = self.value;
412                let count = read.read_u8().map_err(|_| self.value)? as usize;
413                let mut names = Vec::with_capacity(count);
414
415                for _ in 0..count {
416                    let index = read.iter().position(|x| *x == 0).ok_or(self.value)?;
417                    names.push(RawBytes::new(&read[..index]));
418                    read = &read[index..];
419                }
420
421                Ok(StatusVarVal::UpdatedDbNames(names))
422            }
423            StatusVarKey::Microseconds => {
424                let mut read = self.value;
425                let val = read.read_u32::<LittleEndian>().map_err(|_| self.value)?;
426                Ok(StatusVarVal::Microseconds(val))
427            }
428            StatusVarKey::CommitTs => Ok(StatusVarVal::CommitTs(self.value)),
429            StatusVarKey::CommitTs2 => Ok(StatusVarVal::CommitTs2(self.value)),
430            StatusVarKey::ExplicitDefaultsForTimestamp => {
431                let mut read = self.value;
432                let val = read.read_u8().map_err(|_| self.value)?;
433                Ok(StatusVarVal::ExplicitDefaultsForTimestamp(val != 0))
434            }
435            StatusVarKey::DdlLoggedWithXid => {
436                let mut read = self.value;
437                let val = read.read_u64::<LittleEndian>().map_err(|_| self.value)?;
438                Ok(StatusVarVal::DdlLoggedWithXid(val))
439            }
440            StatusVarKey::DefaultCollationForUtf8mb4 => {
441                let mut read = self.value;
442                let val = read.read_u16::<LittleEndian>().map_err(|_| self.value)?;
443                Ok(StatusVarVal::DefaultCollationForUtf8mb4(val))
444            }
445            StatusVarKey::SqlRequirePrimaryKey => {
446                let mut read = self.value;
447                let val = read.read_u8().map_err(|_| self.value)?;
448                Ok(StatusVarVal::SqlRequirePrimaryKey(val))
449            }
450            StatusVarKey::DefaultTableEncryption => {
451                let mut read = self.value;
452                let val = read.read_u8().map_err(|_| self.value)?;
453                Ok(StatusVarVal::DefaultTableEncryption(val))
454            }
455        }
456    }
457}
458
459impl fmt::Debug for StatusVar<'_> {
460    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461        f.debug_struct("StatusVar")
462            .field("key", &self.key)
463            .field("value", &self.get_value())
464            .finish()
465    }
466}
467
468/// Status variables of a QueryEvent.
469#[derive(Clone, Eq, PartialEq, Hash)]
470pub struct StatusVars<'a>(pub RawBytes<'a, BareU16Bytes>);
471
472impl<'a> StatusVars<'a> {
473    /// Returns an iterator over QueryEvent status variables.
474    pub fn iter(&'a self) -> StatusVarsIterator<'a> {
475        StatusVarsIterator::new(self.0.as_bytes())
476    }
477
478    /// Returns raw value of a status variable by key.
479    pub fn get_status_var(&'a self, needle: StatusVarKey) -> Option<StatusVar<'a>> {
480        self.iter()
481            .find_map(|var| if var.key == needle { Some(var) } else { None })
482    }
483
484    pub fn into_owned(self) -> StatusVars<'static> {
485        StatusVars(self.0.into_owned())
486    }
487}
488
489impl<'de> MyDeserialize<'de> for StatusVars<'de> {
490    const SIZE: Option<usize> = None;
491    type Ctx = u16;
492
493    fn deserialize(len: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
494        Ok(Self(buf.parse(len as usize)?))
495    }
496}
497
498impl MySerialize for StatusVars<'_> {
499    fn serialize(&self, buf: &mut Vec<u8>) {
500        self.0.serialize(buf);
501    }
502}
503
504impl fmt::Debug for StatusVars<'_> {
505    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
506        self.iter().fmt(f)
507    }
508}
509
/// Iterator that walks over the status variables of a `QueryEvent`.
///
/// Iteration ends as soon as a variable can't be parsed.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct StatusVarsIterator<'a> {
    /// Offset into `status_vars` at which the next variable starts.
    pos: usize,
    /// Raw bytes of all status variables.
    status_vars: &'a [u8],
}
518
519impl<'a> StatusVarsIterator<'a> {
520    /// Creates new instance.
521    pub fn new(status_vars: &'a [u8]) -> StatusVarsIterator<'a> {
522        Self {
523            pos: 0,
524            status_vars,
525        }
526    }
527}
528
529impl fmt::Debug for StatusVarsIterator<'_> {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        f.debug_list().entries(self.clone()).finish()
532    }
533}
534
535impl<'a> Iterator for StatusVarsIterator<'a> {
536    type Item = StatusVar<'a>;
537
538    fn next(&mut self) -> Option<Self::Item> {
539        let key = *self.status_vars.get(self.pos)?;
540        let key = StatusVarKey::try_from(key).ok()?;
541        self.pos += 1;
542
543        macro_rules! get_fixed {
544            ($len:expr) => {{
545                self.pos += $len;
546                self.status_vars.get((self.pos - $len)..self.pos)?
547            }};
548        }
549
550        macro_rules! get_var {
551            ($suffix_len:expr) => {{
552                let len = *self.status_vars.get(self.pos)? as usize;
553                get_fixed!(1 + len + $suffix_len)
554            }};
555        }
556
557        let value = match key {
558            StatusVarKey::Flags2 => get_fixed!(4),
559            StatusVarKey::SqlMode => get_fixed!(8),
560            StatusVarKey::Catalog => get_var!(1),
561            StatusVarKey::AutoIncrement => get_fixed!(4),
562            StatusVarKey::Charset => get_fixed!(6),
563            StatusVarKey::TimeZone => get_var!(0),
564            StatusVarKey::CatalogNz => get_var!(0),
565            StatusVarKey::LcTimeNames => get_fixed!(2),
566            StatusVarKey::CharsetDatabase => get_fixed!(2),
567            StatusVarKey::TableMapForUpdate => get_fixed!(8),
568            StatusVarKey::MasterDataWritten => get_fixed!(4),
569            StatusVarKey::Invoker => {
570                let user_len = *self.status_vars.get(self.pos)? as usize;
571                let host_len = *self.status_vars.get(self.pos + 1 + user_len)? as usize;
572                get_fixed!(1 + user_len + 1 + host_len)
573            }
574            StatusVarKey::UpdatedDbNames => {
575                let mut total = 1;
576                let count = *self.status_vars.get(self.pos)? as usize;
577                for _ in 0..count {
578                    while *self.status_vars.get(self.pos + total)? != 0x00 {
579                        total += 1;
580                    }
581                    total += 1;
582                }
583                get_fixed!(total)
584            }
585            StatusVarKey::Microseconds => get_fixed!(3),
586            StatusVarKey::CommitTs => get_fixed!(0),
587            StatusVarKey::CommitTs2 => get_fixed!(0),
588            StatusVarKey::ExplicitDefaultsForTimestamp => get_fixed!(1),
589            StatusVarKey::DdlLoggedWithXid => get_fixed!(8),
590            StatusVarKey::DefaultCollationForUtf8mb4 => get_fixed!(2),
591            StatusVarKey::SqlRequirePrimaryKey => get_fixed!(1),
592            StatusVarKey::DefaultTableEncryption => get_fixed!(1),
593        };
594
595        Some(StatusVar { key, value })
596    }
597}