use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::ArrowSchemaConverter;
use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
use levels::{ArrayLevels, calculate_array_levels};

mod byte_array;
mod levels;
/// Encodes [`RecordBatch`]es to a Parquet file, buffering rows in memory until
/// a row group is complete and then flushing it to the underlying writer.
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet writer
    writer: SerializedFileWriter<W>,

    /// The in-progress row group, if any
    in_progress: Option<ArrowRowGroupWriter>,

    /// A copy of the Arrow schema, used to compute the leaves of each written batch
    arrow_schema: SchemaRef,

    /// Creates the column writers for each new row group
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// The maximum number of rows to buffer per row group
    max_row_group_size: usize,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_size", &self.max_row_group_size)
            .finish()
    }
}
impl<W: Write + Send> ArrowWriter<W> {
    /// Try to create a new Arrow writer with default [`ArrowWriterOptions`]
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Try to create a new Arrow writer with the provided [`ArrowWriterOptions`]
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let mut props = options.properties;

        let schema = if let Some(parquet_schema) = options.schema_descr {
            parquet_schema.clone()
        } else {
            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
            if let Some(schema_root) = &options.schema_root {
                converter = converter.schema_root(schema_root);
            }

            converter.convert(&arrow_schema)?
        };

        if !options.skip_arrow_metadata {
            // add serialized arrow schema
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_size = props.max_row_group_size();

        let props_ptr = Arc::new(props);
        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;

        let row_group_writer_factory =
            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            row_group_writer_factory,
            max_row_group_size,
        })
    }

    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

    /// Estimated memory usage, in bytes, of the in-progress row group.
    ///
    /// This includes any buffered data, as well as memory used by in-progress encoders.
    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }

    /// Anticipated encoded size of the in-progress row group.
    ///
    /// This is the size the buffered data would occupy if it were flushed now,
    /// and is typically less than [`Self::memory_size`].
    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }

    /// Returns the number of rows buffered in the in-progress row group
    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }

    /// Returns the number of bytes written by this instance
    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }

    /// Encodes the provided [`RecordBatch`].
    ///
    /// If the buffered row count would exceed `max_row_group_size`, the batch
    /// is split and the in-progress row group is flushed.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(
                self.row_group_writer_factory
                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
            ),
        };

        // If the batch would exceed max_row_group_size, split it and recurse
        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
            let to_write = self.max_row_group_size - in_progress.buffered_rows;
            let a = batch.slice(0, to_write);
            let b = batch.slice(to_write, batch.num_rows() - to_write);
            self.write(&a)?;
            return self.write(&b);
        }

        in_progress.write(batch)?;

        if in_progress.buffered_rows >= self.max_row_group_size {
            self.flush()?
        }
        Ok(())
    }

    /// Forwards raw bytes to the underlying [`SerializedFileWriter`]
    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.writer.write_all(buf)
    }

    /// Flushes any outstanding data from the underlying writer
    pub fn sync(&mut self) -> std::io::Result<()> {
        self.writer.flush()
    }

    /// Flushes all buffered rows into a new row group
    pub fn flush(&mut self) -> Result<()> {
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Appends [`KeyValue`] metadata in addition to those from [`WriterProperties`]
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    /// Returns a reference to the underlying writer
    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// Writing data directly to the returned writer will likely result in a
    /// corrupt parquet file.
    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    /// Flushes any outstanding data and returns the underlying writer
    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }

    /// Closes and finalizes the underlying Parquet writer.
    ///
    /// Unlike [`Self::close`] this does not consume self; attempting to write
    /// after calling `finish` will result in an error.
    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    /// Closes and finalizes the underlying Parquet writer, returning the file metadata
    pub fn close(mut self) -> Result<ParquetMetaData> {
        self.finish()
    }

    #[deprecated(
        since = "56.2.0",
        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
    )]
    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
        self.flush()?;
        let in_progress = self
            .row_group_writer_factory
            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
        Ok(in_progress.writers)
    }

    #[deprecated(
        since = "56.2.0",
        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
    )]
    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in chunks {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Converts this writer into a [`SerializedFileWriter`] and an
    /// [`ArrowRowGroupWriterFactory`], flushing any in-progress row group first.
    ///
    /// This gives lower-level control over how row groups and columns are
    /// written, for example to encode columns in parallel.
    pub fn into_serialized_writer(
        mut self,
    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
        self.flush()?;
        Ok((self.writer, self.row_group_writer_factory))
    }
}
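
// A minimal usage sketch of the high-level API, added for illustration (the
// module name, schema, and data below are hypothetical): buffer batches with
// `write`, then finalize the file with `into_inner` or `close`.
#[cfg(test)]
mod arrow_writer_usage_sketch {
    use super::*;
    use arrow_array::Int32Array;
    use arrow_schema::{DataType, Field, Schema};

    #[test]
    fn write_and_close() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        // Any `W: Write + Send` can serve as the sink; a `Vec<u8>` keeps the file in memory
        let mut writer = ArrowWriter::try_new(Vec::<u8>::new(), schema, None).unwrap();
        writer.write(&batch).unwrap();

        // `into_inner` flushes the in-progress row group and returns the sink
        let buffer = writer.into_inner().unwrap();
        assert!(!buffer.is_empty());
    }
}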

impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch).map_err(|e| e.into())
    }

    fn close(self) -> std::result::Result<(), ArrowError> {
        self.close()?;
        Ok(())
    }
}

/// Arrow-specific configuration settings for writing parquet files, used by
/// [`ArrowWriter::try_new_with_options`]
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
    schema_descr: Option<SchemaDescriptor>,
}

impl ArrowWriterOptions {
    /// Creates a new [`ArrowWriterOptions`] with the default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the [`WriterProperties`] for writing parquet files
    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    /// Skip encoding the embedded arrow metadata (defaults to `false`).
    ///
    /// Parquet files generated by the [`ArrowWriter`] contain the embedded
    /// arrow schema by default.
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    /// Overrides the name of the root parquet schema element
    pub fn with_schema_root(self, schema_root: String) -> Self {
        Self {
            schema_root: Some(schema_root),
            ..self
        }
    }

    /// Explicitly specifies the parquet [`SchemaDescriptor`] to use, rather
    /// than converting the Arrow schema
    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
        Self {
            schema_descr: Some(schema_descr),
            ..self
        }
    }
}
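
// A short configuration sketch (illustrative; the module name and property
// choices are hypothetical): build `ArrowWriterOptions` and pass them to
// `ArrowWriter::try_new_with_options`.
#[cfg(test)]
mod arrow_writer_options_sketch {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    #[test]
    fn configure_writer() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let options = ArrowWriterOptions::new()
            .with_properties(WriterProperties::builder().build())
            // Omit the serialized Arrow schema from the file metadata
            .with_skip_arrow_metadata(true);

        let writer =
            ArrowWriter::try_new_with_options(Vec::<u8>::new(), schema, options).unwrap();
        writer.close().unwrap();
    }
}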

/// The data for a single column chunk, written by [`ArrowPageWriter`]
#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        // `append_column` reads the chunk from the start in a single pass
        assert_eq!(start, 0);
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}

struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ArrowColumnChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}

/// Column chunk data shared between an [`ArrowPageWriter`], which appends
/// pages to it, and the [`ArrowColumnWriter`], which reads it back on close
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;

#[derive(Default)]
struct ArrowPageWriter {
    buffer: SharedColumnChunk,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl ArrowPageWriter {
    #[cfg(feature = "encryption")]
    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    #[cfg(not(feature = "encryption"))]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }
}

impl PageWriter for ArrowPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header()?;
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
                    page_header.write_thrift(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}

/// The data for a single leaf column, to be written by an [`ArrowColumnWriter`]
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);

/// Computes the [`ArrowLeafColumn`]s for a potentially nested [`ArrayRef`],
/// one entry per parquet leaf column
pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
    let levels = calculate_array_levels(array, field)?;
    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
}

/// The encoded data for a single column chunk, produced by
/// [`ArrowColumnWriter::close`]
pub struct ArrowColumnChunk {
    data: ArrowColumnChunkData,
    close: ColumnCloseResult,
}

impl std::fmt::Debug for ArrowColumnChunk {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnChunk")
            .field("length", &self.data.length)
            .finish_non_exhaustive()
    }
}

impl ArrowColumnChunk {
    /// Calls [`SerializedRowGroupWriter::append_column`] with this chunk's data
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}

/// Encodes [`ArrowLeafColumn`]s to a parquet column chunk
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}

impl ArrowColumnWriter {
    /// Writes an [`ArrowLeafColumn`]
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
                write_leaf(c, &col.0)?;
            }
            ArrowColumnWriterImpl::ByteArray(c) => {
                write_primitive(c, col.0.array().as_ref(), &col.0)?;
            }
        }
        Ok(())
    }

    /// Closes this column, returning the written [`ArrowColumnChunk`]
    pub fn close(self) -> Result<ArrowColumnChunk> {
        let close = match self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
            ArrowColumnWriterImpl::Column(c) => c.close()?,
        };
        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
        let data = chunk.into_inner().unwrap();
        Ok(ArrowColumnChunk { data, close })
    }

    /// Returns the estimated total memory usage of this writer.
    ///
    /// This includes buffered data and memory used by in-progress encoders,
    /// and is greater than or equal to [`Self::get_estimated_total_bytes`].
    pub fn memory_size(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
        }
    }

    /// Returns the estimated encoded size of the column chunk if flushed now.
    ///
    /// This is less than or equal to [`Self::memory_size`].
    pub fn get_estimated_total_bytes(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
        }
    }
}
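
// A sketch of the lower-level, column-oriented API (illustrative; the module
// name, schema, and data are hypothetical): split the `ArrowWriter` into a
// `SerializedFileWriter` and an `ArrowRowGroupWriterFactory`, feed each leaf
// through its `ArrowColumnWriter` (these could run on separate threads), then
// append the finished chunks as a row group.
#[cfg(test)]
mod arrow_column_writer_sketch {
    use super::*;
    use arrow_array::Int32Array;
    use arrow_schema::{DataType, Field, Schema};

    #[test]
    fn encode_columns_manually() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let writer = ArrowWriter::try_new(Vec::<u8>::new(), schema.clone(), None).unwrap();
        let (mut file_writer, factory) = writer.into_serialized_writer().unwrap();

        // One `ArrowColumnWriter` per parquet leaf column of row group 0
        let mut col_writers = factory.create_column_writers(0).unwrap();

        // Compute the leaves of each top-level column and route them to the
        // corresponding column writer, mirroring `ArrowRowGroupWriter::write`
        let mut writers = col_writers.iter_mut();
        for (field, column) in schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column).unwrap() {
                writers.next().unwrap().write(&leaf).unwrap();
            }
        }

        // Close the column writers and append their chunks as a new row group
        let mut row_group = file_writer.next_row_group().unwrap();
        for writer in col_writers {
            let chunk = writer.close().unwrap();
            chunk.append_to_row_group(&mut row_group).unwrap();
        }
        row_group.close().unwrap();
        file_writer.close().unwrap();
    }
}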

/// Encodes [`RecordBatch`]es into an in-progress parquet row group
#[derive(Debug)]
struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}

impl ArrowRowGroupWriter {
    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
        Self {
            writers,
            schema: arrow.clone(),
            buffered_rows: 0,
        }
    }

    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers.next().unwrap().write(&leaf)?
            }
        }
        Ok(())
    }

    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
        self.writers
            .into_iter()
            .map(|writer| writer.close())
            .collect()
    }
}

/// Creates the [`ArrowColumnWriter`]s for each row group of a parquet file
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    schema: SchemaDescPtr,
    arrow_schema: SchemaRef,
    props: WriterPropertiesPtr,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowRowGroupWriterFactory {
    /// Creates a new [`ArrowRowGroupWriterFactory`] from a [`SerializedFileWriter`]
    pub fn new<W: Write + Send>(
        file_writer: &SerializedFileWriter<W>,
        arrow_schema: SchemaRef,
    ) -> Self {
        let schema = Arc::clone(file_writer.schema_descr_ptr());
        let props = Arc::clone(file_writer.properties());
        Self {
            schema,
            arrow_schema,
            props,
            #[cfg(feature = "encryption")]
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
        let writers = self.create_column_writers(row_group_index)?;
        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
    }

    /// Creates the [`ArrowColumnWriter`]s for the row group at the given index
    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
        let mut leaves = self.schema.columns().iter();
        let column_factory = self.column_writer_factory(row_group_index);
        for field in &self.arrow_schema.fields {
            column_factory.get_arrow_column_writer(
                field.data_type(),
                &self.props,
                &mut leaves,
                &mut writers,
            )?;
        }
        Ok(writers)
    }

    #[cfg(feature = "encryption")]
    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
        ArrowColumnWriterFactory::new()
            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
    }

    #[cfg(not(feature = "encryption"))]
    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
        ArrowColumnWriterFactory::new()
    }
}

/// Returns the [`ArrowColumnWriter`]s for a given schema
#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
pub fn get_column_writers(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory = ArrowColumnWriterFactory::new();
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

struct ArrowColumnWriterFactory {
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowColumnWriterFactory {
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "encryption")]
            row_group_index: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_encryptor(
        mut self,
        row_group_index: usize,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.row_group_index = row_group_index;
        self.file_encryptor = file_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn create_page_writer(
        &self,
        column_descriptor: &ColumnDescPtr,
        column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        let column_path = column_descriptor.path().string();
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &self.file_encryptor,
            self.row_group_index,
            column_index,
            &column_path,
        )?;
        Ok(Box::new(
            ArrowPageWriter::default().with_encryptor(page_encryptor),
        ))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_page_writer(
        &self,
        _column_descriptor: &ColumnDescPtr,
        _column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        Ok(Box::<ArrowPageWriter>::default())
    }

    /// Creates the [`ArrowColumnWriter`]s for the given data type, appending them to `out`
    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        // Instantiates a writer for a non-byte-array leaf column
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        // Instantiates a writer for a byte-array leaf column
        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            },
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8
                | ArrowDataType::LargeUtf8
                | ArrowDataType::Binary
                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
                _ => out.push(col(leaves.next().unwrap())?),
            },
            _ => {
                return Err(ParquetError::NYI(format!(
                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
                )));
            }
        }
        Ok(())
    }
}

fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
    let column = levels.array().as_ref();
    let indices = levels.non_null_indices();
    match writer {
        ColumnWriter::Int32ColumnWriter(typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    // If the column is a Date64, cast it to a Date32 and then to an Int32
                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;

                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    let values = column.as_primitive::<UInt32Type>().values();
                    // Reinterpret the u32 bits as i32, mapping values above
                    // i32::MAX to negative integers
                    let array = values.inner().typed_data::<i32>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal32(_, _) => {
                    let array = column
                        .as_primitive::<Decimal32Type>()
                        .unary::<_, Int32Type>(|v| v);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal64(_, _) => {
                    // use an int32 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // use an int32 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // use an int32 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal32(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal32Type>()
                            .unary::<_, Int32Type>(|v| v);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal64(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal64Type>()
                            .unary::<_, Int32Type>(|v| v as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int32Type>(|v| v as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                        let array = array.as_primitive::<Int32Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::BoolColumnWriter(typed) => {
            let array = column.as_boolean();
            typed.write_batch(
                get_bool_array_slice(array, indices).as_slice(),
                levels.def_levels(),
                levels.rep_levels(),
            )
        }
        ColumnWriter::Int64ColumnWriter(typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;

                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    let values = column.as_primitive::<UInt64Type>().values();
                    // Reinterpret the u64 bits as i64, mapping values above
                    // i64::MAX to negative integers
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal64(_, _) => {
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .unary::<_, Int64Type>(|v| v);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // use an int64 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // use an int64 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal64(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal64Type>()
                            .unary::<_, Int64Type>(|v| v);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int64Type>(|v| v as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                        let array = array.as_primitive::<Int64Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::Int96ColumnWriter(_typed) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::FloatColumnWriter(typed) => {
            let array = column.as_primitive::<Float32Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::DoubleColumnWriter(typed) => {
            let array = column.as_primitive::<Float64Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
            let bytes = match column.data_type() {
                ArrowDataType::Interval(interval_unit) => match interval_unit {
                    IntervalUnit::YearMonth => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
                            .unwrap();
                        get_interval_ym_array_slice(array, indices)
                    }
                    IntervalUnit::DayTime => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
                            .unwrap();
                        get_interval_dt_array_slice(array, indices)
                    }
                    _ => {
                        return Err(ParquetError::NYI(format!(
                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                        )));
                    }
                },
                ArrowDataType::FixedSizeBinary(_) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                        .unwrap();
                    get_fsb_array_slice(array, indices)
                }
                ArrowDataType::Decimal32(_, _) => {
                    let array = column.as_primitive::<Decimal32Type>();
                    get_decimal_32_array_slice(array, indices)
                }
                ArrowDataType::Decimal64(_, _) => {
                    let array = column.as_primitive::<Decimal64Type>();
                    get_decimal_64_array_slice(array, indices)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column.as_primitive::<Decimal128Type>();
                    get_decimal_128_array_slice(array, indices)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::Decimal256Array>()
                        .unwrap();
                    get_decimal_256_array_slice(array, indices)
                }
                ArrowDataType::Float16 => {
                    let array = column.as_primitive::<Float16Type>();
                    get_float_16_array_slice(array, indices)
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
        }
    }
}

fn write_primitive<E: ColumnValueEncoder>(
    writer: &mut GenericColumnWriter<E>,
    values: &E::Values,
    levels: &ArrayLevels,
) -> Result<usize> {
    writer.write_batch_internal(
        values,
        Some(levels.non_null_indices()),
        levels.def_levels(),
        levels.rep_levels(),
        None,
        None,
        None,
    )
}

fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        values.push(array.value(*i))
    }
    values
}

/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
fn get_interval_ym_array_slice(
    array: &arrow_array::IntervalYearMonthArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut value = array.value(*i).to_le_bytes().to_vec();
        let mut suffix = vec![0; 8];
        value.append(&mut suffix);
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
fn get_interval_dt_array_slice(
    array: &arrow_array::IntervalDayTimeArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut out = [0; 12];
        let value = array.value(*i);
        out[4..8].copy_from_slice(&value.days.to_le_bytes());
        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
    }
    values
}

fn get_decimal_32_array_slice(
    array: &arrow_array::Decimal32Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(4 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_64_array_slice(
    array: &arrow_array::Decimal64Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(8 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_128_array_slice(
    array: &arrow_array::Decimal128Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(16 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_256_array_slice(
    array: &arrow_array::Decimal256Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(32 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_float_16_array_slice(
    array: &arrow_array::Float16Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_le_bytes().to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)));
    }
    values
}

fn get_fsb_array_slice(
    array: &arrow_array::FixedSizeBinaryArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    use std::fs::File;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
    use crate::column::page::{Page, PageReader};
    use crate::file::metadata::thrift::PageHeader;
    use crate::file::page_index::column_index::ColumnIndexMetaData;
    use crate::file::reader::SerializedPageReader;
    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
    use crate::schema::types::{ColumnPath, Type};
    use arrow::datatypes::ToByteSlice;
    use arrow::datatypes::{DataType, Schema};
    use arrow::error::Result as ArrowResult;
    use arrow::util::data_gen::create_random_array;
    use arrow::util::pretty::pretty_format_batches;
    use arrow::{array::*, buffer::Buffer};
    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
    use arrow_schema::Fields;
    use half::f16;
    use num_traits::{FromPrimitive, ToPrimitive};
    use tempfile::tempfile;

    use crate::basic::Encoding;
    use crate::data_type::AsBytes;
    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
    use crate::file::properties::{
        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
    };
    use crate::file::serialized_reader::ReadOptionsBuilder;
    use crate::file::{
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    };
1560
1561 #[test]
1562 fn arrow_writer() {
1563 let schema = Schema::new(vec![
1565 Field::new("a", DataType::Int32, false),
1566 Field::new("b", DataType::Int32, true),
1567 ]);
1568
1569 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1571 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1572
1573 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1575
1576 roundtrip(batch, Some(SMALL_SIZE / 2));
1577 }
1578
1579 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1580 let mut buffer = vec![];
1581
1582 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1583 writer.write(expected_batch).unwrap();
1584 writer.close().unwrap();
1585
1586 buffer
1587 }
1588
1589 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1590 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1591 writer.write(expected_batch).unwrap();
1592 writer.into_inner().unwrap()
1593 }
1594
1595 #[test]
1596 fn roundtrip_bytes() {
1597 let schema = Arc::new(Schema::new(vec![
1599 Field::new("a", DataType::Int32, false),
1600 Field::new("b", DataType::Int32, true),
1601 ]));
1602
1603 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1605 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1606
1607 let expected_batch =
1609 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1610
1611 for buffer in [
1612 get_bytes_after_close(schema.clone(), &expected_batch),
1613 get_bytes_by_into_inner(schema, &expected_batch),
1614 ] {
1615 let cursor = Bytes::from(buffer);
1616 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1617
1618 let actual_batch = record_batch_reader
1619 .next()
1620 .expect("No batch found")
1621 .expect("Unable to get batch");
1622
1623 assert_eq!(expected_batch.schema(), actual_batch.schema());
1624 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1625 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1626 for i in 0..expected_batch.num_columns() {
1627 let expected_data = expected_batch.column(i).to_data();
1628 let actual_data = actual_batch.column(i).to_data();
1629
1630 assert_eq!(expected_data, actual_data);
1631 }
1632 }
1633 }
1634
1635 #[test]
1636 fn arrow_writer_non_null() {
1637 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1639
1640 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1642
1643 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1645
1646 roundtrip(batch, Some(SMALL_SIZE / 2));
1647 }
1648
1649 #[test]
1650 fn arrow_writer_list() {
1651 let schema = Schema::new(vec![Field::new(
1653 "a",
1654 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1655 true,
1656 )]);
1657
1658 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1660
1661 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1664
1665 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1667 DataType::Int32,
1668 false,
1669 ))))
1670 .len(5)
1671 .add_buffer(a_value_offsets)
1672 .add_child_data(a_values.into_data())
1673 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1674 .build()
1675 .unwrap();
1676 let a = ListArray::from(a_list_data);
1677
1678 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1680
1681 assert_eq!(batch.column(0).null_count(), 1);
1682
1683 roundtrip(batch, None);
1686 }
1687
1688 #[test]
1689 fn arrow_writer_list_non_null() {
1690 let schema = Schema::new(vec![Field::new(
1692 "a",
1693 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1694 false,
1695 )]);
1696
1697 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1699
1700 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1703
1704 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1706 DataType::Int32,
1707 false,
1708 ))))
1709 .len(5)
1710 .add_buffer(a_value_offsets)
1711 .add_child_data(a_values.into_data())
1712 .build()
1713 .unwrap();
1714 let a = ListArray::from(a_list_data);
1715
1716 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1718
1719 assert_eq!(batch.column(0).null_count(), 0);
1722
1723 roundtrip(batch, None);
1724 }
1725
1726 #[test]
1727 fn arrow_writer_binary() {
1728 let string_field = Field::new("a", DataType::Utf8, false);
1729 let binary_field = Field::new("b", DataType::Binary, false);
1730 let schema = Schema::new(vec![string_field, binary_field]);
1731
1732 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1733 let raw_binary_values = [
1734 b"foo".to_vec(),
1735 b"bar".to_vec(),
1736 b"baz".to_vec(),
1737 b"quux".to_vec(),
1738 ];
1739 let raw_binary_value_refs = raw_binary_values
1740 .iter()
1741 .map(|x| x.as_slice())
1742 .collect::<Vec<_>>();
1743
1744 let string_values = StringArray::from(raw_string_values.clone());
1745 let binary_values = BinaryArray::from(raw_binary_value_refs);
1746 let batch = RecordBatch::try_new(
1747 Arc::new(schema),
1748 vec![Arc::new(string_values), Arc::new(binary_values)],
1749 )
1750 .unwrap();
1751
1752 roundtrip(batch, Some(SMALL_SIZE / 2));
1753 }
1754
1755 #[test]
1756 fn arrow_writer_binary_view() {
1757 let string_field = Field::new("a", DataType::Utf8View, false);
1758 let binary_field = Field::new("b", DataType::BinaryView, false);
1759 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1760 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1761
1762 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1763 let raw_binary_values = vec![
1764 b"foo".to_vec(),
1765 b"bar".to_vec(),
1766 b"large payload over 12 bytes".to_vec(),
1767 b"lulu".to_vec(),
1768 ];
1769 let nullable_string_values =
1770 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1771
1772 let string_view_values = StringViewArray::from(raw_string_values);
1773 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1774 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1775 let batch = RecordBatch::try_new(
1776 Arc::new(schema),
1777 vec![
1778 Arc::new(string_view_values),
1779 Arc::new(binary_view_values),
1780 Arc::new(nullable_string_view_values),
1781 ],
1782 )
1783 .unwrap();
1784
1785 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1786 roundtrip(batch, None);
1787 }
1788
1789 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1790 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1791 let schema = Schema::new(vec![decimal_field]);
1792
1793 let decimal_values = vec![10_000, 50_000, 0, -100]
1794 .into_iter()
1795 .map(Some)
1796 .collect::<Decimal128Array>()
1797 .with_precision_and_scale(precision, scale)
1798 .unwrap();
1799
1800 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1801 }
1802
1803 #[test]
1804 fn arrow_writer_decimal() {
1805 let batch_int32_decimal = get_decimal_batch(5, 2);
1807 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1808 let batch_int64_decimal = get_decimal_batch(12, 2);
1810 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1811 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1813 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1814 }
1815
1816 #[test]
1817 fn arrow_writer_complex() {
1818 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1820 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1821 let struct_field_g = Arc::new(Field::new_list(
1822 "g",
1823 Field::new_list_field(DataType::Int16, true),
1824 false,
1825 ));
1826 let struct_field_h = Arc::new(Field::new_list(
1827 "h",
1828 Field::new_list_field(DataType::Int16, false),
1829 true,
1830 ));
1831 let struct_field_e = Arc::new(Field::new_struct(
1832 "e",
1833 vec![
1834 struct_field_f.clone(),
1835 struct_field_g.clone(),
1836 struct_field_h.clone(),
1837 ],
1838 false,
1839 ));
1840 let schema = Schema::new(vec![
1841 Field::new("a", DataType::Int32, false),
1842 Field::new("b", DataType::Int32, true),
1843 Field::new_struct(
1844 "c",
1845 vec![struct_field_d.clone(), struct_field_e.clone()],
1846 false,
1847 ),
1848 ]);
1849
1850 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1852 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1853 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1854 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1855
1856 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1857
1858 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1861
1862 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1864 .len(5)
1865 .add_buffer(g_value_offsets.clone())
1866 .add_child_data(g_value.to_data())
1867 .build()
1868 .unwrap();
1869 let g = ListArray::from(g_list_data);
1870 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1872 .len(5)
1873 .add_buffer(g_value_offsets)
1874 .add_child_data(g_value.to_data())
1875 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1876 .build()
1877 .unwrap();
1878 let h = ListArray::from(h_list_data);
1879
1880 let e = StructArray::from(vec![
1881 (struct_field_f, Arc::new(f) as ArrayRef),
1882 (struct_field_g, Arc::new(g) as ArrayRef),
1883 (struct_field_h, Arc::new(h) as ArrayRef),
1884 ]);
1885
1886 let c = StructArray::from(vec![
1887 (struct_field_d, Arc::new(d) as ArrayRef),
1888 (struct_field_e, Arc::new(e) as ArrayRef),
1889 ]);
1890
1891 let batch = RecordBatch::try_new(
1893 Arc::new(schema),
1894 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1895 )
1896 .unwrap();
1897
1898 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1899 roundtrip(batch, Some(SMALL_SIZE / 3));
1900 }
1901
1902 #[test]
1903 fn arrow_writer_complex_mixed() {
1904 let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1909 let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1910 let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1911 let schema = Schema::new(vec![Field::new(
1912 "some_nested_object",
1913 DataType::Struct(Fields::from(vec![
1914 offset_field.clone(),
1915 partition_field.clone(),
1916 topic_field.clone(),
1917 ])),
1918 false,
1919 )]);
1920
1921 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1923 let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1924 let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1925
1926 let some_nested_object = StructArray::from(vec![
1927 (offset_field, Arc::new(offset) as ArrayRef),
1928 (partition_field, Arc::new(partition) as ArrayRef),
1929 (topic_field, Arc::new(topic) as ArrayRef),
1930 ]);
1931
1932 let batch =
1934 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1935
1936 roundtrip(batch, Some(SMALL_SIZE / 2));
1937 }
1938
1939 #[test]
1940 fn arrow_writer_map() {
1941 let json_content = r#"
1943 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1944 {"stocks":{"long": null, "long": "$CCC", "short": null}}
1945 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1946 "#;
1947 let entries_struct_type = DataType::Struct(Fields::from(vec![
1948 Field::new("key", DataType::Utf8, false),
1949 Field::new("value", DataType::Utf8, true),
1950 ]));
1951 let stocks_field = Field::new(
1952 "stocks",
1953 DataType::Map(
1954 Arc::new(Field::new("entries", entries_struct_type, false)),
1955 false,
1956 ),
1957 true,
1958 );
1959 let schema = Arc::new(Schema::new(vec![stocks_field]));
1960 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1961 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1962
1963 let batch = reader.next().unwrap().unwrap();
1964 roundtrip(batch, None);
1965 }
1966
1967 #[test]
1968 fn arrow_writer_2_level_struct() {
1969 let field_c = Field::new("c", DataType::Int32, true);
1971 let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1972 let type_a = DataType::Struct(vec![field_b.clone()].into());
1973 let field_a = Field::new("a", type_a, true);
1974 let schema = Schema::new(vec![field_a.clone()]);
1975
1976 let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1978 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1979 .len(6)
1980 .null_bit_buffer(Some(Buffer::from([0b00100111])))
1981 .add_child_data(c.into_data())
1982 .build()
1983 .unwrap();
1984 let b = StructArray::from(b_data);
1985 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1986 .len(6)
1987 .null_bit_buffer(Some(Buffer::from([0b00101111])))
1988 .add_child_data(b.into_data())
1989 .build()
1990 .unwrap();
1991 let a = StructArray::from(a_data);
1992
1993 assert_eq!(a.null_count(), 1);
1994 assert_eq!(a.column(0).null_count(), 2);
1995
1996 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1998
1999 roundtrip(batch, Some(SMALL_SIZE / 2));
2000 }
2001
2002 #[test]
2003 fn arrow_writer_2_level_struct_non_null() {
2004 let field_c = Field::new("c", DataType::Int32, false);
2006 let type_b = DataType::Struct(vec![field_c].into());
2007 let field_b = Field::new("b", type_b.clone(), false);
2008 let type_a = DataType::Struct(vec![field_b].into());
2009 let field_a = Field::new("a", type_a.clone(), false);
2010 let schema = Schema::new(vec![field_a]);
2011
2012 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2014 let b_data = ArrayDataBuilder::new(type_b)
2015 .len(6)
2016 .add_child_data(c.into_data())
2017 .build()
2018 .unwrap();
2019 let b = StructArray::from(b_data);
2020 let a_data = ArrayDataBuilder::new(type_a)
2021 .len(6)
2022 .add_child_data(b.into_data())
2023 .build()
2024 .unwrap();
2025 let a = StructArray::from(a_data);
2026
2027 assert_eq!(a.null_count(), 0);
2028 assert_eq!(a.column(0).null_count(), 0);
2029
2030 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2032
2033 roundtrip(batch, Some(SMALL_SIZE / 2));
2034 }
2035
2036 #[test]
2037 fn arrow_writer_2_level_struct_mixed_null() {
2038 let field_c = Field::new("c", DataType::Int32, false);
2040 let type_b = DataType::Struct(vec![field_c].into());
2041 let field_b = Field::new("b", type_b.clone(), true);
2042 let type_a = DataType::Struct(vec![field_b].into());
2043 let field_a = Field::new("a", type_a.clone(), false);
2044 let schema = Schema::new(vec![field_a]);
2045
2046 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2048 let b_data = ArrayDataBuilder::new(type_b)
2049 .len(6)
2050 .null_bit_buffer(Some(Buffer::from([0b00100111])))
2051 .add_child_data(c.into_data())
2052 .build()
2053 .unwrap();
2054 let b = StructArray::from(b_data);
2055 let a_data = ArrayDataBuilder::new(type_a)
2057 .len(6)
2058 .add_child_data(b.into_data())
2059 .build()
2060 .unwrap();
2061 let a = StructArray::from(a_data);
2062
2063 assert_eq!(a.null_count(), 0);
2064 assert_eq!(a.column(0).null_count(), 2);
2065
2066 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2068
2069 roundtrip(batch, Some(SMALL_SIZE / 2));
2070 }
2071
2072 #[test]
2073 fn arrow_writer_2_level_struct_mixed_null_2() {
2074 let field_c = Field::new("c", DataType::Int32, false);
2076 let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2077 let field_e = Field::new(
2078 "e",
2079 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2080 false,
2081 );
2082
2083 let field_b = Field::new(
2084 "b",
2085 DataType::Struct(vec![field_c, field_d, field_e].into()),
2086 false,
2087 );
2088 let type_a = DataType::Struct(vec![field_b.clone()].into());
2089 let field_a = Field::new("a", type_a, true);
2090 let schema = Schema::new(vec![field_a.clone()]);
2091
2092 let c = Int32Array::from_iter_values(0..6);
2094 let d = FixedSizeBinaryArray::try_from_iter(
2095 ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2096 )
2097 .expect("four byte values");
2098 let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2099 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2100 .len(6)
2101 .add_child_data(c.into_data())
2102 .add_child_data(d.into_data())
2103 .add_child_data(e.into_data())
2104 .build()
2105 .unwrap();
2106 let b = StructArray::from(b_data);
2107 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2108 .len(6)
2109 .null_bit_buffer(Some(Buffer::from([0b00100101])))
2110 .add_child_data(b.into_data())
2111 .build()
2112 .unwrap();
2113 let a = StructArray::from(a_data);
2114
2115 assert_eq!(a.null_count(), 3);
2116 assert_eq!(a.column(0).null_count(), 0);
2117
2118 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2120
2121 roundtrip(batch, Some(SMALL_SIZE / 2));
2122 }
2123
2124 #[test]
2125 fn test_fixed_size_binary_in_dict() {
2126 fn test_fixed_size_binary_in_dict_inner<K>()
2127 where
2128 K: ArrowDictionaryKeyType,
2129 K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2130 <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2131 {
2132 let field = Field::new(
2133 "a",
2134 DataType::Dictionary(
2135 Box::new(K::DATA_TYPE),
2136 Box::new(DataType::FixedSizeBinary(4)),
2137 ),
2138 false,
2139 );
2140 let schema = Schema::new(vec![field]);
2141
2142 let keys: Vec<K::Native> = vec![
2143 K::Native::try_from(0u8).unwrap(),
2144 K::Native::try_from(0u8).unwrap(),
2145 K::Native::try_from(1u8).unwrap(),
2146 ];
2147 let keys = PrimitiveArray::<K>::from_iter_values(keys);
2148 let values = FixedSizeBinaryArray::try_from_iter(
2149 vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2150 )
2151 .unwrap();
2152
2153 let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2154 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2155 roundtrip(batch, None);
2156 }
2157
2158 test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2159 test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2160 test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2161 test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2162 test_fixed_size_binary_in_dict_inner::<Int8Type>();
2163 test_fixed_size_binary_in_dict_inner::<Int16Type>();
2164 test_fixed_size_binary_in_dict_inner::<Int32Type>();
2165 test_fixed_size_binary_in_dict_inner::<Int64Type>();
2166 }
2167
2168 #[test]
2169 fn test_empty_dict() {
2170 let struct_fields = Fields::from(vec![Field::new(
2171 "dict",
2172 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2173 false,
2174 )]);
2175
2176 let schema = Schema::new(vec![Field::new_struct(
2177 "struct",
2178 struct_fields.clone(),
2179 true,
2180 )]);
2181 let dictionary = Arc::new(DictionaryArray::new(
2182 Int32Array::new_null(5),
2183 Arc::new(StringArray::new_null(0)),
2184 ));
2185
2186 let s = StructArray::new(
2187 struct_fields,
2188 vec![dictionary],
2189 Some(NullBuffer::new_null(5)),
2190 );
2191
2192 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2193 roundtrip(batch, None);
2194 }
    #[test]
    fn arrow_writer_page_size() {
        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);

        for i in 0..10 {
            let value = i
                .to_string()
                .repeat(10)
                .chars()
                .take(10)
                .collect::<String>();

            builder.append_value(value);
        }

        let array = Arc::new(builder.finish());

        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        let file = tempfile::tempfile().unwrap();

        let props = WriterProperties::builder()
            .set_data_page_size_limit(1)
            .set_dictionary_page_size_limit(1)
            .set_write_batch_size(1)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .expect("Unable to write file");
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();

        let column = reader.metadata().row_group(0).columns();

        assert_eq!(column.len(), 1);

        assert!(
            column[0].dictionary_page_offset().is_some(),
            "Expected a dictionary page"
        );

        assert!(reader.metadata().offset_index().is_some());
        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];

        let page_locations = offset_indexes[0].page_locations.clone();

        assert_eq!(
            page_locations.len(),
            10,
            "Expected 10 pages but got {page_locations:#?}"
        );
    }

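    // NaN values must round-trip through all three floating point widths.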
    #[test]
    fn arrow_writer_float_nans() {
        let f16_field = Field::new("a", DataType::Float16, false);
        let f32_field = Field::new("b", DataType::Float32, false);
        let f64_field = Field::new("c", DataType::Float64, false);
        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);

        let f16_values = (0..MEDIUM_SIZE)
            .map(|i| {
                Some(if i % 2 == 0 {
                    f16::NAN
                } else {
                    f16::from_f32(i as f32)
                })
            })
            .collect::<Float16Array>();

        let f32_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
            .collect::<Float32Array>();

        let f64_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
            .collect::<Float64Array>();

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(f16_values),
                Arc::new(f32_values),
                Arc::new(f64_values),
            ],
        )
        .unwrap();

        roundtrip(batch, None);
    }

    const SMALL_SIZE: usize = 7;
    const MEDIUM_SIZE: usize = 63;

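    /// Round-trips `expected_batch` once per Parquet writer version and returns
    /// the encoded files.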
    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
        let mut files = vec![];
        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
            let mut props = WriterProperties::builder().set_writer_version(version);

            if let Some(size) = max_row_group_size {
                props = props.set_max_row_group_size(size)
            }

            let props = props.build();
            files.push(roundtrip_opts(&expected_batch, props))
        }
        files
    }

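    /// Writes `expected_batch` with `props`, reads it back, and invokes `validate`
    /// on each expected/actual column pair before returning the encoded file.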
    fn roundtrip_opts_with_array_validation<F>(
        expected_batch: &RecordBatch,
        props: WriterProperties,
        validate: F,
    ) -> Bytes
    where
        F: Fn(&ArrayData, &ArrayData),
    {
        let mut file = vec![];

        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
            .expect("Unable to write file");
        writer.write(expected_batch).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(file);
        let mut record_batch_reader =
            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();

        let actual_batch = record_batch_reader
            .next()
            .expect("No batch found")
            .expect("Unable to get batch");

        assert_eq!(expected_batch.schema(), actual_batch.schema());
        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
        for i in 0..expected_batch.num_columns() {
            let expected_data = expected_batch.column(i).to_data();
            let actual_data = actual_batch.column(i).to_data();
            validate(&expected_data, &actual_data);
        }

        file
    }

    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
            a.validate_full().expect("valid expected data");
            b.validate_full().expect("valid actual data");
            assert_eq!(a, b)
        })
    }

    struct RoundTripOptions {
        values: ArrayRef,
        schema: SchemaRef,
        bloom_filter: bool,
        bloom_filter_position: BloomFilterPosition,
    }

    impl RoundTripOptions {
        fn new(values: ArrayRef, nullable: bool) -> Self {
            let data_type = values.data_type().clone();
            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
            Self {
                values,
                schema: Arc::new(schema),
                bloom_filter: false,
                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
            }
        }
    }

    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
    }

    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
        let mut options = RoundTripOptions::new(values, false);
        options.schema = schema;
        one_column_roundtrip_with_options(options)
    }

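    /// Round-trips a single column across a matrix of dictionary sizes, encodings
    /// (chosen per data type), writer versions, and row group sizes.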
    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
        let RoundTripOptions {
            values,
            schema,
            bloom_filter,
            bloom_filter_position,
        } = options;

        let encodings = match values.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
                vec![
                    Encoding::PLAIN,
                    Encoding::DELTA_BYTE_ARRAY,
                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
                ]
            }
            DataType::Int64
            | DataType::Int32
            | DataType::Int16
            | DataType::Int8
            | DataType::UInt64
            | DataType::UInt32
            | DataType::UInt16
            | DataType::UInt8 => vec![
                Encoding::PLAIN,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
            DataType::Float32 | DataType::Float64 => {
                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
            }
            _ => vec![Encoding::PLAIN],
        };

        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];

        let mut files = vec![];
        for dictionary_size in [0, 1, 1024] {
            for encoding in &encodings {
                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                    for row_group_size in row_group_sizes {
                        let props = WriterProperties::builder()
                            .set_writer_version(version)
                            .set_max_row_group_size(row_group_size)
                            .set_dictionary_enabled(dictionary_size != 0)
                            .set_dictionary_page_size_limit(dictionary_size.max(1))
                            .set_encoding(*encoding)
                            .set_bloom_filter_enabled(bloom_filter)
                            .set_bloom_filter_position(bloom_filter_position)
                            .build();

                        files.push(roundtrip_opts(&expected_batch, props))
                    }
                }
            }
        }
        files
    }

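    /// Builds a non-nullable column from `iter` and round-trips it.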
    fn values_required<A, I>(iter: I) -> Vec<Bytes>
    where
        A: From<Vec<I::Item>> + Array + 'static,
        I: IntoIterator,
    {
        let raw_values: Vec<_> = iter.into_iter().collect();
        let values = Arc::new(A::from(raw_values));
        one_column_roundtrip(values, false)
    }

    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
    where
        A: From<Vec<Option<I::Item>>> + Array + 'static,
        I: IntoIterator,
    {
        let optional_raw_values: Vec<_> = iter
            .into_iter()
            .enumerate()
            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
            .collect();
        let optional_values = Arc::new(A::from(optional_raw_values));
        one_column_roundtrip(optional_values, true)
    }

    fn required_and_optional<A, I>(iter: I)
    where
        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
        I: IntoIterator + Clone,
    {
        values_required::<A, I>(iter.clone());
        values_optional::<A, I>(iter);
    }

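    /// Asserts that every value in `positive_values` is reported present by a
    /// bloom filter in the first file, and that no `negative_values` entry is.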
    fn check_bloom_filter<T: AsBytes>(
        files: Vec<Bytes>,
        file_column: String,
        positive_values: Vec<T>,
        negative_values: Vec<T>,
    ) {
        files.into_iter().take(1).for_each(|file| {
            let file_reader = SerializedFileReader::new_with_options(
                file,
                ReadOptionsBuilder::new()
                    .with_reader_properties(
                        ReaderProperties::builder()
                            .set_read_bloom_filter(true)
                            .build(),
                    )
                    .build(),
            )
            .expect("Unable to open file as Parquet");
            let metadata = file_reader.metadata();

            let mut bloom_filters: Vec<_> = vec![];
            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
                if let Some((column_index, _)) = row_group
                    .columns()
                    .iter()
                    .enumerate()
                    .find(|(_, column)| column.column_path().string() == file_column)
                {
                    let row_group_reader = file_reader
                        .get_row_group(ri)
                        .expect("Unable to read row group");
                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
                        bloom_filters.push(sbbf.clone());
                    } else {
                        panic!("No bloom filter for column named {file_column} found");
                    }
                } else {
                    panic!("No column named {file_column} found");
                }
            }

            positive_values.iter().for_each(|value| {
                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
                assert!(
                    found.is_some(),
                    "Value {:?} should be in bloom filter",
                    value.as_bytes()
                );
            });

            negative_values.iter().for_each(|value| {
                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
                assert!(
                    found.is_none(),
                    "Value {:?} should not be in bloom filter",
                    value.as_bytes()
                );
            });
        });
    }

    #[test]
    fn all_null_primitive_single_column() {
        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
        one_column_roundtrip(values, true);
    }

    #[test]
    fn null_single_column() {
        let values = Arc::new(NullArray::new(SMALL_SIZE));
        one_column_roundtrip(values, true);
    }

    #[test]
    fn bool_single_column() {
        required_and_optional::<BooleanArray, _>(
            [true, false].iter().cycle().copied().take(SMALL_SIZE),
        );
    }

    #[test]
    fn bool_large_single_column() {
        let values = Arc::new(
            [None, Some(true), Some(false)]
                .iter()
                .cycle()
                .copied()
                .take(200_000)
                .collect::<BooleanArray>(),
        );
        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
        let file = tempfile::tempfile().unwrap();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
                .expect("Unable to write file");
        writer.write(&expected_batch).unwrap();
        writer.close().unwrap();
    }

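    // An all-NaN column yields no usable min/max statistics: the offset index is
    // still written, but the column index is omitted.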
    #[test]
    fn check_page_offset_index_with_nan() {
        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();

        let mut out = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
        writer.write(&batch).unwrap();
        let file_meta_data = writer.close().unwrap();
        for row_group in file_meta_data.row_groups() {
            for column in row_group.columns() {
                assert!(column.offset_index_offset().is_some());
                assert!(column.offset_index_length().is_some());
                assert!(column.column_index_offset().is_none());
                assert!(column.column_index_length().is_none());
            }
        }
    }

    #[test]
    fn i8_single_column() {
        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
    }

    #[test]
    fn i16_single_column() {
        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
    }

    #[test]
    fn i32_single_column() {
        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn i64_single_column() {
        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn u8_single_column() {
        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
    }

    #[test]
    fn u16_single_column() {
        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
    }

    #[test]
    fn u32_single_column() {
        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
    }

    #[test]
    fn u64_single_column() {
        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
    }

    #[test]
    fn f32_single_column() {
        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
    }

    #[test]
    fn f64_single_column() {
        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
    }

    #[test]
    fn timestamp_second_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampSecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_millisecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampMillisecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_microsecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_nanosecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampNanosecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn date32_single_column() {
        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn date64_single_column() {
        required_and_optional::<Date64Array, _>(
            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
        );
    }

    #[test]
    fn time32_second_single_column() {
        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn time32_millisecond_single_column() {
        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn time64_microsecond_single_column() {
        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn time64_nanosecond_single_column() {
        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_second_single_column() {
        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_millisecond_single_column() {
        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_microsecond_single_column() {
        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_nanosecond_single_column() {
        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn interval_year_month_single_column() {
        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn interval_day_time_single_column() {
        required_and_optional::<IntervalDayTimeArray, _>(vec![
            IntervalDayTime::new(0, 1),
            IntervalDayTime::new(0, 3),
            IntervalDayTime::new(3, -2),
            IntervalDayTime::new(-200, 4),
        ]);
    }

    #[test]
    #[should_panic(
        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
    )]
    fn interval_month_day_nano_single_column() {
        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
            IntervalMonthDayNano::new(0, 1, 5),
            IntervalMonthDayNano::new(0, 3, 2),
            IntervalMonthDayNano::new(3, -2, -5),
            IntervalMonthDayNano::new(-200, 4, -1),
        ]);
    }

    #[test]
    fn binary_single_column() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        values_required::<BinaryArray, _>(many_vecs_iter);
    }

    #[test]
    fn binary_view_single_column() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        values_required::<BinaryViewArray, _>(many_vecs_iter);
    }

    #[test]
    fn i32_column_bloom_filter_at_end() {
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;
        options.bloom_filter_position = BloomFilterPosition::End;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }

    #[test]
    fn i32_column_bloom_filter() {
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }

    #[test]
    fn binary_column_bloom_filter() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            many_vecs,
            vec![vec![(SMALL_SIZE + 1) as u8]],
        );
    }

    #[test]
    fn empty_string_null_column_bloom_filter() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        let array = Arc::new(StringArray::from_iter_values(raw_strs));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);

        let optional_raw_values: Vec<_> = raw_values
            .iter()
            .enumerate()
            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
            .collect();
        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
    }

    #[test]
    fn large_binary_single_column() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        values_required::<LargeBinaryArray, _>(many_vecs_iter);
    }

    #[test]
    fn fixed_size_binary_single_column() {
        let mut builder = FixedSizeBinaryBuilder::new(4);
        builder.append_value(b"0123").unwrap();
        builder.append_null();
        builder.append_value(b"8910").unwrap();
        builder.append_value(b"1112").unwrap();
        let array = Arc::new(builder.finish());

        one_column_roundtrip(array, true);
    }

    #[test]
    fn string_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        required_and_optional::<StringArray, _>(raw_strs);
    }

    #[test]
    fn large_string_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        required_and_optional::<LargeStringArray, _>(raw_strs);
    }

    #[test]
    fn string_view_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        required_and_optional::<StringViewArray, _>(raw_strs);
    }

    #[test]
    fn null_list_single_column() {
        let null_field = Field::new_list_field(DataType::Null, true);
        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);

        let schema = Schema::new(vec![list_field]);

        let a_values = NullArray::new(2);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Null,
            true,
        ))))
        .len(3)
        .add_buffer(a_value_offsets)
        .null_bit_buffer(Some(Buffer::from([0b00000101])))
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();

        let a = ListArray::from(a_list_data);

        assert!(a.is_valid(0));
        assert!(!a.is_valid(1));
        assert!(a.is_valid(2));

        assert_eq!(a.value(0).len(), 0);
        assert_eq!(a.value(2).len(), 2);
        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn list_single_column() {
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();

        assert_eq!(a_list_data.null_count(), 1);

        let a = ListArray::from(a_list_data);
        let values = Arc::new(a);

        one_column_roundtrip(values, true);
    }

    #[test]
    fn large_list_single_column() {
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
            "large_item",
            DataType::Int32,
            true,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();

        assert_eq!(a_list_data.null_count(), 1);

        let a = LargeListArray::from(a_list_data);
        let values = Arc::new(a);

        one_column_roundtrip(values, true);
    }

    #[test]
    fn list_nested_nulls() {
        use arrow::datatypes::Int32Type;
        let data = vec![
            Some(vec![Some(1)]),
            Some(vec![Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5), None]),
            Some(vec![None]),
            Some(vec![Some(6), Some(7)]),
        ];

        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
        one_column_roundtrip(Arc::new(list), true);

        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
        one_column_roundtrip(Arc::new(list), true);
    }

    #[test]
    fn struct_single_column() {
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);

        let values = Arc::new(s);
        one_column_roundtrip(values, false);
    }

    #[test]
    fn list_and_map_coerced_names() {
        let list_field =
            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
        let map_field = Field::new_map(
            "my_map",
            "entries",
            Field::new("keys", DataType::Int32, false),
            Field::new("values", DataType::Int32, true),
            false,
            true,
        );

        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();

        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));

        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
        let file = tempfile::tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();

        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        let schema = file_metadata.file_metadata().schema();
        let list_field = &schema.get_fields()[0].get_fields()[0];
        assert_eq!(list_field.get_fields()[0].name(), "element");

        let map_field = &schema.get_fields()[1].get_fields()[0];
        assert_eq!(map_field.name(), "key_value");
        assert_eq!(map_field.get_fields()[0].name(), "key");
        assert_eq!(map_field.get_fields()[1].name(), "value");

        let reader = SerializedFileReader::new(file).unwrap();
        let file_schema = reader.metadata().file_metadata().schema();
        let fields = file_schema.get_fields();
        let list_field = &fields[0].get_fields()[0];
        assert_eq!(list_field.get_fields()[0].name(), "element");
        let map_field = &fields[1].get_fields()[0];
        assert_eq!(map_field.name(), "key_value");
        assert_eq!(map_field.get_fields()[0].name(), "key");
        assert_eq!(map_field.get_fields()[1].name(), "value");
    }

    #[test]
    fn fallback_flush_data_page() {
        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
        let values = Arc::new(StringArray::from(raw_values));
        let encodings = vec![
            Encoding::DELTA_BYTE_ARRAY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
        ];
        let data_type = values.data_type().clone();
        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
        let data_page_size_limit: usize = 32;
        let write_batch_size: usize = 16;

        for encoding in &encodings {
            for row_group_size in row_group_sizes {
                let props = WriterProperties::builder()
                    .set_writer_version(WriterVersion::PARQUET_2_0)
                    .set_max_row_group_size(row_group_size)
                    .set_dictionary_enabled(false)
                    .set_encoding(*encoding)
                    .set_data_page_size_limit(data_page_size_limit)
                    .set_write_batch_size(write_batch_size)
                    .build();

                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
                    let string_array_a = StringArray::from(a.clone());
                    let string_array_b = StringArray::from(b.clone());
                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
                    assert_eq!(
                        vec_a, vec_b,
                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
                    );
                });
            }
        }
    }

    #[test]
    fn arrow_writer_string_dictionary() {
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            42,
            true,
        )]));

        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
            .iter()
            .copied()
            .collect();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }

    #[test]
    fn arrow_writer_test_type_compatibility() {
        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
        where
            T1: Array + 'static,
            T2: Array + 'static,
        {
            let schema1 = Arc::new(Schema::new(vec![Field::new(
                "a",
                array1.data_type().clone(),
                false,
            )]));

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();

            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
            writer.write(&rb1).unwrap();

            let schema2 = Arc::new(Schema::new(vec![Field::new(
                "a",
                array2.data_type().clone(),
                false,
            )]));
            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
            writer.write(&rb2).unwrap();

            writer.close().unwrap();

            let mut record_batch_reader =
                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
            let actual_batch = record_batch_reader.next().unwrap().unwrap();

            let expected_batch =
                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
            assert_eq!(actual_batch, expected_batch);
        }

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            StringArray::from_iter_values(vec!["barquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
            ),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            DictionaryArray::new(
                UInt16Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            LargeStringArray::from_iter_values(vec!["parquet"]),
            StringArray::from_iter_values(vec!["barquet"]),
            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            StringViewArray::from_iter_values(vec!["barquet"]),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringViewArray::from_iter_values(vec!["parquet"]),
            StringArray::from_iter_values(vec!["barquet"]),
            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            LargeStringArray::from_iter_values(vec!["parquet"]),
            StringViewArray::from_iter_values(vec!["barquet"]),
            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringViewArray::from_iter_values(vec!["parquet"]),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            BinaryArray::from_iter_values(vec![b"parquet"]),
            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryArray::from_iter_values(vec![b"barquet"]),
            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryViewArray::from_iter_values(vec![b"barquet"]),
            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryViewArray::from_iter_values(vec![b"parquet"]),
            BinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryViewArray::from_iter_values(vec![b"parquet"]),
            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryViewArray::from_iter_values(vec![b"barquet"]),
            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        let list_field_metadata = HashMap::from_iter(vec![(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "1".to_string(),
        )]);
        let list_field = Field::new_list_field(DataType::Int32, false);

        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());

        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());

        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());

        ensure_compatible_write(
            ListArray::try_new(
                Arc::new(
                    list_field
                        .clone()
                        .with_metadata(list_field_metadata.clone()),
                ),
                offsets1,
                values1,
                None,
            )
            .unwrap(),
            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
            ListArray::try_new(
                Arc::new(
                    list_field
                        .clone()
                        .with_metadata(list_field_metadata.clone()),
                ),
                offsets_expected,
                values_expected,
                None,
            )
            .unwrap(),
        );
    }

    #[test]
    fn arrow_writer_primitive_dictionary() {
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
            true,
            42,
            true,
        )]));

        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
        builder.append(12345678).unwrap();
        builder.append_null();
        builder.append(22345678).unwrap();
        builder.append(12345678).unwrap();
        let d = builder.finish();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }

    #[test]
    fn arrow_writer_decimal32_dictionary() {
        let integers = vec![12345, 56789, 34567];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal32Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal32Array::from(integers)
            .with_precision_and_scale(9, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_decimal64_dictionary() {
        let integers = vec![12345, 56789, 34567];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal64Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal64Array::from(integers)
            .with_precision_and_scale(12, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_decimal128_dictionary() {
        let integers = vec![12345, 56789, 34567];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal128Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal128Array::from(integers)
            .with_precision_and_scale(12, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_decimal256_dictionary() {
        let integers = vec![
            i256::from_i128(12345),
            i256::from_i128(56789),
            i256::from_i128(34567),
        ];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal256Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal256Array::from(integers)
            .with_precision_and_scale(12, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_string_dictionary_unsigned_index() {
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
            true,
            42,
            true,
        )]));

        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
            .iter()
            .copied()
            .collect();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }

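    // u32 values are stored as physical int32; verify min/max statistics remain
    // correct across the signed wrap-around point.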
    #[test]
    fn u32_min_max() {
        let src = [
            u32::MIN,
            u32::MIN + 1,
            (i32::MAX as u32) - 1,
            i32::MAX as u32,
            (i32::MAX as u32) + 1,
            u32::MAX - 1,
            u32::MAX,
        ];
        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
        let files = one_column_roundtrip(values, false);

        for file in files {
            let reader = SerializedFileReader::new(file).unwrap();
            let metadata = reader.metadata();

            let mut row_offset = 0;
            for row_group in metadata.row_groups() {
                assert_eq!(row_group.num_columns(), 1);
                let column = row_group.column(0);

                let num_values = column.num_values() as usize;
                let src_slice = &src[row_offset..row_offset + num_values];
                row_offset += column.num_values() as usize;

                let stats = column.statistics().unwrap();
                if let Statistics::Int32(stats) = stats {
                    assert_eq!(
                        *stats.min_opt().unwrap() as u32,
                        *src_slice.iter().min().unwrap()
                    );
                    assert_eq!(
                        *stats.max_opt().unwrap() as u32,
                        *src_slice.iter().max().unwrap()
                    );
                } else {
                    panic!("Statistics::Int32 missing")
                }
            }
        }
    }

    #[test]
    fn u64_min_max() {
        let src = [
            u64::MIN,
            u64::MIN + 1,
            (i64::MAX as u64) - 1,
            i64::MAX as u64,
            (i64::MAX as u64) + 1,
            u64::MAX - 1,
            u64::MAX,
        ];
        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
        let files = one_column_roundtrip(values, false);

        for file in files {
            let reader = SerializedFileReader::new(file).unwrap();
            let metadata = reader.metadata();

            let mut row_offset = 0;
            for row_group in metadata.row_groups() {
                assert_eq!(row_group.num_columns(), 1);
                let column = row_group.column(0);

                let num_values = column.num_values() as usize;
                let src_slice = &src[row_offset..row_offset + num_values];
                row_offset += column.num_values() as usize;

                let stats = column.statistics().unwrap();
                if let Statistics::Int64(stats) = stats {
                    assert_eq!(
                        *stats.min_opt().unwrap() as u64,
                        *src_slice.iter().min().unwrap()
                    );
                    assert_eq!(
                        *stats.max_opt().unwrap() as u64,
                        *src_slice.iter().max().unwrap()
                    );
                } else {
                    panic!("Statistics::Int64 missing")
                }
            }
        }
    }

    #[test]
    fn statistics_null_counts_only_nulls() {
        let values = Arc::new(UInt64Array::from(vec![None, None]));
        let files = one_column_roundtrip(values, true);

        for file in files {
            let reader = SerializedFileReader::new(file).unwrap();
            let metadata = reader.metadata();
            assert_eq!(metadata.num_row_groups(), 1);
            let row_group = metadata.row_group(0);
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);
            let stats = column.statistics().unwrap();
            assert_eq!(stats.null_count_opt(), Some(2));
        }
    }

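    // Exercises nested nullability: nullable structs inside nullable lists, with
    // nulls at the list, struct, and leaf levels.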
    #[test]
    fn test_list_of_struct_roundtrip() {
        let int_field = Field::new("a", DataType::Int32, true);
        let int_field2 = Field::new("b", DataType::Int32, true);

        let int_builder = Int32Builder::with_capacity(10);
        let int_builder2 = Int32Builder::with_capacity(10);

        let struct_builder = StructBuilder::new(
            vec![int_field, int_field2],
            vec![Box::new(int_builder), Box::new(int_builder2)],
        );
        let mut list_builder = ListBuilder::new(struct_builder);

        // [{a: 1, b: 2}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_value(1);
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_value(2);
        values.append(true);
        list_builder.append(true);

        // []
        list_builder.append(true);

        // null
        list_builder.append(false);

        // [null, null]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(false);
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(false);
        list_builder.append(true);

        // [{a: null, b: 3}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_value(3);
        values.append(true);
        list_builder.append(true);

        // [{a: 2, b: null}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_value(2);
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(true);
        list_builder.append(true);

        let array = Arc::new(list_builder.finish());

        one_column_roundtrip(array, true);
    }

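    /// Returns the number of rows in each row group of `metadata`.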
    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
    }

    #[test]
    fn test_aggregates_records() {
        let arrays = [
            Int32Array::from((0..100).collect::<Vec<_>>()),
            Int32Array::from((0..50).collect::<Vec<_>>()),
            Int32Array::from((200..500).collect::<Vec<_>>()),
        ];

        let schema = Arc::new(Schema::new(vec![Field::new(
            "int",
            ArrowDataType::Int32,
            false,
        )]));

        let file = tempfile::tempfile().unwrap();

        let props = WriterProperties::builder()
            .set_max_row_group_size(200)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

        for array in arrays {
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
            writer.write(&batch).unwrap();
        }

        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);

        let batches = builder
            .with_batch_size(100)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(batches.len(), 5);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();

        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);

        let values: Vec<_> = batches
            .iter()
            .flat_map(|x| {
                x.column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .values()
                    .iter()
                    .cloned()
            })
            .collect();

        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
        assert_eq!(&values, &expected_values)
    }

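    // Writes two batches of deeply nested data (struct > list > struct) that get
    // split across row groups, then checks the read-back output is unchanged.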
    #[test]
    fn complex_aggregate() {
        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
        let struct_a = Arc::new(Field::new(
            "struct_a",
            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
            true,
        ));

        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
        let struct_b = Arc::new(Field::new(
            "struct_b",
            DataType::Struct(vec![list_a.clone()].into()),
            false,
        ));

        let schema = Arc::new(Schema::new(vec![struct_b]));

        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let field_b_array =
            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);

        let struct_a_array = StructArray::from(vec![
            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
        ]);

        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(5)
            .add_buffer(Buffer::from_iter(vec![
                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
            ]))
            .null_bit_buffer(Some(Buffer::from_iter(vec![
                true, false, true, false, true,
            ])))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);

        let struct_a_array = StructArray::from(vec![
            (field_a, Arc::new(field_a_array) as ArrayRef),
            (field_b, Arc::new(field_b_array) as ArrayRef),
        ]);

        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(2)
            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);

        let batch2 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        let batches = &[batch1, batch2];

        let expected = r#"
        +-------------------------------------------------------------------------------------------------------+
        | struct_b                                                                                              |
        +-------------------------------------------------------------------------------------------------------+
        | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
        | {list: }                                                                                              |
        | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
        | {list: }                                                                                              |
        | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
        | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
        | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
        +-------------------------------------------------------------------------------------------------------+
        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");

        let actual = pretty_format_batches(batches).unwrap().to_string();
        assert_eq!(actual, expected);

        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_max_row_group_size(6)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();

        for batch in batches {
            writer.write(batch).unwrap();
        }
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

        let batches = builder
            .with_batch_size(2)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(batches.len(), 4);
        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
        assert_eq!(&batch_counts, &[2, 2, 2, 1]);

        let actual = pretty_format_batches(&batches).unwrap().to_string();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_arrow_writer_metadata() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = batch_schema.clone().with_metadata(
            vec![("foo".to_string(), "bar".to_string())]
                .into_iter()
                .collect(),
        );

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    #[test]
    fn test_arrow_writer_nullable() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let back = read.next().unwrap().unwrap();
        assert_eq!(back.schema(), file_schema);
        assert_ne!(back.schema(), batch.schema());
        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
    }

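    // Verifies the writer's in-memory accounting: buffered size and row counts
    // grow with writes, and flushing moves data from memory into bytes written.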
    #[test]
    fn in_progress_accounting() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();

        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert_eq!(writer.memory_size(), 0);
        // Only the 4-byte "PAR1" header has been written at this point
        assert_eq!(writer.bytes_written(), 4);

        writer.write(&batch).unwrap();

        let initial_size = writer.in_progress_size();
        assert!(initial_size > 0);
        assert_eq!(writer.in_progress_rows(), 5);
        let initial_memory = writer.memory_size();
        assert!(initial_memory > 0);
        assert!(
            initial_size <= initial_memory,
            "{initial_size} <= {initial_memory}"
        );

        writer.write(&batch).unwrap();
        assert!(writer.in_progress_size() > initial_size);
        assert_eq!(writer.in_progress_rows(), 10);
        assert!(writer.memory_size() > initial_memory);
        assert!(
            writer.in_progress_size() <= writer.memory_size(),
            "in_progress_size {} <= memory_size {}",
            writer.in_progress_size(),
            writer.memory_size()
        );

        // Flushing clears the in-progress row group and moves bytes to the sink
        let pre_flush_bytes_written = writer.bytes_written();
        writer.flush().unwrap();
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.memory_size(), 0);
        assert!(writer.bytes_written() > pre_flush_bytes_written);

        writer.close().unwrap();
    }

    #[test]
    fn test_writer_all_null() {
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
        let batch = RecordBatch::try_from_iter(vec![
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
        let index = reader.metadata().offset_index().unwrap();

        assert_eq!(index.len(), 1);
        assert_eq!(index[0].len(), 2); // 2 columns
        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
    }

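    // Statistics disabled globally but enabled at `Page` level for column "a":
    // only "a" should get chunk statistics and a column index.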
    #[test]
    fn test_disabled_statistics_with_page() {
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            file_schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
            ],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .build();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();

        let metadata = writer.close().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        // Both columns have an offset index, but only column "a" has a column index
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_some());
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

        let row_group = reader.get_row_group(0).unwrap();
        let a_col = row_group.metadata().column(0);
        let b_col = row_group.metadata().column(1);

        // Column "a" should have chunk-level statistics
        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
            let min = byte_array_stats.min_opt().unwrap();
            let max = byte_array_stats.max_opt().unwrap();

            assert_eq!(min.as_bytes(), b"a");
            assert_eq!(max.as_bytes(), b"d");
        } else {
            panic!("expecting Statistics::ByteArray");
        }

        // Column "b" should not have chunk-level statistics
        assert!(b_col.statistics().is_none());

        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1); // 1 row group
        assert_eq!(offset_index[0].len(), 2); // 2 columns

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 columns

        let a_idx = &column_index[0][0];
        assert!(
            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
            "{a_idx:?}"
        );
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }
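
    // Statistics enabled at the `Chunk` level for column "a": "a" should get
    // chunk statistics, but no column index is written for either column.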
    #[test]
    fn test_disabled_statistics_with_chunk() {
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            file_schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
            ],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
            .build();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();

        let metadata = writer.close().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        // Both columns have an offset index, but neither has a column index
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_none());
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

        let row_group = reader.get_row_group(0).unwrap();
        let a_col = row_group.metadata().column(0);
        let b_col = row_group.metadata().column(1);

        // Column "a" should have chunk-level statistics
        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
            let min = byte_array_stats.min_opt().unwrap();
            let max = byte_array_stats.max_opt().unwrap();

            assert_eq!(min.as_bytes(), b"a");
            assert_eq!(max.as_bytes(), b"d");
        } else {
            panic!("expecting Statistics::ByteArray");
        }

        // Column "b" should not have chunk-level statistics
        assert!(b_col.statistics().is_none());

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 columns

        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }
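
    // `with_skip_arrow_metadata(true)` should keep the serialized Arrow schema
    // out of the file's key-value metadata.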
    #[test]
    fn test_arrow_writer_skip_metadata() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Arc::new(batch_schema.clone());

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(file_schema, *reader_builder.schema());
        if let Some(key_value_metadata) = reader_builder
            .metadata()
            .file_metadata()
            .key_value_metadata()
        {
            assert!(
                !key_value_metadata
                    .iter()
                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
            );
        }
    }
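
    // An explicitly provided Parquet schema (INT64) takes precedence over the
    // schema derived from the Arrow schema (Int32), and the file reads back
    // with the wider type.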
    #[test]
    fn test_arrow_writer_explicit_schema() {
        let batch_schema = Arc::new(Schema::new(vec![Field::new(
            "integers",
            DataType::Int32,
            true,
        )]));
        let parquet_schema = Type::group_type_builder("root")
            .with_fields(vec![
                Type::primitive_type_builder("integers", crate::basic::Type::INT64)
                    .build()
                    .unwrap()
                    .into(),
            ])
            .build()
            .unwrap();
        let parquet_schema_descr = SchemaDescriptor::new(parquet_schema.into());

        let batch = RecordBatch::try_new(
            batch_schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let explicit_schema_options =
            ArrowWriterOptions::new().with_parquet_schema(parquet_schema_descr);
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new_with_options(
            &mut buf,
            batch_schema.clone(),
            explicit_schema_options,
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();

        let expected_schema = Arc::new(Schema::new(vec![Field::new(
            "integers",
            DataType::Int64,
            true,
        )]));
        assert_eq!(reader_builder.schema(), &expected_schema);

        let batches = reader_builder
            .build()
            .unwrap()
            .collect::<Result<Vec<_>, ArrowError>>()
            .unwrap();
        assert_eq!(batches.len(), 1);

        let expected_batch = RecordBatch::try_new(
            expected_schema.clone(),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
        assert_eq!(batches[0], expected_batch);
    }
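
    // Writing a batch whose schema does not match the writer's schema should
    // fail with a descriptive error.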
    #[test]
    fn mismatched_schemas() {
        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "temperature",
            DataType::Float64,
            false,
        )]));

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();

        let err = writer.write(&batch).unwrap_err().to_string();
        assert_eq!(
            err,
            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
        );
    }
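
    // A schema with no columns and zero rows should round-trip cleanly.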
    #[test]
    fn test_roundtrip_empty_schema() {
        // create an empty batch with no columns and zero rows
        let empty_batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions::default().with_row_count(Some(0)),
        )
        .unwrap();

        // write the empty batch to a Parquet file
        let mut parquet_bytes: Vec<u8> = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
        writer.write(&empty_batch).unwrap();
        writer.close().unwrap();

        // read the file back and verify the schema round-trips
        let bytes = Bytes::from(parquet_bytes);
        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(reader.schema(), &empty_batch.schema());
        let batches: Vec<_> = reader
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();
        assert_eq!(batches.len(), 0);
    }
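
    // Statistics in data page headers should be omitted unless
    // `set_write_page_header_statistics` is enabled.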
    #[test]
    fn test_page_stats_not_written_by_default() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // decode the first page header, skipping the 4-byte `PAR1` magic at the
        // start of the file
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        assert!(stats.is_none());
    }
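
    // With `set_write_page_header_statistics(true)`, exact min/max values
    // should be present in the data page header.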
    #[test]
    fn test_page_stats_when_enabled() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // decode the first page header, skipping the 4-byte `PAR1` magic at the
        // start of the file
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        // untruncated values should be marked as exact
        let stats = stats.unwrap();
        assert!(stats.is_max_value_exact.unwrap());
        assert!(stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
    }
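
    // `set_statistics_truncate_length(Some(2))` should truncate page header
    // min/max to two bytes, marking them inexact and incrementing the last
    // byte of the max so it remains a valid upper bound.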
    #[test]
    fn test_page_stats_truncation() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["Blart Versenwald III"];
        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_truncate_length(Some(2))
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // decode the first page header, skipping the 4-byte `PAR1` magic at the
        // start of the file
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        // values are truncated to two bytes, with the max incremented
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

        // check that the second page (the binary column) is truncated the same way
        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
        let mut prot = ThriftSliceInputProtocol::new(second_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
    }
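
    // The page encoding statistics returned by `close()` should match those
    // read back from the file's metadata.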
    #[test]
    fn test_page_encoding_statistics_roundtrip() {
        let batch_schema = Schema::new(vec![Field::new(
            "int32",
            arrow_schema::DataType::Int32,
            false,
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut file: File = tempfile::tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
        assert!(
            file_metadata
                .row_group(0)
                .column(0)
                .page_encoding_stats()
                .is_some()
        );
        let chunk_page_stats = file_metadata
            .row_group(0)
            .column(0)
            .page_encoding_stats()
            .unwrap();

        // read the file back and compare against the stats returned on close
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rowgroup = reader.get_row_group(0).expect("row group missing");
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.page_encoding_stats().is_some());
        let file_page_stats = column.page_encoding_stats().unwrap();
        assert_eq!(chunk_page_stats, file_page_stats);
    }
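
    // A per-column dictionary page size limit should override the file-wide
    // default for that column only.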
    #[test]
    fn test_different_dict_page_size_limit() {
        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", arrow_schema::DataType::Int64, false),
            Field::new("col1", arrow_schema::DataType::Int64, false),
        ]));
        let batch =
            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(1024 * 1024)
            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
            .build();
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        let data = Bytes::from(writer.into_inner().unwrap());

        let mut metadata = ParquetMetaDataReader::new();
        metadata.try_parse(&data).unwrap();
        let metadata = metadata.finish().unwrap();
        let col0_meta = metadata.row_group(0).column(0);
        let col1_meta = metadata.row_group(0).column(1);

        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
            let mut reader =
                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
            let page = reader.get_next_page().unwrap().unwrap();
            match page {
                Page::DictionaryPage { buf, .. } => buf.len(),
                _ => panic!("expected DictionaryPage"),
            }
        };

        // each column's dictionary page should grow to its configured limit
        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
    }
}