parquet/column/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column writer API.
19
20use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35use crate::errors::{ParquetError, Result};
36use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder};
37use crate::file::properties::EnabledStatistics;
38use crate::file::statistics::{Statistics, ValueStatistics};
39use crate::file::{
40    metadata::ColumnChunkMetaData,
41    properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
42};
43use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
44
45pub(crate) mod encoder;
46
/// Dispatches over every `ColumnWriter` variant: binds the wrapped typed writer to
/// `$inner` and evaluates `$body` with that binding in scope.
///
/// The arms are spelled `Self::Variant(..)`, so the macro must be expanded in a
/// context where `Self` names the enum.
macro_rules! downcast_writer {
    ($writer:expr, $inner:ident, $body:expr) => {
        match $writer {
            Self::BoolColumnWriter($inner) => $body,
            Self::Int32ColumnWriter($inner) => $body,
            Self::Int64ColumnWriter($inner) => $body,
            Self::Int96ColumnWriter($inner) => $body,
            Self::FloatColumnWriter($inner) => $body,
            Self::DoubleColumnWriter($inner) => $body,
            Self::ByteArrayColumnWriter($inner) => $body,
            Self::FixedLenByteArrayColumnWriter($inner) => $body,
        }
    };
}
61
/// Column writer for a Parquet type.
///
/// Each variant wraps a [`ColumnWriterImpl`] specialised for one physical type.
/// [`get_column_writer`] builds the variant matching a column descriptor, and
/// [`get_typed_column_writer`] (or its `_ref` / `_mut` siblings) recovers the typed
/// writer from the enum.
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 (timestamp) type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
81
82impl ColumnWriter<'_> {
83    /// Returns the estimated total memory usage
84    #[cfg(feature = "arrow")]
85    pub(crate) fn memory_size(&self) -> usize {
86        downcast_writer!(self, typed, typed.memory_size())
87    }
88
89    /// Returns the estimated total encoded bytes for this column writer
90    #[cfg(feature = "arrow")]
91    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
92        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
93    }
94
95    /// Close this [`ColumnWriter`]
96    pub fn close(self) -> Result<ColumnCloseResult> {
97        downcast_writer!(self, typed, typed.close())
98    }
99}
100
// NOTE(review): nothing in the visible part of this file uses `Level`; it is kept
// only until the removal announced in the deprecation note below.
#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray and nobody knows what's it for. Will be removed in the next release."
)]
#[allow(missing_docs)]
pub enum Level {
    Page,
    Column,
}
110
111/// Gets a specific column writer corresponding to column descriptor `descr`.
112pub fn get_column_writer<'a>(
113    descr: ColumnDescPtr,
114    props: WriterPropertiesPtr,
115    page_writer: Box<dyn PageWriter + 'a>,
116) -> ColumnWriter<'a> {
117    match descr.physical_type() {
118        Type::BOOLEAN => {
119            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
120        }
121        Type::INT32 => {
122            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
123        }
124        Type::INT64 => {
125            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
126        }
127        Type::INT96 => {
128            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
129        }
130        Type::FLOAT => {
131            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
132        }
133        Type::DOUBLE => {
134            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
135        }
136        Type::BYTE_ARRAY => {
137            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
138        }
139        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
140            ColumnWriterImpl::new(descr, props, page_writer),
141        ),
142    }
143}
144
145/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
146/// non-generic type to a generic column writer type `ColumnWriterImpl`.
147///
148/// Panics if actual enum value for `col_writer` does not match the type `T`.
149pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
150    T::get_column_writer(col_writer).unwrap_or_else(|| {
151        panic!(
152            "Failed to convert column writer into a typed column writer for `{}` type",
153            T::get_physical_type()
154        )
155    })
156}
157
158/// Similar to `get_typed_column_writer` but returns a reference.
159pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
160    col_writer: &'b ColumnWriter<'a>,
161) -> &'b ColumnWriterImpl<'a, T> {
162    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
163        panic!(
164            "Failed to convert column writer into a typed column writer for `{}` type",
165            T::get_physical_type()
166        )
167    })
168}
169
170/// Similar to `get_typed_column_writer` but returns a reference.
171pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
172    col_writer: &'a mut ColumnWriter<'b>,
173) -> &'a mut ColumnWriterImpl<'b, T> {
174    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
175        panic!(
176            "Failed to convert column writer into a typed column writer for `{}` type",
177            T::get_physical_type()
178        )
179    })
180}
181
/// Metadata returned by [`GenericColumnWriter::close`]
///
/// Bundles the byte and row totals of the column chunk with its metadata and the
/// optional auxiliary structures (bloom filter, column index, offset index).
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written
    pub bytes_written: u64,
    /// The total number of rows written
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering.
    ///
    /// `None` when the writer's column index builder is not valid at close time.
    pub column_index: Option<ColumnIndex>,
    /// Optional offset index, identifying page locations
    pub offset_index: Option<OffsetIndex>,
}
198
/// Metrics accumulated for the data page that is currently being buffered.
#[derive(Default)]
struct PageMetrics {
    /// Number of levels buffered for the current page; grows by the number of levels
    /// in every mini batch, so nulls are included.
    num_buffered_values: u32,
    /// Number of rows buffered for the current page.
    num_buffered_rows: u32,
    /// Number of buffered levels whose definition level is below the column maximum.
    num_page_nulls: u64,
    /// Repetition level histogram for the page; `None` unless initialized via
    /// `with_repetition_level_histogram`.
    repetition_level_histogram: Option<LevelHistogram>,
    /// Definition level histogram for the page; `None` unless initialized via
    /// `with_definition_level_histogram`.
    definition_level_histogram: Option<LevelHistogram>,
}
208
209impl PageMetrics {
210    fn new() -> Self {
211        Default::default()
212    }
213
214    /// Initialize the repetition level histogram
215    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
216        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
217        self
218    }
219
220    /// Initialize the definition level histogram
221    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
222        self.definition_level_histogram = LevelHistogram::try_new(max_level);
223        self
224    }
225
226    /// Resets the state of this `PageMetrics` to the initial state.
227    /// If histograms have been initialized their contents will be reset to zero.
228    fn new_page(&mut self) {
229        self.num_buffered_values = 0;
230        self.num_buffered_rows = 0;
231        self.num_page_nulls = 0;
232        self.repetition_level_histogram
233            .as_mut()
234            .map(LevelHistogram::reset);
235        self.definition_level_histogram
236            .as_mut()
237            .map(LevelHistogram::reset);
238    }
239
240    /// Updates histogram values using provided repetition levels
241    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
242        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
243            rep_hist.update_from_levels(levels);
244        }
245    }
246
247    /// Updates histogram values using provided definition levels
248    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
249        if let Some(ref mut def_hist) = self.definition_level_histogram {
250            def_hist.update_from_levels(levels);
251        }
252    }
253}
254
/// Metrics accumulated over the lifetime of a column writer (one column chunk).
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    /// Total bytes written; reported as `bytes_written` when the writer is closed.
    total_bytes_written: u64,
    /// Total rows written; reported as `rows_written` when the writer is closed.
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    /// Smallest value seen so far, updated from caller-provided minimums.
    min_column_value: Option<T>,
    /// Largest value seen so far, updated from caller-provided maximums.
    max_column_value: Option<T>,
    num_column_nulls: u64,
    /// Caller-provided distinct count; cleared when it can no longer be trusted.
    column_distinct_count: Option<u64>,
    /// Running sum of the per-page variable length byte counts, `None` until the
    /// first page reports one.
    variable_length_bytes: Option<i64>,
    /// Chunk-level repetition level histogram (sum of the page histograms).
    repetition_level_histogram: Option<LevelHistogram>,
    /// Chunk-level definition level histogram (sum of the page histograms).
    definition_level_histogram: Option<LevelHistogram>,
}
273
274impl<T: Default> ColumnMetrics<T> {
275    fn new() -> Self {
276        Default::default()
277    }
278
279    /// Initialize the repetition level histogram
280    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
281        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
282        self
283    }
284
285    /// Initialize the definition level histogram
286    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
287        self.definition_level_histogram = LevelHistogram::try_new(max_level);
288        self
289    }
290
291    /// Sum `page_histogram` into `chunk_histogram`
292    fn update_histogram(
293        chunk_histogram: &mut Option<LevelHistogram>,
294        page_histogram: &Option<LevelHistogram>,
295    ) {
296        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
297            chunk_hist.add(page_hist);
298        }
299    }
300
301    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
302    /// page histograms are not initialized.
303    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
304        ColumnMetrics::<T>::update_histogram(
305            &mut self.definition_level_histogram,
306            &page_metrics.definition_level_histogram,
307        );
308        ColumnMetrics::<T>::update_histogram(
309            &mut self.repetition_level_histogram,
310            &page_metrics.repetition_level_histogram,
311        );
312    }
313
314    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
315    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
316        if let Some(var_bytes) = variable_length_bytes {
317            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
318        }
319    }
320}
321
/// Typed column writer for a primitive column.
///
/// This is a [`GenericColumnWriter`] whose value encoder is the
/// [`ColumnValueEncoderImpl`] for the data type `T`.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
324
/// Generic column writer for a primitive column.
///
/// Values are handed to the encoder `E` while definition and repetition levels are
/// buffered in this struct; finished pages go to the boxed [`PageWriter`].
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    /// Descriptor of the column being written.
    descr: ColumnDescPtr,
    /// Writer properties this writer was created with.
    props: WriterPropertiesPtr,
    /// Statistics level configured in `props` for this column's path.
    statistics_enabled: EnabledStatistics,

    /// Destination for finished pages.
    page_writer: Box<dyn PageWriter + 'a>,
    /// Compression configured in `props` for this column's path.
    codec: Compression,
    /// Codec instance created for `codec`, if `create_codec` returned one.
    compressor: Option<Box<dyn Codec>>,
    /// Encoder receiving the column values.
    encoder: E,

    /// Metrics for the page currently being buffered.
    page_metrics: PageMetrics,
    // Metrics per column writer
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    // Reused buffers
    /// Definition levels buffered by `write_mini_batch`.
    def_levels_sink: Vec<i16>,
    /// Repetition levels buffered by `write_mini_batch`.
    rep_levels_sink: Vec<i16>,
    data_pages: VecDeque<CompressedPage>,
    // column index and offset index
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: OffsetIndexBuilder,

    // Below fields used to incrementally check boundary order across data pages.
    // We assume they are ascending/descending until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max)
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
359
360impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
361    /// Returns a new instance of [`GenericColumnWriter`].
362    pub fn new(
363        descr: ColumnDescPtr,
364        props: WriterPropertiesPtr,
365        page_writer: Box<dyn PageWriter + 'a>,
366    ) -> Self {
367        let codec = props.compression(descr.path());
368        let codec_options = CodecOptionsBuilder::default().build();
369        let compressor = create_codec(codec, &codec_options).unwrap();
370        let encoder = E::try_new(&descr, props.as_ref()).unwrap();
371
372        let statistics_enabled = props.statistics_enabled(descr.path());
373
374        let mut encodings = BTreeSet::new();
375        // Used for level information
376        encodings.insert(Encoding::RLE);
377
378        let mut page_metrics = PageMetrics::new();
379        let mut column_metrics = ColumnMetrics::<E::T>::new();
380
381        // Initialize level histograms if collecting page or chunk statistics
382        if statistics_enabled != EnabledStatistics::None {
383            page_metrics = page_metrics
384                .with_repetition_level_histogram(descr.max_rep_level())
385                .with_definition_level_histogram(descr.max_def_level());
386            column_metrics = column_metrics
387                .with_repetition_level_histogram(descr.max_rep_level())
388                .with_definition_level_histogram(descr.max_def_level())
389        }
390
391        // Disable column_index_builder if not collecting page statistics.
392        let mut column_index_builder = ColumnIndexBuilder::new();
393        if statistics_enabled != EnabledStatistics::Page {
394            column_index_builder.to_invalid()
395        }
396
397        Self {
398            descr,
399            props,
400            statistics_enabled,
401            page_writer,
402            codec,
403            compressor,
404            encoder,
405            def_levels_sink: vec![],
406            rep_levels_sink: vec![],
407            data_pages: VecDeque::new(),
408            page_metrics,
409            column_metrics,
410            column_index_builder,
411            offset_index_builder: OffsetIndexBuilder::new(),
412            encodings,
413            data_page_boundary_ascending: true,
414            data_page_boundary_descending: true,
415            last_non_null_data_page_min_max: None,
416        }
417    }
418
419    #[allow(clippy::too_many_arguments)]
420    pub(crate) fn write_batch_internal(
421        &mut self,
422        values: &E::Values,
423        value_indices: Option<&[usize]>,
424        def_levels: Option<&[i16]>,
425        rep_levels: Option<&[i16]>,
426        min: Option<&E::T>,
427        max: Option<&E::T>,
428        distinct_count: Option<u64>,
429    ) -> Result<usize> {
430        // Check if number of definition levels is the same as number of repetition levels.
431        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
432            if def.len() != rep.len() {
433                return Err(general_err!(
434                    "Inconsistent length of definition and repetition levels: {} != {}",
435                    def.len(),
436                    rep.len()
437                ));
438            }
439        }
440
441        // We check for DataPage limits only after we have inserted the values. If a user
442        // writes a large number of values, the DataPage size can be well above the limit.
443        //
444        // The purpose of this chunking is to bound this. Even if a user writes large
445        // number of values, the chunking will ensure that we add data page at a
446        // reasonable pagesize limit.
447
448        // TODO: find out why we don't account for size of levels when we estimate page
449        // size.
450
451        let num_levels = match def_levels {
452            Some(def_levels) => def_levels.len(),
453            None => values.len(),
454        };
455
456        if let Some(min) = min {
457            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
458        }
459        if let Some(max) = max {
460            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
461        }
462
463        // We can only set the distinct count if there are no other writes
464        if self.encoder.num_values() == 0 {
465            self.column_metrics.column_distinct_count = distinct_count;
466        } else {
467            self.column_metrics.column_distinct_count = None;
468        }
469
470        let mut values_offset = 0;
471        let mut levels_offset = 0;
472        let base_batch_size = self.props.write_batch_size();
473        while levels_offset < num_levels {
474            let mut end_offset = num_levels.min(levels_offset + base_batch_size);
475
476            // Split at record boundary
477            if let Some(r) = rep_levels {
478                while end_offset < r.len() && r[end_offset] != 0 {
479                    end_offset += 1;
480                }
481            }
482
483            values_offset += self.write_mini_batch(
484                values,
485                values_offset,
486                value_indices,
487                end_offset - levels_offset,
488                def_levels.map(|lv| &lv[levels_offset..end_offset]),
489                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
490            )?;
491            levels_offset = end_offset;
492        }
493
494        // Return total number of values processed.
495        Ok(values_offset)
496    }
497
498    /// Writes batch of values, definition levels and repetition levels.
499    /// Returns number of values processed (written).
500    ///
501    /// If definition and repetition levels are provided, we write fully those levels and
502    /// select how many values to write (this number will be returned), since number of
503    /// actual written values may be smaller than provided values.
504    ///
505    /// If only values are provided, then all values are written and the length of
506    /// of the values buffer is returned.
507    ///
508    /// Definition and/or repetition levels can be omitted, if values are
509    /// non-nullable and/or non-repeated.
510    pub fn write_batch(
511        &mut self,
512        values: &E::Values,
513        def_levels: Option<&[i16]>,
514        rep_levels: Option<&[i16]>,
515    ) -> Result<usize> {
516        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
517    }
518
519    /// Writer may optionally provide pre-calculated statistics for use when computing
520    /// chunk-level statistics
521    ///
522    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
523    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
524    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
525    /// computed page statistics
526    pub fn write_batch_with_statistics(
527        &mut self,
528        values: &E::Values,
529        def_levels: Option<&[i16]>,
530        rep_levels: Option<&[i16]>,
531        min: Option<&E::T>,
532        max: Option<&E::T>,
533        distinct_count: Option<u64>,
534    ) -> Result<usize> {
535        self.write_batch_internal(
536            values,
537            None,
538            def_levels,
539            rep_levels,
540            min,
541            max,
542            distinct_count,
543        )
544    }
545
546    /// Returns the estimated total memory usage.
547    ///
548    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
549    /// of the current memory usage and not the final anticipated encoded size.
550    #[cfg(feature = "arrow")]
551    pub(crate) fn memory_size(&self) -> usize {
552        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
553    }
554
    /// Returns total number of bytes written by this column writer so far.
    /// This value is also returned when column writer is closed.
    ///
    /// Note: this value does not include any buffered data that has not
    /// yet been flushed to a page.
    ///
    /// With the `arrow` feature enabled, `get_estimated_total_bytes` additionally
    /// accounts for that buffered data.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }
563
564    /// Returns the estimated total encoded bytes for this column writer.
565    ///
566    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
567    /// of any data that has not yet been flushed to a page, based on it's
568    /// anticipated encoded size.
569    #[cfg(feature = "arrow")]
570    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
571        self.column_metrics.total_bytes_written
572            + self.encoder.estimated_data_page_size() as u64
573            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
574    }
575
    /// Returns total number of rows written by this column writer so far.
    /// This value is also returned when column writer is closed
    /// (as `rows_written` of the [`ColumnCloseResult`]).
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }
581
    /// Returns a reference to the [`ColumnDescPtr`] this writer was created with.
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }
586
587    /// Finalizes writes and closes the column writer.
588    /// Returns total bytes written, total rows written and column chunk metadata.
589    pub fn close(mut self) -> Result<ColumnCloseResult> {
590        if self.page_metrics.num_buffered_values > 0 {
591            self.add_data_page()?;
592        }
593        if self.encoder.has_dictionary() {
594            self.write_dictionary_page()?;
595        }
596        self.flush_data_pages()?;
597        let metadata = self.build_column_metadata()?;
598        self.page_writer.close()?;
599
600        let boundary_order = match (
601            self.data_page_boundary_ascending,
602            self.data_page_boundary_descending,
603        ) {
604            // If the lists are composed of equal elements then will be marked as ascending
605            // (Also the case if all pages are null pages)
606            (true, _) => BoundaryOrder::ASCENDING,
607            (false, true) => BoundaryOrder::DESCENDING,
608            (false, false) => BoundaryOrder::UNORDERED,
609        };
610        self.column_index_builder.set_boundary_order(boundary_order);
611
612        let column_index = self
613            .column_index_builder
614            .valid()
615            .then(|| self.column_index_builder.build_to_thrift());
616        let offset_index = Some(self.offset_index_builder.build_to_thrift());
617
618        Ok(ColumnCloseResult {
619            bytes_written: self.column_metrics.total_bytes_written,
620            rows_written: self.column_metrics.total_rows_written,
621            bloom_filter: self.encoder.flush_bloom_filter(),
622            metadata,
623            column_index,
624            offset_index,
625        })
626    }
627
628    /// Writes mini batch of values, definition and repetition levels.
629    /// This allows fine-grained processing of values and maintaining a reasonable
630    /// page size.
631    fn write_mini_batch(
632        &mut self,
633        values: &E::Values,
634        values_offset: usize,
635        value_indices: Option<&[usize]>,
636        num_levels: usize,
637        def_levels: Option<&[i16]>,
638        rep_levels: Option<&[i16]>,
639    ) -> Result<usize> {
640        // Process definition levels and determine how many values to write.
641        let values_to_write = if self.descr.max_def_level() > 0 {
642            let levels = def_levels.ok_or_else(|| {
643                general_err!(
644                    "Definition levels are required, because max definition level = {}",
645                    self.descr.max_def_level()
646                )
647            })?;
648
649            let mut values_to_write = 0;
650            for &level in levels {
651                if level == self.descr.max_def_level() {
652                    values_to_write += 1;
653                } else {
654                    // We must always compute this as it is used to populate v2 pages
655                    self.page_metrics.num_page_nulls += 1
656                }
657            }
658
659            // Update histogram
660            self.page_metrics.update_definition_level_histogram(levels);
661
662            self.def_levels_sink.extend_from_slice(levels);
663            values_to_write
664        } else {
665            num_levels
666        };
667
668        // Process repetition levels and determine how many rows we are about to process.
669        if self.descr.max_rep_level() > 0 {
670            // A row could contain more than one value.
671            let levels = rep_levels.ok_or_else(|| {
672                general_err!(
673                    "Repetition levels are required, because max repetition level = {}",
674                    self.descr.max_rep_level()
675                )
676            })?;
677
678            if !levels.is_empty() && levels[0] != 0 {
679                return Err(general_err!(
680                    "Write must start at a record boundary, got non-zero repetition level of {}",
681                    levels[0]
682                ));
683            }
684
685            // Count the occasions where we start a new row
686            for &level in levels {
687                self.page_metrics.num_buffered_rows += (level == 0) as u32
688            }
689
690            // Update histogram
691            self.page_metrics.update_repetition_level_histogram(levels);
692
693            self.rep_levels_sink.extend_from_slice(levels);
694        } else {
695            // Each value is exactly one row.
696            // Equals to the number of values, we count nulls as well.
697            self.page_metrics.num_buffered_rows += num_levels as u32;
698        }
699
700        match value_indices {
701            Some(indices) => {
702                let indices = &indices[values_offset..values_offset + values_to_write];
703                self.encoder.write_gather(values, indices)?;
704            }
705            None => self.encoder.write(values, values_offset, values_to_write)?,
706        }
707
708        self.page_metrics.num_buffered_values += num_levels as u32;
709
710        if self.should_add_data_page() {
711            self.add_data_page()?;
712        }
713
714        if self.should_dict_fallback() {
715            self.dict_fallback()?;
716        }
717
718        Ok(values_to_write)
719    }
720
721    /// Returns true if we need to fall back to non-dictionary encoding.
722    ///
723    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
724    /// size.
725    #[inline]
726    fn should_dict_fallback(&self) -> bool {
727        match self.encoder.estimated_dict_page_size() {
728            Some(size) => size >= self.props.dictionary_page_size_limit(),
729            None => false,
730        }
731    }
732
733    /// Returns true if there is enough data for a data page, false otherwise.
734    #[inline]
735    fn should_add_data_page(&self) -> bool {
736        // This is necessary in the event of a much larger dictionary size than page size
737        //
738        // In such a scenario the dictionary decoder may return an estimated encoded
739        // size in excess of the page size limit, even when there are no buffered values
740        if self.page_metrics.num_buffered_values == 0 {
741            return false;
742        }
743
744        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
745            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
746    }
747
    /// Performs dictionary fallback.
    /// Prepares and writes dictionary and all data pages into page writer.
    ///
    /// Steps, in order: turn any buffered values into a data page, write the
    /// dictionary page, then flush the pending data pages. The first failing step
    /// aborts the fallback and its error is returned.
    fn dict_fallback(&mut self) -> Result<()> {
        // At this point we know that we need to fall back.
        // Buffered values must become a data page before the dictionary page is written.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }
759
760    /// Update the column index and offset index when adding the data page
761    fn update_column_offset_index(
762        &mut self,
763        page_statistics: Option<&ValueStatistics<E::T>>,
764        page_variable_length_bytes: Option<i64>,
765    ) {
766        // update the column index
767        let null_page =
768            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
769        // a page contains only null values,
770        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
771        if null_page && self.column_index_builder.valid() {
772            self.column_index_builder.append(
773                null_page,
774                vec![],
775                vec![],
776                self.page_metrics.num_page_nulls as i64,
777            );
778        } else if self.column_index_builder.valid() {
779            // from page statistics
780            // If can't get the page statistics, ignore this column/offset index for this column chunk
781            match &page_statistics {
782                None => {
783                    self.column_index_builder.to_invalid();
784                }
785                Some(stat) => {
786                    // Check if min/max are still ascending/descending across pages
787                    let new_min = stat.min_opt().unwrap();
788                    let new_max = stat.max_opt().unwrap();
789                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
790                        if self.data_page_boundary_ascending {
791                            // If last min/max are greater than new min/max then not ascending anymore
792                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
793                                || compare_greater(&self.descr, last_max, new_max);
794                            if not_ascending {
795                                self.data_page_boundary_ascending = false;
796                            }
797                        }
798
799                        if self.data_page_boundary_descending {
800                            // If new min/max are greater than last min/max then not descending anymore
801                            let not_descending = compare_greater(&self.descr, new_min, last_min)
802                                || compare_greater(&self.descr, new_max, last_max);
803                            if not_descending {
804                                self.data_page_boundary_descending = false;
805                            }
806                        }
807                    }
808                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
809
810                    if self.can_truncate_value() {
811                        self.column_index_builder.append(
812                            null_page,
813                            self.truncate_min_value(
814                                self.props.column_index_truncate_length(),
815                                stat.min_bytes_opt().unwrap(),
816                            )
817                            .0,
818                            self.truncate_max_value(
819                                self.props.column_index_truncate_length(),
820                                stat.max_bytes_opt().unwrap(),
821                            )
822                            .0,
823                            self.page_metrics.num_page_nulls as i64,
824                        );
825                    } else {
826                        self.column_index_builder.append(
827                            null_page,
828                            stat.min_bytes_opt().unwrap().to_vec(),
829                            stat.max_bytes_opt().unwrap().to_vec(),
830                            self.page_metrics.num_page_nulls as i64,
831                        );
832                    }
833                }
834            }
835        }
836
837        // Append page histograms to the `ColumnIndex` histograms
838        self.column_index_builder.append_histograms(
839            &self.page_metrics.repetition_level_histogram,
840            &self.page_metrics.definition_level_histogram,
841        );
842
843        // Update the offset index
844        self.offset_index_builder
845            .append_row_count(self.page_metrics.num_buffered_rows as i64);
846
847        self.offset_index_builder
848            .append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
849    }
850
851    /// Determine if we should allow truncating min/max values for this column's statistics
852    fn can_truncate_value(&self) -> bool {
853        match self.descr.physical_type() {
854            // Don't truncate for Float16 and Decimal because their sort order is different
855            // from that of FIXED_LEN_BYTE_ARRAY sort order.
856            // So truncation of those types could lead to inaccurate min/max statistics
857            Type::FIXED_LEN_BYTE_ARRAY
858                if !matches!(
859                    self.descr.logical_type(),
860                    Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
861                ) =>
862            {
863                true
864            }
865            Type::BYTE_ARRAY => true,
866            // Truncation only applies for fba/binary physical types
867            _ => false,
868        }
869    }
870
871    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
872        truncation_length
873            .filter(|l| data.len() > *l)
874            .and_then(|l| match str::from_utf8(data) {
875                Ok(str_data) => truncate_utf8(str_data, l),
876                Err(_) => Some(data[..l].to_vec()),
877            })
878            .map(|truncated| (truncated, true))
879            .unwrap_or_else(|| (data.to_vec(), false))
880    }
881
882    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
883        truncation_length
884            .filter(|l| data.len() > *l)
885            .and_then(|l| match str::from_utf8(data) {
886                Ok(str_data) => truncate_utf8(str_data, l).and_then(increment_utf8),
887                Err(_) => increment(data[..l].to_vec()),
888            })
889            .map(|truncated| (truncated, true))
890            .unwrap_or_else(|| (data.to_vec(), false))
891    }
892
893    /// Adds data page.
894    /// Data page is either buffered in case of dictionary encoding or written directly.
895    fn add_data_page(&mut self) -> Result<()> {
896        // Extract encoded values
897        let values_data = self.encoder.flush_data_page()?;
898
899        let max_def_level = self.descr.max_def_level();
900        let max_rep_level = self.descr.max_rep_level();
901
902        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
903
904        let page_statistics = match (values_data.min_value, values_data.max_value) {
905            (Some(min), Some(max)) => {
906                // Update chunk level statistics
907                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
908                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
909
910                (self.statistics_enabled == EnabledStatistics::Page).then_some(
911                    ValueStatistics::new(
912                        Some(min),
913                        Some(max),
914                        None,
915                        Some(self.page_metrics.num_page_nulls),
916                        false,
917                    ),
918                )
919            }
920            _ => None,
921        };
922
923        // update column and offset index
924        self.update_column_offset_index(
925            page_statistics.as_ref(),
926            values_data.variable_length_bytes,
927        );
928
929        // Update histograms and variable_length_bytes in column_metrics
930        self.column_metrics
931            .update_from_page_metrics(&self.page_metrics);
932        self.column_metrics
933            .update_variable_length_bytes(values_data.variable_length_bytes);
934
935        let page_statistics = page_statistics.map(Statistics::from);
936
937        let compressed_page = match self.props.writer_version() {
938            WriterVersion::PARQUET_1_0 => {
939                let mut buffer = vec![];
940
941                if max_rep_level > 0 {
942                    buffer.extend_from_slice(
943                        &self.encode_levels_v1(
944                            Encoding::RLE,
945                            &self.rep_levels_sink[..],
946                            max_rep_level,
947                        )[..],
948                    );
949                }
950
951                if max_def_level > 0 {
952                    buffer.extend_from_slice(
953                        &self.encode_levels_v1(
954                            Encoding::RLE,
955                            &self.def_levels_sink[..],
956                            max_def_level,
957                        )[..],
958                    );
959                }
960
961                buffer.extend_from_slice(&values_data.buf);
962                let uncompressed_size = buffer.len();
963
964                if let Some(ref mut cmpr) = self.compressor {
965                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
966                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
967                    buffer = compressed_buf;
968                }
969
970                let data_page = Page::DataPage {
971                    buf: buffer.into(),
972                    num_values: self.page_metrics.num_buffered_values,
973                    encoding: values_data.encoding,
974                    def_level_encoding: Encoding::RLE,
975                    rep_level_encoding: Encoding::RLE,
976                    statistics: page_statistics,
977                };
978
979                CompressedPage::new(data_page, uncompressed_size)
980            }
981            WriterVersion::PARQUET_2_0 => {
982                let mut rep_levels_byte_len = 0;
983                let mut def_levels_byte_len = 0;
984                let mut buffer = vec![];
985
986                if max_rep_level > 0 {
987                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
988                    rep_levels_byte_len = levels.len();
989                    buffer.extend_from_slice(&levels[..]);
990                }
991
992                if max_def_level > 0 {
993                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
994                    def_levels_byte_len = levels.len();
995                    buffer.extend_from_slice(&levels[..]);
996                }
997
998                let uncompressed_size =
999                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();
1000
1001                // Data Page v2 compresses values only.
1002                match self.compressor {
1003                    Some(ref mut cmpr) => {
1004                        cmpr.compress(&values_data.buf, &mut buffer)?;
1005                    }
1006                    None => buffer.extend_from_slice(&values_data.buf),
1007                }
1008
1009                let data_page = Page::DataPageV2 {
1010                    buf: buffer.into(),
1011                    num_values: self.page_metrics.num_buffered_values,
1012                    encoding: values_data.encoding,
1013                    num_nulls: self.page_metrics.num_page_nulls as u32,
1014                    num_rows: self.page_metrics.num_buffered_rows,
1015                    def_levels_byte_len: def_levels_byte_len as u32,
1016                    rep_levels_byte_len: rep_levels_byte_len as u32,
1017                    is_compressed: self.compressor.is_some(),
1018                    statistics: page_statistics,
1019                };
1020
1021                CompressedPage::new(data_page, uncompressed_size)
1022            }
1023        };
1024
1025        // Check if we need to buffer data page or flush it to the sink directly.
1026        if self.encoder.has_dictionary() {
1027            self.data_pages.push_back(compressed_page);
1028        } else {
1029            self.write_data_page(compressed_page)?;
1030        }
1031
1032        // Update total number of rows.
1033        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
1034
1035        // Reset state.
1036        self.rep_levels_sink.clear();
1037        self.def_levels_sink.clear();
1038        self.page_metrics.new_page();
1039
1040        Ok(())
1041    }
1042
1043    /// Finalises any outstanding data pages and flushes buffered data pages from
1044    /// dictionary encoding into underlying sink.
1045    #[inline]
1046    fn flush_data_pages(&mut self) -> Result<()> {
1047        // Write all outstanding data to a new page.
1048        if self.page_metrics.num_buffered_values > 0 {
1049            self.add_data_page()?;
1050        }
1051
1052        while let Some(page) = self.data_pages.pop_front() {
1053            self.write_data_page(page)?;
1054        }
1055
1056        Ok(())
1057    }
1058
1059    /// Assembles column chunk metadata.
1060    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1061        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1062        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1063        let num_values = self.column_metrics.total_num_values as i64;
1064        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1065        // If data page offset is not set, then no pages have been written
1066        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1067
1068        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1069            .set_compression(self.codec)
1070            .set_encodings(self.encodings.iter().cloned().collect())
1071            .set_total_compressed_size(total_compressed_size)
1072            .set_total_uncompressed_size(total_uncompressed_size)
1073            .set_num_values(num_values)
1074            .set_data_page_offset(data_page_offset)
1075            .set_dictionary_page_offset(dict_page_offset);
1076
1077        if self.statistics_enabled != EnabledStatistics::None {
1078            let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1079
1080            let statistics = ValueStatistics::<E::T>::new(
1081                self.column_metrics.min_column_value.clone(),
1082                self.column_metrics.max_column_value.clone(),
1083                self.column_metrics.column_distinct_count,
1084                Some(self.column_metrics.num_column_nulls),
1085                false,
1086            )
1087            .with_backwards_compatible_min_max(backwards_compatible_min_max)
1088            .into();
1089
1090            let statistics = match statistics {
1091                Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
1092                    let (min, did_truncate_min) = self.truncate_min_value(
1093                        self.props.statistics_truncate_length(),
1094                        stats.min_bytes_opt().unwrap(),
1095                    );
1096                    let (max, did_truncate_max) = self.truncate_max_value(
1097                        self.props.statistics_truncate_length(),
1098                        stats.max_bytes_opt().unwrap(),
1099                    );
1100                    Statistics::ByteArray(
1101                        ValueStatistics::new(
1102                            Some(min.into()),
1103                            Some(max.into()),
1104                            stats.distinct_count(),
1105                            stats.null_count_opt(),
1106                            backwards_compatible_min_max,
1107                        )
1108                        .with_max_is_exact(!did_truncate_max)
1109                        .with_min_is_exact(!did_truncate_min),
1110                    )
1111                }
1112                Statistics::FixedLenByteArray(stats)
1113                    if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
1114                {
1115                    let (min, did_truncate_min) = self.truncate_min_value(
1116                        self.props.statistics_truncate_length(),
1117                        stats.min_bytes_opt().unwrap(),
1118                    );
1119                    let (max, did_truncate_max) = self.truncate_max_value(
1120                        self.props.statistics_truncate_length(),
1121                        stats.max_bytes_opt().unwrap(),
1122                    );
1123                    Statistics::FixedLenByteArray(
1124                        ValueStatistics::new(
1125                            Some(min.into()),
1126                            Some(max.into()),
1127                            stats.distinct_count(),
1128                            stats.null_count_opt(),
1129                            backwards_compatible_min_max,
1130                        )
1131                        .with_max_is_exact(!did_truncate_max)
1132                        .with_min_is_exact(!did_truncate_min),
1133                    )
1134                }
1135                stats => stats,
1136            };
1137
1138            builder = builder
1139                .set_statistics(statistics)
1140                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1141                .set_repetition_level_histogram(
1142                    self.column_metrics.repetition_level_histogram.take(),
1143                )
1144                .set_definition_level_histogram(
1145                    self.column_metrics.definition_level_histogram.take(),
1146                );
1147        }
1148
1149        let metadata = builder.build()?;
1150        Ok(metadata)
1151    }
1152
1153    /// Encodes definition or repetition levels for Data Page v1.
1154    #[inline]
1155    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1156        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1157        encoder.put(levels);
1158        encoder.consume()
1159    }
1160
1161    /// Encodes definition or repetition levels for Data Page v2.
1162    /// Encoding is always RLE.
1163    #[inline]
1164    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1165        let mut encoder = LevelEncoder::v2(max_level, levels.len());
1166        encoder.put(levels);
1167        encoder.consume()
1168    }
1169
1170    /// Writes compressed data page into underlying sink and updates global metrics.
1171    #[inline]
1172    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1173        self.encodings.insert(page.encoding());
1174        let page_spec = self.page_writer.write_page(page)?;
1175        // update offset index
1176        // compressed_size = header_size + compressed_data_size
1177        self.offset_index_builder
1178            .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32);
1179        self.update_metrics_for_page(page_spec);
1180        Ok(())
1181    }
1182
1183    /// Writes dictionary page into underlying sink.
1184    #[inline]
1185    fn write_dictionary_page(&mut self) -> Result<()> {
1186        let compressed_page = {
1187            let mut page = self
1188                .encoder
1189                .flush_dict_page()?
1190                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1191
1192            let uncompressed_size = page.buf.len();
1193
1194            if let Some(ref mut cmpr) = self.compressor {
1195                let mut output_buf = Vec::with_capacity(uncompressed_size);
1196                cmpr.compress(&page.buf, &mut output_buf)?;
1197                page.buf = Bytes::from(output_buf);
1198            }
1199
1200            let dict_page = Page::DictionaryPage {
1201                buf: page.buf,
1202                num_values: page.num_values as u32,
1203                encoding: self.props.dictionary_page_encoding(),
1204                is_sorted: page.is_sorted,
1205            };
1206            CompressedPage::new(dict_page, uncompressed_size)
1207        };
1208
1209        self.encodings.insert(compressed_page.encoding());
1210        let page_spec = self.page_writer.write_page(compressed_page)?;
1211        self.update_metrics_for_page(page_spec);
1212        // For the directory page, don't need to update column/offset index.
1213        Ok(())
1214    }
1215
1216    /// Updates column writer metrics with each page metadata.
1217    #[inline]
1218    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1219        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1220        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1221        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1222
1223        match page_spec.page_type {
1224            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1225                self.column_metrics.total_num_values += page_spec.num_values as u64;
1226                if self.column_metrics.data_page_offset.is_none() {
1227                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1228                }
1229            }
1230            PageType::DICTIONARY_PAGE => {
1231                assert!(
1232                    self.column_metrics.dictionary_page_offset.is_none(),
1233                    "Dictionary offset is already set"
1234                );
1235                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1236            }
1237            _ => {}
1238        }
1239    }
1240}
1241
1242fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1243    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1244}
1245
1246fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1247    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1248}
1249
1250#[inline]
1251#[allow(clippy::eq_op)]
1252fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1253    match T::PHYSICAL_TYPE {
1254        Type::FLOAT | Type::DOUBLE => val != val,
1255        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1256            let val = val.as_bytes();
1257            let val = f16::from_le_bytes([val[0], val[1]]);
1258            val.is_nan()
1259        }
1260        _ => false,
1261    }
1262}
1263
1264/// Perform a conditional update of `cur`, skipping any NaN values
1265///
1266/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1267/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
1268fn update_stat<T: ParquetValueType, F>(
1269    descr: &ColumnDescriptor,
1270    val: &T,
1271    cur: &mut Option<T>,
1272    should_update: F,
1273) where
1274    F: Fn(&T) -> bool,
1275{
1276    if is_nan(descr, val) {
1277        return;
1278    }
1279
1280    if cur.as_ref().map_or(true, should_update) {
1281        *cur = Some(val.clone());
1282    }
1283}
1284
1285/// Evaluate `a > b` according to underlying logical type.
1286fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1287    if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1288        if !is_signed {
1289            // need to compare unsigned
1290            return a.as_u64().unwrap() > b.as_u64().unwrap();
1291        }
1292    }
1293
1294    match descr.converted_type() {
1295        ConvertedType::UINT_8
1296        | ConvertedType::UINT_16
1297        | ConvertedType::UINT_32
1298        | ConvertedType::UINT_64 => {
1299            return a.as_u64().unwrap() > b.as_u64().unwrap();
1300        }
1301        _ => {}
1302    };
1303
1304    if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1305        match T::PHYSICAL_TYPE {
1306            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1307                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1308            }
1309            _ => {}
1310        };
1311    }
1312
1313    if descr.converted_type() == ConvertedType::DECIMAL {
1314        match T::PHYSICAL_TYPE {
1315            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1316                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1317            }
1318            _ => {}
1319        };
1320    };
1321
1322    if let Some(LogicalType::Float16) = descr.logical_type() {
1323        let a = a.as_bytes();
1324        let a = f16::from_le_bytes([a[0], a[1]]);
1325        let b = b.as_bytes();
1326        let b = f16::from_le_bytes([b[0], b[1]]);
1327        return a > b;
1328    }
1329
1330    a > b
1331}
1332
1333// ----------------------------------------------------------------------
1334// Encoding support for column writer.
1335// This mirrors parquet-mr default encodings for writes. See:
1336// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1337// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1338
1339/// Returns encoding for a column when no other encoding is provided in writer properties.
1340fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1341    match (kind, props.writer_version()) {
1342        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1343        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1344        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1345        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1346        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1347        _ => Encoding::PLAIN,
1348    }
1349}
1350
1351/// Returns true if dictionary is supported for column writer, false otherwise.
1352fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1353    match (kind, props.writer_version()) {
1354        // Booleans do not support dict encoding and should use a fallback encoding.
1355        (Type::BOOLEAN, _) => false,
1356        // Dictionary encoding was not enabled in PARQUET 1.0
1357        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1358        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1359        _ => true,
1360    }
1361}
1362
/// Signed comparison of byte arrays holding big-endian two's-complement
/// integers (the representation used for byte-array decimals).
///
/// Returns `true` when `a` is strictly greater than `b`. The operands may
/// have different lengths; the shorter one is treated as if it were
/// sign-extended to the length of the longer one. An empty array compares
/// below every non-empty array, and two empty arrays are not greater than
/// each other.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // We can short circuit for different signed numbers or
    // for equal length bytes arrays that have different first bytes.
    // The equal-length requirement is necessary for sign extension cases:
    // 0x0010 and 0x10 have different first bytes but are the same number.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // From here on both values have the same sign.
    let negative_values = (first_a as i8) < 0;
    let extension: u8 = if negative_values { 0xFF } else { 0 };

    // Split each operand into the bytes it has in excess of the other one
    // (the "lead", empty for the shorter operand and for equal lengths) and
    // the equal-length tails that line up byte for byte.
    let common_length = a_length.min(b_length);
    let (a_lead, a_tail) = a.split_at(a_length - common_length);
    let (b_lead, b_tail) = b.split_at(b_length - common_length);

    // If the lead of the longer value is not pure sign extension, its
    // magnitude exceeds anything the shorter value can represent: the longer
    // value is the greater one for positive numbers and the lesser one for
    // negative numbers.
    if a_lead.iter().chain(b_lead).any(|&x| x != extension) {
        let a_longer = a_length > b_length;
        return if negative_values { !a_longer } else { a_longer };
    }

    // Both values agree on all leading bytes once sign-extended, so the
    // remaining equal-length tails order the numbers by plain unsigned
    // lexicographical comparison. The tails (not `a[1..]`/`b[1..]`) must be
    // compared, otherwise unequal-length inputs are misaligned.
    a_tail > b_tail
}
1408
/// Truncate a UTF8 slice to the longest non-empty prefix that ends on a
/// character boundary and is at most `length` bytes long.
///
/// Returns `None` when no such prefix exists (for example when `length` is
/// zero or falls inside the first character).
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    (1..=length)
        .rev()
        .find(|&end| data.is_char_boundary(end))
        .map(|end| data.as_bytes()[..end].to_vec())
}
1415
/// Try and increment the bytes from right to left, treating `data` as a
/// big-endian counter.
///
/// Returns `None` if all bytes are set to `u8::MAX` (which includes the empty
/// input).
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The rightmost byte that can absorb the carry.
    let carry_stop = data.iter().rposition(|&byte| byte != u8::MAX)?;

    data[carry_stop] += 1;
    // Every byte to the right of it was `u8::MAX` and wraps around to zero.
    data[carry_stop + 1..].fill(0);

    Some(data)
}
1431
/// Try and increment the string's bytes from right to left, returning as soon
/// as the result is a valid UTF8 string.
///
/// A byte whose increment would overflow, or would make the data invalid
/// UTF8, is left unchanged and the next byte to the left is tried. Returns
/// `None` when it can't increment any byte.
fn increment_utf8(mut data: Vec<u8>) -> Option<Vec<u8>> {
    let mut idx = data.len();
    while idx > 0 {
        idx -= 1;

        let saved = data[idx];
        if let Some(bumped) = saved.checked_add(1) {
            data[idx] = bumped;
            if str::from_utf8(&data).is_ok() {
                return Some(data);
            }
            // Not valid UTF8 with this byte bumped: undo and move left.
            data[idx] = saved;
        }
    }

    None
}
1449
1450#[cfg(test)]
1451mod tests {
1452    use crate::file::properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
1453    use rand::distributions::uniform::SampleUniform;
1454    use std::sync::Arc;
1455
1456    use crate::column::{
1457        page::PageReader,
1458        reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1459    };
1460    use crate::file::writer::TrackedWrite;
1461    use crate::file::{
1462        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1463    };
1464    use crate::schema::types::{ColumnPath, Type as SchemaType};
1465    use crate::util::test_common::rand_gen::random_numbers_range;
1466
1467    use super::*;
1468
1469    #[test]
1470    fn test_column_writer_inconsistent_def_rep_length() {
1471        let page_writer = get_test_page_writer();
1472        let props = Default::default();
1473        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1474        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1475        assert!(res.is_err());
1476        if let Err(err) = res {
1477            assert_eq!(
1478                format!("{err}"),
1479                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1480            );
1481        }
1482    }
1483
1484    #[test]
1485    fn test_column_writer_invalid_def_levels() {
1486        let page_writer = get_test_page_writer();
1487        let props = Default::default();
1488        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1489        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1490        assert!(res.is_err());
1491        if let Err(err) = res {
1492            assert_eq!(
1493                format!("{err}"),
1494                "Parquet error: Definition levels are required, because max definition level = 1"
1495            );
1496        }
1497    }
1498
1499    #[test]
1500    fn test_column_writer_invalid_rep_levels() {
1501        let page_writer = get_test_page_writer();
1502        let props = Default::default();
1503        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1504        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1505        assert!(res.is_err());
1506        if let Err(err) = res {
1507            assert_eq!(
1508                format!("{err}"),
1509                "Parquet error: Repetition levels are required, because max repetition level = 1"
1510            );
1511        }
1512    }
1513
1514    #[test]
1515    fn test_column_writer_not_enough_values_to_write() {
1516        let page_writer = get_test_page_writer();
1517        let props = Default::default();
1518        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1519        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1520        assert!(res.is_err());
1521        if let Err(err) = res {
1522            assert_eq!(
1523                format!("{err}"),
1524                "Parquet error: Expected to write 4 values, but have only 2"
1525            );
1526        }
1527    }
1528
1529    #[test]
1530    fn test_column_writer_write_only_one_dictionary_page() {
1531        let page_writer = get_test_page_writer();
1532        let props = Default::default();
1533        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1534        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1535        // First page should be correctly written.
1536        writer.add_data_page().unwrap();
1537        writer.write_dictionary_page().unwrap();
1538        let err = writer.write_dictionary_page().unwrap_err().to_string();
1539        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1540    }
1541
1542    #[test]
1543    fn test_column_writer_error_when_writing_disabled_dictionary() {
1544        let page_writer = get_test_page_writer();
1545        let props = Arc::new(
1546            WriterProperties::builder()
1547                .set_dictionary_enabled(false)
1548                .build(),
1549        );
1550        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1551        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1552        let err = writer.write_dictionary_page().unwrap_err().to_string();
1553        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1554    }
1555
1556    #[test]
1557    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1558        let page_writer = get_test_page_writer();
1559        let props = Arc::new(
1560            WriterProperties::builder()
1561                .set_dictionary_enabled(true)
1562                .build(),
1563        );
1564        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1565        writer
1566            .write_batch(&[true, false, true, false], None, None)
1567            .unwrap();
1568
1569        let r = writer.close().unwrap();
1570        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
1571        // byte.
1572        assert_eq!(r.bytes_written, 1);
1573        assert_eq!(r.rows_written, 4);
1574
1575        let metadata = r.metadata;
1576        assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1577        assert_eq!(metadata.num_values(), 4); // just values
1578        assert_eq!(metadata.dictionary_page_offset(), None);
1579    }
1580
1581    #[test]
1582    fn test_column_writer_default_encoding_support_bool() {
1583        check_encoding_write_support::<BoolType>(
1584            WriterVersion::PARQUET_1_0,
1585            true,
1586            &[true, false],
1587            None,
1588            &[Encoding::PLAIN, Encoding::RLE],
1589        );
1590        check_encoding_write_support::<BoolType>(
1591            WriterVersion::PARQUET_1_0,
1592            false,
1593            &[true, false],
1594            None,
1595            &[Encoding::PLAIN, Encoding::RLE],
1596        );
1597        check_encoding_write_support::<BoolType>(
1598            WriterVersion::PARQUET_2_0,
1599            true,
1600            &[true, false],
1601            None,
1602            &[Encoding::RLE],
1603        );
1604        check_encoding_write_support::<BoolType>(
1605            WriterVersion::PARQUET_2_0,
1606            false,
1607            &[true, false],
1608            None,
1609            &[Encoding::RLE],
1610        );
1611    }
1612
1613    #[test]
1614    fn test_column_writer_default_encoding_support_int32() {
1615        check_encoding_write_support::<Int32Type>(
1616            WriterVersion::PARQUET_1_0,
1617            true,
1618            &[1, 2],
1619            Some(0),
1620            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1621        );
1622        check_encoding_write_support::<Int32Type>(
1623            WriterVersion::PARQUET_1_0,
1624            false,
1625            &[1, 2],
1626            None,
1627            &[Encoding::PLAIN, Encoding::RLE],
1628        );
1629        check_encoding_write_support::<Int32Type>(
1630            WriterVersion::PARQUET_2_0,
1631            true,
1632            &[1, 2],
1633            Some(0),
1634            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1635        );
1636        check_encoding_write_support::<Int32Type>(
1637            WriterVersion::PARQUET_2_0,
1638            false,
1639            &[1, 2],
1640            None,
1641            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1642        );
1643    }
1644
1645    #[test]
1646    fn test_column_writer_default_encoding_support_int64() {
1647        check_encoding_write_support::<Int64Type>(
1648            WriterVersion::PARQUET_1_0,
1649            true,
1650            &[1, 2],
1651            Some(0),
1652            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1653        );
1654        check_encoding_write_support::<Int64Type>(
1655            WriterVersion::PARQUET_1_0,
1656            false,
1657            &[1, 2],
1658            None,
1659            &[Encoding::PLAIN, Encoding::RLE],
1660        );
1661        check_encoding_write_support::<Int64Type>(
1662            WriterVersion::PARQUET_2_0,
1663            true,
1664            &[1, 2],
1665            Some(0),
1666            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1667        );
1668        check_encoding_write_support::<Int64Type>(
1669            WriterVersion::PARQUET_2_0,
1670            false,
1671            &[1, 2],
1672            None,
1673            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1674        );
1675    }
1676
1677    #[test]
1678    fn test_column_writer_default_encoding_support_int96() {
1679        check_encoding_write_support::<Int96Type>(
1680            WriterVersion::PARQUET_1_0,
1681            true,
1682            &[Int96::from(vec![1, 2, 3])],
1683            Some(0),
1684            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1685        );
1686        check_encoding_write_support::<Int96Type>(
1687            WriterVersion::PARQUET_1_0,
1688            false,
1689            &[Int96::from(vec![1, 2, 3])],
1690            None,
1691            &[Encoding::PLAIN, Encoding::RLE],
1692        );
1693        check_encoding_write_support::<Int96Type>(
1694            WriterVersion::PARQUET_2_0,
1695            true,
1696            &[Int96::from(vec![1, 2, 3])],
1697            Some(0),
1698            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1699        );
1700        check_encoding_write_support::<Int96Type>(
1701            WriterVersion::PARQUET_2_0,
1702            false,
1703            &[Int96::from(vec![1, 2, 3])],
1704            None,
1705            &[Encoding::PLAIN, Encoding::RLE],
1706        );
1707    }
1708
1709    #[test]
1710    fn test_column_writer_default_encoding_support_float() {
1711        check_encoding_write_support::<FloatType>(
1712            WriterVersion::PARQUET_1_0,
1713            true,
1714            &[1.0, 2.0],
1715            Some(0),
1716            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1717        );
1718        check_encoding_write_support::<FloatType>(
1719            WriterVersion::PARQUET_1_0,
1720            false,
1721            &[1.0, 2.0],
1722            None,
1723            &[Encoding::PLAIN, Encoding::RLE],
1724        );
1725        check_encoding_write_support::<FloatType>(
1726            WriterVersion::PARQUET_2_0,
1727            true,
1728            &[1.0, 2.0],
1729            Some(0),
1730            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1731        );
1732        check_encoding_write_support::<FloatType>(
1733            WriterVersion::PARQUET_2_0,
1734            false,
1735            &[1.0, 2.0],
1736            None,
1737            &[Encoding::PLAIN, Encoding::RLE],
1738        );
1739    }
1740
1741    #[test]
1742    fn test_column_writer_default_encoding_support_double() {
1743        check_encoding_write_support::<DoubleType>(
1744            WriterVersion::PARQUET_1_0,
1745            true,
1746            &[1.0, 2.0],
1747            Some(0),
1748            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1749        );
1750        check_encoding_write_support::<DoubleType>(
1751            WriterVersion::PARQUET_1_0,
1752            false,
1753            &[1.0, 2.0],
1754            None,
1755            &[Encoding::PLAIN, Encoding::RLE],
1756        );
1757        check_encoding_write_support::<DoubleType>(
1758            WriterVersion::PARQUET_2_0,
1759            true,
1760            &[1.0, 2.0],
1761            Some(0),
1762            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1763        );
1764        check_encoding_write_support::<DoubleType>(
1765            WriterVersion::PARQUET_2_0,
1766            false,
1767            &[1.0, 2.0],
1768            None,
1769            &[Encoding::PLAIN, Encoding::RLE],
1770        );
1771    }
1772
1773    #[test]
1774    fn test_column_writer_default_encoding_support_byte_array() {
1775        check_encoding_write_support::<ByteArrayType>(
1776            WriterVersion::PARQUET_1_0,
1777            true,
1778            &[ByteArray::from(vec![1u8])],
1779            Some(0),
1780            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1781        );
1782        check_encoding_write_support::<ByteArrayType>(
1783            WriterVersion::PARQUET_1_0,
1784            false,
1785            &[ByteArray::from(vec![1u8])],
1786            None,
1787            &[Encoding::PLAIN, Encoding::RLE],
1788        );
1789        check_encoding_write_support::<ByteArrayType>(
1790            WriterVersion::PARQUET_2_0,
1791            true,
1792            &[ByteArray::from(vec![1u8])],
1793            Some(0),
1794            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1795        );
1796        check_encoding_write_support::<ByteArrayType>(
1797            WriterVersion::PARQUET_2_0,
1798            false,
1799            &[ByteArray::from(vec![1u8])],
1800            None,
1801            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
1802        );
1803    }
1804
1805    #[test]
1806    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
1807        check_encoding_write_support::<FixedLenByteArrayType>(
1808            WriterVersion::PARQUET_1_0,
1809            true,
1810            &[ByteArray::from(vec![1u8]).into()],
1811            None,
1812            &[Encoding::PLAIN, Encoding::RLE],
1813        );
1814        check_encoding_write_support::<FixedLenByteArrayType>(
1815            WriterVersion::PARQUET_1_0,
1816            false,
1817            &[ByteArray::from(vec![1u8]).into()],
1818            None,
1819            &[Encoding::PLAIN, Encoding::RLE],
1820        );
1821        check_encoding_write_support::<FixedLenByteArrayType>(
1822            WriterVersion::PARQUET_2_0,
1823            true,
1824            &[ByteArray::from(vec![1u8]).into()],
1825            Some(0),
1826            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1827        );
1828        check_encoding_write_support::<FixedLenByteArrayType>(
1829            WriterVersion::PARQUET_2_0,
1830            false,
1831            &[ByteArray::from(vec![1u8]).into()],
1832            None,
1833            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
1834        );
1835    }
1836
1837    #[test]
1838    fn test_column_writer_check_metadata() {
1839        let page_writer = get_test_page_writer();
1840        let props = Default::default();
1841        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1842        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1843
1844        let r = writer.close().unwrap();
1845        assert_eq!(r.bytes_written, 20);
1846        assert_eq!(r.rows_written, 4);
1847
1848        let metadata = r.metadata;
1849        assert_eq!(
1850            metadata.encodings(),
1851            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
1852        );
1853        assert_eq!(metadata.num_values(), 4);
1854        assert_eq!(metadata.compressed_size(), 20);
1855        assert_eq!(metadata.uncompressed_size(), 20);
1856        assert_eq!(metadata.data_page_offset(), 0);
1857        assert_eq!(metadata.dictionary_page_offset(), Some(0));
1858        if let Some(stats) = metadata.statistics() {
1859            assert_eq!(stats.null_count_opt(), Some(0));
1860            assert_eq!(stats.distinct_count_opt(), None);
1861            if let Statistics::Int32(stats) = stats {
1862                assert_eq!(stats.min_opt().unwrap(), &1);
1863                assert_eq!(stats.max_opt().unwrap(), &4);
1864            } else {
1865                panic!("expecting Statistics::Int32");
1866            }
1867        } else {
1868            panic!("metadata missing statistics");
1869        }
1870    }
1871
1872    #[test]
1873    fn test_column_writer_check_byte_array_min_max() {
1874        let page_writer = get_test_page_writer();
1875        let props = Default::default();
1876        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
1877        writer
1878            .write_batch(
1879                &[
1880                    ByteArray::from(vec![
1881                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
1882                        35u8, 231u8, 90u8, 0u8, 0u8,
1883                    ]),
1884                    ByteArray::from(vec![
1885                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
1886                        152u8, 177u8, 56u8, 0u8, 0u8,
1887                    ]),
1888                    ByteArray::from(vec![
1889                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
1890                        0u8,
1891                    ]),
1892                    ByteArray::from(vec![
1893                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
1894                        44u8, 0u8, 0u8,
1895                    ]),
1896                ],
1897                None,
1898                None,
1899            )
1900            .unwrap();
1901        let metadata = writer.close().unwrap().metadata;
1902        if let Some(stats) = metadata.statistics() {
1903            if let Statistics::ByteArray(stats) = stats {
1904                assert_eq!(
1905                    stats.min_opt().unwrap(),
1906                    &ByteArray::from(vec![
1907                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
1908                        35u8, 231u8, 90u8, 0u8, 0u8,
1909                    ])
1910                );
1911                assert_eq!(
1912                    stats.max_opt().unwrap(),
1913                    &ByteArray::from(vec![
1914                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
1915                        44u8, 0u8, 0u8,
1916                    ])
1917                );
1918            } else {
1919                panic!("expecting Statistics::ByteArray");
1920            }
1921        } else {
1922            panic!("metadata missing statistics");
1923        }
1924    }
1925
1926    #[test]
1927    fn test_column_writer_uint32_converted_type_min_max() {
1928        let page_writer = get_test_page_writer();
1929        let props = Default::default();
1930        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
1931            page_writer,
1932            0,
1933            0,
1934            props,
1935        );
1936        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
1937        let metadata = writer.close().unwrap().metadata;
1938        if let Some(stats) = metadata.statistics() {
1939            if let Statistics::Int32(stats) = stats {
1940                assert_eq!(stats.min_opt().unwrap(), &0,);
1941                assert_eq!(stats.max_opt().unwrap(), &5,);
1942            } else {
1943                panic!("expecting Statistics::Int32");
1944            }
1945        } else {
1946            panic!("metadata missing statistics");
1947        }
1948    }
1949
1950    #[test]
1951    fn test_column_writer_precalculated_statistics() {
1952        let page_writer = get_test_page_writer();
1953        let props = Arc::new(
1954            WriterProperties::builder()
1955                .set_statistics_enabled(EnabledStatistics::Chunk)
1956                .build(),
1957        );
1958        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1959        writer
1960            .write_batch_with_statistics(
1961                &[1, 2, 3, 4],
1962                None,
1963                None,
1964                Some(&-17),
1965                Some(&9000),
1966                Some(55),
1967            )
1968            .unwrap();
1969
1970        let r = writer.close().unwrap();
1971        assert_eq!(r.bytes_written, 20);
1972        assert_eq!(r.rows_written, 4);
1973
1974        let metadata = r.metadata;
1975        assert_eq!(
1976            metadata.encodings(),
1977            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
1978        );
1979        assert_eq!(metadata.num_values(), 4);
1980        assert_eq!(metadata.compressed_size(), 20);
1981        assert_eq!(metadata.uncompressed_size(), 20);
1982        assert_eq!(metadata.data_page_offset(), 0);
1983        assert_eq!(metadata.dictionary_page_offset(), Some(0));
1984        if let Some(stats) = metadata.statistics() {
1985            assert_eq!(stats.null_count_opt(), Some(0));
1986            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
1987            if let Statistics::Int32(stats) = stats {
1988                assert_eq!(stats.min_opt().unwrap(), &-17);
1989                assert_eq!(stats.max_opt().unwrap(), &9000);
1990            } else {
1991                panic!("expecting Statistics::Int32");
1992            }
1993        } else {
1994            panic!("metadata missing statistics");
1995        }
1996    }
1997
1998    #[test]
1999    fn test_mixed_precomputed_statistics() {
2000        let mut buf = Vec::with_capacity(100);
2001        let mut write = TrackedWrite::new(&mut buf);
2002        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2003        let props = Default::default();
2004        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2005
2006        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2007        writer
2008            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2009            .unwrap();
2010
2011        let r = writer.close().unwrap();
2012
2013        let stats = r.metadata.statistics().unwrap();
2014        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2015        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2016        assert_eq!(stats.null_count_opt(), Some(0));
2017        assert!(stats.distinct_count_opt().is_none());
2018
2019        drop(write);
2020
2021        let props = ReaderProperties::builder()
2022            .set_backward_compatible_lz4(false)
2023            .build();
2024        let reader = SerializedPageReader::new_with_properties(
2025            Arc::new(Bytes::from(buf)),
2026            &r.metadata,
2027            r.rows_written as usize,
2028            None,
2029            Arc::new(props),
2030        )
2031        .unwrap();
2032
2033        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2034        assert_eq!(pages.len(), 2);
2035
2036        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2037        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2038
2039        let page_statistics = pages[1].statistics().unwrap();
2040        assert_eq!(
2041            page_statistics.min_bytes_opt().unwrap(),
2042            1_i32.to_le_bytes()
2043        );
2044        assert_eq!(
2045            page_statistics.max_bytes_opt().unwrap(),
2046            7_i32.to_le_bytes()
2047        );
2048        assert_eq!(page_statistics.null_count_opt(), Some(0));
2049        assert!(page_statistics.distinct_count_opt().is_none());
2050    }
2051
2052    #[test]
2053    fn test_disabled_statistics() {
2054        let mut buf = Vec::with_capacity(100);
2055        let mut write = TrackedWrite::new(&mut buf);
2056        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2057        let props = WriterProperties::builder()
2058            .set_statistics_enabled(EnabledStatistics::None)
2059            .set_writer_version(WriterVersion::PARQUET_2_0)
2060            .build();
2061        let props = Arc::new(props);
2062
2063        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2064        writer
2065            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2066            .unwrap();
2067
2068        let r = writer.close().unwrap();
2069        assert!(r.metadata.statistics().is_none());
2070
2071        drop(write);
2072
2073        let props = ReaderProperties::builder()
2074            .set_backward_compatible_lz4(false)
2075            .build();
2076        let reader = SerializedPageReader::new_with_properties(
2077            Arc::new(Bytes::from(buf)),
2078            &r.metadata,
2079            r.rows_written as usize,
2080            None,
2081            Arc::new(props),
2082        )
2083        .unwrap();
2084
2085        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2086        assert_eq!(pages.len(), 2);
2087
2088        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2089        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2090
2091        match &pages[1] {
2092            Page::DataPageV2 {
2093                num_values,
2094                num_nulls,
2095                num_rows,
2096                statistics,
2097                ..
2098            } => {
2099                assert_eq!(*num_values, 6);
2100                assert_eq!(*num_nulls, 2);
2101                assert_eq!(*num_rows, 6);
2102                assert!(statistics.is_none());
2103            }
2104            _ => unreachable!(),
2105        }
2106    }
2107
2108    #[test]
2109    fn test_column_writer_empty_column_roundtrip() {
2110        let props = Default::default();
2111        column_roundtrip::<Int32Type>(props, &[], None, None);
2112    }
2113
2114    #[test]
2115    fn test_column_writer_non_nullable_values_roundtrip() {
2116        let props = Default::default();
2117        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2118    }
2119
2120    #[test]
2121    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2122        let props = Default::default();
2123        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2124    }
2125
2126    #[test]
2127    fn test_column_writer_nullable_repeated_values_roundtrip() {
2128        let props = Default::default();
2129        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2130    }
2131
2132    #[test]
2133    fn test_column_writer_dictionary_fallback_small_data_page() {
2134        let props = WriterProperties::builder()
2135            .set_dictionary_page_size_limit(32)
2136            .set_data_page_size_limit(32)
2137            .build();
2138        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2139    }
2140
2141    #[test]
2142    fn test_column_writer_small_write_batch_size() {
2143        for i in &[1usize, 2, 5, 10, 11, 1023] {
2144            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2145
2146            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2147        }
2148    }
2149
2150    #[test]
2151    fn test_column_writer_dictionary_disabled_v1() {
2152        let props = WriterProperties::builder()
2153            .set_writer_version(WriterVersion::PARQUET_1_0)
2154            .set_dictionary_enabled(false)
2155            .build();
2156        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2157    }
2158
2159    #[test]
2160    fn test_column_writer_dictionary_disabled_v2() {
2161        let props = WriterProperties::builder()
2162            .set_writer_version(WriterVersion::PARQUET_2_0)
2163            .set_dictionary_enabled(false)
2164            .build();
2165        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2166    }
2167
2168    #[test]
2169    fn test_column_writer_compression_v1() {
2170        let props = WriterProperties::builder()
2171            .set_writer_version(WriterVersion::PARQUET_1_0)
2172            .set_compression(Compression::SNAPPY)
2173            .build();
2174        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2175    }
2176
2177    #[test]
2178    fn test_column_writer_compression_v2() {
2179        let props = WriterProperties::builder()
2180            .set_writer_version(WriterVersion::PARQUET_2_0)
2181            .set_compression(Compression::SNAPPY)
2182            .build();
2183        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2184    }
2185
2186    #[test]
2187    fn test_column_writer_add_data_pages_with_dict() {
2188        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
2189        // and no fallback occurred so far.
2190        let mut file = tempfile::tempfile().unwrap();
2191        let mut write = TrackedWrite::new(&mut file);
2192        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2193        let props = Arc::new(
2194            WriterProperties::builder()
2195                .set_data_page_size_limit(10)
2196                .set_write_batch_size(3) // write 3 values at a time
2197                .build(),
2198        );
2199        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2200        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2201        writer.write_batch(data, None, None).unwrap();
2202        let r = writer.close().unwrap();
2203
2204        drop(write);
2205
2206        // Read pages and check the sequence
2207        let props = ReaderProperties::builder()
2208            .set_backward_compatible_lz4(false)
2209            .build();
2210        let mut page_reader = Box::new(
2211            SerializedPageReader::new_with_properties(
2212                Arc::new(file),
2213                &r.metadata,
2214                r.rows_written as usize,
2215                None,
2216                Arc::new(props),
2217            )
2218            .unwrap(),
2219        );
2220        let mut res = Vec::new();
2221        while let Some(page) = page_reader.get_next_page().unwrap() {
2222            res.push((page.page_type(), page.num_values(), page.buffer().len()));
2223        }
2224        assert_eq!(
2225            res,
2226            vec![
2227                (PageType::DICTIONARY_PAGE, 10, 40),
2228                (PageType::DATA_PAGE, 9, 10),
2229                (PageType::DATA_PAGE, 1, 3),
2230            ]
2231        );
2232    }
2233
2234    #[test]
2235    fn test_bool_statistics() {
2236        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2237        // Booleans have an unsigned sort order and so are not compatible
2238        // with the deprecated `min` and `max` statistics
2239        assert!(!stats.is_min_max_backwards_compatible());
2240        if let Statistics::Boolean(stats) = stats {
2241            assert_eq!(stats.min_opt().unwrap(), &false);
2242            assert_eq!(stats.max_opt().unwrap(), &true);
2243        } else {
2244            panic!("expecting Statistics::Boolean, got {stats:?}");
2245        }
2246    }
2247
2248    #[test]
2249    fn test_int32_statistics() {
2250        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2251        assert!(stats.is_min_max_backwards_compatible());
2252        if let Statistics::Int32(stats) = stats {
2253            assert_eq!(stats.min_opt().unwrap(), &-2);
2254            assert_eq!(stats.max_opt().unwrap(), &3);
2255        } else {
2256            panic!("expecting Statistics::Int32, got {stats:?}");
2257        }
2258    }
2259
2260    #[test]
2261    fn test_int64_statistics() {
2262        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2263        assert!(stats.is_min_max_backwards_compatible());
2264        if let Statistics::Int64(stats) = stats {
2265            assert_eq!(stats.min_opt().unwrap(), &-2);
2266            assert_eq!(stats.max_opt().unwrap(), &3);
2267        } else {
2268            panic!("expecting Statistics::Int64, got {stats:?}");
2269        }
2270    }
2271
2272    #[test]
2273    fn test_int96_statistics() {
2274        let input = vec![
2275            Int96::from(vec![1, 20, 30]),
2276            Int96::from(vec![3, 20, 10]),
2277            Int96::from(vec![0, 20, 30]),
2278            Int96::from(vec![2, 20, 30]),
2279        ]
2280        .into_iter()
2281        .collect::<Vec<Int96>>();
2282
2283        let stats = statistics_roundtrip::<Int96Type>(&input);
2284        assert!(!stats.is_min_max_backwards_compatible());
2285        if let Statistics::Int96(stats) = stats {
2286            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
2287            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2288        } else {
2289            panic!("expecting Statistics::Int96, got {stats:?}");
2290        }
2291    }
2292
2293    #[test]
2294    fn test_float_statistics() {
2295        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2296        assert!(stats.is_min_max_backwards_compatible());
2297        if let Statistics::Float(stats) = stats {
2298            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2299            assert_eq!(stats.max_opt().unwrap(), &3.0);
2300        } else {
2301            panic!("expecting Statistics::Float, got {stats:?}");
2302        }
2303    }
2304
2305    #[test]
2306    fn test_double_statistics() {
2307        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2308        assert!(stats.is_min_max_backwards_compatible());
2309        if let Statistics::Double(stats) = stats {
2310            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2311            assert_eq!(stats.max_opt().unwrap(), &3.0);
2312        } else {
2313            panic!("expecting Statistics::Double, got {stats:?}");
2314        }
2315    }
2316
2317    #[test]
2318    fn test_byte_array_statistics() {
2319        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2320            .iter()
2321            .map(|&s| s.into())
2322            .collect::<Vec<_>>();
2323
2324        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2325        assert!(!stats.is_min_max_backwards_compatible());
2326        if let Statistics::ByteArray(stats) = stats {
2327            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2328            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2329        } else {
2330            panic!("expecting Statistics::ByteArray, got {stats:?}");
2331        }
2332    }
2333
2334    #[test]
2335    fn test_fixed_len_byte_array_statistics() {
2336        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2337            .iter()
2338            .map(|&s| ByteArray::from(s).into())
2339            .collect::<Vec<_>>();
2340
2341        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2342        assert!(!stats.is_min_max_backwards_compatible());
2343        if let Statistics::FixedLenByteArray(stats) = stats {
2344            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2345            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2346            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2347            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2348        } else {
2349            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2350        }
2351    }
2352
2353    #[test]
2354    fn test_column_writer_check_float16_min_max() {
2355        let input = [
2356            -f16::ONE,
2357            f16::from_f32(3.0),
2358            -f16::from_f32(2.0),
2359            f16::from_f32(2.0),
2360        ]
2361        .into_iter()
2362        .map(|s| ByteArray::from(s).into())
2363        .collect::<Vec<_>>();
2364
2365        let stats = float16_statistics_roundtrip(&input);
2366        assert!(stats.is_min_max_backwards_compatible());
2367        assert_eq!(
2368            stats.min_opt().unwrap(),
2369            &ByteArray::from(-f16::from_f32(2.0))
2370        );
2371        assert_eq!(
2372            stats.max_opt().unwrap(),
2373            &ByteArray::from(f16::from_f32(3.0))
2374        );
2375    }
2376
2377    #[test]
2378    fn test_column_writer_check_float16_nan_middle() {
2379        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2380            .into_iter()
2381            .map(|s| ByteArray::from(s).into())
2382            .collect::<Vec<_>>();
2383
2384        let stats = float16_statistics_roundtrip(&input);
2385        assert!(stats.is_min_max_backwards_compatible());
2386        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2387        assert_eq!(
2388            stats.max_opt().unwrap(),
2389            &ByteArray::from(f16::ONE + f16::ONE)
2390        );
2391    }
2392
2393    #[test]
2394    fn test_float16_statistics_nan_middle() {
2395        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2396            .into_iter()
2397            .map(|s| ByteArray::from(s).into())
2398            .collect::<Vec<_>>();
2399
2400        let stats = float16_statistics_roundtrip(&input);
2401        assert!(stats.is_min_max_backwards_compatible());
2402        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2403        assert_eq!(
2404            stats.max_opt().unwrap(),
2405            &ByteArray::from(f16::ONE + f16::ONE)
2406        );
2407    }
2408
2409    #[test]
2410    fn test_float16_statistics_nan_start() {
2411        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2412            .into_iter()
2413            .map(|s| ByteArray::from(s).into())
2414            .collect::<Vec<_>>();
2415
2416        let stats = float16_statistics_roundtrip(&input);
2417        assert!(stats.is_min_max_backwards_compatible());
2418        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2419        assert_eq!(
2420            stats.max_opt().unwrap(),
2421            &ByteArray::from(f16::ONE + f16::ONE)
2422        );
2423    }
2424
2425    #[test]
2426    fn test_float16_statistics_nan_only() {
2427        let input = [f16::NAN, f16::NAN]
2428            .into_iter()
2429            .map(|s| ByteArray::from(s).into())
2430            .collect::<Vec<_>>();
2431
2432        let stats = float16_statistics_roundtrip(&input);
2433        assert!(stats.min_bytes_opt().is_none());
2434        assert!(stats.max_bytes_opt().is_none());
2435        assert!(stats.is_min_max_backwards_compatible());
2436    }
2437
2438    #[test]
2439    fn test_float16_statistics_zero_only() {
2440        let input = [f16::ZERO]
2441            .into_iter()
2442            .map(|s| ByteArray::from(s).into())
2443            .collect::<Vec<_>>();
2444
2445        let stats = float16_statistics_roundtrip(&input);
2446        assert!(stats.is_min_max_backwards_compatible());
2447        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2448        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2449    }
2450
2451    #[test]
2452    fn test_float16_statistics_neg_zero_only() {
2453        let input = [f16::NEG_ZERO]
2454            .into_iter()
2455            .map(|s| ByteArray::from(s).into())
2456            .collect::<Vec<_>>();
2457
2458        let stats = float16_statistics_roundtrip(&input);
2459        assert!(stats.is_min_max_backwards_compatible());
2460        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2461        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2462    }
2463
2464    #[test]
2465    fn test_float16_statistics_zero_min() {
2466        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2467            .into_iter()
2468            .map(|s| ByteArray::from(s).into())
2469            .collect::<Vec<_>>();
2470
2471        let stats = float16_statistics_roundtrip(&input);
2472        assert!(stats.is_min_max_backwards_compatible());
2473        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2474        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2475    }
2476
2477    #[test]
2478    fn test_float16_statistics_neg_zero_max() {
2479        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2480            .into_iter()
2481            .map(|s| ByteArray::from(s).into())
2482            .collect::<Vec<_>>();
2483
2484        let stats = float16_statistics_roundtrip(&input);
2485        assert!(stats.is_min_max_backwards_compatible());
2486        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2487        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2488    }
2489
2490    #[test]
2491    fn test_float_statistics_nan_middle() {
2492        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2493        assert!(stats.is_min_max_backwards_compatible());
2494        if let Statistics::Float(stats) = stats {
2495            assert_eq!(stats.min_opt().unwrap(), &1.0);
2496            assert_eq!(stats.max_opt().unwrap(), &2.0);
2497        } else {
2498            panic!("expecting Statistics::Float");
2499        }
2500    }
2501
2502    #[test]
2503    fn test_float_statistics_nan_start() {
2504        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2505        assert!(stats.is_min_max_backwards_compatible());
2506        if let Statistics::Float(stats) = stats {
2507            assert_eq!(stats.min_opt().unwrap(), &1.0);
2508            assert_eq!(stats.max_opt().unwrap(), &2.0);
2509        } else {
2510            panic!("expecting Statistics::Float");
2511        }
2512    }
2513
2514    #[test]
2515    fn test_float_statistics_nan_only() {
2516        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2517        assert!(stats.min_bytes_opt().is_none());
2518        assert!(stats.max_bytes_opt().is_none());
2519        assert!(stats.is_min_max_backwards_compatible());
2520        assert!(matches!(stats, Statistics::Float(_)));
2521    }
2522
2523    #[test]
2524    fn test_float_statistics_zero_only() {
2525        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2526        assert!(stats.is_min_max_backwards_compatible());
2527        if let Statistics::Float(stats) = stats {
2528            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2529            assert!(stats.min_opt().unwrap().is_sign_negative());
2530            assert_eq!(stats.max_opt().unwrap(), &0.0);
2531            assert!(stats.max_opt().unwrap().is_sign_positive());
2532        } else {
2533            panic!("expecting Statistics::Float");
2534        }
2535    }
2536
2537    #[test]
2538    fn test_float_statistics_neg_zero_only() {
2539        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2540        assert!(stats.is_min_max_backwards_compatible());
2541        if let Statistics::Float(stats) = stats {
2542            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2543            assert!(stats.min_opt().unwrap().is_sign_negative());
2544            assert_eq!(stats.max_opt().unwrap(), &0.0);
2545            assert!(stats.max_opt().unwrap().is_sign_positive());
2546        } else {
2547            panic!("expecting Statistics::Float");
2548        }
2549    }
2550
2551    #[test]
2552    fn test_float_statistics_zero_min() {
2553        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2554        assert!(stats.is_min_max_backwards_compatible());
2555        if let Statistics::Float(stats) = stats {
2556            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2557            assert!(stats.min_opt().unwrap().is_sign_negative());
2558            assert_eq!(stats.max_opt().unwrap(), &2.0);
2559        } else {
2560            panic!("expecting Statistics::Float");
2561        }
2562    }
2563
2564    #[test]
2565    fn test_float_statistics_neg_zero_max() {
2566        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2567        assert!(stats.is_min_max_backwards_compatible());
2568        if let Statistics::Float(stats) = stats {
2569            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2570            assert_eq!(stats.max_opt().unwrap(), &0.0);
2571            assert!(stats.max_opt().unwrap().is_sign_positive());
2572        } else {
2573            panic!("expecting Statistics::Float");
2574        }
2575    }
2576
2577    #[test]
2578    fn test_double_statistics_nan_middle() {
2579        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2580        assert!(stats.is_min_max_backwards_compatible());
2581        if let Statistics::Double(stats) = stats {
2582            assert_eq!(stats.min_opt().unwrap(), &1.0);
2583            assert_eq!(stats.max_opt().unwrap(), &2.0);
2584        } else {
2585            panic!("expecting Statistics::Double");
2586        }
2587    }
2588
2589    #[test]
2590    fn test_double_statistics_nan_start() {
2591        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2592        assert!(stats.is_min_max_backwards_compatible());
2593        if let Statistics::Double(stats) = stats {
2594            assert_eq!(stats.min_opt().unwrap(), &1.0);
2595            assert_eq!(stats.max_opt().unwrap(), &2.0);
2596        } else {
2597            panic!("expecting Statistics::Double");
2598        }
2599    }
2600
2601    #[test]
2602    fn test_double_statistics_nan_only() {
2603        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2604        assert!(stats.min_bytes_opt().is_none());
2605        assert!(stats.max_bytes_opt().is_none());
2606        assert!(matches!(stats, Statistics::Double(_)));
2607        assert!(stats.is_min_max_backwards_compatible());
2608    }
2609
2610    #[test]
2611    fn test_double_statistics_zero_only() {
2612        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2613        assert!(stats.is_min_max_backwards_compatible());
2614        if let Statistics::Double(stats) = stats {
2615            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2616            assert!(stats.min_opt().unwrap().is_sign_negative());
2617            assert_eq!(stats.max_opt().unwrap(), &0.0);
2618            assert!(stats.max_opt().unwrap().is_sign_positive());
2619        } else {
2620            panic!("expecting Statistics::Double");
2621        }
2622    }
2623
2624    #[test]
2625    fn test_double_statistics_neg_zero_only() {
2626        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2627        assert!(stats.is_min_max_backwards_compatible());
2628        if let Statistics::Double(stats) = stats {
2629            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2630            assert!(stats.min_opt().unwrap().is_sign_negative());
2631            assert_eq!(stats.max_opt().unwrap(), &0.0);
2632            assert!(stats.max_opt().unwrap().is_sign_positive());
2633        } else {
2634            panic!("expecting Statistics::Double");
2635        }
2636    }
2637
2638    #[test]
2639    fn test_double_statistics_zero_min() {
2640        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2641        assert!(stats.is_min_max_backwards_compatible());
2642        if let Statistics::Double(stats) = stats {
2643            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2644            assert!(stats.min_opt().unwrap().is_sign_negative());
2645            assert_eq!(stats.max_opt().unwrap(), &2.0);
2646        } else {
2647            panic!("expecting Statistics::Double");
2648        }
2649    }
2650
2651    #[test]
2652    fn test_double_statistics_neg_zero_max() {
2653        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2654        assert!(stats.is_min_max_backwards_compatible());
2655        if let Statistics::Double(stats) = stats {
2656            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2657            assert_eq!(stats.max_opt().unwrap(), &0.0);
2658            assert!(stats.max_opt().unwrap().is_sign_positive());
2659        } else {
2660            panic!("expecting Statistics::Double");
2661        }
2662    }
2663
2664    #[test]
2665    fn test_compare_greater_byte_array_decimals() {
2666        assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2667        assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2668        assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2669        assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2670        assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2671        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2672        assert!(!compare_greater_byte_array_decimals(
2673            &[0u8, 1u8,],
2674            &[1u8, 0u8,],
2675        ),);
2676        assert!(!compare_greater_byte_array_decimals(
2677            &[255u8, 35u8, 0u8, 0u8,],
2678            &[0u8,],
2679        ),);
2680        assert!(compare_greater_byte_array_decimals(
2681            &[0u8,],
2682            &[255u8, 35u8, 0u8, 0u8,],
2683        ),);
2684    }
2685
2686    #[test]
2687    fn test_column_index_with_null_pages() {
2688        // write a single page of all nulls
2689        let page_writer = get_test_page_writer();
2690        let props = Default::default();
2691        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2692        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2693
2694        let r = writer.close().unwrap();
2695        assert!(r.column_index.is_some());
2696        let col_idx = r.column_index.unwrap();
2697        // null_pages should be true for page 0
2698        assert!(col_idx.null_pages[0]);
2699        // min and max should be empty byte arrays
2700        assert_eq!(col_idx.min_values[0].len(), 0);
2701        assert_eq!(col_idx.max_values[0].len(), 0);
2702        // null_counts should be defined and be 4 for page 0
2703        assert!(col_idx.null_counts.is_some());
2704        assert_eq!(col_idx.null_counts.as_ref().unwrap()[0], 4);
2705        // there is no repetition so rep histogram should be absent
2706        assert!(col_idx.repetition_level_histograms.is_none());
2707        // definition_level_histogram should be present and should be 0:4, 1:0
2708        assert!(col_idx.definition_level_histograms.is_some());
2709        assert_eq!(col_idx.definition_level_histograms.unwrap(), &[4, 0]);
2710    }
2711
2712    #[test]
2713    fn test_column_offset_index_metadata() {
2714        // write data
2715        // and check the offset index and column index
2716        let page_writer = get_test_page_writer();
2717        let props = Default::default();
2718        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2719        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2720        // first page
2721        writer.flush_data_pages().unwrap();
2722        // second page
2723        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2724
2725        let r = writer.close().unwrap();
2726        let column_index = r.column_index.unwrap();
2727        let offset_index = r.offset_index.unwrap();
2728
2729        assert_eq!(8, r.rows_written);
2730
2731        // column index
2732        assert_eq!(2, column_index.null_pages.len());
2733        assert_eq!(2, offset_index.page_locations.len());
2734        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
2735        for idx in 0..2 {
2736            assert!(!column_index.null_pages[idx]);
2737            assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
2738        }
2739
2740        if let Some(stats) = r.metadata.statistics() {
2741            assert_eq!(stats.null_count_opt(), Some(0));
2742            assert_eq!(stats.distinct_count_opt(), None);
2743            if let Statistics::Int32(stats) = stats {
2744                // first page is [1,2,3,4]
2745                // second page is [-5,2,4,8]
2746                // note that we don't increment here, as this is a non BinaryArray type.
2747                assert_eq!(
2748                    stats.min_bytes_opt(),
2749                    Some(column_index.min_values[1].as_slice())
2750                );
2751                assert_eq!(
2752                    stats.max_bytes_opt(),
2753                    column_index.max_values.get(1).map(Vec::as_slice)
2754                );
2755            } else {
2756                panic!("expecting Statistics::Int32");
2757            }
2758        } else {
2759            panic!("metadata missing statistics");
2760        }
2761
2762        // page location
2763        assert_eq!(0, offset_index.page_locations[0].first_row_index);
2764        assert_eq!(4, offset_index.page_locations[1].first_row_index);
2765    }
2766
2767    /// Verify min/max value truncation in the column index works as expected
2768    #[test]
2769    fn test_column_offset_index_metadata_truncating() {
2770        // write data
2771        // and check the offset index and column index
2772        let page_writer = get_test_page_writer();
2773        let props = Default::default();
2774        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
2775
2776        let mut data = vec![FixedLenByteArray::default(); 3];
2777        // This is the expected min value - "aaa..."
2778        data[0].set_data(Bytes::from(vec![97_u8; 200]));
2779        // This is the expected max value - "ZZZ..."
2780        data[1].set_data(Bytes::from(vec![112_u8; 200]));
2781        data[2].set_data(Bytes::from(vec![98_u8; 200]));
2782
2783        writer.write_batch(&data, None, None).unwrap();
2784
2785        writer.flush_data_pages().unwrap();
2786
2787        let r = writer.close().unwrap();
2788        let column_index = r.column_index.unwrap();
2789        let offset_index = r.offset_index.unwrap();
2790
2791        assert_eq!(3, r.rows_written);
2792
2793        // column index
2794        assert_eq!(1, column_index.null_pages.len());
2795        assert_eq!(1, offset_index.page_locations.len());
2796        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
2797        assert!(!column_index.null_pages[0]);
2798        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
2799
2800        if let Some(stats) = r.metadata.statistics() {
2801            assert_eq!(stats.null_count_opt(), Some(0));
2802            assert_eq!(stats.distinct_count_opt(), None);
2803            if let Statistics::FixedLenByteArray(stats) = stats {
2804                let column_index_min_value = &column_index.min_values[0];
2805                let column_index_max_value = &column_index.max_values[0];
2806
2807                // Column index stats are truncated, while the column chunk's aren't.
2808                assert_ne!(
2809                    stats.min_bytes_opt(),
2810                    Some(column_index_min_value.as_slice())
2811                );
2812                assert_ne!(
2813                    stats.max_bytes_opt(),
2814                    Some(column_index_max_value.as_slice())
2815                );
2816
2817                assert_eq!(
2818                    column_index_min_value.len(),
2819                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
2820                );
2821                assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
2822                assert_eq!(
2823                    column_index_max_value.len(),
2824                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
2825                );
2826
2827                // We expect the last byte to be incremented
2828                assert_eq!(
2829                    *column_index_max_value.last().unwrap(),
2830                    *column_index_max_value.first().unwrap() + 1
2831                );
2832            } else {
2833                panic!("expecting Statistics::FixedLenByteArray");
2834            }
2835        } else {
2836            panic!("metadata missing statistics");
2837        }
2838    }
2839
2840    #[test]
2841    fn test_column_offset_index_truncating_spec_example() {
2842        // write data
2843        // and check the offset index and column index
2844        let page_writer = get_test_page_writer();
2845
2846        // Truncate values at 1 byte
2847        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
2848        let props = Arc::new(builder.build());
2849        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
2850
2851        let mut data = vec![FixedLenByteArray::default(); 1];
2852        // This is the expected min value
2853        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
2854
2855        writer.write_batch(&data, None, None).unwrap();
2856
2857        writer.flush_data_pages().unwrap();
2858
2859        let r = writer.close().unwrap();
2860        let column_index = r.column_index.unwrap();
2861        let offset_index = r.offset_index.unwrap();
2862
2863        assert_eq!(1, r.rows_written);
2864
2865        // column index
2866        assert_eq!(1, column_index.null_pages.len());
2867        assert_eq!(1, offset_index.page_locations.len());
2868        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
2869        assert!(!column_index.null_pages[0]);
2870        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
2871
2872        if let Some(stats) = r.metadata.statistics() {
2873            assert_eq!(stats.null_count_opt(), Some(0));
2874            assert_eq!(stats.distinct_count_opt(), None);
2875            if let Statistics::FixedLenByteArray(_stats) = stats {
2876                let column_index_min_value = &column_index.min_values[0];
2877                let column_index_max_value = &column_index.max_values[0];
2878
2879                assert_eq!(column_index_min_value.len(), 1);
2880                assert_eq!(column_index_max_value.len(), 1);
2881
2882                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
2883                assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
2884
2885                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
2886                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
2887            } else {
2888                panic!("expecting Statistics::FixedLenByteArray");
2889            }
2890        } else {
2891            panic!("metadata missing statistics");
2892        }
2893    }
2894
2895    #[test]
2896    fn test_float16_min_max_no_truncation() {
2897        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
2898        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
2899        let props = Arc::new(builder.build());
2900        let page_writer = get_test_page_writer();
2901        let mut writer = get_test_float16_column_writer(page_writer, props);
2902
2903        let expected_value = f16::PI.to_le_bytes().to_vec();
2904        let data = vec![ByteArray::from(expected_value.clone()).into()];
2905        writer.write_batch(&data, None, None).unwrap();
2906        writer.flush_data_pages().unwrap();
2907
2908        let r = writer.close().unwrap();
2909
2910        // stats should still be written
2911        // ensure bytes weren't truncated for column index
2912        let column_index = r.column_index.unwrap();
2913        let column_index_min_bytes = column_index.min_values[0].as_slice();
2914        let column_index_max_bytes = column_index.max_values[0].as_slice();
2915        assert_eq!(expected_value, column_index_min_bytes);
2916        assert_eq!(expected_value, column_index_max_bytes);
2917
2918        // ensure bytes weren't truncated for statistics
2919        let stats = r.metadata.statistics().unwrap();
2920        if let Statistics::FixedLenByteArray(stats) = stats {
2921            let stats_min_bytes = stats.min_bytes_opt().unwrap();
2922            let stats_max_bytes = stats.max_bytes_opt().unwrap();
2923            assert_eq!(expected_value, stats_min_bytes);
2924            assert_eq!(expected_value, stats_max_bytes);
2925        } else {
2926            panic!("expecting Statistics::FixedLenByteArray");
2927        }
2928    }
2929
2930    #[test]
2931    fn test_decimal_min_max_no_truncation() {
2932        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
2933        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
2934        let props = Arc::new(builder.build());
2935        let page_writer = get_test_page_writer();
2936        let mut writer =
2937            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
2938
2939        let expected_value = vec![
2940            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
2941            231u8, 90u8, 0u8, 0u8,
2942        ];
2943        let data = vec![ByteArray::from(expected_value.clone()).into()];
2944        writer.write_batch(&data, None, None).unwrap();
2945        writer.flush_data_pages().unwrap();
2946
2947        let r = writer.close().unwrap();
2948
2949        // stats should still be written
2950        // ensure bytes weren't truncated for column index
2951        let column_index = r.column_index.unwrap();
2952        let column_index_min_bytes = column_index.min_values[0].as_slice();
2953        let column_index_max_bytes = column_index.max_values[0].as_slice();
2954        assert_eq!(expected_value, column_index_min_bytes);
2955        assert_eq!(expected_value, column_index_max_bytes);
2956
2957        // ensure bytes weren't truncated for statistics
2958        let stats = r.metadata.statistics().unwrap();
2959        if let Statistics::FixedLenByteArray(stats) = stats {
2960            let stats_min_bytes = stats.min_bytes_opt().unwrap();
2961            let stats_max_bytes = stats.max_bytes_opt().unwrap();
2962            assert_eq!(expected_value, stats_min_bytes);
2963            assert_eq!(expected_value, stats_max_bytes);
2964        } else {
2965            panic!("expecting Statistics::FixedLenByteArray");
2966        }
2967    }
2968
2969    #[test]
2970    fn test_statistics_truncating_byte_array() {
2971        let page_writer = get_test_page_writer();
2972
2973        const TEST_TRUNCATE_LENGTH: usize = 1;
2974
2975        // Truncate values at 1 byte
2976        let builder =
2977            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
2978        let props = Arc::new(builder.build());
2979        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2980
2981        let mut data = vec![ByteArray::default(); 1];
2982        // This is the expected min value
2983        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
2984
2985        writer.write_batch(&data, None, None).unwrap();
2986
2987        writer.flush_data_pages().unwrap();
2988
2989        let r = writer.close().unwrap();
2990
2991        assert_eq!(1, r.rows_written);
2992
2993        let stats = r.metadata.statistics().expect("statistics");
2994        assert_eq!(stats.null_count_opt(), Some(0));
2995        assert_eq!(stats.distinct_count_opt(), None);
2996        if let Statistics::ByteArray(_stats) = stats {
2997            let min_value = _stats.min_opt().unwrap();
2998            let max_value = _stats.max_opt().unwrap();
2999
3000            assert!(!_stats.min_is_exact());
3001            assert!(!_stats.max_is_exact());
3002
3003            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3004            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3005
3006            assert_eq!("B".as_bytes(), min_value.as_bytes());
3007            assert_eq!("C".as_bytes(), max_value.as_bytes());
3008        } else {
3009            panic!("expecting Statistics::ByteArray");
3010        }
3011    }
3012
3013    #[test]
3014    fn test_statistics_truncating_fixed_len_byte_array() {
3015        let page_writer = get_test_page_writer();
3016
3017        const TEST_TRUNCATE_LENGTH: usize = 1;
3018
3019        // Truncate values at 1 byte
3020        let builder =
3021            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3022        let props = Arc::new(builder.build());
3023        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3024
3025        let mut data = vec![FixedLenByteArray::default(); 1];
3026
3027        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3028        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3029
3030        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3031        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3032            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3033
3034        // This is the expected min value
3035        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3036
3037        writer.write_batch(&data, None, None).unwrap();
3038
3039        writer.flush_data_pages().unwrap();
3040
3041        let r = writer.close().unwrap();
3042
3043        assert_eq!(1, r.rows_written);
3044
3045        let stats = r.metadata.statistics().expect("statistics");
3046        assert_eq!(stats.null_count_opt(), Some(0));
3047        assert_eq!(stats.distinct_count_opt(), None);
3048        if let Statistics::FixedLenByteArray(_stats) = stats {
3049            let min_value = _stats.min_opt().unwrap();
3050            let max_value = _stats.max_opt().unwrap();
3051
3052            assert!(!_stats.min_is_exact());
3053            assert!(!_stats.max_is_exact());
3054
3055            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3056            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3057
3058            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3059            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3060
3061            let reconstructed_min = i128::from_be_bytes([
3062                min_value.as_bytes()[0],
3063                0,
3064                0,
3065                0,
3066                0,
3067                0,
3068                0,
3069                0,
3070                0,
3071                0,
3072                0,
3073                0,
3074                0,
3075                0,
3076                0,
3077                0,
3078            ]);
3079
3080            let reconstructed_max = i128::from_be_bytes([
3081                max_value.as_bytes()[0],
3082                0,
3083                0,
3084                0,
3085                0,
3086                0,
3087                0,
3088                0,
3089                0,
3090                0,
3091                0,
3092                0,
3093                0,
3094                0,
3095                0,
3096                0,
3097            ]);
3098
3099            // check that the inner value is correctly bounded by the min/max
3100            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3101            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3102            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3103            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3104        } else {
3105            panic!("expecting Statistics::FixedLenByteArray");
3106        }
3107    }
3108
/// Compile-time check that the typed column writer satisfies `Send`.
#[test]
fn test_send() {
    // Instantiating this helper only type-checks when the bound holds.
    fn assert_send<W: Send>() {}
    assert_send::<ColumnWriterImpl<Int32Type>>();
}
3114
/// Exercises `increment` on plain, carrying and saturated byte strings.
#[test]
fn test_increment() {
    // Plain case: only the last byte is bumped.
    assert_eq!(increment(vec![0, 0, 0]), Some(vec![0, 0, 1]));

    // Trailing `u8::MAX` bytes wrap to zero and the carry moves left.
    assert_eq!(increment(vec![0, 255, 255]), Some(vec![1, 0, 0]));

    // Nothing can be incremented when every byte is already `u8::MAX`.
    assert!(increment(vec![255, 255, 255]).is_none());
}
3128
/// Exercises `increment_utf8` on ASCII, multi-byte and maximal code points.
#[test]
fn test_increment_utf8() {
    // ASCII input: the last character moves to its successor.
    let ascii = "hello";
    let bumped = increment_utf8(ascii.as_bytes().to_vec()).unwrap();
    assert_eq!(bumped.as_slice(), "hellp".as_bytes());

    // The incremented bytes must also compare greater once wrapped in `ByteArray`.
    let mut original = ByteArray::new();
    original.set_data(Bytes::from(ascii.as_bytes().to_vec()));
    let mut greater = ByteArray::new();
    greater.set_data(Bytes::from(bumped));
    assert!(greater > original);

    // Multi-byte input: the result has to remain valid UTF-8.
    let hearts = "❤️🧡💛💚💙💜";
    let bumped = increment_utf8(hearts.as_bytes().to_vec()).unwrap();
    match String::from_utf8(bumped) {
        Ok(incremented) => {
            assert_ne!(incremented, hearts);
            assert_eq!(incremented, "❤️🧡💛💚💙💝");
            let new_last = incremented.as_bytes().last().unwrap();
            let old_last = hearts.as_bytes().last().unwrap();
            assert!(new_last > old_last);
        }
        Err(_) => panic!("Expected incremented UTF8 string to also be valid."),
    }

    // A lone `char::MAX` has no successor, so no incremented value is produced.
    let max_char = char::MAX.to_string();
    assert_eq!(max_char.len(), 4);
    assert!(increment_utf8(max_char.as_bytes().to_vec()).is_none());

    // A trailing maximal code point is left alone and the preceding character is bumped.
    let with_max_suffix = "a\u{10ffff}";
    assert_eq!(
        increment_utf8(with_max_suffix.as_bytes().to_vec()),
        Some("b\u{10ffff}".as_bytes().to_vec())
    );
}
3165
/// Exercises `truncate_utf8` at, inside and below a code point boundary.
#[test]
fn test_truncate_utf8() {
    // Truncating to the full byte length is a no-op.
    let data = "❤️🧡💛💚💙💜";
    let r = truncate_utf8(data, data.len()).unwrap();
    assert_eq!(r.len(), data.len());
    assert_eq!(&r, data.as_bytes());

    // A 13 byte limit lands in the middle of a code point, so the result is cut
    // back to the previous character boundary (10 bytes).
    let r = truncate_utf8(data, 13).unwrap();
    assert_eq!(r.len(), 10);
    assert_eq!(&r, "❤️🧡".as_bytes());

    // One multi-byte code point, and a length shorter than it, so we can't slice it
    let r = truncate_utf8("\u{0836}", 1);
    assert!(r.is_none());
}
3184
/// Checks `increment` on inputs that contain runs of `0xFF` bytes.
#[test]
fn test_increment_max_binary_chars() {
    // Trailing 0xFF bytes wrap to zero; the first byte below 0xFF absorbs the carry.
    assert_eq!(
        increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]),
        Some(vec![0xFF, 0xFE, 0xFE, 0x00, 0x00])
    );

    // An all-0xFF input cannot be incremented.
    assert!(increment(vec![0xFF, 0xFF, 0xFF]).is_none())
}
3193
/// Regression test for https://github.com/apache/arrow-rs/issues/6010:
/// an all-nulls column written with statistics disabled must still produce an
/// offset index but no column index.
#[test]
fn test_no_column_index_when_stats_disabled() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .build(),
    );
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        props,
        get_test_page_writer(),
    ));

    // Ten null slots: definition level 0 everywhere and no values at all.
    let def_levels = vec![0i16; 10];
    let no_values: Vec<i32> = Vec::new();
    writer
        .write_batch(&no_values, Some(&def_levels), None)
        .unwrap();
    writer.flush_data_pages().unwrap();

    let closed = writer.close().unwrap();
    assert!(closed.offset_index.is_some());
    assert!(closed.column_index.is_none());
}
3217
/// Checks the `boundary_order` recorded in the column index for several page layouts.
#[test]
fn test_boundary_order() -> Result<()> {
    /// Writes each inner slice as its own page and returns the resulting boundary order.
    fn boundary_order_of(
        descr: &Arc<ColumnDescriptor>,
        pages: &[&[Option<i32>]],
    ) -> Result<BoundaryOrder> {
        let closed = write_multiple_pages::<Int32Type>(descr, pages)?;
        Ok(closed.column_index.unwrap().boundary_order)
    }

    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));

    // min max both ascending
    let order = boundary_order_of(
        &descr,
        &[
            &[Some(-10), Some(10)],
            &[Some(-5), Some(11)],
            &[None],
            &[Some(-5), Some(11)],
        ],
    )?;
    assert_eq!(order, BoundaryOrder::ASCENDING);

    // min max both descending
    let order = boundary_order_of(
        &descr,
        &[
            &[Some(10), Some(11)],
            &[Some(5), Some(11)],
            &[None],
            &[Some(-5), Some(0)],
        ],
    )?;
    assert_eq!(order, BoundaryOrder::DESCENDING);

    // min max both equal
    let order = boundary_order_of(
        &descr,
        &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
    )?;
    assert_eq!(order, BoundaryOrder::ASCENDING);

    // only nulls
    let order = boundary_order_of(&descr, &[&[None], &[None], &[None]])?;
    assert_eq!(order, BoundaryOrder::ASCENDING);

    // one page
    let order = boundary_order_of(&descr, &[&[Some(-10), Some(10)]])?;
    assert_eq!(order, BoundaryOrder::ASCENDING);

    // one non-null page
    let order = boundary_order_of(&descr, &[&[Some(-10), Some(10)], &[None]])?;
    assert_eq!(order, BoundaryOrder::ASCENDING);

    // min max both unordered
    let order = boundary_order_of(
        &descr,
        &[
            &[Some(10), Some(11)],
            &[Some(11), Some(16)],
            &[None],
            &[Some(-5), Some(0)],
        ],
    )?;
    assert_eq!(order, BoundaryOrder::UNORDERED);

    // min max both ordered in different orders
    let order = boundary_order_of(
        &descr,
        &[
            &[Some(1), Some(9)],
            &[Some(2), Some(8)],
            &[None],
            &[Some(3), Some(7)],
        ],
    )?;
    assert_eq!(order, BoundaryOrder::UNORDERED);

    Ok(())
}
3301
/// Ensures the boundary order follows the logical type's sort order rather than
/// the byte order of the underlying physical representation.
#[test]
fn test_boundary_order_logical_type() -> Result<()> {
    // Wraps a half-precision float as the single (non-null) value of a page.
    let page_value = |value: f16| Some(FixedLenByteArray::from(ByteArray::from(value)));
    let values: &[&[Option<FixedLenByteArray>]] = &[
        &[page_value(f16::ONE)],
        &[page_value(f16::ZERO)],
        &[page_value(f16::NEG_ZERO)],
        &[page_value(f16::NEG_ONE)],
    ];

    // Column annotated with the Float16 logical type.
    let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));

    // Column with the same 2-byte physical layout but no logical type.
    let plain_type =
        SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
            .with_length(2)
            .build()?;
    let fba_descr = Arc::new(ColumnDescriptor::new(
        Arc::new(plain_type),
        1,
        0,
        ColumnPath::from("col"),
    ));

    // f16 descending
    let f16_index = write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?
        .column_index
        .unwrap();
    assert_eq!(f16_index.boundary_order, BoundaryOrder::DESCENDING);

    // same bytes, but fba unordered
    let fba_index = write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?
        .column_index
        .unwrap();
    assert_eq!(fba_index.boundary_order, BoundaryOrder::UNORDERED);

    Ok(())
}
3345
/// Statistics of an INTERVAL column must not carry min/max values.
#[test]
fn test_interval_stats_should_not_have_min_max() {
    // Three 12-byte values that differ only in their last byte (0, 1 and 2).
    let input: Vec<FixedLenByteArray> = (0u8..3)
        .map(|last_byte| {
            let mut bytes = vec![0u8; 12];
            bytes[11] = last_byte;
            FixedLenByteArray::from(ByteArray::from(bytes))
        })
        .collect();

    let mut writer = get_test_interval_column_writer(get_test_page_writer());
    writer.write_batch(&input, None, None).unwrap();

    let metadata = writer.close().unwrap().metadata;
    let stats = match metadata.statistics() {
        Some(Statistics::FixedLenByteArray(stats)) => stats.clone(),
        _ => panic!("metadata missing statistics"),
    };
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
}
3370
3371    fn write_multiple_pages<T: DataType>(
3372        column_descr: &Arc<ColumnDescriptor>,
3373        pages: &[&[Option<T::T>]],
3374    ) -> Result<ColumnCloseResult> {
3375        let column_writer = get_column_writer(
3376            column_descr.clone(),
3377            Default::default(),
3378            get_test_page_writer(),
3379        );
3380        let mut writer = get_typed_column_writer::<T>(column_writer);
3381
3382        for &page in pages {
3383            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3384            let def_levels = page
3385                .iter()
3386                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3387                .collect::<Vec<_>>();
3388            writer.write_batch(&values, Some(&def_levels), None)?;
3389            writer.flush_data_pages()?;
3390        }
3391
3392        writer.close()
3393    }
3394
3395    /// Performs write-read roundtrip with randomly generated values and levels.
3396    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
3397    /// for a column.
3398    fn column_roundtrip_random<T: DataType>(
3399        props: WriterProperties,
3400        max_size: usize,
3401        min_value: T::T,
3402        max_value: T::T,
3403        max_def_level: i16,
3404        max_rep_level: i16,
3405    ) where
3406        T::T: PartialOrd + SampleUniform + Copy,
3407    {
3408        let mut num_values: usize = 0;
3409
3410        let mut buf: Vec<i16> = Vec::new();
3411        let def_levels = if max_def_level > 0 {
3412            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3413            for &dl in &buf[..] {
3414                if dl == max_def_level {
3415                    num_values += 1;
3416                }
3417            }
3418            Some(&buf[..])
3419        } else {
3420            num_values = max_size;
3421            None
3422        };
3423
3424        let mut buf: Vec<i16> = Vec::new();
3425        let rep_levels = if max_rep_level > 0 {
3426            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3427            buf[0] = 0; // Must start on record boundary
3428            Some(&buf[..])
3429        } else {
3430            None
3431        };
3432
3433        let mut values: Vec<T::T> = Vec::new();
3434        random_numbers_range(num_values, min_value, max_value, &mut values);
3435
3436        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3437    }
3438
3439    /// Performs write-read roundtrip and asserts written values and levels.
3440    fn column_roundtrip<T: DataType>(
3441        props: WriterProperties,
3442        values: &[T::T],
3443        def_levels: Option<&[i16]>,
3444        rep_levels: Option<&[i16]>,
3445    ) {
3446        let mut file = tempfile::tempfile().unwrap();
3447        let mut write = TrackedWrite::new(&mut file);
3448        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3449
3450        let max_def_level = match def_levels {
3451            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3452            None => 0i16,
3453        };
3454
3455        let max_rep_level = match rep_levels {
3456            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3457            None => 0i16,
3458        };
3459
3460        let mut max_batch_size = values.len();
3461        if let Some(levels) = def_levels {
3462            max_batch_size = max_batch_size.max(levels.len());
3463        }
3464        if let Some(levels) = rep_levels {
3465            max_batch_size = max_batch_size.max(levels.len());
3466        }
3467
3468        let mut writer =
3469            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3470
3471        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3472        assert_eq!(values_written, values.len());
3473        let result = writer.close().unwrap();
3474
3475        drop(write);
3476
3477        let props = ReaderProperties::builder()
3478            .set_backward_compatible_lz4(false)
3479            .build();
3480        let page_reader = Box::new(
3481            SerializedPageReader::new_with_properties(
3482                Arc::new(file),
3483                &result.metadata,
3484                result.rows_written as usize,
3485                None,
3486                Arc::new(props),
3487            )
3488            .unwrap(),
3489        );
3490        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3491
3492        let mut actual_values = Vec::with_capacity(max_batch_size);
3493        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3494        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3495
3496        let (_, values_read, levels_read) = reader
3497            .read_records(
3498                max_batch_size,
3499                actual_def_levels.as_mut(),
3500                actual_rep_levels.as_mut(),
3501                &mut actual_values,
3502            )
3503            .unwrap();
3504
3505        // Assert values, definition and repetition levels.
3506
3507        assert_eq!(&actual_values[..values_read], values);
3508        match actual_def_levels {
3509            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3510            None => assert_eq!(None, def_levels),
3511        }
3512        match actual_rep_levels {
3513            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3514            None => assert_eq!(None, rep_levels),
3515        }
3516
3517        // Assert written rows.
3518
3519        if let Some(levels) = actual_rep_levels {
3520            let mut actual_rows_written = 0;
3521            for l in levels {
3522                if l == 0 {
3523                    actual_rows_written += 1;
3524                }
3525            }
3526            assert_eq!(actual_rows_written, result.rows_written);
3527        } else if actual_def_levels.is_some() {
3528            assert_eq!(levels_read as u64, result.rows_written);
3529        } else {
3530            assert_eq!(values_read as u64, result.rows_written);
3531        }
3532    }
3533
3534    /// Performs write of provided values and returns column metadata of those values.
3535    /// Used to test encoding support for column writer.
3536    fn column_write_and_get_metadata<T: DataType>(
3537        props: WriterProperties,
3538        values: &[T::T],
3539    ) -> ColumnChunkMetaData {
3540        let page_writer = get_test_page_writer();
3541        let props = Arc::new(props);
3542        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3543        writer.write_batch(values, None, None).unwrap();
3544        writer.close().unwrap().metadata
3545    }
3546
3547    // Function to use in tests for EncodingWriteSupport. This checks that dictionary
3548    // offset and encodings to make sure that column writer uses provided by trait
3549    // encodings.
3550    fn check_encoding_write_support<T: DataType>(
3551        version: WriterVersion,
3552        dict_enabled: bool,
3553        data: &[T::T],
3554        dictionary_page_offset: Option<i64>,
3555        encodings: &[Encoding],
3556    ) {
3557        let props = WriterProperties::builder()
3558            .set_writer_version(version)
3559            .set_dictionary_enabled(dict_enabled)
3560            .build();
3561        let meta = column_write_and_get_metadata::<T>(props, data);
3562        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
3563        assert_eq!(meta.encodings(), &encodings);
3564    }
3565
3566    /// Returns column writer.
3567    fn get_test_column_writer<'a, T: DataType>(
3568        page_writer: Box<dyn PageWriter + 'a>,
3569        max_def_level: i16,
3570        max_rep_level: i16,
3571        props: WriterPropertiesPtr,
3572    ) -> ColumnWriterImpl<'a, T> {
3573        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
3574        let column_writer = get_column_writer(descr, props, page_writer);
3575        get_typed_column_writer::<T>(column_writer)
3576    }
3577
3578    /// Returns column reader.
3579    fn get_test_column_reader<T: DataType>(
3580        page_reader: Box<dyn PageReader>,
3581        max_def_level: i16,
3582        max_rep_level: i16,
3583    ) -> ColumnReaderImpl<T> {
3584        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
3585        let column_reader = get_column_reader(descr, page_reader);
3586        get_typed_column_reader::<T>(column_reader)
3587    }
3588
3589    /// Returns descriptor for primitive column.
3590    fn get_test_column_descr<T: DataType>(
3591        max_def_level: i16,
3592        max_rep_level: i16,
3593    ) -> ColumnDescriptor {
3594        let path = ColumnPath::from("col");
3595        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3596            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
3597            // it should be no-op for other types
3598            .with_length(1)
3599            .build()
3600            .unwrap();
3601        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3602    }
3603
3604    /// Returns page writer that collects pages without serializing them.
3605    fn get_test_page_writer() -> Box<dyn PageWriter> {
3606        Box::new(TestPageWriter {})
3607    }
3608
3609    struct TestPageWriter {}
3610
3611    impl PageWriter for TestPageWriter {
3612        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
3613            let mut res = PageWriteSpec::new();
3614            res.page_type = page.page_type();
3615            res.uncompressed_size = page.uncompressed_size();
3616            res.compressed_size = page.compressed_size();
3617            res.num_values = page.num_values();
3618            res.offset = 0;
3619            res.bytes_written = page.data().len() as u64;
3620            Ok(res)
3621        }
3622
3623        fn close(&mut self) -> Result<()> {
3624            Ok(())
3625        }
3626    }
3627
3628    /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics.
3629    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
3630        let page_writer = get_test_page_writer();
3631        let props = Default::default();
3632        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3633        writer.write_batch(values, None, None).unwrap();
3634
3635        let metadata = writer.close().unwrap().metadata;
3636        if let Some(stats) = metadata.statistics() {
3637            stats.clone()
3638        } else {
3639            panic!("metadata missing statistics");
3640        }
3641    }
3642
3643    /// Returns Decimals column writer.
3644    fn get_test_decimals_column_writer<T: DataType>(
3645        page_writer: Box<dyn PageWriter>,
3646        max_def_level: i16,
3647        max_rep_level: i16,
3648        props: WriterPropertiesPtr,
3649    ) -> ColumnWriterImpl<'static, T> {
3650        let descr = Arc::new(get_test_decimals_column_descr::<T>(
3651            max_def_level,
3652            max_rep_level,
3653        ));
3654        let column_writer = get_column_writer(descr, props, page_writer);
3655        get_typed_column_writer::<T>(column_writer)
3656    }
3657
3658    /// Returns descriptor for Decimal type with primitive column.
3659    fn get_test_decimals_column_descr<T: DataType>(
3660        max_def_level: i16,
3661        max_rep_level: i16,
3662    ) -> ColumnDescriptor {
3663        let path = ColumnPath::from("col");
3664        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3665            .with_length(16)
3666            .with_logical_type(Some(LogicalType::Decimal {
3667                scale: 2,
3668                precision: 3,
3669            }))
3670            .with_scale(2)
3671            .with_precision(3)
3672            .build()
3673            .unwrap();
3674        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3675    }
3676
3677    fn float16_statistics_roundtrip(
3678        values: &[FixedLenByteArray],
3679    ) -> ValueStatistics<FixedLenByteArray> {
3680        let page_writer = get_test_page_writer();
3681        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
3682        writer.write_batch(values, None, None).unwrap();
3683
3684        let metadata = writer.close().unwrap().metadata;
3685        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3686            stats.clone()
3687        } else {
3688            panic!("metadata missing statistics");
3689        }
3690    }
3691
3692    fn get_test_float16_column_writer(
3693        page_writer: Box<dyn PageWriter>,
3694        props: WriterPropertiesPtr,
3695    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
3696        let descr = Arc::new(get_test_float16_column_descr(0, 0));
3697        let column_writer = get_column_writer(descr, props, page_writer);
3698        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
3699    }
3700
3701    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
3702        let path = ColumnPath::from("col");
3703        let tpe =
3704            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
3705                .with_length(2)
3706                .with_logical_type(Some(LogicalType::Float16))
3707                .build()
3708                .unwrap();
3709        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3710    }
3711
3712    fn get_test_interval_column_writer(
3713        page_writer: Box<dyn PageWriter>,
3714    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
3715        let descr = Arc::new(get_test_interval_column_descr());
3716        let column_writer = get_column_writer(descr, Default::default(), page_writer);
3717        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
3718    }
3719
3720    fn get_test_interval_column_descr() -> ColumnDescriptor {
3721        let path = ColumnPath::from("col");
3722        let tpe =
3723            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
3724                .with_length(12)
3725                .with_converted_type(ConvertedType::INTERVAL)
3726                .build()
3727                .unwrap();
3728        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
3729    }
3730
3731    /// Returns column writer for UINT32 Column provided as ConvertedType only
3732    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
3733        page_writer: Box<dyn PageWriter + 'a>,
3734        max_def_level: i16,
3735        max_rep_level: i16,
3736        props: WriterPropertiesPtr,
3737    ) -> ColumnWriterImpl<'a, T> {
3738        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
3739            max_def_level,
3740            max_rep_level,
3741        ));
3742        let column_writer = get_column_writer(descr, props, page_writer);
3743        get_typed_column_writer::<T>(column_writer)
3744    }
3745
3746    /// Returns column descriptor for UINT32 Column provided as ConvertedType only
3747    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
3748        max_def_level: i16,
3749        max_rep_level: i16,
3750    ) -> ColumnDescriptor {
3751        let path = ColumnPath::from("col");
3752        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3753            .with_converted_type(ConvertedType::UINT_32)
3754            .build()
3755            .unwrap();
3756        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3757    }
3758}