use std::{borrow::Cow, cmp::min, convert::TryFrom, io, iter::Peekable};
use bitvec::prelude::*;
use byteorder::ReadBytesExt;
use saturating::Saturating as S;
use crate::{
binlog::{
consts::{BinlogVersion, EventType, OptionalMetadataFieldType},
BinlogCtx, BinlogEvent, BinlogStruct,
},
constants::{ColumnType, GeometryType, UnknownColumnType},
io::{ParseBuf, ReadMysqlExt},
misc::raw::{
bytes::{BareBytes, EofBytes, LenEnc, U8Bytes},
int::*,
Either, RawBytes, RawConst, RawSeq, Skip,
},
proto::{MyDeserialize, MySerialize},
};
use super::BinlogEventHeader;
#[derive(Debug, Clone, Copy, Eq, PartialEq, thiserror::Error)]
pub enum BadColumnType {
#[error(transparent)]
Unknown(#[from] UnknownColumnType),
#[error("Unexpected column type: {}", _0)]
Unexpected(u8),
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct TableMapEvent<'a> {
table_id: RawInt<LeU48>,
flags: RawInt<LeU16>,
database_name: RawBytes<'a, U8Bytes>,
__null_1: Skip<1>,
table_name: RawBytes<'a, U8Bytes>,
__null_2: Skip<1>,
columns_count: RawInt<LenEnc>,
columns_type: RawSeq<'a, u8, ColumnType>,
columns_metadata: RawBytes<'a, LenEnc>,
null_bitmask: RawBytes<'a, BareBytes<0x2000000000000000>>,
optional_metadata: RawBytes<'a, EofBytes>,
}
impl<'a> TableMapEvent<'a> {
pub fn table_id(&self) -> u64 {
self.table_id.0
}
pub fn columns_count(&self) -> u64 {
self.columns_count.0
}
pub fn json_column_count(&self) -> usize {
self.columns_type
.0
.iter()
.filter(|x| **x == ColumnType::MYSQL_TYPE_JSON as u8)
.count()
}
pub fn null_bitmask(&'a self) -> &'a BitSlice<u8> {
let slice = BitSlice::from_slice(self.null_bitmask.as_bytes());
&slice[..self.columns_count() as usize]
}
pub fn database_name_raw(&'a self) -> &'a [u8] {
self.database_name.as_bytes()
}
pub fn database_name(&'a self) -> Cow<'a, str> {
self.database_name.as_str()
}
pub fn table_name_raw(&'a self) -> &'a [u8] {
self.table_name.as_bytes()
}
pub fn table_name(&'a self) -> Cow<'a, str> {
self.table_name.as_str()
}
pub fn get_raw_column_type(
&self,
col_idx: usize,
) -> Result<Option<ColumnType>, UnknownColumnType> {
self.columns_type.get(col_idx).map(|x| x.get()).transpose()
}
pub fn get_column_type(&self, col_idx: usize) -> Result<Option<ColumnType>, BadColumnType> {
self.columns_type
.get(col_idx)
.map(|x| {
x.get()
.map_err(BadColumnType::from)
.and_then(|column_type| self.get_real_type(col_idx, column_type))
})
.transpose()
}
pub fn get_column_metadata(&self, col_idx: usize) -> Option<&[u8]> {
let mut offset = 0;
for i in 0..=col_idx {
let ty = self.columns_type.get(i)?.get().ok()?;
let ptr = self.columns_metadata.as_bytes().get(offset..)?;
let (metadata, len) = ty.get_metadata(ptr, false)?;
if i == col_idx {
return Some(metadata);
} else {
offset += len;
}
}
None
}
pub fn iter_optional_meta(&'a self) -> OptionalMetadataIter<'a> {
OptionalMetadataIter {
columns: &self.columns_type,
data: self.optional_metadata.as_bytes(),
}
}
pub fn into_owned(self) -> TableMapEvent<'static> {
TableMapEvent {
table_id: self.table_id,
flags: self.flags,
database_name: self.database_name.into_owned(),
__null_1: self.__null_1,
table_name: self.table_name.into_owned(),
__null_2: self.__null_2,
columns_count: self.columns_count,
columns_type: self.columns_type.into_owned(),
columns_metadata: self.columns_metadata.into_owned(),
null_bitmask: self.null_bitmask.into_owned(),
optional_metadata: self.optional_metadata.into_owned(),
}
}
fn get_real_type(
&self,
col_idx: usize,
column_type: ColumnType,
) -> Result<ColumnType, BadColumnType> {
match column_type {
ColumnType::MYSQL_TYPE_DATE => {
return Ok(ColumnType::MYSQL_TYPE_NEWDATE);
}
ColumnType::MYSQL_TYPE_STRING => {
let mut real_type = column_type as u8;
if let Some(metadata_bytes) = self.get_column_metadata(col_idx) {
let f1 = metadata_bytes[0];
if f1 != 0 {
real_type = f1 | 0x30;
}
match real_type {
247 => return Ok(ColumnType::MYSQL_TYPE_ENUM),
248 => return Ok(ColumnType::MYSQL_TYPE_SET),
254 => return Ok(ColumnType::MYSQL_TYPE_STRING),
x => {
return Err(BadColumnType::Unexpected(x));
}
};
}
}
_ => (),
}
Ok(column_type)
}
}
impl<'de> MyDeserialize<'de> for TableMapEvent<'de> {
const SIZE: Option<usize> = None;
type Ctx = BinlogCtx<'de>;
fn deserialize(ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
let table_id = if 6 == ctx.fde.get_event_type_header_length(Self::EVENT_TYPE) {
let table_id: RawInt<LeU32> = buf.parse(())?;
RawInt::new(table_id.0 as u64)
} else {
buf.parse(())?
};
let flags = buf.parse(())?;
let database_name = buf.parse(())?;
let __null_1 = buf.parse(())?;
let table_name = buf.parse(())?;
let __null_2 = buf.parse(())?;
let columns_count: RawInt<LenEnc> = buf.parse(())?;
let columns_type = buf.parse(columns_count.0 as usize)?;
let columns_metadata = buf.parse(())?;
let null_bitmask = buf.parse(((columns_count.0 + 7) / 8) as usize)?;
let optional_metadata = buf.parse(())?;
Ok(TableMapEvent {
table_id,
flags,
database_name,
__null_1,
table_name,
__null_2,
columns_count,
columns_type,
columns_metadata,
null_bitmask,
optional_metadata,
})
}
}
impl MySerialize for TableMapEvent<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.table_id.serialize(&mut *buf);
self.flags.serialize(&mut *buf);
self.database_name.serialize(&mut *buf);
self.__null_1.serialize(&mut *buf);
self.table_name.serialize(&mut *buf);
self.__null_2.serialize(&mut *buf);
self.columns_count.serialize(&mut *buf);
self.columns_type.serialize(&mut *buf);
self.columns_metadata.serialize(&mut *buf);
self.null_bitmask.serialize(&mut *buf);
self.optional_metadata.serialize(&mut *buf);
}
}
impl<'a> BinlogEvent<'a> for TableMapEvent<'a> {
const EVENT_TYPE: EventType = EventType::TABLE_MAP_EVENT;
}
impl<'a> BinlogStruct<'a> for TableMapEvent<'a> {
fn len(&self, _version: BinlogVersion) -> usize {
let mut len = S(0);
len += S(6);
len += S(2);
len += S(1);
len += S(min(self.database_name.0.len(), u8::MAX as usize));
len += S(1);
len += S(1);
len += S(min(self.table_name.0.len(), u8::MAX as usize));
len += S(1);
len += S(crate::misc::lenenc_int_len(self.columns_count()) as usize);
len += S(self.columns_count() as usize);
len += S(crate::misc::lenenc_str_len(self.columns_metadata.as_bytes()) as usize);
len += S((self.columns_count() as usize + 8) / 7);
len += S(self.optional_metadata.len());
min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DefaultCharset<'a> {
default_charset: RawInt<LenEnc>,
non_default: RawBytes<'a, EofBytes>,
}
impl<'a> DefaultCharset<'a> {
pub fn default_charset(&self) -> u16 {
self.default_charset.0 as u16
}
pub fn iter_non_default(&self) -> IterNonDefault<'_> {
IterNonDefault {
buf: ParseBuf(self.non_default.as_bytes()),
}
}
}
pub struct IterNonDefault<'a> {
buf: ParseBuf<'a>,
}
impl<'a> Iterator for IterNonDefault<'a> {
type Item = io::Result<NonDefaultCharset>;
fn next(&mut self) -> Option<Self::Item> {
if self.buf.is_empty() {
None
} else {
match self.buf.parse(()) {
Ok(x) => Some(Ok(x)),
Err(e) => {
self.buf = ParseBuf(b"");
Some(Err(e))
}
}
}
}
}
impl<'de> MyDeserialize<'de> for DefaultCharset<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
default_charset: buf.parse(())?,
non_default: buf.parse(())?,
})
}
}
impl MySerialize for DefaultCharset<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.default_charset.serialize(&mut *buf);
self.non_default.serialize(&mut *buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NonDefaultCharset {
column_index: RawInt<LenEnc>,
charset: RawInt<LenEnc>,
}
impl NonDefaultCharset {
pub fn new(column_index: u64, charset: u16) -> Self {
Self {
column_index: RawInt::new(column_index),
charset: RawInt::new(charset as u64),
}
}
pub fn column_index(&self) -> u64 {
self.column_index.0
}
pub fn charset(&self) -> u16 {
self.charset.0 as u16
}
}
impl<'de> MyDeserialize<'de> for NonDefaultCharset {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
column_index: buf.parse(())?,
charset: buf.parse(())?,
})
}
}
impl MySerialize for NonDefaultCharset {
fn serialize(&self, buf: &mut Vec<u8>) {
self.column_index.serialize(&mut *buf);
self.charset.serialize(&mut *buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ColumnCharsets<'a> {
charsets: RawBytes<'a, EofBytes>,
}
impl<'a> ColumnCharsets<'a> {
pub fn iter_charsets(&'a self) -> IterCharsets<'a> {
IterCharsets {
buf: ParseBuf(self.charsets.as_bytes()),
}
}
}
pub struct IterCharsets<'a> {
buf: ParseBuf<'a>,
}
impl<'a> Iterator for IterCharsets<'a> {
type Item = io::Result<u16>;
fn next(&mut self) -> Option<Self::Item> {
if self.buf.is_empty() {
None
} else {
match self.buf.parse::<RawInt<LenEnc>>(()) {
Ok(x) => Some(Ok(x.0 as u16)),
Err(e) => {
self.buf = ParseBuf(b"");
Some(Err(e))
}
}
}
}
}
impl<'de> MyDeserialize<'de> for ColumnCharsets<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
charsets: buf.parse(())?,
})
}
}
impl MySerialize for ColumnCharsets<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.charsets.serialize(buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ColumnName<'a> {
name: RawBytes<'a, LenEnc>,
}
impl<'a> ColumnName<'a> {
pub fn new(name: impl Into<Cow<'a, [u8]>>) -> Self {
Self {
name: RawBytes::new(name),
}
}
pub fn name_raw(&'a self) -> &'a [u8] {
self.name.as_bytes()
}
pub fn name(&'a self) -> Cow<'a, str> {
self.name.as_str()
}
pub fn into_owned(self) -> ColumnName<'static> {
ColumnName {
name: self.name.into_owned(),
}
}
}
impl<'de> MyDeserialize<'de> for ColumnName<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
name: buf.parse(())?,
})
}
}
impl MySerialize for ColumnName<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.name.serialize(buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ColumnNames<'a> {
names: RawBytes<'a, EofBytes>,
}
impl<'a> ColumnNames<'a> {
pub fn iter_names(&self) -> IterNames<'_> {
IterNames {
buf: ParseBuf(self.names.as_bytes()),
}
}
}
pub struct IterNames<'a> {
buf: ParseBuf<'a>,
}
impl<'a> Iterator for IterNames<'a> {
type Item = io::Result<ColumnName<'a>>;
fn next(&mut self) -> Option<Self::Item> {
if self.buf.is_empty() {
None
} else {
match self.buf.parse(()) {
Ok(x) => Some(Ok(x)),
Err(e) => {
self.buf = ParseBuf(b"");
Some(Err(e))
}
}
}
}
}
impl<'de> MyDeserialize<'de> for ColumnNames<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
names: buf.parse(())?,
})
}
}
impl MySerialize for ColumnNames<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.names.serialize(buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SetsStrValues<'a> {
values: RawBytes<'a, EofBytes>,
}
impl<'a> SetsStrValues<'a> {
pub fn iter_values(&'a self) -> IterSetStrValues<'a> {
IterSetStrValues {
buf: ParseBuf(self.values.as_bytes()),
}
}
}
pub struct IterSetStrValues<'a> {
buf: ParseBuf<'a>,
}
impl<'a> Iterator for IterSetStrValues<'a> {
type Item = io::Result<SetStrValues<'a>>;
fn next(&mut self) -> Option<Self::Item> {
if self.buf.is_empty() {
None
} else {
match self.buf.parse(()) {
Ok(x) => Some(Ok(x)),
Err(e) => {
self.buf = ParseBuf(b"");
Some(Err(e))
}
}
}
}
}
impl<'de> MyDeserialize<'de> for SetsStrValues<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
values: buf.parse(())?,
})
}
}
impl MySerialize for SetsStrValues<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.values.serialize(buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SetStrValue<'a> {
value: RawBytes<'a, LenEnc>,
}
impl<'a> SetStrValue<'a> {
pub fn new(value: impl Into<Cow<'a, [u8]>>) -> Self {
Self {
value: RawBytes::new(value),
}
}
pub fn value_raw(&'a self) -> &'a [u8] {
self.value.as_bytes()
}
pub fn value(&'a self) -> Cow<'a, str> {
self.value.as_str()
}
}
impl<'de> MyDeserialize<'de> for SetStrValue<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
value: buf.parse(())?,
})
}
}
impl MySerialize for SetStrValue<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.value.serialize(buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SetStrValues<'a> {
num_variants: RawInt<LenEnc>,
values: Vec<SetStrValue<'a>>,
}
impl<'a> SetStrValues<'a> {
pub fn num_variants(&self) -> u64 {
self.num_variants.0
}
pub fn values(&'a self) -> &'a [SetStrValue<'a>] {
self.values.as_ref()
}
}
impl<'de> MyDeserialize<'de> for SetStrValues<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
let num_variants: RawInt<LenEnc> = buf.parse(())?;
let mut values = Vec::with_capacity(num_variants.0 as usize);
for _ in 0..num_variants.0 {
values.push(buf.parse(())?);
}
Ok(Self {
num_variants,
values,
})
}
}
impl MySerialize for SetStrValues<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.num_variants.serialize(&mut *buf);
for value in &self.values {
value.serialize(buf);
}
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EnumsStrValues<'a> {
values: RawBytes<'a, EofBytes>,
}
impl<'a> EnumsStrValues<'a> {
pub fn iter_values(&'a self) -> IterEnumStrValues<'a> {
IterEnumStrValues {
buf: ParseBuf(self.values.as_bytes()),
}
}
}
pub struct IterEnumStrValues<'a> {
buf: ParseBuf<'a>,
}
impl<'a> Iterator for IterEnumStrValues<'a> {
type Item = io::Result<EnumStrValues<'a>>;
fn next(&mut self) -> Option<Self::Item> {
if self.buf.is_empty() {
None
} else {
match self.buf.parse(()) {
Ok(x) => Some(Ok(x)),
Err(e) => {
self.buf = ParseBuf(b"");
Some(Err(e))
}
}
}
}
}
impl<'de> MyDeserialize<'de> for EnumsStrValues<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
values: buf.parse(())?,
})
}
}
impl MySerialize for EnumsStrValues<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.values.serialize(buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EnumStrValue<'a> {
value: RawBytes<'a, LenEnc>,
}
impl<'a> EnumStrValue<'a> {
pub fn new(value: impl Into<Cow<'a, [u8]>>) -> Self {
Self {
value: RawBytes::new(value),
}
}
pub fn value_raw(&'a self) -> &'a [u8] {
self.value.as_bytes()
}
pub fn value(&'a self) -> Cow<'a, str> {
self.value.as_str()
}
}
impl<'de> MyDeserialize<'de> for EnumStrValue<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
value: buf.parse(())?,
})
}
}
impl MySerialize for EnumStrValue<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.value.serialize(buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EnumStrValues<'a> {
num_variants: RawInt<LenEnc>,
values: Vec<EnumStrValue<'a>>,
}
impl<'a> EnumStrValues<'a> {
pub fn num_variants(&self) -> u64 {
self.num_variants.0
}
pub fn values(&'a self) -> &'a [EnumStrValue<'a>] {
self.values.as_ref()
}
}
impl<'de> MyDeserialize<'de> for EnumStrValues<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
let num_variants: RawInt<LenEnc> = buf.parse(())?;
let mut values = Vec::with_capacity(num_variants.0 as usize);
for _ in 0..num_variants.0 {
values.push(buf.parse(())?);
}
Ok(Self {
num_variants,
values,
})
}
}
impl MySerialize for EnumStrValues<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.num_variants.serialize(&mut *buf);
for value in &self.values {
value.serialize(buf);
}
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct GeometryTypes<'a> {
geometry_types: RawBytes<'a, EofBytes>,
}
impl<'a> GeometryTypes<'a> {
pub fn iter_geometry_types(&'a self) -> IterGeometryTypes<'a> {
IterGeometryTypes {
buf: ParseBuf(self.geometry_types.as_bytes()),
}
}
}
pub struct IterGeometryTypes<'a> {
buf: ParseBuf<'a>,
}
impl<'a> Iterator for IterGeometryTypes<'a> {
type Item = io::Result<GeometryType>;
fn next(&mut self) -> Option<Self::Item> {
if self.buf.is_empty() {
None
} else {
match self.buf.parse::<RawInt<LenEnc>>(()) {
Ok(x) => Some(
GeometryType::try_from(x.0 as u8)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)),
),
Err(e) => {
self.buf = ParseBuf(b"");
Some(Err(e))
}
}
}
}
}
impl<'de> MyDeserialize<'de> for GeometryTypes<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
geometry_types: buf.parse(())?,
})
}
}
impl MySerialize for GeometryTypes<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.geometry_types.serialize(buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SimplePrimaryKey<'a> {
indexes: RawBytes<'a, EofBytes>,
}
impl<'a> SimplePrimaryKey<'a> {
pub fn iter_indexes(&'a self) -> IterIndexes<'a> {
IterIndexes {
buf: ParseBuf(self.indexes.as_bytes()),
}
}
}
pub struct IterIndexes<'a> {
buf: ParseBuf<'a>,
}
impl<'a> Iterator for IterIndexes<'a> {
type Item = io::Result<u64>;
fn next(&mut self) -> Option<Self::Item> {
if self.buf.is_empty() {
None
} else {
match self.buf.parse::<RawInt<LenEnc>>(()) {
Ok(x) => Some(Ok(x.0)),
Err(e) => {
self.buf = ParseBuf(b"");
Some(Err(e))
}
}
}
}
}
impl<'de> MyDeserialize<'de> for SimplePrimaryKey<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
indexes: buf.parse(())?,
})
}
}
impl MySerialize for SimplePrimaryKey<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.indexes.serialize(buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct PrimaryKeysWithPrefix<'a> {
data: RawBytes<'a, EofBytes>,
}
impl<'a> PrimaryKeysWithPrefix<'a> {
pub fn iter_keys(&'a self) -> IterKeys<'a> {
IterKeys {
buf: ParseBuf(self.data.as_bytes()),
}
}
}
pub struct IterKeys<'a> {
buf: ParseBuf<'a>,
}
impl<'a> Iterator for IterKeys<'a> {
type Item = io::Result<PrimaryKeyWithPrefix>;
fn next(&mut self) -> Option<Self::Item> {
if self.buf.is_empty() {
None
} else {
match self.buf.parse(()) {
Ok(x) => Some(Ok(x)),
Err(e) => {
self.buf = ParseBuf(b"");
Some(Err(e))
}
}
}
}
}
impl<'de> MyDeserialize<'de> for PrimaryKeysWithPrefix<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
data: buf.parse(())?,
})
}
}
impl MySerialize for PrimaryKeysWithPrefix<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.data.serialize(buf);
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct PrimaryKeyWithPrefix {
column_index: RawInt<LenEnc>,
prefix_length: RawInt<LenEnc>,
}
impl PrimaryKeyWithPrefix {
pub fn new(column_index: u64, prefix_length: u64) -> Self {
Self {
column_index: RawInt::new(column_index),
prefix_length: RawInt::new(prefix_length),
}
}
pub fn column_index(&self) -> u64 {
self.column_index.0
}
pub fn prefix_length(&self) -> u64 {
self.prefix_length.0
}
}
impl<'de> MyDeserialize<'de> for PrimaryKeyWithPrefix {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
column_index: buf.parse(())?,
prefix_length: buf.parse(())?,
})
}
}
impl MySerialize for PrimaryKeyWithPrefix {
fn serialize(&self, buf: &mut Vec<u8>) {
self.column_index.serialize(buf);
self.prefix_length.serialize(buf);
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum OptionalMetadataField<'a> {
Signedness(
&'a BitSlice<u8, Msb0>,
),
DefaultCharset(DefaultCharset<'a>),
ColumnCharset(ColumnCharsets<'a>),
ColumnName(ColumnNames<'a>),
SetStrValue(SetsStrValues<'a>),
EnumStrValue(EnumsStrValues<'a>),
GeometryType(GeometryTypes<'a>),
SimplePrimaryKey(SimplePrimaryKey<'a>),
PrimaryKeyWithPrefix(PrimaryKeysWithPrefix<'a>),
EnumAndSetDefaultCharset(DefaultCharset<'a>),
EnumAndSetColumnCharset(ColumnCharsets<'a>),
ColumnVisibility(
&'a BitSlice<u8, Msb0>,
),
}
#[derive(Debug)]
pub struct OptionalMetadataIter<'a> {
columns: &'a RawSeq<'a, u8, ColumnType>,
data: &'a [u8],
}
impl<'a> OptionalMetadataIter<'a> {
fn read_tlv(&mut self) -> io::Result<(RawConst<u8, OptionalMetadataFieldType>, &'a [u8])> {
let t = self.data.read_u8()?;
let l = self.data.read_lenenc_int()? as usize;
let v = match self.data.get(..l) {
Some(v) => v,
None => {
self.data = &[];
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"can't read tlv value",
));
}
};
self.data = &self.data[l..];
Ok((RawConst::new(t), v))
}
fn next_tlv(
&mut self,
) -> Option<io::Result<(RawConst<u8, OptionalMetadataFieldType>, &'a [u8])>> {
if self.data.is_empty() {
return None;
}
self.read_tlv().map(Some).transpose()
}
fn num_columns(&self) -> usize {
self.columns.0.len()
}
fn count_columns(&self, f: fn(&ColumnType) -> bool) -> usize {
self.columns
.0
.iter()
.filter_map(|val| ColumnType::try_from(*val).ok())
.filter(f)
.count()
}
}
impl<'a> Iterator for OptionalMetadataIter<'a> {
type Item = io::Result<OptionalMetadataField<'a>>;
fn next(&mut self) -> Option<Self::Item> {
use OptionalMetadataFieldType::*;
self.next_tlv()?
.and_then(|(t, v)| {
let mut v = ParseBuf(v);
match t.get() {
Ok(t) => match t {
SIGNEDNESS => {
let num_numeric = self.count_columns(ColumnType::is_numeric_type);
let num_flags_bytes = (num_numeric + 7) / 8;
let flags: &[u8] = v.parse(num_flags_bytes)?;
if !v.is_empty() {
return Err(io::Error::new(
io::ErrorKind::Other,
"bytes remaining on stream",
));
}
let flags = BitSlice::from_slice(flags);
Ok(OptionalMetadataField::Signedness(&flags[..num_numeric]))
}
DEFAULT_CHARSET => Ok(OptionalMetadataField::DefaultCharset(v.parse(())?)),
COLUMN_CHARSET => Ok(OptionalMetadataField::ColumnCharset(v.parse(())?)),
COLUMN_NAME => Ok(OptionalMetadataField::ColumnName(v.parse(())?)),
SET_STR_VALUE => Ok(OptionalMetadataField::SetStrValue(v.parse(())?)),
ENUM_STR_VALUE => Ok(OptionalMetadataField::EnumStrValue(v.parse(())?)),
GEOMETRY_TYPE => Ok(OptionalMetadataField::GeometryType(v.parse(())?)),
SIMPLE_PRIMARY_KEY => {
Ok(OptionalMetadataField::SimplePrimaryKey(v.parse(())?))
}
PRIMARY_KEY_WITH_PREFIX => {
Ok(OptionalMetadataField::PrimaryKeyWithPrefix(v.parse(())?))
}
ENUM_AND_SET_DEFAULT_CHARSET => Ok(
OptionalMetadataField::EnumAndSetDefaultCharset(v.parse(())?),
),
ENUM_AND_SET_COLUMN_CHARSET => {
Ok(OptionalMetadataField::EnumAndSetColumnCharset(v.parse(())?))
}
COLUMN_VISIBILITY => {
let num_columns = self.num_columns();
let num_flags_bytes = (num_columns + 7) / 8;
let flags: &[u8] = v.parse(num_flags_bytes)?;
if !v.is_empty() {
return Err(io::Error::new(
io::ErrorKind::Other,
"bytes remaining on stream",
));
}
let flags = BitSlice::from_slice(flags);
let flags = &flags[..num_columns];
Ok(OptionalMetadataField::ColumnVisibility(flags))
}
},
Err(_) => Err(io::Error::new(
io::ErrorKind::InvalidData,
"Unknown optional metadata field type",
)),
}
})
.map(Some)
.transpose()
}
}
pub struct OptionalMetaExtractor<'a> {
signedness: Option<&'a BitSlice<u8, Msb0>>,
default_charset: Option<DefaultCharset<'a>>,
column_charset: Option<ColumnCharsets<'a>>,
column_name: Option<ColumnNames<'a>>,
simple_primary_key: Option<SimplePrimaryKey<'a>>,
primary_key_with_prefix: Option<PrimaryKeysWithPrefix<'a>>,
enum_and_set_default_charset: Option<DefaultCharset<'a>>,
enum_and_set_column_charset: Option<ColumnCharsets<'a>>,
}
impl<'a> OptionalMetaExtractor<'a> {
pub fn new(iter_optional_meta: OptionalMetadataIter<'a>) -> io::Result<Self> {
let mut this = Self {
signedness: None,
default_charset: None,
column_charset: None,
column_name: None,
simple_primary_key: None,
primary_key_with_prefix: None,
enum_and_set_default_charset: None,
enum_and_set_column_charset: None,
};
for field in iter_optional_meta {
match field? {
OptionalMetadataField::Signedness(x) => this.signedness = Some(x),
OptionalMetadataField::DefaultCharset(x) => {
this.default_charset = Some(x);
}
OptionalMetadataField::ColumnCharset(x) => {
this.column_charset = Some(x);
}
OptionalMetadataField::ColumnName(x) => {
this.column_name = Some(x);
}
OptionalMetadataField::SetStrValue(_) => (),
OptionalMetadataField::EnumStrValue(_) => (),
OptionalMetadataField::GeometryType(_) => (),
OptionalMetadataField::SimplePrimaryKey(x) => {
this.simple_primary_key = Some(x);
}
OptionalMetadataField::PrimaryKeyWithPrefix(x) => {
this.primary_key_with_prefix = Some(x);
}
OptionalMetadataField::EnumAndSetDefaultCharset(x) => {
this.enum_and_set_default_charset = Some(x);
}
OptionalMetadataField::EnumAndSetColumnCharset(x) => {
this.enum_and_set_column_charset = Some(x);
}
OptionalMetadataField::ColumnVisibility(_) => (),
}
}
Ok(this)
}
pub fn iter_signedness(&'a self) -> impl Iterator<Item = bool> + 'a {
self.signedness
.as_ref()
.map(|x| x.iter().by_vals())
.into_iter()
.flatten()
}
pub fn iter_charset(&'a self) -> impl Iterator<Item = Result<u16, io::Error>> + 'a {
let default_charset = self.default_charset.as_ref().map(|x| x.default_charset());
let non_default = self.default_charset.as_ref().map(|x| x.iter_non_default());
let per_column = self.column_charset.as_ref().map(|x| x.iter_charsets());
iter_charset_helper(default_charset, non_default, per_column)
}
pub fn iter_enum_and_set_charset(
&'a self,
) -> impl Iterator<Item = Result<u16, io::Error>> + 'a {
let default_charset = self
.enum_and_set_default_charset
.as_ref()
.map(|x| x.default_charset());
let non_default = self
.enum_and_set_default_charset
.as_ref()
.map(|x| x.iter_non_default());
let per_column = self
.enum_and_set_column_charset
.as_ref()
.map(|x| x.iter_charsets());
iter_charset_helper(default_charset, non_default, per_column)
}
pub fn iter_primary_key(&'a self) -> Peekable<impl Iterator<Item = io::Result<u64>> + 'a> {
let simple = self
.simple_primary_key
.as_ref()
.map(|x| x.iter_indexes())
.into_iter()
.flatten();
let prefixed = self
.primary_key_with_prefix
.as_ref()
.map(|x| x.iter_keys().map(|x| x.map(|x| x.column_index())))
.into_iter()
.flatten();
simple.chain(prefixed).peekable()
}
pub fn iter_column_name(&'a self) -> impl Iterator<Item = io::Result<ColumnName<'a>>> + 'a {
self.column_name
.as_ref()
.map(|x| x.iter_names())
.into_iter()
.flatten()
}
}
fn iter_charset_helper<'a>(
default_charset: Option<u16>,
iter_non_default: Option<IterNonDefault<'a>>,
iter_charsets: Option<IterCharsets<'a>>,
) -> impl Iterator<Item = Result<u16, io::Error>> + 'a {
let non_default = iter_non_default
.into_iter()
.flatten()
.map(|x| x.map(Either::Left));
let per_column = iter_charsets
.into_iter()
.flatten()
.map(|x| x.map(Either::Right));
let mut non_default = non_default.chain(per_column).peekable();
let mut broken = false;
let mut current = 0;
std::iter::from_fn(move || {
if broken {
return None;
}
let result = match non_default.peek() {
Some(x) => match x {
Ok(x) => match x {
Either::Left(x) => {
if x.column_index() == current {
non_default
.next()
.map(|x| Ok(x.unwrap().unwrap_left().charset()))
} else {
default_charset.map(Ok)
}
}
Either::Right(x) => {
let x = *x;
non_default.next().map(|_| Ok(x))
}
},
Err(_) => {
broken = true;
non_default.next().map(|x| Err(x.unwrap_err()))
}
},
None => default_charset.map(Ok),
};
current += 1;
result
})
}