mysql_common/binlog/events/
execute_load_query_event.rs1use std::{borrow::Cow, cmp::min, io};
10
11use saturating::Saturating as S;
12
13use crate::{
14 binlog::{
15 BinlogCtx, BinlogEvent, BinlogStruct,
16 consts::{BinlogVersion, EventType, LoadDuplicateHandling},
17 },
18 io::ParseBuf,
19 misc::raw::{
20 Const, RawBytes, RawInt, Skip,
21 bytes::{BareU8Bytes, EofBytes},
22 int::*,
23 },
24 proto::{MyDeserialize, MySerialize},
25};
26
27use super::{BinlogEventHeader, StatusVars};
28
29#[derive(Debug, Clone, Eq, PartialEq, Hash)]
36pub struct ExecuteLoadQueryEvent<'a> {
37 thread_id: RawInt<LeU32>,
39 execution_time: RawInt<LeU32>,
40 schema_len: RawInt<u8>,
41 error_code: RawInt<LeU16>,
42 status_vars_len: RawInt<LeU16>,
43
44 file_id: RawInt<LeU32>,
47 start_pos: RawInt<LeU32>,
49 end_pos: RawInt<LeU32>,
51 dup_handling: Const<LoadDuplicateHandling, u8>,
53
54 status_vars: StatusVars<'a>,
55 schema: RawBytes<'a, BareU8Bytes>,
56 __skip: Skip<1>,
57 query: RawBytes<'a, EofBytes>,
58}
59
60impl<'a> ExecuteLoadQueryEvent<'a> {
61 pub fn new(
63 file_id: u32,
64 dup_handling: LoadDuplicateHandling,
65 status_vars: impl Into<Cow<'a, [u8]>>,
66 schema: impl Into<Cow<'a, [u8]>>,
67 ) -> Self {
68 let status_vars = StatusVars(RawBytes::new(status_vars));
69 let schema = RawBytes::new(schema);
70 Self {
71 thread_id: Default::default(),
72 execution_time: Default::default(),
73 schema_len: RawInt::new(schema.len() as u8),
74 error_code: Default::default(),
75 status_vars_len: RawInt::new(status_vars.0.len() as u16),
76 file_id: RawInt::new(file_id),
77 start_pos: Default::default(),
78 end_pos: Default::default(),
79 dup_handling: Const::new(dup_handling),
80 status_vars,
81 schema,
82 __skip: Default::default(),
83 query: Default::default(),
84 }
85 }
86
87 pub fn with_thread_id(mut self, thread_id: u32) -> Self {
89 self.thread_id = RawInt::new(thread_id);
90 self
91 }
92
93 pub fn with_execution_time(mut self, execution_time: u32) -> Self {
95 self.execution_time = RawInt::new(execution_time);
96 self
97 }
98
99 pub fn with_error_code(mut self, error_code: u16) -> Self {
101 self.error_code = RawInt::new(error_code);
102 self
103 }
104
105 pub fn with_file_id(mut self, file_id: u32) -> Self {
107 self.file_id = RawInt::new(file_id);
108 self
109 }
110
111 pub fn with_start_pos(mut self, start_pos: u32) -> Self {
113 self.start_pos = RawInt::new(start_pos);
114 self
115 }
116
117 pub fn with_end_pos(mut self, end_pos: u32) -> Self {
119 self.end_pos = RawInt::new(end_pos);
120 self
121 }
122
123 pub fn with_dup_handling(mut self, dup_handling: LoadDuplicateHandling) -> Self {
125 self.dup_handling = Const::new(dup_handling);
126 self
127 }
128
129 pub fn with_status_vars(mut self, status_vars: impl Into<Cow<'a, [u8]>>) -> Self {
131 self.status_vars = StatusVars(RawBytes::new(status_vars));
132 self.status_vars_len.0 = self.status_vars.0.len() as u16;
133 self
134 }
135
136 pub fn with_schema(mut self, schema: impl Into<Cow<'a, [u8]>>) -> Self {
138 self.schema = RawBytes::new(schema);
139 self.schema_len.0 = self.schema.len() as u8;
140 self
141 }
142
143 pub fn with_query(mut self, query: impl Into<Cow<'a, [u8]>>) -> Self {
145 self.query = RawBytes::new(query);
146 self
147 }
148
149 pub fn thread_id(&self) -> u32 {
154 self.thread_id.0
155 }
156
157 pub fn execution_time(&self) -> u32 {
162 self.execution_time.0
163 }
164
165 pub fn error_code(&self) -> u16 {
170 self.error_code.0
171 }
172
173 pub fn file_id(&self) -> u32 {
177 self.file_id.0
178 }
179
180 pub fn start_pos(&self) -> u32 {
184 self.start_pos.0
185 }
186
187 pub fn end_pos(&self) -> u32 {
191 self.end_pos.0
192 }
193
194 pub fn dup_handling(&self) -> LoadDuplicateHandling {
198 self.dup_handling.0
199 }
200
201 pub fn status_vars_raw(&'a self) -> &'a [u8] {
206 self.status_vars.0.as_bytes()
207 }
208
209 pub fn status_vars(&'a self) -> &'a StatusVars<'a> {
211 &self.status_vars
212 }
213
214 pub fn schema_raw(&'a self) -> &'a [u8] {
218 self.schema.as_bytes()
219 }
220
221 pub fn schema(&'a self) -> Cow<'a, str> {
223 self.schema.as_str()
224 }
225
226 pub fn query_raw(&'a self) -> &'a [u8] {
230 self.query.as_bytes()
231 }
232
233 pub fn query(&'a self) -> Cow<'a, str> {
235 self.query.as_str()
236 }
237
238 pub fn into_owned(self) -> ExecuteLoadQueryEvent<'static> {
239 ExecuteLoadQueryEvent {
240 thread_id: self.thread_id,
241 execution_time: self.execution_time,
242 schema_len: self.schema_len,
243 error_code: self.error_code,
244 status_vars_len: self.status_vars_len,
245 file_id: self.file_id,
246 start_pos: self.start_pos,
247 end_pos: self.end_pos,
248 dup_handling: self.dup_handling,
249 status_vars: self.status_vars.into_owned(),
250 schema: self.schema.into_owned(),
251 __skip: self.__skip,
252 query: self.query.into_owned(),
253 }
254 }
255}
256
257impl<'de> MyDeserialize<'de> for ExecuteLoadQueryEvent<'de> {
258 const SIZE: Option<usize> = None;
259 type Ctx = BinlogCtx<'de>;
260
261 fn deserialize(_: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
262 let mut sbuf: ParseBuf<'_> = buf.parse(26)?;
263
264 let thread_id = sbuf.parse_unchecked(())?;
265 let execution_time = sbuf.parse_unchecked(())?;
266 let schema_len: RawInt<u8> = sbuf.parse_unchecked(())?;
267 let error_code = sbuf.parse_unchecked(())?;
268 let status_vars_len: RawInt<LeU16> = sbuf.parse_unchecked(())?;
269 let file_id = sbuf.parse_unchecked(())?;
270 let start_pos = sbuf.parse_unchecked(())?;
271 let end_pos = sbuf.parse_unchecked(())?;
272 let dup_handling = sbuf.parse_unchecked(())?;
273
274 let status_vars = buf.parse(*status_vars_len)?;
275 let schema = buf.parse(*schema_len as usize)?;
276 let __skip = buf.parse(())?;
277 let query = buf.parse(())?;
278
279 Ok(Self {
280 thread_id,
281 execution_time,
282 schema_len,
283 error_code,
284 status_vars_len,
285 file_id,
286 start_pos,
287 end_pos,
288 dup_handling,
289 status_vars,
290 schema,
291 __skip,
292 query,
293 })
294 }
295}
296
297impl MySerialize for ExecuteLoadQueryEvent<'_> {
298 fn serialize(&self, buf: &mut Vec<u8>) {
299 self.thread_id.serialize(&mut *buf);
300 self.execution_time.serialize(&mut *buf);
301 self.schema_len.serialize(&mut *buf);
302 self.error_code.serialize(&mut *buf);
303 self.status_vars_len.serialize(&mut *buf);
304 self.file_id.serialize(&mut *buf);
305 self.start_pos.serialize(&mut *buf);
306 self.end_pos.serialize(&mut *buf);
307 self.dup_handling.serialize(&mut *buf);
308 self.status_vars.serialize(&mut *buf);
309 self.schema.serialize(&mut *buf);
310 self.__skip.serialize(&mut *buf);
311 self.query.serialize(&mut *buf);
312 }
313}
314
315impl<'a> BinlogStruct<'a> for ExecuteLoadQueryEvent<'a> {
316 fn len(&self, _version: BinlogVersion) -> usize {
317 let mut len = S(0);
318
319 len += S(4); len += S(4); len += S(1); len += S(2); len += S(2); len += S(4); len += S(4); len += S(4); len += S(1); len += S(min(self.status_vars.0.len(), u16::MAX as usize - 13)); len += S(min(self.schema.0.len(), u8::MAX as usize)); len += S(1); len += S(self.query.0.len());
332
333 min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
334 }
335}
336
337impl<'a> BinlogEvent<'a> for ExecuteLoadQueryEvent<'a> {
338 const EVENT_TYPE: EventType = EventType::EXECUTE_LOAD_QUERY_EVENT;
339}