mysql_common/binlog/events/
gtid_event.rs
1use std::{cmp::min, io};
10
11use saturating::Saturating as S;
12
13use crate::{
14 binlog::{
15 consts::{BinlogVersion, EventType, Gno, GtidFlags},
16 BinlogCtx, BinlogEvent, BinlogStruct,
17 },
18 io::ParseBuf,
19 misc::raw::{int::*, RawConst, RawFlags},
20 proto::{MyDeserialize, MySerialize},
21};
22
23use super::BinlogEventHeader;
24
25define_const!(
26 ConstU8,
27 LogicalTimestampTypecode,
28 InvalidLogicalTimestampTypecode("Invalid logical timestamp typecode value for GTID event"),
29 2
30);
31
32#[derive(Debug, Clone, Eq, PartialEq, Hash)]
34pub struct GtidEvent {
35 flags: RawFlags<GtidFlags, u8>,
37 sid: [u8; Self::ENCODED_SID_LENGTH],
39 gno: RawConst<LeU64, Gno>,
45 lc_typecode: Option<LogicalTimestampTypecode>,
49 last_committed: RawInt<LeU64>,
51 sequence_number: RawInt<LeU64>,
55 immediate_commit_timestamp: RawInt<LeU56>,
57 original_commit_timestamp: RawInt<LeU56>,
59 tx_length: RawInt<LenEnc>,
61 original_server_version: RawInt<LeU32>,
63 immediate_server_version: RawInt<LeU32>,
65}
66
67impl GtidEvent {
68 pub const POST_HEADER_LENGTH: usize = 1 + Self::ENCODED_SID_LENGTH + 8 + 1 + 16;
69 pub const ENCODED_SID_LENGTH: usize = 16;
70 pub const LOGICAL_TIMESTAMP_TYPECODE: u8 = 2;
71 pub const IMMEDIATE_COMMIT_TIMESTAMP_LENGTH: usize = 7;
72 pub const ORIGINAL_COMMIT_TIMESTAMP_LENGTH: usize = 7;
73 pub const UNDEFINED_SERVER_VERSION: u32 = 999_999;
74 pub const IMMEDIATE_SERVER_VERSION_LENGTH: usize = 4;
75
76 pub fn new(sid: [u8; Self::ENCODED_SID_LENGTH], gno: u64) -> Self {
77 Self {
78 flags: Default::default(),
79 sid,
80 gno: RawConst::new(gno),
81 lc_typecode: Some(LogicalTimestampTypecode::default()),
82 last_committed: Default::default(),
83 sequence_number: Default::default(),
84 immediate_commit_timestamp: Default::default(),
85 original_commit_timestamp: Default::default(),
86 tx_length: Default::default(),
87 original_server_version: Default::default(),
88 immediate_server_version: Default::default(),
89 }
90 }
91
92 pub fn with_flags(mut self, flags: GtidFlags) -> Self {
94 self.flags = RawFlags::new(flags.bits());
95 self
96 }
97
98 pub fn flags_raw(&self) -> u8 {
100 self.flags.0
101 }
102
103 pub fn flags(&self) -> GtidFlags {
111 self.flags.get()
112 }
113
114 pub fn with_sid(mut self, sid: [u8; Self::ENCODED_SID_LENGTH]) -> Self {
116 self.sid = sid;
117 self
118 }
119
120 pub fn sid(&self) -> [u8; Self::ENCODED_SID_LENGTH] {
124 self.sid
125 }
126
127 pub fn with_gno(mut self, gno: u64) -> Self {
129 self.gno = RawConst::new(gno);
130 self
131 }
132
133 pub fn gno(&self) -> u64 {
137 self.gno.0
138 }
139
140 pub fn lc_typecode(&self) -> Option<u8> {
144 self.lc_typecode.as_ref().map(|x| x.value())
145 }
146
147 pub fn with_lc_typecode(mut self) -> Self {
152 self.lc_typecode = Some(LogicalTimestampTypecode::default());
153 self
154 }
155
156 pub fn with_last_committed(mut self, last_committed: u64) -> Self {
158 self.last_committed = RawInt::new(last_committed);
159 self
160 }
161
162 pub fn last_committed(&self) -> u64 {
166 self.last_committed.0
167 }
168
169 pub fn with_sequence_number(mut self, sequence_number: u64) -> Self {
171 self.sequence_number = RawInt::new(sequence_number);
172 self
173 }
174
175 pub fn sequence_number(&self) -> u64 {
179 self.sequence_number.0
180 }
181
182 pub fn with_immediate_commit_timestamp(mut self, immediate_commit_timestamp: u64) -> Self {
184 self.immediate_commit_timestamp = RawInt::new(immediate_commit_timestamp);
185 self
186 }
187
188 pub fn immediate_commit_timestamp(&self) -> u64 {
192 self.immediate_commit_timestamp.0
193 }
194
195 pub fn with_original_commit_timestamp(mut self, original_commit_timestamp: u64) -> Self {
197 self.original_commit_timestamp = RawInt::new(original_commit_timestamp);
198 self
199 }
200
201 pub fn original_commit_timestamp(&self) -> u64 {
205 self.original_commit_timestamp.0
206 }
207
208 pub fn with_tx_length(mut self, tx_length: u64) -> Self {
210 self.tx_length = RawInt::new(tx_length);
211 self
212 }
213
214 pub fn tx_length(&self) -> u64 {
218 self.tx_length.0
219 }
220
221 pub fn with_original_server_version(mut self, original_server_version: u32) -> Self {
223 self.original_server_version = RawInt::new(original_server_version);
224 self
225 }
226
227 pub fn original_server_version(&self) -> u32 {
232 self.original_server_version.0
233 }
234
235 pub fn with_immediate_server_version(mut self, immediate_server_version: u32) -> Self {
237 self.immediate_server_version = RawInt::new(immediate_server_version);
238 self
239 }
240
241 pub fn immediate_server_version(&self) -> u32 {
245 self.immediate_server_version.0
246 }
247}
248
249impl<'de> MyDeserialize<'de> for GtidEvent {
250 const SIZE: Option<usize> = None;
251 type Ctx = BinlogCtx<'de>;
252
253 fn deserialize(_ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
254 let mut sbuf: ParseBuf = buf.parse(1 + Self::ENCODED_SID_LENGTH + 8)?;
255 let flags = sbuf.parse_unchecked(())?;
256 let sid: [u8; Self::ENCODED_SID_LENGTH] = sbuf.parse_unchecked(())?;
257 let gno = sbuf.parse_unchecked(())?;
258
259 let mut lc_typecode = None;
260 let mut last_committed = RawInt::new(0);
261 let mut sequence_number = RawInt::new(0);
262 let mut immediate_commit_timestamp = RawInt::new(0);
263 let mut original_commit_timestamp = RawInt::new(0);
264 let mut tx_length = RawInt::new(0);
265
266 let mut original_server_version = RawInt::new(Self::UNDEFINED_SERVER_VERSION);
267 let mut immediate_server_version = RawInt::new(Self::UNDEFINED_SERVER_VERSION);
268
269 if !buf.is_empty() && buf.0[0] == Self::LOGICAL_TIMESTAMP_TYPECODE {
271 lc_typecode = Some(buf.parse_unchecked(())?);
272
273 let mut sbuf: ParseBuf = buf.parse(16)?;
274 last_committed = sbuf.parse_unchecked(())?;
275 sequence_number = sbuf.parse_unchecked(())?;
276
277 if buf.len() >= Self::IMMEDIATE_COMMIT_TIMESTAMP_LENGTH {
278 immediate_commit_timestamp = buf.parse_unchecked(())?;
279 if immediate_commit_timestamp.0 & (1 << 55) != 0 {
280 immediate_commit_timestamp.0 &= !(1 << 55);
281 original_commit_timestamp = buf.parse(())?;
282 } else {
283 original_commit_timestamp = immediate_commit_timestamp;
285 }
286 }
287
288 if !buf.is_empty() {
289 tx_length = buf.parse_unchecked(())?;
290 }
291
292 if buf.len() >= Self::IMMEDIATE_SERVER_VERSION_LENGTH {
293 immediate_server_version = buf.parse_unchecked(())?;
294 if immediate_server_version.0 & (1 << 31) != 0 {
295 immediate_server_version.0 &= !(1 << 31);
296 original_server_version = buf.parse(())?;
297 } else {
298 original_server_version = immediate_server_version;
299 }
300 }
301 }
302
303 Ok(Self {
304 flags,
305 sid,
306 gno,
307 lc_typecode,
308 last_committed,
309 sequence_number,
310 immediate_commit_timestamp,
311 original_commit_timestamp,
312 tx_length,
313 original_server_version,
314 immediate_server_version,
315 })
316 }
317}
318
319impl MySerialize for GtidEvent {
320 fn serialize(&self, buf: &mut Vec<u8>) {
321 self.flags.serialize(&mut *buf);
322 self.sid.serialize(&mut *buf);
323 self.gno.serialize(&mut *buf);
324 match self.lc_typecode {
325 Some(lc_typecode) => lc_typecode.serialize(&mut *buf),
326 None => return,
327 };
328 self.last_committed.serialize(&mut *buf);
329 self.sequence_number.serialize(&mut *buf);
330
331 let mut immediate_commit_timestamp_with_flag = *self.immediate_commit_timestamp;
332 if self.immediate_commit_timestamp != self.original_commit_timestamp {
333 immediate_commit_timestamp_with_flag |= 1 << 55;
334 } else {
335 immediate_commit_timestamp_with_flag &= !(1 << 55);
336 }
337 RawInt::<LeU56>::new(immediate_commit_timestamp_with_flag).serialize(&mut *buf);
338
339 if self.immediate_commit_timestamp != self.original_commit_timestamp {
340 self.original_commit_timestamp.serialize(&mut *buf);
341 }
342
343 self.tx_length.serialize(&mut *buf);
344
345 let mut immediate_server_version_with_flag = *self.immediate_server_version;
346 if self.immediate_server_version != self.original_server_version {
347 immediate_server_version_with_flag |= 1 << 31;
348 } else {
349 immediate_server_version_with_flag &= !(1 << 31);
350 }
351 RawInt::<LeU32>::new(immediate_server_version_with_flag).serialize(&mut *buf);
352
353 if self.immediate_server_version != self.original_server_version {
354 self.original_server_version.serialize(&mut *buf);
355 }
356 }
357}
358
359impl<'a> BinlogStruct<'a> for GtidEvent {
360 fn len(&self, _version: BinlogVersion) -> usize {
361 let mut len = S(0);
362
363 len += S(1); len += S(Self::ENCODED_SID_LENGTH); len += S(8); len += S(1); len += S(8); len += S(8); len += S(7); if self.immediate_commit_timestamp != self.original_commit_timestamp {
373 len += S(7); }
375
376 len += S(crate::misc::lenenc_int_len(*self.tx_length) as usize); len += S(4); if self.immediate_server_version != self.original_server_version {
379 len += S(4); }
381
382 min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
383 }
384}
385
386impl<'a> BinlogEvent<'a> for GtidEvent {
387 const EVENT_TYPE: EventType = EventType::GTID_EVENT;
388}