mod memory;
pub(crate) mod reader;
mod writer;

use std::ops::Range;
use std::sync::Arc;

use crate::format::{
    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
    SizeStatistics, SortingColumn,
};

use crate::basic::{ColumnOrder, Compression, Encoding, Type};
use crate::errors::{ParquetError, Result};
pub(crate) use crate::file::metadata::memory::HeapSize;
use crate::file::page_encoding_stats::{self, PageEncodingStats};
use crate::file::page_index::index::Index;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::statistics::{self, Statistics};
use crate::schema::types::{
    ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
    Type as SchemaType,
};
pub use reader::ParquetMetaDataReader;
pub use writer::ParquetMetaDataWriter;
pub(crate) use writer::ThriftMetadataWriter;

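/// Parsed page index "column index" (per-page statistics), indexed first by row
/// group ordinal and then by column ordinal within the row group.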
pub type ParquetColumnIndex = Vec<Vec<Index>>;

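/// Parsed page index "offset index" (per-page locations), indexed first by row
/// group ordinal and then by column ordinal within the row group.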
pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;

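/// In-memory representation of the metadata stored in a Parquet file footer:
/// the file level metadata, the metadata of each row group, and, when they have
/// been read, the parsed page index structures (column index and offset index).
///
/// Instances are typically produced by [`ParquetMetaDataReader`] and can be
/// serialized again with [`ParquetMetaDataWriter`].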
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetMetaData {
    file_metadata: FileMetaData,
    row_groups: Vec<RowGroupMetaData>,
    column_index: Option<ParquetColumnIndex>,
    offset_index: Option<ParquetOffsetIndex>,
}

impl ParquetMetaData {
    pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>) -> Self {
        ParquetMetaData {
            file_metadata,
            row_groups,
            column_index: None,
            offset_index: None,
        }
    }

    #[deprecated(note = "Use ParquetMetaDataBuilder")]
    pub fn new_with_page_index(
        file_metadata: FileMetaData,
        row_groups: Vec<RowGroupMetaData>,
        column_index: Option<ParquetColumnIndex>,
        offset_index: Option<ParquetOffsetIndex>,
    ) -> Self {
        ParquetMetaDataBuilder::new(file_metadata)
            .set_row_groups(row_groups)
            .set_column_index(column_index)
            .set_offset_index(offset_index)
            .build()
    }

    pub fn into_builder(self) -> ParquetMetaDataBuilder {
        self.into()
    }

    pub fn file_metadata(&self) -> &FileMetaData {
        &self.file_metadata
    }

    pub fn num_row_groups(&self) -> usize {
        self.row_groups.len()
    }

    pub fn row_group(&self, i: usize) -> &RowGroupMetaData {
        &self.row_groups[i]
    }

    pub fn row_groups(&self) -> &[RowGroupMetaData] {
        &self.row_groups
    }

    #[deprecated(note = "Use Self::column_index")]
    pub fn page_indexes(&self) -> Option<&ParquetColumnIndex> {
        self.column_index.as_ref()
    }

    pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
        self.column_index.as_ref()
    }

    #[deprecated(note = "Use Self::offset_index")]
    pub fn offset_indexes(&self) -> Option<&ParquetOffsetIndex> {
        self.offset_index.as_ref()
    }

    pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
        self.offset_index.as_ref()
    }

    pub fn memory_size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.file_metadata.heap_size()
            + self.row_groups.heap_size()
            + self.column_index.heap_size()
            + self.offset_index.heap_size()
    }

    pub(crate) fn set_column_index(&mut self, index: Option<ParquetColumnIndex>) {
        self.column_index = index;
    }

    pub(crate) fn set_offset_index(&mut self, index: Option<ParquetOffsetIndex>) {
        self.offset_index = index;
    }
}

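/// A builder for creating and manipulating [`ParquetMetaData`].
///
/// A minimal sketch of assembling metadata with the builder. `get_file_metadata`
/// is a hypothetical helper standing in for however the [`FileMetaData`] was
/// obtained; it is not part of this crate:
///
/// ```no_run
/// # use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataBuilder, RowGroupMetaData};
/// # fn get_file_metadata() -> FileMetaData { unimplemented!() }
/// let file_metadata = get_file_metadata();
/// // Build a row group for the same schema. A real row group also needs one
/// // ColumnChunkMetaData per leaf column, otherwise `build` returns an error.
/// let row_group = RowGroupMetaData::builder(file_metadata.schema_descr_ptr())
///     .set_num_rows(100)
///     .build()
///     .unwrap();
/// let metadata: ParquetMetaData = ParquetMetaDataBuilder::new(file_metadata)
///     .add_row_group(row_group)
///     .build();
/// ```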
pub struct ParquetMetaDataBuilder(ParquetMetaData);

impl ParquetMetaDataBuilder {
    pub fn new(file_meta_data: FileMetaData) -> Self {
        Self(ParquetMetaData::new(file_meta_data, vec![]))
    }

    pub fn new_from_metadata(metadata: ParquetMetaData) -> Self {
        Self(metadata)
    }

    pub fn add_row_group(mut self, row_group: RowGroupMetaData) -> Self {
        self.0.row_groups.push(row_group);
        self
    }

    pub fn set_row_groups(mut self, row_groups: Vec<RowGroupMetaData>) -> Self {
        self.0.row_groups = row_groups;
        self
    }

    pub fn take_row_groups(&mut self) -> Vec<RowGroupMetaData> {
        std::mem::take(&mut self.0.row_groups)
    }

    pub fn row_groups(&self) -> &[RowGroupMetaData] {
        &self.0.row_groups
    }

    pub fn set_column_index(mut self, column_index: Option<ParquetColumnIndex>) -> Self {
        self.0.column_index = column_index;
        self
    }

    pub fn take_column_index(&mut self) -> Option<ParquetColumnIndex> {
        std::mem::take(&mut self.0.column_index)
    }

    pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
        self.0.column_index.as_ref()
    }

    pub fn set_offset_index(mut self, offset_index: Option<ParquetOffsetIndex>) -> Self {
        self.0.offset_index = offset_index;
        self
    }

    pub fn take_offset_index(&mut self) -> Option<ParquetOffsetIndex> {
        std::mem::take(&mut self.0.offset_index)
    }

    pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
        self.0.offset_index.as_ref()
    }

    pub fn build(self) -> ParquetMetaData {
        let Self(metadata) = self;
        metadata
    }
}

impl From<ParquetMetaData> for ParquetMetaDataBuilder {
    fn from(meta_data: ParquetMetaData) -> Self {
        Self(meta_data)
    }
}

pub type KeyValue = crate::format::KeyValue;

pub type FileMetaDataPtr = Arc<FileMetaData>;

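/// File level metadata from the Parquet footer: format version, total row
/// count, optional "created by" string, optional key/value metadata, the schema
/// descriptor, and optional per-column sort orders.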
#[derive(Debug, Clone, PartialEq)]
pub struct FileMetaData {
    version: i32,
    num_rows: i64,
    created_by: Option<String>,
    key_value_metadata: Option<Vec<KeyValue>>,
    schema_descr: SchemaDescPtr,
    column_orders: Option<Vec<ColumnOrder>>,
}

impl FileMetaData {
    pub fn new(
        version: i32,
        num_rows: i64,
        created_by: Option<String>,
        key_value_metadata: Option<Vec<KeyValue>>,
        schema_descr: SchemaDescPtr,
        column_orders: Option<Vec<ColumnOrder>>,
    ) -> Self {
        FileMetaData {
            version,
            num_rows,
            created_by,
            key_value_metadata,
            schema_descr,
            column_orders,
        }
    }

    pub fn version(&self) -> i32 {
        self.version
    }

    pub fn num_rows(&self) -> i64 {
        self.num_rows
    }

    pub fn created_by(&self) -> Option<&str> {
        self.created_by.as_deref()
    }

    pub fn key_value_metadata(&self) -> Option<&Vec<KeyValue>> {
        self.key_value_metadata.as_ref()
    }

    pub fn schema(&self) -> &SchemaType {
        self.schema_descr.root_schema()
    }

    pub fn schema_descr(&self) -> &SchemaDescriptor {
        &self.schema_descr
    }

    pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
        self.schema_descr.clone()
    }

    pub fn column_orders(&self) -> Option<&Vec<ColumnOrder>> {
        self.column_orders.as_ref()
    }

    pub fn column_order(&self, i: usize) -> ColumnOrder {
        self.column_orders
            .as_ref()
            .map(|data| data[i])
            .unwrap_or(ColumnOrder::UNDEFINED)
    }
}

pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;

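/// Metadata for a single row group: the metadata of its column chunks, its row
/// count and byte sizes, optional sorting columns, and its offset/ordinal within
/// the file. Construct new instances with [`RowGroupMetaData::builder`].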
#[derive(Debug, Clone, PartialEq)]
pub struct RowGroupMetaData {
    columns: Vec<ColumnChunkMetaData>,
    num_rows: i64,
    sorting_columns: Option<Vec<SortingColumn>>,
    total_byte_size: i64,
    schema_descr: SchemaDescPtr,
    file_offset: Option<i64>,
    ordinal: Option<i16>,
}

impl RowGroupMetaData {
    pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
        RowGroupMetaDataBuilder::new(schema_descr)
    }

    pub fn num_columns(&self) -> usize {
        self.columns.len()
    }

    pub fn column(&self, i: usize) -> &ColumnChunkMetaData {
        &self.columns[i]
    }

    pub fn columns(&self) -> &[ColumnChunkMetaData] {
        &self.columns
    }

    pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
        &mut self.columns
    }

    pub fn num_rows(&self) -> i64 {
        self.num_rows
    }

    pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
        self.sorting_columns.as_ref()
    }

    pub fn total_byte_size(&self) -> i64 {
        self.total_byte_size
    }

    pub fn compressed_size(&self) -> i64 {
        self.columns.iter().map(|c| c.total_compressed_size).sum()
    }

    pub fn schema_descr(&self) -> &SchemaDescriptor {
        self.schema_descr.as_ref()
    }

    pub fn schema_descr_ptr(&self) -> SchemaDescPtr {
        self.schema_descr.clone()
    }

    #[inline(always)]
    pub fn ordinal(&self) -> Option<i16> {
        self.ordinal
    }

    #[inline(always)]
    pub fn file_offset(&self) -> Option<i64> {
        self.file_offset
    }

    pub fn from_thrift(schema_descr: SchemaDescPtr, mut rg: RowGroup) -> Result<RowGroupMetaData> {
        if schema_descr.num_columns() != rg.columns.len() {
            return Err(general_err!(
                "Column count mismatch. Schema has {} columns while Row Group has {}",
                schema_descr.num_columns(),
                rg.columns.len()
            ));
        }
        let total_byte_size = rg.total_byte_size;
        let num_rows = rg.num_rows;
        let mut columns = vec![];
        for (c, d) in rg.columns.drain(0..).zip(schema_descr.columns()) {
            let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
            columns.push(cc);
        }
        let sorting_columns = rg.sorting_columns;
        Ok(RowGroupMetaData {
            columns,
            num_rows,
            sorting_columns,
            total_byte_size,
            schema_descr,
            file_offset: rg.file_offset,
            ordinal: rg.ordinal,
        })
    }

    pub fn to_thrift(&self) -> RowGroup {
        RowGroup {
            columns: self.columns().iter().map(|v| v.to_thrift()).collect(),
            total_byte_size: self.total_byte_size,
            num_rows: self.num_rows,
            sorting_columns: self.sorting_columns().cloned(),
            file_offset: self.file_offset(),
            total_compressed_size: Some(self.compressed_size()),
            ordinal: self.ordinal,
        }
    }

    pub fn into_builder(self) -> RowGroupMetaDataBuilder {
        RowGroupMetaDataBuilder(self)
    }
}

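/// Builder for [`RowGroupMetaData`].
///
/// A minimal sketch, assuming a `schema_descr: SchemaDescPtr` is already in
/// scope; `build` returns an error unless one [`ColumnChunkMetaData`] is
/// supplied per leaf column of the schema:
///
/// ```
/// # use parquet::errors::Result;
/// # use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
/// # use parquet::schema::types::SchemaDescPtr;
/// # fn example(schema_descr: SchemaDescPtr) -> Result<RowGroupMetaData> {
/// // One (empty) column chunk metadata entry per leaf column of the schema
/// let columns = schema_descr
///     .columns()
///     .iter()
///     .map(|column_descr| ColumnChunkMetaData::builder(column_descr.clone()).build())
///     .collect::<Result<Vec<_>>>()?;
/// RowGroupMetaData::builder(schema_descr)
///     .set_num_rows(100)
///     .set_total_byte_size(1024)
///     .set_column_metadata(columns)
///     .build()
/// # }
/// ```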
pub struct RowGroupMetaDataBuilder(RowGroupMetaData);

impl RowGroupMetaDataBuilder {
    fn new(schema_descr: SchemaDescPtr) -> Self {
        Self(RowGroupMetaData {
            columns: Vec::with_capacity(schema_descr.num_columns()),
            schema_descr,
            file_offset: None,
            num_rows: 0,
            sorting_columns: None,
            total_byte_size: 0,
            ordinal: None,
        })
    }

    pub fn set_num_rows(mut self, value: i64) -> Self {
        self.0.num_rows = value;
        self
    }

    pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
        self.0.sorting_columns = value;
        self
    }

    pub fn set_total_byte_size(mut self, value: i64) -> Self {
        self.0.total_byte_size = value;
        self
    }

    pub fn take_columns(&mut self) -> Vec<ColumnChunkMetaData> {
        std::mem::take(&mut self.0.columns)
    }

    pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
        self.0.columns = value;
        self
    }

    pub fn add_column_metadata(mut self, value: ColumnChunkMetaData) -> Self {
        self.0.columns.push(value);
        self
    }

    pub fn set_ordinal(mut self, value: i16) -> Self {
        self.0.ordinal = Some(value);
        self
    }

    pub fn set_file_offset(mut self, value: i64) -> Self {
        self.0.file_offset = Some(value);
        self
    }

    pub fn build(self) -> Result<RowGroupMetaData> {
        if self.0.schema_descr.num_columns() != self.0.columns.len() {
            return Err(general_err!(
                "Column length mismatch: {} != {}",
                self.0.schema_descr.num_columns(),
                self.0.columns.len()
            ));
        }

        Ok(self.0)
    }
}

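/// Metadata for a single column chunk within a row group: encodings, compression
/// codec, value count, compressed/uncompressed sizes, page offsets, and optional
/// statistics, bloom filter, and page index locations.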
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnChunkMetaData {
    column_descr: ColumnDescPtr,
    encodings: Vec<Encoding>,
    file_path: Option<String>,
    file_offset: i64,
    num_values: i64,
    compression: Compression,
    total_compressed_size: i64,
    total_uncompressed_size: i64,
    data_page_offset: i64,
    index_page_offset: Option<i64>,
    dictionary_page_offset: Option<i64>,
    statistics: Option<Statistics>,
    encoding_stats: Option<Vec<PageEncodingStats>>,
    bloom_filter_offset: Option<i64>,
    bloom_filter_length: Option<i32>,
    offset_index_offset: Option<i64>,
    offset_index_length: Option<i32>,
    column_index_offset: Option<i64>,
    column_index_length: Option<i32>,
    unencoded_byte_array_data_bytes: Option<i64>,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}

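/// Histogram of repetition or definition levels for a column chunk or page:
/// `values()[i]` is the number of values whose level equals `i`.
///
/// A small sketch of accumulating levels into a histogram:
///
/// ```
/// # use parquet::file::metadata::LevelHistogram;
/// // max level 1 => two buckets, for levels 0 and 1
/// let mut hist = LevelHistogram::try_new(1).unwrap();
/// hist.update_from_levels(&[0, 1, 1]);
/// assert_eq!(hist.values(), &[1i64, 2]);
/// ```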
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct LevelHistogram {
    inner: Vec<i64>,
}

impl LevelHistogram {
    pub fn try_new(max_level: i16) -> Option<Self> {
        if max_level > 0 {
            Some(Self {
                inner: vec![0; max_level as usize + 1],
            })
        } else {
            None
        }
    }

    pub fn values(&self) -> &[i64] {
        &self.inner
    }

    pub fn into_inner(self) -> Vec<i64> {
        self.inner
    }

    pub fn get(&self, index: usize) -> Option<i64> {
        self.inner.get(index).copied()
    }

    pub fn add(&mut self, other: &Self) {
        assert_eq!(self.len(), other.len());
        for (dst, src) in self.inner.iter_mut().zip(other.inner.iter()) {
            *dst += src;
        }
    }

    pub fn len(&self) -> usize {
        self.inner.len()
    }

    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    pub fn reset(&mut self) {
        for value in self.inner.iter_mut() {
            *value = 0;
        }
    }

    pub fn update_from_levels(&mut self, levels: &[i16]) {
        for &level in levels {
            self.inner[level as usize] += 1;
        }
    }
}

impl From<Vec<i64>> for LevelHistogram {
    fn from(inner: Vec<i64>) -> Self {
        Self { inner }
    }
}

impl From<LevelHistogram> for Vec<i64> {
    fn from(value: LevelHistogram) -> Self {
        value.into_inner()
    }
}

impl HeapSize for LevelHistogram {
    fn heap_size(&self) -> usize {
        self.inner.heap_size()
    }
}

impl ColumnChunkMetaData {
    pub fn builder(column_descr: ColumnDescPtr) -> ColumnChunkMetaDataBuilder {
        ColumnChunkMetaDataBuilder::new(column_descr)
    }

    pub fn file_path(&self) -> Option<&str> {
        self.file_path.as_deref()
    }

    pub fn file_offset(&self) -> i64 {
        self.file_offset
    }

    pub fn column_type(&self) -> Type {
        self.column_descr.physical_type()
    }

    pub fn column_path(&self) -> &ColumnPath {
        self.column_descr.path()
    }

    pub fn column_descr(&self) -> &ColumnDescriptor {
        self.column_descr.as_ref()
    }

    pub fn column_descr_ptr(&self) -> ColumnDescPtr {
        self.column_descr.clone()
    }

    pub fn encodings(&self) -> &Vec<Encoding> {
        &self.encodings
    }

    pub fn num_values(&self) -> i64 {
        self.num_values
    }

    pub fn compression(&self) -> Compression {
        self.compression
    }

    pub fn compressed_size(&self) -> i64 {
        self.total_compressed_size
    }

    pub fn uncompressed_size(&self) -> i64 {
        self.total_uncompressed_size
    }

    pub fn data_page_offset(&self) -> i64 {
        self.data_page_offset
    }

    pub fn index_page_offset(&self) -> Option<i64> {
        self.index_page_offset
    }

    pub fn dictionary_page_offset(&self) -> Option<i64> {
        self.dictionary_page_offset
    }

    pub fn byte_range(&self) -> (u64, u64) {
        let col_start = match self.dictionary_page_offset() {
            Some(dictionary_page_offset) => dictionary_page_offset,
            None => self.data_page_offset(),
        };
        let col_len = self.compressed_size();
        assert!(
            col_start >= 0 && col_len >= 0,
            "column start and length should not be negative"
        );
        (col_start as u64, col_len as u64)
    }

    pub fn statistics(&self) -> Option<&Statistics> {
        self.statistics.as_ref()
    }

    pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
        self.encoding_stats.as_ref()
    }

    pub fn bloom_filter_offset(&self) -> Option<i64> {
        self.bloom_filter_offset
    }

    pub fn bloom_filter_length(&self) -> Option<i32> {
        self.bloom_filter_length
    }

    pub fn column_index_offset(&self) -> Option<i64> {
        self.column_index_offset
    }

    pub fn column_index_length(&self) -> Option<i32> {
        self.column_index_length
    }

    pub(crate) fn column_index_range(&self) -> Option<Range<usize>> {
        let offset = usize::try_from(self.column_index_offset?).ok()?;
        let length = usize::try_from(self.column_index_length?).ok()?;
        Some(offset..(offset + length))
    }

    pub fn offset_index_offset(&self) -> Option<i64> {
        self.offset_index_offset
    }

    pub fn offset_index_length(&self) -> Option<i32> {
        self.offset_index_length
    }

    pub(crate) fn offset_index_range(&self) -> Option<Range<usize>> {
        let offset = usize::try_from(self.offset_index_offset?).ok()?;
        let length = usize::try_from(self.offset_index_length?).ok()?;
        Some(offset..(offset + length))
    }

    pub fn unencoded_byte_array_data_bytes(&self) -> Option<i64> {
        self.unencoded_byte_array_data_bytes
    }

    pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
        self.repetition_level_histogram.as_ref()
    }

    pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
        self.definition_level_histogram.as_ref()
    }

    pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
        if cc.meta_data.is_none() {
            return Err(general_err!("Expected to have column metadata"));
        }
        let mut col_metadata: ColumnMetaData = cc.meta_data.unwrap();
        let column_type = Type::try_from(col_metadata.type_)?;
        let encodings = col_metadata
            .encodings
            .drain(0..)
            .map(Encoding::try_from)
            .collect::<Result<_>>()?;
        let compression = Compression::try_from(col_metadata.codec)?;
        let file_path = cc.file_path;
        let file_offset = cc.file_offset;
        let num_values = col_metadata.num_values;
        let total_compressed_size = col_metadata.total_compressed_size;
        let total_uncompressed_size = col_metadata.total_uncompressed_size;
        let data_page_offset = col_metadata.data_page_offset;
        let index_page_offset = col_metadata.index_page_offset;
        let dictionary_page_offset = col_metadata.dictionary_page_offset;
        let statistics = statistics::from_thrift(column_type, col_metadata.statistics)?;
        let encoding_stats = col_metadata
            .encoding_stats
            .as_ref()
            .map(|vec| {
                vec.iter()
                    .map(page_encoding_stats::try_from_thrift)
                    .collect::<Result<_>>()
            })
            .transpose()?;
        let bloom_filter_offset = col_metadata.bloom_filter_offset;
        let bloom_filter_length = col_metadata.bloom_filter_length;
        let offset_index_offset = cc.offset_index_offset;
        let offset_index_length = cc.offset_index_length;
        let column_index_offset = cc.column_index_offset;
        let column_index_length = cc.column_index_length;
        let (
            unencoded_byte_array_data_bytes,
            repetition_level_histogram,
            definition_level_histogram,
        ) = if let Some(size_stats) = col_metadata.size_statistics {
            (
                size_stats.unencoded_byte_array_data_bytes,
                size_stats.repetition_level_histogram,
                size_stats.definition_level_histogram,
            )
        } else {
            (None, None, None)
        };

        let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
        let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);

        let result = ColumnChunkMetaData {
            column_descr,
            encodings,
            file_path,
            file_offset,
            num_values,
            compression,
            total_compressed_size,
            total_uncompressed_size,
            data_page_offset,
            index_page_offset,
            dictionary_page_offset,
            statistics,
            encoding_stats,
            bloom_filter_offset,
            bloom_filter_length,
            offset_index_offset,
            offset_index_length,
            column_index_offset,
            column_index_length,
            unencoded_byte_array_data_bytes,
            repetition_level_histogram,
            definition_level_histogram,
        };
        Ok(result)
    }

    pub fn to_thrift(&self) -> ColumnChunk {
        let column_metadata = self.to_column_metadata_thrift();

        ColumnChunk {
            file_path: self.file_path().map(|s| s.to_owned()),
            file_offset: self.file_offset,
            meta_data: Some(column_metadata),
            offset_index_offset: self.offset_index_offset,
            offset_index_length: self.offset_index_length,
            column_index_offset: self.column_index_offset,
            column_index_length: self.column_index_length,
            crypto_metadata: None,
            encrypted_column_metadata: None,
        }
    }

    pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
        let size_statistics = if self.unencoded_byte_array_data_bytes.is_some()
            || self.repetition_level_histogram.is_some()
            || self.definition_level_histogram.is_some()
        {
            let repetition_level_histogram = self
                .repetition_level_histogram
                .as_ref()
                .map(|hist| hist.clone().into_inner());

            let definition_level_histogram = self
                .definition_level_histogram
                .as_ref()
                .map(|hist| hist.clone().into_inner());

            Some(SizeStatistics {
                unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes,
                repetition_level_histogram,
                definition_level_histogram,
            })
        } else {
            None
        };

        ColumnMetaData {
            type_: self.column_type().into(),
            encodings: self.encodings().iter().map(|&v| v.into()).collect(),
            path_in_schema: self.column_path().as_ref().to_vec(),
            codec: self.compression.into(),
            num_values: self.num_values,
            total_uncompressed_size: self.total_uncompressed_size,
            total_compressed_size: self.total_compressed_size,
            key_value_metadata: None,
            data_page_offset: self.data_page_offset,
            index_page_offset: self.index_page_offset,
            dictionary_page_offset: self.dictionary_page_offset,
            statistics: statistics::to_thrift(self.statistics.as_ref()),
            encoding_stats: self
                .encoding_stats
                .as_ref()
                .map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()),
            bloom_filter_offset: self.bloom_filter_offset,
            bloom_filter_length: self.bloom_filter_length,
            size_statistics,
        }
    }

    pub fn into_builder(self) -> ColumnChunkMetaDataBuilder {
        ColumnChunkMetaDataBuilder::from(self)
    }
}

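/// Builder for [`ColumnChunkMetaData`].
///
/// A minimal sketch, assuming a `column_descr: ColumnDescPtr` for the column is
/// already in scope; all offsets and sizes here are illustrative values only:
///
/// ```
/// # use parquet::basic::{Compression, Encoding};
/// # use parquet::errors::Result;
/// # use parquet::file::metadata::ColumnChunkMetaData;
/// # use parquet::schema::types::ColumnDescPtr;
/// # fn example(column_descr: ColumnDescPtr) -> Result<ColumnChunkMetaData> {
/// ColumnChunkMetaData::builder(column_descr)
///     .set_encodings(vec![Encoding::PLAIN])
///     .set_compression(Compression::SNAPPY)
///     .set_num_values(100)
///     .set_total_compressed_size(512)
///     .set_total_uncompressed_size(1024)
///     .set_data_page_offset(0)
///     .build()
/// # }
/// ```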
pub struct ColumnChunkMetaDataBuilder(ColumnChunkMetaData);

impl ColumnChunkMetaDataBuilder {
    fn new(column_descr: ColumnDescPtr) -> Self {
        Self(ColumnChunkMetaData {
            column_descr,
            encodings: Vec::new(),
            file_path: None,
            file_offset: 0,
            num_values: 0,
            compression: Compression::UNCOMPRESSED,
            total_compressed_size: 0,
            total_uncompressed_size: 0,
            data_page_offset: 0,
            index_page_offset: None,
            dictionary_page_offset: None,
            statistics: None,
            encoding_stats: None,
            bloom_filter_offset: None,
            bloom_filter_length: None,
            offset_index_offset: None,
            offset_index_length: None,
            column_index_offset: None,
            column_index_length: None,
            unencoded_byte_array_data_bytes: None,
            repetition_level_histogram: None,
            definition_level_histogram: None,
        })
    }

    pub fn set_encodings(mut self, encodings: Vec<Encoding>) -> Self {
        self.0.encodings = encodings;
        self
    }

    pub fn set_file_path(mut self, value: String) -> Self {
        self.0.file_path = Some(value);
        self
    }

    #[deprecated(
        since = "53.0.0",
        note = "The Parquet specification requires this field to be 0"
    )]
    pub fn set_file_offset(mut self, value: i64) -> Self {
        self.0.file_offset = value;
        self
    }

    pub fn set_num_values(mut self, value: i64) -> Self {
        self.0.num_values = value;
        self
    }

    pub fn set_compression(mut self, value: Compression) -> Self {
        self.0.compression = value;
        self
    }

    pub fn set_total_compressed_size(mut self, value: i64) -> Self {
        self.0.total_compressed_size = value;
        self
    }

    pub fn set_total_uncompressed_size(mut self, value: i64) -> Self {
        self.0.total_uncompressed_size = value;
        self
    }

    pub fn set_data_page_offset(mut self, value: i64) -> Self {
        self.0.data_page_offset = value;
        self
    }

    pub fn set_dictionary_page_offset(mut self, value: Option<i64>) -> Self {
        self.0.dictionary_page_offset = value;
        self
    }

    pub fn set_index_page_offset(mut self, value: Option<i64>) -> Self {
        self.0.index_page_offset = value;
        self
    }

    pub fn set_statistics(mut self, value: Statistics) -> Self {
        self.0.statistics = Some(value);
        self
    }

    pub fn clear_statistics(mut self) -> Self {
        self.0.statistics = None;
        self
    }

    pub fn set_page_encoding_stats(mut self, value: Vec<PageEncodingStats>) -> Self {
        self.0.encoding_stats = Some(value);
        self
    }

    pub fn clear_page_encoding_stats(mut self) -> Self {
        self.0.encoding_stats = None;
        self
    }

    pub fn set_bloom_filter_offset(mut self, value: Option<i64>) -> Self {
        self.0.bloom_filter_offset = value;
        self
    }

    pub fn set_bloom_filter_length(mut self, value: Option<i32>) -> Self {
        self.0.bloom_filter_length = value;
        self
    }

    pub fn set_offset_index_offset(mut self, value: Option<i64>) -> Self {
        self.0.offset_index_offset = value;
        self
    }

    pub fn set_offset_index_length(mut self, value: Option<i32>) -> Self {
        self.0.offset_index_length = value;
        self
    }

    pub fn set_column_index_offset(mut self, value: Option<i64>) -> Self {
        self.0.column_index_offset = value;
        self
    }

    pub fn set_column_index_length(mut self, value: Option<i32>) -> Self {
        self.0.column_index_length = value;
        self
    }

    pub fn set_unencoded_byte_array_data_bytes(mut self, value: Option<i64>) -> Self {
        self.0.unencoded_byte_array_data_bytes = value;
        self
    }

    pub fn set_repetition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
        self.0.repetition_level_histogram = value;
        self
    }

    pub fn set_definition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
        self.0.definition_level_histogram = value;
        self
    }

    pub fn build(self) -> Result<ColumnChunkMetaData> {
        Ok(self.0)
    }
}

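/// Builder for the Thrift [`ColumnIndex`] structure (the per-page min/max/null
/// count statistics of the page index).
///
/// A minimal sketch appending statistics for two pages of a column chunk and
/// producing the Thrift struct; the byte values are illustrative only:
///
/// ```
/// # use parquet::file::metadata::ColumnIndexBuilder;
/// let mut builder = ColumnIndexBuilder::new();
/// builder.append(false, vec![1u8], vec![10u8], 0); // page 1: min 1, max 10, no nulls
/// builder.append(false, vec![11u8], vec![20u8], 2); // page 2: min 11, max 20, 2 nulls
/// let column_index = builder.build_to_thrift();
/// assert_eq!(column_index.null_pages.len(), 2);
/// ```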
pub struct ColumnIndexBuilder {
    null_pages: Vec<bool>,
    min_values: Vec<Vec<u8>>,
    max_values: Vec<Vec<u8>>,
    null_counts: Vec<i64>,
    boundary_order: BoundaryOrder,
    repetition_level_histograms: Option<Vec<i64>>,
    definition_level_histograms: Option<Vec<i64>>,
    valid: bool,
}

impl Default for ColumnIndexBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl ColumnIndexBuilder {
    pub fn new() -> Self {
        ColumnIndexBuilder {
            null_pages: Vec::new(),
            min_values: Vec::new(),
            max_values: Vec::new(),
            null_counts: Vec::new(),
            boundary_order: BoundaryOrder::UNORDERED,
            repetition_level_histograms: None,
            definition_level_histograms: None,
            valid: true,
        }
    }

    pub fn append(
        &mut self,
        null_page: bool,
        min_value: Vec<u8>,
        max_value: Vec<u8>,
        null_count: i64,
    ) {
        self.null_pages.push(null_page);
        self.min_values.push(min_value);
        self.max_values.push(max_value);
        self.null_counts.push(null_count);
    }

    pub fn append_histograms(
        &mut self,
        repetition_level_histogram: &Option<LevelHistogram>,
        definition_level_histogram: &Option<LevelHistogram>,
    ) {
        if !self.valid {
            return;
        }
        if let Some(ref rep_lvl_hist) = repetition_level_histogram {
            let hist = self.repetition_level_histograms.get_or_insert(Vec::new());
            hist.reserve(rep_lvl_hist.len());
            hist.extend(rep_lvl_hist.values());
        }
        if let Some(ref def_lvl_hist) = definition_level_histogram {
            let hist = self.definition_level_histograms.get_or_insert(Vec::new());
            hist.reserve(def_lvl_hist.len());
            hist.extend(def_lvl_hist.values());
        }
    }

    pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
        self.boundary_order = boundary_order;
    }

    pub fn to_invalid(&mut self) {
        self.valid = false;
    }

    pub fn valid(&self) -> bool {
        self.valid
    }

    pub fn build_to_thrift(self) -> ColumnIndex {
        ColumnIndex::new(
            self.null_pages,
            self.min_values,
            self.max_values,
            self.boundary_order,
            self.null_counts,
            self.repetition_level_histograms,
            self.definition_level_histograms,
        )
    }
}

impl From<ColumnChunkMetaData> for ColumnChunkMetaDataBuilder {
    fn from(value: ColumnChunkMetaData) -> Self {
        ColumnChunkMetaDataBuilder(value)
    }
}

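/// Builder for the Thrift [`OffsetIndex`] structure (the per-page locations of
/// the page index).
///
/// A minimal sketch recording the locations of two pages; the offsets, sizes,
/// and row counts are illustrative only:
///
/// ```
/// # use parquet::file::metadata::OffsetIndexBuilder;
/// let mut builder = OffsetIndexBuilder::new();
/// // first page: starts at byte offset 4, 100 compressed bytes, 10 rows
/// builder.append_offset_and_size(4, 100);
/// builder.append_row_count(10);
/// // second page: starts where the first ended and holds the next 20 rows
/// builder.append_offset_and_size(104, 200);
/// builder.append_row_count(20);
/// let offset_index = builder.build_to_thrift();
/// assert_eq!(offset_index.page_locations.len(), 2);
/// ```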
pub struct OffsetIndexBuilder {
    offset_array: Vec<i64>,
    compressed_page_size_array: Vec<i32>,
    first_row_index_array: Vec<i64>,
    unencoded_byte_array_data_bytes_array: Option<Vec<i64>>,
    current_first_row_index: i64,
}

impl Default for OffsetIndexBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl OffsetIndexBuilder {
    pub fn new() -> Self {
        OffsetIndexBuilder {
            offset_array: Vec::new(),
            compressed_page_size_array: Vec::new(),
            first_row_index_array: Vec::new(),
            unencoded_byte_array_data_bytes_array: None,
            current_first_row_index: 0,
        }
    }

    pub fn append_row_count(&mut self, row_count: i64) {
        let current_page_row_index = self.current_first_row_index;
        self.first_row_index_array.push(current_page_row_index);
        self.current_first_row_index += row_count;
    }

    pub fn append_offset_and_size(&mut self, offset: i64, compressed_page_size: i32) {
        self.offset_array.push(offset);
        self.compressed_page_size_array.push(compressed_page_size);
    }

    pub fn append_unencoded_byte_array_data_bytes(
        &mut self,
        unencoded_byte_array_data_bytes: Option<i64>,
    ) {
        if let Some(val) = unencoded_byte_array_data_bytes {
            self.unencoded_byte_array_data_bytes_array
                .get_or_insert(Vec::new())
                .push(val);
        }
    }

    pub fn build_to_thrift(self) -> OffsetIndex {
        let locations = self
            .offset_array
            .iter()
            .zip(self.compressed_page_size_array.iter())
            .zip(self.first_row_index_array.iter())
            .map(|((offset, size), row_index)| PageLocation::new(*offset, *size, *row_index))
            .collect::<Vec<_>>();
        OffsetIndex::new(locations, self.unencoded_byte_array_data_bytes_array)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::basic::{PageType, SortOrder};
    use crate::file::page_index::index::NativeIndex;

    #[test]
    fn test_row_group_metadata_thrift_conversion() {
        let schema_descr = get_test_schema_descr();

        let mut columns = vec![];
        for ptr in schema_descr.columns() {
            let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
            columns.push(column);
        }
        let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
            .set_num_rows(1000)
            .set_total_byte_size(2000)
            .set_column_metadata(columns)
            .set_ordinal(1)
            .build()
            .unwrap();

        let row_group_exp = row_group_meta.to_thrift();
        let row_group_res = RowGroupMetaData::from_thrift(schema_descr, row_group_exp.clone())
            .unwrap()
            .to_thrift();

        assert_eq!(row_group_res, row_group_exp);
    }

    #[test]
    fn test_row_group_metadata_thrift_conversion_empty() {
        let schema_descr = get_test_schema_descr();

        let row_group_meta = RowGroupMetaData::builder(schema_descr).build();

        assert!(row_group_meta.is_err());
        if let Err(e) = row_group_meta {
            assert_eq!(
                format!("{e}"),
                "Parquet error: Column length mismatch: 2 != 0"
            );
        }
    }

    #[test]
    fn test_row_group_metadata_thrift_corrupted() {
        let schema_descr_2cols = Arc::new(SchemaDescriptor::new(Arc::new(
            SchemaType::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        SchemaType::primitive_type_builder("a", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        SchemaType::primitive_type_builder("b", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        )));

        let schema_descr_3cols = Arc::new(SchemaDescriptor::new(Arc::new(
            SchemaType::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        SchemaType::primitive_type_builder("a", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        SchemaType::primitive_type_builder("b", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        SchemaType::primitive_type_builder("c", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        )));

        let row_group_meta_2cols = RowGroupMetaData::builder(schema_descr_2cols.clone())
            .set_num_rows(1000)
            .set_total_byte_size(2000)
            .set_column_metadata(vec![
                ColumnChunkMetaData::builder(schema_descr_2cols.column(0))
                    .build()
                    .unwrap(),
                ColumnChunkMetaData::builder(schema_descr_2cols.column(1))
                    .build()
                    .unwrap(),
            ])
            .set_ordinal(1)
            .build()
            .unwrap();

        let err =
            RowGroupMetaData::from_thrift(schema_descr_3cols, row_group_meta_2cols.to_thrift())
                .unwrap_err()
                .to_string();
        assert_eq!(
            err,
            "Parquet error: Column count mismatch. Schema has 3 columns while Row Group has 2"
        );
    }

    #[test]
    fn test_column_chunk_metadata_thrift_conversion() {
        let column_descr = get_test_schema_descr().column(0);

        let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
            .set_encodings(vec![Encoding::PLAIN, Encoding::RLE])
            .set_file_path("file_path".to_owned())
            .set_num_values(1000)
            .set_compression(Compression::SNAPPY)
            .set_total_compressed_size(2000)
            .set_total_uncompressed_size(3000)
            .set_data_page_offset(4000)
            .set_dictionary_page_offset(Some(5000))
            .set_page_encoding_stats(vec![
                PageEncodingStats {
                    page_type: PageType::DATA_PAGE,
                    encoding: Encoding::PLAIN,
                    count: 3,
                },
                PageEncodingStats {
                    page_type: PageType::DATA_PAGE,
                    encoding: Encoding::RLE,
                    count: 5,
                },
            ])
            .set_bloom_filter_offset(Some(6000))
            .set_bloom_filter_length(Some(25))
            .set_offset_index_offset(Some(7000))
            .set_offset_index_length(Some(25))
            .set_column_index_offset(Some(8000))
            .set_column_index_length(Some(25))
            .set_unencoded_byte_array_data_bytes(Some(2000))
            .set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100])))
            .set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200])))
            .build()
            .unwrap();

        let col_chunk_res =
            ColumnChunkMetaData::from_thrift(column_descr, col_metadata.to_thrift()).unwrap();

        assert_eq!(col_chunk_res, col_metadata);
    }

    #[test]
    fn test_column_chunk_metadata_thrift_conversion_empty() {
        let column_descr = get_test_schema_descr().column(0);

        let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
            .build()
            .unwrap();

        let col_chunk_exp = col_metadata.to_thrift();
        let col_chunk_res = ColumnChunkMetaData::from_thrift(column_descr, col_chunk_exp.clone())
            .unwrap()
            .to_thrift();

        assert_eq!(col_chunk_res, col_chunk_exp);
    }

    #[test]
    fn test_compressed_size() {
        let schema_descr = get_test_schema_descr();

        let mut columns = vec![];
        for column_descr in schema_descr.columns() {
            let column = ColumnChunkMetaData::builder(column_descr.clone())
                .set_total_compressed_size(500)
                .set_total_uncompressed_size(700)
                .build()
                .unwrap();
            columns.push(column);
        }
        let row_group_meta = RowGroupMetaData::builder(schema_descr)
            .set_num_rows(1000)
            .set_column_metadata(columns)
            .build()
            .unwrap();

        let compressed_size_res: i64 = row_group_meta.compressed_size();
        let compressed_size_exp: i64 = 1000;

        assert_eq!(compressed_size_res, compressed_size_exp);
    }

    #[test]
    fn test_memory_size() {
        let schema_descr = get_test_schema_descr();

        let columns = schema_descr
            .columns()
            .iter()
            .map(|column_descr| {
                ColumnChunkMetaData::builder(column_descr.clone())
                    .set_statistics(Statistics::new::<i32>(None, None, None, None, false))
                    .build()
            })
            .collect::<Result<Vec<_>>>()
            .unwrap();
        let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
            .set_num_rows(1000)
            .set_column_metadata(columns)
            .build()
            .unwrap();
        let row_group_meta = vec![row_group_meta];

        let version = 2;
        let num_rows = 1000;
        let created_by = Some(String::from("test harness"));
        let key_value_metadata = Some(vec![KeyValue::new(
            String::from("Foo"),
            Some(String::from("bar")),
        )]);
        let column_orders = Some(vec![
            ColumnOrder::UNDEFINED,
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
        ]);
        let file_metadata = FileMetaData::new(
            version,
            num_rows,
            created_by,
            key_value_metadata,
            schema_descr.clone(),
            column_orders,
        );

        let columns_with_stats = schema_descr
            .columns()
            .iter()
            .map(|column_descr| {
                ColumnChunkMetaData::builder(column_descr.clone())
                    .set_statistics(Statistics::new::<i32>(
                        Some(0),
                        Some(100),
                        None,
                        None,
                        false,
                    ))
                    .build()
            })
            .collect::<Result<Vec<_>>>()
            .unwrap();

        let row_group_meta_with_stats = RowGroupMetaData::builder(schema_descr)
            .set_num_rows(1000)
            .set_column_metadata(columns_with_stats)
            .build()
            .unwrap();
        let row_group_meta_with_stats = vec![row_group_meta_with_stats];

        let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
            .set_row_groups(row_group_meta_with_stats)
            .build();
        let base_expected_size = 2312;

        assert_eq!(parquet_meta.memory_size(), base_expected_size);

        let mut column_index = ColumnIndexBuilder::new();
        column_index.append(false, vec![1u8], vec![2u8, 3u8], 4);
        let column_index = column_index.build_to_thrift();
        let native_index = NativeIndex::<bool>::try_new(column_index).unwrap();

        let mut offset_index = OffsetIndexBuilder::new();
        offset_index.append_row_count(1);
        offset_index.append_offset_and_size(2, 3);
        offset_index.append_unencoded_byte_array_data_bytes(Some(10));
        offset_index.append_row_count(1);
        offset_index.append_offset_and_size(2, 3);
        offset_index.append_unencoded_byte_array_data_bytes(Some(10));
        let offset_index = offset_index.build_to_thrift();

        let parquet_meta = ParquetMetaDataBuilder::new(file_metadata)
            .set_row_groups(row_group_meta)
            .set_column_index(Some(vec![vec![Index::BOOLEAN(native_index)]]))
            .set_offset_index(Some(vec![vec![
                OffsetIndexMetaData::try_new(offset_index).unwrap()
            ]]))
            .build();

        let bigger_expected_size = 2816;
        assert!(bigger_expected_size > base_expected_size);
        assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
    }

    fn get_test_schema_descr() -> SchemaDescPtr {
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![
                Arc::new(
                    SchemaType::primitive_type_builder("a", Type::INT32)
                        .build()
                        .unwrap(),
                ),
                Arc::new(
                    SchemaType::primitive_type_builder("b", Type::INT32)
                        .build()
                        .unwrap(),
                ),
            ])
            .build()
            .unwrap();

        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
    }
}