parquet/arrow/arrow_writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains a writer which writes arrow data into parquet data.
19
20use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26use thrift::protocol::TCompactOutputProtocol;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
31use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
32
33use super::schema::{
34    add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
35    arrow_to_parquet_schema_with_root, decimal_length_from_precision,
36};
37
38use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
39use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
40use crate::column::writer::encoder::ColumnValueEncoder;
41use crate::column::writer::{
42    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
43};
44use crate::data_type::{ByteArray, FixedLenByteArray};
45use crate::errors::{ParquetError, Result};
46use crate::file::metadata::{KeyValue, RowGroupMetaData};
47use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
48use crate::file::reader::{ChunkReader, Length};
49use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
50use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
51use crate::thrift::TSerializable;
52use levels::{calculate_array_levels, ArrayLevels};
53
54mod byte_array;
55mod levels;
56
57/// Encodes [`RecordBatch`] to parquet
58///
/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es will be encoded
/// into the same row group, up to `max_row_group_size` rows. Any remaining rows will be
/// flushed on close, so the final row group in the output file may contain
/// fewer than `max_row_group_size` rows.
63///
64/// ```
65/// # use std::sync::Arc;
66/// # use bytes::Bytes;
67/// # use arrow_array::{ArrayRef, Int64Array};
68/// # use arrow_array::RecordBatch;
69/// # use parquet::arrow::arrow_writer::ArrowWriter;
70/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
71/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
72/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
73///
74/// let mut buffer = Vec::new();
75/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
76/// writer.write(&to_write).unwrap();
77/// writer.close().unwrap();
78///
79/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
80/// let read = reader.next().unwrap().unwrap();
81///
82/// assert_eq!(to_write, read);
83/// ```
84///
85/// ## Memory Limiting
86///
87/// The nature of parquet forces buffering of an entire row group before it can
88/// be flushed to the underlying writer. Data is mostly buffered in its encoded
/// form, reducing memory usage. However, some data, such as dictionary keys,
/// large strings, or deeply nested data, may still result in non-trivial
/// memory usage.
92///
93/// See Also:
94/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the estimated encoded size of the buffered row group.
96///
97/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
99/// result in higher metadata overheads, and thus may worsen compression ratios
100/// and query performance.
101///
102/// ```no_run
103/// # use std::io::Write;
104/// # use arrow_array::RecordBatch;
105/// # use parquet::arrow::ArrowWriter;
106/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
107/// # let batch: RecordBatch = todo!();
108/// writer.write(&batch).unwrap();
109/// // Trigger an early flush if anticipated size exceeds 1_000_000
110/// if writer.in_progress_size() > 1_000_000 {
111///     writer.flush().unwrap();
112/// }
113/// ```
114///
115/// ## Type Support
116///
117/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
/// Parquet types, including [`StructArray`] and [`ListArray`].
119///
120/// The following are not supported:
121///
122/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
123///
124/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
125/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
126/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
127/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
128/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
129pub struct ArrowWriter<W: Write> {
130    /// Underlying Parquet writer
131    writer: SerializedFileWriter<W>,
132
133    /// The in-progress row group if any
134    in_progress: Option<ArrowRowGroupWriter>,
135
136    /// A copy of the Arrow schema.
137    ///
138    /// The schema is used to verify that each record batch written has the correct schema
139    arrow_schema: SchemaRef,
140
    /// The maximum number of rows to write to each row group
142    max_row_group_size: usize,
143}
144
145impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        let buffered_memory = self.in_progress_size();
148        f.debug_struct("ArrowWriter")
149            .field("writer", &self.writer)
150            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
151            .field("in_progress_rows", &self.in_progress_rows())
152            .field("arrow_schema", &self.arrow_schema)
153            .field("max_row_group_size", &self.max_row_group_size)
154            .finish()
155    }
156}
157
158impl<W: Write + Send> ArrowWriter<W> {
159    /// Try to create a new Arrow writer
160    ///
161    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the underlying writer
163    ///  * the Arrow schema contains unsupported datatypes such as Unions
164    pub fn try_new(
165        writer: W,
166        arrow_schema: SchemaRef,
167        props: Option<WriterProperties>,
168    ) -> Result<Self> {
169        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
170        Self::try_new_with_options(writer, arrow_schema, options)
171    }
172
173    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
174    ///
175    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the underlying writer
177    ///  * the Arrow schema contains unsupported datatypes such as Unions
178    pub fn try_new_with_options(
179        writer: W,
180        arrow_schema: SchemaRef,
181        options: ArrowWriterOptions,
182    ) -> Result<Self> {
183        let schema = match options.schema_root {
184            Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?,
185            None => arrow_to_parquet_schema(&arrow_schema)?,
186        };
187        let mut props = options.properties;
188        if !options.skip_arrow_metadata {
189            // add serialized arrow schema
190            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
191        }
192
193        let max_row_group_size = props.max_row_group_size();
194
195        let file_writer =
196            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
197
198        Ok(Self {
199            writer: file_writer,
200            in_progress: None,
201            arrow_schema,
202            max_row_group_size,
203        })
204    }
205
206    /// Returns metadata for any flushed row groups
207    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
208        self.writer.flushed_row_groups()
209    }
210
211    /// Estimated memory usage, in bytes, of this `ArrowWriter`
212    ///
    /// This estimate is formed by summing the values of
    /// [`ArrowColumnWriter::memory_size`] for all in-progress columns.
215    pub fn memory_size(&self) -> usize {
216        match &self.in_progress {
217            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
218            None => 0,
219        }
220    }
221
222    /// Anticipated encoded size of the in progress row group.
223    ///
    /// This estimates the size of the row group after it has been completely
    /// encoded, and is formed by summing the values of
    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in-progress
    /// columns.
228    pub fn in_progress_size(&self) -> usize {
229        match &self.in_progress {
230            Some(in_progress) => in_progress
231                .writers
232                .iter()
233                .map(|x| x.get_estimated_total_bytes())
234                .sum(),
235            None => 0,
236        }
237    }
238
239    /// Returns the number of rows buffered in the in progress row group
240    pub fn in_progress_rows(&self) -> usize {
241        self.in_progress
242            .as_ref()
243            .map(|x| x.buffered_rows)
244            .unwrap_or_default()
245    }
246
247    /// Returns the number of bytes written by this instance
248    pub fn bytes_written(&self) -> usize {
249        self.writer.bytes_written()
250    }
251
252    /// Encodes the provided [`RecordBatch`]
253    ///
254    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
255    /// rows, the contents of `batch` will be written to one or more row groups such that all but
256    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
257    ///
258    /// This will fail if the `batch`'s schema does not match the writer's schema.
259    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
260        if batch.num_rows() == 0 {
261            return Ok(());
262        }
263
264        let in_progress = match &mut self.in_progress {
265            Some(in_progress) => in_progress,
266            x => x.insert(ArrowRowGroupWriter::new(
267                self.writer.schema_descr(),
268                self.writer.properties(),
269                &self.arrow_schema,
270            )?),
271        };
272
        // If the batch would exceed max_row_group_size, split it
274        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
275            let to_write = self.max_row_group_size - in_progress.buffered_rows;
276            let a = batch.slice(0, to_write);
277            let b = batch.slice(to_write, batch.num_rows() - to_write);
278            self.write(&a)?;
279            return self.write(&b);
280        }
281
282        in_progress.write(batch)?;
283
284        if in_progress.buffered_rows >= self.max_row_group_size {
285            self.flush()?
286        }
287        Ok(())
288    }
289
290    /// Flushes all buffered rows into a new row group
291    pub fn flush(&mut self) -> Result<()> {
292        let in_progress = match self.in_progress.take() {
293            Some(in_progress) => in_progress,
294            None => return Ok(()),
295        };
296
297        let mut row_group_writer = self.writer.next_row_group()?;
298        for chunk in in_progress.close()? {
299            chunk.append_to_row_group(&mut row_group_writer)?;
300        }
301        row_group_writer.close()?;
302        Ok(())
303    }
304
    /// Appends [`KeyValue`] metadata to be written in addition to that from [`WriterProperties`]
    ///
    /// This method provides a way to append key-value metadata after writing `RecordBatch`es.
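    ///
    /// For example, a minimal sketch that appends custom metadata after the
    /// data has been written (the `"writer.version"` key is an arbitrary
    /// placeholder, not a reserved name):
    ///
    /// ```no_run
    /// # use arrow_array::RecordBatch;
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// # let batch: RecordBatch = todo!();
    /// writer.write(&batch).unwrap();
    /// // Attach custom key-value metadata to the file footer
    /// writer.append_key_value_metadata(
    ///     KeyValue::new("writer.version".to_string(), "1.0".to_string()),
    /// );
    /// writer.close().unwrap();
    /// ```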
308    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
309        self.writer.append_key_value_metadata(kv_metadata)
310    }
311
312    /// Returns a reference to the underlying writer.
313    pub fn inner(&self) -> &W {
314        self.writer.inner()
315    }
316
317    /// Returns a mutable reference to the underlying writer.
318    ///
    /// It is inadvisable to write directly to the underlying writer, as doing so
    /// will likely result in a corrupt parquet file
321    pub fn inner_mut(&mut self) -> &mut W {
322        self.writer.inner_mut()
323    }
324
325    /// Flushes any outstanding data and returns the underlying writer.
326    pub fn into_inner(mut self) -> Result<W> {
327        self.flush()?;
328        self.writer.into_inner()
329    }
330
331    /// Close and finalize the underlying Parquet writer
332    ///
    /// Unlike [`Self::close`], this does not consume `self`
    ///
    /// Attempting to write after calling `finish` will result in an error.
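    ///
    /// For example, a minimal sketch (using an in-memory `Vec<u8>` destination)
    /// where the writer is still needed after finalizing, e.g. to inspect
    /// [`Self::bytes_written`]:
    ///
    /// ```no_run
    /// # use arrow_array::RecordBatch;
    /// # use parquet::arrow::ArrowWriter;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// # let batch: RecordBatch = todo!();
    /// writer.write(&batch).unwrap();
    /// // Finalize the file but keep the writer around
    /// let _metadata = writer.finish().unwrap();
    /// println!("wrote {} bytes", writer.bytes_written());
    /// ```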
336    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
337        self.flush()?;
338        self.writer.finish()
339    }
340
341    /// Close and finalize the underlying Parquet writer
342    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
343        self.finish()
344    }
345}
346
347impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
348    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
349        self.write(batch).map_err(|e| e.into())
350    }
351
352    fn close(self) -> std::result::Result<(), ArrowError> {
353        self.close()?;
354        Ok(())
355    }
356}
357
358/// Arrow-specific configuration settings for writing parquet files.
359///
360/// See [`ArrowWriter`] for how to configure the writer.
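///
/// For example, a minimal sketch combining these options with
/// [`ArrowWriter::try_new_with_options`], using an in-memory `Vec<u8>` as a
/// stand-in for any `Write` destination:
///
/// ```no_run
/// # use arrow_schema::SchemaRef;
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// # let schema: SchemaRef = todo!();
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     // Omit the embedded arrow schema from the file metadata
///     .with_skip_arrow_metadata(true);
/// let _writer = ArrowWriter::try_new_with_options(Vec::<u8>::new(), schema, options).unwrap();
/// ```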
361#[derive(Debug, Clone, Default)]
362pub struct ArrowWriterOptions {
363    properties: WriterProperties,
364    skip_arrow_metadata: bool,
365    schema_root: Option<String>,
366}
367
368impl ArrowWriterOptions {
369    /// Creates a new [`ArrowWriterOptions`] with the default settings.
370    pub fn new() -> Self {
371        Self::default()
372    }
373
374    /// Sets the [`WriterProperties`] for writing parquet files.
375    pub fn with_properties(self, properties: WriterProperties) -> Self {
376        Self { properties, ..self }
377    }
378
379    /// Skip encoding the embedded arrow metadata (defaults to `false`)
380    ///
    /// Parquet files generated by the [`ArrowWriter`] contain an embedded arrow schema
382    /// by default.
383    ///
    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
385    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
386        Self {
387            skip_arrow_metadata,
388            ..self
389        }
390    }
391
392    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
393    pub fn with_schema_root(self, name: String) -> Self {
394        Self {
395            schema_root: Some(name),
396            ..self
397        }
398    }
399}
400
401/// A single column chunk produced by [`ArrowColumnWriter`]
402#[derive(Default)]
403struct ArrowColumnChunkData {
404    length: usize,
405    data: Vec<Bytes>,
406}
407
408impl Length for ArrowColumnChunkData {
409    fn len(&self) -> u64 {
410        self.length as _
411    }
412}
413
414impl ChunkReader for ArrowColumnChunkData {
415    type T = ArrowColumnChunkReader;
416
417    fn get_read(&self, start: u64) -> Result<Self::T> {
418        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
419        Ok(ArrowColumnChunkReader(
420            self.data.clone().into_iter().peekable(),
421        ))
422    }
423
424    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
425        unimplemented!()
426    }
427}
428
429/// A [`Read`] for [`ArrowColumnChunkData`]
430struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
431
432impl Read for ArrowColumnChunkReader {
433    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
434        let buffer = loop {
435            match self.0.peek_mut() {
436                Some(b) if b.is_empty() => {
437                    self.0.next();
438                    continue;
439                }
440                Some(b) => break b,
441                None => return Ok(0),
442            }
443        };
444
445        let len = buffer.len().min(out.len());
446        let b = buffer.split_to(len);
447        out[..len].copy_from_slice(&b);
448        Ok(len)
449    }
450}
451
452/// A shared [`ArrowColumnChunkData`]
453///
454/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
455/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
456type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
457
458#[derive(Default)]
459struct ArrowPageWriter {
460    buffer: SharedColumnChunk,
461}
462
463impl PageWriter for ArrowPageWriter {
464    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
465        let mut buf = self.buffer.try_lock().unwrap();
466        let page_header = page.to_thrift_header();
467        let header = {
468            let mut header = Vec::with_capacity(1024);
469            let mut protocol = TCompactOutputProtocol::new(&mut header);
470            page_header.write_to_out_protocol(&mut protocol)?;
471            Bytes::from(header)
472        };
473
474        let data = page.compressed_page().buffer().clone();
475        let compressed_size = data.len() + header.len();
476
477        let mut spec = PageWriteSpec::new();
478        spec.page_type = page.page_type();
479        spec.num_values = page.num_values();
480        spec.uncompressed_size = page.uncompressed_size() + header.len();
481        spec.offset = buf.length as u64;
482        spec.compressed_size = compressed_size;
483        spec.bytes_written = compressed_size as u64;
484
485        buf.length += compressed_size;
486        buf.data.push(header);
487        buf.data.push(data);
488
489        Ok(spec)
490    }
491
492    fn close(&mut self) -> Result<()> {
493        Ok(())
494    }
495}
496
497/// A leaf column that can be encoded by [`ArrowColumnWriter`]
498#[derive(Debug)]
499pub struct ArrowLeafColumn(ArrayLevels);
500
501/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
502pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
503    let levels = calculate_array_levels(array, field)?;
504    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
505}
506
507/// The data for a single column chunk, see [`ArrowColumnWriter`]
508pub struct ArrowColumnChunk {
509    data: ArrowColumnChunkData,
510    close: ColumnCloseResult,
511}
512
513impl std::fmt::Debug for ArrowColumnChunk {
514    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
515        f.debug_struct("ArrowColumnChunk")
516            .field("length", &self.data.length)
517            .finish_non_exhaustive()
518    }
519}
520
521impl ArrowColumnChunk {
522    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
523    pub fn append_to_row_group<W: Write + Send>(
524        self,
525        writer: &mut SerializedRowGroupWriter<'_, W>,
526    ) -> Result<()> {
527        writer.append_column(&self.data, self.close)
528    }
529}
530
531/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
532///
533/// Note: This is a low-level interface for applications that require fine-grained control
/// of encoding; see [`ArrowWriter`] for a higher-level interface.
535///
536/// ```
537/// // The arrow schema
538/// # use std::sync::Arc;
539/// # use arrow_array::*;
540/// # use arrow_schema::*;
541/// # use parquet::arrow::arrow_to_parquet_schema;
542/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
543/// # use parquet::file::properties::WriterProperties;
544/// # use parquet::file::writer::SerializedFileWriter;
545/// #
546/// let schema = Arc::new(Schema::new(vec![
547///     Field::new("i32", DataType::Int32, false),
548///     Field::new("f32", DataType::Float32, false),
549/// ]));
550///
551/// // Compute the parquet schema
552/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
553/// let props = Arc::new(WriterProperties::default());
554///
555/// // Create writers for each of the leaf columns
556/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
557///
558/// // Spawn a worker thread for each column
/// // This is for demonstration purposes; a thread pool, e.g. rayon or tokio, would be better
560/// let mut workers: Vec<_> = col_writers
561///     .into_iter()
562///     .map(|mut col_writer| {
563///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
564///         let handle = std::thread::spawn(move || {
565///             for col in recv {
566///                 col_writer.write(&col)?;
567///             }
568///             col_writer.close()
569///         });
570///         (handle, send)
571///     })
572///     .collect();
573///
574/// // Create parquet writer
575/// let root_schema = parquet_schema.root_schema_ptr();
576/// let mut out = Vec::with_capacity(1024); // This could be a File
577/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap();
578///
579/// // Start row group
580/// let mut row_group = writer.next_row_group().unwrap();
581///
582/// // Columns to encode
583/// let to_write = vec![
584///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
585///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
586/// ];
587///
588/// // Spawn work to encode columns
589/// let mut worker_iter = workers.iter_mut();
590/// for (arr, field) in to_write.iter().zip(&schema.fields) {
591///     for leaves in compute_leaves(field, arr).unwrap() {
592///         worker_iter.next().unwrap().1.send(leaves).unwrap();
593///     }
594/// }
595///
596/// // Finish up parallel column encoding
597/// for (handle, send) in workers {
598///     drop(send); // Drop send side to signal termination
599///     let chunk = handle.join().unwrap().unwrap();
600///     chunk.append_to_row_group(&mut row_group).unwrap();
601/// }
602/// row_group.close().unwrap();
603///
604/// let metadata = writer.close().unwrap();
605/// assert_eq!(metadata.num_rows, 3);
606/// ```
607pub struct ArrowColumnWriter {
608    writer: ArrowColumnWriterImpl,
609    chunk: SharedColumnChunk,
610}
611
612impl std::fmt::Debug for ArrowColumnWriter {
613    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
614        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
615    }
616}
617
618enum ArrowColumnWriterImpl {
619    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
620    Column(ColumnWriter<'static>),
621}
622
623impl ArrowColumnWriter {
624    /// Write an [`ArrowLeafColumn`]
625    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
626        match &mut self.writer {
627            ArrowColumnWriterImpl::Column(c) => {
628                write_leaf(c, &col.0)?;
629            }
630            ArrowColumnWriterImpl::ByteArray(c) => {
631                write_primitive(c, col.0.array().as_ref(), &col.0)?;
632            }
633        }
634        Ok(())
635    }
636
637    /// Close this column returning the written [`ArrowColumnChunk`]
638    pub fn close(self) -> Result<ArrowColumnChunk> {
639        let close = match self.writer {
640            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
641            ArrowColumnWriterImpl::Column(c) => c.close()?,
642        };
643        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
644        let data = chunk.into_inner().unwrap();
645        Ok(ArrowColumnChunk { data, close })
646    }
647
648    /// Returns the estimated total memory usage by the writer.
649    ///
    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
    /// of the current memory usage and not its anticipated encoded size.
652    ///
653    /// This includes:
654    /// 1. Data buffered in encoded form
655    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
656    ///
657    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
658    pub fn memory_size(&self) -> usize {
659        match &self.writer {
660            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
661            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
662        }
663    }
664
665    /// Returns the estimated total encoded bytes for this column writer.
666    ///
667    /// This includes:
668    /// 1. Data buffered in encoded form
669    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
670    ///
671    /// This value should be less than or equal to [`Self::memory_size`]
672    pub fn get_estimated_total_bytes(&self) -> usize {
673        match &self.writer {
674            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
675            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
676        }
677    }
678}
679
680/// Encodes [`RecordBatch`] to a parquet row group
681struct ArrowRowGroupWriter {
682    writers: Vec<ArrowColumnWriter>,
683    schema: SchemaRef,
684    buffered_rows: usize,
685}
686
687impl ArrowRowGroupWriter {
688    fn new(
689        parquet: &SchemaDescriptor,
690        props: &WriterPropertiesPtr,
691        arrow: &SchemaRef,
692    ) -> Result<Self> {
693        let writers = get_column_writers(parquet, props, arrow)?;
694        Ok(Self {
695            writers,
696            schema: arrow.clone(),
697            buffered_rows: 0,
698        })
699    }
700
701    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
702        self.buffered_rows += batch.num_rows();
703        let mut writers = self.writers.iter_mut();
704        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
705            for leaf in compute_leaves(field.as_ref(), column)? {
706                writers.next().unwrap().write(&leaf)?
707            }
708        }
709        Ok(())
710    }
711
712    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
713        self.writers
714            .into_iter()
715            .map(|writer| writer.close())
716            .collect()
717    }
718}
719
720/// Returns the [`ArrowColumnWriter`] for a given schema
721pub fn get_column_writers(
722    parquet: &SchemaDescriptor,
723    props: &WriterPropertiesPtr,
724    arrow: &SchemaRef,
725) -> Result<Vec<ArrowColumnWriter>> {
726    let mut writers = Vec::with_capacity(arrow.fields.len());
727    let mut leaves = parquet.columns().iter();
728    for field in &arrow.fields {
729        get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?;
730    }
731    Ok(writers)
732}
733
734/// Gets the [`ArrowColumnWriter`] for the given `data_type`
735fn get_arrow_column_writer(
736    data_type: &ArrowDataType,
737    props: &WriterPropertiesPtr,
738    leaves: &mut Iter<'_, ColumnDescPtr>,
739    out: &mut Vec<ArrowColumnWriter>,
740) -> Result<()> {
741    let col = |desc: &ColumnDescPtr| {
742        let page_writer = Box::<ArrowPageWriter>::default();
743        let chunk = page_writer.buffer.clone();
744        let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
745        ArrowColumnWriter {
746            chunk,
747            writer: ArrowColumnWriterImpl::Column(writer),
748        }
749    };
750
751    let bytes = |desc: &ColumnDescPtr| {
752        let page_writer = Box::<ArrowPageWriter>::default();
753        let chunk = page_writer.buffer.clone();
754        let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
755        ArrowColumnWriter {
756            chunk,
757            writer: ArrowColumnWriterImpl::ByteArray(writer),
758        }
759    };
760
761    match data_type {
762        _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())),
        ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
            out.push(col(leaves.next().unwrap()))
        }
764        ArrowDataType::LargeBinary
765        | ArrowDataType::Binary
766        | ArrowDataType::Utf8
767        | ArrowDataType::LargeUtf8
768        | ArrowDataType::BinaryView
769        | ArrowDataType::Utf8View => {
770            out.push(bytes(leaves.next().unwrap()))
771        }
772        ArrowDataType::List(f)
773        | ArrowDataType::LargeList(f)
774        | ArrowDataType::FixedSizeList(f, _) => {
775            get_arrow_column_writer(f.data_type(), props, leaves, out)?
776        }
777        ArrowDataType::Struct(fields) => {
778            for field in fields {
779                get_arrow_column_writer(field.data_type(), props, leaves, out)?
780            }
781        }
782        ArrowDataType::Map(f, _) => match f.data_type() {
783            ArrowDataType::Struct(f) => {
784                get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
785                get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
786            }
787            _ => unreachable!("invalid map type"),
788        }
789        ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
            ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::Binary
            | ArrowDataType::LargeBinary => {
791                out.push(bytes(leaves.next().unwrap()))
792            }
793            ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
794                out.push(bytes(leaves.next().unwrap()))
795            }
796            _ => {
797                out.push(col(leaves.next().unwrap()))
798            }
799        }
        _ => return Err(ParquetError::NYI(format!(
            "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
        ))),
805    }
806    Ok(())
807}
808
809fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
810    let column = levels.array().as_ref();
811    let indices = levels.non_null_indices();
812    match writer {
813        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
814            match column.data_type() {
815                ArrowDataType::Date64 => {
816                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
817                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
818                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
819
820                    let array = array.as_primitive::<Int32Type>();
821                    write_primitive(typed, array.values(), levels)
822                }
823                ArrowDataType::UInt32 => {
824                    let values = column.as_primitive::<UInt32Type>().values();
                    // follow the C++ implementation and use an overflow/reinterpret cast from u32 to i32, which will map
826                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
827                    let array = values.inner().typed_data::<i32>();
828                    write_primitive(typed, array, levels)
829                }
830                ArrowDataType::Decimal128(_, _) => {
831                    // use the int32 to represent the decimal with low precision
832                    let array = column
833                        .as_primitive::<Decimal128Type>()
834                        .unary::<_, Int32Type>(|v| v as i32);
835                    write_primitive(typed, array.values(), levels)
836                }
837                ArrowDataType::Decimal256(_, _) => {
838                    // use the int32 to represent the decimal with low precision
839                    let array = column
840                        .as_primitive::<Decimal256Type>()
841                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
842                    write_primitive(typed, array.values(), levels)
843                }
844                _ => {
845                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
846                    let array = array.as_primitive::<Int32Type>();
847                    write_primitive(typed, array.values(), levels)
848                }
849            }
850        }
851        ColumnWriter::BoolColumnWriter(ref mut typed) => {
852            let array = column.as_boolean();
853            typed.write_batch(
854                get_bool_array_slice(array, indices).as_slice(),
855                levels.def_levels(),
856                levels.rep_levels(),
857            )
858        }
859        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
860            match column.data_type() {
861                ArrowDataType::Int64 => {
862                    let array = column.as_primitive::<Int64Type>();
863                    write_primitive(typed, array.values(), levels)
864                }
865                ArrowDataType::UInt64 => {
866                    let values = column.as_primitive::<UInt64Type>().values();
                    // follow the C++ implementation and use an overflow/reinterpret cast from u64 to i64, which will map
868                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
869                    let array = values.inner().typed_data::<i64>();
870                    write_primitive(typed, array, levels)
871                }
872                ArrowDataType::Decimal128(_, _) => {
873                    // use the int64 to represent the decimal with low precision
874                    let array = column
875                        .as_primitive::<Decimal128Type>()
876                        .unary::<_, Int64Type>(|v| v as i64);
877                    write_primitive(typed, array.values(), levels)
878                }
879                ArrowDataType::Decimal256(_, _) => {
880                    // use the int64 to represent the decimal with low precision
881                    let array = column
882                        .as_primitive::<Decimal256Type>()
883                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
884                    write_primitive(typed, array.values(), levels)
885                }
886                _ => {
887                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
888                    let array = array.as_primitive::<Int64Type>();
889                    write_primitive(typed, array.values(), levels)
890                }
891            }
892        }
893        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
894            unreachable!("Currently unreachable because data type not supported")
895        }
896        ColumnWriter::FloatColumnWriter(ref mut typed) => {
897            let array = column.as_primitive::<Float32Type>();
898            write_primitive(typed, array.values(), levels)
899        }
900        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
901            let array = column.as_primitive::<Float64Type>();
902            write_primitive(typed, array.values(), levels)
903        }
904        ColumnWriter::ByteArrayColumnWriter(_) => {
905            unreachable!("should use ByteArrayWriter")
906        }
907        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
908            let bytes = match column.data_type() {
909                ArrowDataType::Interval(interval_unit) => match interval_unit {
910                    IntervalUnit::YearMonth => {
911                        let array = column
912                            .as_any()
913                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
914                            .unwrap();
915                        get_interval_ym_array_slice(array, indices)
916                    }
917                    IntervalUnit::DayTime => {
918                        let array = column
919                            .as_any()
920                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
921                            .unwrap();
922                        get_interval_dt_array_slice(array, indices)
923                    }
924                    _ => {
925                        return Err(ParquetError::NYI(
926                            format!(
927                                "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
928                            )
929                        ));
930                    }
931                },
932                ArrowDataType::FixedSizeBinary(_) => {
933                    let array = column
934                        .as_any()
935                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
936                        .unwrap();
937                    get_fsb_array_slice(array, indices)
938                }
939                ArrowDataType::Decimal128(_, _) => {
940                    let array = column.as_primitive::<Decimal128Type>();
941                    get_decimal_128_array_slice(array, indices)
942                }
943                ArrowDataType::Decimal256(_, _) => {
944                    let array = column
945                        .as_any()
946                        .downcast_ref::<arrow_array::Decimal256Array>()
947                        .unwrap();
948                    get_decimal_256_array_slice(array, indices)
949                }
950                ArrowDataType::Float16 => {
951                    let array = column.as_primitive::<Float16Type>();
952                    get_float_16_array_slice(array, indices)
953                }
954                _ => {
955                    return Err(ParquetError::NYI(
956                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
957                    ));
958                }
959            };
960            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
961        }
962    }
963}
964
965fn write_primitive<E: ColumnValueEncoder>(
966    writer: &mut GenericColumnWriter<E>,
967    values: &E::Values,
968    levels: &ArrayLevels,
969) -> Result<usize> {
970    writer.write_batch_internal(
971        values,
972        Some(levels.non_null_indices()),
973        levels.def_levels(),
974        levels.rep_levels(),
975        None,
976        None,
977        None,
978    )
979}
980
981fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
982    let mut values = Vec::with_capacity(indices.len());
983    for i in indices {
984        values.push(array.value(*i))
985    }
986    values
987}
988
/// Returns 12-byte values representing 3 values of months, days and milliseconds (4 bytes each).
990/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
991fn get_interval_ym_array_slice(
992    array: &arrow_array::IntervalYearMonthArray,
993    indices: &[usize],
994) -> Vec<FixedLenByteArray> {
995    let mut values = Vec::with_capacity(indices.len());
996    for i in indices {
997        let mut value = array.value(*i).to_le_bytes().to_vec();
998        let mut suffix = vec![0; 8];
999        value.append(&mut suffix);
1000        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1001    }
1002    values
1003}
1004
/// Returns 12-byte values representing 3 values of months, days and milliseconds (4 bytes each).
1006/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
1007fn get_interval_dt_array_slice(
1008    array: &arrow_array::IntervalDayTimeArray,
1009    indices: &[usize],
1010) -> Vec<FixedLenByteArray> {
1011    let mut values = Vec::with_capacity(indices.len());
1012    for i in indices {
1013        let mut out = [0; 12];
1014        let value = array.value(*i);
1015        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1016        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1017        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1018    }
1019    values
1020}
1021
1022fn get_decimal_128_array_slice(
1023    array: &arrow_array::Decimal128Array,
1024    indices: &[usize],
1025) -> Vec<FixedLenByteArray> {
1026    let mut values = Vec::with_capacity(indices.len());
1027    let size = decimal_length_from_precision(array.precision());
1028    for i in indices {
1029        let as_be_bytes = array.value(*i).to_be_bytes();
1030        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1031        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1032    }
1033    values
1034}
1035
1036fn get_decimal_256_array_slice(
1037    array: &arrow_array::Decimal256Array,
1038    indices: &[usize],
1039) -> Vec<FixedLenByteArray> {
1040    let mut values = Vec::with_capacity(indices.len());
1041    let size = decimal_length_from_precision(array.precision());
1042    for i in indices {
1043        let as_be_bytes = array.value(*i).to_be_bytes();
1044        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1045        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1046    }
1047    values
1048}
1049
1050fn get_float_16_array_slice(
1051    array: &arrow_array::Float16Array,
1052    indices: &[usize],
1053) -> Vec<FixedLenByteArray> {
1054    let mut values = Vec::with_capacity(indices.len());
1055    for i in indices {
1056        let value = array.value(*i).to_le_bytes().to_vec();
1057        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1058    }
1059    values
1060}
1061
1062fn get_fsb_array_slice(
1063    array: &arrow_array::FixedSizeBinaryArray,
1064    indices: &[usize],
1065) -> Vec<FixedLenByteArray> {
1066    let mut values = Vec::with_capacity(indices.len());
1067    for i in indices {
1068        let value = array.value(*i).to_vec();
1069        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1070    }
1071    values
1072}
1073
1074#[cfg(test)]
1075mod tests {
1076    use super::*;
1077
1078    use std::fs::File;
1079
1080    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1081    use crate::arrow::ARROW_SCHEMA_META_KEY;
1082    use arrow::datatypes::ToByteSlice;
1083    use arrow::datatypes::{DataType, Schema};
1084    use arrow::error::Result as ArrowResult;
1085    use arrow::util::pretty::pretty_format_batches;
1086    use arrow::{array::*, buffer::Buffer};
1087    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1088    use arrow_schema::Fields;
1089
1090    use crate::basic::Encoding;
1091    use crate::data_type::AsBytes;
1092    use crate::file::metadata::ParquetMetaData;
1093    use crate::file::page_index::index::Index;
1094    use crate::file::page_index::index_reader::read_offset_indexes;
1095    use crate::file::properties::{
1096        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1097    };
1098    use crate::file::serialized_reader::ReadOptionsBuilder;
1099    use crate::file::{
1100        reader::{FileReader, SerializedFileReader},
1101        statistics::Statistics,
1102    };
1103
1104    #[test]
1105    fn arrow_writer() {
1106        // define schema
1107        let schema = Schema::new(vec![
1108            Field::new("a", DataType::Int32, false),
1109            Field::new("b", DataType::Int32, true),
1110        ]);
1111
1112        // create some data
1113        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1114        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1115
1116        // build a record batch
1117        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1118
1119        roundtrip(batch, Some(SMALL_SIZE / 2));
1120    }
1121
1122    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1123        let mut buffer = vec![];
1124
1125        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1126        writer.write(expected_batch).unwrap();
1127        writer.close().unwrap();
1128
1129        buffer
1130    }
1131
1132    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1133        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1134        writer.write(expected_batch).unwrap();
1135        writer.into_inner().unwrap()
1136    }
1137
1138    #[test]
1139    fn roundtrip_bytes() {
1140        // define schema
1141        let schema = Arc::new(Schema::new(vec![
1142            Field::new("a", DataType::Int32, false),
1143            Field::new("b", DataType::Int32, true),
1144        ]));
1145
1146        // create some data
1147        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1148        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1149
1150        // build a record batch
1151        let expected_batch =
1152            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1153
1154        for buffer in [
1155            get_bytes_after_close(schema.clone(), &expected_batch),
1156            get_bytes_by_into_inner(schema, &expected_batch),
1157        ] {
1158            let cursor = Bytes::from(buffer);
1159            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1160
1161            let actual_batch = record_batch_reader
1162                .next()
1163                .expect("No batch found")
1164                .expect("Unable to get batch");
1165
1166            assert_eq!(expected_batch.schema(), actual_batch.schema());
1167            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1168            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1169            for i in 0..expected_batch.num_columns() {
1170                let expected_data = expected_batch.column(i).to_data();
1171                let actual_data = actual_batch.column(i).to_data();
1172
1173                assert_eq!(expected_data, actual_data);
1174            }
1175        }
1176    }
1177
1178    #[test]
1179    fn arrow_writer_non_null() {
1180        // define schema
1181        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1182
1183        // create some data
1184        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1185
1186        // build a record batch
1187        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1188
1189        roundtrip(batch, Some(SMALL_SIZE / 2));
1190    }
1191
1192    #[test]
1193    fn arrow_writer_list() {
1194        // define schema
1195        let schema = Schema::new(vec![Field::new(
1196            "a",
1197            DataType::List(Arc::new(Field::new("item", DataType::Int32, false))),
1198            true,
1199        )]);
1200
1201        // create some data
1202        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1203
1204        // Construct a buffer for value offsets, for the nested array:
1205        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1206        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1207
1208        // Construct a list array from the above two
1209        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
1210            "item",
1211            DataType::Int32,
1212            false,
1213        ))))
1214        .len(5)
1215        .add_buffer(a_value_offsets)
1216        .add_child_data(a_values.into_data())
1217        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1218        .build()
1219        .unwrap();
1220        let a = ListArray::from(a_list_data);
1221
1222        // build a record batch
1223        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1224
1225        assert_eq!(batch.column(0).null_count(), 1);
1226
1227        // This test fails if the max row group size is less than the batch's length
1228        // see https://github.com/apache/arrow-rs/issues/518
1229        roundtrip(batch, None);
1230    }
1231
1232    #[test]
1233    fn arrow_writer_list_non_null() {
1234        // define schema
1235        let schema = Schema::new(vec![Field::new(
1236            "a",
1237            DataType::List(Arc::new(Field::new("item", DataType::Int32, false))),
1238            false,
1239        )]);
1240
1241        // create some data
1242        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1243
1244        // Construct a buffer for value offsets, for the nested array:
1245        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1246        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1247
1248        // Construct a list array from the above two
1249        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
1250            "item",
1251            DataType::Int32,
1252            false,
1253        ))))
1254        .len(5)
1255        .add_buffer(a_value_offsets)
1256        .add_child_data(a_values.into_data())
1257        .build()
1258        .unwrap();
1259        let a = ListArray::from(a_list_data);
1260
1261        // build a record batch
1262        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1263
1264        // This test fails if the max row group size is less than the batch's length
1265        // see https://github.com/apache/arrow-rs/issues/518
1266        assert_eq!(batch.column(0).null_count(), 0);
1267
1268        roundtrip(batch, None);
1269    }
1270
1271    #[test]
1272    fn arrow_writer_binary() {
1273        let string_field = Field::new("a", DataType::Utf8, false);
1274        let binary_field = Field::new("b", DataType::Binary, false);
1275        let schema = Schema::new(vec![string_field, binary_field]);
1276
1277        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1278        let raw_binary_values = [
1279            b"foo".to_vec(),
1280            b"bar".to_vec(),
1281            b"baz".to_vec(),
1282            b"quux".to_vec(),
1283        ];
1284        let raw_binary_value_refs = raw_binary_values
1285            .iter()
1286            .map(|x| x.as_slice())
1287            .collect::<Vec<_>>();
1288
1289        let string_values = StringArray::from(raw_string_values.clone());
1290        let binary_values = BinaryArray::from(raw_binary_value_refs);
1291        let batch = RecordBatch::try_new(
1292            Arc::new(schema),
1293            vec![Arc::new(string_values), Arc::new(binary_values)],
1294        )
1295        .unwrap();
1296
1297        roundtrip(batch, Some(SMALL_SIZE / 2));
1298    }
1299
1300    #[test]
1301    fn arrow_writer_binary_view() {
1302        let string_field = Field::new("a", DataType::Utf8View, false);
1303        let binary_field = Field::new("b", DataType::BinaryView, false);
1304        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1305        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1306
1307        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1308        let raw_binary_values = vec![
1309            b"foo".to_vec(),
1310            b"bar".to_vec(),
1311            b"large payload over 12 bytes".to_vec(),
1312            b"lulu".to_vec(),
1313        ];
1314        let nullable_string_values =
1315            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1316
1317        let string_view_values = StringViewArray::from(raw_string_values);
1318        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1319        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1320        let batch = RecordBatch::try_new(
1321            Arc::new(schema),
1322            vec![
1323                Arc::new(string_view_values),
1324                Arc::new(binary_view_values),
1325                Arc::new(nullable_string_view_values),
1326            ],
1327        )
1328        .unwrap();
1329
1330        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1331        roundtrip(batch, None);
1332    }
1333
1334    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1335        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1336        let schema = Schema::new(vec![decimal_field]);
1337
1338        let decimal_values = vec![10_000, 50_000, 0, -100]
1339            .into_iter()
1340            .map(Some)
1341            .collect::<Decimal128Array>()
1342            .with_precision_and_scale(precision, scale)
1343            .unwrap();
1344
1345        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1346    }
1347
1348    #[test]
1349    fn arrow_writer_decimal() {
1350        // int32 to store the decimal value
1351        let batch_int32_decimal = get_decimal_batch(5, 2);
1352        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1353        // int64 to store the decimal value
1354        let batch_int64_decimal = get_decimal_batch(12, 2);
1355        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1356        // fixed_length_byte_array to store the decimal value
1357        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1358        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1359    }
1360
1361    #[test]
1362    fn arrow_writer_complex() {
1363        // define schema
1364        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1365        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1366        let struct_field_g = Arc::new(Field::new_list(
1367            "g",
1368            Field::new("item", DataType::Int16, true),
1369            false,
1370        ));
1371        let struct_field_h = Arc::new(Field::new_list(
1372            "h",
1373            Field::new("item", DataType::Int16, false),
1374            true,
1375        ));
1376        let struct_field_e = Arc::new(Field::new_struct(
1377            "e",
1378            vec![
1379                struct_field_f.clone(),
1380                struct_field_g.clone(),
1381                struct_field_h.clone(),
1382            ],
1383            false,
1384        ));
1385        let schema = Schema::new(vec![
1386            Field::new("a", DataType::Int32, false),
1387            Field::new("b", DataType::Int32, true),
1388            Field::new_struct(
1389                "c",
1390                vec![struct_field_d.clone(), struct_field_e.clone()],
1391                false,
1392            ),
1393        ]);
1394
1395        // create some data
1396        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1397        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1398        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1399        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1400
1401        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1402
1403        // Construct a buffer for value offsets, for the nested array:
1404        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1405        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1406
1407        // Construct a list array from the above two
1408        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1409            .len(5)
1410            .add_buffer(g_value_offsets.clone())
1411            .add_child_data(g_value.to_data())
1412            .build()
1413            .unwrap();
1414        let g = ListArray::from(g_list_data);
1415        // The difference between g and h is that h has a null bitmap
1416        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1417            .len(5)
1418            .add_buffer(g_value_offsets)
1419            .add_child_data(g_value.to_data())
1420            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1421            .build()
1422            .unwrap();
1423        let h = ListArray::from(h_list_data);
1424
1425        let e = StructArray::from(vec![
1426            (struct_field_f, Arc::new(f) as ArrayRef),
1427            (struct_field_g, Arc::new(g) as ArrayRef),
1428            (struct_field_h, Arc::new(h) as ArrayRef),
1429        ]);
1430
1431        let c = StructArray::from(vec![
1432            (struct_field_d, Arc::new(d) as ArrayRef),
1433            (struct_field_e, Arc::new(e) as ArrayRef),
1434        ]);
1435
1436        // build a record batch
1437        let batch = RecordBatch::try_new(
1438            Arc::new(schema),
1439            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1440        )
1441        .unwrap();
1442
1443        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1444        roundtrip(batch, Some(SMALL_SIZE / 3));
1445    }
1446
1447    #[test]
1448    fn arrow_writer_complex_mixed() {
1449        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1450        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1451
1452        // define schema
1453        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1454        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1455        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1456        let schema = Schema::new(vec![Field::new(
1457            "some_nested_object",
1458            DataType::Struct(Fields::from(vec![
1459                offset_field.clone(),
1460                partition_field.clone(),
1461                topic_field.clone(),
1462            ])),
1463            false,
1464        )]);
1465
1466        // create some data
1467        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1468        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1469        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1470
1471        let some_nested_object = StructArray::from(vec![
1472            (offset_field, Arc::new(offset) as ArrayRef),
1473            (partition_field, Arc::new(partition) as ArrayRef),
1474            (topic_field, Arc::new(topic) as ArrayRef),
1475        ]);
1476
1477        // build a record batch
1478        let batch =
1479            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1480
1481        roundtrip(batch, Some(SMALL_SIZE / 2));
1482    }
1483
1484    #[test]
1485    fn arrow_writer_map() {
1486        // Note: we are using the JSON Arrow reader for brevity
1487        let json_content = r#"
1488        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1489        {"stocks":{"long": null, "long": "$CCC", "short": null}}
1490        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1491        "#;
1492        let entries_struct_type = DataType::Struct(Fields::from(vec![
1493            Field::new("key", DataType::Utf8, false),
1494            Field::new("value", DataType::Utf8, true),
1495        ]));
1496        let stocks_field = Field::new(
1497            "stocks",
1498            DataType::Map(
1499                Arc::new(Field::new("entries", entries_struct_type, false)),
1500                false,
1501            ),
1502            true,
1503        );
1504        let schema = Arc::new(Schema::new(vec![stocks_field]));
1505        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1506        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1507
1508        let batch = reader.next().unwrap().unwrap();
1509        roundtrip(batch, None);
1510    }
1511
1512    #[test]
1513    fn arrow_writer_2_level_struct() {
1514        // tests writing struct<struct<primitive>>
1515        let field_c = Field::new("c", DataType::Int32, true);
1516        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1517        let type_a = DataType::Struct(vec![field_b.clone()].into());
1518        let field_a = Field::new("a", type_a, true);
1519        let schema = Schema::new(vec![field_a.clone()]);
1520
1521        // create data
1522        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1523        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1524            .len(6)
1525            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1526            .add_child_data(c.into_data())
1527            .build()
1528            .unwrap();
1529        let b = StructArray::from(b_data);
1530        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1531            .len(6)
1532            .null_bit_buffer(Some(Buffer::from([0b00101111])))
1533            .add_child_data(b.into_data())
1534            .build()
1535            .unwrap();
1536        let a = StructArray::from(a_data);
1537
1538        assert_eq!(a.null_count(), 1);
1539        assert_eq!(a.column(0).null_count(), 2);
1540
1541        // build a record batch
1542        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1543
1544        roundtrip(batch, Some(SMALL_SIZE / 2));
1545    }
1546
1547    #[test]
1548    fn arrow_writer_2_level_struct_non_null() {
1549        // tests writing struct<struct<primitive>>
1550        let field_c = Field::new("c", DataType::Int32, false);
1551        let type_b = DataType::Struct(vec![field_c].into());
1552        let field_b = Field::new("b", type_b.clone(), false);
1553        let type_a = DataType::Struct(vec![field_b].into());
1554        let field_a = Field::new("a", type_a.clone(), false);
1555        let schema = Schema::new(vec![field_a]);
1556
1557        // create data
1558        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1559        let b_data = ArrayDataBuilder::new(type_b)
1560            .len(6)
1561            .add_child_data(c.into_data())
1562            .build()
1563            .unwrap();
1564        let b = StructArray::from(b_data);
1565        let a_data = ArrayDataBuilder::new(type_a)
1566            .len(6)
1567            .add_child_data(b.into_data())
1568            .build()
1569            .unwrap();
1570        let a = StructArray::from(a_data);
1571
1572        assert_eq!(a.null_count(), 0);
1573        assert_eq!(a.column(0).null_count(), 0);
1574
1575        // build a record batch
1576        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1577
1578        roundtrip(batch, Some(SMALL_SIZE / 2));
1579    }
1580
1581    #[test]
1582    fn arrow_writer_2_level_struct_mixed_null() {
1583        // tests writing struct<struct<primitive>>
1584        let field_c = Field::new("c", DataType::Int32, false);
1585        let type_b = DataType::Struct(vec![field_c].into());
1586        let field_b = Field::new("b", type_b.clone(), true);
1587        let type_a = DataType::Struct(vec![field_b].into());
1588        let field_a = Field::new("a", type_a.clone(), false);
1589        let schema = Schema::new(vec![field_a]);
1590
1591        // create data
1592        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1593        let b_data = ArrayDataBuilder::new(type_b)
1594            .len(6)
1595            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1596            .add_child_data(c.into_data())
1597            .build()
1598            .unwrap();
1599        let b = StructArray::from(b_data);
1600        // a intentionally has no null buffer, to test that this is handled correctly
1601        let a_data = ArrayDataBuilder::new(type_a)
1602            .len(6)
1603            .add_child_data(b.into_data())
1604            .build()
1605            .unwrap();
1606        let a = StructArray::from(a_data);
1607
1608        assert_eq!(a.null_count(), 0);
1609        assert_eq!(a.column(0).null_count(), 2);
1610
1611        // build a record batch
1612        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1613
1614        roundtrip(batch, Some(SMALL_SIZE / 2));
1615    }
1616
1617    #[test]
1618    fn arrow_writer_2_level_struct_mixed_null_2() {
1619        // tests writing struct<struct<primitive>>, where the primitive columns are non-null.
1620        let field_c = Field::new("c", DataType::Int32, false);
1621        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
1622        let field_e = Field::new(
1623            "e",
1624            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1625            false,
1626        );
1627
1628        let field_b = Field::new(
1629            "b",
1630            DataType::Struct(vec![field_c, field_d, field_e].into()),
1631            false,
1632        );
1633        let type_a = DataType::Struct(vec![field_b.clone()].into());
1634        let field_a = Field::new("a", type_a, true);
1635        let schema = Schema::new(vec![field_a.clone()]);
1636
1637        // create data
1638        let c = Int32Array::from_iter_values(0..6);
1639        let d = FixedSizeBinaryArray::try_from_iter(
1640            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
1641        )
1642        .expect("four byte values");
1643        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
1644        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1645            .len(6)
1646            .add_child_data(c.into_data())
1647            .add_child_data(d.into_data())
1648            .add_child_data(e.into_data())
1649            .build()
1650            .unwrap();
1651        let b = StructArray::from(b_data);
1652        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1653            .len(6)
1654            .null_bit_buffer(Some(Buffer::from([0b00100101])))
1655            .add_child_data(b.into_data())
1656            .build()
1657            .unwrap();
1658        let a = StructArray::from(a_data);
1659
1660        assert_eq!(a.null_count(), 3);
1661        assert_eq!(a.column(0).null_count(), 0);
1662
1663        // build a record batch
1664        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1665
1666        roundtrip(batch, Some(SMALL_SIZE / 2));
1667    }
1668
1669    #[test]
1670    fn test_empty_dict() {
1671        let struct_fields = Fields::from(vec![Field::new(
1672            "dict",
1673            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1674            false,
1675        )]);
1676
1677        let schema = Schema::new(vec![Field::new_struct(
1678            "struct",
1679            struct_fields.clone(),
1680            true,
1681        )]);
1682        let dictionary = Arc::new(DictionaryArray::new(
1683            Int32Array::new_null(5),
1684            Arc::new(StringArray::new_null(0)),
1685        ));
1686
1687        let s = StructArray::new(
1688            struct_fields,
1689            vec![dictionary],
1690            Some(NullBuffer::new_null(5)),
1691        );
1692
1693        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
1694        roundtrip(batch, None);
1695    }
1696    #[test]
1697    fn arrow_writer_page_size() {
1698        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
1699
1700        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
1701
1702        // Generate an array of 10 unique 10-character strings
1703        for i in 0..10 {
1704            let value = i
1705                .to_string()
1706                .repeat(10)
1707                .chars()
1708                .take(10)
1709                .collect::<String>();
1710
1711            builder.append_value(value);
1712        }
1713
1714        let array = Arc::new(builder.finish());
1715
1716        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1717
1718        let file = tempfile::tempfile().unwrap();
1719
1720        // Set everything very low so we fall back to PLAIN encoding after the first row
1721        let props = WriterProperties::builder()
1722            .set_data_page_size_limit(1)
1723            .set_dictionary_page_size_limit(1)
1724            .set_write_batch_size(1)
1725            .build();
1726
1727        let mut writer =
1728            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
1729                .expect("Unable to write file");
1730        writer.write(&batch).unwrap();
1731        writer.close().unwrap();
1732
1733        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
1734
1735        let column = reader.metadata().row_group(0).columns();
1736
1737        assert_eq!(column.len(), 1);
1738
1739        // We should write one row before falling back to PLAIN encoding so there should still be a
1740        // dictionary page.
1741        assert!(
1742            column[0].dictionary_page_offset().is_some(),
1743            "Expected a dictionary page"
1744        );
1745
1746        let offset_indexes = read_offset_indexes(&file, column).unwrap();
1747
1748        let page_locations = offset_indexes[0].page_locations.clone();
1749
1750        // We should fall back to PLAIN encoding after the first row and our max page size is 1 byte,
1751        // so we expect one dictionary-encoded page and then a page per row thereafter.
1752        assert_eq!(
1753            page_locations.len(),
1754            10,
1755            "Expected 9 pages but got {page_locations:#?}"
1756        );
1757    }
1758
1759    const SMALL_SIZE: usize = 7;
1760    const MEDIUM_SIZE: usize = 63;
1761
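    /// Round-trips `expected_batch` with both Parquet writer versions (and the optional
    /// maximum row group size), returning the temporary files that were written.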
1762    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
1763        let mut files = vec![];
1764        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
1765            let mut props = WriterProperties::builder().set_writer_version(version);
1766
1767            if let Some(size) = max_row_group_size {
1768                props = props.set_max_row_group_size(size)
1769            }
1770
1771            let props = props.build();
1772            files.push(roundtrip_opts(&expected_batch, props))
1773        }
1774        files
1775    }
1776
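    /// Writes `expected_batch` to a temporary file with the given properties, reads it back,
    /// checks the schema and row/column counts, and runs `validate` on each column pair.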
1777    fn roundtrip_opts_with_array_validation<F>(
1778        expected_batch: &RecordBatch,
1779        props: WriterProperties,
1780        validate: F,
1781    ) -> File
1782    where
1783        F: Fn(&ArrayData, &ArrayData),
1784    {
1785        let file = tempfile::tempfile().unwrap();
1786
1787        let mut writer = ArrowWriter::try_new(
1788            file.try_clone().unwrap(),
1789            expected_batch.schema(),
1790            Some(props),
1791        )
1792        .expect("Unable to write file");
1793        writer.write(expected_batch).unwrap();
1794        writer.close().unwrap();
1795
1796        let mut record_batch_reader =
1797            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
1798
1799        let actual_batch = record_batch_reader
1800            .next()
1801            .expect("No batch found")
1802            .expect("Unable to get batch");
1803
1804        assert_eq!(expected_batch.schema(), actual_batch.schema());
1805        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1806        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1807        for i in 0..expected_batch.num_columns() {
1808            let expected_data = expected_batch.column(i).to_data();
1809            let actual_data = actual_batch.column(i).to_data();
1810            validate(&expected_data, &actual_data);
1811        }
1812
1813        file
1814    }
1815
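    /// Round-trips `expected_batch` and asserts the data read back is fully valid and equal to the input.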
1816    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
1817        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
1818            a.validate_full().expect("valid expected data");
1819            b.validate_full().expect("valid actual data");
1820            assert_eq!(a, b)
1821        })
1822    }
1823
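    /// Options for a single-column round-trip test.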
1824    struct RoundTripOptions {
1825        values: ArrayRef,
1826        schema: SchemaRef,
1827        bloom_filter: bool,
1828        bloom_filter_position: BloomFilterPosition,
1829    }
1830
1831    impl RoundTripOptions {
1832        fn new(values: ArrayRef, nullable: bool) -> Self {
1833            let data_type = values.data_type().clone();
1834            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
1835            Self {
1836                values,
1837                schema: Arc::new(schema),
1838                bloom_filter: false,
1839                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
1840            }
1841        }
1842    }
1843
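    /// Round-trips a single column of `values` using the default options.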
1844    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
1845        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
1846    }
1847
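    /// Round-trips `values` as a single column using the supplied schema.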
1848    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
1849        let mut options = RoundTripOptions::new(values, false);
1850        options.schema = schema;
1851        one_column_roundtrip_with_options(options)
1852    }
1853
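    /// Round-trips a single column across a matrix of dictionary settings, encodings,
    /// writer versions and row group sizes appropriate for the value type.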
1854    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
1855        let RoundTripOptions {
1856            values,
1857            schema,
1858            bloom_filter,
1859            bloom_filter_position,
1860        } = options;
1861
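        // Choose the encodings to exercise based on the column's data type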
1862        let encodings = match values.data_type() {
1863            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
1864                vec![
1865                    Encoding::PLAIN,
1866                    Encoding::DELTA_BYTE_ARRAY,
1867                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
1868                ]
1869            }
1870            DataType::Int64
1871            | DataType::Int32
1872            | DataType::Int16
1873            | DataType::Int8
1874            | DataType::UInt64
1875            | DataType::UInt32
1876            | DataType::UInt16
1877            | DataType::UInt8 => vec![
1878                Encoding::PLAIN,
1879                Encoding::DELTA_BINARY_PACKED,
1880                Encoding::BYTE_STREAM_SPLIT,
1881            ],
1882            DataType::Float32 | DataType::Float64 => {
1883                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
1884            }
1885            _ => vec![Encoding::PLAIN],
1886        };
1887
1888        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
1889
1890        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
1891
1892        let mut files = vec![];
1893        for dictionary_size in [0, 1, 1024] {
1894            for encoding in &encodings {
1895                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
1896                    for row_group_size in row_group_sizes {
1897                        let props = WriterProperties::builder()
1898                            .set_writer_version(version)
1899                            .set_max_row_group_size(row_group_size)
1900                            .set_dictionary_enabled(dictionary_size != 0)
1901                            .set_dictionary_page_size_limit(dictionary_size.max(1))
1902                            .set_encoding(*encoding)
1903                            .set_bloom_filter_enabled(bloom_filter)
1904                            .set_bloom_filter_position(bloom_filter_position)
1905                            .build();
1906
1907                        files.push(roundtrip_opts(&expected_batch, props))
1908                    }
1909                }
1910            }
1911        }
1912        files
1913    }
1914
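    /// Builds a non-nullable array of type `A` from `iter` and round-trips it.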
1915    fn values_required<A, I>(iter: I) -> Vec<File>
1916    where
1917        A: From<Vec<I::Item>> + Array + 'static,
1918        I: IntoIterator,
1919    {
1920        let raw_values: Vec<_> = iter.into_iter().collect();
1921        let values = Arc::new(A::from(raw_values));
1922        one_column_roundtrip(values, false)
1923    }
1924
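    /// Builds a nullable array of type `A` from `iter`, nulling every other slot, and round-trips it.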
1925    fn values_optional<A, I>(iter: I) -> Vec<File>
1926    where
1927        A: From<Vec<Option<I::Item>>> + Array + 'static,
1928        I: IntoIterator,
1929    {
1930        let optional_raw_values: Vec<_> = iter
1931            .into_iter()
1932            .enumerate()
1933            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
1934            .collect();
1935        let optional_values = Arc::new(A::from(optional_raw_values));
1936        one_column_roundtrip(optional_values, true)
1937    }
1938
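    /// Runs both the required and the optional single-column round-trips for the same input.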
1939    fn required_and_optional<A, I>(iter: I)
1940    where
1941        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
1942        I: IntoIterator + Clone,
1943    {
1944        values_required::<A, I>(iter.clone());
1945        values_optional::<A, I>(iter);
1946    }
1947
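    /// Reads the bloom filters for `file_column` from the first file and asserts that all
    /// `positive_values` are present and none of the `negative_values` are.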
1948    fn check_bloom_filter<T: AsBytes>(
1949        files: Vec<File>,
1950        file_column: String,
1951        positive_values: Vec<T>,
1952        negative_values: Vec<T>,
1953    ) {
1954        files.into_iter().take(1).for_each(|file| {
1955            let file_reader = SerializedFileReader::new_with_options(
1956                file,
1957                ReadOptionsBuilder::new()
1958                    .with_reader_properties(
1959                        ReaderProperties::builder()
1960                            .set_read_bloom_filter(true)
1961                            .build(),
1962                    )
1963                    .build(),
1964            )
1965            .expect("Unable to open file as Parquet");
1966            let metadata = file_reader.metadata();
1967
1968            // Gets bloom filters from all row groups.
1969            let mut bloom_filters: Vec<_> = vec![];
1970            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
1971                if let Some((column_index, _)) = row_group
1972                    .columns()
1973                    .iter()
1974                    .enumerate()
1975                    .find(|(_, column)| column.column_path().string() == file_column)
1976                {
1977                    let row_group_reader = file_reader
1978                        .get_row_group(ri)
1979                        .expect("Unable to read row group");
1980                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
1981                        bloom_filters.push(sbbf.clone());
1982                    } else {
1983                        panic!("No bloom filter for column named {file_column} found");
1984                    }
1985                } else {
1986                    panic!("No column named {file_column} found");
1987                }
1988            }
1989
1990            positive_values.iter().for_each(|value| {
1991                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
1992                assert!(
1993                    found.is_some(),
1994                    "Value {:?} should be in bloom filter",
1995                    value.as_bytes()
1996                );
1997            });
1998
1999            negative_values.iter().for_each(|value| {
2000                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2001                assert!(
2002                    found.is_none(),
2003                    "Value {:?} should not be in bloom filter",
2004                    value.as_bytes()
2005                );
2006            });
2007        });
2008    }
2009
2010    #[test]
2011    fn all_null_primitive_single_column() {
2012        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2013        one_column_roundtrip(values, true);
2014    }
2015    #[test]
2016    fn null_single_column() {
2017        let values = Arc::new(NullArray::new(SMALL_SIZE));
2018        one_column_roundtrip(values, true);
2019        // null arrays are always nullable; a test with non-nullable nulls fails
2020    }
2021
2022    #[test]
2023    fn bool_single_column() {
2024        required_and_optional::<BooleanArray, _>(
2025            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2026        );
2027    }
2028
2029    #[test]
2030    fn bool_large_single_column() {
2031        let values = Arc::new(
2032            [None, Some(true), Some(false)]
2033                .iter()
2034                .cycle()
2035                .copied()
2036                .take(200_000)
2037                .collect::<BooleanArray>(),
2038        );
2039        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2040        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2041        let file = tempfile::tempfile().unwrap();
2042
2043        let mut writer =
2044            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2045                .expect("Unable to write file");
2046        writer.write(&expected_batch).unwrap();
2047        writer.close().unwrap();
2048    }
2049
2050    #[test]
2051    fn check_page_offset_index_with_nan() {
2052        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2053        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2054        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2055
2056        let mut out = Vec::with_capacity(1024);
2057        let mut writer =
2058            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2059        writer.write(&batch).unwrap();
2060        let file_meta_data = writer.close().unwrap();
2061        for row_group in file_meta_data.row_groups {
2062            for column in row_group.columns {
2063                assert!(column.offset_index_offset.is_some());
2064                assert!(column.offset_index_length.is_some());
2065                assert!(column.column_index_offset.is_none());
2066                assert!(column.column_index_length.is_none());
2067            }
2068        }
2069    }
2070
2071    #[test]
2072    fn i8_single_column() {
2073        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2074    }
2075
2076    #[test]
2077    fn i16_single_column() {
2078        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2079    }
2080
2081    #[test]
2082    fn i32_single_column() {
2083        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2084    }
2085
2086    #[test]
2087    fn i64_single_column() {
2088        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2089    }
2090
2091    #[test]
2092    fn u8_single_column() {
2093        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2094    }
2095
2096    #[test]
2097    fn u16_single_column() {
2098        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2099    }
2100
2101    #[test]
2102    fn u32_single_column() {
2103        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2104    }
2105
2106    #[test]
2107    fn u64_single_column() {
2108        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2109    }
2110
2111    #[test]
2112    fn f32_single_column() {
2113        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2114    }
2115
2116    #[test]
2117    fn f64_single_column() {
2118        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2119    }
2120
2121    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2122    // argument, and they also don't support building from a Vec<Option<T>>, so call
2123    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2124
2125    #[test]
2126    fn timestamp_second_single_column() {
2127        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2128        let values = Arc::new(TimestampSecondArray::from(raw_values));
2129
2130        one_column_roundtrip(values, false);
2131    }
2132
2133    #[test]
2134    fn timestamp_millisecond_single_column() {
2135        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2136        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2137
2138        one_column_roundtrip(values, false);
2139    }
2140
2141    #[test]
2142    fn timestamp_microsecond_single_column() {
2143        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2144        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2145
2146        one_column_roundtrip(values, false);
2147    }
2148
2149    #[test]
2150    fn timestamp_nanosecond_single_column() {
2151        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2152        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2153
2154        one_column_roundtrip(values, false);
2155    }
2156
2157    #[test]
2158    fn date32_single_column() {
2159        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2160    }
2161
2162    #[test]
2163    fn date64_single_column() {
2164        // Date64 must be a multiple of 86400000, see ARROW-10925
2165        required_and_optional::<Date64Array, _>(
2166            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2167        );
2168    }
2169
2170    #[test]
2171    fn time32_second_single_column() {
2172        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2173    }
2174
2175    #[test]
2176    fn time32_millisecond_single_column() {
2177        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2178    }
2179
2180    #[test]
2181    fn time64_microsecond_single_column() {
2182        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2183    }
2184
2185    #[test]
2186    fn time64_nanosecond_single_column() {
2187        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2188    }
2189
2190    #[test]
2191    #[should_panic(expected = "Converting Duration to parquet not supported")]
2192    fn duration_second_single_column() {
2193        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2194    }
2195
2196    #[test]
2197    #[should_panic(expected = "Converting Duration to parquet not supported")]
2198    fn duration_millisecond_single_column() {
2199        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2200    }
2201
2202    #[test]
2203    #[should_panic(expected = "Converting Duration to parquet not supported")]
2204    fn duration_microsecond_single_column() {
2205        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2206    }
2207
2208    #[test]
2209    #[should_panic(expected = "Converting Duration to parquet not supported")]
2210    fn duration_nanosecond_single_column() {
2211        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2212    }
2213
2214    #[test]
2215    fn interval_year_month_single_column() {
2216        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2217    }
2218
2219    #[test]
2220    fn interval_day_time_single_column() {
2221        required_and_optional::<IntervalDayTimeArray, _>(vec![
2222            IntervalDayTime::new(0, 1),
2223            IntervalDayTime::new(0, 3),
2224            IntervalDayTime::new(3, -2),
2225            IntervalDayTime::new(-200, 4),
2226        ]);
2227    }
2228
2229    #[test]
2230    #[should_panic(
2231        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2232    )]
2233    fn interval_month_day_nano_single_column() {
2234        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2235            IntervalMonthDayNano::new(0, 1, 5),
2236            IntervalMonthDayNano::new(0, 3, 2),
2237            IntervalMonthDayNano::new(3, -2, -5),
2238            IntervalMonthDayNano::new(-200, 4, -1),
2239        ]);
2240    }
2241
2242    #[test]
2243    fn binary_single_column() {
2244        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2245        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2246        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2247
2248        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2249        values_required::<BinaryArray, _>(many_vecs_iter);
2250    }
2251
2252    #[test]
2253    fn binary_view_single_column() {
2254        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2255        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2256        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2257
2258        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2259        values_required::<BinaryViewArray, _>(many_vecs_iter);
2260    }
2261
2262    #[test]
2263    fn i32_column_bloom_filter_at_end() {
2264        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2265        let mut options = RoundTripOptions::new(array, false);
2266        options.bloom_filter = true;
2267        options.bloom_filter_position = BloomFilterPosition::End;
2268
2269        let files = one_column_roundtrip_with_options(options);
2270        check_bloom_filter(
2271            files,
2272            "col".to_string(),
2273            (0..SMALL_SIZE as i32).collect(),
2274            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2275        );
2276    }
2277
2278    #[test]
2279    fn i32_column_bloom_filter() {
2280        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2281        let mut options = RoundTripOptions::new(array, false);
2282        options.bloom_filter = true;
2283
2284        let files = one_column_roundtrip_with_options(options);
2285        check_bloom_filter(
2286            files,
2287            "col".to_string(),
2288            (0..SMALL_SIZE as i32).collect(),
2289            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2290        );
2291    }
2292
2293    #[test]
2294    fn binary_column_bloom_filter() {
2295        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2296        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2297        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2298
2299        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2300        let mut options = RoundTripOptions::new(array, false);
2301        options.bloom_filter = true;
2302
2303        let files = one_column_roundtrip_with_options(options);
2304        check_bloom_filter(
2305            files,
2306            "col".to_string(),
2307            many_vecs,
2308            vec![vec![(SMALL_SIZE + 1) as u8]],
2309        );
2310    }
2311
2312    #[test]
2313    fn empty_string_null_column_bloom_filter() {
2314        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2315        let raw_strs = raw_values.iter().map(|s| s.as_str());
2316
2317        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2318        let mut options = RoundTripOptions::new(array, false);
2319        options.bloom_filter = true;
2320
2321        let files = one_column_roundtrip_with_options(options);
2322
2323        let optional_raw_values: Vec<_> = raw_values
2324            .iter()
2325            .enumerate()
2326            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2327            .collect();
2328        // For null slots, the empty string should not be in the bloom filter.
2329        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2330    }
2331
2332    #[test]
2333    fn large_binary_single_column() {
2334        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2335        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2336        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2337
2338        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2339        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2340    }
2341
2342    #[test]
2343    fn fixed_size_binary_single_column() {
2344        let mut builder = FixedSizeBinaryBuilder::new(4);
2345        builder.append_value(b"0123").unwrap();
2346        builder.append_null();
2347        builder.append_value(b"8910").unwrap();
2348        builder.append_value(b"1112").unwrap();
2349        let array = Arc::new(builder.finish());
2350
2351        one_column_roundtrip(array, true);
2352    }
2353
2354    #[test]
2355    fn string_single_column() {
2356        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2357        let raw_strs = raw_values.iter().map(|s| s.as_str());
2358
2359        required_and_optional::<StringArray, _>(raw_strs);
2360    }
2361
2362    #[test]
2363    fn large_string_single_column() {
2364        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2365        let raw_strs = raw_values.iter().map(|s| s.as_str());
2366
2367        required_and_optional::<LargeStringArray, _>(raw_strs);
2368    }
2369
2370    #[test]
2371    fn string_view_single_column() {
2372        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2373        let raw_strs = raw_values.iter().map(|s| s.as_str());
2374
2375        required_and_optional::<StringViewArray, _>(raw_strs);
2376    }
2377
2378    #[test]
2379    fn null_list_single_column() {
2380        let null_field = Field::new("item", DataType::Null, true);
2381        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2382
2383        let schema = Schema::new(vec![list_field]);
2384
2385        // Build [[], null, [null, null]]
2386        let a_values = NullArray::new(2);
2387        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2388        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
2389            "item",
2390            DataType::Null,
2391            true,
2392        ))))
2393        .len(3)
2394        .add_buffer(a_value_offsets)
2395        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2396        .add_child_data(a_values.into_data())
2397        .build()
2398        .unwrap();
2399
2400        let a = ListArray::from(a_list_data);
2401
2402        assert!(a.is_valid(0));
2403        assert!(!a.is_valid(1));
2404        assert!(a.is_valid(2));
2405
2406        assert_eq!(a.value(0).len(), 0);
2407        assert_eq!(a.value(2).len(), 2);
2408        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2409
2410        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2411        roundtrip(batch, None);
2412    }
2413
2414    #[test]
2415    fn list_single_column() {
2416        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2417        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2418        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
2419            "item",
2420            DataType::Int32,
2421            false,
2422        ))))
2423        .len(5)
2424        .add_buffer(a_value_offsets)
2425        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2426        .add_child_data(a_values.into_data())
2427        .build()
2428        .unwrap();
2429
2430        assert_eq!(a_list_data.null_count(), 1);
2431
2432        let a = ListArray::from(a_list_data);
2433        let values = Arc::new(a);
2434
2435        one_column_roundtrip(values, true);
2436    }
2437
2438    #[test]
2439    fn large_list_single_column() {
2440        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2441        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2442        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2443            "large_item",
2444            DataType::Int32,
2445            true,
2446        ))))
2447        .len(5)
2448        .add_buffer(a_value_offsets)
2449        .add_child_data(a_values.into_data())
2450        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2451        .build()
2452        .unwrap();
2453
2454        // I think this setup is incorrect because this should pass
2455        assert_eq!(a_list_data.null_count(), 1);
2456
2457        let a = LargeListArray::from(a_list_data);
2458        let values = Arc::new(a);
2459
2460        one_column_roundtrip(values, true);
2461    }
2462
2463    #[test]
2464    fn list_nested_nulls() {
2465        use arrow::datatypes::Int32Type;
2466        let data = vec![
2467            Some(vec![Some(1)]),
2468            Some(vec![Some(2), Some(3)]),
2469            None,
2470            Some(vec![Some(4), Some(5), None]),
2471            Some(vec![None]),
2472            Some(vec![Some(6), Some(7)]),
2473        ];
2474
2475        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2476        one_column_roundtrip(Arc::new(list), true);
2477
2478        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2479        one_column_roundtrip(Arc::new(list), true);
2480    }
2481
2482    #[test]
2483    fn struct_single_column() {
2484        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2485        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2486        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2487
2488        let values = Arc::new(s);
2489        one_column_roundtrip(values, false);
2490    }
2491
2492    #[test]
2493    fn fallback_flush_data_page() {
2494        // tests that Fallback::flush_data_page clears all buffers correctly
2495        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
2496        let values = Arc::new(StringArray::from(raw_values));
2497        let encodings = vec![
2498            Encoding::DELTA_BYTE_ARRAY,
2499            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2500        ];
2501        let data_type = values.data_type().clone();
2502        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
2503        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2504
2505        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2506        let data_page_size_limit: usize = 32;
2507        let write_batch_size: usize = 16;
2508
2509        for encoding in &encodings {
2510            for row_group_size in row_group_sizes {
2511                let props = WriterProperties::builder()
2512                    .set_writer_version(WriterVersion::PARQUET_2_0)
2513                    .set_max_row_group_size(row_group_size)
2514                    .set_dictionary_enabled(false)
2515                    .set_encoding(*encoding)
2516                    .set_data_page_size_limit(data_page_size_limit)
2517                    .set_write_batch_size(write_batch_size)
2518                    .build();
2519
2520                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
2521                    let string_array_a = StringArray::from(a.clone());
2522                    let string_array_b = StringArray::from(b.clone());
2523                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
2524                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
2525                    assert_eq!(
2526                        vec_a, vec_b,
2527                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
2528                    );
2529                });
2530            }
2531        }
2532    }
2533
2534    #[test]
2535    fn arrow_writer_string_dictionary() {
2536        // define schema
2537        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2538            "dictionary",
2539            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2540            true,
2541            42,
2542            true,
2543        )]));
2544
2545        // create some data
2546        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2547            .iter()
2548            .copied()
2549            .collect();
2550
2551        // build a record batch
2552        one_column_roundtrip_with_schema(Arc::new(d), schema);
2553    }
2554
2555    #[test]
2556    fn arrow_writer_primitive_dictionary() {
2557        // define schema
2558        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2559            "dictionary",
2560            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
2561            true,
2562            42,
2563            true,
2564        )]));
2565
2566        // create some data
2567        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
2568        builder.append(12345678).unwrap();
2569        builder.append_null();
2570        builder.append(22345678).unwrap();
2571        builder.append(12345678).unwrap();
2572        let d = builder.finish();
2573
2574        one_column_roundtrip_with_schema(Arc::new(d), schema);
2575    }
2576
2577    #[test]
2578    fn arrow_writer_string_dictionary_unsigned_index() {
2579        // define schema
2580        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2581            "dictionary",
2582            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
2583            true,
2584            42,
2585            true,
2586        )]));
2587
2588        // create some data
2589        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2590            .iter()
2591            .copied()
2592            .collect();
2593
2594        one_column_roundtrip_with_schema(Arc::new(d), schema);
2595    }
2596
2597    #[test]
2598    fn u32_min_max() {
2599        // check values roundtrip through parquet
2600        let src = [
2601            u32::MIN,
2602            u32::MIN + 1,
2603            (i32::MAX as u32) - 1,
2604            i32::MAX as u32,
2605            (i32::MAX as u32) + 1,
2606            u32::MAX - 1,
2607            u32::MAX,
2608        ];
2609        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
2610        let files = one_column_roundtrip(values, false);
2611
2612        for file in files {
2613            // check statistics are valid
2614            let reader = SerializedFileReader::new(file).unwrap();
2615            let metadata = reader.metadata();
2616
2617            let mut row_offset = 0;
2618            for row_group in metadata.row_groups() {
2619                assert_eq!(row_group.num_columns(), 1);
2620                let column = row_group.column(0);
2621
2622                let num_values = column.num_values() as usize;
2623                let src_slice = &src[row_offset..row_offset + num_values];
2624                row_offset += column.num_values() as usize;
2625
2626                let stats = column.statistics().unwrap();
2627                if let Statistics::Int32(stats) = stats {
2628                    assert_eq!(
2629                        *stats.min_opt().unwrap() as u32,
2630                        *src_slice.iter().min().unwrap()
2631                    );
2632                    assert_eq!(
2633                        *stats.max_opt().unwrap() as u32,
2634                        *src_slice.iter().max().unwrap()
2635                    );
2636                } else {
2637                    panic!("Statistics::Int32 missing")
2638                }
2639            }
2640        }
2641    }
2642
2643    #[test]
2644    fn u64_min_max() {
2645        // check values roundtrip through parquet
2646        let src = [
2647            u64::MIN,
2648            u64::MIN + 1,
2649            (i64::MAX as u64) - 1,
2650            i64::MAX as u64,
2651            (i64::MAX as u64) + 1,
2652            u64::MAX - 1,
2653            u64::MAX,
2654        ];
2655        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
2656        let files = one_column_roundtrip(values, false);
2657
2658        for file in files {
2659            // check statistics are valid
2660            let reader = SerializedFileReader::new(file).unwrap();
2661            let metadata = reader.metadata();
2662
2663            let mut row_offset = 0;
2664            for row_group in metadata.row_groups() {
2665                assert_eq!(row_group.num_columns(), 1);
2666                let column = row_group.column(0);
2667
2668                let num_values = column.num_values() as usize;
2669                let src_slice = &src[row_offset..row_offset + num_values];
2670                row_offset += column.num_values() as usize;
2671
2672                let stats = column.statistics().unwrap();
2673                if let Statistics::Int64(stats) = stats {
2674                    assert_eq!(
2675                        *stats.min_opt().unwrap() as u64,
2676                        *src_slice.iter().min().unwrap()
2677                    );
2678                    assert_eq!(
2679                        *stats.max_opt().unwrap() as u64,
2680                        *src_slice.iter().max().unwrap()
2681                    );
2682                } else {
2683                    panic!("Statistics::Int64 missing")
2684                }
2685            }
2686        }
2687    }
2688
2689    #[test]
2690    fn statistics_null_counts_only_nulls() {
2691        // check that null-count statistics for "only NULL"-columns are correct
2692        let values = Arc::new(UInt64Array::from(vec![None, None]));
2693        let files = one_column_roundtrip(values, true);
2694
2695        for file in files {
2696            // check statistics are valid
2697            let reader = SerializedFileReader::new(file).unwrap();
2698            let metadata = reader.metadata();
2699            assert_eq!(metadata.num_row_groups(), 1);
2700            let row_group = metadata.row_group(0);
2701            assert_eq!(row_group.num_columns(), 1);
2702            let column = row_group.column(0);
2703            let stats = column.statistics().unwrap();
2704            assert_eq!(stats.null_count_opt(), Some(2));
2705        }
2706    }
2707
2708    #[test]
2709    fn test_list_of_struct_roundtrip() {
2710        // define schema
2711        let int_field = Field::new("a", DataType::Int32, true);
2712        let int_field2 = Field::new("b", DataType::Int32, true);
2713
2714        let int_builder = Int32Builder::with_capacity(10);
2715        let int_builder2 = Int32Builder::with_capacity(10);
2716
2717        let struct_builder = StructBuilder::new(
2718            vec![int_field, int_field2],
2719            vec![Box::new(int_builder), Box::new(int_builder2)],
2720        );
2721        let mut list_builder = ListBuilder::new(struct_builder);
2722
2723        // Construct the following array
2724        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
2725
2726        // [{a: 1, b: 2}]
2727        let values = list_builder.values();
2728        values
2729            .field_builder::<Int32Builder>(0)
2730            .unwrap()
2731            .append_value(1);
2732        values
2733            .field_builder::<Int32Builder>(1)
2734            .unwrap()
2735            .append_value(2);
2736        values.append(true);
2737        list_builder.append(true);
2738
2739        // []
2740        list_builder.append(true);
2741
2742        // null
2743        list_builder.append(false);
2744
2745        // [null, null]
2746        let values = list_builder.values();
2747        values
2748            .field_builder::<Int32Builder>(0)
2749            .unwrap()
2750            .append_null();
2751        values
2752            .field_builder::<Int32Builder>(1)
2753            .unwrap()
2754            .append_null();
2755        values.append(false);
2756        values
2757            .field_builder::<Int32Builder>(0)
2758            .unwrap()
2759            .append_null();
2760        values
2761            .field_builder::<Int32Builder>(1)
2762            .unwrap()
2763            .append_null();
2764        values.append(false);
2765        list_builder.append(true);
2766
2767        // [{a: null, b: 3}]
2768        let values = list_builder.values();
2769        values
2770            .field_builder::<Int32Builder>(0)
2771            .unwrap()
2772            .append_null();
2773        values
2774            .field_builder::<Int32Builder>(1)
2775            .unwrap()
2776            .append_value(3);
2777        values.append(true);
2778        list_builder.append(true);
2779
2780        // [{a: 2, b: null}]
2781        let values = list_builder.values();
2782        values
2783            .field_builder::<Int32Builder>(0)
2784            .unwrap()
2785            .append_value(2);
2786        values
2787            .field_builder::<Int32Builder>(1)
2788            .unwrap()
2789            .append_null();
2790        values.append(true);
2791        list_builder.append(true);
2792
2793        let array = Arc::new(list_builder.finish());
2794
2795        one_column_roundtrip(array, true);
2796    }
2797
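    /// Returns the number of rows in each row group of `metadata`.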
2798    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
2799        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
2800    }
2801
2802    #[test]
2803    fn test_aggregates_records() {
2804        let arrays = [
2805            Int32Array::from((0..100).collect::<Vec<_>>()),
2806            Int32Array::from((0..50).collect::<Vec<_>>()),
2807            Int32Array::from((200..500).collect::<Vec<_>>()),
2808        ];
2809
2810        let schema = Arc::new(Schema::new(vec![Field::new(
2811            "int",
2812            ArrowDataType::Int32,
2813            false,
2814        )]));
2815
2816        let file = tempfile::tempfile().unwrap();
2817
2818        let props = WriterProperties::builder()
2819            .set_max_row_group_size(200)
2820            .build();
2821
2822        let mut writer =
2823            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
2824
2825        for array in arrays {
2826            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2827            writer.write(&batch).unwrap();
2828        }
2829
2830        writer.close().unwrap();
2831
2832        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2833        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
2834
2835        let batches = builder
2836            .with_batch_size(100)
2837            .build()
2838            .unwrap()
2839            .collect::<ArrowResult<Vec<_>>>()
2840            .unwrap();
2841
2842        assert_eq!(batches.len(), 5);
2843        assert!(batches.iter().all(|x| x.num_columns() == 1));
2844
2845        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
2846
2847        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
2848
2849        let values: Vec<_> = batches
2850            .iter()
2851            .flat_map(|x| {
2852                x.column(0)
2853                    .as_any()
2854                    .downcast_ref::<Int32Array>()
2855                    .unwrap()
2856                    .values()
2857                    .iter()
2858                    .cloned()
2859            })
2860            .collect();
2861
2862        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
2863        assert_eq!(&values, &expected_values)
2864    }
2865
2866    #[test]
2867    fn complex_aggregate() {
2868        // Tests aggregating nested data
2869        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
2870        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
2871        let struct_a = Arc::new(Field::new(
2872            "struct_a",
2873            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
2874            true,
2875        ));
2876
2877        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
2878        let struct_b = Arc::new(Field::new(
2879            "struct_b",
2880            DataType::Struct(vec![list_a.clone()].into()),
2881            false,
2882        ));
2883
2884        let schema = Arc::new(Schema::new(vec![struct_b]));
2885
2886        // create nested data
2887        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2888        let field_b_array =
2889            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
2890
2891        let struct_a_array = StructArray::from(vec![
2892            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
2893            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
2894        ]);
2895
2896        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
2897            .len(5)
2898            .add_buffer(Buffer::from_iter(vec![
2899                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
2900            ]))
2901            .null_bit_buffer(Some(Buffer::from_iter(vec![
2902                true, false, true, false, true,
2903            ])))
2904            .child_data(vec![struct_a_array.into_data()])
2905            .build()
2906            .unwrap();
2907
2908        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
2909        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
2910
2911        let batch1 =
2912            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
2913                .unwrap();
2914
2915        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
2916        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
2917
2918        let struct_a_array = StructArray::from(vec![
2919            (field_a, Arc::new(field_a_array) as ArrayRef),
2920            (field_b, Arc::new(field_b_array) as ArrayRef),
2921        ]);
2922
2923        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
2924            .len(2)
2925            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
2926            .child_data(vec![struct_a_array.into_data()])
2927            .build()
2928            .unwrap();
2929
2930        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
2931        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
2932
2933        let batch2 =
2934            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
2935                .unwrap();
2936
2937        let batches = &[batch1, batch2];
2938
2939        // Verify data is as expected
2940
2941        let expected = r#"
2942            +-------------------------------------------------------------------------------------------------------+
2943            | struct_b                                                                                              |
2944            +-------------------------------------------------------------------------------------------------------+
2945            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
2946            | {list: }                                                                                              |
2947            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
2948            | {list: }                                                                                              |
2949            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
2950            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
2951            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
2952            +-------------------------------------------------------------------------------------------------------+
2953        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
2954
2955        let actual = pretty_format_batches(batches).unwrap().to_string();
2956        assert_eq!(actual, expected);
2957
2958        // Write data
2959        let file = tempfile::tempfile().unwrap();
2960        let props = WriterProperties::builder()
2961            .set_max_row_group_size(6)
2962            .build();
2963
2964        let mut writer =
2965            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
2966
2967        for batch in batches {
2968            writer.write(batch).unwrap();
2969        }
2970        writer.close().unwrap();
2971
2972        // Read data back
2973        // The entire first batch and the first row of the second should have been written to the
2974        // first row group, leaving a single row in the second row group
2975
2976        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2977        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
2978
2979        let batches = builder
2980            .with_batch_size(2)
2981            .build()
2982            .unwrap()
2983            .collect::<ArrowResult<Vec<_>>>()
2984            .unwrap();
2985
2986        assert_eq!(batches.len(), 4);
2987        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
2988        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
2989
2990        let actual = pretty_format_batches(&batches).unwrap().to_string();
2991        assert_eq!(actual, expected);
2992    }
2993
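    // Writing a batch whose schema lacks the key/value metadata declared on the file schema
    // should still succeed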
2994    #[test]
2995    fn test_arrow_writer_metadata() {
2996        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
2997        let file_schema = batch_schema.clone().with_metadata(
2998            vec![("foo".to_string(), "bar".to_string())]
2999                .into_iter()
3000                .collect(),
3001        );
3002
3003        let batch = RecordBatch::try_new(
3004            Arc::new(batch_schema),
3005            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3006        )
3007        .unwrap();
3008
3009        let mut buf = Vec::with_capacity(1024);
3010        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3011        writer.write(&batch).unwrap();
3012        writer.close().unwrap();
3013    }
3014
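    // A batch with a non-nullable column can be written against a file schema that declares
    // the column nullable; reading back yields the file schema, not the batch schema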
3015    #[test]
3016    fn test_arrow_writer_nullable() {
3017        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3018        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3019        let file_schema = Arc::new(file_schema);
3020
3021        let batch = RecordBatch::try_new(
3022            Arc::new(batch_schema),
3023            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3024        )
3025        .unwrap();
3026
3027        let mut buf = Vec::with_capacity(1024);
3028        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3029        writer.write(&batch).unwrap();
3030        writer.close().unwrap();
3031
3032        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3033        let back = read.next().unwrap().unwrap();
3034        assert_eq!(back.schema(), file_schema);
3035        assert_ne!(back.schema(), batch.schema());
3036        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3037    }
3038
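    // Verifies that in-progress size, row count, and memory usage are tracked while writing
    // and reset once the buffered row group is flushed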
3039    #[test]
3040    fn in_progress_accounting() {
3041        // define schema
3042        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3043
3044        // create some data
3045        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3046
3047        // build a record batch
3048        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3049
3050        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3051
3052        // starts empty
3053        assert_eq!(writer.in_progress_size(), 0);
3054        assert_eq!(writer.in_progress_rows(), 0);
3055        assert_eq!(writer.memory_size(), 0);
3056        assert_eq!(writer.bytes_written(), 4); // Initial header
3057        writer.write(&batch).unwrap();
3058
3059        // updated on write
3060        let initial_size = writer.in_progress_size();
3061        assert!(initial_size > 0);
3062        assert_eq!(writer.in_progress_rows(), 5);
3063        let initial_memory = writer.memory_size();
3064        assert!(initial_memory > 0);
3065        // the memory estimate is at least as large as the estimated encoded size
3066        assert!(
3067            initial_size <= initial_memory,
3068            "{initial_size} <= {initial_memory}"
3069        );
3070
3071        // updated on second write
3072        writer.write(&batch).unwrap();
3073        assert!(writer.in_progress_size() > initial_size);
3074        assert_eq!(writer.in_progress_rows(), 10);
3075        assert!(writer.memory_size() > initial_memory);
3076        assert!(
3077            writer.in_progress_size() <= writer.memory_size(),
3078            "in_progress_size {} <= memory_size {}",
3079            writer.in_progress_size(),
3080            writer.memory_size()
3081        );
3082
3083        // in progress tracking is cleared, but the overall data written is updated
3084        let pre_flush_bytes_written = writer.bytes_written();
3085        writer.flush().unwrap();
3086        assert_eq!(writer.in_progress_size(), 0);
3087        assert_eq!(writer.memory_size(), 0);
3088        assert!(writer.bytes_written() > pre_flush_bytes_written);
3089
3090        writer.close().unwrap();
3091    }
3092
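    // An entirely-null column still produces a page, and the offset index records it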
3093    #[test]
3094    fn test_writer_all_null() {
3095        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3096        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3097        let batch = RecordBatch::try_from_iter(vec![
3098            ("a", Arc::new(a) as ArrayRef),
3099            ("b", Arc::new(b) as ArrayRef),
3100        ])
3101        .unwrap();
3102
3103        let mut buf = Vec::with_capacity(1024);
3104        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3105        writer.write(&batch).unwrap();
3106        writer.close().unwrap();
3107
3108        let bytes = Bytes::from(buf);
3109        let options = ReadOptionsBuilder::new().with_page_index().build();
3110        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3111        let index = reader.metadata().offset_index().unwrap();
3112
3113        assert_eq!(index.len(), 1);
3114        assert_eq!(index[0].len(), 2); // 2 columns
3115        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3116        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3117    }
3118
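    // With statistics disabled globally but re-enabled at `Page` level for column "a",
    // only "a" gets chunk statistics and a column index; both columns keep an offset index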
3119    #[test]
3120    fn test_disabled_statistics_with_page() {
3121        let file_schema = Schema::new(vec![
3122            Field::new("a", DataType::Utf8, true),
3123            Field::new("b", DataType::Utf8, true),
3124        ]);
3125        let file_schema = Arc::new(file_schema);
3126
3127        let batch = RecordBatch::try_new(
3128            file_schema.clone(),
3129            vec![
3130                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3131                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3132            ],
3133        )
3134        .unwrap();
3135
3136        let props = WriterProperties::builder()
3137            .set_statistics_enabled(EnabledStatistics::None)
3138            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3139            .build();
3140
3141        let mut buf = Vec::with_capacity(1024);
3142        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3143        writer.write(&batch).unwrap();
3144
3145        let metadata = writer.close().unwrap();
3146        assert_eq!(metadata.row_groups.len(), 1);
3147        let row_group = &metadata.row_groups[0];
3148        assert_eq!(row_group.columns.len(), 2);
3149        // Column "a" has both offset and column index, as requested
3150        assert!(row_group.columns[0].offset_index_offset.is_some());
3151        assert!(row_group.columns[0].column_index_offset.is_some());
3152        // Column "b" should only have offset index
3153        assert!(row_group.columns[1].offset_index_offset.is_some());
3154        assert!(row_group.columns[1].column_index_offset.is_none());
3155
3156        let options = ReadOptionsBuilder::new().with_page_index().build();
3157        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3158
3159        let row_group = reader.get_row_group(0).unwrap();
3160        let a_col = row_group.metadata().column(0);
3161        let b_col = row_group.metadata().column(1);
3162
3163        // Column chunk of column "a" should have chunk level statistics
3164        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3165            let min = byte_array_stats.min_opt().unwrap();
3166            let max = byte_array_stats.max_opt().unwrap();
3167
3168            assert_eq!(min.as_bytes(), b"a");
3169            assert_eq!(max.as_bytes(), b"d");
3170        } else {
3171            panic!("expecting Statistics::ByteArray");
3172        }
3173
3174        // The column chunk for column "b" shouldn't have statistics
3175        assert!(b_col.statistics().is_none());
3176
3177        let offset_index = reader.metadata().offset_index().unwrap();
3178        assert_eq!(offset_index.len(), 1); // 1 row group
3179        assert_eq!(offset_index[0].len(), 2); // 2 columns
3180
3181        let column_index = reader.metadata().column_index().unwrap();
3182        assert_eq!(column_index.len(), 1); // 1 row group
3183        assert_eq!(column_index[0].len(), 2); // 2 columns
3184
3185        let a_idx = &column_index[0][0];
3186        assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3187        let b_idx = &column_index[0][1];
3188        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3189    }
3190
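    // With `Chunk`-level statistics for column "a", chunk statistics are written for "a"
    // but neither column gets a column index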
3191    #[test]
3192    fn test_disabled_statistics_with_chunk() {
3193        let file_schema = Schema::new(vec![
3194            Field::new("a", DataType::Utf8, true),
3195            Field::new("b", DataType::Utf8, true),
3196        ]);
3197        let file_schema = Arc::new(file_schema);
3198
3199        let batch = RecordBatch::try_new(
3200            file_schema.clone(),
3201            vec![
3202                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3203                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3204            ],
3205        )
3206        .unwrap();
3207
3208        let props = WriterProperties::builder()
3209            .set_statistics_enabled(EnabledStatistics::None)
3210            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3211            .build();
3212
3213        let mut buf = Vec::with_capacity(1024);
3214        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3215        writer.write(&batch).unwrap();
3216
3217        let metadata = writer.close().unwrap();
3218        assert_eq!(metadata.row_groups.len(), 1);
3219        let row_group = &metadata.row_groups[0];
3220        assert_eq!(row_group.columns.len(), 2);
3221        // Column "a" should only have offset index
3222        assert!(row_group.columns[0].offset_index_offset.is_some());
3223        assert!(row_group.columns[0].column_index_offset.is_none());
3224        // Column "b" should only have offset index
3225        assert!(row_group.columns[1].offset_index_offset.is_some());
3226        assert!(row_group.columns[1].column_index_offset.is_none());
3227
3228        let options = ReadOptionsBuilder::new().with_page_index().build();
3229        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3230
3231        let row_group = reader.get_row_group(0).unwrap();
3232        let a_col = row_group.metadata().column(0);
3233        let b_col = row_group.metadata().column(1);
3234
3235        // Column chunk of column "a" should have chunk level statistics
3236        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3237            let min = byte_array_stats.min_opt().unwrap();
3238            let max = byte_array_stats.max_opt().unwrap();
3239
3240            assert_eq!(min.as_bytes(), b"a");
3241            assert_eq!(max.as_bytes(), b"d");
3242        } else {
3243            panic!("expecting Statistics::ByteArray");
3244        }
3245
3246        // The column chunk for column "b" shouldn't have statistics
3247        assert!(b_col.statistics().is_none());
3248
3249        let column_index = reader.metadata().column_index().unwrap();
3250        assert_eq!(column_index.len(), 1); // 1 row group
3251        assert_eq!(column_index[0].len(), 2); // 2 columns
3252
3253        let a_idx = &column_index[0][0];
3254        assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3255        let b_idx = &column_index[0][1];
3256        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3257    }
3258
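    // `with_skip_arrow_metadata(true)` should prevent the encoded Arrow schema from being
    // added to the file's key/value metadata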
3259    #[test]
3260    fn test_arrow_writer_skip_metadata() {
3261        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3262        let file_schema = Arc::new(batch_schema.clone());
3263
3264        let batch = RecordBatch::try_new(
3265            Arc::new(batch_schema),
3266            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3267        )
3268        .unwrap();
3269        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
3270
3271        let mut buf = Vec::with_capacity(1024);
3272        let mut writer =
3273            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3274        writer.write(&batch).unwrap();
3275        writer.close().unwrap();
3276
3277        let bytes = Bytes::from(buf);
3278        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3279        assert_eq!(file_schema, *reader_builder.schema());
3280        if let Some(key_value_metadata) = reader_builder
3281            .metadata()
3282            .file_metadata()
3283            .key_value_metadata()
3284        {
3285            assert!(!key_value_metadata
3286                .iter()
3287                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3288        }
3289    }
3290
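    // Writing a batch whose schema does not match the writer's file schema should error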
3291    #[test]
3292    fn mismatched_schemas() {
3293        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3294        let file_schema = Arc::new(Schema::new(vec![Field::new(
3295            "temperature",
3296            DataType::Float64,
3297            false,
3298        )]));
3299
3300        let batch = RecordBatch::try_new(
3301            Arc::new(batch_schema),
3302            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3303        )
3304        .unwrap();
3305
3306        let mut buf = Vec::with_capacity(1024);
3307        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3308
3309        let err = writer.write(&batch).unwrap_err().to_string();
3310        assert_eq!(
3311            err,
3312            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3313        );
3314    }
3315}