mysql_common/binlog/
row.rs
1use std::{
10 borrow::Cow,
11 convert::{TryFrom, TryInto},
12 fmt, io,
13 sync::Arc,
14};
15
16use bitvec::{prelude::BitVec, slice::BitSlice};
17
18use crate::{
19 constants::{ColumnFlags, ColumnType},
20 io::ParseBuf,
21 misc::raw::int::*,
22 packets::Column,
23 proto::MyDeserialize,
24 row::{new_row_raw, Row},
25 value::Value,
26};
27
28use super::{
29 events::{OptionalMetaExtractor, TableMapEvent},
30 value::{BinlogValue, BinlogValueToValueError},
31};
32
/// Option flags that may accompany a binlog row image.
///
/// Each discriminant is used as a bit mask against the length-encoded
/// value-options integer that is read when a [`BinlogRow`] is deserialized
/// from a shared image.
#[repr(u64)]
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BinlogRowValueOptions {
    /// When this bit is set, a bitmap with one bit per JSON column follows
    /// the value options; each bit marks whether that JSON column is parsed
    /// as a partial value.
    PARTIAL_JSON_UPDATES = 1,
}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, thiserror::Error)]
43#[error("Unknown binlog version {}", _0)]
44#[repr(transparent)]
45pub struct UnknownBinlogRowValueOptions(pub u64);
46
47impl From<UnknownBinlogRowValueOptions> for u64 {
48 fn from(x: UnknownBinlogRowValueOptions) -> Self {
49 x.0
50 }
51}
52
53impl TryFrom<u64> for BinlogRowValueOptions {
54 type Error = UnknownBinlogRowValueOptions;
55
56 fn try_from(value: u64) -> Result<Self, Self::Error> {
57 match value {
58 1 => Ok(Self::PARTIAL_JSON_UPDATES),
59 x => Err(UnknownBinlogRowValueOptions(x)),
60 }
61 }
62}
63
64#[derive(Clone, PartialEq)]
66pub struct BinlogRow {
67 values: Vec<Option<BinlogValue<'static>>>,
68 columns: Arc<[Column]>,
69}
70
71impl BinlogRow {
72 pub fn new(values: Vec<Option<BinlogValue<'static>>>, columns: Arc<[Column]>) -> Self {
73 Self { values, columns }
74 }
75
76 pub fn len(&self) -> usize {
78 self.values.len()
79 }
80
81 pub fn is_empty(&self) -> bool {
83 self.values.is_empty()
84 }
85
86 pub fn columns_ref(&self) -> &[Column] {
88 &self.columns
89 }
90
91 pub fn columns(&self) -> Arc<[Column]> {
93 self.columns.clone()
94 }
95
96 pub fn as_ref(&self, index: usize) -> Option<&BinlogValue> {
101 self.values.get(index).and_then(|x| x.as_ref())
102 }
103
104 pub fn take(&mut self, index: usize) -> Option<BinlogValue> {
107 self.values.get_mut(index).and_then(|x| x.take())
108 }
109
110 pub fn unwrap(self) -> Vec<BinlogValue<'static>> {
116 self.values
117 .into_iter()
118 .map(|x| x.expect("Can't unwrap row if some of columns was taken"))
119 .collect()
120 }
121
122 #[doc(hidden)]
123 pub fn place(&mut self, index: usize, value: BinlogValue<'static>) {
124 self.values[index] = Some(value);
125 }
126}
127
128impl<'de> MyDeserialize<'de> for BinlogRow {
129 const SIZE: Option<usize> = None;
130 type Ctx = (u64, &'de BitSlice<u8>, bool, &'de TableMapEvent<'de>);
138
139 fn deserialize(
140 (num_columns, cols, have_shared_image, table_info): Self::Ctx,
141 buf: &mut ParseBuf<'de>,
142 ) -> io::Result<Self> {
143 let mut values: Vec<Option<BinlogValue<'static>>> = vec![];
144 let mut columns = vec![];
145
146 let mut partial_cols = if have_shared_image {
148 let value_options = *buf.parse::<RawInt<LenEnc>>(())?;
149 if value_options & BinlogRowValueOptions::PARTIAL_JSON_UPDATES as u64 > 0 {
150 let json_columns_count = table_info.json_column_count();
151 let partial_columns_len = (json_columns_count + 7) / 8;
152 let partial_columns: &[u8] = buf.parse(partial_columns_len)?;
153 let partial_columns = BitSlice::<u8>::from_slice(partial_columns);
154 Some(partial_columns.into_iter().take(json_columns_count))
155 } else {
156 None
157 }
158 } else {
159 None
160 };
161
162 let num_bits = cols.count_ones();
163 let bitmap_len = (num_bits + 7) / 8;
164 let bitmap_buf: &[u8] = buf.parse(bitmap_len)?;
165 let mut null_bitmap = BitVec::<u8>::from_slice(bitmap_buf);
166 null_bitmap.truncate(num_bits);
167
168 let mut image_idx = 0;
169
170 let opt_meta_extractor = OptionalMetaExtractor::new(table_info.iter_optional_meta())?;
171
172 let mut signedness_iterator = opt_meta_extractor.iter_signedness();
173 let mut charset_iter = opt_meta_extractor.iter_charset();
174 let mut enum_and_set_charset_iter = opt_meta_extractor.iter_enum_and_set_charset();
175 let mut primary_key_iter = opt_meta_extractor.iter_primary_key();
176 let mut column_name_iter = opt_meta_extractor.iter_column_name();
177
178 for i in 0..(num_columns as usize) {
179 if cols.get(i).as_deref().copied().unwrap_or(false) {
181 let column_type = table_info.get_column_type(i);
182
183 let column_type = match column_type {
185 Ok(Some(ty)) => ty,
186 Ok(None) => {
187 return Err(io::Error::new(io::ErrorKind::InvalidData, "No column type"))
188 }
189 Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidData, e)),
190 };
191
192 let column_meta = table_info.get_column_metadata(i).unwrap_or(&[]);
193
194 let is_partial = column_type == ColumnType::MYSQL_TYPE_JSON
195 && partial_cols
196 .as_mut()
197 .and_then(|bits| bits.next().as_deref().copied())
198 .unwrap_or(false);
199
200 let is_unsigned = column_type
201 .is_numeric_type()
202 .then(|| signedness_iterator.next())
203 .flatten()
204 .unwrap_or_default();
205
206 let charset = if column_type.is_character_type() {
207 charset_iter.next().transpose()?.unwrap_or_default()
208 } else if column_type.is_enum_or_set_type() {
209 enum_and_set_charset_iter
210 .next()
211 .transpose()?
212 .unwrap_or_default()
213 } else {
214 Default::default()
215 };
216
217 let column_name_raw = column_name_iter.next().transpose()?;
218 let column_name = column_name_raw
219 .as_ref()
220 .map(|x| Cow::Borrowed(x.name_raw()))
221 .unwrap_or_else(|| {
222 Cow::Owned(format!("@{}", i).into())
224 });
225
226 let mut column_flags = ColumnFlags::empty();
227
228 if is_unsigned {
229 column_flags |= ColumnFlags::UNSIGNED_FLAG;
230 }
231
232 if primary_key_iter
233 .next_if(|next| next.is_err() || next.as_ref().ok() == Some(&(i as u64)))
234 .transpose()?
235 .is_some()
236 {
237 column_flags |= ColumnFlags::PRI_KEY_FLAG;
238 }
239
240 let column = Column::new(column_type)
241 .with_schema(table_info.database_name_raw())
242 .with_table(table_info.table_name_raw())
243 .with_name(column_name.as_ref())
244 .with_flags(column_flags)
245 .with_schema(table_info.database_name_raw())
246 .with_org_table(table_info.table_name_raw())
247 .with_table(table_info.table_name_raw())
248 .with_character_set(charset);
249
250 columns.push(column);
251
252 if null_bitmap
254 .get(image_idx)
255 .as_deref()
256 .copied()
257 .unwrap_or(true)
258 {
259 values.push(Some(BinlogValue::Value(Value::NULL)));
260 } else {
261 let ctx = (column_type, column_meta, is_unsigned, is_partial);
262 values.push(Some(buf.parse::<BinlogValue>(ctx)?.into_owned()));
263 }
264
265 image_idx += 1;
266 }
267 }
268
269 Ok(BinlogRow::new(values, columns.into_boxed_slice().into()))
270 }
271}
272
273impl fmt::Debug for BinlogRow {
274 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
275 let mut debug = f.debug_struct("BinlogRow");
276 for (val, column) in self.values.iter().zip(self.columns.iter()) {
277 match *val {
278 Some(ref val) => {
279 debug.field(column.name_str().as_ref(), val);
280 }
281 None => {
282 debug.field(column.name_str().as_ref(), &"<taken>");
283 }
284 }
285 }
286 debug.finish()
287 }
288}
289
/// Error produced when converting a [`BinlogRow`] into a [`Row`] fails.
#[derive(Debug, thiserror::Error)]
#[error(
    "Can't convert BinlogRow to Row at column offset {}: {}",
    column_offset,
    error
)]
pub struct BinlogRowToRowError {
    /// Zero-based position, within the row's values, of the value that
    /// failed to convert.
    pub column_offset: usize,
    /// The underlying value conversion error.
    pub error: BinlogValueToValueError,
}
302
303impl TryFrom<BinlogRow> for Row {
304 type Error = BinlogRowToRowError;
305
306 fn try_from(binlog_row: BinlogRow) -> Result<Self, Self::Error> {
307 let mut values = Vec::with_capacity(binlog_row.values.len());
308 for (column_offset, value) in binlog_row.values.into_iter().enumerate() {
309 match value {
310 Some(x) => {
311 values.push(Some(x.try_into().map_err(|error| BinlogRowToRowError {
312 column_offset,
313 error,
314 })?))
315 }
316 None => values.push(None),
317 }
318 }
319 Ok(new_row_raw(values, binlog_row.columns))
320 }
321}