mysql_common/binlog/events/
rows_event.rs
1use std::{cmp::min, fmt, io};
10
11use bitvec::prelude::*;
12use bytes::BufMut;
13use saturating::Saturating as S;
14
15use crate::{
16 binlog::{
17 consts::{BinlogVersion, EventType, RowsEventFlags},
18 row::BinlogRow,
19 BinlogCtx,
20 },
21 io::ParseBuf,
22 misc::{
23 raw::{
24 bytes::{BareBytes, EofBytes},
25 int::*,
26 RawBytes, RawFlags,
27 },
28 unexpected_buf_eof,
29 },
30 proto::{MyDeserialize, MySerialize},
31};
32
33use super::{BinlogEventHeader, TableMapEvent};
34
35#[derive(Debug, Clone, Eq, PartialEq, Hash)]
37pub struct RowsEvent<'a> {
38 event_type: EventType,
40 table_id: RawInt<LeU48>,
46 flags: RawFlags<RowsEventFlags, LeU16>,
48 extra_data: RawBytes<'a, BareBytes<{ u16::MAX as usize - 2 }>>,
52 num_columns: RawInt<LenEnc>,
54 columns_before_image: Option<RawBytes<'a, BareBytes<0x2000000000000000>>>,
59 columns_after_image: Option<RawBytes<'a, BareBytes<0x2000000000000000>>>,
64 rows_data: RawBytes<'a, EofBytes>,
78}
79
80impl<'a> RowsEvent<'a> {
81 pub fn event_type(&self) -> EventType {
83 self.event_type
84 }
85
86 pub fn table_id(&self) -> u64 {
88 self.table_id.0
89 }
90
91 pub fn num_columns(&self) -> u64 {
93 self.num_columns.0
94 }
95
96 pub fn columns_before_image(&'a self) -> Option<&'a BitSlice<u8>> {
100 match self.columns_before_image {
101 Some(ref bytes) => {
102 let slice = BitSlice::from_slice(bytes.as_bytes());
103 Some(&slice[..self.num_columns() as usize])
104 }
105 None => None,
106 }
107 }
108
109 pub fn columns_after_image(&'a self) -> Option<&'a BitSlice<u8>> {
113 match self.columns_after_image {
114 Some(ref bytes) => {
115 let slice = BitSlice::from_slice(bytes.as_bytes());
116 Some(&slice[..self.num_columns() as usize])
117 }
118 None => None,
119 }
120 }
121
122 pub fn rows_data(&'a self) -> &'a [u8] {
124 self.rows_data.as_bytes()
125 }
126
127 pub fn flags(&self) -> RowsEventFlags {
129 self.flags.get()
130 }
131
132 pub fn flags_raw(&self) -> u16 {
134 self.flags.0
135 }
136
137 pub fn len(&self, _version: BinlogVersion) -> usize {
141 let mut len = S(0);
142
143 len += S(6); len += S(2); len += S(2); len += S(min(self.extra_data.len(), u16::MAX as usize - 2)); len += S(crate::misc::lenenc_int_len(self.num_columns()) as usize); let bitmap_len = (self.num_columns() as usize + 7) / 8;
149 if self.columns_before_image.is_some() {
150 len += S(bitmap_len); }
152 if self.columns_after_image.is_some() {
153 len += S(bitmap_len); }
155 len += S(self.rows_data.len());
156
157 min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
158 }
159
160 pub fn rows<'b>(&'b self, table_map_event: &'b TableMapEvent<'b>) -> RowsEventRows<'b> {
162 RowsEventRows {
163 rows_event: self,
164 table_map_event,
165 rows_data: ParseBuf(self.rows_data.as_bytes()),
166 }
167 }
168
169 pub fn into_owned(self) -> RowsEvent<'static> {
170 RowsEvent {
171 event_type: self.event_type,
172 table_id: self.table_id,
173 flags: self.flags,
174 extra_data: self.extra_data.into_owned(),
175 num_columns: self.num_columns,
176 columns_before_image: self.columns_before_image.map(|x| x.into_owned()),
177 columns_after_image: self.columns_after_image.map(|x| x.into_owned()),
178 rows_data: self.rows_data.into_owned(),
179 }
180 }
181}
182
183pub struct RowsEventCtx<'a> {
185 pub event_type: EventType,
187 pub binlog_ctx: BinlogCtx<'a>,
189}
190
191impl<'de> MyDeserialize<'de> for RowsEvent<'de> {
192 const SIZE: Option<usize> = None;
193 type Ctx = RowsEventCtx<'de>;
194
195 fn deserialize(ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
196 let post_header_len = ctx
197 .binlog_ctx
198 .fde
199 .get_event_type_header_length(ctx.event_type);
200
201 let is_delete_event = ctx.event_type == EventType::DELETE_ROWS_EVENT
202 || ctx.event_type == EventType::DELETE_ROWS_EVENT_V1;
203
204 let is_update_event = ctx.event_type == EventType::UPDATE_ROWS_EVENT
205 || ctx.event_type == EventType::UPDATE_ROWS_EVENT_V1
206 || ctx.event_type == EventType::PARTIAL_UPDATE_ROWS_EVENT;
207
208 let table_id = if post_header_len == 6 {
209 let value = buf.parse::<RawInt<LeU32>>(())?;
211 RawInt::new(value.0 as u64)
212 } else {
213 buf.parse(())?
214 };
215
216 let flags = buf.parse(())?;
217
218 let extra_data = if post_header_len
219 == ctx
220 .binlog_ctx
221 .fde
222 .get_event_type_header_length(EventType::WRITE_ROWS_EVENT)
223 {
224 let extra_data_len = buf.checked_eat_u16_le().ok_or_else(unexpected_buf_eof)? as usize;
226 buf.parse(extra_data_len.saturating_sub(2))?
227 } else {
228 RawBytes::new(&[][..])
229 };
230
231 let num_columns: RawInt<LenEnc> = buf.parse(())?;
232 let bitmap_len = (num_columns.0 as usize + 7) / 8;
233
234 let mut columns_before_image = None;
235 let mut columns_after_image = None;
236
237 if is_update_event {
238 columns_before_image = Some(buf.parse(bitmap_len)?);
239 columns_after_image = Some(buf.parse(bitmap_len)?);
240 } else if is_delete_event {
241 columns_before_image = Some(buf.parse(bitmap_len)?);
242 } else {
243 columns_after_image = Some(buf.parse(bitmap_len)?);
244 }
245
246 let rows_data = buf.parse(())?;
247
248 Ok(Self {
249 event_type: ctx.event_type,
250 table_id,
251 flags,
252 extra_data,
253 num_columns,
254 columns_before_image,
255 columns_after_image,
256 rows_data,
257 })
258 }
259}
260
261impl MySerialize for RowsEvent<'_> {
262 fn serialize(&self, buf: &mut Vec<u8>) {
263 self.table_id.serialize(&mut *buf);
264 self.flags.serialize(&mut *buf);
265
266 if self.event_type == EventType::WRITE_ROWS_EVENT
267 || self.event_type == EventType::UPDATE_ROWS_EVENT
268 || self.event_type == EventType::DELETE_ROWS_EVENT
269 || self.event_type == EventType::PARTIAL_UPDATE_ROWS_EVENT
270 {
271 let len = min(self.extra_data.len().saturating_add(2), u16::MAX as usize) as u16;
272 buf.put_u16_le(len);
273 self.extra_data.serialize(&mut *buf);
274 }
275
276 self.num_columns.serialize(&mut *buf);
277
278 if let Some(bitmap) = &self.columns_before_image {
279 bitmap.serialize(&mut *buf);
280 }
281 if let Some(bitmap) = &self.columns_after_image {
282 bitmap.serialize(&mut *buf);
283 }
284
285 self.rows_data.serialize(buf);
286 }
287}
288
289#[derive(Clone, Eq, PartialEq)]
291pub struct RowsEventRows<'a> {
292 rows_event: &'a RowsEvent<'a>,
293 table_map_event: &'a TableMapEvent<'a>,
294 rows_data: ParseBuf<'a>,
295}
296
297impl<'a> RowsEventRows<'a> {
298 pub(crate) fn new(
299 rows_event: &'a RowsEvent<'a>,
300 table_map_event: &'a TableMapEvent<'a>,
301 rows_data: ParseBuf<'a>,
302 ) -> Self {
303 Self {
304 rows_event,
305 table_map_event,
306 rows_data,
307 }
308 }
309}
310
311impl<'a> Iterator for RowsEventRows<'a> {
312 type Item = io::Result<(Option<BinlogRow>, Option<BinlogRow>)>;
313
314 fn next(&mut self) -> Option<Self::Item> {
315 let mut row_before = None;
316 let mut row_after = None;
317
318 if self.rows_data.is_empty() {
319 return None;
320 }
321
322 if let Some(cols) = self.rows_event.columns_before_image() {
323 let ctx = (
324 self.rows_event.num_columns(),
325 cols,
326 false,
327 self.table_map_event,
328 );
329 row_before = match self.rows_data.parse(ctx) {
330 Ok(row_before) => Some(row_before),
331 Err(err) => return Some(Err(err)),
332 };
333 }
334
335 if let Some(cols) = self.rows_event.columns_after_image() {
336 let ctx = (
337 self.rows_event.num_columns(),
338 cols,
339 self.rows_event.event_type == EventType::PARTIAL_UPDATE_ROWS_EVENT,
340 self.table_map_event,
341 );
342 row_after = match self.rows_data.parse(ctx) {
343 Ok(row_after) => Some(row_after),
344 Err(err) => return Some(Err(err)),
345 };
346 }
347
348 Some(Ok((row_before, row_after)))
349 }
350}
351
352impl fmt::Debug for RowsEventRows<'_> {
353 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354 f.debug_list().entries(self.clone()).finish()
355 }
356}