use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;
use thrift::protocol::TCompactOutputProtocol;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{
    add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
    arrow_to_parquet_schema_with_root, decimal_length_from_precision,
};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use crate::thrift::TSerializable;
use levels::{calculate_array_levels, ArrayLevels};

mod byte_array;
mod levels;
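/// Encodes [`RecordBatch`]es into a Parquet file, buffering rows in memory and
/// flushing a row group whenever `max_row_group_size` rows have accumulated (or
/// when [`flush`](Self::flush) is called explicitly).
///
/// A minimal usage sketch (illustrative; it mirrors the round-trip helpers in the
/// tests at the bottom of this module):
///
/// ```ignore
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// let batch = RecordBatch::try_new(
///     schema.clone(),
///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
/// )?;
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)?;
/// writer.write(&batch)?;
/// writer.close()?; // writes the footer and returns the file metadata
/// ```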
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet file writer
    writer: SerializedFileWriter<W>,
    /// The in-progress row group, if any
    in_progress: Option<ArrowRowGroupWriter>,
    /// A copy of the Arrow schema being written
    arrow_schema: SchemaRef,
    /// The maximum number of rows to buffer before flushing a row group
    max_row_group_size: usize,
}
impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_size", &self.max_row_group_size)
            .finish()
    }
}
impl<W: Write + Send> ArrowWriter<W> {
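    /// Tries to create a new [`ArrowWriter`] that writes data with the given Arrow
    /// schema to `writer`; when `props` is `None` the default [`WriterProperties`]
    /// are used.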
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }
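    /// Tries to create a new [`ArrowWriter`] with the supplied [`ArrowWriterOptions`],
    /// which additionally control whether the encoded Arrow schema is embedded in the
    /// file metadata and allow overriding the root schema name.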
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let schema = match options.schema_root {
            Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?,
            None => arrow_to_parquet_schema(&arrow_schema)?,
        };
        let mut props = options.properties;
        if !options.skip_arrow_metadata {
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_size = props.max_row_group_size();

        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            max_row_group_size,
        })
    }
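    /// Returns the [`RowGroupMetaData`] of the row groups flushed so far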
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }
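    /// Returns the estimated memory usage, in bytes, of the in-progress row group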
    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }
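    /// Returns the estimated encoded size, in bytes, of the data buffered in the
    /// in-progress row group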
    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }
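    /// Returns the number of rows buffered in the in-progress row group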
    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }
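    /// Returns the number of bytes written to the underlying writer so far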
    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }
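    /// Writes the provided [`RecordBatch`] to the in-progress row group, splitting it
    /// as necessary so that no row group exceeds `max_row_group_size` rows. A full row
    /// group is flushed automatically.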
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(ArrowRowGroupWriter::new(
                self.writer.schema_descr(),
                self.writer.properties(),
                &self.arrow_schema,
            )?),
        };

        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
            let to_write = self.max_row_group_size - in_progress.buffered_rows;
            let a = batch.slice(0, to_write);
            let b = batch.slice(to_write, batch.num_rows() - to_write);
            self.write(&a)?;
            return self.write(&b);
        }

        in_progress.write(batch)?;

        if in_progress.buffered_rows >= self.max_row_group_size {
            self.flush()?
        }
        Ok(())
    }
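    /// Flushes the in-progress row group, if any, writing it to the underlying writer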
    pub fn flush(&mut self) -> Result<()> {
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }
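    /// Appends the given [`KeyValue`] metadata to the metadata written in the file footer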
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }
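    /// Flushes any buffered data and writes the Parquet footer, returning the
    /// resulting file metadata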
    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
        self.finish()
    }
}
impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch).map_err(|e| e.into())
    }

    fn close(self) -> std::result::Result<(), ArrowError> {
        self.close()?;
        Ok(())
    }
}
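/// Arrow-specific configuration for [`ArrowWriter`], supplied via
/// [`ArrowWriter::try_new_with_options`]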
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
}

impl ArrowWriterOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    pub fn with_schema_root(self, name: String) -> Self {
        Self {
            schema_root: Some(name),
            ..self
        }
    }
}
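/// In-memory buffer of the encoded pages for a single column chunk, exposed through
/// [`ChunkReader`] so it can later be appended to a row group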
#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0);
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}

struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ArrowColumnChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}

type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
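/// A [`PageWriter`] that serializes page headers with [`TCompactOutputProtocol`] and
/// appends the header and page data to a [`SharedColumnChunk`]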
#[derive(Default)]
struct ArrowPageWriter {
    buffer: SharedColumnChunk,
}

impl PageWriter for ArrowPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let mut buf = self.buffer.try_lock().unwrap();
        let page_header = page.to_thrift_header();
        let header = {
            let mut header = Vec::with_capacity(1024);
            let mut protocol = TCompactOutputProtocol::new(&mut header);
            page_header.write_to_out_protocol(&mut protocol)?;
            Bytes::from(header)
        };

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
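/// The levels and values of a single leaf column, computed by [`compute_leaves`]
/// and ready to be written by [`ArrowColumnWriter::write`]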
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);
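/// Computes the [`ArrowLeafColumn`]s for a (potentially nested) [`ArrayRef`], i.e.
/// its definition and repetition levels along with the values of each leaf column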
pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
    let levels = calculate_array_levels(array, field)?;
    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
}
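/// The encoded data for a single column chunk, produced by [`ArrowColumnWriter::close`]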
pub struct ArrowColumnChunk {
    data: ArrowColumnChunkData,
    close: ColumnCloseResult,
}

impl std::fmt::Debug for ArrowColumnChunk {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnChunk")
            .field("length", &self.data.length)
            .finish_non_exhaustive()
    }
}

impl ArrowColumnChunk {
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}
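/// Encodes [`ArrowLeafColumn`]s into Parquet pages buffered in memory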
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}

impl ArrowColumnWriter {
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
                write_leaf(c, &col.0)?;
            }
            ArrowColumnWriterImpl::ByteArray(c) => {
                write_primitive(c, col.0.array().as_ref(), &col.0)?;
            }
        }
        Ok(())
    }

    pub fn close(self) -> Result<ArrowColumnChunk> {
        let close = match self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
            ArrowColumnWriterImpl::Column(c) => c.close()?,
        };
        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
        let data = chunk.into_inner().unwrap();
        Ok(ArrowColumnChunk { data, close })
    }

    pub fn memory_size(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
        }
    }

    pub fn get_estimated_total_bytes(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
        }
    }
}
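/// Encodes [`RecordBatch`]es into the column chunks of a single row group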
struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}

impl ArrowRowGroupWriter {
    fn new(
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
    ) -> Result<Self> {
        let writers = get_column_writers(parquet, props, arrow)?;
        Ok(Self {
            writers,
            schema: arrow.clone(),
            buffered_rows: 0,
        })
    }

    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers.next().unwrap().write(&leaf)?
            }
        }
        Ok(())
    }

    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
        self.writers
            .into_iter()
            .map(|writer| writer.close())
            .collect()
    }
}
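/// Returns the [`ArrowColumnWriter`]s for a given schema, one per leaf column.
///
/// Together with [`compute_leaves`] and [`ArrowColumnChunk::append_to_row_group`]
/// this forms a lower-level API that allows the columns of a row group to be encoded
/// independently. A minimal single-threaded sketch, following the same steps
/// [`ArrowRowGroupWriter`] performs internally (assuming `props` is a
/// [`WriterPropertiesPtr`] and `file_writer` a [`SerializedFileWriter`]):
///
/// ```ignore
/// let parquet_schema = arrow_to_parquet_schema(&arrow_schema)?;
/// let mut writers = get_column_writers(&parquet_schema, &props, &arrow_schema)?;
///
/// // Encode each leaf column of the batch
/// let mut iter = writers.iter_mut();
/// for (field, column) in arrow_schema.fields().iter().zip(batch.columns()) {
///     for leaf in compute_leaves(field.as_ref(), column)? {
///         iter.next().unwrap().write(&leaf)?;
///     }
/// }
///
/// // Append the encoded chunks to a row group
/// let mut row_group = file_writer.next_row_group()?;
/// for writer in writers {
///     writer.close()?.append_to_row_group(&mut row_group)?;
/// }
/// row_group.close()?;
/// ```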
pub fn get_column_writers(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    for field in &arrow.fields {
        get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?;
    }
    Ok(writers)
}
fn get_arrow_column_writer(
    data_type: &ArrowDataType,
    props: &WriterPropertiesPtr,
    leaves: &mut Iter<'_, ColumnDescPtr>,
    out: &mut Vec<ArrowColumnWriter>,
) -> Result<()> {
    let col = |desc: &ColumnDescPtr| {
        let page_writer = Box::<ArrowPageWriter>::default();
        let chunk = page_writer.buffer.clone();
        let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
        ArrowColumnWriter {
            chunk,
            writer: ArrowColumnWriterImpl::Column(writer),
        }
    };

    let bytes = |desc: &ColumnDescPtr| {
        let page_writer = Box::<ArrowPageWriter>::default();
        let chunk = page_writer.buffer.clone();
        let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
        ArrowColumnWriter {
            chunk,
            writer: ArrowColumnWriterImpl::ByteArray(writer),
        }
    };

    match data_type {
        _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())),
        ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
            out.push(col(leaves.next().unwrap()))
        }
        ArrowDataType::LargeBinary
        | ArrowDataType::Binary
        | ArrowDataType::Utf8
        | ArrowDataType::LargeUtf8
        | ArrowDataType::BinaryView
        | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())),
        ArrowDataType::List(f)
        | ArrowDataType::LargeList(f)
        | ArrowDataType::FixedSizeList(f, _) => {
            get_arrow_column_writer(f.data_type(), props, leaves, out)?
        }
        ArrowDataType::Struct(fields) => {
            for field in fields {
                get_arrow_column_writer(field.data_type(), props, leaves, out)?
            }
        }
        ArrowDataType::Map(f, _) => match f.data_type() {
            ArrowDataType::Struct(f) => {
                get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
            }
            _ => unreachable!("invalid map type"),
        },
        ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
            ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::Binary
            | ArrowDataType::LargeBinary
            | ArrowDataType::Utf8View
            | ArrowDataType::BinaryView => out.push(bytes(leaves.next().unwrap())),
            _ => out.push(col(leaves.next().unwrap())),
        },
        _ => {
            return Err(ParquetError::NYI(format!(
                "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
            )))
        }
    }
    Ok(())
}
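/// Writes the levels and values of a leaf column via a [`ColumnWriter`], casting the
/// Arrow data to the matching Parquet physical representation where necessary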
fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
    let column = levels.array().as_ref();
    let indices = levels.non_null_indices();
    match writer {
        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;

                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    let values = column.as_primitive::<UInt32Type>().values();
                    let array = values.inner().typed_data::<i32>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::BoolColumnWriter(ref mut typed) => {
            let array = column.as_boolean();
            typed.write_batch(
                get_bool_array_slice(array, indices).as_slice(),
                levels.def_levels(),
                levels.rep_levels(),
            )
        }
        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    let values = column.as_primitive::<UInt64Type>().values();
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::FloatColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float32Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float64Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
            let bytes = match column.data_type() {
                ArrowDataType::Interval(interval_unit) => match interval_unit {
                    IntervalUnit::YearMonth => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
                            .unwrap();
                        get_interval_ym_array_slice(array, indices)
                    }
                    IntervalUnit::DayTime => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
                            .unwrap();
                        get_interval_dt_array_slice(array, indices)
                    }
                    _ => {
                        return Err(ParquetError::NYI(format!(
                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                        )));
                    }
                },
                ArrowDataType::FixedSizeBinary(_) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                        .unwrap();
                    get_fsb_array_slice(array, indices)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column.as_primitive::<Decimal128Type>();
                    get_decimal_128_array_slice(array, indices)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::Decimal256Array>()
                        .unwrap();
                    get_decimal_256_array_slice(array, indices)
                }
                ArrowDataType::Float16 => {
                    let array = column.as_primitive::<Float16Type>();
                    get_float_16_array_slice(array, indices)
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
        }
    }
}
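/// Writes the non-null values in `values` together with the definition and
/// repetition levels from `levels`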
fn write_primitive<E: ColumnValueEncoder>(
    writer: &mut GenericColumnWriter<E>,
    values: &E::Values,
    levels: &ArrayLevels,
) -> Result<usize> {
    writer.write_batch_internal(
        values,
        Some(levels.non_null_indices()),
        levels.def_levels(),
        levels.rep_levels(),
        None,
        None,
        None,
    )
}

fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        values.push(array.value(*i))
    }
    values
}
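/// Returns the non-null YearMonth interval values encoded as the 12-byte Parquet
/// INTERVAL layout: 4 little-endian bytes of months followed by 8 zero bytes for
/// the days and milliseconds components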
fn get_interval_ym_array_slice(
    array: &arrow_array::IntervalYearMonthArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut value = array.value(*i).to_le_bytes().to_vec();
        let mut suffix = vec![0; 8];
        value.append(&mut suffix);
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

fn get_interval_dt_array_slice(
    array: &arrow_array::IntervalDayTimeArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut out = [0; 12];
        let value = array.value(*i);
        out[4..8].copy_from_slice(&value.days.to_le_bytes());
        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
    }
    values
}

fn get_decimal_128_array_slice(
    array: &arrow_array::Decimal128Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(16 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_256_array_slice(
    array: &arrow_array::Decimal256Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(32 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_float_16_array_slice(
    array: &arrow_array::Float16Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_le_bytes().to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)));
    }
    values
}

fn get_fsb_array_slice(
    array: &arrow_array::FixedSizeBinaryArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}
1074#[cfg(test)]
1075mod tests {
1076 use super::*;
1077
1078 use std::fs::File;
1079
1080 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1081 use crate::arrow::ARROW_SCHEMA_META_KEY;
1082 use arrow::datatypes::ToByteSlice;
1083 use arrow::datatypes::{DataType, Schema};
1084 use arrow::error::Result as ArrowResult;
1085 use arrow::util::pretty::pretty_format_batches;
1086 use arrow::{array::*, buffer::Buffer};
1087 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1088 use arrow_schema::Fields;
1089
1090 use crate::basic::Encoding;
1091 use crate::data_type::AsBytes;
1092 use crate::file::metadata::ParquetMetaData;
1093 use crate::file::page_index::index::Index;
1094 use crate::file::page_index::index_reader::read_offset_indexes;
1095 use crate::file::properties::{
1096 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1097 };
1098 use crate::file::serialized_reader::ReadOptionsBuilder;
1099 use crate::file::{
1100 reader::{FileReader, SerializedFileReader},
1101 statistics::Statistics,
1102 };
1103
1104 #[test]
1105 fn arrow_writer() {
1106 let schema = Schema::new(vec![
1108 Field::new("a", DataType::Int32, false),
1109 Field::new("b", DataType::Int32, true),
1110 ]);
1111
1112 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1114 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1115
1116 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1118
1119 roundtrip(batch, Some(SMALL_SIZE / 2));
1120 }
1121
1122 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1123 let mut buffer = vec![];
1124
1125 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1126 writer.write(expected_batch).unwrap();
1127 writer.close().unwrap();
1128
1129 buffer
1130 }
1131
1132 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1133 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1134 writer.write(expected_batch).unwrap();
1135 writer.into_inner().unwrap()
1136 }
1137
1138 #[test]
1139 fn roundtrip_bytes() {
1140 let schema = Arc::new(Schema::new(vec![
1142 Field::new("a", DataType::Int32, false),
1143 Field::new("b", DataType::Int32, true),
1144 ]));
1145
1146 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1148 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1149
1150 let expected_batch =
1152 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1153
1154 for buffer in [
1155 get_bytes_after_close(schema.clone(), &expected_batch),
1156 get_bytes_by_into_inner(schema, &expected_batch),
1157 ] {
1158 let cursor = Bytes::from(buffer);
1159 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1160
1161 let actual_batch = record_batch_reader
1162 .next()
1163 .expect("No batch found")
1164 .expect("Unable to get batch");
1165
1166 assert_eq!(expected_batch.schema(), actual_batch.schema());
1167 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1168 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1169 for i in 0..expected_batch.num_columns() {
1170 let expected_data = expected_batch.column(i).to_data();
1171 let actual_data = actual_batch.column(i).to_data();
1172
1173 assert_eq!(expected_data, actual_data);
1174 }
1175 }
1176 }
1177
1178 #[test]
1179 fn arrow_writer_non_null() {
1180 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1182
1183 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1185
1186 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1188
1189 roundtrip(batch, Some(SMALL_SIZE / 2));
1190 }
1191
1192 #[test]
1193 fn arrow_writer_list() {
1194 let schema = Schema::new(vec![Field::new(
1196 "a",
1197 DataType::List(Arc::new(Field::new("item", DataType::Int32, false))),
1198 true,
1199 )]);
1200
1201 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1203
1204 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1207
1208 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
1210 "item",
1211 DataType::Int32,
1212 false,
1213 ))))
1214 .len(5)
1215 .add_buffer(a_value_offsets)
1216 .add_child_data(a_values.into_data())
1217 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1218 .build()
1219 .unwrap();
1220 let a = ListArray::from(a_list_data);
1221
1222 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1224
1225 assert_eq!(batch.column(0).null_count(), 1);
1226
1227 roundtrip(batch, None);
1230 }
1231
1232 #[test]
1233 fn arrow_writer_list_non_null() {
1234 let schema = Schema::new(vec![Field::new(
1236 "a",
1237 DataType::List(Arc::new(Field::new("item", DataType::Int32, false))),
1238 false,
1239 )]);
1240
1241 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1243
1244 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1247
1248 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
1250 "item",
1251 DataType::Int32,
1252 false,
1253 ))))
1254 .len(5)
1255 .add_buffer(a_value_offsets)
1256 .add_child_data(a_values.into_data())
1257 .build()
1258 .unwrap();
1259 let a = ListArray::from(a_list_data);
1260
1261 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1263
1264 assert_eq!(batch.column(0).null_count(), 0);
1267
1268 roundtrip(batch, None);
1269 }
1270
1271 #[test]
1272 fn arrow_writer_binary() {
1273 let string_field = Field::new("a", DataType::Utf8, false);
1274 let binary_field = Field::new("b", DataType::Binary, false);
1275 let schema = Schema::new(vec![string_field, binary_field]);
1276
1277 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1278 let raw_binary_values = [
1279 b"foo".to_vec(),
1280 b"bar".to_vec(),
1281 b"baz".to_vec(),
1282 b"quux".to_vec(),
1283 ];
1284 let raw_binary_value_refs = raw_binary_values
1285 .iter()
1286 .map(|x| x.as_slice())
1287 .collect::<Vec<_>>();
1288
1289 let string_values = StringArray::from(raw_string_values.clone());
1290 let binary_values = BinaryArray::from(raw_binary_value_refs);
1291 let batch = RecordBatch::try_new(
1292 Arc::new(schema),
1293 vec![Arc::new(string_values), Arc::new(binary_values)],
1294 )
1295 .unwrap();
1296
1297 roundtrip(batch, Some(SMALL_SIZE / 2));
1298 }
1299
1300 #[test]
1301 fn arrow_writer_binary_view() {
1302 let string_field = Field::new("a", DataType::Utf8View, false);
1303 let binary_field = Field::new("b", DataType::BinaryView, false);
1304 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1305 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1306
1307 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1308 let raw_binary_values = vec![
1309 b"foo".to_vec(),
1310 b"bar".to_vec(),
1311 b"large payload over 12 bytes".to_vec(),
1312 b"lulu".to_vec(),
1313 ];
1314 let nullable_string_values =
1315 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1316
1317 let string_view_values = StringViewArray::from(raw_string_values);
1318 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1319 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1320 let batch = RecordBatch::try_new(
1321 Arc::new(schema),
1322 vec![
1323 Arc::new(string_view_values),
1324 Arc::new(binary_view_values),
1325 Arc::new(nullable_string_view_values),
1326 ],
1327 )
1328 .unwrap();
1329
1330 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1331 roundtrip(batch, None);
1332 }
1333
1334 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1335 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1336 let schema = Schema::new(vec![decimal_field]);
1337
1338 let decimal_values = vec![10_000, 50_000, 0, -100]
1339 .into_iter()
1340 .map(Some)
1341 .collect::<Decimal128Array>()
1342 .with_precision_and_scale(precision, scale)
1343 .unwrap();
1344
1345 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1346 }
1347
1348 #[test]
1349 fn arrow_writer_decimal() {
1350 let batch_int32_decimal = get_decimal_batch(5, 2);
1352 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1353 let batch_int64_decimal = get_decimal_batch(12, 2);
1355 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1356 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1358 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1359 }
1360
1361 #[test]
1362 fn arrow_writer_complex() {
1363 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1365 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1366 let struct_field_g = Arc::new(Field::new_list(
1367 "g",
1368 Field::new("item", DataType::Int16, true),
1369 false,
1370 ));
1371 let struct_field_h = Arc::new(Field::new_list(
1372 "h",
1373 Field::new("item", DataType::Int16, false),
1374 true,
1375 ));
1376 let struct_field_e = Arc::new(Field::new_struct(
1377 "e",
1378 vec![
1379 struct_field_f.clone(),
1380 struct_field_g.clone(),
1381 struct_field_h.clone(),
1382 ],
1383 false,
1384 ));
1385 let schema = Schema::new(vec![
1386 Field::new("a", DataType::Int32, false),
1387 Field::new("b", DataType::Int32, true),
1388 Field::new_struct(
1389 "c",
1390 vec![struct_field_d.clone(), struct_field_e.clone()],
1391 false,
1392 ),
1393 ]);
1394
1395 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1397 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1398 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1399 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1400
1401 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1402
1403 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1406
1407 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1409 .len(5)
1410 .add_buffer(g_value_offsets.clone())
1411 .add_child_data(g_value.to_data())
1412 .build()
1413 .unwrap();
1414 let g = ListArray::from(g_list_data);
1415 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1417 .len(5)
1418 .add_buffer(g_value_offsets)
1419 .add_child_data(g_value.to_data())
1420 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1421 .build()
1422 .unwrap();
1423 let h = ListArray::from(h_list_data);
1424
1425 let e = StructArray::from(vec![
1426 (struct_field_f, Arc::new(f) as ArrayRef),
1427 (struct_field_g, Arc::new(g) as ArrayRef),
1428 (struct_field_h, Arc::new(h) as ArrayRef),
1429 ]);
1430
1431 let c = StructArray::from(vec![
1432 (struct_field_d, Arc::new(d) as ArrayRef),
1433 (struct_field_e, Arc::new(e) as ArrayRef),
1434 ]);
1435
1436 let batch = RecordBatch::try_new(
1438 Arc::new(schema),
1439 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1440 )
1441 .unwrap();
1442
1443 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1444 roundtrip(batch, Some(SMALL_SIZE / 3));
1445 }
1446
1447 #[test]
1448 fn arrow_writer_complex_mixed() {
1449 let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1454 let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1455 let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1456 let schema = Schema::new(vec![Field::new(
1457 "some_nested_object",
1458 DataType::Struct(Fields::from(vec![
1459 offset_field.clone(),
1460 partition_field.clone(),
1461 topic_field.clone(),
1462 ])),
1463 false,
1464 )]);
1465
1466 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1468 let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1469 let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1470
1471 let some_nested_object = StructArray::from(vec![
1472 (offset_field, Arc::new(offset) as ArrayRef),
1473 (partition_field, Arc::new(partition) as ArrayRef),
1474 (topic_field, Arc::new(topic) as ArrayRef),
1475 ]);
1476
1477 let batch =
1479 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1480
1481 roundtrip(batch, Some(SMALL_SIZE / 2));
1482 }
1483
1484 #[test]
1485 fn arrow_writer_map() {
1486 let json_content = r#"
1488 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1489 {"stocks":{"long": null, "long": "$CCC", "short": null}}
1490 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1491 "#;
1492 let entries_struct_type = DataType::Struct(Fields::from(vec![
1493 Field::new("key", DataType::Utf8, false),
1494 Field::new("value", DataType::Utf8, true),
1495 ]));
1496 let stocks_field = Field::new(
1497 "stocks",
1498 DataType::Map(
1499 Arc::new(Field::new("entries", entries_struct_type, false)),
1500 false,
1501 ),
1502 true,
1503 );
1504 let schema = Arc::new(Schema::new(vec![stocks_field]));
1505 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1506 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1507
1508 let batch = reader.next().unwrap().unwrap();
1509 roundtrip(batch, None);
1510 }
1511
1512 #[test]
1513 fn arrow_writer_2_level_struct() {
1514 let field_c = Field::new("c", DataType::Int32, true);
1516 let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1517 let type_a = DataType::Struct(vec![field_b.clone()].into());
1518 let field_a = Field::new("a", type_a, true);
1519 let schema = Schema::new(vec![field_a.clone()]);
1520
1521 let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1523 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1524 .len(6)
1525 .null_bit_buffer(Some(Buffer::from([0b00100111])))
1526 .add_child_data(c.into_data())
1527 .build()
1528 .unwrap();
1529 let b = StructArray::from(b_data);
1530 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1531 .len(6)
1532 .null_bit_buffer(Some(Buffer::from([0b00101111])))
1533 .add_child_data(b.into_data())
1534 .build()
1535 .unwrap();
1536 let a = StructArray::from(a_data);
1537
1538 assert_eq!(a.null_count(), 1);
1539 assert_eq!(a.column(0).null_count(), 2);
1540
1541 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1543
1544 roundtrip(batch, Some(SMALL_SIZE / 2));
1545 }
1546
1547 #[test]
1548 fn arrow_writer_2_level_struct_non_null() {
1549 let field_c = Field::new("c", DataType::Int32, false);
1551 let type_b = DataType::Struct(vec![field_c].into());
1552 let field_b = Field::new("b", type_b.clone(), false);
1553 let type_a = DataType::Struct(vec![field_b].into());
1554 let field_a = Field::new("a", type_a.clone(), false);
1555 let schema = Schema::new(vec![field_a]);
1556
1557 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1559 let b_data = ArrayDataBuilder::new(type_b)
1560 .len(6)
1561 .add_child_data(c.into_data())
1562 .build()
1563 .unwrap();
1564 let b = StructArray::from(b_data);
1565 let a_data = ArrayDataBuilder::new(type_a)
1566 .len(6)
1567 .add_child_data(b.into_data())
1568 .build()
1569 .unwrap();
1570 let a = StructArray::from(a_data);
1571
1572 assert_eq!(a.null_count(), 0);
1573 assert_eq!(a.column(0).null_count(), 0);
1574
1575 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1577
1578 roundtrip(batch, Some(SMALL_SIZE / 2));
1579 }
1580
1581 #[test]
1582 fn arrow_writer_2_level_struct_mixed_null() {
1583 let field_c = Field::new("c", DataType::Int32, false);
1585 let type_b = DataType::Struct(vec![field_c].into());
1586 let field_b = Field::new("b", type_b.clone(), true);
1587 let type_a = DataType::Struct(vec![field_b].into());
1588 let field_a = Field::new("a", type_a.clone(), false);
1589 let schema = Schema::new(vec![field_a]);
1590
1591 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1593 let b_data = ArrayDataBuilder::new(type_b)
1594 .len(6)
1595 .null_bit_buffer(Some(Buffer::from([0b00100111])))
1596 .add_child_data(c.into_data())
1597 .build()
1598 .unwrap();
1599 let b = StructArray::from(b_data);
1600 let a_data = ArrayDataBuilder::new(type_a)
1602 .len(6)
1603 .add_child_data(b.into_data())
1604 .build()
1605 .unwrap();
1606 let a = StructArray::from(a_data);
1607
1608 assert_eq!(a.null_count(), 0);
1609 assert_eq!(a.column(0).null_count(), 2);
1610
1611 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1613
1614 roundtrip(batch, Some(SMALL_SIZE / 2));
1615 }
1616
1617 #[test]
1618 fn arrow_writer_2_level_struct_mixed_null_2() {
1619 let field_c = Field::new("c", DataType::Int32, false);
1621 let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
1622 let field_e = Field::new(
1623 "e",
1624 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1625 false,
1626 );
1627
1628 let field_b = Field::new(
1629 "b",
1630 DataType::Struct(vec![field_c, field_d, field_e].into()),
1631 false,
1632 );
1633 let type_a = DataType::Struct(vec![field_b.clone()].into());
1634 let field_a = Field::new("a", type_a, true);
1635 let schema = Schema::new(vec![field_a.clone()]);
1636
1637 let c = Int32Array::from_iter_values(0..6);
1639 let d = FixedSizeBinaryArray::try_from_iter(
1640 ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
1641 )
1642 .expect("four byte values");
1643 let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
1644 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1645 .len(6)
1646 .add_child_data(c.into_data())
1647 .add_child_data(d.into_data())
1648 .add_child_data(e.into_data())
1649 .build()
1650 .unwrap();
1651 let b = StructArray::from(b_data);
1652 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1653 .len(6)
1654 .null_bit_buffer(Some(Buffer::from([0b00100101])))
1655 .add_child_data(b.into_data())
1656 .build()
1657 .unwrap();
1658 let a = StructArray::from(a_data);
1659
1660 assert_eq!(a.null_count(), 3);
1661 assert_eq!(a.column(0).null_count(), 0);
1662
1663 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1665
1666 roundtrip(batch, Some(SMALL_SIZE / 2));
1667 }
1668
1669 #[test]
1670 fn test_empty_dict() {
1671 let struct_fields = Fields::from(vec![Field::new(
1672 "dict",
1673 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1674 false,
1675 )]);
1676
1677 let schema = Schema::new(vec![Field::new_struct(
1678 "struct",
1679 struct_fields.clone(),
1680 true,
1681 )]);
1682 let dictionary = Arc::new(DictionaryArray::new(
1683 Int32Array::new_null(5),
1684 Arc::new(StringArray::new_null(0)),
1685 ));
1686
1687 let s = StructArray::new(
1688 struct_fields,
1689 vec![dictionary],
1690 Some(NullBuffer::new_null(5)),
1691 );
1692
1693 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
1694 roundtrip(batch, None);
1695 }
1696 #[test]
1697 fn arrow_writer_page_size() {
1698 let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
1699
1700 let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
1701
1702 for i in 0..10 {
1704 let value = i
1705 .to_string()
1706 .repeat(10)
1707 .chars()
1708 .take(10)
1709 .collect::<String>();
1710
1711 builder.append_value(value);
1712 }
1713
1714 let array = Arc::new(builder.finish());
1715
1716 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1717
1718 let file = tempfile::tempfile().unwrap();
1719
1720 let props = WriterProperties::builder()
1722 .set_data_page_size_limit(1)
1723 .set_dictionary_page_size_limit(1)
1724 .set_write_batch_size(1)
1725 .build();
1726
1727 let mut writer =
1728 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
1729 .expect("Unable to write file");
1730 writer.write(&batch).unwrap();
1731 writer.close().unwrap();
1732
1733 let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
1734
1735 let column = reader.metadata().row_group(0).columns();
1736
1737 assert_eq!(column.len(), 1);
1738
1739 assert!(
1742 column[0].dictionary_page_offset().is_some(),
1743 "Expected a dictionary page"
1744 );
1745
1746 let offset_indexes = read_offset_indexes(&file, column).unwrap();
1747
1748 let page_locations = offset_indexes[0].page_locations.clone();
1749
        assert_eq!(
            page_locations.len(),
            10,
            "Expected 10 pages but got {page_locations:#?}"
        );
1757 }
1758
1759 const SMALL_SIZE: usize = 7;
1760 const MEDIUM_SIZE: usize = 63;
1761
1762 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
1763 let mut files = vec![];
1764 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
1765 let mut props = WriterProperties::builder().set_writer_version(version);
1766
1767 if let Some(size) = max_row_group_size {
1768 props = props.set_max_row_group_size(size)
1769 }
1770
1771 let props = props.build();
1772 files.push(roundtrip_opts(&expected_batch, props))
1773 }
1774 files
1775 }
1776
1777 fn roundtrip_opts_with_array_validation<F>(
1778 expected_batch: &RecordBatch,
1779 props: WriterProperties,
1780 validate: F,
1781 ) -> File
1782 where
1783 F: Fn(&ArrayData, &ArrayData),
1784 {
1785 let file = tempfile::tempfile().unwrap();
1786
1787 let mut writer = ArrowWriter::try_new(
1788 file.try_clone().unwrap(),
1789 expected_batch.schema(),
1790 Some(props),
1791 )
1792 .expect("Unable to write file");
1793 writer.write(expected_batch).unwrap();
1794 writer.close().unwrap();
1795
1796 let mut record_batch_reader =
1797 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
1798
1799 let actual_batch = record_batch_reader
1800 .next()
1801 .expect("No batch found")
1802 .expect("Unable to get batch");
1803
1804 assert_eq!(expected_batch.schema(), actual_batch.schema());
1805 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1806 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1807 for i in 0..expected_batch.num_columns() {
1808 let expected_data = expected_batch.column(i).to_data();
1809 let actual_data = actual_batch.column(i).to_data();
1810 validate(&expected_data, &actual_data);
1811 }
1812
1813 file
1814 }
1815
1816 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
1817 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
1818 a.validate_full().expect("valid expected data");
1819 b.validate_full().expect("valid actual data");
1820 assert_eq!(a, b)
1821 })
1822 }
1823
1824 struct RoundTripOptions {
1825 values: ArrayRef,
1826 schema: SchemaRef,
1827 bloom_filter: bool,
1828 bloom_filter_position: BloomFilterPosition,
1829 }
1830
1831 impl RoundTripOptions {
1832 fn new(values: ArrayRef, nullable: bool) -> Self {
1833 let data_type = values.data_type().clone();
1834 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
1835 Self {
1836 values,
1837 schema: Arc::new(schema),
1838 bloom_filter: false,
1839 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
1840 }
1841 }
1842 }
1843
1844 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
1845 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
1846 }
1847
1848 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
1849 let mut options = RoundTripOptions::new(values, false);
1850 options.schema = schema;
1851 one_column_roundtrip_with_options(options)
1852 }
1853
1854 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
1855 let RoundTripOptions {
1856 values,
1857 schema,
1858 bloom_filter,
1859 bloom_filter_position,
1860 } = options;
1861
1862 let encodings = match values.data_type() {
1863 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
1864 vec![
1865 Encoding::PLAIN,
1866 Encoding::DELTA_BYTE_ARRAY,
1867 Encoding::DELTA_LENGTH_BYTE_ARRAY,
1868 ]
1869 }
1870 DataType::Int64
1871 | DataType::Int32
1872 | DataType::Int16
1873 | DataType::Int8
1874 | DataType::UInt64
1875 | DataType::UInt32
1876 | DataType::UInt16
1877 | DataType::UInt8 => vec![
1878 Encoding::PLAIN,
1879 Encoding::DELTA_BINARY_PACKED,
1880 Encoding::BYTE_STREAM_SPLIT,
1881 ],
1882 DataType::Float32 | DataType::Float64 => {
1883 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
1884 }
1885 _ => vec![Encoding::PLAIN],
1886 };
1887
1888 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
1889
1890 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
1891
1892 let mut files = vec![];
1893 for dictionary_size in [0, 1, 1024] {
1894 for encoding in &encodings {
1895 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
1896 for row_group_size in row_group_sizes {
1897 let props = WriterProperties::builder()
1898 .set_writer_version(version)
1899 .set_max_row_group_size(row_group_size)
1900 .set_dictionary_enabled(dictionary_size != 0)
1901 .set_dictionary_page_size_limit(dictionary_size.max(1))
1902 .set_encoding(*encoding)
1903 .set_bloom_filter_enabled(bloom_filter)
1904 .set_bloom_filter_position(bloom_filter_position)
1905 .build();
1906
1907 files.push(roundtrip_opts(&expected_batch, props))
1908 }
1909 }
1910 }
1911 }
1912 files
1913 }
1914
1915 fn values_required<A, I>(iter: I) -> Vec<File>
1916 where
1917 A: From<Vec<I::Item>> + Array + 'static,
1918 I: IntoIterator,
1919 {
1920 let raw_values: Vec<_> = iter.into_iter().collect();
1921 let values = Arc::new(A::from(raw_values));
1922 one_column_roundtrip(values, false)
1923 }
1924
1925 fn values_optional<A, I>(iter: I) -> Vec<File>
1926 where
1927 A: From<Vec<Option<I::Item>>> + Array + 'static,
1928 I: IntoIterator,
1929 {
1930 let optional_raw_values: Vec<_> = iter
1931 .into_iter()
1932 .enumerate()
1933 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
1934 .collect();
1935 let optional_values = Arc::new(A::from(optional_raw_values));
1936 one_column_roundtrip(optional_values, true)
1937 }
1938
1939 fn required_and_optional<A, I>(iter: I)
1940 where
1941 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
1942 I: IntoIterator + Clone,
1943 {
1944 values_required::<A, I>(iter.clone());
1945 values_optional::<A, I>(iter);
1946 }
1947
1948 fn check_bloom_filter<T: AsBytes>(
1949 files: Vec<File>,
1950 file_column: String,
1951 positive_values: Vec<T>,
1952 negative_values: Vec<T>,
1953 ) {
1954 files.into_iter().take(1).for_each(|file| {
1955 let file_reader = SerializedFileReader::new_with_options(
1956 file,
1957 ReadOptionsBuilder::new()
1958 .with_reader_properties(
1959 ReaderProperties::builder()
1960 .set_read_bloom_filter(true)
1961 .build(),
1962 )
1963 .build(),
1964 )
1965 .expect("Unable to open file as Parquet");
1966 let metadata = file_reader.metadata();
1967
1968 let mut bloom_filters: Vec<_> = vec![];
1970 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
1971 if let Some((column_index, _)) = row_group
1972 .columns()
1973 .iter()
1974 .enumerate()
1975 .find(|(_, column)| column.column_path().string() == file_column)
1976 {
1977 let row_group_reader = file_reader
1978 .get_row_group(ri)
1979 .expect("Unable to read row group");
1980 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
1981 bloom_filters.push(sbbf.clone());
1982 } else {
1983 panic!("No bloom filter for column named {file_column} found");
1984 }
1985 } else {
1986 panic!("No column named {file_column} found");
1987 }
1988 }
1989
1990 positive_values.iter().for_each(|value| {
1991 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
1992 assert!(
1993 found.is_some(),
1994 "{}",
1995 format!("Value {:?} should be in bloom filter", value.as_bytes())
1996 );
1997 });
1998
1999 negative_values.iter().for_each(|value| {
2000 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2001 assert!(
2002 found.is_none(),
2003 "{}",
2004 format!("Value {:?} should not be in bloom filter", value.as_bytes())
2005 );
2006 });
2007 });
2008 }
2009
2010 #[test]
2011 fn all_null_primitive_single_column() {
2012 let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2013 one_column_roundtrip(values, true);
2014 }
2015 #[test]
2016 fn null_single_column() {
2017 let values = Arc::new(NullArray::new(SMALL_SIZE));
2018 one_column_roundtrip(values, true);
2019 }
2021
2022 #[test]
2023 fn bool_single_column() {
2024 required_and_optional::<BooleanArray, _>(
2025 [true, false].iter().cycle().copied().take(SMALL_SIZE),
2026 );
2027 }
2028
2029 #[test]
2030 fn bool_large_single_column() {
2031 let values = Arc::new(
2032 [None, Some(true), Some(false)]
2033 .iter()
2034 .cycle()
2035 .copied()
2036 .take(200_000)
2037 .collect::<BooleanArray>(),
2038 );
2039 let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2040 let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2041 let file = tempfile::tempfile().unwrap();
2042
2043 let mut writer =
2044 ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2045 .expect("Unable to write file");
2046 writer.write(&expected_batch).unwrap();
2047 writer.close().unwrap();
2048 }
2049
2050 #[test]
2051 fn check_page_offset_index_with_nan() {
2052 let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2053 let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2054 let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2055
2056 let mut out = Vec::with_capacity(1024);
2057 let mut writer =
2058 ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2059 writer.write(&batch).unwrap();
2060 let file_meta_data = writer.close().unwrap();
2061 for row_group in file_meta_data.row_groups {
2062 for column in row_group.columns {
2063 assert!(column.offset_index_offset.is_some());
2064 assert!(column.offset_index_length.is_some());
2065 assert!(column.column_index_offset.is_none());
2066 assert!(column.column_index_length.is_none());
2067 }
2068 }
2069 }
2070
2071 #[test]
2072 fn i8_single_column() {
2073 required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2074 }
2075
2076 #[test]
2077 fn i16_single_column() {
2078 required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2079 }
2080
2081 #[test]
2082 fn i32_single_column() {
2083 required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2084 }
2085
2086 #[test]
2087 fn i64_single_column() {
2088 required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2089 }
2090
2091 #[test]
2092 fn u8_single_column() {
2093 required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2094 }
2095
2096 #[test]
2097 fn u16_single_column() {
2098 required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2099 }
2100
2101 #[test]
2102 fn u32_single_column() {
2103 required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2104 }
2105
2106 #[test]
2107 fn u64_single_column() {
2108 required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2109 }
2110
2111 #[test]
2112 fn f32_single_column() {
2113 required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2114 }
2115
2116 #[test]
2117 fn f64_single_column() {
2118 required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2119 }
2120
2121 #[test]
2126 fn timestamp_second_single_column() {
2127 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2128 let values = Arc::new(TimestampSecondArray::from(raw_values));
2129
2130 one_column_roundtrip(values, false);
2131 }
2132
2133 #[test]
2134 fn timestamp_millisecond_single_column() {
2135 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2136 let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2137
2138 one_column_roundtrip(values, false);
2139 }
2140
2141 #[test]
2142 fn timestamp_microsecond_single_column() {
2143 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2144 let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2145
2146 one_column_roundtrip(values, false);
2147 }
2148
2149 #[test]
2150 fn timestamp_nanosecond_single_column() {
2151 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2152 let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2153
2154 one_column_roundtrip(values, false);
2155 }
2156
2157 #[test]
2158 fn date32_single_column() {
2159 required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2160 }
2161
2162 #[test]
2163 fn date64_single_column() {
2164 required_and_optional::<Date64Array, _>(
2166 (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2167 );
2168 }
2169
2170 #[test]
2171 fn time32_second_single_column() {
2172 required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2173 }
2174
2175 #[test]
2176 fn time32_millisecond_single_column() {
2177 required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2178 }
2179
2180 #[test]
2181 fn time64_microsecond_single_column() {
2182 required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2183 }
2184
2185 #[test]
2186 fn time64_nanosecond_single_column() {
2187 required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2188 }
2189
2190 #[test]
2191 #[should_panic(expected = "Converting Duration to parquet not supported")]
2192 fn duration_second_single_column() {
2193 required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2194 }
2195
2196 #[test]
2197 #[should_panic(expected = "Converting Duration to parquet not supported")]
2198 fn duration_millisecond_single_column() {
2199 required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2200 }
2201
2202 #[test]
2203 #[should_panic(expected = "Converting Duration to parquet not supported")]
2204 fn duration_microsecond_single_column() {
2205 required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2206 }
2207
2208 #[test]
2209 #[should_panic(expected = "Converting Duration to parquet not supported")]
2210 fn duration_nanosecond_single_column() {
2211 required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2212 }
2213
2214 #[test]
2215 fn interval_year_month_single_column() {
2216 required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2217 }
2218
2219 #[test]
2220 fn interval_day_time_single_column() {
2221 required_and_optional::<IntervalDayTimeArray, _>(vec![
2222 IntervalDayTime::new(0, 1),
2223 IntervalDayTime::new(0, 3),
2224 IntervalDayTime::new(3, -2),
2225 IntervalDayTime::new(-200, 4),
2226 ]);
2227 }
2228
2229 #[test]
2230 #[should_panic(
2231 expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2232 )]
2233 fn interval_month_day_nano_single_column() {
2234 required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2235 IntervalMonthDayNano::new(0, 1, 5),
2236 IntervalMonthDayNano::new(0, 3, 2),
2237 IntervalMonthDayNano::new(3, -2, -5),
2238 IntervalMonthDayNano::new(-200, 4, -1),
2239 ]);
2240 }
2241
2242 #[test]
2243 fn binary_single_column() {
2244 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2245 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2246 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2247
2248 values_required::<BinaryArray, _>(many_vecs_iter);
2250 }
2251
2252 #[test]
2253 fn binary_view_single_column() {
2254 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2255 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2256 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2257
2258 values_required::<BinaryViewArray, _>(many_vecs_iter);
2260 }
2261
2262 #[test]
2263 fn i32_column_bloom_filter_at_end() {
2264 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2265 let mut options = RoundTripOptions::new(array, false);
2266 options.bloom_filter = true;
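        // Request that the bloom filter be written at the end of the file rather than after the row group data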
2267 options.bloom_filter_position = BloomFilterPosition::End;
2268
2269 let files = one_column_roundtrip_with_options(options);
2270 check_bloom_filter(
2271 files,
2272 "col".to_string(),
2273 (0..SMALL_SIZE as i32).collect(),
2274 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2275 );
2276 }
2277
2278 #[test]
2279 fn i32_column_bloom_filter() {
2280 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2281 let mut options = RoundTripOptions::new(array, false);
2282 options.bloom_filter = true;
2283
2284 let files = one_column_roundtrip_with_options(options);
2285 check_bloom_filter(
2286 files,
2287 "col".to_string(),
2288 (0..SMALL_SIZE as i32).collect(),
2289 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2290 );
2291 }
2292
2293 #[test]
2294 fn binary_column_bloom_filter() {
2295 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2296 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2297 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2298
2299 let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2300 let mut options = RoundTripOptions::new(array, false);
2301 options.bloom_filter = true;
2302
2303 let files = one_column_roundtrip_with_options(options);
2304 check_bloom_filter(
2305 files,
2306 "col".to_string(),
2307 many_vecs,
2308 vec![vec![(SMALL_SIZE + 1) as u8]],
2309 );
2310 }
2311
2312 #[test]
2313 fn empty_string_null_column_bloom_filter() {
2314 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2315 let raw_strs = raw_values.iter().map(|s| s.as_str());
2316
2317 let array = Arc::new(StringArray::from_iter_values(raw_strs));
2318 let mut options = RoundTripOptions::new(array, false);
2319 options.bloom_filter = true;
2320
2321 let files = one_column_roundtrip_with_options(options);
2322
2323 let optional_raw_values: Vec<_> = raw_values
2324 .iter()
2325 .enumerate()
2326 .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2327 .collect();
2328 check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2330 }
2331
2332 #[test]
2333 fn large_binary_single_column() {
2334 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2335 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2336 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2337
2338 values_required::<LargeBinaryArray, _>(many_vecs_iter);
2340 }
2341
2342 #[test]
2343 fn fixed_size_binary_single_column() {
2344 let mut builder = FixedSizeBinaryBuilder::new(4);
2345 builder.append_value(b"0123").unwrap();
2346 builder.append_null();
2347 builder.append_value(b"8910").unwrap();
2348 builder.append_value(b"1112").unwrap();
2349 let array = Arc::new(builder.finish());
2350
2351 one_column_roundtrip(array, true);
2352 }
2353
2354 #[test]
2355 fn string_single_column() {
2356 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2357 let raw_strs = raw_values.iter().map(|s| s.as_str());
2358
2359 required_and_optional::<StringArray, _>(raw_strs);
2360 }
2361
2362 #[test]
2363 fn large_string_single_column() {
2364 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2365 let raw_strs = raw_values.iter().map(|s| s.as_str());
2366
2367 required_and_optional::<LargeStringArray, _>(raw_strs);
2368 }
2369
2370 #[test]
2371 fn string_view_single_column() {
2372 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2373 let raw_strs = raw_values.iter().map(|s| s.as_str());
2374
2375 required_and_optional::<StringViewArray, _>(raw_strs);
2376 }
2377
2378 #[test]
2379 fn null_list_single_column() {
2380 let null_field = Field::new("item", DataType::Null, true);
2381 let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2382
2383 let schema = Schema::new(vec![list_field]);
2384
2385 let a_values = NullArray::new(2);
2387 let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2388 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
2389 "item",
2390 DataType::Null,
2391 true,
2392 ))))
2393 .len(3)
2394 .add_buffer(a_value_offsets)
2395 .null_bit_buffer(Some(Buffer::from([0b00000101])))
2396 .add_child_data(a_values.into_data())
2397 .build()
2398 .unwrap();
2399
2400 let a = ListArray::from(a_list_data);
2401
2402 assert!(a.is_valid(0));
2403 assert!(!a.is_valid(1));
2404 assert!(a.is_valid(2));
2405
2406 assert_eq!(a.value(0).len(), 0);
2407 assert_eq!(a.value(2).len(), 2);
2408 assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2409
2410 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2411 roundtrip(batch, None);
2412 }
2413
2414 #[test]
2415 fn list_single_column() {
2416 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2417 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2418 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
2419 "item",
2420 DataType::Int32,
2421 false,
2422 ))))
2423 .len(5)
2424 .add_buffer(a_value_offsets)
2425 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2426 .add_child_data(a_values.into_data())
2427 .build()
2428 .unwrap();
2429
2430 assert_eq!(a_list_data.null_count(), 1);
2431
2432 let a = ListArray::from(a_list_data);
2433 let values = Arc::new(a);
2434
2435 one_column_roundtrip(values, true);
2436 }
2437
2438 #[test]
2439 fn large_list_single_column() {
2440 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2441 let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2442 let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2443 "large_item",
2444 DataType::Int32,
2445 true,
2446 ))))
2447 .len(5)
2448 .add_buffer(a_value_offsets)
2449 .add_child_data(a_values.into_data())
2450 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2451 .build()
2452 .unwrap();
2453
2454 assert_eq!(a_list_data.null_count(), 1);
2456
2457 let a = LargeListArray::from(a_list_data);
2458 let values = Arc::new(a);
2459
2460 one_column_roundtrip(values, true);
2461 }
2462
2463 #[test]
2464 fn list_nested_nulls() {
2465 use arrow::datatypes::Int32Type;
2466 let data = vec![
2467 Some(vec![Some(1)]),
2468 Some(vec![Some(2), Some(3)]),
2469 None,
2470 Some(vec![Some(4), Some(5), None]),
2471 Some(vec![None]),
2472 Some(vec![Some(6), Some(7)]),
2473 ];
2474
2475 let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2476 one_column_roundtrip(Arc::new(list), true);
2477
2478 let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2479 one_column_roundtrip(Arc::new(list), true);
2480 }
2481
2482 #[test]
2483 fn struct_single_column() {
2484 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2485 let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2486 let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2487
2488 let values = Arc::new(s);
2489 one_column_roundtrip(values, false);
2490 }
2491
2492 #[test]
2493 fn fallback_flush_data_page() {
2494 let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
2496 let values = Arc::new(StringArray::from(raw_values));
2497 let encodings = vec![
2498 Encoding::DELTA_BYTE_ARRAY,
2499 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2500 ];
2501 let data_type = values.data_type().clone();
2502 let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
2503 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2504
2505 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2506 let data_page_size_limit: usize = 32;
2507 let write_batch_size: usize = 16;
2508
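        // A tiny data page size and write batch size force each column chunk to be split across many
        // pages, exercising the DELTA byte array encoders across page boundaries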
2509 for encoding in &encodings {
2510 for row_group_size in row_group_sizes {
2511 let props = WriterProperties::builder()
2512 .set_writer_version(WriterVersion::PARQUET_2_0)
2513 .set_max_row_group_size(row_group_size)
2514 .set_dictionary_enabled(false)
2515 .set_encoding(*encoding)
2516 .set_data_page_size_limit(data_page_size_limit)
2517 .set_write_batch_size(write_batch_size)
2518 .build();
2519
2520 roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
2521 let string_array_a = StringArray::from(a.clone());
2522 let string_array_b = StringArray::from(b.clone());
2523 let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
2524 let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
2525 assert_eq!(
2526 vec_a, vec_b,
2527 "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
2528 );
2529 });
2530 }
2531 }
2532 }
2533
2534 #[test]
2535 fn arrow_writer_string_dictionary() {
2536 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2538 "dictionary",
2539 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2540 true,
2541 42,
2542 true,
2543 )]));
2544
2545 let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2547 .iter()
2548 .copied()
2549 .collect();
2550
2551 one_column_roundtrip_with_schema(Arc::new(d), schema);
2553 }
2554
2555 #[test]
2556 fn arrow_writer_primitive_dictionary() {
2557 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2559 "dictionary",
2560 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
2561 true,
2562 42,
2563 true,
2564 )]));
2565
2566 let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
2568 builder.append(12345678).unwrap();
2569 builder.append_null();
2570 builder.append(22345678).unwrap();
2571 builder.append(12345678).unwrap();
2572 let d = builder.finish();
2573
2574 one_column_roundtrip_with_schema(Arc::new(d), schema);
2575 }
2576
2577 #[test]
2578 fn arrow_writer_string_dictionary_unsigned_index() {
2579 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2581 "dictionary",
2582 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
2583 true,
2584 42,
2585 true,
2586 )]));
2587
2588 let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2590 .iter()
2591 .copied()
2592 .collect();
2593
2594 one_column_roundtrip_with_schema(Arc::new(d), schema);
2595 }
2596
2597 #[test]
2598 fn u32_min_max() {
2599 let src = [
2601 u32::MIN,
2602 u32::MIN + 1,
2603 (i32::MAX as u32) - 1,
2604 i32::MAX as u32,
2605 (i32::MAX as u32) + 1,
2606 u32::MAX - 1,
2607 u32::MAX,
2608 ];
2609 let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
2610 let files = one_column_roundtrip(values, false);
2611
2612 for file in files {
2613 let reader = SerializedFileReader::new(file).unwrap();
2615 let metadata = reader.metadata();
2616
2617 let mut row_offset = 0;
2618 for row_group in metadata.row_groups() {
2619 assert_eq!(row_group.num_columns(), 1);
2620 let column = row_group.column(0);
2621
2622 let num_values = column.num_values() as usize;
2623 let src_slice = &src[row_offset..row_offset + num_values];
2624 row_offset += column.num_values() as usize;
2625
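                // UInt32 columns are written as physical INT32 values, so the chunk statistics come
                // back as Statistics::Int32 and must be reinterpreted as u32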
2626 let stats = column.statistics().unwrap();
2627 if let Statistics::Int32(stats) = stats {
2628 assert_eq!(
2629 *stats.min_opt().unwrap() as u32,
2630 *src_slice.iter().min().unwrap()
2631 );
2632 assert_eq!(
2633 *stats.max_opt().unwrap() as u32,
2634 *src_slice.iter().max().unwrap()
2635 );
2636 } else {
2637 panic!("Statistics::Int32 missing")
2638 }
2639 }
2640 }
2641 }
2642
2643 #[test]
2644 fn u64_min_max() {
2645 let src = [
2647 u64::MIN,
2648 u64::MIN + 1,
2649 (i64::MAX as u64) - 1,
2650 i64::MAX as u64,
2651 (i64::MAX as u64) + 1,
2652 u64::MAX - 1,
2653 u64::MAX,
2654 ];
2655 let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
2656 let files = one_column_roundtrip(values, false);
2657
2658 for file in files {
2659 let reader = SerializedFileReader::new(file).unwrap();
2661 let metadata = reader.metadata();
2662
2663 let mut row_offset = 0;
2664 for row_group in metadata.row_groups() {
2665 assert_eq!(row_group.num_columns(), 1);
2666 let column = row_group.column(0);
2667
2668 let num_values = column.num_values() as usize;
2669 let src_slice = &src[row_offset..row_offset + num_values];
2670 row_offset += column.num_values() as usize;
2671
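                // Likewise, UInt64 columns are stored as physical INT64, so the statistics are
                // Statistics::Int64 reinterpreted as u64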
2672 let stats = column.statistics().unwrap();
2673 if let Statistics::Int64(stats) = stats {
2674 assert_eq!(
2675 *stats.min_opt().unwrap() as u64,
2676 *src_slice.iter().min().unwrap()
2677 );
2678 assert_eq!(
2679 *stats.max_opt().unwrap() as u64,
2680 *src_slice.iter().max().unwrap()
2681 );
2682 } else {
2683 panic!("Statistics::Int64 missing")
2684 }
2685 }
2686 }
2687 }
2688
2689 #[test]
2690 fn statistics_null_counts_only_nulls() {
2691 let values = Arc::new(UInt64Array::from(vec![None, None]));
2693 let files = one_column_roundtrip(values, true);
2694
2695 for file in files {
2696 let reader = SerializedFileReader::new(file).unwrap();
2698 let metadata = reader.metadata();
2699 assert_eq!(metadata.num_row_groups(), 1);
2700 let row_group = metadata.row_group(0);
2701 assert_eq!(row_group.num_columns(), 1);
2702 let column = row_group.column(0);
2703 let stats = column.statistics().unwrap();
2704 assert_eq!(stats.null_count_opt(), Some(2));
2705 }
2706 }
2707
2708 #[test]
2709 fn test_list_of_struct_roundtrip() {
2710 let int_field = Field::new("a", DataType::Int32, true);
2712 let int_field2 = Field::new("b", DataType::Int32, true);
2713
2714 let int_builder = Int32Builder::with_capacity(10);
2715 let int_builder2 = Int32Builder::with_capacity(10);
2716
2717 let struct_builder = StructBuilder::new(
2718 vec![int_field, int_field2],
2719 vec![Box::new(int_builder), Box::new(int_builder2)],
2720 );
2721 let mut list_builder = ListBuilder::new(struct_builder);
2722
2723 let values = list_builder.values();
2728 values
2729 .field_builder::<Int32Builder>(0)
2730 .unwrap()
2731 .append_value(1);
2732 values
2733 .field_builder::<Int32Builder>(1)
2734 .unwrap()
2735 .append_value(2);
2736 values.append(true);
2737 list_builder.append(true);
2738
2739 list_builder.append(true);
2741
2742 list_builder.append(false);
2744
2745 let values = list_builder.values();
2747 values
2748 .field_builder::<Int32Builder>(0)
2749 .unwrap()
2750 .append_null();
2751 values
2752 .field_builder::<Int32Builder>(1)
2753 .unwrap()
2754 .append_null();
2755 values.append(false);
2756 values
2757 .field_builder::<Int32Builder>(0)
2758 .unwrap()
2759 .append_null();
2760 values
2761 .field_builder::<Int32Builder>(1)
2762 .unwrap()
2763 .append_null();
2764 values.append(false);
2765 list_builder.append(true);
2766
2767 let values = list_builder.values();
2769 values
2770 .field_builder::<Int32Builder>(0)
2771 .unwrap()
2772 .append_null();
2773 values
2774 .field_builder::<Int32Builder>(1)
2775 .unwrap()
2776 .append_value(3);
2777 values.append(true);
2778 list_builder.append(true);
2779
2780 let values = list_builder.values();
2782 values
2783 .field_builder::<Int32Builder>(0)
2784 .unwrap()
2785 .append_value(2);
2786 values
2787 .field_builder::<Int32Builder>(1)
2788 .unwrap()
2789 .append_null();
2790 values.append(true);
2791 list_builder.append(true);
2792
2793 let array = Arc::new(list_builder.finish());
2794
2795 one_column_roundtrip(array, true);
2796 }
2797
2798 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
2799 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
2800 }
2801
2802 #[test]
2803 fn test_aggregates_records() {
2804 let arrays = [
2805 Int32Array::from((0..100).collect::<Vec<_>>()),
2806 Int32Array::from((0..50).collect::<Vec<_>>()),
2807 Int32Array::from((200..500).collect::<Vec<_>>()),
2808 ];
2809
2810 let schema = Arc::new(Schema::new(vec![Field::new(
2811 "int",
2812 ArrowDataType::Int32,
2813 false,
2814 )]));
2815
2816 let file = tempfile::tempfile().unwrap();
2817
2818 let props = WriterProperties::builder()
2819 .set_max_row_group_size(200)
2820 .build();
2821
2822 let mut writer =
2823 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
2824
2825 for array in arrays {
2826 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2827 writer.write(&batch).unwrap();
2828 }
2829
2830 writer.close().unwrap();
2831
2832 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2833 assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
2834
2835 let batches = builder
2836 .with_batch_size(100)
2837 .build()
2838 .unwrap()
2839 .collect::<ArrowResult<Vec<_>>>()
2840 .unwrap();
2841
2842 assert_eq!(batches.len(), 5);
2843 assert!(batches.iter().all(|x| x.num_columns() == 1));
2844
2845 let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
2846
2847 assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
2848
2849 let values: Vec<_> = batches
2850 .iter()
2851 .flat_map(|x| {
2852 x.column(0)
2853 .as_any()
2854 .downcast_ref::<Int32Array>()
2855 .unwrap()
2856 .values()
2857 .iter()
2858 .cloned()
2859 })
2860 .collect();
2861
2862 let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
2863 assert_eq!(&values, &expected_values)
2864 }
2865
2866 #[test]
2867 fn complex_aggregate() {
2868 let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
2870 let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
2871 let struct_a = Arc::new(Field::new(
2872 "struct_a",
2873 DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
2874 true,
2875 ));
2876
2877 let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
2878 let struct_b = Arc::new(Field::new(
2879 "struct_b",
2880 DataType::Struct(vec![list_a.clone()].into()),
2881 false,
2882 ));
2883
2884 let schema = Arc::new(Schema::new(vec![struct_b]));
2885
2886 let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2888 let field_b_array =
2889 Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
2890
2891 let struct_a_array = StructArray::from(vec![
2892 (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
2893 (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
2894 ]);
2895
2896 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
2897 .len(5)
2898 .add_buffer(Buffer::from_iter(vec![
2899 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
2900 ]))
2901 .null_bit_buffer(Some(Buffer::from_iter(vec![
2902 true, false, true, false, true,
2903 ])))
2904 .child_data(vec![struct_a_array.into_data()])
2905 .build()
2906 .unwrap();
2907
2908 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
2909 let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
2910
2911 let batch1 =
2912 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
2913 .unwrap();
2914
2915 let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
2916 let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
2917
2918 let struct_a_array = StructArray::from(vec![
2919 (field_a, Arc::new(field_a_array) as ArrayRef),
2920 (field_b, Arc::new(field_b_array) as ArrayRef),
2921 ]);
2922
2923 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
2924 .len(2)
2925 .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
2926 .child_data(vec![struct_a_array.into_data()])
2927 .build()
2928 .unwrap();
2929
2930 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
2931 let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
2932
2933 let batch2 =
2934 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
2935 .unwrap();
2936
2937 let batches = &[batch1, batch2];
2938
2939 let expected = r#"
2942 +-------------------------------------------------------------------------------------------------------+
2943 | struct_b |
2944 +-------------------------------------------------------------------------------------------------------+
2945 | {list: [{leaf_a: 1, leaf_b: 1}]} |
2946 | {list: } |
2947 | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]} |
2948 | {list: } |
2949 | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]} |
2950 | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
2951 | {list: [{leaf_a: 10, leaf_b: }]} |
2952 +-------------------------------------------------------------------------------------------------------+
2953 "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
2954
2955 let actual = pretty_format_batches(batches).unwrap().to_string();
2956 assert_eq!(actual, expected);
2957
2958 let file = tempfile::tempfile().unwrap();
2960 let props = WriterProperties::builder()
2961 .set_max_row_group_size(6)
2962 .build();
2963
2964 let mut writer =
2965 ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
2966
2967 for batch in batches {
2968 writer.write(batch).unwrap();
2969 }
2970 writer.close().unwrap();
2971
2972 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
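        // 5 rows in the first batch plus 2 in the second, with a max row group size of 6,
        // should yield row groups of 6 and 1 rows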
2977 assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
2978
2979 let batches = builder
2980 .with_batch_size(2)
2981 .build()
2982 .unwrap()
2983 .collect::<ArrowResult<Vec<_>>>()
2984 .unwrap();
2985
2986 assert_eq!(batches.len(), 4);
2987 let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
2988 assert_eq!(&batch_counts, &[2, 2, 2, 1]);
2989
2990 let actual = pretty_format_batches(&batches).unwrap().to_string();
2991 assert_eq!(actual, expected);
2992 }
2993
2994 #[test]
2995 fn test_arrow_writer_metadata() {
2996 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
2997 let file_schema = batch_schema.clone().with_metadata(
2998 vec![("foo".to_string(), "bar".to_string())]
2999 .into_iter()
3000 .collect(),
3001 );
3002
3003 let batch = RecordBatch::try_new(
3004 Arc::new(batch_schema),
3005 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3006 )
3007 .unwrap();
3008
3009 let mut buf = Vec::with_capacity(1024);
3010 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3011 writer.write(&batch).unwrap();
3012 writer.close().unwrap();
3013 }
3014
3015 #[test]
3016 fn test_arrow_writer_nullable() {
3017 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3018 let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3019 let file_schema = Arc::new(file_schema);
3020
3021 let batch = RecordBatch::try_new(
3022 Arc::new(batch_schema),
3023 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3024 )
3025 .unwrap();
3026
3027 let mut buf = Vec::with_capacity(1024);
3028 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3029 writer.write(&batch).unwrap();
3030 writer.close().unwrap();
3031
3032 let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3033 let back = read.next().unwrap().unwrap();
3034 assert_eq!(back.schema(), file_schema);
3035 assert_ne!(back.schema(), batch.schema());
3036 assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3037 }
3038
3039 #[test]
3040 fn in_progress_accounting() {
3041 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3043
3044 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3046
3047 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3049
3050 let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3051
3052 assert_eq!(writer.in_progress_size(), 0);
3054 assert_eq!(writer.in_progress_rows(), 0);
3055 assert_eq!(writer.memory_size(), 0);
3056 assert_eq!(writer.bytes_written(), 4); // only the 4-byte "PAR1" magic has been written so far
3057 writer.write(&batch).unwrap();
3058
3059 let initial_size = writer.in_progress_size();
3061 assert!(initial_size > 0);
3062 assert_eq!(writer.in_progress_rows(), 5);
3063 let initial_memory = writer.memory_size();
3064 assert!(initial_memory > 0);
3065 assert!(
3067 initial_size <= initial_memory,
3068 "{initial_size} <= {initial_memory}"
3069 );
3070
3071 writer.write(&batch).unwrap();
3073 assert!(writer.in_progress_size() > initial_size);
3074 assert_eq!(writer.in_progress_rows(), 10);
3075 assert!(writer.memory_size() > initial_memory);
3076 assert!(
3077 writer.in_progress_size() <= writer.memory_size(),
3078 "in_progress_size {} <= memory_size {}",
3079 writer.in_progress_size(),
3080 writer.memory_size()
3081 );
3082
3083 let pre_flush_bytes_written = writer.bytes_written();
3085 writer.flush().unwrap();
3086 assert_eq!(writer.in_progress_size(), 0);
3087 assert_eq!(writer.memory_size(), 0);
3088 assert!(writer.bytes_written() > pre_flush_bytes_written);
3089
3090 writer.close().unwrap();
3091 }
3092
3093 #[test]
3094 fn test_writer_all_null() {
3095 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3096 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3097 let batch = RecordBatch::try_from_iter(vec![
3098 ("a", Arc::new(a) as ArrayRef),
3099 ("b", Arc::new(b) as ArrayRef),
3100 ])
3101 .unwrap();
3102
3103 let mut buf = Vec::with_capacity(1024);
3104 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3105 writer.write(&batch).unwrap();
3106 writer.close().unwrap();
3107
3108 let bytes = Bytes::from(buf);
3109 let options = ReadOptionsBuilder::new().with_page_index().build();
3110 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3111 let index = reader.metadata().offset_index().unwrap();
3112
3113 assert_eq!(index.len(), 1);
3114 assert_eq!(index[0].len(), 2); // 2 columns
3115 assert_eq!(index[0][0].page_locations().len(), 1); // 1 page for column "a"
3116 assert_eq!(index[0][1].page_locations().len(), 1); // 1 page for column "b"
3117 }
3118
3119 #[test]
3120 fn test_disabled_statistics_with_page() {
3121 let file_schema = Schema::new(vec![
3122 Field::new("a", DataType::Utf8, true),
3123 Field::new("b", DataType::Utf8, true),
3124 ]);
3125 let file_schema = Arc::new(file_schema);
3126
3127 let batch = RecordBatch::try_new(
3128 file_schema.clone(),
3129 vec![
3130 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3131 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3132 ],
3133 )
3134 .unwrap();
3135
3136 let props = WriterProperties::builder()
3137 .set_statistics_enabled(EnabledStatistics::None)
3138 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3139 .build();
3140
3141 let mut buf = Vec::with_capacity(1024);
3142 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3143 writer.write(&batch).unwrap();
3144
3145 let metadata = writer.close().unwrap();
3146 assert_eq!(metadata.row_groups.len(), 1);
3147 let row_group = &metadata.row_groups[0];
3148 assert_eq!(row_group.columns.len(), 2);
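        // The offset index is always written, but only column "a" (page-level statistics enabled)
        // gets a column index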
3149 assert!(row_group.columns[0].offset_index_offset.is_some());
3151 assert!(row_group.columns[0].column_index_offset.is_some());
3152 assert!(row_group.columns[1].offset_index_offset.is_some());
3154 assert!(row_group.columns[1].column_index_offset.is_none());
3155
3156 let options = ReadOptionsBuilder::new().with_page_index().build();
3157 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3158
3159 let row_group = reader.get_row_group(0).unwrap();
3160 let a_col = row_group.metadata().column(0);
3161 let b_col = row_group.metadata().column(1);
3162
3163 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3165 let min = byte_array_stats.min_opt().unwrap();
3166 let max = byte_array_stats.max_opt().unwrap();
3167
3168 assert_eq!(min.as_bytes(), b"a");
3169 assert_eq!(max.as_bytes(), b"d");
3170 } else {
3171 panic!("expecting Statistics::ByteArray");
3172 }
3173
3174 assert!(b_col.statistics().is_none());
3176
3177 let offset_index = reader.metadata().offset_index().unwrap();
3178 assert_eq!(offset_index.len(), 1); // 1 row group
3179 assert_eq!(offset_index[0].len(), 2); // 2 columns
3180
3181 let column_index = reader.metadata().column_index().unwrap();
3182 assert_eq!(column_index.len(), 1); // 1 row group
3183 assert_eq!(column_index[0].len(), 2); // 2 columns
3184
3185 let a_idx = &column_index[0][0];
3186 assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3187 let b_idx = &column_index[0][1];
3188 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3189 }
3190
3191 #[test]
3192 fn test_disabled_statistics_with_chunk() {
3193 let file_schema = Schema::new(vec![
3194 Field::new("a", DataType::Utf8, true),
3195 Field::new("b", DataType::Utf8, true),
3196 ]);
3197 let file_schema = Arc::new(file_schema);
3198
3199 let batch = RecordBatch::try_new(
3200 file_schema.clone(),
3201 vec![
3202 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3203 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3204 ],
3205 )
3206 .unwrap();
3207
3208 let props = WriterProperties::builder()
3209 .set_statistics_enabled(EnabledStatistics::None)
3210 .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3211 .build();
3212
3213 let mut buf = Vec::with_capacity(1024);
3214 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3215 writer.write(&batch).unwrap();
3216
3217 let metadata = writer.close().unwrap();
3218 assert_eq!(metadata.row_groups.len(), 1);
3219 let row_group = &metadata.row_groups[0];
3220 assert_eq!(row_group.columns.len(), 2);
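        // Chunk-level statistics are written for column "a", but neither column gets a column index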
3221 assert!(row_group.columns[0].offset_index_offset.is_some());
3223 assert!(row_group.columns[0].column_index_offset.is_none());
3224 assert!(row_group.columns[1].offset_index_offset.is_some());
3226 assert!(row_group.columns[1].column_index_offset.is_none());
3227
3228 let options = ReadOptionsBuilder::new().with_page_index().build();
3229 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3230
3231 let row_group = reader.get_row_group(0).unwrap();
3232 let a_col = row_group.metadata().column(0);
3233 let b_col = row_group.metadata().column(1);
3234
3235 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3237 let min = byte_array_stats.min_opt().unwrap();
3238 let max = byte_array_stats.max_opt().unwrap();
3239
3240 assert_eq!(min.as_bytes(), b"a");
3241 assert_eq!(max.as_bytes(), b"d");
3242 } else {
3243 panic!("expecting Statistics::ByteArray");
3244 }
3245
3246 assert!(b_col.statistics().is_none());
3248
3249 let column_index = reader.metadata().column_index().unwrap();
3250 assert_eq!(column_index.len(), 1); // 1 row group
3251 assert_eq!(column_index[0].len(), 2); // 2 columns
3252
3253 let a_idx = &column_index[0][0];
3254 assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3255 let b_idx = &column_index[0][1];
3256 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3257 }
3258
3259 #[test]
3260 fn test_arrow_writer_skip_metadata() {
3261 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3262 let file_schema = Arc::new(batch_schema.clone());
3263
3264 let batch = RecordBatch::try_new(
3265 Arc::new(batch_schema),
3266 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3267 )
3268 .unwrap();
3269 let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
3270
3271 let mut buf = Vec::with_capacity(1024);
3272 let mut writer =
3273 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3274 writer.write(&batch).unwrap();
3275 writer.close().unwrap();
3276
3277 let bytes = Bytes::from(buf);
3278 let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3279 assert_eq!(file_schema, *reader_builder.schema());
3280 if let Some(key_value_metadata) = reader_builder
3281 .metadata()
3282 .file_metadata()
3283 .key_value_metadata()
3284 {
3285 assert!(!key_value_metadata
3286 .iter()
3287 .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3288 }
3289 }
3290
3291 #[test]
3292 fn mismatched_schemas() {
3293 let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3294 let file_schema = Arc::new(Schema::new(vec![Field::new(
3295 "temperature",
3296 DataType::Float64,
3297 false,
3298 )]));
3299
3300 let batch = RecordBatch::try_new(
3301 Arc::new(batch_schema),
3302 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3303 )
3304 .unwrap();
3305
3306 let mut buf = Vec::with_capacity(1024);
3307 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3308
3309 let err = writer.write(&batch).unwrap_err().to_string();
3310 assert_eq!(
3311 err,
3312 "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3313 );
3314 }
3315}