1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35use crate::errors::{ParquetError, Result};
36use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder};
37use crate::file::properties::EnabledStatistics;
38use crate::file::statistics::{Statistics, ValueStatistics};
39use crate::file::{
40 metadata::ColumnChunkMetaData,
41 properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
42};
43use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
44
45pub(crate) mod encoder;
46
// Evaluates `$body` for whichever `ColumnWriter` variant `$writer` is, with the wrapped
// typed writer bound to `$typed`. Only usable inside an impl where `Self` is `ColumnWriter`.
macro_rules! downcast_writer {
    ($writer:expr, $typed:ident, $body:expr) => {
        match $writer {
            Self::BoolColumnWriter($typed) => $body,
            Self::Int32ColumnWriter($typed) => $body,
            Self::Int64ColumnWriter($typed) => $body,
            Self::Int96ColumnWriter($typed) => $body,
            Self::FloatColumnWriter($typed) => $body,
            Self::DoubleColumnWriter($typed) => $body,
            Self::ByteArrayColumnWriter($typed) => $body,
            Self::FixedLenByteArrayColumnWriter($typed) => $body,
        }
    };
}
61
/// Column writer for a Parquet column, with one variant per physical type.
///
/// Each variant wraps a [`ColumnWriterImpl`] specialized for that physical type; use
/// [`get_typed_column_writer`] (or the `_ref` / `_mut` variants) to obtain the typed writer.
pub enum ColumnWriter<'a> {
    /// Column writer for the `BOOLEAN` physical type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for the `INT32` physical type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for the `INT64` physical type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for the `INT96` physical type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for the `FLOAT` physical type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for the `DOUBLE` physical type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for the `BYTE_ARRAY` physical type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for the `FIXED_LEN_BYTE_ARRAY` physical type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
81
82impl ColumnWriter<'_> {
83 #[cfg(feature = "arrow")]
85 pub(crate) fn memory_size(&self) -> usize {
86 downcast_writer!(self, typed, typed.memory_size())
87 }
88
89 #[cfg(feature = "arrow")]
91 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
92 downcast_writer!(self, typed, typed.get_estimated_total_bytes())
93 }
94
95 pub fn close(self) -> Result<ColumnCloseResult> {
97 downcast_writer!(self, typed, typed.close())
98 }
99}
100
/// Page- or column-level marker.
///
/// Deprecated and scheduled for removal; see the deprecation note.
#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray and nobody knows what's it for. Will be removed in the next release."
)]
#[allow(missing_docs)]
pub enum Level {
    Page,
    Column,
}
110
111pub fn get_column_writer<'a>(
113 descr: ColumnDescPtr,
114 props: WriterPropertiesPtr,
115 page_writer: Box<dyn PageWriter + 'a>,
116) -> ColumnWriter<'a> {
117 match descr.physical_type() {
118 Type::BOOLEAN => {
119 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
120 }
121 Type::INT32 => {
122 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
123 }
124 Type::INT64 => {
125 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
126 }
127 Type::INT96 => {
128 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
129 }
130 Type::FLOAT => {
131 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
132 }
133 Type::DOUBLE => {
134 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
135 }
136 Type::BYTE_ARRAY => {
137 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
138 }
139 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
140 ColumnWriterImpl::new(descr, props, page_writer),
141 ),
142 }
143}
144
145pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
150 T::get_column_writer(col_writer).unwrap_or_else(|| {
151 panic!(
152 "Failed to convert column writer into a typed column writer for `{}` type",
153 T::get_physical_type()
154 )
155 })
156}
157
158pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
160 col_writer: &'b ColumnWriter<'a>,
161) -> &'b ColumnWriterImpl<'a, T> {
162 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
163 panic!(
164 "Failed to convert column writer into a typed column writer for `{}` type",
165 T::get_physical_type()
166 )
167 })
168}
169
170pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
172 col_writer: &'a mut ColumnWriter<'b>,
173) -> &'a mut ColumnWriterImpl<'b, T> {
174 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
175 panic!(
176 "Failed to convert column writer into a typed column writer for `{}` type",
177 T::get_physical_type()
178 )
179 })
180}
181
/// Result of closing a column writer.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// Total number of bytes written for this column chunk
    pub bytes_written: u64,
    /// Total number of rows written for this column chunk
    pub rows_written: u64,
    /// Metadata describing the column chunk
    pub metadata: ColumnChunkMetaData,
    /// Bloom filter flushed from the encoder, if any
    pub bloom_filter: Option<Sbbf>,
    /// Column index; `None` when it could not be built for this chunk
    pub column_index: Option<ColumnIndex>,
    /// Offset index locating each data page
    pub offset_index: Option<OffsetIndex>,
}
198
/// Metrics for the data page currently being buffered; reset by `new_page` once the page
/// has been added.
#[derive(Default)]
struct PageMetrics {
    // Number of level slots buffered for this page (one per value or null slot)
    num_buffered_values: u32,
    // Number of rows (records) started in this page
    num_buffered_rows: u32,
    // Number of level slots whose definition level is below the max (i.e. not a value)
    num_page_nulls: u64,
    // Optional histogram of this page's repetition levels
    repetition_level_histogram: Option<LevelHistogram>,
    // Optional histogram of this page's definition levels
    definition_level_histogram: Option<LevelHistogram>,
}
208
209impl PageMetrics {
210 fn new() -> Self {
211 Default::default()
212 }
213
214 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
216 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
217 self
218 }
219
220 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
222 self.definition_level_histogram = LevelHistogram::try_new(max_level);
223 self
224 }
225
226 fn new_page(&mut self) {
229 self.num_buffered_values = 0;
230 self.num_buffered_rows = 0;
231 self.num_page_nulls = 0;
232 self.repetition_level_histogram
233 .as_mut()
234 .map(LevelHistogram::reset);
235 self.definition_level_histogram
236 .as_mut()
237 .map(LevelHistogram::reset);
238 }
239
240 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
242 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
243 rep_hist.update_from_levels(levels);
244 }
245 }
246
247 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
249 if let Some(ref mut def_hist) = self.definition_level_histogram {
250 def_hist.update_from_levels(levels);
251 }
252 }
253}
254
/// Metrics accumulated across all pages of the column chunk.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    // Sum of `bytes_written` reported by the page writer for every page
    total_bytes_written: u64,
    // Rows in all pages added so far
    total_rows_written: u64,
    // Sum of uncompressed page sizes
    total_uncompressed_size: u64,
    // Sum of compressed page sizes
    total_compressed_size: u64,
    // Sum of `num_values` over written data pages
    total_num_values: u64,
    // Offset of the dictionary page, once written
    dictionary_page_offset: Option<u64>,
    // Offset of the first data page, once written
    data_page_offset: Option<u64>,
    // Chunk-level min value
    min_column_value: Option<T>,
    // Chunk-level max value
    max_column_value: Option<T>,
    // Chunk-level null count (sum of page null counts)
    num_column_nulls: u64,
    // Caller-supplied distinct count, if still valid for the whole chunk
    column_distinct_count: Option<u64>,
    // Total unencoded byte array data size, if reported by the encoder
    variable_length_bytes: Option<i64>,
    // Chunk-level repetition level histogram (sum of page histograms)
    repetition_level_histogram: Option<LevelHistogram>,
    // Chunk-level definition level histogram (sum of page histograms)
    definition_level_histogram: Option<LevelHistogram>,
}
273
274impl<T: Default> ColumnMetrics<T> {
275 fn new() -> Self {
276 Default::default()
277 }
278
279 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
281 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
282 self
283 }
284
285 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
287 self.definition_level_histogram = LevelHistogram::try_new(max_level);
288 self
289 }
290
291 fn update_histogram(
293 chunk_histogram: &mut Option<LevelHistogram>,
294 page_histogram: &Option<LevelHistogram>,
295 ) {
296 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
297 chunk_hist.add(page_hist);
298 }
299 }
300
301 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
304 ColumnMetrics::<T>::update_histogram(
305 &mut self.definition_level_histogram,
306 &page_metrics.definition_level_histogram,
307 );
308 ColumnMetrics::<T>::update_histogram(
309 &mut self.repetition_level_histogram,
310 &page_metrics.repetition_level_histogram,
311 );
312 }
313
314 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
316 if let Some(var_bytes) = variable_length_bytes {
317 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
318 }
319 }
320}
321
/// Typed column writer for data type `T`, using the default [`ColumnValueEncoderImpl`].
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
324
/// Column writer generic over its value encoder `E`.
///
/// Buffers values and levels into data pages, writes them through a [`PageWriter`], and
/// tracks statistics, the column index and the offset index for the column chunk.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Descriptor of the column being written
    descr: ColumnDescPtr,
    // Writer properties
    props: WriterPropertiesPtr,
    // Statistics level configured for this column
    statistics_enabled: EnabledStatistics,

    // Destination for encoded pages
    page_writer: Box<dyn PageWriter + 'a>,
    // Compression codec recorded in the column chunk metadata
    codec: Compression,
    // Compressor for page data; `None` means pages are written uncompressed
    compressor: Option<Box<dyn Codec>>,
    // Value encoder
    encoder: E,

    // Metrics for the page currently being buffered
    page_metrics: PageMetrics,
    // Metrics for the whole column chunk
    column_metrics: ColumnMetrics<E::T>,

    // Encodings used by written pages, recorded in the column chunk metadata
    encodings: BTreeSet<Encoding>,
    // Definition levels buffered for the current page
    def_levels_sink: Vec<i16>,
    // Repetition levels buffered for the current page
    rep_levels_sink: Vec<i16>,
    // Data pages held back until the dictionary page has been written
    data_pages: VecDeque<CompressedPage>,
    // Builder for the column index (may be invalidated)
    column_index_builder: ColumnIndexBuilder,
    // Builder for the offset index
    offset_index_builder: OffsetIndexBuilder,

    // Whether the min/max of non-null data pages seen so far are non-decreasing
    data_page_boundary_ascending: bool,
    // Whether the min/max of non-null data pages seen so far are non-increasing
    data_page_boundary_descending: bool,
    // Min/max of the most recent non-null data page, used for the two flags above
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
359
360impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
361 pub fn new(
363 descr: ColumnDescPtr,
364 props: WriterPropertiesPtr,
365 page_writer: Box<dyn PageWriter + 'a>,
366 ) -> Self {
367 let codec = props.compression(descr.path());
368 let codec_options = CodecOptionsBuilder::default().build();
369 let compressor = create_codec(codec, &codec_options).unwrap();
370 let encoder = E::try_new(&descr, props.as_ref()).unwrap();
371
372 let statistics_enabled = props.statistics_enabled(descr.path());
373
374 let mut encodings = BTreeSet::new();
375 encodings.insert(Encoding::RLE);
377
378 let mut page_metrics = PageMetrics::new();
379 let mut column_metrics = ColumnMetrics::<E::T>::new();
380
381 if statistics_enabled != EnabledStatistics::None {
383 page_metrics = page_metrics
384 .with_repetition_level_histogram(descr.max_rep_level())
385 .with_definition_level_histogram(descr.max_def_level());
386 column_metrics = column_metrics
387 .with_repetition_level_histogram(descr.max_rep_level())
388 .with_definition_level_histogram(descr.max_def_level())
389 }
390
391 let mut column_index_builder = ColumnIndexBuilder::new();
393 if statistics_enabled != EnabledStatistics::Page {
394 column_index_builder.to_invalid()
395 }
396
397 Self {
398 descr,
399 props,
400 statistics_enabled,
401 page_writer,
402 codec,
403 compressor,
404 encoder,
405 def_levels_sink: vec![],
406 rep_levels_sink: vec![],
407 data_pages: VecDeque::new(),
408 page_metrics,
409 column_metrics,
410 column_index_builder,
411 offset_index_builder: OffsetIndexBuilder::new(),
412 encodings,
413 data_page_boundary_ascending: true,
414 data_page_boundary_descending: true,
415 last_non_null_data_page_min_max: None,
416 }
417 }
418
    /// Writes a batch of values with optional definition/repetition levels and optional
    /// caller-supplied statistics.
    ///
    /// The batch is split into mini-batches of about `write_batch_size` level slots. When
    /// repetition levels are present, each mini-batch is extended so that the next one
    /// starts at a record boundary (repetition level 0).
    ///
    /// * `value_indices` - when `Some`, values are gathered from `values` at these indices
    ///   instead of being read contiguously.
    /// * `min` / `max` - if given, merged into the column chunk min/max.
    /// * `distinct_count` - kept only if the encoder reports no values yet
    ///   (`num_values() == 0`); otherwise the chunk distinct count is cleared.
    ///
    /// Returns the number of values consumed from `values`.
    ///
    /// # Errors
    ///
    /// Returns an error if both level slices are given with different lengths, or if
    /// writing any mini-batch fails.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Definition and repetition levels describe the same slots, so lengths must agree
        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
            if def.len() != rep.len() {
                return Err(general_err!(
                    "Inconsistent length of definition and repetition levels: {} != {}",
                    def.len(),
                    rep.len()
                ));
            }
        }

        // Without definition levels every value occupies exactly one level slot
        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => values.len(),
        };

        // Fold caller-provided min/max into the chunk statistics
        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // Presumably a caller-supplied distinct count only describes the whole chunk if
        // nothing was written before it, so it is dropped once the encoder holds values
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        let base_batch_size = self.props.write_batch_size();
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            // Grow the mini-batch until the next level starts a new record, so no record
            // is split across mini-batches
            if let Some(r) = rep_levels {
                while end_offset < r.len() && r[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.map(|lv| &lv[levels_offset..end_offset]),
                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
            )?;
            levels_offset = end_offset;
        }

        Ok(values_offset)
    }
497
498 pub fn write_batch(
511 &mut self,
512 values: &E::Values,
513 def_levels: Option<&[i16]>,
514 rep_levels: Option<&[i16]>,
515 ) -> Result<usize> {
516 self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
517 }
518
519 pub fn write_batch_with_statistics(
527 &mut self,
528 values: &E::Values,
529 def_levels: Option<&[i16]>,
530 rep_levels: Option<&[i16]>,
531 min: Option<&E::T>,
532 max: Option<&E::T>,
533 distinct_count: Option<u64>,
534 ) -> Result<usize> {
535 self.write_batch_internal(
536 values,
537 None,
538 def_levels,
539 rep_levels,
540 min,
541 max,
542 distinct_count,
543 )
544 }
545
546 #[cfg(feature = "arrow")]
551 pub(crate) fn memory_size(&self) -> usize {
552 self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
553 }
554
555 pub fn get_total_bytes_written(&self) -> u64 {
561 self.column_metrics.total_bytes_written
562 }
563
564 #[cfg(feature = "arrow")]
570 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
571 self.column_metrics.total_bytes_written
572 + self.encoder.estimated_data_page_size() as u64
573 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
574 }
575
576 pub fn get_total_rows_written(&self) -> u64 {
579 self.column_metrics.total_rows_written
580 }
581
582 pub fn get_descriptor(&self) -> &ColumnDescPtr {
584 &self.descr
585 }
586
    /// Finalizes the column chunk and returns the [`ColumnCloseResult`].
    ///
    /// Flushes buffered values into a final data page, then writes the dictionary page (if
    /// the encoder has one) followed by any data pages held back behind it. It then builds
    /// the column chunk metadata and closes the page writer. The column index is returned
    /// only if it is still valid; the offset index is always returned.
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        // The dictionary page must be written before the data pages queued behind it
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            // Ascending wins ties: pages with equal min/max (or only null pages) leave both
            // flags set
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        let column_index = self
            .column_index_builder
            .valid()
            .then(|| self.column_index_builder.build_to_thrift());
        let offset_index = Some(self.offset_index_builder.build_to_thrift());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }
627
    /// Writes one mini-batch of `num_levels` level slots.
    ///
    /// Counts nulls and rows, updates the level histograms, and buffers the levels for the
    /// current page. It then hands the non-null values (starting at `values_offset`, or
    /// gathered through `value_indices`) to the encoder. Afterwards it may cut a data page
    /// and/or fall back from dictionary encoding.
    ///
    /// Returns the number of values passed to the encoder.
    ///
    /// # Errors
    ///
    /// Returns an error if required levels are missing, if the repetition levels do not
    /// start at a record boundary, or if encoding or page writing fails.
    ///
    /// NOTE(review): if repetition levels are missing, this mini-batch's definition levels
    /// and null count have already been recorded when the error is returned.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // Only slots at the max definition level carry a value. Every other slot counts as
        // a null, including empty or null ancestors in nested columns
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            let mut values_to_write = 0;
            for &level in levels {
                if level == self.descr.max_def_level() {
                    values_to_write += 1;
                } else {
                    self.page_metrics.num_page_nulls += 1
                }
            }

            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_sink.extend_from_slice(levels);
            values_to_write
        } else {
            num_levels
        };

        if self.descr.max_rep_level() > 0 {
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            // A repetition level of 0 starts a new row
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_sink.extend_from_slice(levels);
        } else {
            // Non-repeated column: each level slot is its own row
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }
720
721 #[inline]
726 fn should_dict_fallback(&self) -> bool {
727 match self.encoder.estimated_dict_page_size() {
728 Some(size) => size >= self.props.dictionary_page_size_limit(),
729 None => false,
730 }
731 }
732
733 #[inline]
735 fn should_add_data_page(&self) -> bool {
736 if self.page_metrics.num_buffered_values == 0 {
741 return false;
742 }
743
744 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
745 || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
746 }
747
748 fn dict_fallback(&mut self) -> Result<()> {
751 if self.page_metrics.num_buffered_values > 0 {
753 self.add_data_page()?;
754 }
755 self.write_dictionary_page()?;
756 self.flush_data_pages()?;
757 Ok(())
758 }
759
760 fn update_column_offset_index(
762 &mut self,
763 page_statistics: Option<&ValueStatistics<E::T>>,
764 page_variable_length_bytes: Option<i64>,
765 ) {
766 let null_page =
768 (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
769 if null_page && self.column_index_builder.valid() {
772 self.column_index_builder.append(
773 null_page,
774 vec![],
775 vec![],
776 self.page_metrics.num_page_nulls as i64,
777 );
778 } else if self.column_index_builder.valid() {
779 match &page_statistics {
782 None => {
783 self.column_index_builder.to_invalid();
784 }
785 Some(stat) => {
786 let new_min = stat.min_opt().unwrap();
788 let new_max = stat.max_opt().unwrap();
789 if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
790 if self.data_page_boundary_ascending {
791 let not_ascending = compare_greater(&self.descr, last_min, new_min)
793 || compare_greater(&self.descr, last_max, new_max);
794 if not_ascending {
795 self.data_page_boundary_ascending = false;
796 }
797 }
798
799 if self.data_page_boundary_descending {
800 let not_descending = compare_greater(&self.descr, new_min, last_min)
802 || compare_greater(&self.descr, new_max, last_max);
803 if not_descending {
804 self.data_page_boundary_descending = false;
805 }
806 }
807 }
808 self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
809
810 if self.can_truncate_value() {
811 self.column_index_builder.append(
812 null_page,
813 self.truncate_min_value(
814 self.props.column_index_truncate_length(),
815 stat.min_bytes_opt().unwrap(),
816 )
817 .0,
818 self.truncate_max_value(
819 self.props.column_index_truncate_length(),
820 stat.max_bytes_opt().unwrap(),
821 )
822 .0,
823 self.page_metrics.num_page_nulls as i64,
824 );
825 } else {
826 self.column_index_builder.append(
827 null_page,
828 stat.min_bytes_opt().unwrap().to_vec(),
829 stat.max_bytes_opt().unwrap().to_vec(),
830 self.page_metrics.num_page_nulls as i64,
831 );
832 }
833 }
834 }
835 }
836
837 self.column_index_builder.append_histograms(
839 &self.page_metrics.repetition_level_histogram,
840 &self.page_metrics.definition_level_histogram,
841 );
842
843 self.offset_index_builder
845 .append_row_count(self.page_metrics.num_buffered_rows as i64);
846
847 self.offset_index_builder
848 .append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
849 }
850
851 fn can_truncate_value(&self) -> bool {
853 match self.descr.physical_type() {
854 Type::FIXED_LEN_BYTE_ARRAY
858 if !matches!(
859 self.descr.logical_type(),
860 Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
861 ) =>
862 {
863 true
864 }
865 Type::BYTE_ARRAY => true,
866 _ => false,
868 }
869 }
870
871 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
872 truncation_length
873 .filter(|l| data.len() > *l)
874 .and_then(|l| match str::from_utf8(data) {
875 Ok(str_data) => truncate_utf8(str_data, l),
876 Err(_) => Some(data[..l].to_vec()),
877 })
878 .map(|truncated| (truncated, true))
879 .unwrap_or_else(|| (data.to_vec(), false))
880 }
881
882 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
883 truncation_length
884 .filter(|l| data.len() > *l)
885 .and_then(|l| match str::from_utf8(data) {
886 Ok(str_data) => truncate_utf8(str_data, l).and_then(increment_utf8),
887 Err(_) => increment(data[..l].to_vec()),
888 })
889 .map(|truncated| (truncated, true))
890 .unwrap_or_else(|| (data.to_vec(), false))
891 }
892
    /// Finishes the currently buffered data page.
    ///
    /// Flushes the encoder's values, prepends the encoded levels, and compresses the page.
    /// The page is then either written or, while the encoder has a dictionary, queued in
    /// `data_pages` until the dictionary page is written. Also updates chunk statistics,
    /// the column/offset index and row counts, then resets the per-page state.
    fn add_data_page(&mut self) -> Result<()> {
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        // When the encoder reports page min/max, merge them into the chunk statistics.
        // Page statistics are only kept when statistics are enabled at page level
        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // Reads page_metrics, so it must run before new_page() below
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        let page_statistics = page_statistics.map(Statistics::from);

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // V1 layout: [rep levels][def levels][values], compressed as one unit
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                // V2 layout: levels stay uncompressed in front of the (optionally
                // compressed) values, and their byte lengths go in the page header
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Compressed values are appended after the level bytes already in `buffer`
                match self.compressor {
                    Some(ref mut cmpr) => {
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                    }
                    None => buffer.extend_from_slice(&values_data.buf),
                }

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed: self.compressor.is_some(),
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // While a dictionary is in use, data pages are held back so they can be written
        // after the dictionary page
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset per-page state for the next page
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1042
1043 #[inline]
1046 fn flush_data_pages(&mut self) -> Result<()> {
1047 if self.page_metrics.num_buffered_values > 0 {
1049 self.add_data_page()?;
1050 }
1051
1052 while let Some(page) = self.data_pages.pop_front() {
1053 self.write_data_page(page)?;
1054 }
1055
1056 Ok(())
1057 }
1058
1059 fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1061 let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1062 let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1063 let num_values = self.column_metrics.total_num_values as i64;
1064 let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1065 let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1067
1068 let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1069 .set_compression(self.codec)
1070 .set_encodings(self.encodings.iter().cloned().collect())
1071 .set_total_compressed_size(total_compressed_size)
1072 .set_total_uncompressed_size(total_uncompressed_size)
1073 .set_num_values(num_values)
1074 .set_data_page_offset(data_page_offset)
1075 .set_dictionary_page_offset(dict_page_offset);
1076
1077 if self.statistics_enabled != EnabledStatistics::None {
1078 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1079
1080 let statistics = ValueStatistics::<E::T>::new(
1081 self.column_metrics.min_column_value.clone(),
1082 self.column_metrics.max_column_value.clone(),
1083 self.column_metrics.column_distinct_count,
1084 Some(self.column_metrics.num_column_nulls),
1085 false,
1086 )
1087 .with_backwards_compatible_min_max(backwards_compatible_min_max)
1088 .into();
1089
1090 let statistics = match statistics {
1091 Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
1092 let (min, did_truncate_min) = self.truncate_min_value(
1093 self.props.statistics_truncate_length(),
1094 stats.min_bytes_opt().unwrap(),
1095 );
1096 let (max, did_truncate_max) = self.truncate_max_value(
1097 self.props.statistics_truncate_length(),
1098 stats.max_bytes_opt().unwrap(),
1099 );
1100 Statistics::ByteArray(
1101 ValueStatistics::new(
1102 Some(min.into()),
1103 Some(max.into()),
1104 stats.distinct_count(),
1105 stats.null_count_opt(),
1106 backwards_compatible_min_max,
1107 )
1108 .with_max_is_exact(!did_truncate_max)
1109 .with_min_is_exact(!did_truncate_min),
1110 )
1111 }
1112 Statistics::FixedLenByteArray(stats)
1113 if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
1114 {
1115 let (min, did_truncate_min) = self.truncate_min_value(
1116 self.props.statistics_truncate_length(),
1117 stats.min_bytes_opt().unwrap(),
1118 );
1119 let (max, did_truncate_max) = self.truncate_max_value(
1120 self.props.statistics_truncate_length(),
1121 stats.max_bytes_opt().unwrap(),
1122 );
1123 Statistics::FixedLenByteArray(
1124 ValueStatistics::new(
1125 Some(min.into()),
1126 Some(max.into()),
1127 stats.distinct_count(),
1128 stats.null_count_opt(),
1129 backwards_compatible_min_max,
1130 )
1131 .with_max_is_exact(!did_truncate_max)
1132 .with_min_is_exact(!did_truncate_min),
1133 )
1134 }
1135 stats => stats,
1136 };
1137
1138 builder = builder
1139 .set_statistics(statistics)
1140 .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1141 .set_repetition_level_histogram(
1142 self.column_metrics.repetition_level_histogram.take(),
1143 )
1144 .set_definition_level_histogram(
1145 self.column_metrics.definition_level_histogram.take(),
1146 );
1147 }
1148
1149 let metadata = builder.build()?;
1150 Ok(metadata)
1151 }
1152
1153 #[inline]
1155 fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1156 let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1157 encoder.put(levels);
1158 encoder.consume()
1159 }
1160
1161 #[inline]
1164 fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1165 let mut encoder = LevelEncoder::v2(max_level, levels.len());
1166 encoder.put(levels);
1167 encoder.consume()
1168 }
1169
1170 #[inline]
1172 fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1173 self.encodings.insert(page.encoding());
1174 let page_spec = self.page_writer.write_page(page)?;
1175 self.offset_index_builder
1178 .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32);
1179 self.update_metrics_for_page(page_spec);
1180 Ok(())
1181 }
1182
1183 #[inline]
1185 fn write_dictionary_page(&mut self) -> Result<()> {
1186 let compressed_page = {
1187 let mut page = self
1188 .encoder
1189 .flush_dict_page()?
1190 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1191
1192 let uncompressed_size = page.buf.len();
1193
1194 if let Some(ref mut cmpr) = self.compressor {
1195 let mut output_buf = Vec::with_capacity(uncompressed_size);
1196 cmpr.compress(&page.buf, &mut output_buf)?;
1197 page.buf = Bytes::from(output_buf);
1198 }
1199
1200 let dict_page = Page::DictionaryPage {
1201 buf: page.buf,
1202 num_values: page.num_values as u32,
1203 encoding: self.props.dictionary_page_encoding(),
1204 is_sorted: page.is_sorted,
1205 };
1206 CompressedPage::new(dict_page, uncompressed_size)
1207 };
1208
1209 self.encodings.insert(compressed_page.encoding());
1210 let page_spec = self.page_writer.write_page(compressed_page)?;
1211 self.update_metrics_for_page(page_spec);
1212 Ok(())
1214 }
1215
1216 #[inline]
1218 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1219 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1220 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1221 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1222
1223 match page_spec.page_type {
1224 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1225 self.column_metrics.total_num_values += page_spec.num_values as u64;
1226 if self.column_metrics.data_page_offset.is_none() {
1227 self.column_metrics.data_page_offset = Some(page_spec.offset);
1228 }
1229 }
1230 PageType::DICTIONARY_PAGE => {
1231 assert!(
1232 self.column_metrics.dictionary_page_offset.is_none(),
1233 "Dictionary offset is already set"
1234 );
1235 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1236 }
1237 _ => {}
1238 }
1239 }
1240}
1241
1242fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1243 update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1244}
1245
1246fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1247 update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1248}
1249
1250#[inline]
1251#[allow(clippy::eq_op)]
1252fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1253 match T::PHYSICAL_TYPE {
1254 Type::FLOAT | Type::DOUBLE => val != val,
1255 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1256 let val = val.as_bytes();
1257 let val = f16::from_le_bytes([val[0], val[1]]);
1258 val.is_nan()
1259 }
1260 _ => false,
1261 }
1262}
1263
1264fn update_stat<T: ParquetValueType, F>(
1269 descr: &ColumnDescriptor,
1270 val: &T,
1271 cur: &mut Option<T>,
1272 should_update: F,
1273) where
1274 F: Fn(&T) -> bool,
1275{
1276 if is_nan(descr, val) {
1277 return;
1278 }
1279
1280 if cur.as_ref().map_or(true, should_update) {
1281 *cur = Some(val.clone());
1282 }
1283}
1284
1285fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1287 if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1288 if !is_signed {
1289 return a.as_u64().unwrap() > b.as_u64().unwrap();
1291 }
1292 }
1293
1294 match descr.converted_type() {
1295 ConvertedType::UINT_8
1296 | ConvertedType::UINT_16
1297 | ConvertedType::UINT_32
1298 | ConvertedType::UINT_64 => {
1299 return a.as_u64().unwrap() > b.as_u64().unwrap();
1300 }
1301 _ => {}
1302 };
1303
1304 if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1305 match T::PHYSICAL_TYPE {
1306 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1307 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1308 }
1309 _ => {}
1310 };
1311 }
1312
1313 if descr.converted_type() == ConvertedType::DECIMAL {
1314 match T::PHYSICAL_TYPE {
1315 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1316 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1317 }
1318 _ => {}
1319 };
1320 };
1321
1322 if let Some(LogicalType::Float16) = descr.logical_type() {
1323 let a = a.as_bytes();
1324 let a = f16::from_le_bytes([a[0], a[1]]);
1325 let b = b.as_bytes();
1326 let b = f16::from_le_bytes([b[0], b[1]]);
1327 return a > b;
1328 }
1329
1330 a > b
1331}
1332
1333fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1341 match (kind, props.writer_version()) {
1342 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1343 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1344 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1345 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1346 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1347 _ => Encoding::PLAIN,
1348 }
1349}
1350
1351fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1353 match (kind, props.writer_version()) {
1354 (Type::BOOLEAN, _) => false,
1356 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1358 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1359 _ => true,
1360 }
1361}
1362
/// Returns `true` if the big-endian two's-complement integer encoded in `a` is
/// strictly greater than the one encoded in `b`.
///
/// This is the ordering for `DECIMAL` values stored as `BYTE_ARRAY` or
/// `FIXED_LEN_BYTE_ARRAY`. The inputs may have different lengths: the shorter
/// one is compared as if it were sign-extended to the longer length, so
/// `[0xFF, 0x90]` and `[0x90]` (both -112) compare equal.
///
/// An empty slice compares less than any non-empty slice; two empty slices
/// compare equal.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Short-circuit when the signs differ, or when the lengths match and the
    // most significant bytes already differ. The equal-length requirement is
    // needed because of sign extension: `[0xFF, 0x90]` equals `[0x90]`.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        // Reinterpreting the leading byte as `i8` gives its signed weight.
        return (first_a as i8) > (first_b as i8);
    }

    // Both values now have the same sign. Pick the byte ranges that remain to
    // be compared as unsigned big-endian numbers.
    let (a_tail, b_tail) = if a_length == b_length {
        // Leading bytes are equal, so the rest orders as unsigned bytes.
        (&a[1..], &b[1..])
    } else {
        let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };
        let common = a_length.min(b_length);
        let a_longer = a_length > b_length;
        let longer = if a_longer { a } else { b };

        // Extra leading bytes of the longer input that are not pure sign
        // extension mean it has the larger magnitude: it is greater when the
        // values are positive and smaller when they are negative.
        if longer[..longer.len() - common].iter().any(|&x| x != extension) {
            let negative_values = (first_a as i8) < 0;
            return a_longer != negative_values;
        }

        // The extra bytes only sign-extend the value, so skip them and compare
        // the trailing `common` bytes of both inputs. Slicing from index 1
        // here would misorder values such as 5 `[0x00, 0x05]` and 6 `[0x06]`.
        (&a[a_length - common..], &b[b_length - common..])
    };

    a_tail > b_tail
}
1408
/// Returns the bytes of the longest non-empty prefix of `data` that is at most
/// `length` bytes long and ends on a UTF-8 character boundary.
///
/// Returns `None` when no such prefix exists: `data` is empty, `length` is 0,
/// or the first character is longer than `length` bytes.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Positions past the end can never be boundaries, so start at the end at most.
    let mut split = length.min(data.len());
    while split > 0 && !data.is_char_boundary(split) {
        split -= 1;
    }

    if split == 0 {
        None
    } else {
        Some(data.as_bytes()[..split].to_vec())
    }
}
1415
/// Treats `data` as a big-endian unsigned integer and adds one to it.
///
/// Trailing `0xFF` bytes roll over to `0x00`, and the nearest byte to their
/// left is incremented. Returns `None` if every byte is `u8::MAX`, including
/// when `data` is empty.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    let carry_into = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[carry_into] += 1;
    data[carry_into + 1..].fill(0);
    Some(data)
}
1431
/// Scans the bytes of `data` from right to left and returns the first
/// single-byte increment that leaves the whole buffer valid UTF-8.
///
/// Bytes already at `u8::MAX` are skipped. A candidate that makes the buffer
/// invalid is reverted before moving left. Bytes after the incremented
/// position are left unchanged. Returns `None` if no single-byte increment
/// produces valid UTF-8.
fn increment_utf8(mut data: Vec<u8>) -> Option<Vec<u8>> {
    let mut idx = data.len();
    while idx > 0 {
        idx -= 1;

        let saved = data[idx];
        if saved == u8::MAX {
            continue;
        }

        data[idx] = saved + 1;
        if str::from_utf8(&data).is_ok() {
            return Some(data);
        }
        data[idx] = saved;
    }

    None
}
1449
1450#[cfg(test)]
1451mod tests {
1452 use crate::file::properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
1453 use rand::distributions::uniform::SampleUniform;
1454 use std::sync::Arc;
1455
1456 use crate::column::{
1457 page::PageReader,
1458 reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1459 };
1460 use crate::file::writer::TrackedWrite;
1461 use crate::file::{
1462 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1463 };
1464 use crate::schema::types::{ColumnPath, Type as SchemaType};
1465 use crate::util::test_common::rand_gen::random_numbers_range;
1466
1467 use super::*;
1468
1469 #[test]
1470 fn test_column_writer_inconsistent_def_rep_length() {
1471 let page_writer = get_test_page_writer();
1472 let props = Default::default();
1473 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1474 let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1475 assert!(res.is_err());
1476 if let Err(err) = res {
1477 assert_eq!(
1478 format!("{err}"),
1479 "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1480 );
1481 }
1482 }
1483
1484 #[test]
1485 fn test_column_writer_invalid_def_levels() {
1486 let page_writer = get_test_page_writer();
1487 let props = Default::default();
1488 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1489 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1490 assert!(res.is_err());
1491 if let Err(err) = res {
1492 assert_eq!(
1493 format!("{err}"),
1494 "Parquet error: Definition levels are required, because max definition level = 1"
1495 );
1496 }
1497 }
1498
1499 #[test]
1500 fn test_column_writer_invalid_rep_levels() {
1501 let page_writer = get_test_page_writer();
1502 let props = Default::default();
1503 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1504 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1505 assert!(res.is_err());
1506 if let Err(err) = res {
1507 assert_eq!(
1508 format!("{err}"),
1509 "Parquet error: Repetition levels are required, because max repetition level = 1"
1510 );
1511 }
1512 }
1513
1514 #[test]
1515 fn test_column_writer_not_enough_values_to_write() {
1516 let page_writer = get_test_page_writer();
1517 let props = Default::default();
1518 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1519 let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1520 assert!(res.is_err());
1521 if let Err(err) = res {
1522 assert_eq!(
1523 format!("{err}"),
1524 "Parquet error: Expected to write 4 values, but have only 2"
1525 );
1526 }
1527 }
1528
1529 #[test]
1530 fn test_column_writer_write_only_one_dictionary_page() {
1531 let page_writer = get_test_page_writer();
1532 let props = Default::default();
1533 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1534 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1535 writer.add_data_page().unwrap();
1537 writer.write_dictionary_page().unwrap();
1538 let err = writer.write_dictionary_page().unwrap_err().to_string();
1539 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1540 }
1541
1542 #[test]
1543 fn test_column_writer_error_when_writing_disabled_dictionary() {
1544 let page_writer = get_test_page_writer();
1545 let props = Arc::new(
1546 WriterProperties::builder()
1547 .set_dictionary_enabled(false)
1548 .build(),
1549 );
1550 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1551 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1552 let err = writer.write_dictionary_page().unwrap_err().to_string();
1553 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1554 }
1555
1556 #[test]
1557 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1558 let page_writer = get_test_page_writer();
1559 let props = Arc::new(
1560 WriterProperties::builder()
1561 .set_dictionary_enabled(true)
1562 .build(),
1563 );
1564 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1565 writer
1566 .write_batch(&[true, false, true, false], None, None)
1567 .unwrap();
1568
1569 let r = writer.close().unwrap();
1570 assert_eq!(r.bytes_written, 1);
1573 assert_eq!(r.rows_written, 4);
1574
1575 let metadata = r.metadata;
1576 assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1577 assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.dictionary_page_offset(), None);
1579 }
1580
1581 #[test]
1582 fn test_column_writer_default_encoding_support_bool() {
1583 check_encoding_write_support::<BoolType>(
1584 WriterVersion::PARQUET_1_0,
1585 true,
1586 &[true, false],
1587 None,
1588 &[Encoding::PLAIN, Encoding::RLE],
1589 );
1590 check_encoding_write_support::<BoolType>(
1591 WriterVersion::PARQUET_1_0,
1592 false,
1593 &[true, false],
1594 None,
1595 &[Encoding::PLAIN, Encoding::RLE],
1596 );
1597 check_encoding_write_support::<BoolType>(
1598 WriterVersion::PARQUET_2_0,
1599 true,
1600 &[true, false],
1601 None,
1602 &[Encoding::RLE],
1603 );
1604 check_encoding_write_support::<BoolType>(
1605 WriterVersion::PARQUET_2_0,
1606 false,
1607 &[true, false],
1608 None,
1609 &[Encoding::RLE],
1610 );
1611 }
1612
1613 #[test]
1614 fn test_column_writer_default_encoding_support_int32() {
1615 check_encoding_write_support::<Int32Type>(
1616 WriterVersion::PARQUET_1_0,
1617 true,
1618 &[1, 2],
1619 Some(0),
1620 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1621 );
1622 check_encoding_write_support::<Int32Type>(
1623 WriterVersion::PARQUET_1_0,
1624 false,
1625 &[1, 2],
1626 None,
1627 &[Encoding::PLAIN, Encoding::RLE],
1628 );
1629 check_encoding_write_support::<Int32Type>(
1630 WriterVersion::PARQUET_2_0,
1631 true,
1632 &[1, 2],
1633 Some(0),
1634 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1635 );
1636 check_encoding_write_support::<Int32Type>(
1637 WriterVersion::PARQUET_2_0,
1638 false,
1639 &[1, 2],
1640 None,
1641 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1642 );
1643 }
1644
1645 #[test]
1646 fn test_column_writer_default_encoding_support_int64() {
1647 check_encoding_write_support::<Int64Type>(
1648 WriterVersion::PARQUET_1_0,
1649 true,
1650 &[1, 2],
1651 Some(0),
1652 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1653 );
1654 check_encoding_write_support::<Int64Type>(
1655 WriterVersion::PARQUET_1_0,
1656 false,
1657 &[1, 2],
1658 None,
1659 &[Encoding::PLAIN, Encoding::RLE],
1660 );
1661 check_encoding_write_support::<Int64Type>(
1662 WriterVersion::PARQUET_2_0,
1663 true,
1664 &[1, 2],
1665 Some(0),
1666 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1667 );
1668 check_encoding_write_support::<Int64Type>(
1669 WriterVersion::PARQUET_2_0,
1670 false,
1671 &[1, 2],
1672 None,
1673 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1674 );
1675 }
1676
1677 #[test]
1678 fn test_column_writer_default_encoding_support_int96() {
1679 check_encoding_write_support::<Int96Type>(
1680 WriterVersion::PARQUET_1_0,
1681 true,
1682 &[Int96::from(vec![1, 2, 3])],
1683 Some(0),
1684 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1685 );
1686 check_encoding_write_support::<Int96Type>(
1687 WriterVersion::PARQUET_1_0,
1688 false,
1689 &[Int96::from(vec![1, 2, 3])],
1690 None,
1691 &[Encoding::PLAIN, Encoding::RLE],
1692 );
1693 check_encoding_write_support::<Int96Type>(
1694 WriterVersion::PARQUET_2_0,
1695 true,
1696 &[Int96::from(vec![1, 2, 3])],
1697 Some(0),
1698 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1699 );
1700 check_encoding_write_support::<Int96Type>(
1701 WriterVersion::PARQUET_2_0,
1702 false,
1703 &[Int96::from(vec![1, 2, 3])],
1704 None,
1705 &[Encoding::PLAIN, Encoding::RLE],
1706 );
1707 }
1708
1709 #[test]
1710 fn test_column_writer_default_encoding_support_float() {
1711 check_encoding_write_support::<FloatType>(
1712 WriterVersion::PARQUET_1_0,
1713 true,
1714 &[1.0, 2.0],
1715 Some(0),
1716 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1717 );
1718 check_encoding_write_support::<FloatType>(
1719 WriterVersion::PARQUET_1_0,
1720 false,
1721 &[1.0, 2.0],
1722 None,
1723 &[Encoding::PLAIN, Encoding::RLE],
1724 );
1725 check_encoding_write_support::<FloatType>(
1726 WriterVersion::PARQUET_2_0,
1727 true,
1728 &[1.0, 2.0],
1729 Some(0),
1730 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1731 );
1732 check_encoding_write_support::<FloatType>(
1733 WriterVersion::PARQUET_2_0,
1734 false,
1735 &[1.0, 2.0],
1736 None,
1737 &[Encoding::PLAIN, Encoding::RLE],
1738 );
1739 }
1740
1741 #[test]
1742 fn test_column_writer_default_encoding_support_double() {
1743 check_encoding_write_support::<DoubleType>(
1744 WriterVersion::PARQUET_1_0,
1745 true,
1746 &[1.0, 2.0],
1747 Some(0),
1748 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1749 );
1750 check_encoding_write_support::<DoubleType>(
1751 WriterVersion::PARQUET_1_0,
1752 false,
1753 &[1.0, 2.0],
1754 None,
1755 &[Encoding::PLAIN, Encoding::RLE],
1756 );
1757 check_encoding_write_support::<DoubleType>(
1758 WriterVersion::PARQUET_2_0,
1759 true,
1760 &[1.0, 2.0],
1761 Some(0),
1762 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1763 );
1764 check_encoding_write_support::<DoubleType>(
1765 WriterVersion::PARQUET_2_0,
1766 false,
1767 &[1.0, 2.0],
1768 None,
1769 &[Encoding::PLAIN, Encoding::RLE],
1770 );
1771 }
1772
1773 #[test]
1774 fn test_column_writer_default_encoding_support_byte_array() {
1775 check_encoding_write_support::<ByteArrayType>(
1776 WriterVersion::PARQUET_1_0,
1777 true,
1778 &[ByteArray::from(vec![1u8])],
1779 Some(0),
1780 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1781 );
1782 check_encoding_write_support::<ByteArrayType>(
1783 WriterVersion::PARQUET_1_0,
1784 false,
1785 &[ByteArray::from(vec![1u8])],
1786 None,
1787 &[Encoding::PLAIN, Encoding::RLE],
1788 );
1789 check_encoding_write_support::<ByteArrayType>(
1790 WriterVersion::PARQUET_2_0,
1791 true,
1792 &[ByteArray::from(vec![1u8])],
1793 Some(0),
1794 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1795 );
1796 check_encoding_write_support::<ByteArrayType>(
1797 WriterVersion::PARQUET_2_0,
1798 false,
1799 &[ByteArray::from(vec![1u8])],
1800 None,
1801 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
1802 );
1803 }
1804
1805 #[test]
1806 fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
1807 check_encoding_write_support::<FixedLenByteArrayType>(
1808 WriterVersion::PARQUET_1_0,
1809 true,
1810 &[ByteArray::from(vec![1u8]).into()],
1811 None,
1812 &[Encoding::PLAIN, Encoding::RLE],
1813 );
1814 check_encoding_write_support::<FixedLenByteArrayType>(
1815 WriterVersion::PARQUET_1_0,
1816 false,
1817 &[ByteArray::from(vec![1u8]).into()],
1818 None,
1819 &[Encoding::PLAIN, Encoding::RLE],
1820 );
1821 check_encoding_write_support::<FixedLenByteArrayType>(
1822 WriterVersion::PARQUET_2_0,
1823 true,
1824 &[ByteArray::from(vec![1u8]).into()],
1825 Some(0),
1826 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1827 );
1828 check_encoding_write_support::<FixedLenByteArrayType>(
1829 WriterVersion::PARQUET_2_0,
1830 false,
1831 &[ByteArray::from(vec![1u8]).into()],
1832 None,
1833 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
1834 );
1835 }
1836
1837 #[test]
1838 fn test_column_writer_check_metadata() {
1839 let page_writer = get_test_page_writer();
1840 let props = Default::default();
1841 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1842 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1843
1844 let r = writer.close().unwrap();
1845 assert_eq!(r.bytes_written, 20);
1846 assert_eq!(r.rows_written, 4);
1847
1848 let metadata = r.metadata;
1849 assert_eq!(
1850 metadata.encodings(),
1851 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
1852 );
1853 assert_eq!(metadata.num_values(), 4);
1854 assert_eq!(metadata.compressed_size(), 20);
1855 assert_eq!(metadata.uncompressed_size(), 20);
1856 assert_eq!(metadata.data_page_offset(), 0);
1857 assert_eq!(metadata.dictionary_page_offset(), Some(0));
1858 if let Some(stats) = metadata.statistics() {
1859 assert_eq!(stats.null_count_opt(), Some(0));
1860 assert_eq!(stats.distinct_count_opt(), None);
1861 if let Statistics::Int32(stats) = stats {
1862 assert_eq!(stats.min_opt().unwrap(), &1);
1863 assert_eq!(stats.max_opt().unwrap(), &4);
1864 } else {
1865 panic!("expecting Statistics::Int32");
1866 }
1867 } else {
1868 panic!("metadata missing statistics");
1869 }
1870 }
1871
1872 #[test]
1873 fn test_column_writer_check_byte_array_min_max() {
1874 let page_writer = get_test_page_writer();
1875 let props = Default::default();
1876 let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
1877 writer
1878 .write_batch(
1879 &[
1880 ByteArray::from(vec![
1881 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
1882 35u8, 231u8, 90u8, 0u8, 0u8,
1883 ]),
1884 ByteArray::from(vec![
1885 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
1886 152u8, 177u8, 56u8, 0u8, 0u8,
1887 ]),
1888 ByteArray::from(vec![
1889 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
1890 0u8,
1891 ]),
1892 ByteArray::from(vec![
1893 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
1894 44u8, 0u8, 0u8,
1895 ]),
1896 ],
1897 None,
1898 None,
1899 )
1900 .unwrap();
1901 let metadata = writer.close().unwrap().metadata;
1902 if let Some(stats) = metadata.statistics() {
1903 if let Statistics::ByteArray(stats) = stats {
1904 assert_eq!(
1905 stats.min_opt().unwrap(),
1906 &ByteArray::from(vec![
1907 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
1908 35u8, 231u8, 90u8, 0u8, 0u8,
1909 ])
1910 );
1911 assert_eq!(
1912 stats.max_opt().unwrap(),
1913 &ByteArray::from(vec![
1914 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
1915 44u8, 0u8, 0u8,
1916 ])
1917 );
1918 } else {
1919 panic!("expecting Statistics::ByteArray");
1920 }
1921 } else {
1922 panic!("metadata missing statistics");
1923 }
1924 }
1925
1926 #[test]
1927 fn test_column_writer_uint32_converted_type_min_max() {
1928 let page_writer = get_test_page_writer();
1929 let props = Default::default();
1930 let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
1931 page_writer,
1932 0,
1933 0,
1934 props,
1935 );
1936 writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
1937 let metadata = writer.close().unwrap().metadata;
1938 if let Some(stats) = metadata.statistics() {
1939 if let Statistics::Int32(stats) = stats {
1940 assert_eq!(stats.min_opt().unwrap(), &0,);
1941 assert_eq!(stats.max_opt().unwrap(), &5,);
1942 } else {
1943 panic!("expecting Statistics::Int32");
1944 }
1945 } else {
1946 panic!("metadata missing statistics");
1947 }
1948 }
1949
1950 #[test]
1951 fn test_column_writer_precalculated_statistics() {
1952 let page_writer = get_test_page_writer();
1953 let props = Arc::new(
1954 WriterProperties::builder()
1955 .set_statistics_enabled(EnabledStatistics::Chunk)
1956 .build(),
1957 );
1958 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1959 writer
1960 .write_batch_with_statistics(
1961 &[1, 2, 3, 4],
1962 None,
1963 None,
1964 Some(&-17),
1965 Some(&9000),
1966 Some(55),
1967 )
1968 .unwrap();
1969
1970 let r = writer.close().unwrap();
1971 assert_eq!(r.bytes_written, 20);
1972 assert_eq!(r.rows_written, 4);
1973
1974 let metadata = r.metadata;
1975 assert_eq!(
1976 metadata.encodings(),
1977 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
1978 );
1979 assert_eq!(metadata.num_values(), 4);
1980 assert_eq!(metadata.compressed_size(), 20);
1981 assert_eq!(metadata.uncompressed_size(), 20);
1982 assert_eq!(metadata.data_page_offset(), 0);
1983 assert_eq!(metadata.dictionary_page_offset(), Some(0));
1984 if let Some(stats) = metadata.statistics() {
1985 assert_eq!(stats.null_count_opt(), Some(0));
1986 assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
1987 if let Statistics::Int32(stats) = stats {
1988 assert_eq!(stats.min_opt().unwrap(), &-17);
1989 assert_eq!(stats.max_opt().unwrap(), &9000);
1990 } else {
1991 panic!("expecting Statistics::Int32");
1992 }
1993 } else {
1994 panic!("metadata missing statistics");
1995 }
1996 }
1997
1998 #[test]
1999 fn test_mixed_precomputed_statistics() {
2000 let mut buf = Vec::with_capacity(100);
2001 let mut write = TrackedWrite::new(&mut buf);
2002 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2003 let props = Default::default();
2004 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2005
2006 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2007 writer
2008 .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2009 .unwrap();
2010
2011 let r = writer.close().unwrap();
2012
2013 let stats = r.metadata.statistics().unwrap();
2014 assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2015 assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2016 assert_eq!(stats.null_count_opt(), Some(0));
2017 assert!(stats.distinct_count_opt().is_none());
2018
2019 drop(write);
2020
2021 let props = ReaderProperties::builder()
2022 .set_backward_compatible_lz4(false)
2023 .build();
2024 let reader = SerializedPageReader::new_with_properties(
2025 Arc::new(Bytes::from(buf)),
2026 &r.metadata,
2027 r.rows_written as usize,
2028 None,
2029 Arc::new(props),
2030 )
2031 .unwrap();
2032
2033 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2034 assert_eq!(pages.len(), 2);
2035
2036 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2037 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2038
2039 let page_statistics = pages[1].statistics().unwrap();
2040 assert_eq!(
2041 page_statistics.min_bytes_opt().unwrap(),
2042 1_i32.to_le_bytes()
2043 );
2044 assert_eq!(
2045 page_statistics.max_bytes_opt().unwrap(),
2046 7_i32.to_le_bytes()
2047 );
2048 assert_eq!(page_statistics.null_count_opt(), Some(0));
2049 assert!(page_statistics.distinct_count_opt().is_none());
2050 }
2051
2052 #[test]
2053 fn test_disabled_statistics() {
2054 let mut buf = Vec::with_capacity(100);
2055 let mut write = TrackedWrite::new(&mut buf);
2056 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2057 let props = WriterProperties::builder()
2058 .set_statistics_enabled(EnabledStatistics::None)
2059 .set_writer_version(WriterVersion::PARQUET_2_0)
2060 .build();
2061 let props = Arc::new(props);
2062
2063 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2064 writer
2065 .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2066 .unwrap();
2067
2068 let r = writer.close().unwrap();
2069 assert!(r.metadata.statistics().is_none());
2070
2071 drop(write);
2072
2073 let props = ReaderProperties::builder()
2074 .set_backward_compatible_lz4(false)
2075 .build();
2076 let reader = SerializedPageReader::new_with_properties(
2077 Arc::new(Bytes::from(buf)),
2078 &r.metadata,
2079 r.rows_written as usize,
2080 None,
2081 Arc::new(props),
2082 )
2083 .unwrap();
2084
2085 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2086 assert_eq!(pages.len(), 2);
2087
2088 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2089 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2090
2091 match &pages[1] {
2092 Page::DataPageV2 {
2093 num_values,
2094 num_nulls,
2095 num_rows,
2096 statistics,
2097 ..
2098 } => {
2099 assert_eq!(*num_values, 6);
2100 assert_eq!(*num_nulls, 2);
2101 assert_eq!(*num_rows, 6);
2102 assert!(statistics.is_none());
2103 }
2104 _ => unreachable!(),
2105 }
2106 }
2107
2108 #[test]
2109 fn test_column_writer_empty_column_roundtrip() {
2110 let props = Default::default();
2111 column_roundtrip::<Int32Type>(props, &[], None, None);
2112 }
2113
2114 #[test]
2115 fn test_column_writer_non_nullable_values_roundtrip() {
2116 let props = Default::default();
2117 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2118 }
2119
2120 #[test]
2121 fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2122 let props = Default::default();
2123 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2124 }
2125
2126 #[test]
2127 fn test_column_writer_nullable_repeated_values_roundtrip() {
2128 let props = Default::default();
2129 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2130 }
2131
2132 #[test]
2133 fn test_column_writer_dictionary_fallback_small_data_page() {
2134 let props = WriterProperties::builder()
2135 .set_dictionary_page_size_limit(32)
2136 .set_data_page_size_limit(32)
2137 .build();
2138 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2139 }
2140
2141 #[test]
2142 fn test_column_writer_small_write_batch_size() {
2143 for i in &[1usize, 2, 5, 10, 11, 1023] {
2144 let props = WriterProperties::builder().set_write_batch_size(*i).build();
2145
2146 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2147 }
2148 }
2149
2150 #[test]
2151 fn test_column_writer_dictionary_disabled_v1() {
2152 let props = WriterProperties::builder()
2153 .set_writer_version(WriterVersion::PARQUET_1_0)
2154 .set_dictionary_enabled(false)
2155 .build();
2156 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2157 }
2158
2159 #[test]
2160 fn test_column_writer_dictionary_disabled_v2() {
2161 let props = WriterProperties::builder()
2162 .set_writer_version(WriterVersion::PARQUET_2_0)
2163 .set_dictionary_enabled(false)
2164 .build();
2165 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2166 }
2167
2168 #[test]
2169 fn test_column_writer_compression_v1() {
2170 let props = WriterProperties::builder()
2171 .set_writer_version(WriterVersion::PARQUET_1_0)
2172 .set_compression(Compression::SNAPPY)
2173 .build();
2174 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2175 }
2176
2177 #[test]
2178 fn test_column_writer_compression_v2() {
2179 let props = WriterProperties::builder()
2180 .set_writer_version(WriterVersion::PARQUET_2_0)
2181 .set_compression(Compression::SNAPPY)
2182 .build();
2183 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2184 }
2185
2186 #[test]
2187 fn test_column_writer_add_data_pages_with_dict() {
2188 let mut file = tempfile::tempfile().unwrap();
2191 let mut write = TrackedWrite::new(&mut file);
2192 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2193 let props = Arc::new(
2194 WriterProperties::builder()
2195 .set_data_page_size_limit(10)
2196 .set_write_batch_size(3) .build(),
2198 );
2199 let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2200 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2201 writer.write_batch(data, None, None).unwrap();
2202 let r = writer.close().unwrap();
2203
2204 drop(write);
2205
2206 let props = ReaderProperties::builder()
2208 .set_backward_compatible_lz4(false)
2209 .build();
2210 let mut page_reader = Box::new(
2211 SerializedPageReader::new_with_properties(
2212 Arc::new(file),
2213 &r.metadata,
2214 r.rows_written as usize,
2215 None,
2216 Arc::new(props),
2217 )
2218 .unwrap(),
2219 );
2220 let mut res = Vec::new();
2221 while let Some(page) = page_reader.get_next_page().unwrap() {
2222 res.push((page.page_type(), page.num_values(), page.buffer().len()));
2223 }
2224 assert_eq!(
2225 res,
2226 vec![
2227 (PageType::DICTIONARY_PAGE, 10, 40),
2228 (PageType::DATA_PAGE, 9, 10),
2229 (PageType::DATA_PAGE, 1, 3),
2230 ]
2231 );
2232 }
2233
2234 #[test]
2235 fn test_bool_statistics() {
2236 let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2237 assert!(!stats.is_min_max_backwards_compatible());
2240 if let Statistics::Boolean(stats) = stats {
2241 assert_eq!(stats.min_opt().unwrap(), &false);
2242 assert_eq!(stats.max_opt().unwrap(), &true);
2243 } else {
2244 panic!("expecting Statistics::Boolean, got {stats:?}");
2245 }
2246 }
2247
2248 #[test]
2249 fn test_int32_statistics() {
2250 let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2251 assert!(stats.is_min_max_backwards_compatible());
2252 if let Statistics::Int32(stats) = stats {
2253 assert_eq!(stats.min_opt().unwrap(), &-2);
2254 assert_eq!(stats.max_opt().unwrap(), &3);
2255 } else {
2256 panic!("expecting Statistics::Int32, got {stats:?}");
2257 }
2258 }
2259
2260 #[test]
2261 fn test_int64_statistics() {
2262 let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2263 assert!(stats.is_min_max_backwards_compatible());
2264 if let Statistics::Int64(stats) = stats {
2265 assert_eq!(stats.min_opt().unwrap(), &-2);
2266 assert_eq!(stats.max_opt().unwrap(), &3);
2267 } else {
2268 panic!("expecting Statistics::Int64, got {stats:?}");
2269 }
2270 }
2271
2272 #[test]
2273 fn test_int96_statistics() {
2274 let input = vec![
2275 Int96::from(vec![1, 20, 30]),
2276 Int96::from(vec![3, 20, 10]),
2277 Int96::from(vec![0, 20, 30]),
2278 Int96::from(vec![2, 20, 30]),
2279 ]
2280 .into_iter()
2281 .collect::<Vec<Int96>>();
2282
2283 let stats = statistics_roundtrip::<Int96Type>(&input);
2284 assert!(!stats.is_min_max_backwards_compatible());
2285 if let Statistics::Int96(stats) = stats {
2286 assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
2287 assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2288 } else {
2289 panic!("expecting Statistics::Int96, got {stats:?}");
2290 }
2291 }
2292
2293 #[test]
2294 fn test_float_statistics() {
2295 let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2296 assert!(stats.is_min_max_backwards_compatible());
2297 if let Statistics::Float(stats) = stats {
2298 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2299 assert_eq!(stats.max_opt().unwrap(), &3.0);
2300 } else {
2301 panic!("expecting Statistics::Float, got {stats:?}");
2302 }
2303 }
2304
2305 #[test]
2306 fn test_double_statistics() {
2307 let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2308 assert!(stats.is_min_max_backwards_compatible());
2309 if let Statistics::Double(stats) = stats {
2310 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2311 assert_eq!(stats.max_opt().unwrap(), &3.0);
2312 } else {
2313 panic!("expecting Statistics::Double, got {stats:?}");
2314 }
2315 }
2316
#[test]
fn test_byte_array_statistics() {
    // Expected bounds: "aaw" (shorter than its extension "aawaa") is the min, "zz" the max.
    let words = ["aawaa", "zz", "aaw", "m", "qrs"];
    let input: Vec<ByteArray> = words.iter().map(|&w| ByteArray::from(w)).collect();

    let stats = statistics_roundtrip::<ByteArrayType>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::ByteArray(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &ByteArray::from("aaw"));
            assert_eq!(typed.max_opt().unwrap(), &ByteArray::from("zz"));
        }
        stats => panic!("expecting Statistics::ByteArray, got {stats:?}"),
    }
}
2333
#[test]
fn test_fixed_len_byte_array_statistics() {
    // Same words as the ByteArray test, stored as FixedLenByteArray values.
    let input: Vec<FixedLenByteArray> = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
        .iter()
        .map(|&w| FixedLenByteArray::from(ByteArray::from(w)))
        .collect();

    let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::FixedLenByteArray(typed) => {
            let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
            assert_eq!(typed.min_opt().unwrap(), &expected_min);
            let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
            assert_eq!(typed.max_opt().unwrap(), &expected_max);
        }
        stats => panic!("expecting Statistics::FixedLenByteArray, got {stats:?}"),
    }
}
2352
#[test]
fn test_column_writer_check_float16_min_max() {
    // Float16 values are compared numerically, so -2 is the min and 3 the max.
    let neg_two = -f16::from_f32(2.0);
    let three = f16::from_f32(3.0);
    let input: Vec<FixedLenByteArray> = [-f16::ONE, three, neg_two, f16::from_f32(2.0)]
        .into_iter()
        .map(|v| FixedLenByteArray::from(ByteArray::from(v)))
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(neg_two));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(three));
}
2376
#[test]
fn test_column_writer_check_float16_nan_middle() {
    // A NaN between two ordinary values is ignored for min/max.
    let two = f16::ONE + f16::ONE;
    let input: Vec<FixedLenByteArray> = [f16::ONE, f16::NAN, two]
        .into_iter()
        .map(|v| FixedLenByteArray::from(ByteArray::from(v)))
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(two));
}
2392
#[test]
fn test_float16_statistics_nan_middle() {
    // NaN in the middle of the input must be skipped for min/max.
    // The input is deliberately different from `test_column_writer_check_float16_nan_middle`,
    // which previously had an identical body. Here the values are descending and include a
    // negative number, so NaN skipping is exercised together with signed ordering.
    let two = f16::ONE + f16::ONE;
    let input: Vec<FixedLenByteArray> = [two, f16::NAN, f16::NEG_ONE]
        .into_iter()
        .map(|v| FixedLenByteArray::from(ByteArray::from(v)))
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ONE));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(two));
}
2408
#[test]
fn test_float16_statistics_nan_start() {
    // A leading NaN must not become the min or max.
    let two = f16::ONE + f16::ONE;
    let input: Vec<FixedLenByteArray> = [f16::NAN, f16::ONE, two]
        .into_iter()
        .map(|v| FixedLenByteArray::from(ByteArray::from(v)))
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(two));
}
2424
#[test]
fn test_float16_statistics_nan_only() {
    // An all-NaN column has no min/max at all.
    let input: Vec<FixedLenByteArray> = [f16::NAN; 2]
        .into_iter()
        .map(|v| FixedLenByteArray::from(ByteArray::from(v)))
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
    assert!(stats.is_min_max_backwards_compatible());
}
2437
#[test]
fn test_float16_statistics_zero_only() {
    // A single +0 is reported as min -0 / max +0, so both signed zeros are covered.
    let input = [FixedLenByteArray::from(ByteArray::from(f16::ZERO))];

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
}
2450
#[test]
fn test_float16_statistics_neg_zero_only() {
    // A single -0 is likewise reported as min -0 / max +0.
    let input = [FixedLenByteArray::from(ByteArray::from(f16::NEG_ZERO))];

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
}
2463
#[test]
fn test_float16_statistics_zero_min() {
    // When +0 is the smallest value, the reported min is -0. The NaN is ignored.
    let input: Vec<FixedLenByteArray> = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
        .into_iter()
        .map(|v| FixedLenByteArray::from(ByteArray::from(v)))
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
}
2476
#[test]
fn test_float16_statistics_neg_zero_max() {
    // When -0 is the largest value, the reported max is +0. The NaN is ignored.
    let neg_pi = -f16::PI;
    let input: Vec<FixedLenByteArray> = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, neg_pi]
        .into_iter()
        .map(|v| FixedLenByteArray::from(ByteArray::from(v)))
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(neg_pi));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
}
2489
#[test]
fn test_float_statistics_nan_middle() {
    // A NaN between two ordinary values is ignored for min/max.
    let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Float(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &1.0);
        assert_eq!(stats.max_opt().unwrap(), &2.0);
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Float, got {stats:?}");
    }
}
2501
#[test]
fn test_float_statistics_nan_start() {
    // A leading NaN must not become the min or max.
    let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Float(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &1.0);
        assert_eq!(stats.max_opt().unwrap(), &2.0);
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Float, got {stats:?}");
    }
}
2513
#[test]
fn test_float_statistics_nan_only() {
    // With only NaNs there is nothing to bound: min and max are absent, but the
    // statistics variant is still Float.
    let all_nan = [f32::NAN, f32::NAN];
    let stats = statistics_roundtrip::<FloatType>(&all_nan);
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
    assert!(stats.is_min_max_backwards_compatible());
    assert!(matches!(stats, Statistics::Float(_)));
}
2522
#[test]
fn test_float_statistics_zero_only() {
    // A lone +0.0 is reported as min -0.0 / max +0.0. Because `==` treats -0.0 and +0.0
    // as equal, the sign of each bound is checked explicitly.
    let stats = statistics_roundtrip::<FloatType>(&[0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Float(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &-0.0);
        assert!(stats.min_opt().unwrap().is_sign_negative());
        assert_eq!(stats.max_opt().unwrap(), &0.0);
        assert!(stats.max_opt().unwrap().is_sign_positive());
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Float, got {stats:?}");
    }
}
2536
#[test]
fn test_float_statistics_neg_zero_only() {
    // A lone -0.0 is also reported as min -0.0 / max +0.0; signs are checked explicitly.
    let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Float(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &-0.0);
        assert!(stats.min_opt().unwrap().is_sign_negative());
        assert_eq!(stats.max_opt().unwrap(), &0.0);
        assert!(stats.max_opt().unwrap().is_sign_positive());
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Float, got {stats:?}");
    }
}
2550
#[test]
fn test_float_statistics_zero_min() {
    // When +0.0 is the smallest value, the reported min is -0.0. The NaN is ignored.
    let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Float(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &-0.0);
        assert!(stats.min_opt().unwrap().is_sign_negative());
        assert_eq!(stats.max_opt().unwrap(), &2.0);
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Float, got {stats:?}");
    }
}
2563
#[test]
fn test_float_statistics_neg_zero_max() {
    // When -0.0 is the largest value, the reported max is +0.0. The NaN is ignored.
    let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Float(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &-2.0);
        assert_eq!(stats.max_opt().unwrap(), &0.0);
        assert!(stats.max_opt().unwrap().is_sign_positive());
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Float, got {stats:?}");
    }
}
2576
#[test]
fn test_double_statistics_nan_middle() {
    // A NaN between two ordinary values is ignored for min/max.
    let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Double(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &1.0);
        assert_eq!(stats.max_opt().unwrap(), &2.0);
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Double, got {stats:?}");
    }
}
2588
#[test]
fn test_double_statistics_nan_start() {
    // A leading NaN must not become the min or max.
    let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Double(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &1.0);
        assert_eq!(stats.max_opt().unwrap(), &2.0);
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Double, got {stats:?}");
    }
}
2600
#[test]
fn test_double_statistics_nan_only() {
    // With only NaNs, min and max are absent but the variant is still Double.
    let all_nan = [f64::NAN, f64::NAN];
    let stats = statistics_roundtrip::<DoubleType>(&all_nan);
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
    assert!(matches!(stats, Statistics::Double(_)));
    assert!(stats.is_min_max_backwards_compatible());
}
2609
#[test]
fn test_double_statistics_zero_only() {
    // A lone +0.0 is reported as min -0.0 / max +0.0. Because `==` treats -0.0 and +0.0
    // as equal, the sign of each bound is checked explicitly.
    let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Double(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &-0.0);
        assert!(stats.min_opt().unwrap().is_sign_negative());
        assert_eq!(stats.max_opt().unwrap(), &0.0);
        assert!(stats.max_opt().unwrap().is_sign_positive());
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Double, got {stats:?}");
    }
}
2623
#[test]
fn test_double_statistics_neg_zero_only() {
    // A lone -0.0 is also reported as min -0.0 / max +0.0; signs are checked explicitly.
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Double(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &-0.0);
        assert!(stats.min_opt().unwrap().is_sign_negative());
        assert_eq!(stats.max_opt().unwrap(), &0.0);
        assert!(stats.max_opt().unwrap().is_sign_positive());
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Double, got {stats:?}");
    }
}
2637
#[test]
fn test_double_statistics_zero_min() {
    // When +0.0 is the smallest value, the reported min is -0.0. The NaN is ignored.
    let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Double(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &-0.0);
        assert!(stats.min_opt().unwrap().is_sign_negative());
        assert_eq!(stats.max_opt().unwrap(), &2.0);
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Double, got {stats:?}");
    }
}
2650
#[test]
fn test_double_statistics_neg_zero_max() {
    // When -0.0 is the largest value, the reported max is +0.0. The NaN is ignored.
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    if let Statistics::Double(stats) = stats {
        assert_eq!(stats.min_opt().unwrap(), &-2.0);
        assert_eq!(stats.max_opt().unwrap(), &0.0);
        assert!(stats.max_opt().unwrap().is_sign_positive());
    } else {
        // Report the actual variant, consistent with the other statistics tests.
        panic!("expecting Statistics::Double, got {stats:?}");
    }
}
2663
#[test]
fn test_compare_greater_byte_array_decimals() {
    // (lhs, rhs, expected result of `lhs > rhs`) for big-endian decimal byte arrays.
    // The last two cases show that a 0xFF leading byte makes the value compare lower
    // than [0].
    let cases: [(&[u8], &[u8], bool); 9] = [
        (&[], &[], false),
        (&[1], &[], true),
        (&[], &[1], false),
        (&[1], &[0], true),
        (&[1], &[1], false),
        (&[1, 0], &[0], true),
        (&[0, 1], &[1, 0], false),
        (&[255, 35, 0, 0], &[0], false),
        (&[0], &[255, 35, 0, 0], true),
    ];
    for (lhs, rhs, expected) in cases {
        assert_eq!(compare_greater_byte_array_decimals(lhs, rhs), expected);
    }
}
2685
#[test]
fn test_column_index_with_null_pages() {
    // Four definition levels of 0 and no values produce a single all-null page.
    let page_writer = get_test_page_writer();
    let props = Default::default();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
    writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

    let result = writer.close().unwrap();
    assert!(result.column_index.is_some());
    let index = result.column_index.unwrap();

    // The null page carries empty min/max entries.
    assert!(index.null_pages[0]);
    assert_eq!(index.min_values[0].len(), 0);
    assert_eq!(index.max_values[0].len(), 0);

    assert!(index.null_counts.is_some());
    assert_eq!(index.null_counts.as_ref().unwrap()[0], 4);

    // Only definition-level histograms are present: 4 at level 0, none at level 1.
    assert!(index.repetition_level_histograms.is_none());
    assert!(index.definition_level_histograms.is_some());
    assert_eq!(index.definition_level_histograms.unwrap(), &[4, 0]);
}
2711
#[test]
fn test_column_offset_index_metadata() {
    let page_writer = get_test_page_writer();
    let props = Default::default();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
    // First page [1, 2, 3, 4] is flushed explicitly; the second [4, 8, 2, -5] is flushed by close().
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer.flush_data_pages().unwrap();
    writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();

    assert_eq!(8, r.rows_written);

    assert_eq!(2, column_index.null_pages.len());
    assert_eq!(2, offset_index.page_locations.len());
    // Page mins go 1 -> -5 while maxes go 4 -> 8, so there is no consistent order.
    assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
    for idx in 0..2 {
        assert!(!column_index.null_pages[idx]);
        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
    }

    if let Some(stats) = r.metadata.statistics() {
        assert_eq!(stats.null_count_opt(), Some(0));
        assert_eq!(stats.distinct_count_opt(), None);
        if let Statistics::Int32(stats) = stats {
            // The chunk min (-5) and max (8) both come from the second page. Index directly
            // for both bounds: `.get(1).map(..)` could make the comparison pass vacuously
            // (None == None) if the column index were missing that entry.
            assert_eq!(
                stats.min_bytes_opt(),
                Some(column_index.min_values[1].as_slice())
            );
            assert_eq!(
                stats.max_bytes_opt(),
                Some(column_index.max_values[1].as_slice())
            );
        } else {
            panic!("expecting Statistics::Int32");
        }
    } else {
        panic!("metadata missing statistics");
    }

    assert_eq!(0, offset_index.page_locations[0].first_row_index);
    assert_eq!(4, offset_index.page_locations[1].first_row_index);
}
2766
#[test]
fn test_column_offset_index_metadata_truncating() {
    let page_writer = get_test_page_writer();
    let props = Default::default();
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    // Three 200-byte values, each one byte repeated: 'a' (97), 'p' (112), 'b' (98).
    let mut data = vec![FixedLenByteArray::default(); 3];
    data[0].set_data(Bytes::from(vec![97_u8; 200]));
    data[1].set_data(Bytes::from(vec![112_u8; 200]));
    data[2].set_data(Bytes::from(vec![98_u8; 200]));

    writer.write_batch(&data, None, None).unwrap();

    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();

    assert_eq!(3, r.rows_written);

    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    // Derive the expected length from the default instead of hardcoding it.
    let truncate_len = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap();

    if let Some(stats) = r.metadata.statistics() {
        assert_eq!(stats.null_count_opt(), Some(0));
        assert_eq!(stats.distinct_count_opt(), None);
        if let Statistics::FixedLenByteArray(stats) = stats {
            let column_index_min_value = &column_index.min_values[0];
            let column_index_max_value = &column_index.max_values[0];

            // Chunk statistics differ from the truncated column index values.
            assert_ne!(
                stats.min_bytes_opt(),
                Some(column_index_min_value.as_slice())
            );
            assert_ne!(
                stats.max_bytes_opt(),
                Some(column_index_max_value.as_slice())
            );

            // Min: the 'a' run cut down to the truncate length.
            assert_eq!(column_index_min_value.len(), truncate_len);
            assert_eq!(
                column_index_min_value.as_slice(),
                vec![97_u8; truncate_len].as_slice()
            );

            // Max: the 'p' run cut down, with its last byte incremented ('p' -> 'q').
            // The prefix check ensures the max really came from the 'p' value; the
            // last == first + 1 check alone would pass for any run.
            assert_eq!(column_index_max_value.len(), truncate_len);
            assert!(column_index_max_value[..truncate_len - 1]
                .iter()
                .all(|&b| b == 112_u8));
            assert_eq!(
                *column_index_max_value.last().unwrap(),
                *column_index_max_value.first().unwrap() + 1
            );
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    } else {
        panic!("metadata missing statistics");
    }
}
2839
#[test]
fn test_column_offset_index_truncating_spec_example() {
    let page_writer = get_test_page_writer();

    // Limit column index min/max values to a single byte.
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let mut data = vec![FixedLenByteArray::default(); 1];
    data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();

    assert_eq!(1, r.rows_written);

    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    let stats = match r.metadata.statistics() {
        Some(stats) => stats,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    assert!(
        matches!(stats, Statistics::FixedLenByteArray(_)),
        "expecting Statistics::FixedLenByteArray"
    );

    let min_entry = &column_index.min_values[0];
    let max_entry = &column_index.max_values[0];

    assert_eq!(min_entry.len(), 1);
    assert_eq!(max_entry.len(), 1);

    // "Blart..." truncates to "B" as the lower bound; the upper bound is bumped to "C".
    assert_eq!("B".as_bytes(), min_entry.as_slice());
    assert_eq!("C".as_bytes(), max_entry.as_slice());

    // The chunk statistics keep values that differ from the truncated index entries.
    assert_ne!(min_entry, stats.min_bytes_opt().unwrap());
    assert_ne!(max_entry, stats.max_bytes_opt().unwrap());
}
2894
#[test]
fn test_float16_min_max_no_truncation() {
    // Even with a 1-byte column index truncation limit, the float16 value is stored whole
    // in both the column index and the chunk statistics.
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_float16_column_writer(get_test_page_writer(), props);

    let pi_bytes = f16::PI.to_le_bytes().to_vec();
    let data = vec![ByteArray::from(pi_bytes.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    let column_index = r.column_index.unwrap();
    assert_eq!(pi_bytes, column_index.min_values[0].as_slice());
    assert_eq!(pi_bytes, column_index.max_values[0].as_slice());

    match r.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(stats) => {
            assert_eq!(pi_bytes, stats.min_bytes_opt().unwrap());
            assert_eq!(pi_bytes, stats.max_bytes_opt().unwrap());
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
2929
#[test]
fn test_decimal_min_max_no_truncation() {
    // A decimal FLBA column keeps its full 16-byte min/max despite a 1-byte
    // column index truncation limit.
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_decimals_column_writer::<FixedLenByteArrayType>(
        get_test_page_writer(),
        0,
        0,
        props,
    );

    let decimal_bytes = vec![
        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
        231u8, 90u8, 0u8, 0u8,
    ];
    let data = vec![ByteArray::from(decimal_bytes.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    let column_index = r.column_index.unwrap();
    assert_eq!(decimal_bytes, column_index.min_values[0].as_slice());
    assert_eq!(decimal_bytes, column_index.max_values[0].as_slice());

    match r.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(stats) => {
            assert_eq!(decimal_bytes, stats.min_bytes_opt().unwrap());
            assert_eq!(decimal_bytes, stats.max_bytes_opt().unwrap());
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
2968
#[test]
fn test_statistics_truncating_byte_array() {
    let page_writer = get_test_page_writer();

    const TEST_TRUNCATE_LENGTH: usize = 1;

    // Truncate the chunk statistics (not the column index) to a single byte.
    let builder =
        WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
    let props = Arc::new(builder.build());
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    let mut data = vec![ByteArray::default(); 1];
    data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));

    writer.write_batch(&data, None, None).unwrap();

    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    assert_eq!(1, r.rows_written);

    let stats = r.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    // The typed statistics are used below, so bind them without an `_` prefix
    // (the prefix signals an intentionally unused binding).
    if let Statistics::ByteArray(stats) = stats {
        let min_value = stats.min_opt().unwrap();
        let max_value = stats.max_opt().unwrap();

        // Truncated bounds are no longer exact.
        assert!(!stats.min_is_exact());
        assert!(!stats.max_is_exact());

        assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
        assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

        // "B" is the truncated lower bound; "C" is the bumped upper bound.
        assert_eq!("B".as_bytes(), min_value.as_bytes());
        assert_eq!("C".as_bytes(), max_value.as_bytes());
    } else {
        panic!("expecting Statistics::ByteArray");
    }
}
3012
#[test]
fn test_statistics_truncating_fixed_len_byte_array() {
    let page_writer = get_test_page_writer();

    const TEST_TRUNCATE_LENGTH: usize = 1;

    // Truncate the chunk statistics to a single byte.
    let builder =
        WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
    let props = Arc::new(builder.build());
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let mut data = vec![FixedLenByteArray::default(); 1];

    const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
    const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();

    // The truncated min keeps the leading byte; the truncated max is that byte plus one.
    const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]];
    const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
        [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];

    data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));

    writer.write_batch(&data, None, None).unwrap();

    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    assert_eq!(1, r.rows_written);

    let stats = r.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    if let Statistics::FixedLenByteArray(stats) = stats {
        let min_value = stats.min_opt().unwrap();
        let max_value = stats.max_opt().unwrap();

        assert!(!stats.min_is_exact());
        assert!(!stats.max_is_exact());

        assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
        assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

        assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
        assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());

        // Right-pad a truncated big-endian prefix with zeros back to 16 bytes so it can
        // be compared numerically with the original value.
        let widen = |prefix: &[u8]| {
            let mut buf = [0u8; 16];
            buf[..prefix.len()].copy_from_slice(prefix);
            i128::from_be_bytes(buf)
        };
        let reconstructed_min = widen(min_value.as_bytes());
        let reconstructed_max = widen(max_value.as_bytes());

        // The truncated bounds must still enclose the original value.
        assert!(
            reconstructed_min <= PSEUDO_DECIMAL_VALUE,
            "min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}"
        );
        assert!(
            reconstructed_max >= PSEUDO_DECIMAL_VALUE,
            "max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}"
        );
    } else {
        panic!("expecting Statistics::FixedLenByteArray");
    }
}
3108
#[test]
fn test_send() {
    // Compile-time check: a typed column writer implements `Send`.
    fn assert_send<W: Send>() {}
    assert_send::<ColumnWriterImpl<Int32Type>>();
}
3114
#[test]
fn test_increment() {
    // The last byte is incremented.
    let bumped = increment(vec![0, 0, 0]).unwrap();
    assert_eq!(&bumped, &[0, 0, 1]);

    // 0xFF bytes wrap to 0 and carry into the preceding byte.
    let carried = increment(vec![0, 255, 255]).unwrap();
    assert_eq!(&carried, &[1, 0, 0]);

    // All bytes at 0xFF: no increment is possible.
    assert!(increment(vec![255, 255, 255]).is_none());
}
3128
#[test]
fn test_increment_utf8() {
    // ASCII: the final character moves to the next one ("hello" -> "hellp").
    let incremented = increment_utf8("hello".as_bytes().to_vec()).unwrap();
    assert_eq!(&incremented, "hellp".as_bytes());

    // The incremented bytes compare greater than the original as a ByteArray.
    let mut greater = ByteArray::new();
    greater.set_data(Bytes::from(incremented));
    let mut original = ByteArray::new();
    original.set_data(Bytes::from("hello".as_bytes().to_vec()));
    assert!(greater > original);

    // Multi-byte characters: only the last emoji changes, and the result is valid UTF-8.
    let hearts = "❤️🧡💛💚💙💜";
    let incremented = increment_utf8(hearts.as_bytes().to_vec()).unwrap();
    match String::from_utf8(incremented) {
        Ok(new) => {
            assert_ne!(&new, hearts);
            assert_eq!(new, "❤️🧡💛💚💙💝");
            assert!(new.as_bytes().last().unwrap() > hearts.as_bytes().last().unwrap());
        }
        Err(_) => panic!("Expected incremented UTF8 string to also be valid."),
    }

    // char::MAX (4 bytes) on its own cannot be incremented.
    let max_char = char::MAX.to_string();
    assert_eq!(max_char.len(), 4);
    assert!(increment_utf8(max_char.as_bytes().to_vec()).is_none());

    // A trailing char::MAX is kept and the preceding character is incremented instead.
    let mixed = "a\u{10ffff}";
    let incremented = increment_utf8(mixed.as_bytes().to_vec());
    assert_eq!(&incremented.unwrap(), "b\u{10ffff}".as_bytes());
}
3165
#[test]
fn test_truncate_utf8() {
    let data = "❤️🧡💛💚💙💜";

    // Truncating to the full byte length returns the input unchanged.
    // (`str::len` is already the byte length; no `as_bytes()` needed.)
    let r = truncate_utf8(data, data.len()).unwrap();
    assert_eq!(r.len(), data.len());
    assert_eq!(&r, data.as_bytes());

    // 13 bytes falls inside the third emoji, so truncation backs off to the previous
    // char boundary at 10 bytes ("❤️" is 6 bytes including U+FE0F, "🧡" is 4).
    let r = truncate_utf8(data, 13).unwrap();
    assert_eq!(r.len(), 10);
    assert_eq!(&r, "❤️🧡".as_bytes());

    // A single 3-byte character cannot be truncated to 1 byte.
    let r = truncate_utf8("\u{0836}", 1);
    assert!(r.is_none());
}
3184
#[test]
fn test_increment_max_binary_chars() {
    // The trailing 0xFF bytes wrap to 0x00 and carry into 0xFD -> 0xFE.
    let carried = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
    assert_eq!(&carried.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);

    // All-0xFF input cannot be incremented, so None is returned.
    assert!(increment(vec![0xFF, 0xFF, 0xFF]).is_none())
}
3193
#[test]
fn test_no_column_index_when_stats_disabled() {
    // With statistics disabled, the writer still produces an offset index but no column index.
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .build(),
    );
    let column_writer = get_column_writer(descr, props, get_test_page_writer());
    let mut writer = get_typed_column_writer::<Int32Type>(column_writer);

    // Ten definition levels of 0 and no values.
    let def_levels = [0; 10];
    writer.write_batch(&[], Some(&def_levels), None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.offset_index.is_some());
    assert!(close_result.column_index.is_none());
}
3217
#[test]
fn test_boundary_order() -> Result<()> {
    /// Writes `pages` with `write_multiple_pages` and returns the column index boundary order.
    fn boundary_order_of(
        descr: &Arc<ColumnDescriptor>,
        pages: &[&[Option<i32>]],
    ) -> Result<BoundaryOrder> {
        let result = write_multiple_pages::<Int32Type>(descr, pages)?;
        Ok(result.column_index.unwrap().boundary_order)
    }

    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));

    // Mins and maxes never decrease; the all-null page does not break the order.
    assert_eq!(
        boundary_order_of(
            &descr,
            &[
                &[Some(-10), Some(10)],
                &[Some(-5), Some(11)],
                &[None],
                &[Some(-5), Some(11)],
            ],
        )?,
        BoundaryOrder::ASCENDING
    );

    // Mins and maxes never increase.
    assert_eq!(
        boundary_order_of(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(5), Some(11)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?,
        BoundaryOrder::DESCENDING
    );

    // Equal non-null pages around a null page.
    assert_eq!(
        boundary_order_of(&descr, &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]])?,
        BoundaryOrder::ASCENDING
    );

    // Only null pages.
    assert_eq!(
        boundary_order_of(&descr, &[&[None], &[None], &[None]])?,
        BoundaryOrder::ASCENDING
    );

    // A single page.
    assert_eq!(
        boundary_order_of(&descr, &[&[Some(-10), Some(10)]])?,
        BoundaryOrder::ASCENDING
    );

    // A single non-null page followed by a null page.
    assert_eq!(
        boundary_order_of(&descr, &[&[Some(-10), Some(10)], &[None]])?,
        BoundaryOrder::ASCENDING
    );

    // Rises then falls.
    assert_eq!(
        boundary_order_of(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(11), Some(16)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?,
        BoundaryOrder::UNORDERED
    );

    // Mins ascend while maxes descend.
    assert_eq!(
        boundary_order_of(
            &descr,
            &[
                &[Some(1), Some(9)],
                &[Some(2), Some(8)],
                &[None],
                &[Some(3), Some(7)],
            ],
        )?,
        BoundaryOrder::UNORDERED
    );

    Ok(())
}
3301
#[test]
fn test_boundary_order_logical_type() -> Result<()> {
    // The same page values are written under two descriptors: one with the float16
    // logical type, and a plain 2-byte FIXED_LEN_BYTE_ARRAY.
    let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
    let fba_descr = {
        let plain_type = SchemaType::primitive_type_builder(
            "col",
            FixedLenByteArrayType::get_physical_type(),
        )
        .with_length(2)
        .build()?;
        Arc::new(ColumnDescriptor::new(
            Arc::new(plain_type),
            1,
            0,
            ColumnPath::from("col"),
        ))
    };

    let page = |v: f16| Some(FixedLenByteArray::from(ByteArray::from(v)));
    let values: &[&[Option<FixedLenByteArray>]] = &[
        &[page(f16::ONE)],
        &[page(f16::ZERO)],
        &[page(f16::NEG_ZERO)],
        &[page(f16::NEG_ONE)],
    ];

    // As float16: 1, 0, -0, -1 is descending.
    let f16_result = write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
    let f16_order = f16_result.column_index.unwrap().boundary_order;
    assert_eq!(f16_order, BoundaryOrder::DESCENDING);

    // Without the logical type, the same bytes come out unordered.
    let fba_result = write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
    let fba_order = fba_result.column_index.unwrap().boundary_order;
    assert_eq!(fba_order, BoundaryOrder::UNORDERED);

    Ok(())
}
3345
3346 #[test]
3347 fn test_interval_stats_should_not_have_min_max() {
3348 let input = [
3349 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3350 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3351 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3352 ]
3353 .into_iter()
3354 .map(|s| ByteArray::from(s).into())
3355 .collect::<Vec<_>>();
3356
3357 let page_writer = get_test_page_writer();
3358 let mut writer = get_test_interval_column_writer(page_writer);
3359 writer.write_batch(&input, None, None).unwrap();
3360
3361 let metadata = writer.close().unwrap().metadata;
3362 let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3363 stats.clone()
3364 } else {
3365 panic!("metadata missing statistics");
3366 };
3367 assert!(stats.min_bytes_opt().is_none());
3368 assert!(stats.max_bytes_opt().is_none());
3369 }
3370
3371 fn write_multiple_pages<T: DataType>(
3372 column_descr: &Arc<ColumnDescriptor>,
3373 pages: &[&[Option<T::T>]],
3374 ) -> Result<ColumnCloseResult> {
3375 let column_writer = get_column_writer(
3376 column_descr.clone(),
3377 Default::default(),
3378 get_test_page_writer(),
3379 );
3380 let mut writer = get_typed_column_writer::<T>(column_writer);
3381
3382 for &page in pages {
3383 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3384 let def_levels = page
3385 .iter()
3386 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3387 .collect::<Vec<_>>();
3388 writer.write_batch(&values, Some(&def_levels), None)?;
3389 writer.flush_data_pages()?;
3390 }
3391
3392 writer.close()
3393 }
3394
3395 fn column_roundtrip_random<T: DataType>(
3399 props: WriterProperties,
3400 max_size: usize,
3401 min_value: T::T,
3402 max_value: T::T,
3403 max_def_level: i16,
3404 max_rep_level: i16,
3405 ) where
3406 T::T: PartialOrd + SampleUniform + Copy,
3407 {
3408 let mut num_values: usize = 0;
3409
3410 let mut buf: Vec<i16> = Vec::new();
3411 let def_levels = if max_def_level > 0 {
3412 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3413 for &dl in &buf[..] {
3414 if dl == max_def_level {
3415 num_values += 1;
3416 }
3417 }
3418 Some(&buf[..])
3419 } else {
3420 num_values = max_size;
3421 None
3422 };
3423
3424 let mut buf: Vec<i16> = Vec::new();
3425 let rep_levels = if max_rep_level > 0 {
3426 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3427 buf[0] = 0; Some(&buf[..])
3429 } else {
3430 None
3431 };
3432
3433 let mut values: Vec<T::T> = Vec::new();
3434 random_numbers_range(num_values, min_value, max_value, &mut values);
3435
3436 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3437 }
3438
3439 fn column_roundtrip<T: DataType>(
3441 props: WriterProperties,
3442 values: &[T::T],
3443 def_levels: Option<&[i16]>,
3444 rep_levels: Option<&[i16]>,
3445 ) {
3446 let mut file = tempfile::tempfile().unwrap();
3447 let mut write = TrackedWrite::new(&mut file);
3448 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3449
3450 let max_def_level = match def_levels {
3451 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3452 None => 0i16,
3453 };
3454
3455 let max_rep_level = match rep_levels {
3456 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3457 None => 0i16,
3458 };
3459
3460 let mut max_batch_size = values.len();
3461 if let Some(levels) = def_levels {
3462 max_batch_size = max_batch_size.max(levels.len());
3463 }
3464 if let Some(levels) = rep_levels {
3465 max_batch_size = max_batch_size.max(levels.len());
3466 }
3467
3468 let mut writer =
3469 get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3470
3471 let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3472 assert_eq!(values_written, values.len());
3473 let result = writer.close().unwrap();
3474
3475 drop(write);
3476
3477 let props = ReaderProperties::builder()
3478 .set_backward_compatible_lz4(false)
3479 .build();
3480 let page_reader = Box::new(
3481 SerializedPageReader::new_with_properties(
3482 Arc::new(file),
3483 &result.metadata,
3484 result.rows_written as usize,
3485 None,
3486 Arc::new(props),
3487 )
3488 .unwrap(),
3489 );
3490 let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3491
3492 let mut actual_values = Vec::with_capacity(max_batch_size);
3493 let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3494 let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3495
3496 let (_, values_read, levels_read) = reader
3497 .read_records(
3498 max_batch_size,
3499 actual_def_levels.as_mut(),
3500 actual_rep_levels.as_mut(),
3501 &mut actual_values,
3502 )
3503 .unwrap();
3504
3505 assert_eq!(&actual_values[..values_read], values);
3508 match actual_def_levels {
3509 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3510 None => assert_eq!(None, def_levels),
3511 }
3512 match actual_rep_levels {
3513 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3514 None => assert_eq!(None, rep_levels),
3515 }
3516
3517 if let Some(levels) = actual_rep_levels {
3520 let mut actual_rows_written = 0;
3521 for l in levels {
3522 if l == 0 {
3523 actual_rows_written += 1;
3524 }
3525 }
3526 assert_eq!(actual_rows_written, result.rows_written);
3527 } else if actual_def_levels.is_some() {
3528 assert_eq!(levels_read as u64, result.rows_written);
3529 } else {
3530 assert_eq!(values_read as u64, result.rows_written);
3531 }
3532 }
3533
3534 fn column_write_and_get_metadata<T: DataType>(
3537 props: WriterProperties,
3538 values: &[T::T],
3539 ) -> ColumnChunkMetaData {
3540 let page_writer = get_test_page_writer();
3541 let props = Arc::new(props);
3542 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3543 writer.write_batch(values, None, None).unwrap();
3544 writer.close().unwrap().metadata
3545 }
3546
3547 fn check_encoding_write_support<T: DataType>(
3551 version: WriterVersion,
3552 dict_enabled: bool,
3553 data: &[T::T],
3554 dictionary_page_offset: Option<i64>,
3555 encodings: &[Encoding],
3556 ) {
3557 let props = WriterProperties::builder()
3558 .set_writer_version(version)
3559 .set_dictionary_enabled(dict_enabled)
3560 .build();
3561 let meta = column_write_and_get_metadata::<T>(props, data);
3562 assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
3563 assert_eq!(meta.encodings(), &encodings);
3564 }
3565
3566 fn get_test_column_writer<'a, T: DataType>(
3568 page_writer: Box<dyn PageWriter + 'a>,
3569 max_def_level: i16,
3570 max_rep_level: i16,
3571 props: WriterPropertiesPtr,
3572 ) -> ColumnWriterImpl<'a, T> {
3573 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
3574 let column_writer = get_column_writer(descr, props, page_writer);
3575 get_typed_column_writer::<T>(column_writer)
3576 }
3577
3578 fn get_test_column_reader<T: DataType>(
3580 page_reader: Box<dyn PageReader>,
3581 max_def_level: i16,
3582 max_rep_level: i16,
3583 ) -> ColumnReaderImpl<T> {
3584 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
3585 let column_reader = get_column_reader(descr, page_reader);
3586 get_typed_column_reader::<T>(column_reader)
3587 }
3588
3589 fn get_test_column_descr<T: DataType>(
3591 max_def_level: i16,
3592 max_rep_level: i16,
3593 ) -> ColumnDescriptor {
3594 let path = ColumnPath::from("col");
3595 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3596 .with_length(1)
3599 .build()
3600 .unwrap();
3601 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3602 }
3603
3604 fn get_test_page_writer() -> Box<dyn PageWriter> {
3606 Box::new(TestPageWriter {})
3607 }
3608
3609 struct TestPageWriter {}
3610
3611 impl PageWriter for TestPageWriter {
3612 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
3613 let mut res = PageWriteSpec::new();
3614 res.page_type = page.page_type();
3615 res.uncompressed_size = page.uncompressed_size();
3616 res.compressed_size = page.compressed_size();
3617 res.num_values = page.num_values();
3618 res.offset = 0;
3619 res.bytes_written = page.data().len() as u64;
3620 Ok(res)
3621 }
3622
3623 fn close(&mut self) -> Result<()> {
3624 Ok(())
3625 }
3626 }
3627
3628 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
3630 let page_writer = get_test_page_writer();
3631 let props = Default::default();
3632 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3633 writer.write_batch(values, None, None).unwrap();
3634
3635 let metadata = writer.close().unwrap().metadata;
3636 if let Some(stats) = metadata.statistics() {
3637 stats.clone()
3638 } else {
3639 panic!("metadata missing statistics");
3640 }
3641 }
3642
3643 fn get_test_decimals_column_writer<T: DataType>(
3645 page_writer: Box<dyn PageWriter>,
3646 max_def_level: i16,
3647 max_rep_level: i16,
3648 props: WriterPropertiesPtr,
3649 ) -> ColumnWriterImpl<'static, T> {
3650 let descr = Arc::new(get_test_decimals_column_descr::<T>(
3651 max_def_level,
3652 max_rep_level,
3653 ));
3654 let column_writer = get_column_writer(descr, props, page_writer);
3655 get_typed_column_writer::<T>(column_writer)
3656 }
3657
3658 fn get_test_decimals_column_descr<T: DataType>(
3660 max_def_level: i16,
3661 max_rep_level: i16,
3662 ) -> ColumnDescriptor {
3663 let path = ColumnPath::from("col");
3664 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3665 .with_length(16)
3666 .with_logical_type(Some(LogicalType::Decimal {
3667 scale: 2,
3668 precision: 3,
3669 }))
3670 .with_scale(2)
3671 .with_precision(3)
3672 .build()
3673 .unwrap();
3674 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3675 }
3676
3677 fn float16_statistics_roundtrip(
3678 values: &[FixedLenByteArray],
3679 ) -> ValueStatistics<FixedLenByteArray> {
3680 let page_writer = get_test_page_writer();
3681 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
3682 writer.write_batch(values, None, None).unwrap();
3683
3684 let metadata = writer.close().unwrap().metadata;
3685 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3686 stats.clone()
3687 } else {
3688 panic!("metadata missing statistics");
3689 }
3690 }
3691
3692 fn get_test_float16_column_writer(
3693 page_writer: Box<dyn PageWriter>,
3694 props: WriterPropertiesPtr,
3695 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
3696 let descr = Arc::new(get_test_float16_column_descr(0, 0));
3697 let column_writer = get_column_writer(descr, props, page_writer);
3698 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
3699 }
3700
3701 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
3702 let path = ColumnPath::from("col");
3703 let tpe =
3704 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
3705 .with_length(2)
3706 .with_logical_type(Some(LogicalType::Float16))
3707 .build()
3708 .unwrap();
3709 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3710 }
3711
3712 fn get_test_interval_column_writer(
3713 page_writer: Box<dyn PageWriter>,
3714 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
3715 let descr = Arc::new(get_test_interval_column_descr());
3716 let column_writer = get_column_writer(descr, Default::default(), page_writer);
3717 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
3718 }
3719
3720 fn get_test_interval_column_descr() -> ColumnDescriptor {
3721 let path = ColumnPath::from("col");
3722 let tpe =
3723 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
3724 .with_length(12)
3725 .with_converted_type(ConvertedType::INTERVAL)
3726 .build()
3727 .unwrap();
3728 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
3729 }
3730
3731 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
3733 page_writer: Box<dyn PageWriter + 'a>,
3734 max_def_level: i16,
3735 max_rep_level: i16,
3736 props: WriterPropertiesPtr,
3737 ) -> ColumnWriterImpl<'a, T> {
3738 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
3739 max_def_level,
3740 max_rep_level,
3741 ));
3742 let column_writer = get_column_writer(descr, props, page_writer);
3743 get_typed_column_writer::<T>(column_writer)
3744 }
3745
3746 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
3748 max_def_level: i16,
3749 max_rep_level: i16,
3750 ) -> ColumnDescriptor {
3751 let path = ColumnPath::from("col");
3752 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3753 .with_converted_type(ConvertedType::UINT_32)
3754 .build()
3755 .unwrap();
3756 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3757 }
3758}