use std::{cmp::min, io};
use saturating::Saturating as S;
use crate::{
binlog::{
consts::{BinlogVersion, EventType, Gno, GtidFlags},
BinlogCtx, BinlogEvent, BinlogStruct,
},
io::ParseBuf,
misc::raw::{int::*, RawConst, RawFlags},
proto::{MyDeserialize, MySerialize},
};
use super::BinlogEventHeader;
define_const!(
ConstU8,
LogicalTimestampTypecode,
InvalidLogicalTimestampTypecode("Invalid logical timestamp typecode value for GTID event"),
2
);
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct GtidEvent {
flags: RawFlags<GtidFlags, u8>,
sid: [u8; Self::ENCODED_SID_LENGTH],
gno: RawConst<LeU64, Gno>,
lc_typecode: Option<LogicalTimestampTypecode>,
last_committed: RawInt<LeU64>,
sequence_number: RawInt<LeU64>,
immediate_commit_timestamp: RawInt<LeU56>,
original_commit_timestamp: RawInt<LeU56>,
tx_length: RawInt<LenEnc>,
original_server_version: RawInt<LeU32>,
immediate_server_version: RawInt<LeU32>,
}
impl GtidEvent {
pub const POST_HEADER_LENGTH: usize = 1 + Self::ENCODED_SID_LENGTH + 8 + 1 + 16;
pub const ENCODED_SID_LENGTH: usize = 16;
pub const LOGICAL_TIMESTAMP_TYPECODE: u8 = 2;
pub const IMMEDIATE_COMMIT_TIMESTAMP_LENGTH: usize = 7;
pub const ORIGINAL_COMMIT_TIMESTAMP_LENGTH: usize = 7;
pub const UNDEFINED_SERVER_VERSION: u32 = 999_999;
pub const IMMEDIATE_SERVER_VERSION_LENGTH: usize = 4;
pub fn new(sid: [u8; Self::ENCODED_SID_LENGTH], gno: u64) -> Self {
Self {
flags: Default::default(),
sid,
gno: RawConst::new(gno),
lc_typecode: Some(LogicalTimestampTypecode::default()),
last_committed: Default::default(),
sequence_number: Default::default(),
immediate_commit_timestamp: Default::default(),
original_commit_timestamp: Default::default(),
tx_length: Default::default(),
original_server_version: Default::default(),
immediate_server_version: Default::default(),
}
}
pub fn with_flags(mut self, flags: GtidFlags) -> Self {
self.flags = RawFlags::new(flags.bits());
self
}
pub fn flags_raw(&self) -> u8 {
self.flags.0
}
pub fn flags(&self) -> GtidFlags {
self.flags.get()
}
pub fn with_sid(mut self, sid: [u8; Self::ENCODED_SID_LENGTH]) -> Self {
self.sid = sid;
self
}
pub fn sid(&self) -> [u8; Self::ENCODED_SID_LENGTH] {
self.sid
}
pub fn with_gno(mut self, gno: u64) -> Self {
self.gno = RawConst::new(gno);
self
}
pub fn gno(&self) -> u64 {
self.gno.0
}
pub fn lc_typecode(&self) -> Option<u8> {
self.lc_typecode.as_ref().map(|x| x.value())
}
pub fn with_lc_typecode(mut self) -> Self {
self.lc_typecode = Some(LogicalTimestampTypecode::default());
self
}
pub fn with_last_committed(mut self, last_committed: u64) -> Self {
self.last_committed = RawInt::new(last_committed);
self
}
pub fn last_committed(&self) -> u64 {
self.last_committed.0
}
pub fn with_sequence_number(mut self, sequence_number: u64) -> Self {
self.sequence_number = RawInt::new(sequence_number);
self
}
pub fn sequence_number(&self) -> u64 {
self.sequence_number.0
}
pub fn with_immediate_commit_timestamp(mut self, immediate_commit_timestamp: u64) -> Self {
self.immediate_commit_timestamp = RawInt::new(immediate_commit_timestamp);
self
}
pub fn immediate_commit_timestamp(&self) -> u64 {
self.immediate_commit_timestamp.0
}
pub fn with_original_commit_timestamp(mut self, original_commit_timestamp: u64) -> Self {
self.original_commit_timestamp = RawInt::new(original_commit_timestamp);
self
}
pub fn original_commit_timestamp(&self) -> u64 {
self.original_commit_timestamp.0
}
pub fn with_tx_length(mut self, tx_length: u64) -> Self {
self.tx_length = RawInt::new(tx_length);
self
}
pub fn tx_length(&self) -> u64 {
self.tx_length.0
}
pub fn with_original_server_version(mut self, original_server_version: u32) -> Self {
self.original_server_version = RawInt::new(original_server_version);
self
}
pub fn original_server_version(&self) -> u32 {
self.original_server_version.0
}
pub fn with_immediate_server_version(mut self, immediate_server_version: u32) -> Self {
self.immediate_server_version = RawInt::new(immediate_server_version);
self
}
pub fn immediate_server_version(&self) -> u32 {
self.immediate_server_version.0
}
}
impl<'de> MyDeserialize<'de> for GtidEvent {
const SIZE: Option<usize> = None;
type Ctx = BinlogCtx<'de>;
fn deserialize(_ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
let mut sbuf: ParseBuf = buf.parse(1 + Self::ENCODED_SID_LENGTH + 8)?;
let flags = sbuf.parse_unchecked(())?;
let sid: [u8; Self::ENCODED_SID_LENGTH] = sbuf.parse_unchecked(())?;
let gno = sbuf.parse_unchecked(())?;
let mut lc_typecode = None;
let mut last_committed = RawInt::new(0);
let mut sequence_number = RawInt::new(0);
let mut immediate_commit_timestamp = RawInt::new(0);
let mut original_commit_timestamp = RawInt::new(0);
let mut tx_length = RawInt::new(0);
let mut original_server_version = RawInt::new(Self::UNDEFINED_SERVER_VERSION);
let mut immediate_server_version = RawInt::new(Self::UNDEFINED_SERVER_VERSION);
if !buf.is_empty() && buf.0[0] == Self::LOGICAL_TIMESTAMP_TYPECODE {
lc_typecode = Some(buf.parse_unchecked(())?);
let mut sbuf: ParseBuf = buf.parse(16)?;
last_committed = sbuf.parse_unchecked(())?;
sequence_number = sbuf.parse_unchecked(())?;
if buf.len() >= Self::IMMEDIATE_COMMIT_TIMESTAMP_LENGTH {
immediate_commit_timestamp = buf.parse_unchecked(())?;
if immediate_commit_timestamp.0 & (1 << 55) != 0 {
immediate_commit_timestamp.0 &= !(1 << 55);
original_commit_timestamp = buf.parse(())?;
} else {
original_commit_timestamp = immediate_commit_timestamp;
}
}
if !buf.is_empty() {
tx_length = buf.parse_unchecked(())?;
}
if buf.len() >= Self::IMMEDIATE_SERVER_VERSION_LENGTH {
immediate_server_version = buf.parse_unchecked(())?;
if immediate_server_version.0 & (1 << 31) != 0 {
immediate_server_version.0 &= !(1 << 31);
original_server_version = buf.parse(())?;
} else {
original_server_version = immediate_server_version;
}
}
}
Ok(Self {
flags,
sid,
gno,
lc_typecode,
last_committed,
sequence_number,
immediate_commit_timestamp,
original_commit_timestamp,
tx_length,
original_server_version,
immediate_server_version,
})
}
}
impl MySerialize for GtidEvent {
fn serialize(&self, buf: &mut Vec<u8>) {
self.flags.serialize(&mut *buf);
self.sid.serialize(&mut *buf);
self.gno.serialize(&mut *buf);
match self.lc_typecode {
Some(lc_typecode) => lc_typecode.serialize(&mut *buf),
None => return,
};
self.last_committed.serialize(&mut *buf);
self.sequence_number.serialize(&mut *buf);
let mut immediate_commit_timestamp_with_flag = *self.immediate_commit_timestamp;
if self.immediate_commit_timestamp != self.original_commit_timestamp {
immediate_commit_timestamp_with_flag |= 1 << 55;
} else {
immediate_commit_timestamp_with_flag &= !(1 << 55);
}
RawInt::<LeU56>::new(immediate_commit_timestamp_with_flag).serialize(&mut *buf);
if self.immediate_commit_timestamp != self.original_commit_timestamp {
self.original_commit_timestamp.serialize(&mut *buf);
}
self.tx_length.serialize(&mut *buf);
let mut immediate_server_version_with_flag = *self.immediate_server_version;
if self.immediate_server_version != self.original_server_version {
immediate_server_version_with_flag |= 1 << 31;
} else {
immediate_server_version_with_flag &= !(1 << 31);
}
RawInt::<LeU32>::new(immediate_server_version_with_flag).serialize(&mut *buf);
if self.immediate_server_version != self.original_server_version {
self.original_server_version.serialize(&mut *buf);
}
}
}
impl<'a> BinlogStruct<'a> for GtidEvent {
fn len(&self, _version: BinlogVersion) -> usize {
let mut len = S(0);
len += S(1); len += S(Self::ENCODED_SID_LENGTH); len += S(8); len += S(1); len += S(8); len += S(8); len += S(7); if self.immediate_commit_timestamp != self.original_commit_timestamp {
len += S(7); }
len += S(crate::misc::lenenc_int_len(*self.tx_length) as usize); len += S(4); if self.immediate_server_version != self.original_server_version {
len += S(4); }
min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
}
}
impl<'a> BinlogEvent<'a> for GtidEvent {
const EVENT_TYPE: EventType = EventType::GTID_EVENT;
}