mysql_common/binlog/events/
format_description_event.rs1use std::{borrow::Cow, cmp::min, io};
10
11use saturating::Saturating as S;
12
13use crate::{
14 binlog::{
15 BinlogCtx, BinlogEvent, BinlogStruct,
16 consts::{BinlogVersion, EventType},
17 },
18 io::ParseBuf,
19 misc::raw::{
20 Const, RawBytes, RawInt,
21 bytes::{EofBytes, FixedLengthText},
22 int::{ConstU8, LeU16, LeU32},
23 },
24 proto::{MyDeserialize, MySerialize},
25};
26
27use super::{BinlogEventFooter, BinlogEventHeader};
28
29pub const SERVER_VER_LEN: usize = 50;
31
32define_const!(
33 ConstU8,
34 EventHeaderLength,
35 InvalidEventHeaderLength("Invalid event_header_length value for format description event"),
36 19
37);
38
39#[derive(Debug, Clone, Eq, PartialEq, Hash)]
43pub struct FormatDescriptionEvent<'a> {
44 binlog_version: Const<BinlogVersion, LeU16>,
46 server_version: RawBytes<'a, FixedLengthText<{ SERVER_VER_LEN }>>,
50 create_timestamp: RawInt<LeU32>,
52 event_header_length: EventHeaderLength,
54 event_type_header_lengths: RawBytes<'a, EofBytes>,
59 footer: BinlogEventFooter,
65}
66
67impl<'a> FormatDescriptionEvent<'a> {
68 pub const SERVER_VER_LEN: usize = SERVER_VER_LEN;
70 pub const SERVER_VER_OFFSET: usize = 2;
72
73 pub const QUERY_HEADER_MINIMAL_LEN: usize = (4 + 4 + 1 + 2);
76 pub const QUERY_HEADER_LEN: usize = Self::QUERY_HEADER_MINIMAL_LEN + 2;
78 pub const STOP_HEADER_LEN: usize = 0;
80 pub const START_V3_HEADER_LEN: usize = 2 + Self::SERVER_VER_LEN + 4;
82 pub const ROTATE_HEADER_LEN: usize = 8;
84 pub const INTVAR_HEADER_LEN: usize = 0;
86 pub const APPEND_BLOCK_HEADER_LEN: usize = 4;
88 pub const DELETE_FILE_HEADER_LEN: usize = 4;
90 pub const RAND_HEADER_LEN: usize = 0;
92 pub const USER_VAR_HEADER_LEN: usize = 0;
94 pub const FORMAT_DESCRIPTION_HEADER_LEN: usize =
96 (Self::START_V3_HEADER_LEN + EventType::ENUM_END_EVENT as usize);
97 pub const XID_HEADER_LEN: usize = 0;
99 pub const BEGIN_LOAD_QUERY_HEADER_LEN: usize = Self::APPEND_BLOCK_HEADER_LEN;
101 pub const ROWS_HEADER_LEN_V1: usize = 8;
103 pub const TABLE_MAP_HEADER_LEN: usize = 8;
105 pub const EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN: usize = (4 + 4 + 4 + 1);
107 pub const EXECUTE_LOAD_QUERY_HEADER_LEN: usize =
109 (Self::QUERY_HEADER_LEN + Self::EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
110 pub const INCIDENT_HEADER_LEN: usize = 2;
112 pub const HEARTBEAT_HEADER_LEN: usize = 0;
114 pub const IGNORABLE_HEADER_LEN: usize = 0;
116 pub const ROWS_HEADER_LEN_V2: usize = 10;
118 pub const GTID_HEADER_LEN: usize = 42;
120 pub const TRANSACTION_CONTEXT_HEADER_LEN: usize = 18;
122 pub const VIEW_CHANGE_HEADER_LEN: usize = 52;
124 pub const XA_PREPARE_HEADER_LEN: usize = 0;
126 pub const TRANSACTION_PAYLOAD_HEADER_LEN: usize = 0;
128
129 pub fn new(binlog_version: BinlogVersion) -> Self {
131 Self {
132 binlog_version: Const::new(binlog_version),
133 server_version: Default::default(),
134 create_timestamp: Default::default(),
135 event_header_length: Default::default(),
136 event_type_header_lengths: Default::default(),
137 footer: Default::default(),
138 }
139 }
140
141 pub fn with_binlog_version(mut self, binlog_version: BinlogVersion) -> Self {
143 self.binlog_version = Const::new(binlog_version);
144 self
145 }
146
147 pub fn with_server_version(mut self, server_version: impl Into<Cow<'a, [u8]>>) -> Self {
149 self.server_version = RawBytes::new(server_version);
150 self
151 }
152
153 pub fn with_create_timestamp(mut self, create_timestamp: u32) -> Self {
155 self.create_timestamp = RawInt::new(create_timestamp);
156 self
157 }
158
159 pub fn with_event_type_header_lengths(
161 mut self,
162 event_type_header_lengths: impl Into<Cow<'a, [u8]>>,
163 ) -> Self {
164 self.event_type_header_lengths = RawBytes::new(event_type_header_lengths);
165 self
166 }
167
168 pub fn with_footer(mut self, footer: BinlogEventFooter) -> Self {
170 self.footer = footer;
171 self
172 }
173
174 pub fn binlog_version(&self) -> BinlogVersion {
176 self.binlog_version.0
177 }
178
179 pub fn server_version_raw(&'a self) -> &'a [u8] {
183 self.server_version.as_bytes()
184 }
185
186 pub fn server_version(&'a self) -> Cow<'a, str> {
188 self.server_version.as_str()
189 }
190
191 pub fn create_timestamp(&self) -> u32 {
196 self.create_timestamp.0
197 }
198
199 pub fn event_header_length(&self) -> u8 {
204 self.event_header_length.value()
205 }
206
207 pub fn event_type_header_lengths(&'a self) -> &'a [u8] {
212 self.event_type_header_lengths.as_bytes()
213 }
214
215 pub fn footer(&self) -> BinlogEventFooter {
217 self.footer
218 }
219
220 pub(crate) fn footer_mut(&mut self) -> &mut BinlogEventFooter {
221 &mut self.footer
222 }
223
224 pub fn split_version(&self) -> (u8, u8, u8) {
226 crate::misc::split_version(&self.server_version.0)
227 }
228
229 pub fn get_event_type_header_length(&self, event_type: EventType) -> u8 {
231 if event_type == EventType::UNKNOWN_EVENT {
232 return 0;
233 }
234
235 self.event_type_header_lengths
236 .as_bytes()
237 .get(usize::from(event_type as u8).saturating_sub(1))
238 .copied()
239 .unwrap_or_else(|| match event_type {
240 EventType::UNKNOWN_EVENT => 0,
241 EventType::START_EVENT_V3 => Self::START_V3_HEADER_LEN,
242 EventType::QUERY_EVENT => Self::QUERY_HEADER_LEN,
243 EventType::STOP_EVENT => Self::STOP_HEADER_LEN,
244 EventType::ROTATE_EVENT => Self::ROTATE_HEADER_LEN,
245 EventType::INTVAR_EVENT => Self::INTVAR_HEADER_LEN,
246 EventType::LOAD_EVENT => 0,
247 EventType::SLAVE_EVENT => 0,
248 EventType::CREATE_FILE_EVENT => 0,
249 EventType::APPEND_BLOCK_EVENT => Self::APPEND_BLOCK_HEADER_LEN,
250 EventType::EXEC_LOAD_EVENT => 0,
251 EventType::DELETE_FILE_EVENT => Self::DELETE_FILE_HEADER_LEN,
252 EventType::NEW_LOAD_EVENT => 0,
253 EventType::RAND_EVENT => Self::RAND_HEADER_LEN,
254 EventType::USER_VAR_EVENT => Self::USER_VAR_HEADER_LEN,
255 EventType::FORMAT_DESCRIPTION_EVENT => Self::FORMAT_DESCRIPTION_HEADER_LEN,
256 EventType::XID_EVENT => Self::XID_HEADER_LEN,
257 EventType::BEGIN_LOAD_QUERY_EVENT => Self::BEGIN_LOAD_QUERY_HEADER_LEN,
258 EventType::EXECUTE_LOAD_QUERY_EVENT => Self::EXECUTE_LOAD_QUERY_HEADER_LEN,
259 EventType::TABLE_MAP_EVENT => Self::TABLE_MAP_HEADER_LEN,
260 EventType::PRE_GA_WRITE_ROWS_EVENT => 0,
261 EventType::PRE_GA_UPDATE_ROWS_EVENT => 0,
262 EventType::PRE_GA_DELETE_ROWS_EVENT => 0,
263 EventType::WRITE_ROWS_EVENT_V1 => Self::ROWS_HEADER_LEN_V1,
264 EventType::UPDATE_ROWS_EVENT_V1 => Self::ROWS_HEADER_LEN_V1,
265 EventType::DELETE_ROWS_EVENT_V1 => Self::ROWS_HEADER_LEN_V1,
266 EventType::INCIDENT_EVENT => Self::INCIDENT_HEADER_LEN,
267 EventType::HEARTBEAT_EVENT => 0,
268 EventType::IGNORABLE_EVENT => Self::IGNORABLE_HEADER_LEN,
269 EventType::ROWS_QUERY_EVENT => Self::IGNORABLE_HEADER_LEN,
270 EventType::WRITE_ROWS_EVENT => Self::ROWS_HEADER_LEN_V2,
271 EventType::UPDATE_ROWS_EVENT => Self::ROWS_HEADER_LEN_V2,
272 EventType::DELETE_ROWS_EVENT => Self::ROWS_HEADER_LEN_V2,
273 EventType::GTID_EVENT => Self::GTID_HEADER_LEN,
274 EventType::ANONYMOUS_GTID_EVENT => Self::GTID_HEADER_LEN,
275 EventType::PREVIOUS_GTIDS_EVENT => Self::IGNORABLE_HEADER_LEN,
276 EventType::TRANSACTION_CONTEXT_EVENT => Self::TRANSACTION_CONTEXT_HEADER_LEN,
277 EventType::VIEW_CHANGE_EVENT => Self::VIEW_CHANGE_HEADER_LEN,
278 EventType::XA_PREPARE_LOG_EVENT => Self::XA_PREPARE_HEADER_LEN,
279 EventType::PARTIAL_UPDATE_ROWS_EVENT => Self::ROWS_HEADER_LEN_V2,
280 EventType::TRANSACTION_PAYLOAD_EVENT => Self::TRANSACTION_PAYLOAD_HEADER_LEN,
281 EventType::ENUM_END_EVENT => 0,
282 } as u8)
283 }
284
285 pub fn into_owned(self) -> FormatDescriptionEvent<'static> {
287 FormatDescriptionEvent {
288 binlog_version: self.binlog_version,
289 server_version: self.server_version.into_owned(),
290 create_timestamp: self.create_timestamp,
291 event_header_length: self.event_header_length,
292 event_type_header_lengths: self.event_type_header_lengths.into_owned(),
293 footer: self.footer,
294 }
295 }
296}
297
298impl<'de> MyDeserialize<'de> for FormatDescriptionEvent<'de> {
299 const SIZE: Option<usize> = None;
300 type Ctx = BinlogCtx<'de>;
301
302 fn deserialize(_ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
303 let mut sbuf: ParseBuf<'_> = buf.parse(57)?;
304 let binlog_version = sbuf.parse_unchecked(())?;
305 let server_version = sbuf.parse_unchecked(())?;
306 let create_timestamp = sbuf.parse_unchecked(())?;
307 let event_header_length = sbuf.parse_unchecked(())?;
308
309 let event_type_header_lengths = buf.parse(())?;
310
311 Ok(Self {
312 binlog_version,
313 server_version,
314 create_timestamp,
315 event_header_length,
316 event_type_header_lengths,
317 footer: Default::default(),
318 })
319 }
320}
321
322impl MySerialize for FormatDescriptionEvent<'_> {
323 fn serialize(&self, buf: &mut Vec<u8>) {
324 self.binlog_version.serialize(&mut *buf);
325 self.server_version.serialize(&mut *buf);
326 self.create_timestamp.serialize(&mut *buf);
327 self.event_header_length.serialize(&mut *buf);
328 self.event_type_header_lengths.serialize(&mut *buf);
329 }
330}
331
332impl<'a> BinlogStruct<'a> for FormatDescriptionEvent<'a> {
333 fn len(&self, _version: BinlogVersion) -> usize {
334 let mut len = S(0);
335
336 len += S(2);
337 len += S(Self::SERVER_VER_LEN);
338 len += S(4);
339 len += S(1);
340 len += S(self.event_type_header_lengths.len());
341
342 min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
343 }
344}
345
346impl<'a> BinlogEvent<'a> for FormatDescriptionEvent<'a> {
347 const EVENT_TYPE: EventType = EventType::FORMAT_DESCRIPTION_EVENT;
348}