parquet/arrow/arrow_writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains the writer which writes Arrow data into Parquet data.

use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::ArrowSchemaConverter;
use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
use levels::{ArrayLevels, calculate_array_levels};

mod byte_array;
mod levels;

/// Encodes [`RecordBatch`]es to Parquet
///
/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es are encoded
/// into the same row group, up to `max_row_group_size` rows. Any remaining rows are
/// flushed on close, so the final row group in the output file may contain
/// fewer than `max_row_group_size` rows.
///
/// # Example: Writing `RecordBatch`es
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{ArrayRef, Int64Array};
/// # use arrow_array::RecordBatch;
/// # use parquet::arrow::arrow_writer::ArrowWriter;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
/// writer.write(&to_write).unwrap();
/// writer.close().unwrap();
///
/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
/// let read = reader.next().unwrap().unwrap();
///
/// assert_eq!(to_write, read);
/// ```
///
/// # Memory Usage and Limiting
///
/// The nature of Parquet requires buffering of an entire row group before it can
/// be flushed to the underlying writer. Data is mostly buffered in its encoded
/// form, reducing memory usage. However, some data such as dictionary keys,
/// large strings or very nested data may still result in non-trivial memory
/// usage.
///
/// See Also:
/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the estimated size of the buffered row group.
///
/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
/// result in higher metadata overheads, and thus may worsen compression ratios
/// and query performance.
///
/// ```no_run
/// # use std::io::Write;
/// # use arrow_array::RecordBatch;
/// # use parquet::arrow::ArrowWriter;
/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
/// # let batch: RecordBatch = todo!();
/// writer.write(&batch).unwrap();
/// // Trigger an early flush if anticipated size exceeds 1_000_000
/// if writer.in_progress_size() > 1_000_000 {
///     writer.flush().unwrap();
/// }
/// ```
///
/// ## Type Support
///
/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
/// Parquet types including [`StructArray`] and [`ListArray`].
///
/// The following are not supported:
///
/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
///
/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
///
/// ## Type Compatibility
/// The writer can write Arrow [`RecordBatch`]es that are logically equivalent. This means that for
/// a given column, the writer can accept multiple Arrow [`DataType`]s that contain the same
/// value type.
///
/// For example, the following [`DataType`]s are all logically equivalent and can be written
/// to the same column:
/// * String, LargeString, StringView
/// * Binary, LargeBinary, BinaryView
///
/// The writer will also accept both native and dictionary encoded arrays if the dictionaries
/// contain compatible values.
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::arrow_writer::ArrowWriter;
/// let record_batch1 = RecordBatch::try_new(
///    Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
///    vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
///  )
/// .unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
/// writer.write(&record_batch1).unwrap();
///
/// let record_batch2 = RecordBatch::try_new(
///     Arc::new(Schema::new(vec![Field::new(
///         "col",
///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
///          false,
///     )])),
///     vec![Arc::new(DictionaryArray::new(
///          UInt8Array::from_iter_values(vec![0, 1]),
///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
///      ))],
///  )
///  .unwrap();
///  writer.write(&record_batch2).unwrap();
///  writer.close().unwrap();
/// ```
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet writer
    writer: SerializedFileWriter<W>,

    /// The in-progress row group if any
    in_progress: Option<ArrowRowGroupWriter>,

    /// A copy of the Arrow schema.
    ///
    /// The schema is used to verify that each record batch written has the correct schema
    arrow_schema: SchemaRef,

    /// Creates new [`ArrowRowGroupWriter`] instances as required
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// The length of arrays to write to each row group
    max_row_group_size: usize,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_size", &self.max_row_group_size)
            .finish()
    }
}

impl<W: Write + Send> ArrowWriter<W> {
    /// Try to create a new Arrow writer
    ///
    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
    ///  * the Arrow schema contains unsupported datatypes such as Unions
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
    ///
    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
    ///  * the Arrow schema contains unsupported datatypes such as Unions
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let mut props = options.properties;

        let schema = if let Some(parquet_schema) = options.schema_descr {
            parquet_schema.clone()
        } else {
            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
            if let Some(schema_root) = &options.schema_root {
                converter = converter.schema_root(schema_root);
            }

            converter.convert(&arrow_schema)?
        };

        if !options.skip_arrow_metadata {
            // add serialized arrow schema
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_size = props.max_row_group_size();

        let props_ptr = Arc::new(props);
        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;

        let row_group_writer_factory =
            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            row_group_writer_factory,
            max_row_group_size,
        })
    }

    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

    /// Estimated memory usage, in bytes, of this `ArrowWriter`
    ///
    /// This estimate is formed by summing the values of
    /// [`ArrowColumnWriter::memory_size`] for all in progress columns.
    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }

    /// Anticipated encoded size of the in progress row group.
    ///
    /// This estimates the size of the row group once it has been completely
    /// encoded, formed by summing the values of
    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
    /// columns.
    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }

    /// Returns the number of rows buffered in the in progress row group
    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }

    /// Returns the number of bytes written by this instance
    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }

    /// Encodes the provided [`RecordBatch`]
    ///
    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
    /// rows, the contents of `batch` will be written to one or more row groups such that all but
    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
    ///
    /// This will fail if the `batch`'s schema does not match the writer's schema.
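    ///
    /// A minimal sketch of the splitting behavior: with `max_row_group_size`
    /// set to 2, a 5-row batch should land in three row groups (2 + 2 + 1 rows).
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// # use parquet::file::properties::WriterProperties;
    /// let col = Arc::new(Int32Array::from_iter_values(0..5)) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// let props = WriterProperties::builder()
    ///     .set_max_row_group_size(2)
    ///     .build();
    /// let mut buffer = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
    /// writer.write(&batch).unwrap();
    ///
    /// let metadata = writer.close().unwrap();
    /// assert_eq!(metadata.num_row_groups(), 3);
    /// ```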
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(
                self.row_group_writer_factory
                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
            ),
        };

        // If would exceed max_row_group_size, split batch
        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
            let to_write = self.max_row_group_size - in_progress.buffered_rows;
            let a = batch.slice(0, to_write);
            let b = batch.slice(to_write, batch.num_rows() - to_write);
            self.write(&a)?;
            return self.write(&b);
        }

        in_progress.write(batch)?;

        if in_progress.buffered_rows >= self.max_row_group_size {
            self.flush()?
        }
        Ok(())
    }

348
349    /// Writes the given buf bytes to the internal buffer.
350    ///
351    /// It's safe to use this method to write data to the underlying writer,
352    /// because it will ensure that the buffering and byte‐counting layers are used.
353    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
354        self.writer.write_all(buf)
355    }
356
    /// Flushes the underlying writer
    pub fn sync(&mut self) -> std::io::Result<()> {
        self.writer.flush()
    }

    /// Flushes all buffered rows into a new row group
    ///
    /// Note that the underlying writer is not flushed by this call.
    /// If that behavior is desired, call [`ArrowWriter::sync`] as well.
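    ///
    /// For example, flushing after each batch forces one row group per batch
    /// (a sketch; in practice prefer flushing on a size threshold, as shown on
    /// [`ArrowWriter`]):
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// let mut buffer = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    /// writer.write(&batch).unwrap();
    /// writer.flush().unwrap();
    /// assert_eq!(writer.flushed_row_groups().len(), 1);
    /// # writer.close().unwrap();
    /// ```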
    pub fn flush(&mut self) -> Result<()> {
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
    ///
    /// This method provides a way to append key-value metadata after writing RecordBatches
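    ///
    /// A minimal sketch (assuming the [`KeyValue::new`] constructor taking a
    /// key and a value string):
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    /// let mut buffer = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    /// writer.write(&batch).unwrap();
    /// // record provenance after the data has been written
    /// writer.append_key_value_metadata(KeyValue::new("source".to_string(), "example".to_string()));
    /// # writer.close().unwrap();
    /// ```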
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    /// Returns a reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// **Warning**: if you write directly to this writer, you will skip
    /// the `TrackedWrite` buffering and byte-counting layers. That will cause
    /// the file footer's recorded offsets and sizes to diverge from reality,
    /// resulting in an unreadable or corrupted Parquet file.
    ///
    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    /// Flushes any outstanding data and returns the underlying writer.
    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }

    /// Close and finalize the underlying Parquet writer
    ///
    /// Unlike [`Self::close`] this does not consume self
    ///
    /// Attempting to write after calling finish will result in an error
    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    /// Close and finalize the underlying Parquet writer
    pub fn close(mut self) -> Result<ParquetMetaData> {
        self.finish()
    }

    /// Create a new row group writer and return its column writers.
    #[deprecated(
        since = "56.2.0",
        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
    )]
    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
        self.flush()?;
        let in_progress = self
            .row_group_writer_factory
            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
        Ok(in_progress.writers)
    }

    /// Append the given column chunks to the file as a new row group.
    #[deprecated(
        since = "56.2.0",
        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
    )]
    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in chunks {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
    ///
    /// Flushes any outstanding data before returning.
    ///
    /// This can be useful to provide more control over how files are written, for example
    /// to write columns in parallel. See the example on [`ArrowColumnWriter`].
    pub fn into_serialized_writer(
        mut self,
    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
        self.flush()?;
        Ok((self.writer, self.row_group_writer_factory))
    }
}

impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch).map_err(|e| e.into())
    }

    fn close(self) -> std::result::Result<(), ArrowError> {
        self.close()?;
        Ok(())
    }
}

/// Arrow-specific configuration settings for writing parquet files.
///
/// See [`ArrowWriter`] for how to configure the writer.
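///
/// A minimal sketch of assembling options and passing them to
/// [`ArrowWriter::try_new_with_options`]:
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true); // omit the embedded arrow schema
///
/// let mut buffer = Vec::new();
/// let mut writer =
///     ArrowWriter::try_new_with_options(&mut buffer, batch.schema(), options).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// ```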
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
    schema_descr: Option<SchemaDescriptor>,
}

impl ArrowWriterOptions {
    /// Creates a new [`ArrowWriterOptions`] with the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the [`WriterProperties`] for writing parquet files.
    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    /// Skip encoding the embedded arrow metadata (defaults to `false`)
    ///
    /// Parquet files generated by the [`ArrowWriter`] contain the embedded arrow schema
    /// by default.
    ///
    /// Set `skip_arrow_metadata` to true to skip encoding the embedded metadata.
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
    pub fn with_schema_root(self, schema_root: String) -> Self {
        Self {
            schema_root: Some(schema_root),
            ..self
        }
    }

    /// Explicitly specify the Parquet schema to be used
    ///
    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
    /// Parquet [`SchemaDescriptor`]. This may be used when the [`SchemaDescriptor`] is
    /// already known or must be calculated using custom logic.
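    ///
    /// A minimal sketch, deriving the descriptor with [`ArrowSchemaConverter`]
    /// and then supplying it explicitly:
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// # use parquet::arrow::arrow_writer::ArrowWriterOptions;
    /// let schema = Schema::new(vec![Field::new("col", DataType::Int32, false)]);
    /// let parquet_schema = ArrowSchemaConverter::new().convert(&schema).unwrap();
    /// let options = ArrowWriterOptions::new().with_parquet_schema(parquet_schema);
    /// ```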
    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
        Self {
            schema_descr: Some(schema_descr),
            ..self
        }
    }
}

/// A single column chunk produced by [`ArrowColumnWriter`]
#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}

/// A [`Read`] for [`ArrowColumnChunkData`]
struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ArrowColumnChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}

/// A shared [`ArrowColumnChunkData`]
///
/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;

#[derive(Default)]
struct ArrowPageWriter {
    buffer: SharedColumnChunk,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl ArrowPageWriter {
    #[cfg(feature = "encryption")]
    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    #[cfg(not(feature = "encryption"))]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }
}

impl PageWriter for ArrowPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header()?;
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
                    page_header.write_thrift(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}

/// A leaf column that can be encoded by [`ArrowColumnWriter`]
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);

/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
///
/// This function can be used along with [`get_column_writers`] to encode
/// individual columns in parallel. See example on [`ArrowColumnWriter`]
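///
/// A minimal sketch: a flat primitive array produces exactly one leaf column,
/// while nested types such as structs or lists may produce several.
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array};
/// # use arrow_schema::{DataType, Field};
/// # use parquet::arrow::arrow_writer::compute_leaves;
/// let field = Field::new("i", DataType::Int32, false);
/// let array = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let leaves = compute_leaves(&field, &array).unwrap();
/// assert_eq!(leaves.len(), 1);
/// ```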
pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
    let levels = calculate_array_levels(array, field)?;
    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
}

/// The data for a single column chunk, see [`ArrowColumnWriter`]
pub struct ArrowColumnChunk {
    data: ArrowColumnChunkData,
    close: ColumnCloseResult,
}

impl std::fmt::Debug for ArrowColumnChunk {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnChunk")
            .field("length", &self.data.length)
            .finish_non_exhaustive()
    }
}

impl ArrowColumnChunk {
    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}

/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
///
/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`].
///
/// Note: This is a low-level interface for applications that require
/// fine-grained control of encoding (e.g. encoding using multiple threads),
/// see [`ArrowWriter`] for a higher-level interface
///
/// # Example: Encoding two Arrow Arrays in Parallel
/// ```
/// // The arrow schema
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_schema::*;
/// # use parquet::arrow::ArrowSchemaConverter;
/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
/// #
/// let schema = Arc::new(Schema::new(vec![
///     Field::new("i32", DataType::Int32, false),
///     Field::new("f32", DataType::Float32, false),
/// ]));
///
/// // Compute the parquet schema
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = ArrowSchemaConverter::new()
///   .with_coerce_types(props.coerce_types())
///   .convert(&schema)
///   .unwrap();
///
/// // Create parquet writer
/// let root_schema = parquet_schema.root_schema_ptr();
/// // write to memory in the example, but this could be a File
/// let mut out = Vec::with_capacity(1024);
/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
///   .unwrap();
///
/// // Create a factory for building Arrow column writers
/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
/// // Create column writers for the 0th row group
/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
///
/// // Spawn a worker thread for each column
/// //
/// // Note: This is for demonstration purposes; a thread pool, e.g. rayon or tokio, would be better.
/// // The `map` produces an iterator of `(thread handle, send channel)` tuples.
/// let mut workers: Vec<_> = col_writers
///     .into_iter()
///     .map(|mut col_writer| {
///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
///         let handle = std::thread::spawn(move || {
///             // receive Arrays to encode via the channel
///             for col in recv {
///                 col_writer.write(&col)?;
///             }
///             // once the input is complete, close the writer
///             // to return the newly created ArrowColumnChunk
///             col_writer.close()
///         });
///         (handle, send)
///     })
///     .collect();
///
/// // Start row group
/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
///   .next_row_group()
///   .unwrap();
///
/// // Create some example input columns to encode
/// let to_write = vec![
///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
/// ];
///
/// // Send the input columns to the workers
/// let mut worker_iter = workers.iter_mut();
/// for (arr, field) in to_write.iter().zip(&schema.fields) {
///     for leaves in compute_leaves(field, arr).unwrap() {
///         worker_iter.next().unwrap().1.send(leaves).unwrap();
///     }
/// }
///
/// // Wait for the workers to complete encoding, and append
/// // the resulting column chunks to the row group (and the file)
/// for (handle, send) in workers {
///     drop(send); // Drop send side to signal termination
///     // wait for the worker to send the completed chunk
///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
/// }
/// // Close the row group which writes to the underlying file
/// row_group_writer.close().unwrap();
///
/// let metadata = writer.close().unwrap();
/// assert_eq!(metadata.file_metadata().num_rows(), 3);
/// ```
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}

impl ArrowColumnWriter {
    /// Write an [`ArrowLeafColumn`]
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
                write_leaf(c, &col.0)?;
            }
            ArrowColumnWriterImpl::ByteArray(c) => {
                write_primitive(c, col.0.array().as_ref(), &col.0)?;
            }
        }
        Ok(())
    }

    /// Close this column returning the written [`ArrowColumnChunk`]
    pub fn close(self) -> Result<ArrowColumnChunk> {
        let close = match self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
            ArrowColumnWriterImpl::Column(c) => c.close()?,
        };
        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
        let data = chunk.into_inner().unwrap();
        Ok(ArrowColumnChunk { data, close })
    }

    /// Returns the estimated total memory usage by the writer.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
    /// of the current memory usage and not its anticipated encoded size.
    ///
    /// This includes:
    /// 1. Data buffered in encoded form
    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
    ///
    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
    pub fn memory_size(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
        }
    }

    /// Returns the estimated total encoded bytes for this column writer.
    ///
    /// This includes:
    /// 1. Data buffered in encoded form
    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
    ///
    /// This value should be less than or equal to [`Self::memory_size`]
    pub fn get_estimated_total_bytes(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
        }
    }
}

/// Encodes [`RecordBatch`] to a parquet row group
///
/// Note: this structure is created internally by [`ArrowRowGroupWriterFactory`];
/// it is not exposed publicly.
///
/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
#[derive(Debug)]
struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}

impl ArrowRowGroupWriter {
    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
        Self {
            writers,
            schema: arrow.clone(),
            buffered_rows: 0,
        }
    }

    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers.next().unwrap().write(&leaf)?
            }
        }
        Ok(())
    }

    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
        self.writers
            .into_iter()
            .map(|writer| writer.close())
            .collect()
    }
}

/// Factory that creates new column writers for each row group in the Parquet file.
///
/// You can create this structure via [`ArrowWriter::into_serialized_writer`] or
/// directly via [`ArrowRowGroupWriterFactory::new`].
/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    schema: SchemaDescPtr,
    arrow_schema: SchemaRef,
    props: WriterPropertiesPtr,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowRowGroupWriterFactory {
    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
    pub fn new<W: Write + Send>(
        file_writer: &SerializedFileWriter<W>,
        arrow_schema: SchemaRef,
    ) -> Self {
        let schema = Arc::clone(file_writer.schema_descr_ptr());
        let props = Arc::clone(file_writer.properties());
        Self {
            schema,
            arrow_schema,
            props,
            #[cfg(feature = "encryption")]
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
        let writers = self.create_column_writers(row_group_index)?;
        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
    }

    /// Create column writers for a new row group, with the given row group index
    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
        let mut leaves = self.schema.columns().iter();
        let column_factory = self.column_writer_factory(row_group_index);
        for field in &self.arrow_schema.fields {
            column_factory.get_arrow_column_writer(
                field.data_type(),
                &self.props,
                &mut leaves,
                &mut writers,
            )?;
        }
        Ok(writers)
    }

    #[cfg(feature = "encryption")]
    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
        ArrowColumnWriterFactory::new()
            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
    }

    #[cfg(not(feature = "encryption"))]
    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
        ArrowColumnWriterFactory::new()
    }
}

/// Returns [`ArrowColumnWriter`]s for each column in a given schema
#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
pub fn get_column_writers(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory = ArrowColumnWriterFactory::new();
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

/// Creates [`ArrowColumnWriter`] instances
struct ArrowColumnWriterFactory {
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowColumnWriterFactory {
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "encryption")]
            row_group_index: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_encryptor(
        mut self,
        row_group_index: usize,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.row_group_index = row_group_index;
        self.file_encryptor = file_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn create_page_writer(
        &self,
        column_descriptor: &ColumnDescPtr,
        column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        let column_path = column_descriptor.path().string();
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &self.file_encryptor,
            self.row_group_index,
            column_index,
            &column_path,
        )?;
        Ok(Box::new(
            ArrowPageWriter::default().with_encryptor(page_encryptor),
        ))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_page_writer(
        &self,
        _column_descriptor: &ColumnDescPtr,
        _column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        Ok(Box::<ArrowPageWriter>::default())
    }

    /// Gets an [`ArrowColumnWriter`] for the given `data_type`, appending the
    /// output ColumnDesc to `leaves` and the column writers to `out`
    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        // Instantiate writers for normal columns
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        // Instantiate writers for byte arrays (e.g. Utf8, Binary, etc)
        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            },
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8
                | ArrowDataType::LargeUtf8
                | ArrowDataType::Binary
                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
                _ => out.push(col(leaves.next().unwrap())?),
            },
            _ => {
                return Err(ParquetError::NYI(format!(
                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
                )));
            }
        }
        Ok(())
    }
}

fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
    let column = levels.array().as_ref();
    let indices = levels.non_null_indices();
    match writer {
        ColumnWriter::Int32ColumnWriter(typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;

                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    let values = column.as_primitive::<UInt32Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
                    let array = values.inner().typed_data::<i32>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal32(_, _) => {
                    let array = column
                        .as_primitive::<Decimal32Type>()
                        .unary::<_, Int32Type>(|v| v);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal64(_, _) => {
                    // use the int32 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // use the int32 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // use the int32 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal32(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal32Type>()
                            .unary::<_, Int32Type>(|v| v);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal64(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal64Type>()
                            .unary::<_, Int32Type>(|v| v as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int32Type>(|v| v as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                        let array = array.as_primitive::<Int32Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::BoolColumnWriter(typed) => {
            let array = column.as_boolean();
            typed.write_batch(
                get_bool_array_slice(array, indices).as_slice(),
                levels.def_levels(),
                levels.rep_levels(),
            )
        }
        ColumnWriter::Int64ColumnWriter(typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;

                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    let values = column.as_primitive::<UInt64Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal64(_, _) => {
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .unary::<_, Int64Type>(|v| v);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // use the int64 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // use the int64 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal64(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal64Type>()
                            .unary::<_, Int64Type>(|v| v);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int64Type>(|v| v as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                        let array = array.as_primitive::<Int64Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
1307        ColumnWriter::Int96ColumnWriter(_typed) => {
1308            unreachable!("Currently unreachable because data type not supported")
1309        }
1310        ColumnWriter::FloatColumnWriter(typed) => {
1311            let array = column.as_primitive::<Float32Type>();
1312            write_primitive(typed, array.values(), levels)
1313        }
1314        ColumnWriter::DoubleColumnWriter(typed) => {
1315            let array = column.as_primitive::<Float64Type>();
1316            write_primitive(typed, array.values(), levels)
1317        }
1318        ColumnWriter::ByteArrayColumnWriter(_) => {
1319            unreachable!("should use ByteArrayWriter")
1320        }
1321        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1322            let bytes = match column.data_type() {
1323                ArrowDataType::Interval(interval_unit) => match interval_unit {
1324                    IntervalUnit::YearMonth => {
1325                        let array = column
1326                            .as_any()
1327                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
1328                            .unwrap();
1329                        get_interval_ym_array_slice(array, indices)
1330                    }
1331                    IntervalUnit::DayTime => {
1332                        let array = column
1333                            .as_any()
1334                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
1335                            .unwrap();
1336                        get_interval_dt_array_slice(array, indices)
1337                    }
1338                    _ => {
1339                        return Err(ParquetError::NYI(format!(
1340                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1341                        )));
1342                    }
1343                },
1344                ArrowDataType::FixedSizeBinary(_) => {
1345                    let array = column
1346                        .as_any()
1347                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1348                        .unwrap();
1349                    get_fsb_array_slice(array, indices)
1350                }
1351                ArrowDataType::Decimal32(_, _) => {
1352                    let array = column.as_primitive::<Decimal32Type>();
1353                    get_decimal_32_array_slice(array, indices)
1354                }
1355                ArrowDataType::Decimal64(_, _) => {
1356                    let array = column.as_primitive::<Decimal64Type>();
1357                    get_decimal_64_array_slice(array, indices)
1358                }
1359                ArrowDataType::Decimal128(_, _) => {
1360                    let array = column.as_primitive::<Decimal128Type>();
1361                    get_decimal_128_array_slice(array, indices)
1362                }
1363                ArrowDataType::Decimal256(_, _) => {
1364                    let array = column
1365                        .as_any()
1366                        .downcast_ref::<arrow_array::Decimal256Array>()
1367                        .unwrap();
1368                    get_decimal_256_array_slice(array, indices)
1369                }
1370                ArrowDataType::Float16 => {
1371                    let array = column.as_primitive::<Float16Type>();
1372                    get_float_16_array_slice(array, indices)
1373                }
1374                _ => {
1375                    return Err(ParquetError::NYI(format!(
1376                        "Attempting to write an Arrow type {:?} that is not yet implemented", column.data_type()
1377                    )));
1378                }
1379            };
1380            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1381        }
1382    }
1383}
1384
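/// Writes the non-null values in `values` to `writer`. A thin wrapper over
/// `GenericColumnWriter::write_batch_internal` that supplies the definition and
/// repetition levels and the non-null value indices computed in `levels`, with
/// no precomputed statistics.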
1385fn write_primitive<E: ColumnValueEncoder>(
1386    writer: &mut GenericColumnWriter<E>,
1387    values: &E::Values,
1388    levels: &ArrayLevels,
1389) -> Result<usize> {
1390    writer.write_batch_internal(
1391        values,
1392        Some(levels.non_null_indices()),
1393        levels.def_levels(),
1394        levels.rep_levels(),
1395        None,
1396        None,
1397        None,
1398    )
1399}
1400
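/// Gathers the boolean values at `indices` into a contiguous `Vec<bool>`.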
1401fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1402    let mut values = Vec::with_capacity(indices.len());
1403    for i in indices {
1404        values.push(array.value(*i))
1405    }
1406    values
1407}
1408
1409/// Returns 12-byte values encoding months, days and milliseconds as three little-endian `i32`s.
1410/// An Arrow YearMonth interval only stores months, so only the first 4 bytes are populated.
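/// Resulting layout: `[months: i32 LE | 8 zero bytes]`.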
1411fn get_interval_ym_array_slice(
1412    array: &arrow_array::IntervalYearMonthArray,
1413    indices: &[usize],
1414) -> Vec<FixedLenByteArray> {
1415    let mut values = Vec::with_capacity(indices.len());
1416    for i in indices {
1417        let mut value = array.value(*i).to_le_bytes().to_vec();
1418        let mut suffix = vec![0; 8];
1419        value.append(&mut suffix);
1420        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1421    }
1422    values
1423}
1424
1425/// Returns 12-byte values encoding months, days and milliseconds as three little-endian `i32`s.
1426/// An Arrow DayTime interval only stores days and milliseconds, so the first 4 bytes are left as zero.
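/// Resulting layout: `[4 zero bytes | days: i32 LE | milliseconds: i32 LE]`.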
1427fn get_interval_dt_array_slice(
1428    array: &arrow_array::IntervalDayTimeArray,
1429    indices: &[usize],
1430) -> Vec<FixedLenByteArray> {
1431    let mut values = Vec::with_capacity(indices.len());
1432    for i in indices {
1433        let mut out = [0; 12];
1434        let value = array.value(*i);
1435        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1436        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1437        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1438    }
1439    values
1440}
1441
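/// Truncates each 4-byte big-endian `Decimal32` value to the minimal byte width
/// implied by the array's precision (see `decimal_length_from_precision`).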
1442fn get_decimal_32_array_slice(
1443    array: &arrow_array::Decimal32Array,
1444    indices: &[usize],
1445) -> Vec<FixedLenByteArray> {
1446    let mut values = Vec::with_capacity(indices.len());
1447    let size = decimal_length_from_precision(array.precision());
1448    for i in indices {
1449        let as_be_bytes = array.value(*i).to_be_bytes();
1450        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1451        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1452    }
1453    values
1454}
1455
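/// Truncates each 8-byte big-endian `Decimal64` value to the minimal byte width
/// implied by the array's precision.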
1456fn get_decimal_64_array_slice(
1457    array: &arrow_array::Decimal64Array,
1458    indices: &[usize],
1459) -> Vec<FixedLenByteArray> {
1460    let mut values = Vec::with_capacity(indices.len());
1461    let size = decimal_length_from_precision(array.precision());
1462    for i in indices {
1463        let as_be_bytes = array.value(*i).to_be_bytes();
1464        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1465        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1466    }
1467    values
1468}
1469
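/// Truncates each 16-byte big-endian `Decimal128` value to the minimal byte width
/// implied by the array's precision. For example, precision 20 needs 9 bytes, so
/// only the low 9 big-endian bytes of each `i128` are kept.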
1470fn get_decimal_128_array_slice(
1471    array: &arrow_array::Decimal128Array,
1472    indices: &[usize],
1473) -> Vec<FixedLenByteArray> {
1474    let mut values = Vec::with_capacity(indices.len());
1475    let size = decimal_length_from_precision(array.precision());
1476    for i in indices {
1477        let as_be_bytes = array.value(*i).to_be_bytes();
1478        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1479        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1480    }
1481    values
1482}
1483
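/// Truncates each 32-byte big-endian `Decimal256` value to the minimal byte width
/// implied by the array's precision.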
1484fn get_decimal_256_array_slice(
1485    array: &arrow_array::Decimal256Array,
1486    indices: &[usize],
1487) -> Vec<FixedLenByteArray> {
1488    let mut values = Vec::with_capacity(indices.len());
1489    let size = decimal_length_from_precision(array.precision());
1490    for i in indices {
1491        let as_be_bytes = array.value(*i).to_be_bytes();
1492        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1493        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1494    }
1495    values
1496}
1497
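/// Converts each `f16` value to its 2-byte little-endian encoding, as required by
/// the Parquet `Float16` logical type (a 2-byte `FIXED_LEN_BYTE_ARRAY`).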
1498fn get_float_16_array_slice(
1499    array: &arrow_array::Float16Array,
1500    indices: &[usize],
1501) -> Vec<FixedLenByteArray> {
1502    let mut values = Vec::with_capacity(indices.len());
1503    for i in indices {
1504        let value = array.value(*i).to_le_bytes().to_vec();
1505        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1506    }
1507    values
1508}
1509
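/// Gathers the fixed-size binary values at `indices` as `FixedLenByteArray`s.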
1510fn get_fsb_array_slice(
1511    array: &arrow_array::FixedSizeBinaryArray,
1512    indices: &[usize],
1513) -> Vec<FixedLenByteArray> {
1514    let mut values = Vec::with_capacity(indices.len());
1515    for i in indices {
1516        let value = array.value(*i).to_vec();
1517        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1518    }
1519    values
1520}
1521
1522#[cfg(test)]
1523mod tests {
1524    use super::*;
1525    use std::collections::HashMap;
1526
1527    use std::fs::File;
1528
1529    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1530    use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1531    use crate::column::page::{Page, PageReader};
1532    use crate::file::metadata::thrift::PageHeader;
1533    use crate::file::page_index::column_index::ColumnIndexMetaData;
1534    use crate::file::reader::SerializedPageReader;
1535    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1536    use crate::schema::types::{ColumnPath, Type};
1537    use arrow::datatypes::ToByteSlice;
1538    use arrow::datatypes::{DataType, Schema};
1539    use arrow::error::Result as ArrowResult;
1540    use arrow::util::data_gen::create_random_array;
1541    use arrow::util::pretty::pretty_format_batches;
1542    use arrow::{array::*, buffer::Buffer};
1543    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1544    use arrow_schema::Fields;
1545    use half::f16;
1546    use num_traits::{FromPrimitive, ToPrimitive};
1547    use tempfile::tempfile;
1548
1549    use crate::basic::Encoding;
1550    use crate::data_type::AsBytes;
1551    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1552    use crate::file::properties::{
1553        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1554    };
1555    use crate::file::serialized_reader::ReadOptionsBuilder;
1556    use crate::file::{
1557        reader::{FileReader, SerializedFileReader},
1558        statistics::Statistics,
1559    };
1560
1561    #[test]
1562    fn arrow_writer() {
1563        // define schema
1564        let schema = Schema::new(vec![
1565            Field::new("a", DataType::Int32, false),
1566            Field::new("b", DataType::Int32, true),
1567        ]);
1568
1569        // create some data
1570        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1571        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1572
1573        // build a record batch
1574        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1575
1576        roundtrip(batch, Some(SMALL_SIZE / 2));
1577    }
1578
1579    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1580        let mut buffer = vec![];
1581
1582        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1583        writer.write(expected_batch).unwrap();
1584        writer.close().unwrap();
1585
1586        buffer
1587    }
1588
1589    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1590        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1591        writer.write(expected_batch).unwrap();
1592        writer.into_inner().unwrap()
1593    }
1594
1595    #[test]
1596    fn roundtrip_bytes() {
1597        // define schema
1598        let schema = Arc::new(Schema::new(vec![
1599            Field::new("a", DataType::Int32, false),
1600            Field::new("b", DataType::Int32, true),
1601        ]));
1602
1603        // create some data
1604        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1605        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1606
1607        // build a record batch
1608        let expected_batch =
1609            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1610
1611        for buffer in [
1612            get_bytes_after_close(schema.clone(), &expected_batch),
1613            get_bytes_by_into_inner(schema, &expected_batch),
1614        ] {
1615            let cursor = Bytes::from(buffer);
1616            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1617
1618            let actual_batch = record_batch_reader
1619                .next()
1620                .expect("No batch found")
1621                .expect("Unable to get batch");
1622
1623            assert_eq!(expected_batch.schema(), actual_batch.schema());
1624            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1625            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1626            for i in 0..expected_batch.num_columns() {
1627                let expected_data = expected_batch.column(i).to_data();
1628                let actual_data = actual_batch.column(i).to_data();
1629
1630                assert_eq!(expected_data, actual_data);
1631            }
1632        }
1633    }
1634
1635    #[test]
1636    fn arrow_writer_non_null() {
1637        // define schema
1638        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1639
1640        // create some data
1641        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1642
1643        // build a record batch
1644        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1645
1646        roundtrip(batch, Some(SMALL_SIZE / 2));
1647    }
1648
1649    #[test]
1650    fn arrow_writer_list() {
1651        // define schema
1652        let schema = Schema::new(vec![Field::new(
1653            "a",
1654            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1655            true,
1656        )]);
1657
1658        // create some data
1659        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1660
1661        // Construct a buffer for value offsets, for the nested array:
1662        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1663        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1664
1665        // Construct a list array from the above two
1666        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1667            DataType::Int32,
1668            false,
1669        ))))
1670        .len(5)
1671        .add_buffer(a_value_offsets)
1672        .add_child_data(a_values.into_data())
1673        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1674        .build()
1675        .unwrap();
1676        let a = ListArray::from(a_list_data);
1677
1678        // build a record batch
1679        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1680
1681        assert_eq!(batch.column(0).null_count(), 1);
1682
1683        // This test fails if the max row group size is less than the batch's length
1684        // see https://github.com/apache/arrow-rs/issues/518
1685        roundtrip(batch, None);
1686    }
1687
1688    #[test]
1689    fn arrow_writer_list_non_null() {
1690        // define schema
1691        let schema = Schema::new(vec![Field::new(
1692            "a",
1693            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1694            false,
1695        )]);
1696
1697        // create some data
1698        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1699
1700        // Construct a buffer for value offsets, for the nested array:
1701        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1702        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1703
1704        // Construct a list array from the above two
1705        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1706            DataType::Int32,
1707            false,
1708        ))))
1709        .len(5)
1710        .add_buffer(a_value_offsets)
1711        .add_child_data(a_values.into_data())
1712        .build()
1713        .unwrap();
1714        let a = ListArray::from(a_list_data);
1715
1716        // build a record batch
1717        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1718
1719        // This test fails if the max row group size is less than the batch's length
1720        // see https://github.com/apache/arrow-rs/issues/518
1721        assert_eq!(batch.column(0).null_count(), 0);
1722
1723        roundtrip(batch, None);
1724    }
1725
1726    #[test]
1727    fn arrow_writer_binary() {
1728        let string_field = Field::new("a", DataType::Utf8, false);
1729        let binary_field = Field::new("b", DataType::Binary, false);
1730        let schema = Schema::new(vec![string_field, binary_field]);
1731
1732        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1733        let raw_binary_values = [
1734            b"foo".to_vec(),
1735            b"bar".to_vec(),
1736            b"baz".to_vec(),
1737            b"quux".to_vec(),
1738        ];
1739        let raw_binary_value_refs = raw_binary_values
1740            .iter()
1741            .map(|x| x.as_slice())
1742            .collect::<Vec<_>>();
1743
1744        let string_values = StringArray::from(raw_string_values.clone());
1745        let binary_values = BinaryArray::from(raw_binary_value_refs);
1746        let batch = RecordBatch::try_new(
1747            Arc::new(schema),
1748            vec![Arc::new(string_values), Arc::new(binary_values)],
1749        )
1750        .unwrap();
1751
1752        roundtrip(batch, Some(SMALL_SIZE / 2));
1753    }
1754
1755    #[test]
1756    fn arrow_writer_binary_view() {
1757        let string_field = Field::new("a", DataType::Utf8View, false);
1758        let binary_field = Field::new("b", DataType::BinaryView, false);
1759        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1760        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1761
1762        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1763        let raw_binary_values = vec![
1764            b"foo".to_vec(),
1765            b"bar".to_vec(),
1766            b"large payload over 12 bytes".to_vec(),
1767            b"lulu".to_vec(),
1768        ];
1769        let nullable_string_values =
1770            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1771
1772        let string_view_values = StringViewArray::from(raw_string_values);
1773        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1774        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1775        let batch = RecordBatch::try_new(
1776            Arc::new(schema),
1777            vec![
1778                Arc::new(string_view_values),
1779                Arc::new(binary_view_values),
1780                Arc::new(nullable_string_view_values),
1781            ],
1782        )
1783        .unwrap();
1784
1785        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1786        roundtrip(batch, None);
1787    }
1788
1789    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1790        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1791        let schema = Schema::new(vec![decimal_field]);
1792
1793        let decimal_values = vec![10_000, 50_000, 0, -100]
1794            .into_iter()
1795            .map(Some)
1796            .collect::<Decimal128Array>()
1797            .with_precision_and_scale(precision, scale)
1798            .unwrap();
1799
1800        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1801    }
1802
1803    #[test]
1804    fn arrow_writer_decimal() {
1805        // int32 to store the decimal value
1806        let batch_int32_decimal = get_decimal_batch(5, 2);
1807        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1808        // int64 to store the decimal value
1809        let batch_int64_decimal = get_decimal_batch(12, 2);
1810        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1811        // fixed_length_byte_array to store the decimal value
1812        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1813        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1814    }
1815
1816    #[test]
1817    fn arrow_writer_complex() {
1818        // define schema
1819        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1820        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1821        let struct_field_g = Arc::new(Field::new_list(
1822            "g",
1823            Field::new_list_field(DataType::Int16, true),
1824            false,
1825        ));
1826        let struct_field_h = Arc::new(Field::new_list(
1827            "h",
1828            Field::new_list_field(DataType::Int16, false),
1829            true,
1830        ));
1831        let struct_field_e = Arc::new(Field::new_struct(
1832            "e",
1833            vec![
1834                struct_field_f.clone(),
1835                struct_field_g.clone(),
1836                struct_field_h.clone(),
1837            ],
1838            false,
1839        ));
1840        let schema = Schema::new(vec![
1841            Field::new("a", DataType::Int32, false),
1842            Field::new("b", DataType::Int32, true),
1843            Field::new_struct(
1844                "c",
1845                vec![struct_field_d.clone(), struct_field_e.clone()],
1846                false,
1847            ),
1848        ]);
1849
1850        // create some data
1851        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1852        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1853        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1854        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1855
1856        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1857
1858        // Construct a buffer for value offsets, for the nested array:
1859        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1860        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1861
1862        // Construct a list array from the above two
1863        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1864            .len(5)
1865            .add_buffer(g_value_offsets.clone())
1866            .add_child_data(g_value.to_data())
1867            .build()
1868            .unwrap();
1869        let g = ListArray::from(g_list_data);
1870        // The difference between g and h is that h has a null bitmap
1871        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1872            .len(5)
1873            .add_buffer(g_value_offsets)
1874            .add_child_data(g_value.to_data())
1875            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1876            .build()
1877            .unwrap();
1878        let h = ListArray::from(h_list_data);
1879
1880        let e = StructArray::from(vec![
1881            (struct_field_f, Arc::new(f) as ArrayRef),
1882            (struct_field_g, Arc::new(g) as ArrayRef),
1883            (struct_field_h, Arc::new(h) as ArrayRef),
1884        ]);
1885
1886        let c = StructArray::from(vec![
1887            (struct_field_d, Arc::new(d) as ArrayRef),
1888            (struct_field_e, Arc::new(e) as ArrayRef),
1889        ]);
1890
1891        // build a record batch
1892        let batch = RecordBatch::try_new(
1893            Arc::new(schema),
1894            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1895        )
1896        .unwrap();
1897
1898        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1899        roundtrip(batch, Some(SMALL_SIZE / 3));
1900    }
1901
1902    #[test]
1903    fn arrow_writer_complex_mixed() {
1904        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1905        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1906
1907        // define schema
1908        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1909        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1910        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1911        let schema = Schema::new(vec![Field::new(
1912            "some_nested_object",
1913            DataType::Struct(Fields::from(vec![
1914                offset_field.clone(),
1915                partition_field.clone(),
1916                topic_field.clone(),
1917            ])),
1918            false,
1919        )]);
1920
1921        // create some data
1922        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1923        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1924        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1925
1926        let some_nested_object = StructArray::from(vec![
1927            (offset_field, Arc::new(offset) as ArrayRef),
1928            (partition_field, Arc::new(partition) as ArrayRef),
1929            (topic_field, Arc::new(topic) as ArrayRef),
1930        ]);
1931
1932        // build a record batch
1933        let batch =
1934            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1935
1936        roundtrip(batch, Some(SMALL_SIZE / 2));
1937    }
1938
1939    #[test]
1940    fn arrow_writer_map() {
1941        // Note: we are using the JSON Arrow reader for brevity
1942        let json_content = r#"
1943        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1944        {"stocks":{"long": "$CCC", "short": null}}
1945        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1946        "#;
1947        let entries_struct_type = DataType::Struct(Fields::from(vec![
1948            Field::new("key", DataType::Utf8, false),
1949            Field::new("value", DataType::Utf8, true),
1950        ]));
1951        let stocks_field = Field::new(
1952            "stocks",
1953            DataType::Map(
1954                Arc::new(Field::new("entries", entries_struct_type, false)),
1955                false,
1956            ),
1957            true,
1958        );
1959        let schema = Arc::new(Schema::new(vec![stocks_field]));
1960        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1961        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1962
1963        let batch = reader.next().unwrap().unwrap();
1964        roundtrip(batch, None);
1965    }
1966
1967    #[test]
1968    fn arrow_writer_2_level_struct() {
1969        // tests writing <struct<struct<primitive>>
1970        let field_c = Field::new("c", DataType::Int32, true);
1971        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1972        let type_a = DataType::Struct(vec![field_b.clone()].into());
1973        let field_a = Field::new("a", type_a, true);
1974        let schema = Schema::new(vec![field_a.clone()]);
1975
1976        // create data
1977        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1978        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1979            .len(6)
1980            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1981            .add_child_data(c.into_data())
1982            .build()
1983            .unwrap();
1984        let b = StructArray::from(b_data);
1985        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1986            .len(6)
1987            .null_bit_buffer(Some(Buffer::from([0b00101111])))
1988            .add_child_data(b.into_data())
1989            .build()
1990            .unwrap();
1991        let a = StructArray::from(a_data);
1992
1993        assert_eq!(a.null_count(), 1);
1994        assert_eq!(a.column(0).null_count(), 2);
1995
1996        // build a record batch
1997        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1998
1999        roundtrip(batch, Some(SMALL_SIZE / 2));
2000    }
2001
2002    #[test]
2003    fn arrow_writer_2_level_struct_non_null() {
2004        // tests writing <struct<struct<primitive>>
2005        let field_c = Field::new("c", DataType::Int32, false);
2006        let type_b = DataType::Struct(vec![field_c].into());
2007        let field_b = Field::new("b", type_b.clone(), false);
2008        let type_a = DataType::Struct(vec![field_b].into());
2009        let field_a = Field::new("a", type_a.clone(), false);
2010        let schema = Schema::new(vec![field_a]);
2011
2012        // create data
2013        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2014        let b_data = ArrayDataBuilder::new(type_b)
2015            .len(6)
2016            .add_child_data(c.into_data())
2017            .build()
2018            .unwrap();
2019        let b = StructArray::from(b_data);
2020        let a_data = ArrayDataBuilder::new(type_a)
2021            .len(6)
2022            .add_child_data(b.into_data())
2023            .build()
2024            .unwrap();
2025        let a = StructArray::from(a_data);
2026
2027        assert_eq!(a.null_count(), 0);
2028        assert_eq!(a.column(0).null_count(), 0);
2029
2030        // build a record batch
2031        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2032
2033        roundtrip(batch, Some(SMALL_SIZE / 2));
2034    }
2035
2036    #[test]
2037    fn arrow_writer_2_level_struct_mixed_null() {
2038        // tests writing <struct<struct<primitive>>
2039        let field_c = Field::new("c", DataType::Int32, false);
2040        let type_b = DataType::Struct(vec![field_c].into());
2041        let field_b = Field::new("b", type_b.clone(), true);
2042        let type_a = DataType::Struct(vec![field_b].into());
2043        let field_a = Field::new("a", type_a.clone(), false);
2044        let schema = Schema::new(vec![field_a]);
2045
2046        // create data
2047        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2048        let b_data = ArrayDataBuilder::new(type_b)
2049            .len(6)
2050            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2051            .add_child_data(c.into_data())
2052            .build()
2053            .unwrap();
2054        let b = StructArray::from(b_data);
2055        // a intentionally has no null buffer, to test that this is handled correctly
2056        let a_data = ArrayDataBuilder::new(type_a)
2057            .len(6)
2058            .add_child_data(b.into_data())
2059            .build()
2060            .unwrap();
2061        let a = StructArray::from(a_data);
2062
2063        assert_eq!(a.null_count(), 0);
2064        assert_eq!(a.column(0).null_count(), 2);
2065
2066        // build a record batch
2067        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2068
2069        roundtrip(batch, Some(SMALL_SIZE / 2));
2070    }
2071
2072    #[test]
2073    fn arrow_writer_2_level_struct_mixed_null_2() {
2074        // tests writing <struct<struct<primitive>>, where the primitive columns are non-null.
2075        let field_c = Field::new("c", DataType::Int32, false);
2076        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2077        let field_e = Field::new(
2078            "e",
2079            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2080            false,
2081        );
2082
2083        let field_b = Field::new(
2084            "b",
2085            DataType::Struct(vec![field_c, field_d, field_e].into()),
2086            false,
2087        );
2088        let type_a = DataType::Struct(vec![field_b.clone()].into());
2089        let field_a = Field::new("a", type_a, true);
2090        let schema = Schema::new(vec![field_a.clone()]);
2091
2092        // create data
2093        let c = Int32Array::from_iter_values(0..6);
2094        let d = FixedSizeBinaryArray::try_from_iter(
2095            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2096        )
2097        .expect("four byte values");
2098        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2099        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2100            .len(6)
2101            .add_child_data(c.into_data())
2102            .add_child_data(d.into_data())
2103            .add_child_data(e.into_data())
2104            .build()
2105            .unwrap();
2106        let b = StructArray::from(b_data);
2107        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2108            .len(6)
2109            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2110            .add_child_data(b.into_data())
2111            .build()
2112            .unwrap();
2113        let a = StructArray::from(a_data);
2114
2115        assert_eq!(a.null_count(), 3);
2116        assert_eq!(a.column(0).null_count(), 0);
2117
2118        // build a record batch
2119        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2120
2121        roundtrip(batch, Some(SMALL_SIZE / 2));
2122    }
2123
2124    #[test]
2125    fn test_fixed_size_binary_in_dict() {
2126        fn test_fixed_size_binary_in_dict_inner<K>()
2127        where
2128            K: ArrowDictionaryKeyType,
2129            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2130            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2131        {
2132            let field = Field::new(
2133                "a",
2134                DataType::Dictionary(
2135                    Box::new(K::DATA_TYPE),
2136                    Box::new(DataType::FixedSizeBinary(4)),
2137                ),
2138                false,
2139            );
2140            let schema = Schema::new(vec![field]);
2141
2142            let keys: Vec<K::Native> = vec![
2143                K::Native::try_from(0u8).unwrap(),
2144                K::Native::try_from(0u8).unwrap(),
2145                K::Native::try_from(1u8).unwrap(),
2146            ];
2147            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2148            let values = FixedSizeBinaryArray::try_from_iter(
2149                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2150            )
2151            .unwrap();
2152
2153            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2154            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2155            roundtrip(batch, None);
2156        }
2157
2158        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2159        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2160        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2161        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
2162        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2163        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2164        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2165        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2166    }
2167
2168    #[test]
2169    fn test_empty_dict() {
2170        let struct_fields = Fields::from(vec![Field::new(
2171            "dict",
2172            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2173            false,
2174        )]);
2175
2176        let schema = Schema::new(vec![Field::new_struct(
2177            "struct",
2178            struct_fields.clone(),
2179            true,
2180        )]);
2181        let dictionary = Arc::new(DictionaryArray::new(
2182            Int32Array::new_null(5),
2183            Arc::new(StringArray::new_null(0)),
2184        ));
2185
2186        let s = StructArray::new(
2187            struct_fields,
2188            vec![dictionary],
2189            Some(NullBuffer::new_null(5)),
2190        );
2191
2192        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2193        roundtrip(batch, None);
2194    }

2195    #[test]
2196    fn arrow_writer_page_size() {
2197        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2198
2199        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2200
2201        // Generate an array of 10 unique 10-character strings
2202        for i in 0..10 {
2203            let value = i
2204                .to_string()
2205                .repeat(10)
2206                .chars()
2207                .take(10)
2208                .collect::<String>();
2209
2210            builder.append_value(value);
2211        }
2212
2213        let array = Arc::new(builder.finish());
2214
2215        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2216
2217        let file = tempfile::tempfile().unwrap();
2218
2219        // Set everything very low so we fall back to PLAIN encoding after the first row
2220        let props = WriterProperties::builder()
2221            .set_data_page_size_limit(1)
2222            .set_dictionary_page_size_limit(1)
2223            .set_write_batch_size(1)
2224            .build();
2225
2226        let mut writer =
2227            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2228                .expect("Unable to write file");
2229        writer.write(&batch).unwrap();
2230        writer.close().unwrap();
2231
2232        let options = ReadOptionsBuilder::new().with_page_index().build();
2233        let reader =
2234            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2235
2236        let column = reader.metadata().row_group(0).columns();
2237
2238        assert_eq!(column.len(), 1);
2239
2240        // We should write one row before falling back to PLAIN encoding so there should still be a
2241        // dictionary page.
2242        assert!(
2243            column[0].dictionary_page_offset().is_some(),
2244            "Expected a dictionary page"
2245        );
2246
2247        assert!(reader.metadata().offset_index().is_some());
2248        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2249
2250        let page_locations = offset_indexes[0].page_locations.clone();
2251
2252        // We should fall back to PLAIN encoding after the first row, and our max page size is 1 byte,
2253        // so we expect one dictionary-encoded page and then a page per row thereafter.
2254        assert_eq!(
2255            page_locations.len(),
2256            10,
2257            "Expected 10 pages but got {page_locations:#?}"
2258        );
2259    }
2260
2261    #[test]
2262    fn arrow_writer_float_nans() {
2263        let f16_field = Field::new("a", DataType::Float16, false);
2264        let f32_field = Field::new("b", DataType::Float32, false);
2265        let f64_field = Field::new("c", DataType::Float64, false);
2266        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2267
2268        let f16_values = (0..MEDIUM_SIZE)
2269            .map(|i| {
2270                Some(if i % 2 == 0 {
2271                    f16::NAN
2272                } else {
2273                    f16::from_f32(i as f32)
2274                })
2275            })
2276            .collect::<Float16Array>();
2277
2278        let f32_values = (0..MEDIUM_SIZE)
2279            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2280            .collect::<Float32Array>();
2281
2282        let f64_values = (0..MEDIUM_SIZE)
2283            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2284            .collect::<Float64Array>();
2285
2286        let batch = RecordBatch::try_new(
2287            Arc::new(schema),
2288            vec![
2289                Arc::new(f16_values),
2290                Arc::new(f32_values),
2291                Arc::new(f64_values),
2292            ],
2293        )
2294        .unwrap();
2295
2296        roundtrip(batch, None);
2297    }
2298
2299    const SMALL_SIZE: usize = 7;
2300    const MEDIUM_SIZE: usize = 63;
2301
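    // Round trip the specified record batch through both writer versions
    // (PARQUET_1_0 and PARQUET_2_0), optionally capping the row group size,
    // and return the resulting in-memory files.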
2302    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
2303        let mut files = vec![];
2304        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2305            let mut props = WriterProperties::builder().set_writer_version(version);
2306
2307            if let Some(size) = max_row_group_size {
2308                props = props.set_max_row_group_size(size)
2309            }
2310
2311            let props = props.build();
2312            files.push(roundtrip_opts(&expected_batch, props))
2313        }
2314        files
2315    }
2316
2317    // Round trip the specified record batch with the specified writer properties,
2318    // to an in-memory file, and validate the arrays using the specified function.
2319    // Returns the in-memory file.
2320    fn roundtrip_opts_with_array_validation<F>(
2321        expected_batch: &RecordBatch,
2322        props: WriterProperties,
2323        validate: F,
2324    ) -> Bytes
2325    where
2326        F: Fn(&ArrayData, &ArrayData),
2327    {
2328        let mut file = vec![];
2329
2330        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
2331            .expect("Unable to write file");
2332        writer.write(expected_batch).unwrap();
2333        writer.close().unwrap();
2334
2335        let file = Bytes::from(file);
2336        let mut record_batch_reader =
2337            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
2338
2339        let actual_batch = record_batch_reader
2340            .next()
2341            .expect("No batch found")
2342            .expect("Unable to get batch");
2343
2344        assert_eq!(expected_batch.schema(), actual_batch.schema());
2345        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2346        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2347        for i in 0..expected_batch.num_columns() {
2348            let expected_data = expected_batch.column(i).to_data();
2349            let actual_data = actual_batch.column(i).to_data();
2350            validate(&expected_data, &actual_data);
2351        }
2352
2353        file
2354    }
2355
2356    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
2357        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2358            a.validate_full().expect("valid expected data");
2359            b.validate_full().expect("valid actual data");
2360            assert_eq!(a, b)
2361        })
2362    }
2363
2364    struct RoundTripOptions {
2365        values: ArrayRef,
2366        schema: SchemaRef,
2367        bloom_filter: bool,
2368        bloom_filter_position: BloomFilterPosition,
2369    }
2370
2371    impl RoundTripOptions {
2372        fn new(values: ArrayRef, nullable: bool) -> Self {
2373            let data_type = values.data_type().clone();
2374            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2375            Self {
2376                values,
2377                schema: Arc::new(schema),
2378                bloom_filter: false,
2379                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2380            }
2381        }
2382    }
2383
2384    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
2385        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2386    }
2387
2388    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
2389        let mut options = RoundTripOptions::new(values, false);
2390        options.schema = schema;
2391        one_column_roundtrip_with_options(options)
2392    }
2393
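    // Round trip a single-column batch across a matrix of dictionary sizes,
    // encodings, writer versions and row group sizes appropriate for the
    // column's data type.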
2394    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
2395        let RoundTripOptions {
2396            values,
2397            schema,
2398            bloom_filter,
2399            bloom_filter_position,
2400        } = options;
2401
2402        let encodings = match values.data_type() {
2403            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2404                vec![
2405                    Encoding::PLAIN,
2406                    Encoding::DELTA_BYTE_ARRAY,
2407                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2408                ]
2409            }
2410            DataType::Int64
2411            | DataType::Int32
2412            | DataType::Int16
2413            | DataType::Int8
2414            | DataType::UInt64
2415            | DataType::UInt32
2416            | DataType::UInt16
2417            | DataType::UInt8 => vec![
2418                Encoding::PLAIN,
2419                Encoding::DELTA_BINARY_PACKED,
2420                Encoding::BYTE_STREAM_SPLIT,
2421            ],
2422            DataType::Float32 | DataType::Float64 => {
2423                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2424            }
2425            _ => vec![Encoding::PLAIN],
2426        };
2427
2428        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2429
2430        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2431
2432        let mut files = vec![];
2433        for dictionary_size in [0, 1, 1024] {
2434            for encoding in &encodings {
2435                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2436                    for row_group_size in row_group_sizes {
2437                        let props = WriterProperties::builder()
2438                            .set_writer_version(version)
2439                            .set_max_row_group_size(row_group_size)
2440                            .set_dictionary_enabled(dictionary_size != 0)
2441                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2442                            .set_encoding(*encoding)
2443                            .set_bloom_filter_enabled(bloom_filter)
2444                            .set_bloom_filter_position(bloom_filter_position)
2445                            .build();
2446
2447                        files.push(roundtrip_opts(&expected_batch, props))
2448                    }
2449                }
2450            }
2451        }
2452        files
2453    }
2454
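    // Round trip an iterator of values as a single required (non-nullable) column.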
2455    fn values_required<A, I>(iter: I) -> Vec<Bytes>
2456    where
2457        A: From<Vec<I::Item>> + Array + 'static,
2458        I: IntoIterator,
2459    {
2460        let raw_values: Vec<_> = iter.into_iter().collect();
2461        let values = Arc::new(A::from(raw_values));
2462        one_column_roundtrip(values, false)
2463    }
2464
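    // Round trip an iterator of values as a single optional column, replacing
    // every other value with a null.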
2465    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
2466    where
2467        A: From<Vec<Option<I::Item>>> + Array + 'static,
2468        I: IntoIterator,
2469    {
2470        let optional_raw_values: Vec<_> = iter
2471            .into_iter()
2472            .enumerate()
2473            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2474            .collect();
2475        let optional_values = Arc::new(A::from(optional_raw_values));
2476        one_column_roundtrip(optional_values, true)
2477    }
2478
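    // Round trip the same values both as a required and as an optional column.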
2479    fn required_and_optional<A, I>(iter: I)
2480    where
2481        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2482        I: IntoIterator + Clone,
2483    {
2484        values_required::<A, I>(iter.clone());
2485        values_optional::<A, I>(iter);
2486    }
2487
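    // Read back the given files and assert that every positive value is found in
    // some row group's bloom filter for `file_column`, and that no negative value is.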
2488    fn check_bloom_filter<T: AsBytes>(
2489        files: Vec<Bytes>,
2490        file_column: String,
2491        positive_values: Vec<T>,
2492        negative_values: Vec<T>,
2493    ) {
2494        files.into_iter().take(1).for_each(|file| {
2495            let file_reader = SerializedFileReader::new_with_options(
2496                file,
2497                ReadOptionsBuilder::new()
2498                    .with_reader_properties(
2499                        ReaderProperties::builder()
2500                            .set_read_bloom_filter(true)
2501                            .build(),
2502                    )
2503                    .build(),
2504            )
2505            .expect("Unable to open file as Parquet");
2506            let metadata = file_reader.metadata();
2507
2508            // Gets bloom filters from all row groups.
2509            let mut bloom_filters: Vec<_> = vec![];
2510            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2511                if let Some((column_index, _)) = row_group
2512                    .columns()
2513                    .iter()
2514                    .enumerate()
2515                    .find(|(_, column)| column.column_path().string() == file_column)
2516                {
2517                    let row_group_reader = file_reader
2518                        .get_row_group(ri)
2519                        .expect("Unable to read row group");
2520                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2521                        bloom_filters.push(sbbf.clone());
2522                    } else {
2523                        panic!("No bloom filter for column named {file_column} found");
2524                    }
2525                } else {
2526                    panic!("No column named {file_column} found");
2527                }
2528            }
2529
2530            positive_values.iter().for_each(|value| {
2531                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2532                assert!(
2533                    found.is_some(),
2534                    "Value {:?} should be in bloom filter",
2535                    value.as_bytes()
2536                );
2537            });
2538
2539            negative_values.iter().for_each(|value| {
2540                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2541                assert!(
2542                    found.is_none(),
2543                    "Value {:?} should not be in bloom filter",
2544                    value.as_bytes()
2545                );
2546            });
2547        });
2548    }
2549
2550    #[test]
2551    fn all_null_primitive_single_column() {
2552        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2553        one_column_roundtrip(values, true);
2554    }

2555    #[test]
2556    fn null_single_column() {
2557        let values = Arc::new(NullArray::new(SMALL_SIZE));
2558        one_column_roundtrip(values, true);
2559        // null arrays are always nullable, a test with non-nullable nulls fails
2560    }
2561
2562    #[test]
2563    fn bool_single_column() {
2564        required_and_optional::<BooleanArray, _>(
2565            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2566        );
2567    }
2568
2569    #[test]
2570    fn bool_large_single_column() {
2571        let values = Arc::new(
2572            [None, Some(true), Some(false)]
2573                .iter()
2574                .cycle()
2575                .copied()
2576                .take(200_000)
2577                .collect::<BooleanArray>(),
2578        );
2579        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2580        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2581        let file = tempfile::tempfile().unwrap();
2582
2583        let mut writer =
2584            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2585                .expect("Unable to write file");
2586        writer.write(&expected_batch).unwrap();
2587        writer.close().unwrap();
2588    }
2589
2590    #[test]
2591    fn check_page_offset_index_with_nan() {
2592        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2593        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2594        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2595
2596        let mut out = Vec::with_capacity(1024);
2597        let mut writer =
2598            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2599        writer.write(&batch).unwrap();
2600        let file_meta_data = writer.close().unwrap();
2601        for row_group in file_meta_data.row_groups() {
2602            for column in row_group.columns() {
2603                assert!(column.offset_index_offset().is_some());
2604                assert!(column.offset_index_length().is_some());
2605                assert!(column.column_index_offset().is_none());
2606                assert!(column.column_index_length().is_none());
2607            }
2608        }
2609    }
2610
2611    #[test]
2612    fn i8_single_column() {
2613        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2614    }
2615
2616    #[test]
2617    fn i16_single_column() {
2618        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2619    }
2620
2621    #[test]
2622    fn i32_single_column() {
2623        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2624    }
2625
2626    #[test]
2627    fn i64_single_column() {
2628        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2629    }
2630
2631    #[test]
2632    fn u8_single_column() {
2633        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2634    }
2635
2636    #[test]
2637    fn u16_single_column() {
2638        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2639    }
2640
2641    #[test]
2642    fn u32_single_column() {
2643        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2644    }
2645
2646    #[test]
2647    fn u64_single_column() {
2648        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2649    }
2650
2651    #[test]
2652    fn f32_single_column() {
2653        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2654    }
2655
2656    #[test]
2657    fn f64_single_column() {
2658        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2659    }
2660
2661    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2662    // argument, and they also don't support building from a Vec<Option<T>>, so call
2663    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2664
2665    #[test]
2666    fn timestamp_second_single_column() {
2667        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2668        let values = Arc::new(TimestampSecondArray::from(raw_values));
2669
2670        one_column_roundtrip(values, false);
2671    }
2672
2673    #[test]
2674    fn timestamp_millisecond_single_column() {
2675        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2676        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2677
2678        one_column_roundtrip(values, false);
2679    }
2680
2681    #[test]
2682    fn timestamp_microsecond_single_column() {
2683        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2684        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2685
2686        one_column_roundtrip(values, false);
2687    }
2688
2689    #[test]
2690    fn timestamp_nanosecond_single_column() {
2691        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2692        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2693
2694        one_column_roundtrip(values, false);
2695    }
2696
2697    #[test]
2698    fn date32_single_column() {
2699        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2700    }
2701
2702    #[test]
2703    fn date64_single_column() {
2704        // Date64 must be a multiple of 86400000, see ARROW-10925
2705        required_and_optional::<Date64Array, _>(
2706            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2707        );
2708    }
2709
2710    #[test]
2711    fn time32_second_single_column() {
2712        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2713    }
2714
2715    #[test]
2716    fn time32_millisecond_single_column() {
2717        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2718    }
2719
2720    #[test]
2721    fn time64_microsecond_single_column() {
2722        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2723    }
2724
2725    #[test]
2726    fn time64_nanosecond_single_column() {
2727        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2728    }
2729
2730    #[test]
2731    fn duration_second_single_column() {
2732        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2733    }
2734
2735    #[test]
2736    fn duration_millisecond_single_column() {
2737        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2738    }
2739
2740    #[test]
2741    fn duration_microsecond_single_column() {
2742        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2743    }
2744
2745    #[test]
2746    fn duration_nanosecond_single_column() {
2747        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2748    }
2749
2750    #[test]
2751    fn interval_year_month_single_column() {
2752        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2753    }
2754
2755    #[test]
2756    fn interval_day_time_single_column() {
2757        required_and_optional::<IntervalDayTimeArray, _>(vec![
2758            IntervalDayTime::new(0, 1),
2759            IntervalDayTime::new(0, 3),
2760            IntervalDayTime::new(3, -2),
2761            IntervalDayTime::new(-200, 4),
2762        ]);
2763    }
2764
2765    #[test]
2766    #[should_panic(
2767        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet, which is not yet implemented"
2768    )]
2769    fn interval_month_day_nano_single_column() {
2770        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2771            IntervalMonthDayNano::new(0, 1, 5),
2772            IntervalMonthDayNano::new(0, 3, 2),
2773            IntervalMonthDayNano::new(3, -2, -5),
2774            IntervalMonthDayNano::new(-200, 4, -1),
2775        ]);
2776    }
2777
2778    #[test]
2779    fn binary_single_column() {
2780        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2781        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2782        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2783
2784        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2785        values_required::<BinaryArray, _>(many_vecs_iter);
2786    }
2787
2788    #[test]
2789    fn binary_view_single_column() {
2790        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2791        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2792        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2793
2794        // BinaryViewArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2795        values_required::<BinaryViewArray, _>(many_vecs_iter);
2796    }
2797
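    // The bloom filter tests below rely on check_bloom_filter(files, column, present, absent):
    // every value in `present` must test positive against the written filter, and every value
    // in `absent` must not be reported as present.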
2798    #[test]
2799    fn i32_column_bloom_filter_at_end() {
2800        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2801        let mut options = RoundTripOptions::new(array, false);
2802        options.bloom_filter = true;
2803        options.bloom_filter_position = BloomFilterPosition::End;
2804
2805        let files = one_column_roundtrip_with_options(options);
2806        check_bloom_filter(
2807            files,
2808            "col".to_string(),
2809            (0..SMALL_SIZE as i32).collect(),
2810            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2811        );
2812    }
2813
2814    #[test]
2815    fn i32_column_bloom_filter() {
2816        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2817        let mut options = RoundTripOptions::new(array, false);
2818        options.bloom_filter = true;
2819
2820        let files = one_column_roundtrip_with_options(options);
2821        check_bloom_filter(
2822            files,
2823            "col".to_string(),
2824            (0..SMALL_SIZE as i32).collect(),
2825            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2826        );
2827    }
2828
2829    #[test]
2830    fn binary_column_bloom_filter() {
2831        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2832        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2833        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2834
2835        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2836        let mut options = RoundTripOptions::new(array, false);
2837        options.bloom_filter = true;
2838
2839        let files = one_column_roundtrip_with_options(options);
2840        check_bloom_filter(
2841            files,
2842            "col".to_string(),
2843            many_vecs,
2844            vec![vec![(SMALL_SIZE + 1) as u8]],
2845        );
2846    }
2847
2848    #[test]
2849    fn empty_string_null_column_bloom_filter() {
2850        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2851        let raw_strs = raw_values.iter().map(|s| s.as_str());
2852
2853        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2854        let mut options = RoundTripOptions::new(array, false);
2855        options.bloom_filter = true;
2856
2857        let files = one_column_roundtrip_with_options(options);
2858
2859        let optional_raw_values: Vec<_> = raw_values
2860            .iter()
2861            .enumerate()
2862            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2863            .collect();
2864        // For null slots, the empty string should not be in the bloom filter.
2865        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2866    }
2867
2868    #[test]
2869    fn large_binary_single_column() {
2870        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2871        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2872        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2873
2874        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2875        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2876    }
2877
2878    #[test]
2879    fn fixed_size_binary_single_column() {
2880        let mut builder = FixedSizeBinaryBuilder::new(4);
2881        builder.append_value(b"0123").unwrap();
2882        builder.append_null();
2883        builder.append_value(b"8910").unwrap();
2884        builder.append_value(b"1112").unwrap();
2885        let array = Arc::new(builder.finish());
2886
2887        one_column_roundtrip(array, true);
2888    }
2889
2890    #[test]
2891    fn string_single_column() {
2892        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2893        let raw_strs = raw_values.iter().map(|s| s.as_str());
2894
2895        required_and_optional::<StringArray, _>(raw_strs);
2896    }
2897
2898    #[test]
2899    fn large_string_single_column() {
2900        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2901        let raw_strs = raw_values.iter().map(|s| s.as_str());
2902
2903        required_and_optional::<LargeStringArray, _>(raw_strs);
2904    }
2905
2906    #[test]
2907    fn string_view_single_column() {
2908        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2909        let raw_strs = raw_values.iter().map(|s| s.as_str());
2910
2911        required_and_optional::<StringViewArray, _>(raw_strs);
2912    }
2913
2914    #[test]
2915    fn null_list_single_column() {
2916        let null_field = Field::new_list_field(DataType::Null, true);
2917        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2918
2919        let schema = Schema::new(vec![list_field]);
2920
2921        // Build [[], null, [null, null]]
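        // Offsets [0, 0, 0, 2] give the three lists lengths 0, 0, and 2, while the
        // validity bitmap 0b00000101 marks only index 1 as null.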
2922        let a_values = NullArray::new(2);
2923        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2924        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2925            DataType::Null,
2926            true,
2927        ))))
2928        .len(3)
2929        .add_buffer(a_value_offsets)
2930        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2931        .add_child_data(a_values.into_data())
2932        .build()
2933        .unwrap();
2934
2935        let a = ListArray::from(a_list_data);
2936
2937        assert!(a.is_valid(0));
2938        assert!(!a.is_valid(1));
2939        assert!(a.is_valid(2));
2940
2941        assert_eq!(a.value(0).len(), 0);
2942        assert_eq!(a.value(2).len(), 2);
2943        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2944
2945        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2946        roundtrip(batch, None);
2947    }
2948
2949    #[test]
2950    fn list_single_column() {
2951        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
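        // Builds [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]: offsets [0, 1, 3, 3, 6, 10]
        // with validity bitmap 0b00011011 (index 2 null).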
2952        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2953        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2954            DataType::Int32,
2955            false,
2956        ))))
2957        .len(5)
2958        .add_buffer(a_value_offsets)
2959        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2960        .add_child_data(a_values.into_data())
2961        .build()
2962        .unwrap();
2963
2964        assert_eq!(a_list_data.null_count(), 1);
2965
2966        let a = ListArray::from(a_list_data);
2967        let values = Arc::new(a);
2968
2969        one_column_roundtrip(values, true);
2970    }
2971
2972    #[test]
2973    fn large_list_single_column() {
2974        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2975        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2976        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2977            "large_item",
2978            DataType::Int32,
2979            true,
2980        ))))
2981        .len(5)
2982        .add_buffer(a_value_offsets)
2983        .add_child_data(a_values.into_data())
2984        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2985        .build()
2986        .unwrap();
2987
2988        // Validity bitmap 0b00011011 marks index 2 as null, so the null count is 1
2989        assert_eq!(a_list_data.null_count(), 1);
2990
2991        let a = LargeListArray::from(a_list_data);
2992        let values = Arc::new(a);
2993
2994        one_column_roundtrip(values, true);
2995    }
2996
2997    #[test]
2998    fn list_nested_nulls() {
2999        use arrow::datatypes::Int32Type;
3000        let data = vec![
3001            Some(vec![Some(1)]),
3002            Some(vec![Some(2), Some(3)]),
3003            None,
3004            Some(vec![Some(4), Some(5), None]),
3005            Some(vec![None]),
3006            Some(vec![Some(6), Some(7)]),
3007        ];
3008
3009        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3010        one_column_roundtrip(Arc::new(list), true);
3011
3012        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3013        one_column_roundtrip(Arc::new(list), true);
3014    }
3015
3016    #[test]
3017    fn struct_single_column() {
3018        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3019        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3020        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3021
3022        let values = Arc::new(s);
3023        one_column_roundtrip(values, false);
3024    }
3025
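    // The Parquet spec names a list's repeated element "element" and a map's entries
    // "key_value" with children "key" and "value"; set_coerce_types(true) rewrites
    // non-conforming Arrow field names to these on write.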
3026    #[test]
3027    fn list_and_map_coerced_names() {
3028        // Create map and list with non-Parquet naming
3029        let list_field =
3030            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3031        let map_field = Field::new_map(
3032            "my_map",
3033            "entries",
3034            Field::new("keys", DataType::Int32, false),
3035            Field::new("values", DataType::Int32, true),
3036            false,
3037            true,
3038        );
3039
3040        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3041        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3042
3043        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3044
3045        // Write data to Parquet but coerce names to match the Parquet spec
3046        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3047        let file = tempfile::tempfile().unwrap();
3048        let mut writer =
3049            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3050
3051        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3052        writer.write(&batch).unwrap();
3053        let file_metadata = writer.close().unwrap();
3054
3055        let schema = file_metadata.file_metadata().schema();
3056        // Coerced name of "item" should be "element"
3057        let list_field = &schema.get_fields()[0].get_fields()[0];
3058        assert_eq!(list_field.get_fields()[0].name(), "element");
3059
3060        let map_field = &schema.get_fields()[1].get_fields()[0];
3061        // Coerced name of "entries" should be "key_value"
3062        assert_eq!(map_field.name(), "key_value");
3063        // Coerced name of "keys" should be "key"
3064        assert_eq!(map_field.get_fields()[0].name(), "key");
3065        // Coerced name of "values" should be "value"
3066        assert_eq!(map_field.get_fields()[1].name(), "value");
3067
3068        // Double check schema after reading from the file
3069        let reader = SerializedFileReader::new(file).unwrap();
3070        let file_schema = reader.metadata().file_metadata().schema();
3071        let fields = file_schema.get_fields();
3072        let list_field = &fields[0].get_fields()[0];
3073        assert_eq!(list_field.get_fields()[0].name(), "element");
3074        let map_field = &fields[1].get_fields()[0];
3075        assert_eq!(map_field.name(), "key_value");
3076        assert_eq!(map_field.get_fields()[0].name(), "key");
3077        assert_eq!(map_field.get_fields()[1].name(), "value");
3078    }
3079
3080    #[test]
3081    fn fallback_flush_data_page() {
3082        // Tests that Fallback::flush_data_page clears all buffers correctly
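        // Dictionary encoding is disabled, and a tiny data page size limit (32 bytes) combined
        // with a small write batch size (16) forces frequent page flushes mid-batch, exercising
        // the fallback encoder's buffer handling.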
3083        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3084        let values = Arc::new(StringArray::from(raw_values));
3085        let encodings = vec![
3086            Encoding::DELTA_BYTE_ARRAY,
3087            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3088        ];
3089        let data_type = values.data_type().clone();
3090        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3091        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3092
3093        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3094        let data_page_size_limit: usize = 32;
3095        let write_batch_size: usize = 16;
3096
3097        for encoding in &encodings {
3098            for row_group_size in row_group_sizes {
3099                let props = WriterProperties::builder()
3100                    .set_writer_version(WriterVersion::PARQUET_2_0)
3101                    .set_max_row_group_size(row_group_size)
3102                    .set_dictionary_enabled(false)
3103                    .set_encoding(*encoding)
3104                    .set_data_page_size_limit(data_page_size_limit)
3105                    .set_write_batch_size(write_batch_size)
3106                    .build();
3107
3108                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3109                    let string_array_a = StringArray::from(a.clone());
3110                    let string_array_b = StringArray::from(b.clone());
3111                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3112                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3113                    assert_eq!(
3114                        vec_a, vec_b,
3115                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3116                    );
3117                });
3118            }
3119        }
3120    }
3121
3122    #[test]
3123    fn arrow_writer_string_dictionary() {
3124        // define schema
3125        #[allow(deprecated)]
3126        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3127            "dictionary",
3128            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3129            true,
3130            42,
3131            true,
3132        )]));
3133
3134        // create some data
3135        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3136            .iter()
3137            .copied()
3138            .collect();
3139
3140        // build a record batch
3141        one_column_roundtrip_with_schema(Arc::new(d), schema);
3142    }
3143
3144    #[test]
3145    fn arrow_writer_test_type_compatibility() {
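        // Writes `array1` under its own schema, then writes `array2` (a different but
        // compatible type) to the same writer, and expects the file to read back as
        // `expected_result` under the first schema.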
3146        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3147        where
3148            T1: Array + 'static,
3149            T2: Array + 'static,
3150        {
3151            let schema1 = Arc::new(Schema::new(vec![Field::new(
3152                "a",
3153                array1.data_type().clone(),
3154                false,
3155            )]));
3156
3157            let file = tempfile().unwrap();
3158            let mut writer =
3159                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3160
3161            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3162            writer.write(&rb1).unwrap();
3163
3164            let schema2 = Arc::new(Schema::new(vec![Field::new(
3165                "a",
3166                array2.data_type().clone(),
3167                false,
3168            )]));
3169            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3170            writer.write(&rb2).unwrap();
3171
3172            writer.close().unwrap();
3173
3174            let mut record_batch_reader =
3175                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3176            let actual_batch = record_batch_reader.next().unwrap().unwrap();
3177
3178            let expected_batch =
3179                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3180            assert_eq!(actual_batch, expected_batch);
3181        }
3182
3183        // check compatibility between native arrays and dictionary arrays
3184
3185        ensure_compatible_write(
3186            DictionaryArray::new(
3187                UInt8Array::from_iter_values(vec![0]),
3188                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3189            ),
3190            StringArray::from_iter_values(vec!["barquet"]),
3191            DictionaryArray::new(
3192                UInt8Array::from_iter_values(vec![0, 1]),
3193                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3194            ),
3195        );
3196
3197        ensure_compatible_write(
3198            StringArray::from_iter_values(vec!["parquet"]),
3199            DictionaryArray::new(
3200                UInt8Array::from_iter_values(vec![0]),
3201                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3202            ),
3203            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3204        );
3205
3206        // check compatibility between dictionaries with different key types
3207
3208        ensure_compatible_write(
3209            DictionaryArray::new(
3210                UInt8Array::from_iter_values(vec![0]),
3211                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3212            ),
3213            DictionaryArray::new(
3214                UInt16Array::from_iter_values(vec![0]),
3215                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3216            ),
3217            DictionaryArray::new(
3218                UInt8Array::from_iter_values(vec![0, 1]),
3219                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3220            ),
3221        );
3222
3223        // check compatibility between dictionaries with different value types
3224        ensure_compatible_write(
3225            DictionaryArray::new(
3226                UInt8Array::from_iter_values(vec![0]),
3227                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3228            ),
3229            DictionaryArray::new(
3230                UInt8Array::from_iter_values(vec![0]),
3231                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3232            ),
3233            DictionaryArray::new(
3234                UInt8Array::from_iter_values(vec![0, 1]),
3235                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3236            ),
3237        );
3238
3239        // check compatibility between a dictionary and a native array with a different type
3240        ensure_compatible_write(
3241            DictionaryArray::new(
3242                UInt8Array::from_iter_values(vec![0]),
3243                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3244            ),
3245            LargeStringArray::from_iter_values(vec!["barquet"]),
3246            DictionaryArray::new(
3247                UInt8Array::from_iter_values(vec![0, 1]),
3248                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3249            ),
3250        );
3251
3252        // check compatibility for string types
3253
3254        ensure_compatible_write(
3255            StringArray::from_iter_values(vec!["parquet"]),
3256            LargeStringArray::from_iter_values(vec!["barquet"]),
3257            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3258        );
3259
3260        ensure_compatible_write(
3261            LargeStringArray::from_iter_values(vec!["parquet"]),
3262            StringArray::from_iter_values(vec!["barquet"]),
3263            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3264        );
3265
3266        ensure_compatible_write(
3267            StringArray::from_iter_values(vec!["parquet"]),
3268            StringViewArray::from_iter_values(vec!["barquet"]),
3269            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3270        );
3271
3272        ensure_compatible_write(
3273            StringViewArray::from_iter_values(vec!["parquet"]),
3274            StringArray::from_iter_values(vec!["barquet"]),
3275            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3276        );
3277
3278        ensure_compatible_write(
3279            LargeStringArray::from_iter_values(vec!["parquet"]),
3280            StringViewArray::from_iter_values(vec!["barquet"]),
3281            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3282        );
3283
3284        ensure_compatible_write(
3285            StringViewArray::from_iter_values(vec!["parquet"]),
3286            LargeStringArray::from_iter_values(vec!["barquet"]),
3287            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3288        );
3289
3290        // check compatibility for binary types
3291
3292        ensure_compatible_write(
3293            BinaryArray::from_iter_values(vec![b"parquet"]),
3294            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3295            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3296        );
3297
3298        ensure_compatible_write(
3299            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3300            BinaryArray::from_iter_values(vec![b"barquet"]),
3301            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3302        );
3303
3304        ensure_compatible_write(
3305            BinaryArray::from_iter_values(vec![b"parquet"]),
3306            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3307            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3308        );
3309
3310        ensure_compatible_write(
3311            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3312            BinaryArray::from_iter_values(vec![b"barquet"]),
3313            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3314        );
3315
3316        ensure_compatible_write(
3317            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3318            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3319            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3320        );
3321
3322        ensure_compatible_write(
3323            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3324            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3325            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3326        );
3327
3328        // check compatibility for list types
3329
3330        let list_field_metadata = HashMap::from_iter(vec![(
3331            PARQUET_FIELD_ID_META_KEY.to_string(),
3332            "1".to_string(),
3333        )]);
3334        let list_field = Field::new_list_field(DataType::Int32, false);
3335
3336        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
3337        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());
3338
3339        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
3340        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());
3341
3342        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
3343        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());
3344
3345        ensure_compatible_write(
3346            // when the initial schema has the metadata ...
3347            ListArray::try_new(
3348                Arc::new(
3349                    list_field
3350                        .clone()
3351                        .with_metadata(list_field_metadata.clone()),
3352                ),
3353                offsets1,
3354                values1,
3355                None,
3356            )
3357            .unwrap(),
3358            // ... and some intermediate schema doesn't have the metadata
3359            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
3360            // ... the write will still go through, and the resulting schema will inherit the initial metadata
3361            ListArray::try_new(
3362                Arc::new(
3363                    list_field
3364                        .clone()
3365                        .with_metadata(list_field_metadata.clone()),
3366                ),
3367                offsets_expected,
3368                values_expected,
3369                None,
3370            )
3371            .unwrap(),
3372        );
3373    }
3374
3375    #[test]
3376    fn arrow_writer_primitive_dictionary() {
3377        // define schema
3378        #[allow(deprecated)]
3379        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3380            "dictionary",
3381            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3382            true,
3383            42,
3384            true,
3385        )]));
3386
3387        // create some data
3388        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3389        builder.append(12345678).unwrap();
3390        builder.append_null();
3391        builder.append(22345678).unwrap();
3392        builder.append(12345678).unwrap();
3393        let d = builder.finish();
3394
3395        one_column_roundtrip_with_schema(Arc::new(d), schema);
3396    }
3397
3398    #[test]
3399    fn arrow_writer_decimal32_dictionary() {
3400        let integers = vec![12345, 56789, 34567];
3401
3402        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3403
3404        let values = Decimal32Array::from(integers.clone())
3405            .with_precision_and_scale(5, 2)
3406            .unwrap();
3407
3408        let array = DictionaryArray::new(keys, Arc::new(values));
3409        one_column_roundtrip(Arc::new(array.clone()), true);
3410
3411        let values = Decimal32Array::from(integers)
3412            .with_precision_and_scale(9, 2)
3413            .unwrap();
3414
3415        let array = array.with_values(Arc::new(values));
3416        one_column_roundtrip(Arc::new(array), true);
3417    }
3418
3419    #[test]
3420    fn arrow_writer_decimal64_dictionary() {
3421        let integers = vec![12345, 56789, 34567];
3422
3423        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3424
3425        let values = Decimal64Array::from(integers.clone())
3426            .with_precision_and_scale(5, 2)
3427            .unwrap();
3428
3429        let array = DictionaryArray::new(keys, Arc::new(values));
3430        one_column_roundtrip(Arc::new(array.clone()), true);
3431
3432        let values = Decimal64Array::from(integers)
3433            .with_precision_and_scale(12, 2)
3434            .unwrap();
3435
3436        let array = array.with_values(Arc::new(values));
3437        one_column_roundtrip(Arc::new(array), true);
3438    }
3439
3440    #[test]
3441    fn arrow_writer_decimal128_dictionary() {
3442        let integers = vec![12345, 56789, 34567];
3443
3444        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3445
3446        let values = Decimal128Array::from(integers.clone())
3447            .with_precision_and_scale(5, 2)
3448            .unwrap();
3449
3450        let array = DictionaryArray::new(keys, Arc::new(values));
3451        one_column_roundtrip(Arc::new(array.clone()), true);
3452
3453        let values = Decimal128Array::from(integers)
3454            .with_precision_and_scale(12, 2)
3455            .unwrap();
3456
3457        let array = array.with_values(Arc::new(values));
3458        one_column_roundtrip(Arc::new(array), true);
3459    }
3460
3461    #[test]
3462    fn arrow_writer_decimal256_dictionary() {
3463        let integers = vec![
3464            i256::from_i128(12345),
3465            i256::from_i128(56789),
3466            i256::from_i128(34567),
3467        ];
3468
3469        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3470
3471        let values = Decimal256Array::from(integers.clone())
3472            .with_precision_and_scale(5, 2)
3473            .unwrap();
3474
3475        let array = DictionaryArray::new(keys, Arc::new(values));
3476        one_column_roundtrip(Arc::new(array.clone()), true);
3477
3478        let values = Decimal256Array::from(integers)
3479            .with_precision_and_scale(12, 2)
3480            .unwrap();
3481
3482        let array = array.with_values(Arc::new(values));
3483        one_column_roundtrip(Arc::new(array), true);
3484    }
3485
3486    #[test]
3487    fn arrow_writer_string_dictionary_unsigned_index() {
3488        // define schema
3489        #[allow(deprecated)]
3490        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3491            "dictionary",
3492            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3493            true,
3494            42,
3495            true,
3496        )]));
3497
3498        // create some data
3499        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3500            .iter()
3501            .copied()
3502            .collect();
3503
3504        one_column_roundtrip_with_schema(Arc::new(d), schema);
3505    }
3506
3507    #[test]
3508    fn u32_min_max() {
3509        // check values roundtrip through parquet
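        // u32 is stored as INT32 with an unsigned logical type; statistics use unsigned
        // ordering, so reinterpreting the stored i32 min/max as u32 recovers the true values.
        // u64_min_max below relies on the same reinterpretation via INT64.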
3510        let src = [
3511            u32::MIN,
3512            u32::MIN + 1,
3513            (i32::MAX as u32) - 1,
3514            i32::MAX as u32,
3515            (i32::MAX as u32) + 1,
3516            u32::MAX - 1,
3517            u32::MAX,
3518        ];
3519        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3520        let files = one_column_roundtrip(values, false);
3521
3522        for file in files {
3523            // check statistics are valid
3524            let reader = SerializedFileReader::new(file).unwrap();
3525            let metadata = reader.metadata();
3526
3527            let mut row_offset = 0;
3528            for row_group in metadata.row_groups() {
3529                assert_eq!(row_group.num_columns(), 1);
3530                let column = row_group.column(0);
3531
3532                let num_values = column.num_values() as usize;
3533                let src_slice = &src[row_offset..row_offset + num_values];
3534                row_offset += column.num_values() as usize;
3535
3536                let stats = column.statistics().unwrap();
3537                if let Statistics::Int32(stats) = stats {
3538                    assert_eq!(
3539                        *stats.min_opt().unwrap() as u32,
3540                        *src_slice.iter().min().unwrap()
3541                    );
3542                    assert_eq!(
3543                        *stats.max_opt().unwrap() as u32,
3544                        *src_slice.iter().max().unwrap()
3545                    );
3546                } else {
3547                    panic!("Statistics::Int32 missing")
3548                }
3549            }
3550        }
3551    }
3552
3553    #[test]
3554    fn u64_min_max() {
3555        // check values roundtrip through parquet
3556        let src = [
3557            u64::MIN,
3558            u64::MIN + 1,
3559            (i64::MAX as u64) - 1,
3560            i64::MAX as u64,
3561            (i64::MAX as u64) + 1,
3562            u64::MAX - 1,
3563            u64::MAX,
3564        ];
3565        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3566        let files = one_column_roundtrip(values, false);
3567
3568        for file in files {
3569            // check statistics are valid
3570            let reader = SerializedFileReader::new(file).unwrap();
3571            let metadata = reader.metadata();
3572
3573            let mut row_offset = 0;
3574            for row_group in metadata.row_groups() {
3575                assert_eq!(row_group.num_columns(), 1);
3576                let column = row_group.column(0);
3577
3578                let num_values = column.num_values() as usize;
3579                let src_slice = &src[row_offset..row_offset + num_values];
3580                row_offset += column.num_values() as usize;
3581
3582                let stats = column.statistics().unwrap();
3583                if let Statistics::Int64(stats) = stats {
3584                    assert_eq!(
3585                        *stats.min_opt().unwrap() as u64,
3586                        *src_slice.iter().min().unwrap()
3587                    );
3588                    assert_eq!(
3589                        *stats.max_opt().unwrap() as u64,
3590                        *src_slice.iter().max().unwrap()
3591                    );
3592                } else {
3593                    panic!("Statistics::Int64 missing")
3594                }
3595            }
3596        }
3597    }
3598
3599    #[test]
3600    fn statistics_null_counts_only_nulls() {
3601        // check that null-count statistics for columns containing only NULLs are correct
3602        let values = Arc::new(UInt64Array::from(vec![None, None]));
3603        let files = one_column_roundtrip(values, true);
3604
3605        for file in files {
3606            // check statistics are valid
3607            let reader = SerializedFileReader::new(file).unwrap();
3608            let metadata = reader.metadata();
3609            assert_eq!(metadata.num_row_groups(), 1);
3610            let row_group = metadata.row_group(0);
3611            assert_eq!(row_group.num_columns(), 1);
3612            let column = row_group.column(0);
3613            let stats = column.statistics().unwrap();
3614            assert_eq!(stats.null_count_opt(), Some(2));
3615        }
3616    }
3617
3618    #[test]
3619    fn test_list_of_struct_roundtrip() {
3620        // define schema
3621        let int_field = Field::new("a", DataType::Int32, true);
3622        let int_field2 = Field::new("b", DataType::Int32, true);
3623
3624        let int_builder = Int32Builder::with_capacity(10);
3625        let int_builder2 = Int32Builder::with_capacity(10);
3626
3627        let struct_builder = StructBuilder::new(
3628            vec![int_field, int_field2],
3629            vec![Box::new(int_builder), Box::new(int_builder2)],
3630        );
3631        let mut list_builder = ListBuilder::new(struct_builder);
3632
3633        // Construct the following array
3634        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
3635
3636        // [{a: 1, b: 2}]
3637        let values = list_builder.values();
3638        values
3639            .field_builder::<Int32Builder>(0)
3640            .unwrap()
3641            .append_value(1);
3642        values
3643            .field_builder::<Int32Builder>(1)
3644            .unwrap()
3645            .append_value(2);
3646        values.append(true);
3647        list_builder.append(true);
3648
3649        // []
3650        list_builder.append(true);
3651
3652        // null
3653        list_builder.append(false);
3654
3655        // [null, null]
3656        let values = list_builder.values();
3657        values
3658            .field_builder::<Int32Builder>(0)
3659            .unwrap()
3660            .append_null();
3661        values
3662            .field_builder::<Int32Builder>(1)
3663            .unwrap()
3664            .append_null();
3665        values.append(false);
3666        values
3667            .field_builder::<Int32Builder>(0)
3668            .unwrap()
3669            .append_null();
3670        values
3671            .field_builder::<Int32Builder>(1)
3672            .unwrap()
3673            .append_null();
3674        values.append(false);
3675        list_builder.append(true);
3676
3677        // [{a: null, b: 3}]
3678        let values = list_builder.values();
3679        values
3680            .field_builder::<Int32Builder>(0)
3681            .unwrap()
3682            .append_null();
3683        values
3684            .field_builder::<Int32Builder>(1)
3685            .unwrap()
3686            .append_value(3);
3687        values.append(true);
3688        list_builder.append(true);
3689
3690        // [{a: 2, b: null}]
3691        let values = list_builder.values();
3692        values
3693            .field_builder::<Int32Builder>(0)
3694            .unwrap()
3695            .append_value(2);
3696        values
3697            .field_builder::<Int32Builder>(1)
3698            .unwrap()
3699            .append_null();
3700        values.append(true);
3701        list_builder.append(true);
3702
3703        let array = Arc::new(list_builder.finish());
3704
3705        one_column_roundtrip(array, true);
3706    }
3707
3708    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3709        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3710    }
3711
3712    #[test]
3713    fn test_aggregates_records() {
3714        let arrays = [
3715            Int32Array::from((0..100).collect::<Vec<_>>()),
3716            Int32Array::from((0..50).collect::<Vec<_>>()),
3717            Int32Array::from((200..500).collect::<Vec<_>>()),
3718        ];
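        // 100 + 50 + 300 = 450 rows with max_row_group_size = 200 should yield
        // row groups of 200, 200, and 50.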
3719
3720        let schema = Arc::new(Schema::new(vec![Field::new(
3721            "int",
3722            ArrowDataType::Int32,
3723            false,
3724        )]));
3725
3726        let file = tempfile::tempfile().unwrap();
3727
3728        let props = WriterProperties::builder()
3729            .set_max_row_group_size(200)
3730            .build();
3731
3732        let mut writer =
3733            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3734
3735        for array in arrays {
3736            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3737            writer.write(&batch).unwrap();
3738        }
3739
3740        writer.close().unwrap();
3741
3742        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3743        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3744
3745        let batches = builder
3746            .with_batch_size(100)
3747            .build()
3748            .unwrap()
3749            .collect::<ArrowResult<Vec<_>>>()
3750            .unwrap();
3751
3752        assert_eq!(batches.len(), 5);
3753        assert!(batches.iter().all(|x| x.num_columns() == 1));
3754
3755        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3756
3757        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3758
3759        let values: Vec<_> = batches
3760            .iter()
3761            .flat_map(|x| {
3762                x.column(0)
3763                    .as_any()
3764                    .downcast_ref::<Int32Array>()
3765                    .unwrap()
3766                    .values()
3767                    .iter()
3768                    .cloned()
3769            })
3770            .collect();
3771
3772        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3773        assert_eq!(&values, &expected_values)
3774    }
3775
3776    #[test]
3777    fn complex_aggregate() {
3778        // Tests aggregating nested data
3779        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3780        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3781        let struct_a = Arc::new(Field::new(
3782            "struct_a",
3783            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3784            true,
3785        ));
3786
3787        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3788        let struct_b = Arc::new(Field::new(
3789            "struct_b",
3790            DataType::Struct(vec![list_a.clone()].into()),
3791            false,
3792        ));
3793
3794        let schema = Arc::new(Schema::new(vec![struct_b]));
3795
3796        // create nested data
3797        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3798        let field_b_array =
3799            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3800
3801        let struct_a_array = StructArray::from(vec![
3802            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3803            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3804        ]);
3805
3806        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3807            .len(5)
3808            .add_buffer(Buffer::from_iter(vec![
3809                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3810            ]))
3811            .null_bit_buffer(Some(Buffer::from_iter(vec![
3812                true, false, true, false, true,
3813            ])))
3814            .child_data(vec![struct_a_array.into_data()])
3815            .build()
3816            .unwrap();
3817
3818        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3819        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3820
3821        let batch1 =
3822            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3823                .unwrap();
3824
3825        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3826        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3827
3828        let struct_a_array = StructArray::from(vec![
3829            (field_a, Arc::new(field_a_array) as ArrayRef),
3830            (field_b, Arc::new(field_b_array) as ArrayRef),
3831        ]);
3832
3833        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3834            .len(2)
3835            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3836            .child_data(vec![struct_a_array.into_data()])
3837            .build()
3838            .unwrap();
3839
3840        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3841        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3842
3843        let batch2 =
3844            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3845                .unwrap();
3846
3847        let batches = &[batch1, batch2];
3848
3849        // Verify data is as expected
3850
3851        let expected = r#"
3852            +-------------------------------------------------------------------------------------------------------+
3853            | struct_b                                                                                              |
3854            +-------------------------------------------------------------------------------------------------------+
3855            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
3856            | {list: }                                                                                              |
3857            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
3858            | {list: }                                                                                              |
3859            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
3860            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3861            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
3862            +-------------------------------------------------------------------------------------------------------+
3863        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3864
3865        let actual = pretty_format_batches(batches).unwrap().to_string();
3866        assert_eq!(actual, expected);
3867
3868        // Write data
3869        let file = tempfile::tempfile().unwrap();
3870        let props = WriterProperties::builder()
3871            .set_max_row_group_size(6)
3872            .build();
3873
3874        let mut writer =
3875            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3876
3877        for batch in batches {
3878            writer.write(batch).unwrap();
3879        }
3880        writer.close().unwrap();
3881
3882        // Read Data
3883        // Should have written the entire first batch and the first row of the second batch
3884        // to the first row group, leaving a single row in the second row group
3885
3886        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3887        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3888
3889        let batches = builder
3890            .with_batch_size(2)
3891            .build()
3892            .unwrap()
3893            .collect::<ArrowResult<Vec<_>>>()
3894            .unwrap();
3895
3896        assert_eq!(batches.len(), 4);
3897        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3898        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3899
3900        let actual = pretty_format_batches(&batches).unwrap().to_string();
3901        assert_eq!(actual, expected);
3902    }
3903
3904    #[test]
3905    fn test_arrow_writer_metadata() {
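        // The batch schema lacks the key-value metadata carried by the file schema; the
        // write should still succeed because the schemas are otherwise identical.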
3906        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3907        let file_schema = batch_schema.clone().with_metadata(
3908            vec![("foo".to_string(), "bar".to_string())]
3909                .into_iter()
3910                .collect(),
3911        );
3912
3913        let batch = RecordBatch::try_new(
3914            Arc::new(batch_schema),
3915            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3916        )
3917        .unwrap();
3918
3919        let mut buf = Vec::with_capacity(1024);
3920        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3921        writer.write(&batch).unwrap();
3922        writer.close().unwrap();
3923    }
3924
3925    #[test]
3926    fn test_arrow_writer_nullable() {
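        // Writing a non-nullable batch column into a nullable file schema is a safe
        // widening; the data reads back under the nullable file schema.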
3927        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3928        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3929        let file_schema = Arc::new(file_schema);
3930
3931        let batch = RecordBatch::try_new(
3932            Arc::new(batch_schema),
3933            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3934        )
3935        .unwrap();
3936
3937        let mut buf = Vec::with_capacity(1024);
3938        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3939        writer.write(&batch).unwrap();
3940        writer.close().unwrap();
3941
3942        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3943        let back = read.next().unwrap().unwrap();
3944        assert_eq!(back.schema(), file_schema);
3945        assert_ne!(back.schema(), batch.schema());
3946        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3947    }
3948
3949    #[test]
3950    fn in_progress_accounting() {
3951        // define schema
3952        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3953
3954        // create some data
3955        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3956
3957        // build a record batch
3958        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3959
3960        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3961
3962        // starts empty
3963        assert_eq!(writer.in_progress_size(), 0);
3964        assert_eq!(writer.in_progress_rows(), 0);
3965        assert_eq!(writer.memory_size(), 0);
3966        assert_eq!(writer.bytes_written(), 4); // Initial "PAR1" magic header
3967        writer.write(&batch).unwrap();
3968
3969        // updated on write
3970        let initial_size = writer.in_progress_size();
3971        assert!(initial_size > 0);
3972        assert_eq!(writer.in_progress_rows(), 5);
3973        let initial_memory = writer.memory_size();
3974        assert!(initial_memory > 0);
3975        // memory estimate is at least as large as the estimated encoded size
3976        assert!(
3977            initial_size <= initial_memory,
3978            "{initial_size} <= {initial_memory}"
3979        );
3980
3981        // updated on second write
3982        writer.write(&batch).unwrap();
3983        assert!(writer.in_progress_size() > initial_size);
3984        assert_eq!(writer.in_progress_rows(), 10);
3985        assert!(writer.memory_size() > initial_memory);
3986        assert!(
3987            writer.in_progress_size() <= writer.memory_size(),
3988            "in_progress_size {} <= memory_size {}",
3989            writer.in_progress_size(),
3990            writer.memory_size()
3991        );
3992
3993        // in progress tracking is cleared, but the overall data written is updated
3994        let pre_flush_bytes_written = writer.bytes_written();
3995        writer.flush().unwrap();
3996        assert_eq!(writer.in_progress_size(), 0);
3997        assert_eq!(writer.memory_size(), 0);
3998        assert!(writer.bytes_written() > pre_flush_bytes_written);
3999
4000        writer.close().unwrap();
4001    }
4002
4003    #[test]
4004    fn test_writer_all_null() {
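        // A column containing only nulls must still produce a data page, and hence
        // an offset-index entry, per row group.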
4005        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4006        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
4007        let batch = RecordBatch::try_from_iter(vec![
4008            ("a", Arc::new(a) as ArrayRef),
4009            ("b", Arc::new(b) as ArrayRef),
4010        ])
4011        .unwrap();
4012
4013        let mut buf = Vec::with_capacity(1024);
4014        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
4015        writer.write(&batch).unwrap();
4016        writer.close().unwrap();
4017
4018        let bytes = Bytes::from(buf);
4019        let options = ReadOptionsBuilder::new().with_page_index().build();
4020        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
4021        let index = reader.metadata().offset_index().unwrap();
4022
4023        assert_eq!(index.len(), 1);
4024        assert_eq!(index[0].len(), 2); // 2 columns
4025        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
4026        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
4027    }
4028
4029    #[test]
4030    fn test_disabled_statistics_with_page() {
4031        let file_schema = Schema::new(vec![
4032            Field::new("a", DataType::Utf8, true),
4033            Field::new("b", DataType::Utf8, true),
4034        ]);
4035        let file_schema = Arc::new(file_schema);
4036
4037        let batch = RecordBatch::try_new(
4038            file_schema.clone(),
4039            vec![
4040                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4041                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4042            ],
4043        )
4044        .unwrap();
4045
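        // EnabledStatistics::Page writes chunk-level statistics plus a column index with
        // per-page statistics; Chunk writes only chunk-level statistics; None disables both.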
4046        let props = WriterProperties::builder()
4047            .set_statistics_enabled(EnabledStatistics::None)
4048            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4049            .build();
4050
4051        let mut buf = Vec::with_capacity(1024);
4052        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4053        writer.write(&batch).unwrap();
4054
4055        let metadata = writer.close().unwrap();
4056        assert_eq!(metadata.num_row_groups(), 1);
4057        let row_group = metadata.row_group(0);
4058        assert_eq!(row_group.num_columns(), 2);
4059        // Column "a" has both offset and column index, as requested
4060        assert!(row_group.column(0).offset_index_offset().is_some());
4061        assert!(row_group.column(0).column_index_offset().is_some());
4062        // Column "b" should only have offset index
4063        assert!(row_group.column(1).offset_index_offset().is_some());
4064        assert!(row_group.column(1).column_index_offset().is_none());
4065
4066        let options = ReadOptionsBuilder::new().with_page_index().build();
4067        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4068
4069        let row_group = reader.get_row_group(0).unwrap();
4070        let a_col = row_group.metadata().column(0);
4071        let b_col = row_group.metadata().column(1);
4072
4073        // Column chunk of column "a" should have chunk level statistics
4074        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4075            let min = byte_array_stats.min_opt().unwrap();
4076            let max = byte_array_stats.max_opt().unwrap();
4077
4078            assert_eq!(min.as_bytes(), b"a");
4079            assert_eq!(max.as_bytes(), b"d");
4080        } else {
4081            panic!("expecting Statistics::ByteArray");
4082        }
4083
4084        // The column chunk for column "b" shouldn't have statistics
4085        assert!(b_col.statistics().is_none());
4086
4087        let offset_index = reader.metadata().offset_index().unwrap();
4088        assert_eq!(offset_index.len(), 1); // 1 row group
4089        assert_eq!(offset_index[0].len(), 2); // 2 columns
4090
4091        let column_index = reader.metadata().column_index().unwrap();
4092        assert_eq!(column_index.len(), 1); // 1 row group
4093        assert_eq!(column_index[0].len(), 2); // 2 columns
4094
4095        let a_idx = &column_index[0][0];
4096        assert!(
4097            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4098            "{a_idx:?}"
4099        );
4100        let b_idx = &column_index[0][1];
4101        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4102    }
4103
4104    #[test]
4105    fn test_disabled_statistics_with_chunk() {
4106        let file_schema = Schema::new(vec![
4107            Field::new("a", DataType::Utf8, true),
4108            Field::new("b", DataType::Utf8, true),
4109        ]);
4110        let file_schema = Arc::new(file_schema);
4111
4112        let batch = RecordBatch::try_new(
4113            file_schema.clone(),
4114            vec![
4115                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4116                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4117            ],
4118        )
4119        .unwrap();
4120
4121        let props = WriterProperties::builder()
4122            .set_statistics_enabled(EnabledStatistics::None)
4123            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4124            .build();
4125
4126        let mut buf = Vec::with_capacity(1024);
4127        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4128        writer.write(&batch).unwrap();
4129
4130        let metadata = writer.close().unwrap();
4131        assert_eq!(metadata.num_row_groups(), 1);
4132        let row_group = metadata.row_group(0);
4133        assert_eq!(row_group.num_columns(), 2);
4134        // Column "a" should only have offset index
4135        assert!(row_group.column(0).offset_index_offset().is_some());
4136        assert!(row_group.column(0).column_index_offset().is_none());
4137        // Column "b" should only have offset index
4138        assert!(row_group.column(1).offset_index_offset().is_some());
4139        assert!(row_group.column(1).column_index_offset().is_none());
4140
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

        let row_group = reader.get_row_group(0).unwrap();
        let a_col = row_group.metadata().column(0);
        let b_col = row_group.metadata().column(1);

        // The column chunk for column "a" should have chunk-level statistics
        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
            let min = byte_array_stats.min_opt().unwrap();
            let max = byte_array_stats.max_opt().unwrap();

            assert_eq!(min.as_bytes(), b"a");
            assert_eq!(max.as_bytes(), b"d");
        } else {
            panic!("expecting Statistics::ByteArray");
        }

        // The column chunk for column "b" shouldn't have statistics
        assert!(b_col.statistics().is_none());

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 columns

        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }

    #[test]
    fn test_arrow_writer_skip_metadata() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Arc::new(batch_schema.clone());

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
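        // Skipping the Arrow metadata means the serialized Arrow schema is not
        // embedded in the file's key-value metadata under ARROW_SCHEMA_META_KEY.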
        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(file_schema, *reader_builder.schema());
        if let Some(key_value_metadata) = reader_builder
            .metadata()
            .file_metadata()
            .key_value_metadata()
        {
            assert!(
                !key_value_metadata
                    .iter()
                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
            );
        }
    }

    #[test]
    fn test_arrow_writer_explicit_schema() {
        // Write an int32 array using explicit int64 storage
        let batch_schema = Arc::new(Schema::new(vec![Field::new(
            "integers",
            DataType::Int32,
            true,
        )]));
        let parquet_schema = Type::group_type_builder("root")
            .with_fields(vec![
                Type::primitive_type_builder("integers", crate::basic::Type::INT64)
                    .build()
                    .unwrap()
                    .into(),
            ])
            .build()
            .unwrap();
        let parquet_schema_descr = SchemaDescriptor::new(parquet_schema.into());

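        // Since the explicit Parquet schema stores the column as INT64, the writer
        // widens the Int32 values on write, and readers will infer an Int64 column.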
        let batch = RecordBatch::try_new(
            batch_schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let explicit_schema_options =
            ArrowWriterOptions::new().with_parquet_schema(parquet_schema_descr);
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new_with_options(
            &mut buf,
            batch_schema.clone(),
            explicit_schema_options,
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();

        let expected_schema = Arc::new(Schema::new(vec![Field::new(
            "integers",
            DataType::Int64,
            true,
        )]));
        assert_eq!(reader_builder.schema(), &expected_schema);

        let batches = reader_builder
            .build()
            .unwrap()
            .collect::<Result<Vec<_>, ArrowError>>()
            .unwrap();
        assert_eq!(batches.len(), 1);

        let expected_batch = RecordBatch::try_new(
            expected_schema.clone(),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
        assert_eq!(batches[0], expected_batch);
    }

    #[test]
    fn mismatched_schemas() {
        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "temperature",
            DataType::Float64,
            false,
        )]));

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();

        let err = writer.write(&batch).unwrap_err().to_string();
        assert_eq!(
            err,
            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
        );
    }

    #[test]
    // https://github.com/apache/arrow-rs/issues/6988
    fn test_roundtrip_empty_schema() {
        // create empty record batch with empty schema
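        // With no columns to infer the row count from, it must be supplied
        // explicitly via RecordBatchOptions.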
        let empty_batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions::default().with_row_count(Some(0)),
        )
        .unwrap();

        // write to parquet
        let mut parquet_bytes: Vec<u8> = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
        writer.write(&empty_batch).unwrap();
        writer.close().unwrap();

        // read from parquet
        let bytes = Bytes::from(parquet_bytes);
        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(reader.schema(), &empty_batch.schema());
        let batches: Vec<_> = reader
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[test]
    fn test_page_stats_not_written_by_default() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

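        // EnabledStatistics::Page gathers page-level statistics for the column
        // index, but they are only written into the page headers themselves when
        // set_write_page_header_statistics(true) is also set (see the next test).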
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // read file and decode page headers
        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers

        // decode first page header
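        // (the first page header starts right after the 4-byte "PAR1" magic at the
        // beginning of the file)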
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        assert!(stats.is_none());
    }

    #[test]
    fn test_page_stats_when_enabled() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // read file and decode page headers
        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers

        // decode first page header
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        let stats = stats.unwrap();
        // check that min/max were actually written to the page
        assert!(stats.is_max_value_exact.unwrap());
        assert!(stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
    }

    #[test]
    fn test_page_stats_truncation() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["Blart Versenwald III"];
        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_truncate_length(Some(2))
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // read file and decode page headers
        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers

        // decode first page header
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        // check that min/max were properly truncated
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
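        // The min is simply truncated to 2 bytes ("Bl"), while the truncated max
        // has its last byte incremented ("Bl" -> "Bm") so that it remains a valid
        // upper bound for the original value.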
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

        // check second page now
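        // After reading the header, `prot` is positioned at the first page's data,
        // so skipping `compressed_page_size` bytes lands on the second page header.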
        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
        let mut prot = ThriftSliceInputProtocol::new(second_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        // check that min/max were properly truncated
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
    }

    #[test]
    fn test_page_encoding_statistics_roundtrip() {
        let batch_schema = Schema::new(vec![Field::new(
            "int32",
            arrow_schema::DataType::Int32,
            false,
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut file: File = tempfile::tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
        assert!(
            file_metadata
                .row_group(0)
                .column(0)
                .page_encoding_stats()
                .is_some()
        );
        let chunk_page_stats = file_metadata
            .row_group(0)
            .column(0)
            .page_encoding_stats()
            .unwrap();

        // check that the read metadata is also correct
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rowgroup = reader.get_row_group(0).expect("row group missing");
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.page_encoding_stats().is_some());
        let file_page_stats = column.page_encoding_stats().unwrap();
        assert_eq!(chunk_page_stats, file_page_stats);
    }

    #[test]
    fn test_different_dict_page_size_limit() {
        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", arrow_schema::DataType::Int64, false),
            Field::new("col1", arrow_schema::DataType::Int64, false),
        ]));
        let batch =
            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

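        // Both columns hold identical data; the per-column limit for "col1"
        // overrides the file-wide dictionary page size limit.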
        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(1024 * 1024)
            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
            .build();
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        let data = Bytes::from(writer.into_inner().unwrap());

        let mut metadata = ParquetMetaDataReader::new();
        metadata.try_parse(&data).unwrap();
        let metadata = metadata.finish().unwrap();
        let col0_meta = metadata.row_group(0).column(0);
        let col1_meta = metadata.row_group(0).column(1);

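        // With dictionary encoding enabled (the default), the dictionary page is
        // the first page of each column chunk, so reading the first page from the
        // chunk yields it directly.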
        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
            let mut reader =
                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
            let page = reader.get_next_page().unwrap().unwrap();
            match page {
                Page::DictionaryPage { buf, .. } => buf.len(),
                _ => panic!("expected DictionaryPage"),
            }
        };

        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
    }
}