mysql_common/binlog/events/
transaction_payload_event.rs1use std::{
10 borrow::Cow,
11 cmp::min,
12 convert::TryFrom,
13 fmt,
14 io::{self, BufRead, BufReader},
15};
16
17use saturating::Saturating as S;
18
19#[allow(unused)]
20use crate::binlog::EventStreamReader;
21
22use super::BinlogEventHeader;
23use crate::{
24 binlog::{
25 BinlogCtx, BinlogEvent, BinlogStruct,
26 consts::{
27 BinlogVersion, EventType, TransactionPayloadCompressionType, TransactionPayloadFields,
28 },
29 },
30 io::{BufMutExt, ParseBuf, ReadMysqlExt},
31 misc::raw::{RawBytes, bytes::EofBytes, int::*},
32 proto::{MyDeserialize, MySerialize},
33};
34
35#[derive(Debug)]
38pub struct TransactionPayloadReader<'a> {
39 inner: TransactionPayloadInner<'a>,
40}
41
42impl<'a> TransactionPayloadReader<'a> {
43 pub fn new_uncompressed(data: &'a [u8]) -> Self {
45 Self {
46 inner: TransactionPayloadInner::Uncompressed(data),
47 }
48 }
49
50 pub fn new_zstd(data: &'a [u8]) -> io::Result<Self> {
52 let decoder = zstd::Decoder::with_buffer(data)?;
53 Ok(Self {
54 inner: TransactionPayloadInner::ZstdCompressed(BufReader::new(decoder)),
55 })
56 }
57
58 pub fn has_data_left(&mut self) -> io::Result<bool> {
63 self.inner.has_data_left()
64 }
65}
66
67impl io::Read for TransactionPayloadReader<'_> {
68 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
69 self.inner.read(buf)
70 }
71}
72
73impl io::BufRead for TransactionPayloadReader<'_> {
74 fn fill_buf(&mut self) -> io::Result<&[u8]> {
75 self.inner.fill_buf()
76 }
77
78 fn consume(&mut self, amt: usize) {
79 self.inner.consume(amt)
80 }
81}
82
83enum TransactionPayloadInner<'a> {
85 Uncompressed(&'a [u8]),
86 ZstdCompressed(BufReader<zstd::Decoder<'a, &'a [u8]>>),
87}
88
89impl TransactionPayloadInner<'_> {
90 fn has_data_left(&mut self) -> io::Result<bool> {
91 match self {
92 TransactionPayloadInner::Uncompressed(x) => Ok(!x.is_empty()),
93 TransactionPayloadInner::ZstdCompressed(x) => x.fill_buf().map(|b| !b.is_empty()),
94 }
95 }
96}
97
98impl io::BufRead for TransactionPayloadInner<'_> {
99 fn fill_buf(&mut self) -> io::Result<&[u8]> {
100 match self {
101 TransactionPayloadInner::Uncompressed(x) => x.fill_buf(),
102 TransactionPayloadInner::ZstdCompressed(x) => x.fill_buf(),
103 }
104 }
105
106 fn consume(&mut self, amt: usize) {
107 match self {
108 TransactionPayloadInner::Uncompressed(x) => x.consume(amt),
109 TransactionPayloadInner::ZstdCompressed(x) => x.consume(amt),
110 }
111 }
112}
113
114impl io::Read for TransactionPayloadInner<'_> {
115 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
116 match self {
117 TransactionPayloadInner::Uncompressed(x) => x.read(buf),
118 TransactionPayloadInner::ZstdCompressed(x) => x.read(buf),
119 }
120 }
121}
122
123impl fmt::Debug for TransactionPayloadInner<'_> {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 match self {
126 Self::Uncompressed(arg0) => f.debug_tuple("Uncompressed").field(arg0).finish(),
127 Self::ZstdCompressed(_) => f.debug_tuple("ZstdCompressed").field(&"..").finish(),
128 }
129 }
130}
131
132#[derive(Debug, Clone, Eq, PartialEq, Hash)]
137pub struct TransactionPayloadEvent<'a> {
138 payload_size: RawInt<LeU64>,
140
141 algorithm: TransactionPayloadCompressionType,
143
144 uncompressed_size: RawInt<LeU64>,
146
147 payload: RawBytes<'a, EofBytes>,
149
150 header_size: usize,
152}
153
154impl<'a> TransactionPayloadEvent<'a> {
155 pub fn new(
156 payload_size: u64,
157 algorithm: TransactionPayloadCompressionType,
158 uncompressed_size: u64,
159 payload: impl Into<Cow<'a, [u8]>>,
160 ) -> Self {
161 Self {
162 payload_size: RawInt::new(payload_size),
163 algorithm,
164 uncompressed_size: RawInt::new(uncompressed_size),
165 payload: RawBytes::new(payload),
166 header_size: 0,
167 }
168 }
169
170 pub fn with_payload_size(mut self, payload_size: u64) -> Self {
172 self.payload_size = RawInt::new(payload_size);
173 self
174 }
175 pub fn with_algorithm(mut self, algorithm: TransactionPayloadCompressionType) -> Self {
177 self.algorithm = algorithm;
178 self
179 }
180 pub fn with_uncompressed_size(mut self, uncompressed_size: u64) -> Self {
182 self.uncompressed_size = RawInt::new(uncompressed_size);
183 self
184 }
185
186 pub fn with_payload(mut self, payload: impl Into<Cow<'a, [u8]>>) -> Self {
188 self.payload = RawBytes::new(payload);
189 self
190 }
191
192 pub fn payload_size(&self) -> u64 {
194 self.payload_size.0
195 }
196
197 pub fn payload_raw(&'a self) -> &'a [u8] {
199 self.payload.as_bytes()
200 }
201
202 pub fn decompressed(&self) -> io::Result<TransactionPayloadReader<'_>> {
206 if self.algorithm == TransactionPayloadCompressionType::NONE {
207 return Ok(TransactionPayloadReader::new_uncompressed(
208 self.payload_raw(),
209 ));
210 }
211
212 return TransactionPayloadReader::new_zstd(self.payload_raw());
213 }
214
215 pub fn danger_decompress(self) -> Vec<u8> {
222 if self.algorithm == TransactionPayloadCompressionType::NONE {
223 return self.payload_raw().to_vec();
224 }
225 let mut decode_buf = vec![0_u8; self.uncompressed_size.0 as usize];
226 match zstd::stream::copy_decode(self.payload.as_bytes(), &mut decode_buf[..]) {
227 Ok(_) => {}
228 Err(_) => {
229 return Vec::new();
230 }
231 };
232 decode_buf
233 }
234
235 pub fn algorithm(&self) -> TransactionPayloadCompressionType {
237 self.algorithm
238 }
239
240 pub fn uncompressed_size(&self) -> u64 {
242 self.uncompressed_size.0
243 }
244
245 pub fn into_owned(self) -> TransactionPayloadEvent<'static> {
246 TransactionPayloadEvent {
247 payload_size: self.payload_size,
248 algorithm: self.algorithm,
249 uncompressed_size: self.uncompressed_size,
250 payload: self.payload.into_owned(),
251 header_size: self.header_size,
252 }
253 }
254}
255
256impl<'de> MyDeserialize<'de> for TransactionPayloadEvent<'de> {
257 const SIZE: Option<usize> = None;
258 type Ctx = BinlogCtx<'de>;
259 fn deserialize(_ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
260 let mut ob = Self {
261 payload_size: RawInt::new(0),
262 algorithm: TransactionPayloadCompressionType::NONE,
263 uncompressed_size: RawInt::new(0),
264 payload: RawBytes::from("".as_bytes()),
265 header_size: 0,
266 };
267 let mut have_payload_size = false;
268 let mut have_compression_type = false;
269 let original_buf_size = buf.len();
270 while !buf.is_empty() {
271 let field_type = buf.read_lenenc_int()?;
273 match TransactionPayloadFields::try_from(field_type) {
274 Ok(TransactionPayloadFields::OTW_PAYLOAD_HEADER_END_MARK) => {
276 if !have_payload_size || !have_compression_type {
277 Err(io::Error::new(
278 io::ErrorKind::InvalidData,
279 "Missing field in payload header",
280 ))?;
281 }
282 if ob.payload_size.0 as usize > buf.len() {
283 Err(io::Error::new(
284 io::ErrorKind::InvalidData,
285 format!(
286 "Payload size is bigger than the remaining buffer: {} > {}",
287 ob.payload_size.0,
288 buf.len()
289 ),
290 ))?;
291 }
292 ob.header_size = original_buf_size - ob.payload_size.0 as usize;
293 let mut payload_buf: ParseBuf<'_> = buf.parse(ob.payload_size.0 as usize)?;
294 ob.payload = RawBytes::from(payload_buf.eat_all());
295 break;
296 }
297
298 Ok(TransactionPayloadFields::OTW_PAYLOAD_SIZE_FIELD) => {
299 let _length = buf.read_lenenc_int()?;
300 let val = buf.read_lenenc_int()?;
301 ob.payload_size = RawInt::new(val);
302 have_payload_size = true;
303 continue;
304 }
305 Ok(TransactionPayloadFields::OTW_PAYLOAD_COMPRESSION_TYPE_FIELD) => {
306 let _length = buf.read_lenenc_int()?;
307 let val = buf.read_lenenc_int()?;
308 ob.algorithm = TransactionPayloadCompressionType::try_from(val).unwrap();
309 have_compression_type = true;
310 continue;
311 }
312 Ok(TransactionPayloadFields::OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD) => {
313 let _length = buf.read_lenenc_int()?;
314 let val = buf.read_lenenc_int()?;
315 ob.uncompressed_size = RawInt::new(val);
316 continue;
317 }
318 Err(_) => {
319 let length = buf.eat_lenenc_int();
320 buf.skip(length as usize);
321 continue;
322 }
323 };
324 }
325
326 Ok(ob)
327 }
328}
329
330impl MySerialize for TransactionPayloadEvent<'_> {
331 fn serialize(&self, buf: &mut Vec<u8>) {
332 buf.put_lenenc_int(TransactionPayloadFields::OTW_PAYLOAD_COMPRESSION_TYPE_FIELD as u64);
333 buf.put_lenenc_int(crate::misc::lenenc_int_len(self.algorithm as u64));
334 buf.put_lenenc_int(self.algorithm as u64);
335
336 if self.algorithm != TransactionPayloadCompressionType::NONE {
337 buf.put_lenenc_int(
338 TransactionPayloadFields::OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD as u64,
339 );
340 buf.put_lenenc_int(crate::misc::lenenc_int_len(self.uncompressed_size.0));
341 buf.put_lenenc_int(self.uncompressed_size.0);
342 }
343
344 buf.put_lenenc_int(TransactionPayloadFields::OTW_PAYLOAD_SIZE_FIELD as u64);
345 buf.put_lenenc_int(crate::misc::lenenc_int_len(self.payload_size.0));
346 buf.put_lenenc_int(self.payload_size.0);
347
348 buf.put_lenenc_int(TransactionPayloadFields::OTW_PAYLOAD_HEADER_END_MARK as u64);
349
350 self.payload.serialize(&mut *buf);
351 }
352}
353
354impl<'a> BinlogEvent<'a> for TransactionPayloadEvent<'a> {
355 const EVENT_TYPE: EventType = EventType::TRANSACTION_PAYLOAD_EVENT;
356}
357
358impl<'a> BinlogStruct<'a> for TransactionPayloadEvent<'a> {
359 fn len(&self, _version: BinlogVersion) -> usize {
360 let mut len = S(self.header_size);
361
362 len += S(self.payload.0.len());
363
364 min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
365 }
366}