use std::{borrow::Cow, cmp::min, io};
use saturating::Saturating as S;
use crate::{
binlog::{
consts::{BinlogVersion, EventType, LoadDuplicateHandling},
BinlogCtx, BinlogEvent, BinlogStruct,
},
io::ParseBuf,
misc::raw::{
bytes::{BareU8Bytes, EofBytes},
int::*,
Const, RawBytes, RawInt, Skip,
},
proto::{MyDeserialize, MySerialize},
};
use super::{BinlogEventHeader, StatusVars};
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ExecuteLoadQueryEvent<'a> {
thread_id: RawInt<LeU32>,
execution_time: RawInt<LeU32>,
schema_len: RawInt<u8>,
error_code: RawInt<LeU16>,
status_vars_len: RawInt<LeU16>,
file_id: RawInt<LeU32>,
start_pos: RawInt<LeU32>,
end_pos: RawInt<LeU32>,
dup_handling: Const<LoadDuplicateHandling, u8>,
status_vars: StatusVars<'a>,
schema: RawBytes<'a, BareU8Bytes>,
__skip: Skip<1>,
query: RawBytes<'a, EofBytes>,
}
impl<'a> ExecuteLoadQueryEvent<'a> {
pub fn new(
file_id: u32,
dup_handling: LoadDuplicateHandling,
status_vars: impl Into<Cow<'a, [u8]>>,
schema: impl Into<Cow<'a, [u8]>>,
) -> Self {
let status_vars = StatusVars(RawBytes::new(status_vars));
let schema = RawBytes::new(schema);
Self {
thread_id: Default::default(),
execution_time: Default::default(),
schema_len: RawInt::new(schema.len() as u8),
error_code: Default::default(),
status_vars_len: RawInt::new(status_vars.0.len() as u16),
file_id: RawInt::new(file_id),
start_pos: Default::default(),
end_pos: Default::default(),
dup_handling: Const::new(dup_handling),
status_vars,
schema,
__skip: Default::default(),
query: Default::default(),
}
}
pub fn with_thread_id(mut self, thread_id: u32) -> Self {
self.thread_id = RawInt::new(thread_id);
self
}
pub fn with_execution_time(mut self, execution_time: u32) -> Self {
self.execution_time = RawInt::new(execution_time);
self
}
pub fn with_error_code(mut self, error_code: u16) -> Self {
self.error_code = RawInt::new(error_code);
self
}
pub fn with_file_id(mut self, file_id: u32) -> Self {
self.file_id = RawInt::new(file_id);
self
}
pub fn with_start_pos(mut self, start_pos: u32) -> Self {
self.start_pos = RawInt::new(start_pos);
self
}
pub fn with_end_pos(mut self, end_pos: u32) -> Self {
self.end_pos = RawInt::new(end_pos);
self
}
pub fn with_dup_handling(mut self, dup_handling: LoadDuplicateHandling) -> Self {
self.dup_handling = Const::new(dup_handling);
self
}
pub fn with_status_vars(mut self, status_vars: impl Into<Cow<'a, [u8]>>) -> Self {
self.status_vars = StatusVars(RawBytes::new(status_vars));
self.status_vars_len.0 = self.status_vars.0.len() as u16;
self
}
pub fn with_schema(mut self, schema: impl Into<Cow<'a, [u8]>>) -> Self {
self.schema = RawBytes::new(schema);
self.schema_len.0 = self.schema.len() as u8;
self
}
pub fn with_query(mut self, query: impl Into<Cow<'a, [u8]>>) -> Self {
self.query = RawBytes::new(query);
self
}
pub fn thread_id(&self) -> u32 {
self.thread_id.0
}
pub fn execution_time(&self) -> u32 {
self.execution_time.0
}
pub fn error_code(&self) -> u16 {
self.error_code.0
}
pub fn file_id(&self) -> u32 {
self.file_id.0
}
pub fn start_pos(&self) -> u32 {
self.start_pos.0
}
pub fn end_pos(&self) -> u32 {
self.end_pos.0
}
pub fn dup_handling(&self) -> LoadDuplicateHandling {
self.dup_handling.0
}
pub fn status_vars_raw(&'a self) -> &'a [u8] {
self.status_vars.0.as_bytes()
}
pub fn status_vars(&'a self) -> &'a StatusVars<'a> {
&self.status_vars
}
pub fn schema_raw(&'a self) -> &'a [u8] {
self.schema.as_bytes()
}
pub fn schema(&'a self) -> Cow<'a, str> {
self.schema.as_str()
}
pub fn query_raw(&'a self) -> &'a [u8] {
self.query.as_bytes()
}
pub fn query(&'a self) -> Cow<'a, str> {
self.query.as_str()
}
pub fn into_owned(self) -> ExecuteLoadQueryEvent<'static> {
ExecuteLoadQueryEvent {
thread_id: self.thread_id,
execution_time: self.execution_time,
schema_len: self.schema_len,
error_code: self.error_code,
status_vars_len: self.status_vars_len,
file_id: self.file_id,
start_pos: self.start_pos,
end_pos: self.end_pos,
dup_handling: self.dup_handling,
status_vars: self.status_vars.into_owned(),
schema: self.schema.into_owned(),
__skip: self.__skip,
query: self.query.into_owned(),
}
}
}
impl<'de> MyDeserialize<'de> for ExecuteLoadQueryEvent<'de> {
const SIZE: Option<usize> = None;
type Ctx = BinlogCtx<'de>;
fn deserialize(_: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
let mut sbuf: ParseBuf = buf.parse(26)?;
let thread_id = sbuf.parse_unchecked(())?;
let execution_time = sbuf.parse_unchecked(())?;
let schema_len: RawInt<u8> = sbuf.parse_unchecked(())?;
let error_code = sbuf.parse_unchecked(())?;
let status_vars_len: RawInt<LeU16> = sbuf.parse_unchecked(())?;
let file_id = sbuf.parse_unchecked(())?;
let start_pos = sbuf.parse_unchecked(())?;
let end_pos = sbuf.parse_unchecked(())?;
let dup_handling = sbuf.parse_unchecked(())?;
let status_vars = buf.parse(*status_vars_len)?;
let schema = buf.parse(*schema_len as usize)?;
let __skip = buf.parse(())?;
let query = buf.parse(())?;
Ok(Self {
thread_id,
execution_time,
schema_len,
error_code,
status_vars_len,
file_id,
start_pos,
end_pos,
dup_handling,
status_vars,
schema,
__skip,
query,
})
}
}
impl MySerialize for ExecuteLoadQueryEvent<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.thread_id.serialize(&mut *buf);
self.execution_time.serialize(&mut *buf);
self.schema_len.serialize(&mut *buf);
self.error_code.serialize(&mut *buf);
self.status_vars_len.serialize(&mut *buf);
self.file_id.serialize(&mut *buf);
self.start_pos.serialize(&mut *buf);
self.end_pos.serialize(&mut *buf);
self.dup_handling.serialize(&mut *buf);
self.status_vars.serialize(&mut *buf);
self.schema.serialize(&mut *buf);
self.__skip.serialize(&mut *buf);
self.query.serialize(&mut *buf);
}
}
impl<'a> BinlogStruct<'a> for ExecuteLoadQueryEvent<'a> {
fn len(&self, _version: BinlogVersion) -> usize {
let mut len = S(0);
len += S(4); len += S(4); len += S(1); len += S(2); len += S(2); len += S(4); len += S(4); len += S(4); len += S(1); len += S(min(self.status_vars.0.len(), u16::MAX as usize - 13)); len += S(min(self.schema.0.len(), u8::MAX as usize)); len += S(1); len += S(self.query.0.len());
min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
}
}
impl<'a> BinlogEvent<'a> for ExecuteLoadQueryEvent<'a> {
const EVENT_TYPE: EventType = EventType::EXECUTE_LOAD_QUERY_EVENT;
}