mysql_common/binlog/events/
execute_load_query_event.rs

1// Copyright (c) 2021 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{borrow::Cow, cmp::min, io};
10
11use saturating::Saturating as S;
12
13use crate::{
14    binlog::{
15        BinlogCtx, BinlogEvent, BinlogStruct,
16        consts::{BinlogVersion, EventType, LoadDuplicateHandling},
17    },
18    io::ParseBuf,
19    misc::raw::{
20        Const, RawBytes, RawInt, Skip,
21        bytes::{BareU8Bytes, EofBytes},
22        int::*,
23    },
24    proto::{MyDeserialize, MySerialize},
25};
26
27use super::{BinlogEventHeader, StatusVars};
28
/// Execute load query event.
///
/// Used for LOAD DATA INFILE statements as of MySQL 5.0.
///
/// It is similar to Query_log_event, but before executing the query it substitutes the original
/// filename in the LOAD DATA query with the name of a temporary file.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ExecuteLoadQueryEvent<'a> {
    // post-header
    /// ID of the thread that issued the statement.
    thread_id: RawInt<LeU32>,
    /// Time from when the query started to when it was logged in the binlog, in seconds.
    execution_time: RawInt<LeU32>,
    /// Length of `schema` in bytes (kept in sync by `new` and `with_schema`).
    schema_len: RawInt<u8>,
    /// Error code generated by the master.
    error_code: RawInt<LeU16>,
    /// Length of `status_vars` in bytes (kept in sync by `new` and `with_status_vars`).
    status_vars_len: RawInt<LeU16>,

    // payload
    /// File_id of a temporary file.
    file_id: RawInt<LeU32>,
    /// Pointer to the start of the part of the query that should be substituted.
    start_pos: RawInt<LeU32>,
    /// Pointer to the end of this part of the query.
    end_pos: RawInt<LeU32>,
    /// How to handle duplicates.
    dup_handling: Const<LoadDuplicateHandling, u8>,

    /// Raw status variables block.
    status_vars: StatusVars<'a>,
    /// Schema name.
    schema: RawBytes<'a, BareU8Bytes>,
    /// One byte that sits between `schema` and `query` and is skipped when parsing.
    __skip: Skip<1>,
    /// The LOAD DATA INFILE statement; occupies the rest of the event.
    query: RawBytes<'a, EofBytes>,
}
59
60impl<'a> ExecuteLoadQueryEvent<'a> {
61    /// Creates a new instance.
62    pub fn new(
63        file_id: u32,
64        dup_handling: LoadDuplicateHandling,
65        status_vars: impl Into<Cow<'a, [u8]>>,
66        schema: impl Into<Cow<'a, [u8]>>,
67    ) -> Self {
68        let status_vars = StatusVars(RawBytes::new(status_vars));
69        let schema = RawBytes::new(schema);
70        Self {
71            thread_id: Default::default(),
72            execution_time: Default::default(),
73            schema_len: RawInt::new(schema.len() as u8),
74            error_code: Default::default(),
75            status_vars_len: RawInt::new(status_vars.0.len() as u16),
76            file_id: RawInt::new(file_id),
77            start_pos: Default::default(),
78            end_pos: Default::default(),
79            dup_handling: Const::new(dup_handling),
80            status_vars,
81            schema,
82            __skip: Default::default(),
83            query: Default::default(),
84        }
85    }
86
87    /// Sets the `thread_id` value.
88    pub fn with_thread_id(mut self, thread_id: u32) -> Self {
89        self.thread_id = RawInt::new(thread_id);
90        self
91    }
92
93    /// Sets the `execution_time` value.
94    pub fn with_execution_time(mut self, execution_time: u32) -> Self {
95        self.execution_time = RawInt::new(execution_time);
96        self
97    }
98
99    /// Sets the `error_code` value.
100    pub fn with_error_code(mut self, error_code: u16) -> Self {
101        self.error_code = RawInt::new(error_code);
102        self
103    }
104
105    /// Sets the `file_id` value.
106    pub fn with_file_id(mut self, file_id: u32) -> Self {
107        self.file_id = RawInt::new(file_id);
108        self
109    }
110
111    /// Sets the `start_pos` value.
112    pub fn with_start_pos(mut self, start_pos: u32) -> Self {
113        self.start_pos = RawInt::new(start_pos);
114        self
115    }
116
117    /// Sets the `end_pos` value.
118    pub fn with_end_pos(mut self, end_pos: u32) -> Self {
119        self.end_pos = RawInt::new(end_pos);
120        self
121    }
122
123    /// Sets the `dup_handling` value.
124    pub fn with_dup_handling(mut self, dup_handling: LoadDuplicateHandling) -> Self {
125        self.dup_handling = Const::new(dup_handling);
126        self
127    }
128
129    /// Sets the `status_vars` value (max length is `u16::MAX).
130    pub fn with_status_vars(mut self, status_vars: impl Into<Cow<'a, [u8]>>) -> Self {
131        self.status_vars = StatusVars(RawBytes::new(status_vars));
132        self.status_vars_len.0 = self.status_vars.0.len() as u16;
133        self
134    }
135
136    /// Sets the `schema` value (max length is `u8::MAX).
137    pub fn with_schema(mut self, schema: impl Into<Cow<'a, [u8]>>) -> Self {
138        self.schema = RawBytes::new(schema);
139        self.schema_len.0 = self.schema.len() as u8;
140        self
141    }
142
143    /// Sets the `query` value.
144    pub fn with_query(mut self, query: impl Into<Cow<'a, [u8]>>) -> Self {
145        self.query = RawBytes::new(query);
146        self
147    }
148
149    /// Returns the `thread_id` value.
150    ///
151    /// `thread_id` is the ID of the thread that issued this statement.
152    /// It is needed for temporary tables.
153    pub fn thread_id(&self) -> u32 {
154        self.thread_id.0
155    }
156
157    /// Returns the `execution_time` value.
158    ///
159    /// `execution_time` is the time from when the query started to when it was logged
160    /// in the binlog, in seconds.
161    pub fn execution_time(&self) -> u32 {
162        self.execution_time.0
163    }
164
165    /// Returns the `error_code` value.
166    ///
167    /// `error_code` is the error code generated by the master. If the master fails, the slave will
168    /// fail with the same error code.
169    pub fn error_code(&self) -> u16 {
170        self.error_code.0
171    }
172
173    /// Returns the `file_id` value.
174    ///
175    /// `file_id` is the ID of the temporary file to load.
176    pub fn file_id(&self) -> u32 {
177        self.file_id.0
178    }
179
180    /// Returns the `start_pos` value.
181    ///
182    /// `start_pos` is the start position within the statement for filename substitution.
183    pub fn start_pos(&self) -> u32 {
184        self.start_pos.0
185    }
186
187    /// Returns the `end_pos` value.
188    ///
189    /// `end_pos` is the end position within the statement for filename substitution.
190    pub fn end_pos(&self) -> u32 {
191        self.end_pos.0
192    }
193
194    /// Returns the `dup_handling` value.
195    ///
196    /// `dup_handling` represents the information on how to handle duplicates.
197    pub fn dup_handling(&self) -> LoadDuplicateHandling {
198        self.dup_handling.0
199    }
200
201    /// Returns the `status_vars` value.
202    ///
203    /// `status_vars` contains zero or more status variables. Each status variable consists of one
204    /// byte identifying the variable stored, followed by the value of the variable.
205    pub fn status_vars_raw(&'a self) -> &'a [u8] {
206        self.status_vars.0.as_bytes()
207    }
208
209    /// Returns an iterator over status variables.
210    pub fn status_vars(&'a self) -> &'a StatusVars<'a> {
211        &self.status_vars
212    }
213
214    /// Returns the `schema` value.
215    ///
216    /// `schema` is schema name.
217    pub fn schema_raw(&'a self) -> &'a [u8] {
218        self.schema.as_bytes()
219    }
220
221    /// Returns the `schema` value as a string (lossy converted).
222    pub fn schema(&'a self) -> Cow<'a, str> {
223        self.schema.as_str()
224    }
225
226    /// Returns the `query` value.
227    ///
228    /// `query` is the corresponding LOAD DATA INFILE statement.
229    pub fn query_raw(&'a self) -> &'a [u8] {
230        self.query.as_bytes()
231    }
232
233    /// Returns the `query` value as a string (lossy converted).
234    pub fn query(&'a self) -> Cow<'a, str> {
235        self.query.as_str()
236    }
237
238    pub fn into_owned(self) -> ExecuteLoadQueryEvent<'static> {
239        ExecuteLoadQueryEvent {
240            thread_id: self.thread_id,
241            execution_time: self.execution_time,
242            schema_len: self.schema_len,
243            error_code: self.error_code,
244            status_vars_len: self.status_vars_len,
245            file_id: self.file_id,
246            start_pos: self.start_pos,
247            end_pos: self.end_pos,
248            dup_handling: self.dup_handling,
249            status_vars: self.status_vars.into_owned(),
250            schema: self.schema.into_owned(),
251            __skip: self.__skip,
252            query: self.query.into_owned(),
253        }
254    }
255}
256
257impl<'de> MyDeserialize<'de> for ExecuteLoadQueryEvent<'de> {
258    const SIZE: Option<usize> = None;
259    type Ctx = BinlogCtx<'de>;
260
261    fn deserialize(_: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
262        let mut sbuf: ParseBuf<'_> = buf.parse(26)?;
263
264        let thread_id = sbuf.parse_unchecked(())?;
265        let execution_time = sbuf.parse_unchecked(())?;
266        let schema_len: RawInt<u8> = sbuf.parse_unchecked(())?;
267        let error_code = sbuf.parse_unchecked(())?;
268        let status_vars_len: RawInt<LeU16> = sbuf.parse_unchecked(())?;
269        let file_id = sbuf.parse_unchecked(())?;
270        let start_pos = sbuf.parse_unchecked(())?;
271        let end_pos = sbuf.parse_unchecked(())?;
272        let dup_handling = sbuf.parse_unchecked(())?;
273
274        let status_vars = buf.parse(*status_vars_len)?;
275        let schema = buf.parse(*schema_len as usize)?;
276        let __skip = buf.parse(())?;
277        let query = buf.parse(())?;
278
279        Ok(Self {
280            thread_id,
281            execution_time,
282            schema_len,
283            error_code,
284            status_vars_len,
285            file_id,
286            start_pos,
287            end_pos,
288            dup_handling,
289            status_vars,
290            schema,
291            __skip,
292            query,
293        })
294    }
295}
296
297impl MySerialize for ExecuteLoadQueryEvent<'_> {
298    fn serialize(&self, buf: &mut Vec<u8>) {
299        self.thread_id.serialize(&mut *buf);
300        self.execution_time.serialize(&mut *buf);
301        self.schema_len.serialize(&mut *buf);
302        self.error_code.serialize(&mut *buf);
303        self.status_vars_len.serialize(&mut *buf);
304        self.file_id.serialize(&mut *buf);
305        self.start_pos.serialize(&mut *buf);
306        self.end_pos.serialize(&mut *buf);
307        self.dup_handling.serialize(&mut *buf);
308        self.status_vars.serialize(&mut *buf);
309        self.schema.serialize(&mut *buf);
310        self.__skip.serialize(&mut *buf);
311        self.query.serialize(&mut *buf);
312    }
313}
314
315impl<'a> BinlogStruct<'a> for ExecuteLoadQueryEvent<'a> {
316    fn len(&self, _version: BinlogVersion) -> usize {
317        let mut len = S(0);
318
319        len += S(4); // thread_id
320        len += S(4); // query_exec_time
321        len += S(1); // db_len
322        len += S(2); // error_code
323        len += S(2); // status_vars_len
324        len += S(4); // file_id
325        len += S(4); // start_pos
326        len += S(4); // end_pos
327        len += S(1); // dup_handling_flags
328        len += S(min(self.status_vars.0.len(), u16::MAX as usize - 13)); // status_vars
329        len += S(min(self.schema.0.len(), u8::MAX as usize)); // db_len
330        len += S(1); // null-byte
331        len += S(self.query.0.len());
332
333        min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
334    }
335}
336
impl<'a> BinlogEvent<'a> for ExecuteLoadQueryEvent<'a> {
    /// Event type associated with this struct.
    const EVENT_TYPE: EventType = EventType::EXECUTE_LOAD_QUERY_EVENT;
}