mysql_common/binlog/events/
table_map_event.rs

1// Copyright (c) 2021 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{borrow::Cow, cmp::min, convert::TryFrom, io, iter::Peekable};
10
11use bitvec::prelude::*;
12use byteorder::ReadBytesExt;
13use saturating::Saturating as S;
14
15use crate::{
16    binlog::{
17        BinlogCtx, BinlogEvent, BinlogStruct,
18        consts::{BinlogVersion, EventType, OptionalMetadataFieldType},
19    },
20    constants::{ColumnType, GeometryType, UnknownColumnType},
21    io::{ParseBuf, ReadMysqlExt},
22    misc::raw::{
23        Either, RawBytes, RawConst, RawSeq, Skip,
24        bytes::{BareBytes, EofBytes, LenEnc, U8Bytes},
25        int::*,
26    },
27    proto::{MyDeserialize, MySerialize},
28};
29
30use super::BinlogEventHeader;
31
/// Error returned by [`TableMapEvent::get_column_type`] when a column type can't be resolved.
#[derive(Debug, Clone, Copy, Eq, PartialEq, thiserror::Error)]
pub enum BadColumnType {
    /// The raw column type value is not a known [`ColumnType`].
    #[error(transparent)]
    Unknown(#[from] UnknownColumnType),
    /// The real type derived from the metadata of a `MYSQL_TYPE_STRING` column
    /// is none of ENUM, SET or STRING (the event seems to be malformed).
    #[error("Unexpected column type: {}", _0)]
    Unexpected(u8),
}
39
/// Table map event.
///
/// In row-based mode, every row operation event is preceded by a Table_map_event which maps
/// a table definition to a number.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct TableMapEvent<'a> {
    // post-header
    /// The number that identifies the table.
    ///
    /// It's 6 bytes long, so valid range is [0, 1<<48).
    table_id: RawInt<LeU48>,
    /// Reserved for future use; currently always 0.
    flags: RawInt<LeU16>,

    // payload
    /// The name of the database in which the table resides.
    ///
    /// Length must be <= 64 bytes.
    database_name: RawBytes<'a, U8Bytes>,
    /// The database name is null-terminated even though it is preceded by the length.
    __null_1: Skip<1>,
    /// The name of the table.
    ///
    /// Length must be <= 64 bytes.
    table_name: RawBytes<'a, U8Bytes>,
    /// The table name is null-terminated even though it is preceded by the length.
    __null_2: Skip<1>,
    /// Number of columns in the table.
    columns_count: RawInt<LenEnc>,
    /// The type of each column in the table, listed from left to right.
    columns_type: RawSeq<'a, u8, ColumnType>,
    /// For each column from left to right, a chunk of data whose length and semantics depend
    /// on the type of the column.
    columns_metadata: RawBytes<'a, LenEnc>,
    /// For each column, a bit indicating whether data in the column can be NULL or not.
    ///
    /// The number of bytes needed for this is int((column_count + 7) / 8).
    /// The flag for the first column from the left is in the least-significant bit
    /// of the first byte, the second is in the second least significant bit of the first byte,
    /// the ninth is in the least significant bit of the second byte, and so on.
    null_bitmask: RawBytes<'a, BareBytes<0x2000000000000000>>,
    /// Optional metadata.
    ///
    /// Stored as the raw bytes that remain after the null bitmask
    /// (see [`TableMapEvent::iter_optional_meta`]).
    optional_metadata: RawBytes<'a, EofBytes>,
}
84
85impl<'a> TableMapEvent<'a> {
86    /// Returns the table identifier.
87    pub fn table_id(&self) -> u64 {
88        self.table_id.0
89    }
90
91    /// Returns the number of columns
92    pub fn columns_count(&self) -> u64 {
93        self.columns_count.0
94    }
95
96    /// Ruturns a number of JSON columns.
97    pub fn json_column_count(&self) -> usize {
98        self.columns_type
99            .0
100            .iter()
101            .filter(|x| **x == ColumnType::MYSQL_TYPE_JSON as u8)
102            .count()
103    }
104
105    /// Returns null-bitmap for this table.
106    ///
107    /// For each column this null bitmap contains a bit indicating whether
108    /// data in the column can be NULL or not.
109    pub fn null_bitmask(&'a self) -> &'a BitSlice<u8> {
110        let slice = BitSlice::from_slice(self.null_bitmask.as_bytes());
111        &slice[..self.columns_count() as usize]
112    }
113
114    /// Returns raw database name value.
115    pub fn database_name_raw(&'a self) -> &'a [u8] {
116        self.database_name.as_bytes()
117    }
118
119    /// Returns database name as a string (lossy converted).
120    pub fn database_name(&'a self) -> Cow<'a, str> {
121        self.database_name.as_str()
122    }
123
124    /// Returns raw table name value.
125    pub fn table_name_raw(&'a self) -> &'a [u8] {
126        self.table_name.as_bytes()
127    }
128
129    /// Returns table name as a string (lossy converted).
130    pub fn table_name(&'a self) -> Cow<'a, str> {
131        self.table_name.as_str()
132    }
133
134    /// Returns raw type of the column as stored in the column_type field of the Table Map Event.
135    ///
136    /// `None` means that the column index is out of range.
137    pub fn get_raw_column_type(
138        &self,
139        col_idx: usize,
140    ) -> Result<Option<ColumnType>, UnknownColumnType> {
141        self.columns_type.get(col_idx).map(|x| x.get()).transpose()
142    }
143
144    /// Returns a type of the given column.
145    ///
146    /// It'll read real column type out of the column
147    /// metadata if column type is `MYSQL_TYPE_STRING`.
148    ///
149    /// Returns an error in case of unknown column type
150    /// or unexpected real type for `MYSQL_TYPE_STRING`.
151    ///
152    /// `None` means that the column index is out of range.
153    pub fn get_column_type(&self, col_idx: usize) -> Result<Option<ColumnType>, BadColumnType> {
154        self.columns_type
155            .get(col_idx)
156            .map(|x| {
157                x.get()
158                    .map_err(BadColumnType::from)
159                    .and_then(|column_type| self.get_real_type(col_idx, column_type))
160            })
161            .transpose()
162    }
163
164    /// Returns metadata for the given column.
165    ///
166    /// Returns `None` if column index is out of bounds or if offset couldn't be calculated
167    /// (e.g. because of unknown column type between `0` and `col_idx`).
168    pub fn get_column_metadata(&self, col_idx: usize) -> Option<&[u8]> {
169        let mut offset = 0;
170        for i in 0..=col_idx {
171            let ty = self.columns_type.get(i)?.get().ok()?;
172            let ptr = self.columns_metadata.as_bytes().get(offset..)?;
173            let (metadata, len) = ty.get_metadata(ptr, false)?;
174            if i == col_idx {
175                return Some(metadata);
176            } else {
177                offset += len;
178            }
179        }
180        None
181    }
182
183    pub fn iter_optional_meta(&'a self) -> OptionalMetadataIter<'a> {
184        OptionalMetadataIter {
185            columns: &self.columns_type,
186            data: self.optional_metadata.as_bytes(),
187        }
188    }
189
190    /// Returns a `'static` version of `self`.
191    pub fn into_owned(self) -> TableMapEvent<'static> {
192        TableMapEvent {
193            table_id: self.table_id,
194            flags: self.flags,
195            database_name: self.database_name.into_owned(),
196            __null_1: self.__null_1,
197            table_name: self.table_name.into_owned(),
198            __null_2: self.__null_2,
199            columns_count: self.columns_count,
200            columns_type: self.columns_type.into_owned(),
201            columns_metadata: self.columns_metadata.into_owned(),
202            null_bitmask: self.null_bitmask.into_owned(),
203            optional_metadata: self.optional_metadata.into_owned(),
204        }
205    }
206
207    fn get_real_type(
208        &self,
209        col_idx: usize,
210        column_type: ColumnType,
211    ) -> Result<ColumnType, BadColumnType> {
212        match column_type {
213            ColumnType::MYSQL_TYPE_DATE => {
214                // This type has not been used since before row-based replication,
215                // so we can safely assume that it really is MYSQL_TYPE_NEWDATE.
216                return Ok(ColumnType::MYSQL_TYPE_NEWDATE);
217            }
218            ColumnType::MYSQL_TYPE_STRING => {
219                let mut real_type = column_type as u8;
220                if let Some(metadata_bytes) = self.get_column_metadata(col_idx) {
221                    let f1 = metadata_bytes[0];
222
223                    if f1 != 0 {
224                        real_type = f1 | 0x30;
225                    }
226
227                    match real_type {
228                        247 => return Ok(ColumnType::MYSQL_TYPE_ENUM),
229                        248 => return Ok(ColumnType::MYSQL_TYPE_SET),
230                        254 => return Ok(ColumnType::MYSQL_TYPE_STRING),
231                        x => {
232                            // this event seems to be malformed
233                            return Err(BadColumnType::Unexpected(x));
234                        }
235                    };
236                }
237            }
238            _ => (),
239        }
240
241        Ok(column_type)
242    }
243}
244
245impl<'de> MyDeserialize<'de> for TableMapEvent<'de> {
246    const SIZE: Option<usize> = None;
247    type Ctx = BinlogCtx<'de>;
248
249    fn deserialize(ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
250        let table_id = if 6 == ctx.fde.get_event_type_header_length(Self::EVENT_TYPE) {
251            // old server
252            let table_id: RawInt<LeU32> = buf.parse(())?;
253            RawInt::new(table_id.0 as u64)
254        } else {
255            buf.parse(())?
256        };
257
258        let flags = buf.parse(())?;
259
260        let database_name = buf.parse(())?;
261        let __null_1 = buf.parse(())?;
262        let table_name = buf.parse(())?;
263        let __null_2 = buf.parse(())?;
264
265        let columns_count: RawInt<LenEnc> = buf.parse(())?;
266        let columns_type = buf.parse(columns_count.0 as usize)?;
267        let columns_metadata = buf.parse(())?;
268        let null_bitmask = buf.parse(((columns_count.0 + 7) / 8) as usize)?;
269        let optional_metadata = buf.parse(())?;
270
271        Ok(TableMapEvent {
272            table_id,
273            flags,
274            database_name,
275            __null_1,
276            table_name,
277            __null_2,
278            columns_count,
279            columns_type,
280            columns_metadata,
281            null_bitmask,
282            optional_metadata,
283        })
284    }
285}
286
287impl MySerialize for TableMapEvent<'_> {
288    fn serialize(&self, buf: &mut Vec<u8>) {
289        self.table_id.serialize(&mut *buf);
290        self.flags.serialize(&mut *buf);
291        self.database_name.serialize(&mut *buf);
292        self.__null_1.serialize(&mut *buf);
293        self.table_name.serialize(&mut *buf);
294        self.__null_2.serialize(&mut *buf);
295        self.columns_count.serialize(&mut *buf);
296        self.columns_type.serialize(&mut *buf);
297        self.columns_metadata.serialize(&mut *buf);
298        self.null_bitmask.serialize(&mut *buf);
299        self.optional_metadata.serialize(&mut *buf);
300    }
301}
302
impl<'a> BinlogEvent<'a> for TableMapEvent<'a> {
    /// Event type of this struct; also used by the deserializer
    /// to look up the post-header length of the event.
    const EVENT_TYPE: EventType = EventType::TABLE_MAP_EVENT;
}
306
307impl<'a> BinlogStruct<'a> for TableMapEvent<'a> {
308    fn len(&self, _version: BinlogVersion) -> usize {
309        let mut len = S(0);
310
311        len += S(6);
312        len += S(2);
313        len += S(1);
314        len += S(min(self.database_name.0.len(), u8::MAX as usize));
315        len += S(1);
316        len += S(1);
317        len += S(min(self.table_name.0.len(), u8::MAX as usize));
318        len += S(1);
319        len += S(crate::misc::lenenc_int_len(self.columns_count()) as usize);
320        len += S(self.columns_count() as usize);
321        len += S(crate::misc::lenenc_str_len(self.columns_metadata.as_bytes()) as usize);
322        len += S((self.columns_count() as usize + 8) / 7);
323        len += S(self.optional_metadata.len());
324
325        min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
326    }
327}
328
/// Optional metadata field that contains charsets for columns.
///
/// - contains charsets for character columns if it's a [`OptionalMetadataField::DefaultCharset`];
/// - contains charsets for ENUM and SET columns if it's a
///   [`OptionalMetadataField::EnumAndSetDefaultCharset`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DefaultCharset<'a> {
    /// Default charset+collation id.
    default_charset: RawInt<LenEnc>,
    /// Maps column index to its charset+collation id for columns whose charset isn't default.
    non_default: RawBytes<'a, EofBytes>,
}
341
342impl<'a> DefaultCharset<'a> {
343    /// Returns the default charset.
344    pub fn default_charset(&self) -> u16 {
345        self.default_charset.0 as u16
346    }
347
348    /// Iterates non-default charsets. Errors if data is malformed.
349    ///
350    /// It'll either enumerate charsets for character columns
351    /// (for [`OptionalMetadataField::DefaultCharset`]) or for ENUM and SET columns
352    /// (for [`OptionalMetadataField::EnumAndSetDefaultCharset`]).
353    /// See [`ColumnType::is_character_type`] and [`ColumnType::is_enum_or_set_type`].
354    ///
355    /// The order is same to the order of [`TableMapEvent::get_column_type`] field.
356    pub fn iter_non_default(&self) -> IterNonDefault<'_> {
357        IterNonDefault {
358            buf: ParseBuf(self.non_default.as_bytes()),
359        }
360    }
361}
362
/// Iterator over non-default charsets (see [`DefaultCharset::iter_non_default`]).
pub struct IterNonDefault<'a> {
    /// Bytes that are not parsed yet.
    buf: ParseBuf<'a>,
}
366
367impl<'a> Iterator for IterNonDefault<'a> {
368    type Item = io::Result<NonDefaultCharset>;
369
370    fn next(&mut self) -> Option<Self::Item> {
371        if self.buf.is_empty() {
372            None
373        } else {
374            match self.buf.parse(()) {
375                Ok(x) => Some(Ok(x)),
376                Err(e) => {
377                    self.buf = ParseBuf(b"");
378                    Some(Err(e))
379                }
380            }
381        }
382    }
383}
384
385impl<'de> MyDeserialize<'de> for DefaultCharset<'de> {
386    const SIZE: Option<usize> = None;
387    type Ctx = ();
388
389    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
390        Ok(Self {
391            default_charset: buf.parse(())?,
392            non_default: buf.parse(())?,
393        })
394    }
395}
396
397impl MySerialize for DefaultCharset<'_> {
398    fn serialize(&self, buf: &mut Vec<u8>) {
399        self.default_charset.serialize(&mut *buf);
400        self.non_default.serialize(&mut *buf);
401    }
402}
403
/// Contains `column_id -> charset+collation` mapping for columns with non-default charsets.
/// (see [`DefaultCharset::iter_non_default`]).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NonDefaultCharset {
    /// Index of the column.
    column_index: RawInt<LenEnc>,
    /// Charset+collation of the column.
    charset: RawInt<LenEnc>,
}
411
412impl NonDefaultCharset {
413    pub fn new(column_index: u64, charset: u16) -> Self {
414        Self {
415            column_index: RawInt::new(column_index),
416            charset: RawInt::new(charset as u64),
417        }
418    }
419
420    /// Returns the column index.
421    pub fn column_index(&self) -> u64 {
422        self.column_index.0
423    }
424
425    /// Returns the charset+collation.
426    pub fn charset(&self) -> u16 {
427        self.charset.0 as u16
428    }
429}
430
431impl<'de> MyDeserialize<'de> for NonDefaultCharset {
432    const SIZE: Option<usize> = None;
433    type Ctx = ();
434
435    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
436        Ok(Self {
437            column_index: buf.parse(())?,
438            charset: buf.parse(())?,
439        })
440    }
441}
442
443impl MySerialize for NonDefaultCharset {
444    fn serialize(&self, buf: &mut Vec<u8>) {
445        self.column_index.serialize(&mut *buf);
446        self.charset.serialize(&mut *buf);
447    }
448}
449
/// Contains charset+collation for column.
///
/// - contains charsets for character columns if it's a [`OptionalMetadataField::ColumnCharset`];
/// - contains charsets for ENUM and SET columns if it's a
///   [`OptionalMetadataField::EnumAndSetColumnCharset`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ColumnCharsets<'a> {
    /// Raw charsets (see [`ColumnCharsets::iter_charsets`]).
    charsets: RawBytes<'a, EofBytes>,
}
459
460impl<'a> ColumnCharsets<'a> {
461    /// Returns an iterator over charsets.
462    ///
463    /// It'll either enumerate charsets for character columns
464    /// (for [`OptionalMetadataField::DefaultCharset`]) or for ENUM and SET columns
465    /// (for [`OptionalMetadataField::EnumAndSetDefaultCharset`]).
466    /// See [`ColumnType::is_character_type`] and [`ColumnType::is_enum_or_set_type`].
467    ///
468    /// The order is same to the order of [`TableMapEvent::get_column_type`] field.
469    pub fn iter_charsets(&'a self) -> IterCharsets<'a> {
470        IterCharsets {
471            buf: ParseBuf(self.charsets.as_bytes()),
472        }
473    }
474}
475
/// Iterator over charsets of columns (see [`ColumnCharsets::iter_charsets`]).
pub struct IterCharsets<'a> {
    /// Bytes that are not parsed yet.
    buf: ParseBuf<'a>,
}
479
480impl<'a> Iterator for IterCharsets<'a> {
481    type Item = io::Result<u16>;
482
483    fn next(&mut self) -> Option<Self::Item> {
484        if self.buf.is_empty() {
485            None
486        } else {
487            match self.buf.parse::<RawInt<LenEnc>>(()) {
488                Ok(x) => Some(Ok(x.0 as u16)),
489                Err(e) => {
490                    self.buf = ParseBuf(b"");
491                    Some(Err(e))
492                }
493            }
494        }
495    }
496}
497
498impl<'de> MyDeserialize<'de> for ColumnCharsets<'de> {
499    const SIZE: Option<usize> = None;
500    type Ctx = ();
501
502    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
503        Ok(Self {
504            charsets: buf.parse(())?,
505        })
506    }
507}
508
509impl MySerialize for ColumnCharsets<'_> {
510    fn serialize(&self, buf: &mut Vec<u8>) {
511        self.charsets.serialize(buf);
512    }
513}
514
/// Name of a column in [`ColumnNames`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ColumnName<'a> {
    /// Raw bytes of the name.
    name: RawBytes<'a, LenEnc>,
}
520
521impl<'a> ColumnName<'a> {
522    /// Creates new column name.
523    pub fn new(name: impl Into<Cow<'a, [u8]>>) -> Self {
524        Self {
525            name: RawBytes::new(name),
526        }
527    }
528
529    /// Returns the raw name.
530    pub fn name_raw(&'a self) -> &'a [u8] {
531        self.name.as_bytes()
532    }
533
534    /// Returns the name as a string (lossy converted).
535    pub fn name(&'a self) -> Cow<'a, str> {
536        self.name.as_str()
537    }
538
539    /// Converts self to a 'static version.
540    pub fn into_owned(self) -> ColumnName<'static> {
541        ColumnName {
542            name: self.name.into_owned(),
543        }
544    }
545}
546
547impl<'de> MyDeserialize<'de> for ColumnName<'de> {
548    const SIZE: Option<usize> = None;
549    type Ctx = ();
550
551    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
552        Ok(Self {
553            name: buf.parse(())?,
554        })
555    }
556}
557
558impl MySerialize for ColumnName<'_> {
559    fn serialize(&self, buf: &mut Vec<u8>) {
560        self.name.serialize(buf);
561    }
562}
563
/// Contains names of columns.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ColumnNames<'a> {
    /// Raw names (see [`ColumnNames::iter_names`]).
    names: RawBytes<'a, EofBytes>,
}
569
570impl<'a> ColumnNames<'a> {
571    /// Returns an iterator over names of columns.
572    pub fn iter_names(&self) -> IterNames<'_> {
573        IterNames {
574            buf: ParseBuf(self.names.as_bytes()),
575        }
576    }
577}
578
/// Iterator over names of columns (see [`ColumnNames::iter_names`]).
pub struct IterNames<'a> {
    /// Bytes that are not parsed yet.
    buf: ParseBuf<'a>,
}
582
583impl<'a> Iterator for IterNames<'a> {
584    type Item = io::Result<ColumnName<'a>>;
585
586    fn next(&mut self) -> Option<Self::Item> {
587        if self.buf.is_empty() {
588            None
589        } else {
590            match self.buf.parse(()) {
591                Ok(x) => Some(Ok(x)),
592                Err(e) => {
593                    self.buf = ParseBuf(b"");
594                    Some(Err(e))
595                }
596            }
597        }
598    }
599}
600
601impl<'de> MyDeserialize<'de> for ColumnNames<'de> {
602    const SIZE: Option<usize> = None;
603    type Ctx = ();
604
605    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
606        Ok(Self {
607            names: buf.parse(())?,
608        })
609    }
610}
611
612impl MySerialize for ColumnNames<'_> {
613    fn serialize(&self, buf: &mut Vec<u8>) {
614        self.names.serialize(buf);
615    }
616}
617
/// Contains string values of SET columns.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SetsStrValues<'a> {
    /// Raw values (see [`SetsStrValues::iter_values`]).
    values: RawBytes<'a, EofBytes>,
}
623
624impl<'a> SetsStrValues<'a> {
625    /// Returns an iterator over SET columns string values.
626    pub fn iter_values(&'a self) -> IterSetStrValues<'a> {
627        IterSetStrValues {
628            buf: ParseBuf(self.values.as_bytes()),
629        }
630    }
631}
632
/// Iterator over string values of SET columns (see [`SetsStrValues::iter_values`]).
pub struct IterSetStrValues<'a> {
    /// Bytes that are not parsed yet.
    buf: ParseBuf<'a>,
}
636
637impl<'a> Iterator for IterSetStrValues<'a> {
638    type Item = io::Result<SetStrValues<'a>>;
639
640    fn next(&mut self) -> Option<Self::Item> {
641        if self.buf.is_empty() {
642            None
643        } else {
644            match self.buf.parse(()) {
645                Ok(x) => Some(Ok(x)),
646                Err(e) => {
647                    self.buf = ParseBuf(b"");
648                    Some(Err(e))
649                }
650            }
651        }
652    }
653}
654
655impl<'de> MyDeserialize<'de> for SetsStrValues<'de> {
656    const SIZE: Option<usize> = None;
657    type Ctx = ();
658
659    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
660        Ok(Self {
661            values: buf.parse(())?,
662        })
663    }
664}
665
666impl MySerialize for SetsStrValues<'_> {
667    fn serialize(&self, buf: &mut Vec<u8>) {
668        self.values.serialize(buf);
669    }
670}
671
/// String value for a SET column variant.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SetStrValue<'a> {
    /// Raw bytes of the value.
    value: RawBytes<'a, LenEnc>,
}
677
678impl<'a> SetStrValue<'a> {
679    /// Creates a new SET variant name.
680    pub fn new(value: impl Into<Cow<'a, [u8]>>) -> Self {
681        Self {
682            value: RawBytes::new(value),
683        }
684    }
685
686    /// Returns the raw value.
687    pub fn value_raw(&'a self) -> &'a [u8] {
688        self.value.as_bytes()
689    }
690
691    /// Returns the value as a string (lossy converted).
692    pub fn value(&'a self) -> Cow<'a, str> {
693        self.value.as_str()
694    }
695}
696
697impl<'de> MyDeserialize<'de> for SetStrValue<'de> {
698    const SIZE: Option<usize> = None;
699    type Ctx = ();
700
701    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
702        Ok(Self {
703            value: buf.parse(())?,
704        })
705    }
706}
707
708impl MySerialize for SetStrValue<'_> {
709    fn serialize(&self, buf: &mut Vec<u8>) {
710        self.value.serialize(buf);
711    }
712}
713
/// Contains string values for SET column.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SetStrValues<'a> {
    /// Number of variants as stored in the event.
    num_variants: RawInt<LenEnc>,
    /// String values of the variants, in the order they were parsed.
    values: Vec<SetStrValue<'a>>,
}
720
721impl<'a> SetStrValues<'a> {
722    /// Returns the number of variants in this SET column.
723    pub fn num_variants(&self) -> u64 {
724        self.num_variants.0
725    }
726
727    /// Returns an iterator over string values of SET variants.
728    pub fn values(&'a self) -> &'a [SetStrValue<'a>] {
729        self.values.as_ref()
730    }
731}
732
733impl<'de> MyDeserialize<'de> for SetStrValues<'de> {
734    const SIZE: Option<usize> = None;
735    type Ctx = ();
736
737    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
738        let num_variants: RawInt<LenEnc> = buf.parse(())?;
739        let mut values = Vec::with_capacity(num_variants.0 as usize);
740        for _ in 0..num_variants.0 {
741            values.push(buf.parse(())?);
742        }
743        Ok(Self {
744            num_variants,
745            values,
746        })
747    }
748}
749
750impl MySerialize for SetStrValues<'_> {
751    fn serialize(&self, buf: &mut Vec<u8>) {
752        self.num_variants.serialize(&mut *buf);
753        for value in &self.values {
754            value.serialize(buf);
755        }
756    }
757}
758
/// Contains string values of ENUM columns.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EnumsStrValues<'a> {
    /// Raw values (see [`EnumsStrValues::iter_values`]).
    values: RawBytes<'a, EofBytes>,
}
764
765impl<'a> EnumsStrValues<'a> {
766    /// Returns an iterator over ENUM columns string values.
767    pub fn iter_values(&'a self) -> IterEnumStrValues<'a> {
768        IterEnumStrValues {
769            buf: ParseBuf(self.values.as_bytes()),
770        }
771    }
772}
773
/// Iterator over string values of ENUM columns (see [`EnumsStrValues::iter_values`]).
pub struct IterEnumStrValues<'a> {
    /// Bytes that are not parsed yet.
    buf: ParseBuf<'a>,
}
777
778impl<'a> Iterator for IterEnumStrValues<'a> {
779    type Item = io::Result<EnumStrValues<'a>>;
780
781    fn next(&mut self) -> Option<Self::Item> {
782        if self.buf.is_empty() {
783            None
784        } else {
785            match self.buf.parse(()) {
786                Ok(x) => Some(Ok(x)),
787                Err(e) => {
788                    self.buf = ParseBuf(b"");
789                    Some(Err(e))
790                }
791            }
792        }
793    }
794}
795
796impl<'de> MyDeserialize<'de> for EnumsStrValues<'de> {
797    const SIZE: Option<usize> = None;
798    type Ctx = ();
799
800    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
801        Ok(Self {
802            values: buf.parse(())?,
803        })
804    }
805}
806
807impl MySerialize for EnumsStrValues<'_> {
808    fn serialize(&self, buf: &mut Vec<u8>) {
809        self.values.serialize(buf);
810    }
811}
812
/// String value for an ENUM column variant.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EnumStrValue<'a> {
    /// Raw bytes of the value.
    value: RawBytes<'a, LenEnc>,
}
818
819impl<'a> EnumStrValue<'a> {
820    /// Creates a new ENUM variant name.
821    pub fn new(value: impl Into<Cow<'a, [u8]>>) -> Self {
822        Self {
823            value: RawBytes::new(value),
824        }
825    }
826
827    /// Returns the raw value.
828    pub fn value_raw(&'a self) -> &'a [u8] {
829        self.value.as_bytes()
830    }
831
832    /// Returns the value as a string (lossy converted).
833    pub fn value(&'a self) -> Cow<'a, str> {
834        self.value.as_str()
835    }
836}
837
838impl<'de> MyDeserialize<'de> for EnumStrValue<'de> {
839    const SIZE: Option<usize> = None;
840    type Ctx = ();
841
842    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
843        Ok(Self {
844            value: buf.parse(())?,
845        })
846    }
847}
848
849impl MySerialize for EnumStrValue<'_> {
850    fn serialize(&self, buf: &mut Vec<u8>) {
851        self.value.serialize(buf);
852    }
853}
854
/// Contains string values for ENUM column.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EnumStrValues<'a> {
    /// Number of variants as stored in the event.
    num_variants: RawInt<LenEnc>,
    /// String values of the variants, in the order they were parsed.
    values: Vec<EnumStrValue<'a>>,
}
861
862impl<'a> EnumStrValues<'a> {
863    /// Returns the number of variants in this ENUM column.
864    pub fn num_variants(&self) -> u64 {
865        self.num_variants.0
866    }
867
868    /// Returns an iterator over string values of ENUM variants.
869    pub fn values(&'a self) -> &'a [EnumStrValue<'a>] {
870        self.values.as_ref()
871    }
872}
873
874impl<'de> MyDeserialize<'de> for EnumStrValues<'de> {
875    const SIZE: Option<usize> = None;
876    type Ctx = ();
877
878    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
879        let num_variants: RawInt<LenEnc> = buf.parse(())?;
880        let mut values = Vec::with_capacity(num_variants.0 as usize);
881        for _ in 0..num_variants.0 {
882            values.push(buf.parse(())?);
883        }
884        Ok(Self {
885            num_variants,
886            values,
887        })
888    }
889}
890
891impl MySerialize for EnumStrValues<'_> {
892    fn serialize(&self, buf: &mut Vec<u8>) {
893        self.num_variants.serialize(&mut *buf);
894        for value in &self.values {
895            value.serialize(buf);
896        }
897    }
898}
899
/// Contains real types for every geometry column.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct GeometryTypes<'a> {
    /// Raw types (see [`GeometryTypes::iter_geometry_types`]).
    geometry_types: RawBytes<'a, EofBytes>,
}
905
906impl<'a> GeometryTypes<'a> {
907    /// Returns an iterator over real types.
908    ///
909    /// It will signal an error and stop iteration if field value is malformed.
910    ///
911    /// It'll continue iteration in case of unknown [`GeometryType`]. This error is indicated
912    /// by the [`std::io::ErrorKind::InvalidData`] kind and
913    /// [`crate::constants::UnknownGeometryType`] payload.
914    pub fn iter_geometry_types(&'a self) -> IterGeometryTypes<'a> {
915        IterGeometryTypes {
916            buf: ParseBuf(self.geometry_types.as_bytes()),
917        }
918    }
919}
920
/// Iterator over geometry types (see [`GeometryTypes::iter_geometry_types`]).
pub struct IterGeometryTypes<'a> {
    /// Bytes that are not parsed yet.
    buf: ParseBuf<'a>,
}
924
925impl<'a> Iterator for IterGeometryTypes<'a> {
926    type Item = io::Result<GeometryType>;
927
928    fn next(&mut self) -> Option<Self::Item> {
929        if self.buf.is_empty() {
930            None
931        } else {
932            match self.buf.parse::<RawInt<LenEnc>>(()) {
933                Ok(x) => Some(
934                    GeometryType::try_from(x.0 as u8)
935                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)),
936                ),
937                Err(e) => {
938                    self.buf = ParseBuf(b"");
939                    Some(Err(e))
940                }
941            }
942        }
943    }
944}
945
946impl<'de> MyDeserialize<'de> for GeometryTypes<'de> {
947    const SIZE: Option<usize> = None;
948    type Ctx = ();
949
950    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
951        Ok(Self {
952            geometry_types: buf.parse(())?,
953        })
954    }
955}
956
957impl MySerialize for GeometryTypes<'_> {
958    fn serialize(&self, buf: &mut Vec<u8>) {
959        self.geometry_types.serialize(buf);
960    }
961}
962
/// Contains a number of dimensions for every vector column.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct VectorDimensionalities<'a> {
    /// Raw dimensionalities (see [`VectorDimensionalities::iter_dimensionalities`]).
    dimensionalities: RawBytes<'a, EofBytes>,
}
968
969impl<'a> VectorDimensionalities<'a> {
970    /// Returns an iterator over dimensionalities.
971    ///
972    /// It will signal an error and stop iteration if field value is malformed.
973    pub fn iter_dimensionalities(&'a self) -> IterVectorDimensionalities<'a> {
974        IterVectorDimensionalities {
975            buf: ParseBuf(self.dimensionalities.as_bytes()),
976        }
977    }
978}
979
/// Iterator over dimensionalities of vector columns
/// (see [`VectorDimensionalities::iter_dimensionalities`]).
pub struct IterVectorDimensionalities<'a> {
    /// Bytes that are not parsed yet.
    buf: ParseBuf<'a>,
}
983
984impl<'a> Iterator for IterVectorDimensionalities<'a> {
985    type Item = io::Result<u64>;
986
987    fn next(&mut self) -> Option<Self::Item> {
988        if self.buf.is_empty() {
989            None
990        } else {
991            match self.buf.parse::<RawInt<LenEnc>>(()) {
992                Ok(x) => Some(Ok(x.0)),
993                Err(e) => {
994                    self.buf = ParseBuf(b"");
995                    Some(Err(e))
996                }
997            }
998        }
999    }
1000}
1001
1002impl<'de> MyDeserialize<'de> for VectorDimensionalities<'de> {
1003    const SIZE: Option<usize> = None;
1004    type Ctx = ();
1005
1006    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
1007        Ok(Self {
1008            dimensionalities: buf.parse(())?,
1009        })
1010    }
1011}
1012
1013impl MySerialize for VectorDimensionalities<'_> {
1014    fn serialize(&self, buf: &mut Vec<u8>) {
1015        self.dimensionalities.serialize(buf);
1016    }
1017}
1018
/// Contains a sequence of PK column indexes where PK doesn't have a prefix.
///
/// The value is stored undecoded; use [`SimplePrimaryKey::iter_indexes`] to read the indexes.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SimplePrimaryKey<'a> {
    /// Raw field value, read by the iterator as a sequence of `RawInt<LenEnc>` values.
    indexes: RawBytes<'a, EofBytes>,
}
1024
1025impl<'a> SimplePrimaryKey<'a> {
1026    /// Returns an iterator over column indexes.
1027    pub fn iter_indexes(&'a self) -> IterIndexes<'a> {
1028        IterIndexes {
1029            buf: ParseBuf(self.indexes.as_bytes()),
1030        }
1031    }
1032}
1033
/// Iterator over column indexes of a simple primary key (see [`SimplePrimaryKey::iter_indexes`]).
pub struct IterIndexes<'a> {
    /// Bytes that are not decoded yet; replaced by an empty buffer after a parse error.
    buf: ParseBuf<'a>,
}
1037
1038impl<'a> Iterator for IterIndexes<'a> {
1039    type Item = io::Result<u64>;
1040
1041    fn next(&mut self) -> Option<Self::Item> {
1042        if self.buf.is_empty() {
1043            None
1044        } else {
1045            match self.buf.parse::<RawInt<LenEnc>>(()) {
1046                Ok(x) => Some(Ok(x.0)),
1047                Err(e) => {
1048                    self.buf = ParseBuf(b"");
1049                    Some(Err(e))
1050                }
1051            }
1052        }
1053    }
1054}
1055
1056impl<'de> MyDeserialize<'de> for SimplePrimaryKey<'de> {
1057    const SIZE: Option<usize> = None;
1058    type Ctx = ();
1059
1060    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
1061        Ok(Self {
1062            indexes: buf.parse(())?,
1063        })
1064    }
1065}
1066
1067impl MySerialize for SimplePrimaryKey<'_> {
1068    fn serialize(&self, buf: &mut Vec<u8>) {
1069        self.indexes.serialize(buf);
1070    }
1071}
1072
/// Contains a sequence of primary keys with prefix.
///
/// The value is stored undecoded; use [`PrimaryKeysWithPrefix::iter_keys`] to read
/// the individual [`PrimaryKeyWithPrefix`] entries.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct PrimaryKeysWithPrefix<'a> {
    /// Raw field value, read by the iterator as a sequence of [`PrimaryKeyWithPrefix`] entries.
    data: RawBytes<'a, EofBytes>,
}
1078
1079impl<'a> PrimaryKeysWithPrefix<'a> {
1080    /// Returns an iterator over column indexes and corresponding prefix lengths.
1081    pub fn iter_keys(&'a self) -> IterKeys<'a> {
1082        IterKeys {
1083            buf: ParseBuf(self.data.as_bytes()),
1084        }
1085    }
1086}
1087
/// Iterator over primary keys with prefix (see [`PrimaryKeysWithPrefix::iter_keys`]).
pub struct IterKeys<'a> {
    /// Bytes that are not decoded yet; replaced by an empty buffer after a parse error.
    buf: ParseBuf<'a>,
}
1091
1092impl<'a> Iterator for IterKeys<'a> {
1093    type Item = io::Result<PrimaryKeyWithPrefix>;
1094
1095    fn next(&mut self) -> Option<Self::Item> {
1096        if self.buf.is_empty() {
1097            None
1098        } else {
1099            match self.buf.parse(()) {
1100                Ok(x) => Some(Ok(x)),
1101                Err(e) => {
1102                    self.buf = ParseBuf(b"");
1103                    Some(Err(e))
1104                }
1105            }
1106        }
1107    }
1108}
1109
1110impl<'de> MyDeserialize<'de> for PrimaryKeysWithPrefix<'de> {
1111    const SIZE: Option<usize> = None;
1112    type Ctx = ();
1113
1114    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
1115        Ok(Self {
1116            data: buf.parse(())?,
1117        })
1118    }
1119}
1120
1121impl MySerialize for PrimaryKeysWithPrefix<'_> {
1122    fn serialize(&self, buf: &mut Vec<u8>) {
1123        self.data.serialize(buf);
1124    }
1125}
1126
/// Info about primary key with a prefix (see [`PrimaryKeysWithPrefix`]).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct PrimaryKeyWithPrefix {
    /// Index of the column (see [`PrimaryKeyWithPrefix::column_index`]).
    column_index: RawInt<LenEnc>,
    /// Prefix length (see [`PrimaryKeyWithPrefix::prefix_length`]).
    prefix_length: RawInt<LenEnc>,
}
1133
1134impl PrimaryKeyWithPrefix {
1135    /// Creates a new PK with prefix.
1136    pub fn new(column_index: u64, prefix_length: u64) -> Self {
1137        Self {
1138            column_index: RawInt::new(column_index),
1139            prefix_length: RawInt::new(prefix_length),
1140        }
1141    }
1142
1143    /// Returns the column index
1144    pub fn column_index(&self) -> u64 {
1145        self.column_index.0
1146    }
1147
1148    /// Returns the prefix length. `0` means that the whole column value is used.
1149    pub fn prefix_length(&self) -> u64 {
1150        self.prefix_length.0
1151    }
1152}
1153
1154impl<'de> MyDeserialize<'de> for PrimaryKeyWithPrefix {
1155    const SIZE: Option<usize> = None;
1156    type Ctx = ();
1157
1158    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
1159        Ok(Self {
1160            column_index: buf.parse(())?,
1161            prefix_length: buf.parse(())?,
1162        })
1163    }
1164}
1165
1166impl MySerialize for PrimaryKeyWithPrefix {
1167    fn serialize(&self, buf: &mut Vec<u8>) {
1168        self.column_index.serialize(buf);
1169        self.prefix_length.serialize(buf);
1170    }
1171}
1172
/// A single parsed field of an optional metadata (as yielded by [`OptionalMetadataIter`]).
///
/// Every variant corresponds to one [`OptionalMetadataFieldType`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum OptionalMetadataField<'a> {
    /// See [`OptionalMetadataFieldType::SIGNEDNESS`].
    Signedness(
        /// Flags indicating unsignedness for every numeric column.
        &'a BitSlice<u8, Msb0>,
    ),
    /// See [`OptionalMetadataFieldType::DEFAULT_CHARSET`].
    DefaultCharset(DefaultCharset<'a>),
    /// See [`OptionalMetadataFieldType::COLUMN_CHARSET`].
    ColumnCharset(ColumnCharsets<'a>),
    /// See [`OptionalMetadataFieldType::COLUMN_NAME`].
    ColumnName(ColumnNames<'a>),
    /// See [`OptionalMetadataFieldType::SET_STR_VALUE`].
    SetStrValue(SetsStrValues<'a>),
    /// See [`OptionalMetadataFieldType::ENUM_STR_VALUE`].
    EnumStrValue(EnumsStrValues<'a>),
    /// See [`OptionalMetadataFieldType::GEOMETRY_TYPE`].
    GeometryType(GeometryTypes<'a>),
    /// See [`OptionalMetadataFieldType::SIMPLE_PRIMARY_KEY`].
    SimplePrimaryKey(SimplePrimaryKey<'a>),
    /// See [`OptionalMetadataFieldType::PRIMARY_KEY_WITH_PREFIX`].
    PrimaryKeyWithPrefix(PrimaryKeysWithPrefix<'a>),
    /// See [`OptionalMetadataFieldType::ENUM_AND_SET_DEFAULT_CHARSET`].
    EnumAndSetDefaultCharset(DefaultCharset<'a>),
    /// See [`OptionalMetadataFieldType::ENUM_AND_SET_COLUMN_CHARSET`].
    EnumAndSetColumnCharset(ColumnCharsets<'a>),
    /// See [`OptionalMetadataFieldType::COLUMN_VISIBILITY`].
    ColumnVisibility(
        /// Flags indicating visibility for every column (one flag per column of the table).
        &'a BitSlice<u8, Msb0>,
    ),
    /// See [`OptionalMetadataFieldType::VECTOR_DIMENSIONALITY`].
    Dimensionality(VectorDimensionalities<'a>),
}
1208
/// Iterator over fields of an optional metadata.
///
/// Fields are stored as type-length-value records; every call to `next` consumes one record.
#[derive(Debug)]
pub struct OptionalMetadataIter<'a> {
    /// Raw column types; used to compute the number of flags in bit-flag fields.
    columns: &'a RawSeq<'a, u8, ColumnType>,
    /// Optional metadata bytes that are not consumed yet.
    data: &'a [u8],
}
1215
1216impl<'a> OptionalMetadataIter<'a> {
1217    /// Reads type-length-value value.
1218    fn read_tlv(&mut self) -> io::Result<(RawConst<u8, OptionalMetadataFieldType>, &'a [u8])> {
1219        let t = self.data.read_u8()?;
1220        // The length is encoded in 1, 3 or 8 bytes (https://github.com/mysql/mysql-server/blob/824e2b4064053f7daf17d7f3f84b7a3ed92e5fb4/libs/mysql/binlog/event/binlog_event.h#L812)
1221        let l = self.data.read_lenenc_int()? as usize;
1222        let v = match self.data.get(..l) {
1223            Some(v) => v,
1224            None => {
1225                self.data = &[];
1226                return Err(io::Error::new(
1227                    io::ErrorKind::UnexpectedEof,
1228                    "can't read tlv value",
1229                ));
1230            }
1231        };
1232        self.data = &self.data[l..];
1233        Ok((RawConst::new(t), v))
1234    }
1235
1236    /// Returns next tlv, if any.
1237    fn next_tlv(
1238        &mut self,
1239    ) -> Option<io::Result<(RawConst<u8, OptionalMetadataFieldType>, &'a [u8])>> {
1240        if self.data.is_empty() {
1241            return None;
1242        }
1243
1244        self.read_tlv().map(Some).transpose()
1245    }
1246
1247    fn num_columns(&self) -> usize {
1248        self.columns.0.len()
1249    }
1250
1251    fn count_columns(&self, f: fn(&ColumnType) -> bool) -> usize {
1252        self.columns
1253            .0
1254            .iter()
1255            .filter_map(|val| ColumnType::try_from(*val).ok())
1256            .filter(f)
1257            .count()
1258    }
1259}
1260
1261impl<'a> Iterator for OptionalMetadataIter<'a> {
1262    type Item = io::Result<OptionalMetadataField<'a>>;
1263
1264    fn next(&mut self) -> Option<Self::Item> {
1265        use OptionalMetadataFieldType::*;
1266
1267        self.next_tlv()?
1268            .and_then(|(t, v)| {
1269                let mut v = ParseBuf(v);
1270                match t.get() {
1271                    Ok(t) => match t {
1272                        SIGNEDNESS => {
1273                            let num_numeric = self.count_columns(ColumnType::is_numeric_type);
1274                            let num_flags_bytes = (num_numeric + 7) / 8;
1275                            let flags: &[u8] = v.parse(num_flags_bytes)?;
1276
1277                            if !v.is_empty() {
1278                                return Err(io::Error::new(
1279                                    io::ErrorKind::Other,
1280                                    "bytes remaining on stream",
1281                                ));
1282                            }
1283
1284                            let flags = BitSlice::from_slice(flags);
1285                            Ok(OptionalMetadataField::Signedness(&flags[..num_numeric]))
1286                        }
1287                        DEFAULT_CHARSET => Ok(OptionalMetadataField::DefaultCharset(v.parse(())?)),
1288                        COLUMN_CHARSET => Ok(OptionalMetadataField::ColumnCharset(v.parse(())?)),
1289                        COLUMN_NAME => Ok(OptionalMetadataField::ColumnName(v.parse(())?)),
1290                        SET_STR_VALUE => Ok(OptionalMetadataField::SetStrValue(v.parse(())?)),
1291                        ENUM_STR_VALUE => Ok(OptionalMetadataField::EnumStrValue(v.parse(())?)),
1292                        GEOMETRY_TYPE => Ok(OptionalMetadataField::GeometryType(v.parse(())?)),
1293                        SIMPLE_PRIMARY_KEY => {
1294                            Ok(OptionalMetadataField::SimplePrimaryKey(v.parse(())?))
1295                        }
1296                        PRIMARY_KEY_WITH_PREFIX => {
1297                            Ok(OptionalMetadataField::PrimaryKeyWithPrefix(v.parse(())?))
1298                        }
1299                        ENUM_AND_SET_DEFAULT_CHARSET => Ok(
1300                            OptionalMetadataField::EnumAndSetDefaultCharset(v.parse(())?),
1301                        ),
1302                        ENUM_AND_SET_COLUMN_CHARSET => {
1303                            Ok(OptionalMetadataField::EnumAndSetColumnCharset(v.parse(())?))
1304                        }
1305                        COLUMN_VISIBILITY => {
1306                            let num_columns = self.num_columns();
1307                            let num_flags_bytes = (num_columns + 7) / 8;
1308                            let flags: &[u8] = v.parse(num_flags_bytes)?;
1309
1310                            if !v.is_empty() {
1311                                return Err(io::Error::new(
1312                                    io::ErrorKind::Other,
1313                                    "bytes remaining on stream",
1314                                ));
1315                            }
1316
1317                            let flags = BitSlice::from_slice(flags);
1318                            let flags = &flags[..num_columns];
1319                            Ok(OptionalMetadataField::ColumnVisibility(flags))
1320                        }
1321                        VECTOR_DIMENSIONALITY => {
1322                            Ok(OptionalMetadataField::Dimensionality(v.parse(())?))
1323                        }
1324                    },
1325                    Err(_) => Err(io::Error::new(
1326                        io::ErrorKind::InvalidData,
1327                        "Unknown optional metadata field type",
1328                    )),
1329                }
1330            })
1331            .map(Some)
1332            .transpose()
1333    }
1334}
1335
/// Helper struct that extracts optional metadata for columns.
///
/// It keeps the fields of an optional metadata that are used by its `iter_*` methods.
/// If a field occurs more than once, the last occurrence is kept.
pub struct OptionalMetaExtractor<'a> {
    /// Value of the `Signedness` field, if any.
    signedness: Option<&'a BitSlice<u8, Msb0>>,
    /// Value of the `DefaultCharset` field, if any.
    default_charset: Option<DefaultCharset<'a>>,
    /// Value of the `ColumnCharset` field, if any.
    column_charset: Option<ColumnCharsets<'a>>,
    /// Value of the `ColumnName` field, if any.
    column_name: Option<ColumnNames<'a>>,
    /// Value of the `SimplePrimaryKey` field, if any.
    simple_primary_key: Option<SimplePrimaryKey<'a>>,
    /// Value of the `PrimaryKeyWithPrefix` field, if any.
    primary_key_with_prefix: Option<PrimaryKeysWithPrefix<'a>>,
    /// Value of the `EnumAndSetDefaultCharset` field, if any.
    enum_and_set_default_charset: Option<DefaultCharset<'a>>,
    /// Value of the `EnumAndSetColumnCharset` field, if any.
    enum_and_set_column_charset: Option<ColumnCharsets<'a>>,
}
1347
1348impl<'a> OptionalMetaExtractor<'a> {
1349    pub fn new(iter_optional_meta: OptionalMetadataIter<'a>) -> io::Result<Self> {
1350        let mut this = Self {
1351            signedness: None,
1352            default_charset: None,
1353            column_charset: None,
1354            column_name: None,
1355            simple_primary_key: None,
1356            primary_key_with_prefix: None,
1357            enum_and_set_default_charset: None,
1358            enum_and_set_column_charset: None,
1359        };
1360
1361        for field in iter_optional_meta {
1362            match field? {
1363                OptionalMetadataField::Signedness(x) => this.signedness = Some(x),
1364                OptionalMetadataField::DefaultCharset(x) => {
1365                    this.default_charset = Some(x);
1366                }
1367                OptionalMetadataField::ColumnCharset(x) => {
1368                    this.column_charset = Some(x);
1369                }
1370                OptionalMetadataField::ColumnName(x) => {
1371                    this.column_name = Some(x);
1372                }
1373                OptionalMetadataField::SetStrValue(_) => (),
1374                OptionalMetadataField::EnumStrValue(_) => (),
1375                OptionalMetadataField::GeometryType(_) => (),
1376                OptionalMetadataField::SimplePrimaryKey(x) => {
1377                    this.simple_primary_key = Some(x);
1378                }
1379                OptionalMetadataField::PrimaryKeyWithPrefix(x) => {
1380                    this.primary_key_with_prefix = Some(x);
1381                }
1382                OptionalMetadataField::EnumAndSetDefaultCharset(x) => {
1383                    this.enum_and_set_default_charset = Some(x);
1384                }
1385                OptionalMetadataField::EnumAndSetColumnCharset(x) => {
1386                    this.enum_and_set_column_charset = Some(x);
1387                }
1388                OptionalMetadataField::ColumnVisibility(_) => (),
1389                OptionalMetadataField::Dimensionality(_) => (),
1390            }
1391        }
1392
1393        Ok(this)
1394    }
1395
1396    /// For every numeric column (in order) emits signedness data (`true` means _unsigned_).
1397    ///
1398    /// Emits nothing if there are no signedness data in the optional metadata.
1399    pub fn iter_signedness(&'a self) -> impl Iterator<Item = bool> + 'a {
1400        self.signedness
1401            .as_ref()
1402            .map(|x| x.iter().by_vals())
1403            .into_iter()
1404            .flatten()
1405    }
1406
1407    /// For every character column (in order) emits its character set.
1408    ///
1409    /// Will use either `DEFAULT_CHARSET` or `COLUMN_CHARSET` metadata.
1410    ///
1411    /// ### Warning
1412    ///
1413    /// This iterator is infinite if `DEFAULT_CHARSET` is used.
1414    ///
1415    /// Emits nothing if there are no charset data in the optional metadata.
1416    pub fn iter_charset(&'a self) -> impl Iterator<Item = Result<u16, io::Error>> + 'a {
1417        let default_charset = self.default_charset.as_ref().map(|x| x.default_charset());
1418        let non_default = self.default_charset.as_ref().map(|x| x.iter_non_default());
1419        let per_column = self.column_charset.as_ref().map(|x| x.iter_charsets());
1420
1421        iter_charset_helper(default_charset, non_default, per_column)
1422    }
1423
1424    /// For every ENUM and SET column (in order) emits its character set.
1425    ///
1426    /// Will use either `ENUM_AND_SET_DEFAULT_CHARSET` or `ENUM_AND_SET_COLUMN_CHARSET` metadata.
1427    ///
1428    /// ### Warning
1429    ///
1430    /// This iterator is infinite if `ENUM_AND_SET_DEFAULT_CHARSET` is used.
1431    ///
1432    /// Emits nothing if there are no charset data in the optional metadata.
1433    pub fn iter_enum_and_set_charset(
1434        &'a self,
1435    ) -> impl Iterator<Item = Result<u16, io::Error>> + 'a {
1436        let default_charset = self
1437            .enum_and_set_default_charset
1438            .as_ref()
1439            .map(|x| x.default_charset());
1440        let non_default = self
1441            .enum_and_set_default_charset
1442            .as_ref()
1443            .map(|x| x.iter_non_default());
1444        let per_column = self
1445            .enum_and_set_column_charset
1446            .as_ref()
1447            .map(|x| x.iter_charsets());
1448
1449        iter_charset_helper(default_charset, non_default, per_column)
1450    }
1451
1452    /// Iterates over column indexes for columns that are primary keys.
1453    ///
1454    /// Returns peekable iterator so that it is possible to observe next PK index.
1455    ///
1456    /// Emits nothing if there are no PK data in the optinal metadata.
1457    pub fn iter_primary_key(&'a self) -> Peekable<impl Iterator<Item = io::Result<u64>> + 'a> {
1458        let simple = self
1459            .simple_primary_key
1460            .as_ref()
1461            .map(|x| x.iter_indexes())
1462            .into_iter()
1463            .flatten();
1464        let prefixed = self
1465            .primary_key_with_prefix
1466            .as_ref()
1467            .map(|x| x.iter_keys().map(|x| x.map(|x| x.column_index())))
1468            .into_iter()
1469            .flatten();
1470
1471        simple.chain(prefixed).peekable()
1472    }
1473
1474    /// For every column (in order) emits its name
1475    ///
1476    /// Emits nothing if there are no column name data in the optional metadata.
1477    pub fn iter_column_name(&'a self) -> impl Iterator<Item = io::Result<ColumnName<'a>>> + 'a {
1478        self.column_name
1479            .as_ref()
1480            .map(|x| x.iter_names())
1481            .into_iter()
1482            .flatten()
1483    }
1484}
1485
/// Builds an iterator over charsets out of the "default charset" and "per-column charset" data.
///
/// * `default_charset` – charset to emit for positions without an explicit entry, if any.
/// * `iter_non_default` – entries that override the default charset at specific column indexes.
/// * `iter_charsets` – explicit charsets that are emitted one per position.
///
/// Entries of `iter_non_default` are consumed before entries of `iter_charsets`.
///
/// The first error found in the underlying data is emitted once, after which the iterator
/// returns `None`. With `default_charset` set and no errors, the iterator never ends.
fn iter_charset_helper<'a>(
    default_charset: Option<u16>,
    iter_non_default: Option<IterNonDefault<'a>>,
    iter_charsets: Option<IterCharsets<'a>>,
) -> impl Iterator<Item = Result<u16, io::Error>> + 'a {
    // Tag both sources so that a single peekable stream can be matched on below.
    let non_default = iter_non_default
        .into_iter()
        .flatten()
        .map(|x| x.map(Either::Left));
    let per_column = iter_charsets
        .into_iter()
        .flatten()
        .map(|x| x.map(Either::Right));

    let mut non_default = non_default.chain(per_column).peekable();
    // Set once an error has been emitted; every later call returns `None`.
    let mut broken = false;
    // Position of the value being emitted; compared against indexes of non-default entries.
    let mut current = 0;
    std::iter::from_fn(move || {
        if broken {
            return None;
        }

        let result = match non_default.peek() {
            Some(x) => match x {
                Ok(x) => match x {
                    Either::Left(x) => {
                        if x.column_index() == current {
                            // The entry overrides the default at this position, consume it.
                            non_default
                                .next()
                                .map(|x| Ok(x.unwrap().unwrap_left().charset()))
                        } else {
                            // The entry is for another position, keep it and emit the default
                            // (or `None` if there is no default).
                            default_charset.map(Ok)
                        }
                    }
                    Either::Right(x) => {
                        // Per-column charsets are emitted as is; copy the value before
                        // advancing the stream.
                        let x = *x;
                        non_default.next().map(|_| Ok(x))
                    }
                },
                Err(_) => {
                    // Emit the error and stop on the next call.
                    broken = true;
                    non_default.next().map(|x| Err(x.unwrap_err()))
                }
            },
            // No more explicit entries: emit the default (or `None` if there is no default).
            None => default_charset.map(Ok),
        };

        current += 1;
        result
    })
}