1use std::{borrow::Cow, cmp::min, convert::TryFrom, io, iter::Peekable};
10
11use bitvec::prelude::*;
12use byteorder::ReadBytesExt;
13use saturating::Saturating as S;
14
15use crate::{
16 binlog::{
17 BinlogCtx, BinlogEvent, BinlogStruct,
18 consts::{BinlogVersion, EventType, OptionalMetadataFieldType},
19 },
20 constants::{ColumnType, GeometryType, UnknownColumnType},
21 io::{ParseBuf, ReadMysqlExt},
22 misc::raw::{
23 Either, RawBytes, RawConst, RawSeq, Skip,
24 bytes::{BareBytes, EofBytes, LenEnc, U8Bytes},
25 int::*,
26 },
27 proto::{MyDeserialize, MySerialize},
28};
29
30use super::BinlogEventHeader;
31
/// Error returned when the type of a column can't be resolved.
#[derive(Debug, Clone, Copy, Eq, PartialEq, thiserror::Error)]
pub enum BadColumnType {
    /// Wraps the [`UnknownColumnType`] error produced while decoding a raw column type byte.
    #[error(transparent)]
    Unknown(#[from] UnknownColumnType),
    /// The type byte computed for a column was not one of the expected values
    /// (the offending byte is carried as the payload).
    #[error("Unexpected column type: {}", _0)]
    Unexpected(u8),
}
39
/// Table map event.
///
/// Field order mirrors the order in which the fields are parsed and serialized.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct TableMapEvent<'a> {
    /// Numeric table identifier (parsed from a 4-byte integer when the event
    /// post-header length is 6, otherwise from a `LeU48`).
    table_id: RawInt<LeU48>,
    /// Raw 16-bit flags value.
    flags: RawInt<LeU16>,

    /// Database name, prefixed by a one-byte length.
    database_name: RawBytes<'a, U8Bytes>,
    /// One skipped byte following the database name.
    __null_1: Skip<1>,
    /// Table name, prefixed by a one-byte length.
    table_name: RawBytes<'a, U8Bytes>,
    /// One skipped byte following the table name.
    __null_2: Skip<1>,
    /// Number of columns (length-encoded integer).
    columns_count: RawInt<LenEnc>,
    /// Raw column type bytes, one per column.
    columns_type: RawSeq<'a, u8, ColumnType>,
    /// Per-column metadata, packed back-to-back (length-encoded byte string).
    columns_metadata: RawBytes<'a, LenEnc>,
    /// Null bitmask; `(columns_count + 7) / 8` bytes are read for it.
    null_bitmask: RawBytes<'a, BareBytes<0x2000000000000000>>,
    /// Remaining bytes of the event: the optional metadata TLV stream.
    optional_metadata: RawBytes<'a, EofBytes>,
}
84
85impl<'a> TableMapEvent<'a> {
86 pub fn table_id(&self) -> u64 {
88 self.table_id.0
89 }
90
91 pub fn columns_count(&self) -> u64 {
93 self.columns_count.0
94 }
95
96 pub fn json_column_count(&self) -> usize {
98 self.columns_type
99 .0
100 .iter()
101 .filter(|x| **x == ColumnType::MYSQL_TYPE_JSON as u8)
102 .count()
103 }
104
105 pub fn null_bitmask(&'a self) -> &'a BitSlice<u8> {
110 let slice = BitSlice::from_slice(self.null_bitmask.as_bytes());
111 &slice[..self.columns_count() as usize]
112 }
113
114 pub fn database_name_raw(&'a self) -> &'a [u8] {
116 self.database_name.as_bytes()
117 }
118
119 pub fn database_name(&'a self) -> Cow<'a, str> {
121 self.database_name.as_str()
122 }
123
124 pub fn table_name_raw(&'a self) -> &'a [u8] {
126 self.table_name.as_bytes()
127 }
128
129 pub fn table_name(&'a self) -> Cow<'a, str> {
131 self.table_name.as_str()
132 }
133
134 pub fn get_raw_column_type(
138 &self,
139 col_idx: usize,
140 ) -> Result<Option<ColumnType>, UnknownColumnType> {
141 self.columns_type.get(col_idx).map(|x| x.get()).transpose()
142 }
143
144 pub fn get_column_type(&self, col_idx: usize) -> Result<Option<ColumnType>, BadColumnType> {
154 self.columns_type
155 .get(col_idx)
156 .map(|x| {
157 x.get()
158 .map_err(BadColumnType::from)
159 .and_then(|column_type| self.get_real_type(col_idx, column_type))
160 })
161 .transpose()
162 }
163
164 pub fn get_column_metadata(&self, col_idx: usize) -> Option<&[u8]> {
169 let mut offset = 0;
170 for i in 0..=col_idx {
171 let ty = self.columns_type.get(i)?.get().ok()?;
172 let ptr = self.columns_metadata.as_bytes().get(offset..)?;
173 let (metadata, len) = ty.get_metadata(ptr, false)?;
174 if i == col_idx {
175 return Some(metadata);
176 } else {
177 offset += len;
178 }
179 }
180 None
181 }
182
183 pub fn iter_optional_meta(&'a self) -> OptionalMetadataIter<'a> {
184 OptionalMetadataIter {
185 columns: &self.columns_type,
186 data: self.optional_metadata.as_bytes(),
187 }
188 }
189
190 pub fn into_owned(self) -> TableMapEvent<'static> {
192 TableMapEvent {
193 table_id: self.table_id,
194 flags: self.flags,
195 database_name: self.database_name.into_owned(),
196 __null_1: self.__null_1,
197 table_name: self.table_name.into_owned(),
198 __null_2: self.__null_2,
199 columns_count: self.columns_count,
200 columns_type: self.columns_type.into_owned(),
201 columns_metadata: self.columns_metadata.into_owned(),
202 null_bitmask: self.null_bitmask.into_owned(),
203 optional_metadata: self.optional_metadata.into_owned(),
204 }
205 }
206
207 fn get_real_type(
208 &self,
209 col_idx: usize,
210 column_type: ColumnType,
211 ) -> Result<ColumnType, BadColumnType> {
212 match column_type {
213 ColumnType::MYSQL_TYPE_DATE => {
214 return Ok(ColumnType::MYSQL_TYPE_NEWDATE);
217 }
218 ColumnType::MYSQL_TYPE_STRING => {
219 let mut real_type = column_type as u8;
220 if let Some(metadata_bytes) = self.get_column_metadata(col_idx) {
221 let f1 = metadata_bytes[0];
222
223 if f1 != 0 {
224 real_type = f1 | 0x30;
225 }
226
227 match real_type {
228 247 => return Ok(ColumnType::MYSQL_TYPE_ENUM),
229 248 => return Ok(ColumnType::MYSQL_TYPE_SET),
230 254 => return Ok(ColumnType::MYSQL_TYPE_STRING),
231 x => {
232 return Err(BadColumnType::Unexpected(x));
234 }
235 };
236 }
237 }
238 _ => (),
239 }
240
241 Ok(column_type)
242 }
243}
244
245impl<'de> MyDeserialize<'de> for TableMapEvent<'de> {
246 const SIZE: Option<usize> = None;
247 type Ctx = BinlogCtx<'de>;
248
249 fn deserialize(ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
250 let table_id = if 6 == ctx.fde.get_event_type_header_length(Self::EVENT_TYPE) {
251 let table_id: RawInt<LeU32> = buf.parse(())?;
253 RawInt::new(table_id.0 as u64)
254 } else {
255 buf.parse(())?
256 };
257
258 let flags = buf.parse(())?;
259
260 let database_name = buf.parse(())?;
261 let __null_1 = buf.parse(())?;
262 let table_name = buf.parse(())?;
263 let __null_2 = buf.parse(())?;
264
265 let columns_count: RawInt<LenEnc> = buf.parse(())?;
266 let columns_type = buf.parse(columns_count.0 as usize)?;
267 let columns_metadata = buf.parse(())?;
268 let null_bitmask = buf.parse(((columns_count.0 + 7) / 8) as usize)?;
269 let optional_metadata = buf.parse(())?;
270
271 Ok(TableMapEvent {
272 table_id,
273 flags,
274 database_name,
275 __null_1,
276 table_name,
277 __null_2,
278 columns_count,
279 columns_type,
280 columns_metadata,
281 null_bitmask,
282 optional_metadata,
283 })
284 }
285}
286
287impl MySerialize for TableMapEvent<'_> {
288 fn serialize(&self, buf: &mut Vec<u8>) {
289 self.table_id.serialize(&mut *buf);
290 self.flags.serialize(&mut *buf);
291 self.database_name.serialize(&mut *buf);
292 self.__null_1.serialize(&mut *buf);
293 self.table_name.serialize(&mut *buf);
294 self.__null_2.serialize(&mut *buf);
295 self.columns_count.serialize(&mut *buf);
296 self.columns_type.serialize(&mut *buf);
297 self.columns_metadata.serialize(&mut *buf);
298 self.null_bitmask.serialize(&mut *buf);
299 self.optional_metadata.serialize(&mut *buf);
300 }
301}
302
impl<'a> BinlogEvent<'a> for TableMapEvent<'a> {
    /// Event type tag associated with [`TableMapEvent`].
    const EVENT_TYPE: EventType = EventType::TABLE_MAP_EVENT;
}
306
307impl<'a> BinlogStruct<'a> for TableMapEvent<'a> {
308 fn len(&self, _version: BinlogVersion) -> usize {
309 let mut len = S(0);
310
311 len += S(6);
312 len += S(2);
313 len += S(1);
314 len += S(min(self.database_name.0.len(), u8::MAX as usize));
315 len += S(1);
316 len += S(1);
317 len += S(min(self.table_name.0.len(), u8::MAX as usize));
318 len += S(1);
319 len += S(crate::misc::lenenc_int_len(self.columns_count()) as usize);
320 len += S(self.columns_count() as usize);
321 len += S(crate::misc::lenenc_str_len(self.columns_metadata.as_bytes()) as usize);
322 len += S((self.columns_count() as usize + 8) / 7);
323 len += S(self.optional_metadata.len());
324
325 min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
326 }
327}
328
329#[derive(Debug, Clone, Eq, PartialEq)]
335pub struct DefaultCharset<'a> {
336 default_charset: RawInt<LenEnc>,
338 non_default: RawBytes<'a, EofBytes>,
340}
341
342impl<'a> DefaultCharset<'a> {
343 pub fn default_charset(&self) -> u16 {
345 self.default_charset.0 as u16
346 }
347
348 pub fn iter_non_default(&self) -> IterNonDefault<'_> {
357 IterNonDefault {
358 buf: ParseBuf(self.non_default.as_bytes()),
359 }
360 }
361}
362
363pub struct IterNonDefault<'a> {
364 buf: ParseBuf<'a>,
365}
366
367impl<'a> Iterator for IterNonDefault<'a> {
368 type Item = io::Result<NonDefaultCharset>;
369
370 fn next(&mut self) -> Option<Self::Item> {
371 if self.buf.is_empty() {
372 None
373 } else {
374 match self.buf.parse(()) {
375 Ok(x) => Some(Ok(x)),
376 Err(e) => {
377 self.buf = ParseBuf(b"");
378 Some(Err(e))
379 }
380 }
381 }
382 }
383}
384
385impl<'de> MyDeserialize<'de> for DefaultCharset<'de> {
386 const SIZE: Option<usize> = None;
387 type Ctx = ();
388
389 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
390 Ok(Self {
391 default_charset: buf.parse(())?,
392 non_default: buf.parse(())?,
393 })
394 }
395}
396
397impl MySerialize for DefaultCharset<'_> {
398 fn serialize(&self, buf: &mut Vec<u8>) {
399 self.default_charset.serialize(&mut *buf);
400 self.non_default.serialize(&mut *buf);
401 }
402}
403
404#[derive(Debug, Clone, Eq, PartialEq)]
407pub struct NonDefaultCharset {
408 column_index: RawInt<LenEnc>,
409 charset: RawInt<LenEnc>,
410}
411
412impl NonDefaultCharset {
413 pub fn new(column_index: u64, charset: u16) -> Self {
414 Self {
415 column_index: RawInt::new(column_index),
416 charset: RawInt::new(charset as u64),
417 }
418 }
419
420 pub fn column_index(&self) -> u64 {
422 self.column_index.0
423 }
424
425 pub fn charset(&self) -> u16 {
427 self.charset.0 as u16
428 }
429}
430
431impl<'de> MyDeserialize<'de> for NonDefaultCharset {
432 const SIZE: Option<usize> = None;
433 type Ctx = ();
434
435 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
436 Ok(Self {
437 column_index: buf.parse(())?,
438 charset: buf.parse(())?,
439 })
440 }
441}
442
443impl MySerialize for NonDefaultCharset {
444 fn serialize(&self, buf: &mut Vec<u8>) {
445 self.column_index.serialize(&mut *buf);
446 self.charset.serialize(&mut *buf);
447 }
448}
449
450#[derive(Debug, Clone, Eq, PartialEq)]
456pub struct ColumnCharsets<'a> {
457 charsets: RawBytes<'a, EofBytes>,
458}
459
460impl<'a> ColumnCharsets<'a> {
461 pub fn iter_charsets(&'a self) -> IterCharsets<'a> {
470 IterCharsets {
471 buf: ParseBuf(self.charsets.as_bytes()),
472 }
473 }
474}
475
476pub struct IterCharsets<'a> {
477 buf: ParseBuf<'a>,
478}
479
480impl<'a> Iterator for IterCharsets<'a> {
481 type Item = io::Result<u16>;
482
483 fn next(&mut self) -> Option<Self::Item> {
484 if self.buf.is_empty() {
485 None
486 } else {
487 match self.buf.parse::<RawInt<LenEnc>>(()) {
488 Ok(x) => Some(Ok(x.0 as u16)),
489 Err(e) => {
490 self.buf = ParseBuf(b"");
491 Some(Err(e))
492 }
493 }
494 }
495 }
496}
497
498impl<'de> MyDeserialize<'de> for ColumnCharsets<'de> {
499 const SIZE: Option<usize> = None;
500 type Ctx = ();
501
502 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
503 Ok(Self {
504 charsets: buf.parse(())?,
505 })
506 }
507}
508
509impl MySerialize for ColumnCharsets<'_> {
510 fn serialize(&self, buf: &mut Vec<u8>) {
511 self.charsets.serialize(buf);
512 }
513}
514
515#[derive(Debug, Clone, Eq, PartialEq)]
517pub struct ColumnName<'a> {
518 name: RawBytes<'a, LenEnc>,
519}
520
521impl<'a> ColumnName<'a> {
522 pub fn new(name: impl Into<Cow<'a, [u8]>>) -> Self {
524 Self {
525 name: RawBytes::new(name),
526 }
527 }
528
529 pub fn name_raw(&'a self) -> &'a [u8] {
531 self.name.as_bytes()
532 }
533
534 pub fn name(&'a self) -> Cow<'a, str> {
536 self.name.as_str()
537 }
538
539 pub fn into_owned(self) -> ColumnName<'static> {
541 ColumnName {
542 name: self.name.into_owned(),
543 }
544 }
545}
546
547impl<'de> MyDeserialize<'de> for ColumnName<'de> {
548 const SIZE: Option<usize> = None;
549 type Ctx = ();
550
551 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
552 Ok(Self {
553 name: buf.parse(())?,
554 })
555 }
556}
557
558impl MySerialize for ColumnName<'_> {
559 fn serialize(&self, buf: &mut Vec<u8>) {
560 self.name.serialize(buf);
561 }
562}
563
564#[derive(Debug, Clone, Eq, PartialEq)]
566pub struct ColumnNames<'a> {
567 names: RawBytes<'a, EofBytes>,
568}
569
570impl<'a> ColumnNames<'a> {
571 pub fn iter_names(&self) -> IterNames<'_> {
573 IterNames {
574 buf: ParseBuf(self.names.as_bytes()),
575 }
576 }
577}
578
579pub struct IterNames<'a> {
580 buf: ParseBuf<'a>,
581}
582
583impl<'a> Iterator for IterNames<'a> {
584 type Item = io::Result<ColumnName<'a>>;
585
586 fn next(&mut self) -> Option<Self::Item> {
587 if self.buf.is_empty() {
588 None
589 } else {
590 match self.buf.parse(()) {
591 Ok(x) => Some(Ok(x)),
592 Err(e) => {
593 self.buf = ParseBuf(b"");
594 Some(Err(e))
595 }
596 }
597 }
598 }
599}
600
601impl<'de> MyDeserialize<'de> for ColumnNames<'de> {
602 const SIZE: Option<usize> = None;
603 type Ctx = ();
604
605 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
606 Ok(Self {
607 names: buf.parse(())?,
608 })
609 }
610}
611
612impl MySerialize for ColumnNames<'_> {
613 fn serialize(&self, buf: &mut Vec<u8>) {
614 self.names.serialize(buf);
615 }
616}
617
618#[derive(Debug, Clone, Eq, PartialEq)]
620pub struct SetsStrValues<'a> {
621 values: RawBytes<'a, EofBytes>,
622}
623
624impl<'a> SetsStrValues<'a> {
625 pub fn iter_values(&'a self) -> IterSetStrValues<'a> {
627 IterSetStrValues {
628 buf: ParseBuf(self.values.as_bytes()),
629 }
630 }
631}
632
633pub struct IterSetStrValues<'a> {
634 buf: ParseBuf<'a>,
635}
636
637impl<'a> Iterator for IterSetStrValues<'a> {
638 type Item = io::Result<SetStrValues<'a>>;
639
640 fn next(&mut self) -> Option<Self::Item> {
641 if self.buf.is_empty() {
642 None
643 } else {
644 match self.buf.parse(()) {
645 Ok(x) => Some(Ok(x)),
646 Err(e) => {
647 self.buf = ParseBuf(b"");
648 Some(Err(e))
649 }
650 }
651 }
652 }
653}
654
655impl<'de> MyDeserialize<'de> for SetsStrValues<'de> {
656 const SIZE: Option<usize> = None;
657 type Ctx = ();
658
659 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
660 Ok(Self {
661 values: buf.parse(())?,
662 })
663 }
664}
665
666impl MySerialize for SetsStrValues<'_> {
667 fn serialize(&self, buf: &mut Vec<u8>) {
668 self.values.serialize(buf);
669 }
670}
671
672#[derive(Debug, Clone, Eq, PartialEq)]
674pub struct SetStrValue<'a> {
675 value: RawBytes<'a, LenEnc>,
676}
677
678impl<'a> SetStrValue<'a> {
679 pub fn new(value: impl Into<Cow<'a, [u8]>>) -> Self {
681 Self {
682 value: RawBytes::new(value),
683 }
684 }
685
686 pub fn value_raw(&'a self) -> &'a [u8] {
688 self.value.as_bytes()
689 }
690
691 pub fn value(&'a self) -> Cow<'a, str> {
693 self.value.as_str()
694 }
695}
696
697impl<'de> MyDeserialize<'de> for SetStrValue<'de> {
698 const SIZE: Option<usize> = None;
699 type Ctx = ();
700
701 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
702 Ok(Self {
703 value: buf.parse(())?,
704 })
705 }
706}
707
708impl MySerialize for SetStrValue<'_> {
709 fn serialize(&self, buf: &mut Vec<u8>) {
710 self.value.serialize(buf);
711 }
712}
713
714#[derive(Debug, Clone, Eq, PartialEq)]
716pub struct SetStrValues<'a> {
717 num_variants: RawInt<LenEnc>,
718 values: Vec<SetStrValue<'a>>,
719}
720
721impl<'a> SetStrValues<'a> {
722 pub fn num_variants(&self) -> u64 {
724 self.num_variants.0
725 }
726
727 pub fn values(&'a self) -> &'a [SetStrValue<'a>] {
729 self.values.as_ref()
730 }
731}
732
733impl<'de> MyDeserialize<'de> for SetStrValues<'de> {
734 const SIZE: Option<usize> = None;
735 type Ctx = ();
736
737 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
738 let num_variants: RawInt<LenEnc> = buf.parse(())?;
739 let mut values = Vec::with_capacity(num_variants.0 as usize);
740 for _ in 0..num_variants.0 {
741 values.push(buf.parse(())?);
742 }
743 Ok(Self {
744 num_variants,
745 values,
746 })
747 }
748}
749
750impl MySerialize for SetStrValues<'_> {
751 fn serialize(&self, buf: &mut Vec<u8>) {
752 self.num_variants.serialize(&mut *buf);
753 for value in &self.values {
754 value.serialize(buf);
755 }
756 }
757}
758
759#[derive(Debug, Clone, Eq, PartialEq)]
761pub struct EnumsStrValues<'a> {
762 values: RawBytes<'a, EofBytes>,
763}
764
765impl<'a> EnumsStrValues<'a> {
766 pub fn iter_values(&'a self) -> IterEnumStrValues<'a> {
768 IterEnumStrValues {
769 buf: ParseBuf(self.values.as_bytes()),
770 }
771 }
772}
773
774pub struct IterEnumStrValues<'a> {
775 buf: ParseBuf<'a>,
776}
777
778impl<'a> Iterator for IterEnumStrValues<'a> {
779 type Item = io::Result<EnumStrValues<'a>>;
780
781 fn next(&mut self) -> Option<Self::Item> {
782 if self.buf.is_empty() {
783 None
784 } else {
785 match self.buf.parse(()) {
786 Ok(x) => Some(Ok(x)),
787 Err(e) => {
788 self.buf = ParseBuf(b"");
789 Some(Err(e))
790 }
791 }
792 }
793 }
794}
795
796impl<'de> MyDeserialize<'de> for EnumsStrValues<'de> {
797 const SIZE: Option<usize> = None;
798 type Ctx = ();
799
800 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
801 Ok(Self {
802 values: buf.parse(())?,
803 })
804 }
805}
806
807impl MySerialize for EnumsStrValues<'_> {
808 fn serialize(&self, buf: &mut Vec<u8>) {
809 self.values.serialize(buf);
810 }
811}
812
813#[derive(Debug, Clone, Eq, PartialEq)]
815pub struct EnumStrValue<'a> {
816 value: RawBytes<'a, LenEnc>,
817}
818
819impl<'a> EnumStrValue<'a> {
820 pub fn new(value: impl Into<Cow<'a, [u8]>>) -> Self {
822 Self {
823 value: RawBytes::new(value),
824 }
825 }
826
827 pub fn value_raw(&'a self) -> &'a [u8] {
829 self.value.as_bytes()
830 }
831
832 pub fn value(&'a self) -> Cow<'a, str> {
834 self.value.as_str()
835 }
836}
837
838impl<'de> MyDeserialize<'de> for EnumStrValue<'de> {
839 const SIZE: Option<usize> = None;
840 type Ctx = ();
841
842 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
843 Ok(Self {
844 value: buf.parse(())?,
845 })
846 }
847}
848
849impl MySerialize for EnumStrValue<'_> {
850 fn serialize(&self, buf: &mut Vec<u8>) {
851 self.value.serialize(buf);
852 }
853}
854
855#[derive(Debug, Clone, Eq, PartialEq)]
857pub struct EnumStrValues<'a> {
858 num_variants: RawInt<LenEnc>,
859 values: Vec<EnumStrValue<'a>>,
860}
861
862impl<'a> EnumStrValues<'a> {
863 pub fn num_variants(&self) -> u64 {
865 self.num_variants.0
866 }
867
868 pub fn values(&'a self) -> &'a [EnumStrValue<'a>] {
870 self.values.as_ref()
871 }
872}
873
874impl<'de> MyDeserialize<'de> for EnumStrValues<'de> {
875 const SIZE: Option<usize> = None;
876 type Ctx = ();
877
878 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
879 let num_variants: RawInt<LenEnc> = buf.parse(())?;
880 let mut values = Vec::with_capacity(num_variants.0 as usize);
881 for _ in 0..num_variants.0 {
882 values.push(buf.parse(())?);
883 }
884 Ok(Self {
885 num_variants,
886 values,
887 })
888 }
889}
890
891impl MySerialize for EnumStrValues<'_> {
892 fn serialize(&self, buf: &mut Vec<u8>) {
893 self.num_variants.serialize(&mut *buf);
894 for value in &self.values {
895 value.serialize(buf);
896 }
897 }
898}
899
900#[derive(Debug, Clone, Eq, PartialEq)]
902pub struct GeometryTypes<'a> {
903 geometry_types: RawBytes<'a, EofBytes>,
904}
905
906impl<'a> GeometryTypes<'a> {
907 pub fn iter_geometry_types(&'a self) -> IterGeometryTypes<'a> {
915 IterGeometryTypes {
916 buf: ParseBuf(self.geometry_types.as_bytes()),
917 }
918 }
919}
920
921pub struct IterGeometryTypes<'a> {
922 buf: ParseBuf<'a>,
923}
924
925impl<'a> Iterator for IterGeometryTypes<'a> {
926 type Item = io::Result<GeometryType>;
927
928 fn next(&mut self) -> Option<Self::Item> {
929 if self.buf.is_empty() {
930 None
931 } else {
932 match self.buf.parse::<RawInt<LenEnc>>(()) {
933 Ok(x) => Some(
934 GeometryType::try_from(x.0 as u8)
935 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)),
936 ),
937 Err(e) => {
938 self.buf = ParseBuf(b"");
939 Some(Err(e))
940 }
941 }
942 }
943 }
944}
945
946impl<'de> MyDeserialize<'de> for GeometryTypes<'de> {
947 const SIZE: Option<usize> = None;
948 type Ctx = ();
949
950 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
951 Ok(Self {
952 geometry_types: buf.parse(())?,
953 })
954 }
955}
956
957impl MySerialize for GeometryTypes<'_> {
958 fn serialize(&self, buf: &mut Vec<u8>) {
959 self.geometry_types.serialize(buf);
960 }
961}
962
963#[derive(Debug, Clone, Eq, PartialEq)]
965pub struct VectorDimensionalities<'a> {
966 dimensionalities: RawBytes<'a, EofBytes>,
967}
968
969impl<'a> VectorDimensionalities<'a> {
970 pub fn iter_dimensionalities(&'a self) -> IterVectorDimensionalities<'a> {
974 IterVectorDimensionalities {
975 buf: ParseBuf(self.dimensionalities.as_bytes()),
976 }
977 }
978}
979
980pub struct IterVectorDimensionalities<'a> {
981 buf: ParseBuf<'a>,
982}
983
984impl<'a> Iterator for IterVectorDimensionalities<'a> {
985 type Item = io::Result<u64>;
986
987 fn next(&mut self) -> Option<Self::Item> {
988 if self.buf.is_empty() {
989 None
990 } else {
991 match self.buf.parse::<RawInt<LenEnc>>(()) {
992 Ok(x) => Some(Ok(x.0)),
993 Err(e) => {
994 self.buf = ParseBuf(b"");
995 Some(Err(e))
996 }
997 }
998 }
999 }
1000}
1001
1002impl<'de> MyDeserialize<'de> for VectorDimensionalities<'de> {
1003 const SIZE: Option<usize> = None;
1004 type Ctx = ();
1005
1006 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
1007 Ok(Self {
1008 dimensionalities: buf.parse(())?,
1009 })
1010 }
1011}
1012
1013impl MySerialize for VectorDimensionalities<'_> {
1014 fn serialize(&self, buf: &mut Vec<u8>) {
1015 self.dimensionalities.serialize(buf);
1016 }
1017}
1018
1019#[derive(Debug, Clone, Eq, PartialEq)]
1021pub struct SimplePrimaryKey<'a> {
1022 indexes: RawBytes<'a, EofBytes>,
1023}
1024
1025impl<'a> SimplePrimaryKey<'a> {
1026 pub fn iter_indexes(&'a self) -> IterIndexes<'a> {
1028 IterIndexes {
1029 buf: ParseBuf(self.indexes.as_bytes()),
1030 }
1031 }
1032}
1033
1034pub struct IterIndexes<'a> {
1035 buf: ParseBuf<'a>,
1036}
1037
1038impl<'a> Iterator for IterIndexes<'a> {
1039 type Item = io::Result<u64>;
1040
1041 fn next(&mut self) -> Option<Self::Item> {
1042 if self.buf.is_empty() {
1043 None
1044 } else {
1045 match self.buf.parse::<RawInt<LenEnc>>(()) {
1046 Ok(x) => Some(Ok(x.0)),
1047 Err(e) => {
1048 self.buf = ParseBuf(b"");
1049 Some(Err(e))
1050 }
1051 }
1052 }
1053 }
1054}
1055
1056impl<'de> MyDeserialize<'de> for SimplePrimaryKey<'de> {
1057 const SIZE: Option<usize> = None;
1058 type Ctx = ();
1059
1060 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
1061 Ok(Self {
1062 indexes: buf.parse(())?,
1063 })
1064 }
1065}
1066
1067impl MySerialize for SimplePrimaryKey<'_> {
1068 fn serialize(&self, buf: &mut Vec<u8>) {
1069 self.indexes.serialize(buf);
1070 }
1071}
1072
1073#[derive(Debug, Clone, Eq, PartialEq)]
1075pub struct PrimaryKeysWithPrefix<'a> {
1076 data: RawBytes<'a, EofBytes>,
1077}
1078
1079impl<'a> PrimaryKeysWithPrefix<'a> {
1080 pub fn iter_keys(&'a self) -> IterKeys<'a> {
1082 IterKeys {
1083 buf: ParseBuf(self.data.as_bytes()),
1084 }
1085 }
1086}
1087
1088pub struct IterKeys<'a> {
1089 buf: ParseBuf<'a>,
1090}
1091
1092impl<'a> Iterator for IterKeys<'a> {
1093 type Item = io::Result<PrimaryKeyWithPrefix>;
1094
1095 fn next(&mut self) -> Option<Self::Item> {
1096 if self.buf.is_empty() {
1097 None
1098 } else {
1099 match self.buf.parse(()) {
1100 Ok(x) => Some(Ok(x)),
1101 Err(e) => {
1102 self.buf = ParseBuf(b"");
1103 Some(Err(e))
1104 }
1105 }
1106 }
1107 }
1108}
1109
1110impl<'de> MyDeserialize<'de> for PrimaryKeysWithPrefix<'de> {
1111 const SIZE: Option<usize> = None;
1112 type Ctx = ();
1113
1114 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
1115 Ok(Self {
1116 data: buf.parse(())?,
1117 })
1118 }
1119}
1120
1121impl MySerialize for PrimaryKeysWithPrefix<'_> {
1122 fn serialize(&self, buf: &mut Vec<u8>) {
1123 self.data.serialize(buf);
1124 }
1125}
1126
1127#[derive(Debug, Clone, Copy, Eq, PartialEq)]
1129pub struct PrimaryKeyWithPrefix {
1130 column_index: RawInt<LenEnc>,
1131 prefix_length: RawInt<LenEnc>,
1132}
1133
1134impl PrimaryKeyWithPrefix {
1135 pub fn new(column_index: u64, prefix_length: u64) -> Self {
1137 Self {
1138 column_index: RawInt::new(column_index),
1139 prefix_length: RawInt::new(prefix_length),
1140 }
1141 }
1142
1143 pub fn column_index(&self) -> u64 {
1145 self.column_index.0
1146 }
1147
1148 pub fn prefix_length(&self) -> u64 {
1150 self.prefix_length.0
1151 }
1152}
1153
1154impl<'de> MyDeserialize<'de> for PrimaryKeyWithPrefix {
1155 const SIZE: Option<usize> = None;
1156 type Ctx = ();
1157
1158 fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
1159 Ok(Self {
1160 column_index: buf.parse(())?,
1161 prefix_length: buf.parse(())?,
1162 })
1163 }
1164}
1165
1166impl MySerialize for PrimaryKeyWithPrefix {
1167 fn serialize(&self, buf: &mut Vec<u8>) {
1168 self.column_index.serialize(buf);
1169 self.prefix_length.serialize(buf);
1170 }
1171}
1172
/// A single decoded field of the optional metadata of a table map event.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum OptionalMetadataField<'a> {
    /// Signedness flags: one bit per column whose type satisfies
    /// `ColumnType::is_numeric_type`.
    Signedness(
        /// Flags in `Msb0` bit order.
        &'a BitSlice<u8, Msb0>,
    ),
    /// Default charset plus per-column exceptions.
    DefaultCharset(DefaultCharset<'a>),
    /// Sequence of column charsets.
    ColumnCharset(ColumnCharsets<'a>),
    /// Column names.
    ColumnName(ColumnNames<'a>),
    /// String values of SET columns.
    SetStrValue(SetsStrValues<'a>),
    /// String values of ENUM columns.
    EnumStrValue(EnumsStrValues<'a>),
    /// Geometry types.
    GeometryType(GeometryTypes<'a>),
    /// Primary key given as a list of column indexes.
    SimplePrimaryKey(SimplePrimaryKey<'a>),
    /// Primary key given as (column index, prefix length) pairs.
    PrimaryKeyWithPrefix(PrimaryKeysWithPrefix<'a>),
    /// Default charset plus exceptions, from the ENUM/SET-specific field.
    EnumAndSetDefaultCharset(DefaultCharset<'a>),
    /// Sequence of charsets, from the ENUM/SET-specific field.
    EnumAndSetColumnCharset(ColumnCharsets<'a>),
    /// Visibility flags: one bit per column.
    ColumnVisibility(
        /// Flags in `Msb0` bit order.
        &'a BitSlice<u8, Msb0>,
    ),
    /// Vector dimensionalities.
    Dimensionality(VectorDimensionalities<'a>),
}
1208
1209#[derive(Debug)]
1211pub struct OptionalMetadataIter<'a> {
1212 columns: &'a RawSeq<'a, u8, ColumnType>,
1213 data: &'a [u8],
1214}
1215
1216impl<'a> OptionalMetadataIter<'a> {
1217 fn read_tlv(&mut self) -> io::Result<(RawConst<u8, OptionalMetadataFieldType>, &'a [u8])> {
1219 let t = self.data.read_u8()?;
1220 let l = self.data.read_lenenc_int()? as usize;
1222 let v = match self.data.get(..l) {
1223 Some(v) => v,
1224 None => {
1225 self.data = &[];
1226 return Err(io::Error::new(
1227 io::ErrorKind::UnexpectedEof,
1228 "can't read tlv value",
1229 ));
1230 }
1231 };
1232 self.data = &self.data[l..];
1233 Ok((RawConst::new(t), v))
1234 }
1235
1236 fn next_tlv(
1238 &mut self,
1239 ) -> Option<io::Result<(RawConst<u8, OptionalMetadataFieldType>, &'a [u8])>> {
1240 if self.data.is_empty() {
1241 return None;
1242 }
1243
1244 self.read_tlv().map(Some).transpose()
1245 }
1246
1247 fn num_columns(&self) -> usize {
1248 self.columns.0.len()
1249 }
1250
1251 fn count_columns(&self, f: fn(&ColumnType) -> bool) -> usize {
1252 self.columns
1253 .0
1254 .iter()
1255 .filter_map(|val| ColumnType::try_from(*val).ok())
1256 .filter(f)
1257 .count()
1258 }
1259}
1260
1261impl<'a> Iterator for OptionalMetadataIter<'a> {
1262 type Item = io::Result<OptionalMetadataField<'a>>;
1263
1264 fn next(&mut self) -> Option<Self::Item> {
1265 use OptionalMetadataFieldType::*;
1266
1267 self.next_tlv()?
1268 .and_then(|(t, v)| {
1269 let mut v = ParseBuf(v);
1270 match t.get() {
1271 Ok(t) => match t {
1272 SIGNEDNESS => {
1273 let num_numeric = self.count_columns(ColumnType::is_numeric_type);
1274 let num_flags_bytes = (num_numeric + 7) / 8;
1275 let flags: &[u8] = v.parse(num_flags_bytes)?;
1276
1277 if !v.is_empty() {
1278 return Err(io::Error::new(
1279 io::ErrorKind::Other,
1280 "bytes remaining on stream",
1281 ));
1282 }
1283
1284 let flags = BitSlice::from_slice(flags);
1285 Ok(OptionalMetadataField::Signedness(&flags[..num_numeric]))
1286 }
1287 DEFAULT_CHARSET => Ok(OptionalMetadataField::DefaultCharset(v.parse(())?)),
1288 COLUMN_CHARSET => Ok(OptionalMetadataField::ColumnCharset(v.parse(())?)),
1289 COLUMN_NAME => Ok(OptionalMetadataField::ColumnName(v.parse(())?)),
1290 SET_STR_VALUE => Ok(OptionalMetadataField::SetStrValue(v.parse(())?)),
1291 ENUM_STR_VALUE => Ok(OptionalMetadataField::EnumStrValue(v.parse(())?)),
1292 GEOMETRY_TYPE => Ok(OptionalMetadataField::GeometryType(v.parse(())?)),
1293 SIMPLE_PRIMARY_KEY => {
1294 Ok(OptionalMetadataField::SimplePrimaryKey(v.parse(())?))
1295 }
1296 PRIMARY_KEY_WITH_PREFIX => {
1297 Ok(OptionalMetadataField::PrimaryKeyWithPrefix(v.parse(())?))
1298 }
1299 ENUM_AND_SET_DEFAULT_CHARSET => Ok(
1300 OptionalMetadataField::EnumAndSetDefaultCharset(v.parse(())?),
1301 ),
1302 ENUM_AND_SET_COLUMN_CHARSET => {
1303 Ok(OptionalMetadataField::EnumAndSetColumnCharset(v.parse(())?))
1304 }
1305 COLUMN_VISIBILITY => {
1306 let num_columns = self.num_columns();
1307 let num_flags_bytes = (num_columns + 7) / 8;
1308 let flags: &[u8] = v.parse(num_flags_bytes)?;
1309
1310 if !v.is_empty() {
1311 return Err(io::Error::new(
1312 io::ErrorKind::Other,
1313 "bytes remaining on stream",
1314 ));
1315 }
1316
1317 let flags = BitSlice::from_slice(flags);
1318 let flags = &flags[..num_columns];
1319 Ok(OptionalMetadataField::ColumnVisibility(flags))
1320 }
1321 VECTOR_DIMENSIONALITY => {
1322 Ok(OptionalMetadataField::Dimensionality(v.parse(())?))
1323 }
1324 },
1325 Err(_) => Err(io::Error::new(
1326 io::ErrorKind::InvalidData,
1327 "Unknown optional metadata field type",
1328 )),
1329 }
1330 })
1331 .map(Some)
1332 .transpose()
1333 }
1334}
1335
1336pub struct OptionalMetaExtractor<'a> {
1338 signedness: Option<&'a BitSlice<u8, Msb0>>,
1339 default_charset: Option<DefaultCharset<'a>>,
1340 column_charset: Option<ColumnCharsets<'a>>,
1341 column_name: Option<ColumnNames<'a>>,
1342 simple_primary_key: Option<SimplePrimaryKey<'a>>,
1343 primary_key_with_prefix: Option<PrimaryKeysWithPrefix<'a>>,
1344 enum_and_set_default_charset: Option<DefaultCharset<'a>>,
1345 enum_and_set_column_charset: Option<ColumnCharsets<'a>>,
1346}
1347
1348impl<'a> OptionalMetaExtractor<'a> {
1349 pub fn new(iter_optional_meta: OptionalMetadataIter<'a>) -> io::Result<Self> {
1350 let mut this = Self {
1351 signedness: None,
1352 default_charset: None,
1353 column_charset: None,
1354 column_name: None,
1355 simple_primary_key: None,
1356 primary_key_with_prefix: None,
1357 enum_and_set_default_charset: None,
1358 enum_and_set_column_charset: None,
1359 };
1360
1361 for field in iter_optional_meta {
1362 match field? {
1363 OptionalMetadataField::Signedness(x) => this.signedness = Some(x),
1364 OptionalMetadataField::DefaultCharset(x) => {
1365 this.default_charset = Some(x);
1366 }
1367 OptionalMetadataField::ColumnCharset(x) => {
1368 this.column_charset = Some(x);
1369 }
1370 OptionalMetadataField::ColumnName(x) => {
1371 this.column_name = Some(x);
1372 }
1373 OptionalMetadataField::SetStrValue(_) => (),
1374 OptionalMetadataField::EnumStrValue(_) => (),
1375 OptionalMetadataField::GeometryType(_) => (),
1376 OptionalMetadataField::SimplePrimaryKey(x) => {
1377 this.simple_primary_key = Some(x);
1378 }
1379 OptionalMetadataField::PrimaryKeyWithPrefix(x) => {
1380 this.primary_key_with_prefix = Some(x);
1381 }
1382 OptionalMetadataField::EnumAndSetDefaultCharset(x) => {
1383 this.enum_and_set_default_charset = Some(x);
1384 }
1385 OptionalMetadataField::EnumAndSetColumnCharset(x) => {
1386 this.enum_and_set_column_charset = Some(x);
1387 }
1388 OptionalMetadataField::ColumnVisibility(_) => (),
1389 OptionalMetadataField::Dimensionality(_) => (),
1390 }
1391 }
1392
1393 Ok(this)
1394 }
1395
1396 pub fn iter_signedness(&'a self) -> impl Iterator<Item = bool> + 'a {
1400 self.signedness
1401 .as_ref()
1402 .map(|x| x.iter().by_vals())
1403 .into_iter()
1404 .flatten()
1405 }
1406
1407 pub fn iter_charset(&'a self) -> impl Iterator<Item = Result<u16, io::Error>> + 'a {
1417 let default_charset = self.default_charset.as_ref().map(|x| x.default_charset());
1418 let non_default = self.default_charset.as_ref().map(|x| x.iter_non_default());
1419 let per_column = self.column_charset.as_ref().map(|x| x.iter_charsets());
1420
1421 iter_charset_helper(default_charset, non_default, per_column)
1422 }
1423
1424 pub fn iter_enum_and_set_charset(
1434 &'a self,
1435 ) -> impl Iterator<Item = Result<u16, io::Error>> + 'a {
1436 let default_charset = self
1437 .enum_and_set_default_charset
1438 .as_ref()
1439 .map(|x| x.default_charset());
1440 let non_default = self
1441 .enum_and_set_default_charset
1442 .as_ref()
1443 .map(|x| x.iter_non_default());
1444 let per_column = self
1445 .enum_and_set_column_charset
1446 .as_ref()
1447 .map(|x| x.iter_charsets());
1448
1449 iter_charset_helper(default_charset, non_default, per_column)
1450 }
1451
1452 pub fn iter_primary_key(&'a self) -> Peekable<impl Iterator<Item = io::Result<u64>> + 'a> {
1458 let simple = self
1459 .simple_primary_key
1460 .as_ref()
1461 .map(|x| x.iter_indexes())
1462 .into_iter()
1463 .flatten();
1464 let prefixed = self
1465 .primary_key_with_prefix
1466 .as_ref()
1467 .map(|x| x.iter_keys().map(|x| x.map(|x| x.column_index())))
1468 .into_iter()
1469 .flatten();
1470
1471 simple.chain(prefixed).peekable()
1472 }
1473
1474 pub fn iter_column_name(&'a self) -> impl Iterator<Item = io::Result<ColumnName<'a>>> + 'a {
1478 self.column_name
1479 .as_ref()
1480 .map(|x| x.iter_names())
1481 .into_iter()
1482 .flatten()
1483 }
1484}
1485
1486fn iter_charset_helper<'a>(
1487 default_charset: Option<u16>,
1488 iter_non_default: Option<IterNonDefault<'a>>,
1489 iter_charsets: Option<IterCharsets<'a>>,
1490) -> impl Iterator<Item = Result<u16, io::Error>> + 'a {
1491 let non_default = iter_non_default
1492 .into_iter()
1493 .flatten()
1494 .map(|x| x.map(Either::Left));
1495 let per_column = iter_charsets
1496 .into_iter()
1497 .flatten()
1498 .map(|x| x.map(Either::Right));
1499
1500 let mut non_default = non_default.chain(per_column).peekable();
1501 let mut broken = false;
1502 let mut current = 0;
1503 std::iter::from_fn(move || {
1504 if broken {
1505 return None;
1506 }
1507
1508 let result = match non_default.peek() {
1509 Some(x) => match x {
1510 Ok(x) => match x {
1511 Either::Left(x) => {
1512 if x.column_index() == current {
1513 non_default
1514 .next()
1515 .map(|x| Ok(x.unwrap().unwrap_left().charset()))
1516 } else {
1517 default_charset.map(Ok)
1518 }
1519 }
1520 Either::Right(x) => {
1521 let x = *x;
1522 non_default.next().map(|_| Ok(x))
1523 }
1524 },
1525 Err(_) => {
1526 broken = true;
1527 non_default.next().map(|x| Err(x.unwrap_err()))
1528 }
1529 },
1530 None => default_charset.map(Ok),
1531 };
1532
1533 current += 1;
1534 result
1535 })
1536}