mysql_common/binlog/events/
transaction_payload_event.rs

1// Copyright (c) 2023 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{
10    borrow::Cow,
11    cmp::min,
12    convert::TryFrom,
13    fmt,
14    io::{self, BufRead, BufReader},
15};
16
17use saturating::Saturating as S;
18
19#[allow(unused)]
20use crate::binlog::EventStreamReader;
21
22use super::BinlogEventHeader;
23use crate::{
24    binlog::{
25        BinlogCtx, BinlogEvent, BinlogStruct,
26        consts::{
27            BinlogVersion, EventType, TransactionPayloadCompressionType, TransactionPayloadFields,
28        },
29    },
30    io::{BufMutExt, ParseBuf, ReadMysqlExt},
31    misc::raw::{RawBytes, bytes::EofBytes, int::*},
32    proto::{MyDeserialize, MySerialize},
33};
34
/// Reader over the payload of a [`TransactionPayloadEvent`].
///
/// Implements [`io::Read`] and [`io::BufRead`] by delegating every call to the
/// wrapped payload source, which is either the raw bytes themselves or a
/// buffered ZSTD decoder over them.
#[derive(Debug)]
pub struct TransactionPayloadReader<'a> {
    /// Underlying payload source (plain slice or buffered ZSTD decoder).
    inner: TransactionPayloadInner<'a>,
}
41
42impl<'a> TransactionPayloadReader<'a> {
43    /// Creates new instance (`data` is not compressed).
44    pub fn new_uncompressed(data: &'a [u8]) -> Self {
45        Self {
46            inner: TransactionPayloadInner::Uncompressed(data),
47        }
48    }
49
50    /// Creates new instance (`data` is ZSTD-compressed).
51    pub fn new_zstd(data: &'a [u8]) -> io::Result<Self> {
52        let decoder = zstd::Decoder::with_buffer(data)?;
53        Ok(Self {
54            inner: TransactionPayloadInner::ZstdCompressed(BufReader::new(decoder)),
55        })
56    }
57
58    /// Returns `false` if the reader is exhausted.
59    ///
60    /// Some io might be necessary to check for data,
61    /// so this functions returns `Result<bool>`, not `bool`.
62    pub fn has_data_left(&mut self) -> io::Result<bool> {
63        self.inner.has_data_left()
64    }
65}
66
67impl io::Read for TransactionPayloadReader<'_> {
68    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
69        self.inner.read(buf)
70    }
71}
72
73impl io::BufRead for TransactionPayloadReader<'_> {
74    fn fill_buf(&mut self) -> io::Result<&[u8]> {
75        self.inner.fill_buf()
76    }
77
78    fn consume(&mut self, amt: usize) {
79        self.inner.consume(amt)
80    }
81}
82
/// Wraps the payload data, providing an [`io::BufRead`] implementation.
enum TransactionPayloadInner<'a> {
    /// Payload bytes that are stored without compression.
    Uncompressed(&'a [u8]),
    /// Buffered ZSTD decoder reading from the compressed payload bytes.
    ZstdCompressed(BufReader<zstd::Decoder<'a, &'a [u8]>>),
}
88
89impl TransactionPayloadInner<'_> {
90    fn has_data_left(&mut self) -> io::Result<bool> {
91        match self {
92            TransactionPayloadInner::Uncompressed(x) => Ok(!x.is_empty()),
93            TransactionPayloadInner::ZstdCompressed(x) => x.fill_buf().map(|b| !b.is_empty()),
94        }
95    }
96}
97
98impl io::BufRead for TransactionPayloadInner<'_> {
99    fn fill_buf(&mut self) -> io::Result<&[u8]> {
100        match self {
101            TransactionPayloadInner::Uncompressed(x) => x.fill_buf(),
102            TransactionPayloadInner::ZstdCompressed(x) => x.fill_buf(),
103        }
104    }
105
106    fn consume(&mut self, amt: usize) {
107        match self {
108            TransactionPayloadInner::Uncompressed(x) => x.consume(amt),
109            TransactionPayloadInner::ZstdCompressed(x) => x.consume(amt),
110        }
111    }
112}
113
114impl io::Read for TransactionPayloadInner<'_> {
115    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
116        match self {
117            TransactionPayloadInner::Uncompressed(x) => x.read(buf),
118            TransactionPayloadInner::ZstdCompressed(x) => x.read(buf),
119        }
120    }
121}
122
123impl fmt::Debug for TransactionPayloadInner<'_> {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        match self {
126            Self::Uncompressed(arg0) => f.debug_tuple("Uncompressed").field(arg0).finish(),
127            Self::ZstdCompressed(_) => f.debug_tuple("ZstdCompressed").field(&"..").finish(),
128        }
129    }
130}
131
/// Event that encloses all the events of a transaction.
///
/// It is used for carrying compressed payloads, and contains
/// compression metadata.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct TransactionPayloadEvent<'a> {
    /// Payload size as recorded in the event header.
    payload_size: RawInt<LeU64>,

    /// Compression algorithm applied to the payload.
    algorithm: TransactionPayloadCompressionType,

    /// Uncompressed size as recorded in the event header
    /// (stays `0` when the header does not carry this field).
    uncompressed_size: RawInt<LeU64>,

    /// Raw payload bytes (still compressed unless the algorithm is `NONE`).
    payload: RawBytes<'a, EofBytes>,

    /// Size of the parsed header; `0` for events that were not deserialized.
    header_size: usize,
}
153
154impl<'a> TransactionPayloadEvent<'a> {
155    pub fn new(
156        payload_size: u64,
157        algorithm: TransactionPayloadCompressionType,
158        uncompressed_size: u64,
159        payload: impl Into<Cow<'a, [u8]>>,
160    ) -> Self {
161        Self {
162            payload_size: RawInt::new(payload_size),
163            algorithm,
164            uncompressed_size: RawInt::new(uncompressed_size),
165            payload: RawBytes::new(payload),
166            header_size: 0,
167        }
168    }
169
170    /// Sets the `payload_size` field value.
171    pub fn with_payload_size(mut self, payload_size: u64) -> Self {
172        self.payload_size = RawInt::new(payload_size);
173        self
174    }
175    /// Sets the `algorithm` field value.
176    pub fn with_algorithm(mut self, algorithm: TransactionPayloadCompressionType) -> Self {
177        self.algorithm = algorithm;
178        self
179    }
180    /// Sets the `uncompressed_size` field value.
181    pub fn with_uncompressed_size(mut self, uncompressed_size: u64) -> Self {
182        self.uncompressed_size = RawInt::new(uncompressed_size);
183        self
184    }
185
186    /// Sets the `payload` field value.
187    pub fn with_payload(mut self, payload: impl Into<Cow<'a, [u8]>>) -> Self {
188        self.payload = RawBytes::new(payload);
189        self
190    }
191
192    /// Returns the payload_size.
193    pub fn payload_size(&self) -> u64 {
194        self.payload_size.0
195    }
196
197    /// Returns raw payload of the binlog event.
198    pub fn payload_raw(&'a self) -> &'a [u8] {
199        self.payload.as_bytes()
200    }
201
202    /// Returns decompressed payload in form of a struct that implements [`io::BufRead`].
203    ///
204    /// See [`EventStreamReader::read_decompressed`].
205    pub fn decompressed(&self) -> io::Result<TransactionPayloadReader<'_>> {
206        if self.algorithm == TransactionPayloadCompressionType::NONE {
207            return Ok(TransactionPayloadReader::new_uncompressed(
208                self.payload_raw(),
209            ));
210        }
211
212        return TransactionPayloadReader::new_zstd(self.payload_raw());
213    }
214
215    /// Decompress the whole payload.
216    ///
217    /// # Danger
218    ///
219    /// This function may allocate a huge buffer and cause OOM.
220    /// Consider using [`TransactionPayloadEvent::decompressed`] instead.
221    pub fn danger_decompress(self) -> Vec<u8> {
222        if self.algorithm == TransactionPayloadCompressionType::NONE {
223            return self.payload_raw().to_vec();
224        }
225        let mut decode_buf = vec![0_u8; self.uncompressed_size.0 as usize];
226        match zstd::stream::copy_decode(self.payload.as_bytes(), &mut decode_buf[..]) {
227            Ok(_) => {}
228            Err(_) => {
229                return Vec::new();
230            }
231        };
232        decode_buf
233    }
234
235    /// Returns the algorithm.
236    pub fn algorithm(&self) -> TransactionPayloadCompressionType {
237        self.algorithm
238    }
239
240    /// Returns the uncompressed_size.
241    pub fn uncompressed_size(&self) -> u64 {
242        self.uncompressed_size.0
243    }
244
245    pub fn into_owned(self) -> TransactionPayloadEvent<'static> {
246        TransactionPayloadEvent {
247            payload_size: self.payload_size,
248            algorithm: self.algorithm,
249            uncompressed_size: self.uncompressed_size,
250            payload: self.payload.into_owned(),
251            header_size: self.header_size,
252        }
253    }
254}
255
256impl<'de> MyDeserialize<'de> for TransactionPayloadEvent<'de> {
257    const SIZE: Option<usize> = None;
258    type Ctx = BinlogCtx<'de>;
259    fn deserialize(_ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
260        let mut ob = Self {
261            payload_size: RawInt::new(0),
262            algorithm: TransactionPayloadCompressionType::NONE,
263            uncompressed_size: RawInt::new(0),
264            payload: RawBytes::from("".as_bytes()),
265            header_size: 0,
266        };
267        let mut have_payload_size = false;
268        let mut have_compression_type = false;
269        let original_buf_size = buf.len();
270        while !buf.is_empty() {
271            /* read the type of the field. */
272            let field_type = buf.read_lenenc_int()?;
273            match TransactionPayloadFields::try_from(field_type) {
274                // we have reached the end of the header
275                Ok(TransactionPayloadFields::OTW_PAYLOAD_HEADER_END_MARK) => {
276                    if !have_payload_size || !have_compression_type {
277                        Err(io::Error::new(
278                            io::ErrorKind::InvalidData,
279                            "Missing field in payload header",
280                        ))?;
281                    }
282                    if ob.payload_size.0 as usize > buf.len() {
283                        Err(io::Error::new(
284                            io::ErrorKind::InvalidData,
285                            format!(
286                                "Payload size is bigger than the remaining buffer: {} > {}",
287                                ob.payload_size.0,
288                                buf.len()
289                            ),
290                        ))?;
291                    }
292                    ob.header_size = original_buf_size - ob.payload_size.0 as usize;
293                    let mut payload_buf: ParseBuf<'_> = buf.parse(ob.payload_size.0 as usize)?;
294                    ob.payload = RawBytes::from(payload_buf.eat_all());
295                    break;
296                }
297
298                Ok(TransactionPayloadFields::OTW_PAYLOAD_SIZE_FIELD) => {
299                    let _length = buf.read_lenenc_int()?;
300                    let val = buf.read_lenenc_int()?;
301                    ob.payload_size = RawInt::new(val);
302                    have_payload_size = true;
303                    continue;
304                }
305                Ok(TransactionPayloadFields::OTW_PAYLOAD_COMPRESSION_TYPE_FIELD) => {
306                    let _length = buf.read_lenenc_int()?;
307                    let val = buf.read_lenenc_int()?;
308                    ob.algorithm = TransactionPayloadCompressionType::try_from(val).unwrap();
309                    have_compression_type = true;
310                    continue;
311                }
312                Ok(TransactionPayloadFields::OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD) => {
313                    let _length = buf.read_lenenc_int()?;
314                    let val = buf.read_lenenc_int()?;
315                    ob.uncompressed_size = RawInt::new(val);
316                    continue;
317                }
318                Err(_) => {
319                    let length = buf.eat_lenenc_int();
320                    buf.skip(length as usize);
321                    continue;
322                }
323            };
324        }
325
326        Ok(ob)
327    }
328}
329
330impl MySerialize for TransactionPayloadEvent<'_> {
331    fn serialize(&self, buf: &mut Vec<u8>) {
332        buf.put_lenenc_int(TransactionPayloadFields::OTW_PAYLOAD_COMPRESSION_TYPE_FIELD as u64);
333        buf.put_lenenc_int(crate::misc::lenenc_int_len(self.algorithm as u64));
334        buf.put_lenenc_int(self.algorithm as u64);
335
336        if self.algorithm != TransactionPayloadCompressionType::NONE {
337            buf.put_lenenc_int(
338                TransactionPayloadFields::OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD as u64,
339            );
340            buf.put_lenenc_int(crate::misc::lenenc_int_len(self.uncompressed_size.0));
341            buf.put_lenenc_int(self.uncompressed_size.0);
342        }
343
344        buf.put_lenenc_int(TransactionPayloadFields::OTW_PAYLOAD_SIZE_FIELD as u64);
345        buf.put_lenenc_int(crate::misc::lenenc_int_len(self.payload_size.0));
346        buf.put_lenenc_int(self.payload_size.0);
347
348        buf.put_lenenc_int(TransactionPayloadFields::OTW_PAYLOAD_HEADER_END_MARK as u64);
349
350        self.payload.serialize(&mut *buf);
351    }
352}
353
// Associates `TransactionPayloadEvent` with its binlog event type.
impl<'a> BinlogEvent<'a> for TransactionPayloadEvent<'a> {
    /// Event type tag for this event body.
    const EVENT_TYPE: EventType = EventType::TRANSACTION_PAYLOAD_EVENT;
}
357
358impl<'a> BinlogStruct<'a> for TransactionPayloadEvent<'a> {
359    fn len(&self, _version: BinlogVersion) -> usize {
360        let mut len = S(self.header_size);
361
362        len += S(self.payload.0.len());
363
364        min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
365    }
366}