mysql_common/binlog/events/
gtid_event.rs

1// Copyright (c) 2021 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{cmp::min, io};
10
11use saturating::Saturating as S;
12
13use crate::{
14    binlog::{
15        consts::{BinlogVersion, EventType, Gno, GtidFlags},
16        BinlogCtx, BinlogEvent, BinlogStruct,
17    },
18    io::ParseBuf,
19    misc::raw::{int::*, RawConst, RawFlags},
20    proto::{MyDeserialize, MySerialize},
21};
22
23use super::BinlogEventHeader;
24
25define_const!(
26    ConstU8,
27    LogicalTimestampTypecode,
28    InvalidLogicalTimestampTypecode("Invalid logical timestamp typecode value for GTID event"),
29    2
30);
31
32/// GTID stands for Global Transaction IDentifier.
33#[derive(Debug, Clone, Eq, PartialEq, Hash)]
34pub struct GtidEvent {
35    /// Raw flags value.
36    flags: RawFlags<GtidFlags, u8>,
37    /// UUID representing the SID.
38    sid: [u8; Self::ENCODED_SID_LENGTH],
39    /// Group number, second component of GTID.
40    ///
41    ///
42    /// Should be an integer between `MIN_GNO` and `MAX_GNO` for GtidEvent
43    /// or `0` for AnonymousGtidEvent.
44    gno: RawConst<LeU64, Gno>,
45    /// If defined, then always equal to the constant [`GtidEvent::LOGICAL_TIMESTAMP_TYPECODE`].
46    ///
47    /// May be missing for 5.6. Will have different value on 5.7.4 and earlier (ignored).
48    lc_typecode: Option<LogicalTimestampTypecode>,
49    /// Store the transaction's commit parent `sequence_number`.
50    last_committed: RawInt<LeU64>,
51    /// The transaction's logical timestamp assigned at prepare phase.
52    ///
53    /// If it isn't `0` then it must be greater than `last_committed` timestamp.
54    sequence_number: RawInt<LeU64>,
55    /// Timestamp when the transaction was committed on the nearest master.
56    immediate_commit_timestamp: RawInt<LeU56>,
57    /// Timestamp when the transaction was committed on the originating master.
58    original_commit_timestamp: RawInt<LeU56>,
59    /// The packed transaction's length in bytes, including the Gtid.
60    tx_length: RawInt<LenEnc>,
61    /// The version of the server where the transaction was originally executed.
62    original_server_version: RawInt<LeU32>,
63    /// The version of the immediate server.
64    immediate_server_version: RawInt<LeU32>,
65}
66
67impl GtidEvent {
68    pub const POST_HEADER_LENGTH: usize = 1 + Self::ENCODED_SID_LENGTH + 8 + 1 + 16;
69    pub const ENCODED_SID_LENGTH: usize = 16;
70    pub const LOGICAL_TIMESTAMP_TYPECODE: u8 = 2;
71    pub const IMMEDIATE_COMMIT_TIMESTAMP_LENGTH: usize = 7;
72    pub const ORIGINAL_COMMIT_TIMESTAMP_LENGTH: usize = 7;
73    pub const UNDEFINED_SERVER_VERSION: u32 = 999_999;
74    pub const IMMEDIATE_SERVER_VERSION_LENGTH: usize = 4;
75
76    pub fn new(sid: [u8; Self::ENCODED_SID_LENGTH], gno: u64) -> Self {
77        Self {
78            flags: Default::default(),
79            sid,
80            gno: RawConst::new(gno),
81            lc_typecode: Some(LogicalTimestampTypecode::default()),
82            last_committed: Default::default(),
83            sequence_number: Default::default(),
84            immediate_commit_timestamp: Default::default(),
85            original_commit_timestamp: Default::default(),
86            tx_length: Default::default(),
87            original_server_version: Default::default(),
88            immediate_server_version: Default::default(),
89        }
90    }
91
92    /// Defines the `flags` value.
93    pub fn with_flags(mut self, flags: GtidFlags) -> Self {
94        self.flags = RawFlags::new(flags.bits());
95        self
96    }
97
98    /// Returns the raw `flags` value.
99    pub fn flags_raw(&self) -> u8 {
100        self.flags.0
101    }
102
103    /// Returns the `flags` value. Unknown bits will be truncated.
104    ///
105    /// `00000001` – Transaction may have changes logged with SBR.
106    ///
107    /// In 5.6, 5.7.0-5.7.18, and 8.0.0-8.0.1, this flag is always set. Starting in 5.7.19 and
108    /// 8.0.2, this flag is cleared if the transaction only contains row events.
109    /// It is set if any part of the transaction is written in statement format.
110    pub fn flags(&self) -> GtidFlags {
111        self.flags.get()
112    }
113
114    /// Defines the `sid` value.
115    pub fn with_sid(mut self, sid: [u8; Self::ENCODED_SID_LENGTH]) -> Self {
116        self.sid = sid;
117        self
118    }
119
120    /// Returns the `sid` value.
121    ///
122    /// `sid` is the UUID representing the SID.
123    pub fn sid(&self) -> [u8; Self::ENCODED_SID_LENGTH] {
124        self.sid
125    }
126
127    /// Defines the `gno` value.
128    pub fn with_gno(mut self, gno: u64) -> Self {
129        self.gno = RawConst::new(gno);
130        self
131    }
132
133    /// Returns the `gno` value.
134    ///
135    /// `gno` is a group number, second component of GTID.
136    pub fn gno(&self) -> u64 {
137        self.gno.0
138    }
139
140    /// Returns the `lc_typecode` value.
141    ///
142    /// `lc_typecode` is the type of logical timestamp used in the logical clock fields.
143    pub fn lc_typecode(&self) -> Option<u8> {
144        self.lc_typecode.as_ref().map(|x| x.value())
145    }
146
147    /// Sets the `lc_typecode` value to [`GtidEvent::LOGICAL_TIMESTAMP_TYPECODE`].
148    ///
149    /// This is already by default, but `lc_typecode` might be `None` if `Self` is obtained
150    /// from an old MySql server via [`MyDeserialize::deserialize`].
151    pub fn with_lc_typecode(mut self) -> Self {
152        self.lc_typecode = Some(LogicalTimestampTypecode::default());
153        self
154    }
155
156    /// Sets the `last_committed` value.
157    pub fn with_last_committed(mut self, last_committed: u64) -> Self {
158        self.last_committed = RawInt::new(last_committed);
159        self
160    }
161
162    /// Returns the `last_committed` value.
163    ///
164    /// `last_committed` stores the transaction's commit parent `sequence_number`.
165    pub fn last_committed(&self) -> u64 {
166        self.last_committed.0
167    }
168
169    /// Sets the `sequence_number` value.
170    pub fn with_sequence_number(mut self, sequence_number: u64) -> Self {
171        self.sequence_number = RawInt::new(sequence_number);
172        self
173    }
174
175    /// Returns the `sequence_number` value.
176    ///
177    /// `sequence_number` is the transaction's logical timestamp assigned at prepare phase.
178    pub fn sequence_number(&self) -> u64 {
179        self.sequence_number.0
180    }
181
182    /// Sets the `immediate_commit_timestamp` value.
183    pub fn with_immediate_commit_timestamp(mut self, immediate_commit_timestamp: u64) -> Self {
184        self.immediate_commit_timestamp = RawInt::new(immediate_commit_timestamp);
185        self
186    }
187
188    /// Returns the `immediate_commit_timestamp` value.
189    ///
190    /// `immediate_commit_timestamp` is a timestamp of commit on the immediate master.
191    pub fn immediate_commit_timestamp(&self) -> u64 {
192        self.immediate_commit_timestamp.0
193    }
194
195    /// Sets the `original_commit_timestamp` value.
196    pub fn with_original_commit_timestamp(mut self, original_commit_timestamp: u64) -> Self {
197        self.original_commit_timestamp = RawInt::new(original_commit_timestamp);
198        self
199    }
200
201    /// Returns the `original_commit_timestamp` value.
202    ///
203    /// `original_commit_timestamp` is the timestamp of commit on the originating master.
204    pub fn original_commit_timestamp(&self) -> u64 {
205        self.original_commit_timestamp.0
206    }
207
208    /// Sets the `tx_length` value.
209    pub fn with_tx_length(mut self, tx_length: u64) -> Self {
210        self.tx_length = RawInt::new(tx_length);
211        self
212    }
213
214    /// Returns the `tx_length` value.
215    ///
216    /// `tx_length` is the packed transaction's length in bytes, including the Gtid.
217    pub fn tx_length(&self) -> u64 {
218        self.tx_length.0
219    }
220
221    /// Sets the `original_server_version` value.
222    pub fn with_original_server_version(mut self, original_server_version: u32) -> Self {
223        self.original_server_version = RawInt::new(original_server_version);
224        self
225    }
226
227    /// Returns the `original_server_version` value.
228    ///
229    /// `original_server_version` is the version of the server where the transaction was originally
230    /// executed.
231    pub fn original_server_version(&self) -> u32 {
232        self.original_server_version.0
233    }
234
235    /// Sets the `immediate_server_version` value.
236    pub fn with_immediate_server_version(mut self, immediate_server_version: u32) -> Self {
237        self.immediate_server_version = RawInt::new(immediate_server_version);
238        self
239    }
240
241    /// Returns the `immediate_server_version` value.
242    ///
243    /// `immediate_server_version` is the server version of the immediate server.
244    pub fn immediate_server_version(&self) -> u32 {
245        self.immediate_server_version.0
246    }
247}
248
249impl<'de> MyDeserialize<'de> for GtidEvent {
250    const SIZE: Option<usize> = None;
251    type Ctx = BinlogCtx<'de>;
252
253    fn deserialize(_ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
254        let mut sbuf: ParseBuf = buf.parse(1 + Self::ENCODED_SID_LENGTH + 8)?;
255        let flags = sbuf.parse_unchecked(())?;
256        let sid: [u8; Self::ENCODED_SID_LENGTH] = sbuf.parse_unchecked(())?;
257        let gno = sbuf.parse_unchecked(())?;
258
259        let mut lc_typecode = None;
260        let mut last_committed = RawInt::new(0);
261        let mut sequence_number = RawInt::new(0);
262        let mut immediate_commit_timestamp = RawInt::new(0);
263        let mut original_commit_timestamp = RawInt::new(0);
264        let mut tx_length = RawInt::new(0);
265
266        let mut original_server_version = RawInt::new(Self::UNDEFINED_SERVER_VERSION);
267        let mut immediate_server_version = RawInt::new(Self::UNDEFINED_SERVER_VERSION);
268
269        // Buf will be empty for MySql 5.6. Condition will be false for MySql <= 5.7.4
270        if !buf.is_empty() && buf.0[0] == Self::LOGICAL_TIMESTAMP_TYPECODE {
271            lc_typecode = Some(buf.parse_unchecked(())?);
272
273            let mut sbuf: ParseBuf = buf.parse(16)?;
274            last_committed = sbuf.parse_unchecked(())?;
275            sequence_number = sbuf.parse_unchecked(())?;
276
277            if buf.len() >= Self::IMMEDIATE_COMMIT_TIMESTAMP_LENGTH {
278                immediate_commit_timestamp = buf.parse_unchecked(())?;
279                if immediate_commit_timestamp.0 & (1 << 55) != 0 {
280                    immediate_commit_timestamp.0 &= !(1 << 55);
281                    original_commit_timestamp = buf.parse(())?;
282                } else {
283                    // The transaction originated in the previous server
284                    original_commit_timestamp = immediate_commit_timestamp;
285                }
286            }
287
288            if !buf.is_empty() {
289                tx_length = buf.parse_unchecked(())?;
290            }
291
292            if buf.len() >= Self::IMMEDIATE_SERVER_VERSION_LENGTH {
293                immediate_server_version = buf.parse_unchecked(())?;
294                if immediate_server_version.0 & (1 << 31) != 0 {
295                    immediate_server_version.0 &= !(1 << 31);
296                    original_server_version = buf.parse(())?;
297                } else {
298                    original_server_version = immediate_server_version;
299                }
300            }
301        }
302
303        Ok(Self {
304            flags,
305            sid,
306            gno,
307            lc_typecode,
308            last_committed,
309            sequence_number,
310            immediate_commit_timestamp,
311            original_commit_timestamp,
312            tx_length,
313            original_server_version,
314            immediate_server_version,
315        })
316    }
317}
318
319impl MySerialize for GtidEvent {
320    fn serialize(&self, buf: &mut Vec<u8>) {
321        self.flags.serialize(&mut *buf);
322        self.sid.serialize(&mut *buf);
323        self.gno.serialize(&mut *buf);
324        match self.lc_typecode {
325            Some(lc_typecode) => lc_typecode.serialize(&mut *buf),
326            None => return,
327        };
328        self.last_committed.serialize(&mut *buf);
329        self.sequence_number.serialize(&mut *buf);
330
331        let mut immediate_commit_timestamp_with_flag = *self.immediate_commit_timestamp;
332        if self.immediate_commit_timestamp != self.original_commit_timestamp {
333            immediate_commit_timestamp_with_flag |= 1 << 55;
334        } else {
335            immediate_commit_timestamp_with_flag &= !(1 << 55);
336        }
337        RawInt::<LeU56>::new(immediate_commit_timestamp_with_flag).serialize(&mut *buf);
338
339        if self.immediate_commit_timestamp != self.original_commit_timestamp {
340            self.original_commit_timestamp.serialize(&mut *buf);
341        }
342
343        self.tx_length.serialize(&mut *buf);
344
345        let mut immediate_server_version_with_flag = *self.immediate_server_version;
346        if self.immediate_server_version != self.original_server_version {
347            immediate_server_version_with_flag |= 1 << 31;
348        } else {
349            immediate_server_version_with_flag &= !(1 << 31);
350        }
351        RawInt::<LeU32>::new(immediate_server_version_with_flag).serialize(&mut *buf);
352
353        if self.immediate_server_version != self.original_server_version {
354            self.original_server_version.serialize(&mut *buf);
355        }
356    }
357}
358
359impl<'a> BinlogStruct<'a> for GtidEvent {
360    fn len(&self, _version: BinlogVersion) -> usize {
361        let mut len = S(0);
362
363        // post header
364        len += S(1); // flags
365        len += S(Self::ENCODED_SID_LENGTH); // sid
366        len += S(8); // gno
367        len += S(1); // lc_typecode
368        len += S(8); // last_committed
369        len += S(8); // sequence_number
370
371        len += S(7); // immediate_commit_timestamp
372        if self.immediate_commit_timestamp != self.original_commit_timestamp {
373            len += S(7); // original_commit_timestamp
374        }
375
376        len += S(crate::misc::lenenc_int_len(*self.tx_length) as usize); // tx_length
377        len += S(4); // immediate_server_version
378        if self.immediate_server_version != self.original_server_version {
379            len += S(4); // original_server_version
380        }
381
382        min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
383    }
384}
385
386impl<'a> BinlogEvent<'a> for GtidEvent {
387    const EVENT_TYPE: EventType = EventType::GTID_EVENT;
388}