use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::convert::IpcSchemaEncoder;
use crate::CONTINUATION_MARKER;

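/// IPC write options used to control the behaviour of the [`IpcDataGenerator`].
///
/// A minimal construction sketch (both calls are defined below; `LZ4_FRAME`
/// assumes the `lz4` feature is enabled):
///
/// ```ignore
/// let options = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)?
///     .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))?;
/// ```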
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers so that they are aligned to this
    /// number of bytes. Must be 8, 16, 32, or 64.
    alignment: u8,
    /// Whether to write the pre-0.15.0 "legacy" IPC format (no continuation
    /// marker before the message length). Only valid with metadata V4.
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust implementation supports V4+.
    metadata_version: crate::MetadataVersion,
    /// Compression to apply to record batch body buffers, if any.
    /// Requires metadata V5 or above.
    batch_compression_type: Option<crate::CompressionType>,
    /// Whether the writer should preserve the dictionary IDs defined in the
    /// schema or generate unique dictionary IDs internally during encoding.
    preserve_dict_id: bool,
}

impl IpcWriteOptions {
    /// Configures compression for the record batch body buffers.
    ///
    /// Returns an error if compression is requested with a metadata version
    /// below V5, which does not support compression.
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }

    /// Try to create `IpcWriteOptions`, checking for incompatible settings.
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
                preserve_dict_id: true,
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                        preserve_dict_id: true,
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }

    /// Return whether the writer is configured to preserve the dictionary IDs
    /// defined in the schema.
    pub fn preserve_dict_id(&self) -> bool {
        self.preserve_dict_id
    }

    /// Set whether the writer should preserve the dictionary IDs in the schema
    /// or assign its own unique IDs during encoding.
    pub fn with_preserve_dict_id(mut self, preserve_dict_id: bool) -> Self {
        self.preserve_dict_id = preserve_dict_id;
        self
    }
}

impl Default for IpcWriteOptions {
    fn default() -> Self {
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
            preserve_dict_id: true,
        }
    }
}

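/// Handles low level details of encoding [`Schema`]s and [`RecordBatch`]es
/// into the Arrow IPC format (schema messages, dictionary batches and record
/// batches).
///
/// A minimal sketch of direct use; most callers should prefer [`FileWriter`]
/// or [`StreamWriter`], which drive this type internally:
///
/// ```ignore
/// let data_gen = IpcDataGenerator::default();
/// let mut tracker = DictionaryTracker::new(false);
/// let options = IpcWriteOptions::default();
/// let encoded_schema =
///     data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut tracker, &options);
/// let (encoded_dictionaries, encoded_batch) =
///     data_gen.encoded_batch(&batch, &mut tracker, &options)?;
/// ```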
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
    /// Converts a schema to an encoded IPC message, registering any dictionary
    /// fields with `dictionary_tracker` so their IDs match later batches.
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    #[deprecated(
        since = "54.0.0",
        note = "Use `schema_to_bytes_with_dictionary_tracker` instead. This function will adopt the signature of `schema_to_bytes_with_dictionary_tracker` in the next release."
    )]
    pub fn schema_to_bytes(&self, schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            #[allow(deprecated)]
            let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // The run_ends array is the first child; only the values array
                // (the second child) can contain dictionaries.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // keys
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;

                // values
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                // The dictionary values may themselves contain nested
                // dictionaries, so encode those first.
                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                )?;

                // Take the next dict id from the sequence assigned when the
                // schema was encoded, falling back to the field's own dict id.
                let dict_id = dict_id_seq
                    .next()
                    .or_else(|| field.dict_id())
                    .ok_or_else(|| {
                        ArrowError::IpcError(format!("no dict id for field {}", field.name()))
                    })?;

                let emit = dictionary_tracker.insert(dict_id, column)?;

                if emit {
                    encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                        dict_id,
                        dict_values,
                        write_options,
                    )?);
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
            )?,
        }

        Ok(())
    }

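    /// Encodes a batch to a flatbuffer as described in the Arrow IPC format,
    /// returning any dictionary batches that must be written before it,
    /// followed by the encoded record batch itself.
    ///
    /// A minimal sketch, pairing the result with [`write_message`] the way the
    /// writers in this module do:
    ///
    /// ```ignore
    /// let generator = IpcDataGenerator::default();
    /// let mut tracker = DictionaryTracker::new(false);
    /// let options = IpcWriteOptions::default();
    /// let (dictionaries, batch_message) =
    ///     generator.encoded_batch(&batch, &mut tracker, &options)?;
    /// for dictionary in dictionaries {
    ///     write_message(&mut writer, dictionary, &options)?;
    /// }
    /// write_message(&mut writer, batch_message, &options)?;
    /// ```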
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
            )?;
        }

        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Write a `RecordBatch` into two sets of bytes, one for the header
    /// (`crate::Message`) and the other for the batch's data.
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Write dictionary values into two sets of bytes, one for the header
    /// (`crate::Message`) and the other for the data.
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}

fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
    match array.data_type() {
        DataType::BinaryView | DataType::Utf8View => {
            // The spec documents the counts only includes the variadic buffers,
            // not the view/null buffers.
            counts.push(array.buffers().len() as i64 - 1);
        }
        DataType::Dictionary(_, _) => {
            // Do nothing: dictionary types are handled in `encode_dictionaries`.
        }
        _ => {
            for child in array.child_data() {
                append_variadic_buffer_counts(counts, child)
            }
        }
    }
}

pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
    match arr.data_type() {
        DataType::RunEndEncoded(k, _) => match k.data_type() {
            DataType::Int16 => {
                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
            }
            DataType::Int32 => {
                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
            }
            DataType::Int64 => {
                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
            }
            d => unreachable!("Unexpected data type {d}"),
        },
        d => Err(ArrowError::InvalidArgumentError(format!(
            "The given array is not a run array. Data type of given array: {d}"
        ))),
    }
}

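/// Returns a [`RunArray`] with zero offset whose final run end equals the
/// array's logical length, rewriting the run ends if the input was sliced.
///
/// A small worked example (an assumed illustration, not from the original
/// source): run ends `[2, 5, 8]` sliced with offset `2` and length `4` cover
/// logical rows `2..6`, which touch only the last two runs, so the run ends
/// are rewritten to `[3, 4]`.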
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index of the run containing the slice's first logical row.
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index of the run containing the slice's last logical row.
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // Rewrite the run ends so they are relative to the slice's logical start.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // SAFETY: The loop above builds a valid, monotonically increasing
        // run_ends array, so validation can be skipped.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // SAFETY: The builder is constructed from a valid run array, so
        // validation can be skipped.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}

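/// Keeps track of dictionaries that have been written, to avoid emitting the
/// same dictionary multiple times.
///
/// Can optionally error if an update to an existing dictionary is attempted,
/// which isn't allowed in the `FileWriter`.
///
/// A minimal sketch of the tracking behaviour (`dict_array` is any dictionary
/// [`ArrayRef`]; the id `1` is arbitrary):
///
/// ```ignore
/// let mut tracker = DictionaryTracker::new(true);
/// assert!(tracker.insert(1, &dict_array)?);  // first sighting: emit it
/// assert!(!tracker.insert(1, &dict_array)?); // same values: skip it
/// ```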
#[derive(Debug)]
pub struct DictionaryTracker {
    /// Map of dictionary id to the dictionary data last written with that id.
    written: HashMap<i64, ArrayData>,
    /// The dictionary ids assigned so far, in schema order.
    dict_ids: Vec<i64>,
    /// Whether to error if a dictionary with an existing id is replaced.
    error_on_replacement: bool,
    /// Whether to use the dictionary ids in the schema or assign new ones.
    preserve_dict_id: bool,
}

impl DictionaryTracker {
    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement` is true, an error will be generated if an
    /// update to an existing dictionary is attempted.
    pub fn new(error_on_replacement: bool) -> Self {
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
            preserve_dict_id: true,
        }
    }

    /// Create a new [`DictionaryTracker`], additionally controlling whether
    /// dictionary IDs from the schema are preserved or assigned internally.
    pub fn new_with_preserve_dict_id(error_on_replacement: bool, preserve_dict_id: bool) -> Self {
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
            preserve_dict_id,
        }
    }

    /// Record and return the dictionary ID to use for `field`.
    ///
    /// If `preserve_dict_id` is set, the ID defined on the field is used;
    /// otherwise the next sequential ID is assigned.
    pub fn set_dict_id(&mut self, field: &Field) -> i64 {
        let next = if self.preserve_dict_id {
            field.dict_id().expect("no dict_id in field")
        } else {
            self.dict_ids
                .last()
                .copied()
                .map(|i| i + 1)
                .unwrap_or_default()
        };

        self.dict_ids.push(next);
        next
    }

    /// Return the sequence of dictionary IDs assigned by this tracker.
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    /// Keep track of the dictionary with the given ID and values. Returns
    /// `Ok(true)` if the dictionary was newly inserted (and so should be
    /// emitted), `Ok(false)` if the same dictionary was already written, and
    /// `Err` if a different dictionary was written under this ID while
    /// `error_on_replacement` is set.
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already emitted, check if it was the same.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same dictionary values => no need to emit it again
                return Ok(false);
            }
            if self.error_on_replacement {
                // If error on replacement perform a logical comparison
                if last.child_data()[0] == *dict_values {
                    // Same dictionary values => no need to emit it again
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }
}

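/// Arrow File Writer: writes [`RecordBatch`]es in the Arrow IPC File format,
/// finishing with a footer that indexes the dictionary and record blocks.
///
/// A minimal usage sketch, writing to an in-memory buffer (the same flow the
/// `serialize_file` test helper below uses):
///
/// ```ignore
/// let mut writer = FileWriter::try_new(Vec::new(), &schema)?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes: Vec<u8> = writer.into_inner()?;
/// ```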
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// The number of bytes between messages
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the file writer has been finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Try to create a new file writer with the writer wrapped in a `BufWriter`.
    ///
    /// See [`FileWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    ///
    /// Note the created writer is not buffered; see
    /// [`FileWriter::try_new_buffered`] or wrap the `writer` in a `BufWriter`
    /// if that is a concern.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with the provided [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // write magic to header aligned on alignment boundary
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // write the schema, set the written bytes to the schema + header
        let preserve_dict_id = write_options.preserve_dict_id;
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
        })
    }

    /// Adds a key-value pair to the [`FileWriter`]'s custom metadata.
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
        )?;

        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
        // add a record block for the footer
        let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }


    /// Write the footer and closing magic, then mark the writer as finished.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // write EOS
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);
        let preserve_dict_id = self.write_options.preserve_dict_id;
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer, finishing the file first if it has not
    /// already been finished.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for FileWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

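/// Arrow Stream Writer: writes [`RecordBatch`]es in the Arrow IPC Streaming
/// format, emitting dictionary messages inline as they are first seen.
///
/// A minimal usage sketch (mirrors the `serialize_stream` test helper below):
///
/// ```ignore
/// let mut stream_writer = StreamWriter::try_new(Vec::new(), &schema)?;
/// stream_writer.write(&batch)?;
/// stream_writer.finish()?;
/// let bytes: Vec<u8> = stream_writer.into_inner()?;
/// ```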
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the stream has been finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
    /// Try to create a new stream writer with the writer wrapped in a `BufWriter`.
    ///
    /// See [`StreamWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> StreamWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    ///
    /// Note the created writer is not buffered; see
    /// [`StreamWriter::try_new_buffered`] or wrap the `writer` in a `BufWriter`
    /// if that is a concern.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with the provided [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        let preserve_dict_id = write_options.preserve_dict_id;
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(false, preserve_dict_id);

        // write the schema, set the written bytes to the schema
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            finished: false,
            dictionary_tracker,
            data_gen,
        })
    }

    /// Write a record batch to the stream.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self
            .data_gen
            .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)
            .expect("StreamWriter is configured to not error on dictionary replacement");

        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
        }

        write_message(&mut self.writer, encoded_message, &self.write_options)?;
        Ok(())
    }

    /// Write the end-of-stream continuation bytes and mark the stream as done.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to stream writer as it is closed".to_string(),
            ));
        }

        write_continuation(&mut self.writer, &self.write_options, 0)?;

        self.finished = true;

        Ok(())
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

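    /// Unwraps the underlying writer, finishing the stream first if it is not
    /// already finished.
    ///
    /// A minimal sketch of recovering the serialized bytes (the same pattern
    /// as the `serialize_stream` test helper below):
    ///
    /// ```ignore
    /// let mut stream_writer = StreamWriter::try_new(Vec::new(), &schema)?;
    /// stream_writer.write(&batch)?;
    /// let bytes: Vec<u8> = stream_writer.into_inner()?;
    /// ```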
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

/// Stores the encoded data, which is a `crate::Message`, and optional Arrow data.
pub struct EncodedData {
    /// An encoded `crate::Message`.
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written, should be an empty `Vec` for schema messages.
    pub arrow_data: Vec<u8>,
}
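
/// Write a message's IPC header and body to `writer`, returning the number of
/// bytes written for the (aligned) header and body respectively.
///
/// A sketch of how the writers in this module use it (`encoded` is an
/// [`EncodedData`] produced by [`IpcDataGenerator`]):
///
/// ```ignore
/// let (meta_len, body_len) = write_message(&mut writer, encoded, &options)?;
/// ```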
pub fn write_message<W: Write>(
    mut writer: W,
    encoded: EncodedData,
    write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
    let arrow_data_len = encoded.arrow_data.len();
    if arrow_data_len % usize::from(write_options.alignment) != 0 {
        return Err(ArrowError::MemoryError(
            "Arrow data not aligned".to_string(),
        ));
    }

    let a = usize::from(write_options.alignment - 1);
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    let prefix_size = if write_options.write_legacy_ipc_format {
        4
    } else {
        8
    };
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;

    write_continuation(
        &mut writer,
        write_options,
        (aligned_size - prefix_size) as i32,
    )?;

    // write the flatbuf
    if flatbuf_size > 0 {
        writer.write_all(&buffer)?;
    }
    // write padding
    writer.write_all(&PADDING[..padding_bytes])?;

    // write arrow data
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(
    mut writer: W,
    data: &[u8],
    alignment: u8,
) -> Result<usize, ArrowError> {
    let len = data.len();
    let pad_len = pad_to_alignment(alignment, len);
    let total_len = len + pad_len;

    writer.write_all(data)?;
    if pad_len > 0 {
        writer.write_all(&PADDING[..pad_len])?;
    }

    writer.flush()?;
    Ok(total_len)
}

/// Write a continuation marker (unless writing the legacy format) followed by
/// the encapsulated message length, returning the number of bytes written.
fn write_continuation<W: Write>(
    mut writer: W,
    write_options: &IpcWriteOptions,
    total_len: i32,
) -> Result<usize, ArrowError> {
    let mut written = 8;

    // the metadata version determines whether a continuation marker is added
    match write_options.metadata_version {
        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
            unreachable!("Options with this metadata version cannot be created")
        }
        crate::MetadataVersion::V4 => {
            if !write_options.write_legacy_ipc_format {
                // v0.15.0 format with continuation marker
                writer.write_all(&CONTINUATION_MARKER)?;
            } else {
                // the legacy format writes only the 4-byte length prefix
                written = 4;
            }
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        crate::MetadataVersion::V5 => {
            // write continuation marker and message length
            writer.write_all(&CONTINUATION_MARKER)?;
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
    };

    writer.flush()?;

    Ok(written)
}

/// Whether an array of the given data type has a validity bitmap in the IPC
/// body. In V5 and above, Null, Union and RunEndEncoded arrays do not.
fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
    if write_options.metadata_version < crate::MetadataVersion::V5 {
        !matches!(data_type, DataType::Null)
    } else {
        !matches!(
            data_type,
            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
        )
    }
}

#[inline]
fn buffer_need_truncate(
    array_offset: usize,
    buffer: &Buffer,
    spec: &BufferSpec,
    min_length: usize,
) -> bool {
    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
}

#[inline]
fn get_buffer_element_width(spec: &BufferSpec) -> usize {
    match spec {
        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
        _ => 0,
    }
}

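/// Common functionality for re-encoding offsets. Returns the new offsets, as
/// well as the original start offset and length, for use in slicing child data.
///
/// A small worked example (an assumed illustration, not from the original
/// source): a sliced array whose offsets are `[3, 5, 8]` re-encodes to
/// `[0, 2, 5]`, with start offset `3` and length `5`, so only the bytes the
/// slice actually references need to be written.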
fn reencode_offsets<O: OffsetSizeTrait>(
    offsets: &Buffer,
    data: &ArrayData,
) -> (Buffer, usize, usize) {
    let offsets_slice: &[O] = offsets.typed_data::<O>();
    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];

    let start_offset = offset_slice.first().unwrap();
    let end_offset = offset_slice.last().unwrap();

    let offsets = match start_offset.as_usize() {
        0 => offsets.clone(),
        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
    };

    let start_offset = start_offset.as_usize();
    let end_offset = end_offset.as_usize();

    (offsets, start_offset, end_offset - start_offset)
}

/// Returns the offsets and values [`Buffer`]s for a byte array, re-encoding
/// the offsets to start at zero and slicing the values to only the bytes the
/// array references.
fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
    if data.is_empty() {
        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
    (offsets, values)
}

/// Similar to [`get_byte_array_buffers`], but slices the child [`ArrayData`]
/// instead of a values buffer.
fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
    if data.is_empty() {
        return (
            MutableBuffer::new(0).into(),
            data.child_data()[0].slice(0, 0),
        );
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let child_data = data.child_data()[0].slice(original_start_offset, len);
    (offsets, child_data)
}

/// Write array data to a vector of bytes
#[allow(clippy::too_many_arguments)]
fn write_array_data(
    array_data: &ArrayData,
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<crate::FieldNode>,
    offset: i64,
    num_rows: usize,
    null_count: usize,
    compression_codec: Option<CompressionCodec>,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    if !matches!(array_data.data_type(), DataType::Null) {
        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
    } else {
        // NullArray's null_count equals its length, but the `null_count`
        // passed in comes from the ArrayData, where it is always 0.
        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // write null buffer if exists
        let null_buffer = match array_data.nulls() {
            None => {
                // create a buffer and fill it with valid bits
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = write_buffer(
            null_buffer.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // Slicing the views buffer is safe and easy, but pruning unneeded data
        // buffers is nuanced, since it is hard to prove that no view
        // references a pruned buffer; the raw buffers are serialized as given.
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Truncate values
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let layout = layout(data_type);
        let spec = &layout.buffers[0];

        let byte_width = get_buffer_element_width(spec);
        let min_length = array_data.len() * byte_width;
        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
            let byte_offset = array_data.offset() * byte_width;
            let buffer_length = min(min_length, buffer.len() - byte_offset);
            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
        } else {
            buffer.as_slice()
        };
        offset = write_buffer(
            buffer_slice,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Bools are special because the payload (= 1 bit) is smaller than the
        // physical container elements (= bytes); slice the canonical buffer at
        // the offset without assuming a byte boundary.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = write_buffer(
            &buffer,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Truncate the offsets and the child data to avoid writing unnecessary data
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = write_buffer(
            offsets.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            sliced_child_data.len(),
            sliced_child_data.null_count(),
            compression_codec,
            write_options,
        )?;
        return Ok(offset);
    } else {
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer,
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    }

    match array_data.data_type() {
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // unslice the run encoded array before writing its children
            let arr = unslice_run_array(array_data.clone())?;
            // recursively write out nested structures
            for data_ref in arr.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
        _ => {
            // recursively write out nested structures
            for data_ref in array_data.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}

/// Write a buffer into `arrow_data`, a vector of bytes, and add a
/// [`crate::Buffer`] index entry to `buffers`. Returns the updated offset
/// into `arrow_data`, including any alignment padding.
fn write_buffer(
    buffer: &[u8],                                // input
    buffers: &mut Vec<crate::Buffer>,             // output buffer descriptors
    arrow_data: &mut Vec<u8>,                     // output stream
    offset: i64,                                  // current output stream offset
    compression_codec: Option<CompressionCodec>,
    alignment: u8,
) -> Result<i64, ArrowError> {
    let len: i64 = match compression_codec {
        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
        None => {
            arrow_data.extend_from_slice(buffer);
            buffer.len()
        }
    }
    .try_into()
    .map_err(|e| {
        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
    })?;

    // make new index entry
    buffers.push(crate::Buffer::new(offset, len));
    // pad the data so the next offset stays aligned
    let pad_len = pad_to_alignment(alignment, len as usize);
    arrow_data.extend_from_slice(&PADDING[..pad_len]);

    Ok(offset + len + (pad_len as i64))
}

const PADDING: [u8; 64] = [0; 64];

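/// Calculate the number of padding bytes needed to round `len` up to a
/// multiple of `alignment` (which must be a power of two no larger than 64).
///
/// For example, `pad_to_alignment(8, 5)` returns `3`, while
/// `pad_to_alignment(8, 16)` returns `0`.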
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let a = usize::from(alignment - 1);
    ((len + a) & !a) - len
}

#[cfg(test)]
mod tests {
    use std::io::Cursor;
    use std::io::Seek;

    use arrow_array::builder::GenericListBuilder;
    use arrow_array::builder::MapBuilder;
    use arrow_array::builder::UnionBuilder;
    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
    use arrow_array::types::*;
    use arrow_buffer::ScalarBuffer;

    use crate::convert::fb_to_schema;
    use crate::reader::*;
    use crate::root_as_footer;
    use crate::MetadataVersion;

    use super::*;

    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        writer.into_inner().unwrap()
    }

    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
        reader.next().unwrap().unwrap()
    }

    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
        const IPC_ALIGNMENT: usize = 8;

        let mut stream_writer = StreamWriter::try_new_with_options(
            vec![],
            record.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        stream_writer.write(record).unwrap();
        stream_writer.finish().unwrap();
        stream_writer.into_inner().unwrap()
    }

    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
        stream_reader.next().unwrap().unwrap()
    }

    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_empty_record_batch_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();

        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_file_with_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    #[cfg(feature = "zstd")]
    fn test_write_file_with_zstd_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::ZSTD))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    fn test_write_file() {
        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
        let values: Vec<Option<u32>> = vec![
            Some(999),
            None,
            Some(235),
            Some(123),
            None,
            None,
            None,
            None,
            None,
        ];
        let array1 = UInt32Array::from(values);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
                .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            let mut reader = FileReader::try_new(file, None).unwrap();
            while let Some(Ok(read_batch)) = reader.next() {
                read_batch
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    fn write_null_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![
            Field::new("nulls", DataType::Null, true),
            Field::new("int32s", DataType::Int32, false),
            Field::new("nulls2", DataType::Null, true),
            Field::new("f64s", DataType::Float64, false),
        ]);
        let array1 = NullArray::new(32);
        let array2 = Int32Array::from(vec![1; 32]);
        let array3 = NullArray::new(32);
        let array4 = Float64Array::from(vec![f64::NAN; 32]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array1) as ArrayRef,
                Arc::new(array2) as ArrayRef,
                Arc::new(array3) as ArrayRef,
                Arc::new(array4) as ArrayRef,
            ],
        )
        .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        file.rewind().unwrap();

        {
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }

    #[test]
    fn test_write_null_file_v4() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
    }

    #[test]
    fn test_write_null_file_v5() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
    }

    #[test]
    fn track_union_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 2, false);
        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();

        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();

        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        let gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        assert!(dict_tracker.written.contains_key(&2));
    }

    #[test]
    fn track_struct_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        let dctfield = Arc::new(Field::new_dict(
            "dict",
            array.data_type().clone(),
            false,
            2,
            false,
        ));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        let gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        assert!(dict_tracker.written.contains_key(&2));
    }

    fn write_union_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![Field::new_union(
            "union",
            vec![0, 1],
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("c", DataType::Float64, false),
            ],
            UnionMode::Sparse,
        )]);
        let mut builder = UnionBuilder::with_capacity_sparse(5);
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append_null::<Float64Type>("c").unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        let union = builder.build().unwrap();

        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
                .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }

    #[test]
    fn test_write_union_file_v4_v5() {
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
    }

    #[test]
    fn test_write_view_types() {
        const LONG_TEST_STRING: &str =
            "This is a long string to make sure binary view array handles it";
        let schema = Schema::new(vec![
            Field::new("field1", DataType::BinaryView, true),
            Field::new("field2", DataType::Utf8View, true),
        ]);
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
        ];
        let binary_array = BinaryViewArray::from_iter(values);
        let utf8_array =
            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(binary_array), Arc::new(utf8_array)],
        )
        .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let mut reader = FileReader::try_new(&file, None).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            read_batch
                .columns()
                .iter()
                .zip(record_batch.columns())
                .for_each(|(a, b)| {
                    assert_eq!(a, b);
                });
        }
        file.rewind().unwrap();
        {
            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            assert_eq!(read_batch.num_columns(), 1);
            let read_array = read_batch.column(0);
            let write_array = record_batch.column(0);
            assert_eq!(read_array, write_array);
        }
    }

    #[test]
    fn truncate_ipc_record_batch() {
        fn create_batch(rows: usize) -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Utf8, false),
            ]);

            let a = Int32Array::from_iter_values(0..rows as i32);
            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let big_record_batch = create_batch(65536);

        let length = 5;
        let small_record_batch = create_batch(length);

        let offset = 2;
        let record_batch_slice = big_record_batch.slice(offset, length);
        assert!(
            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
        );
        assert_eq!(
            serialize_stream(&small_record_batch).len(),
            serialize_stream(&record_batch_slice).len()
        );

        assert_eq!(
            deserialize_stream(serialize_stream(&record_batch_slice)),
            record_batch_slice
        );
    }

    #[test]
    fn truncate_ipc_record_batch_with_nulls() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ]);

            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_null(0));
        assert!(deserialized_batch.column(0).is_valid(1));
        assert!(deserialized_batch.column(1).is_valid(0));
        assert!(deserialized_batch.column(1).is_valid(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_dictionary_array() {
        fn create_batch() -> RecordBatch {
            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let array = DictionaryArray::new(keys, Arc::new(values));

            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_valid(0));
        assert!(deserialized_batch.column(0).is_null(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_struct_array() {
        fn create_batch() -> RecordBatch {
            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("s", DataType::Utf8, true)),
                    Arc::new(strings) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(ints) as ArrayRef,
                ),
            ]);

            let schema = Schema::new(vec![Field::new(
                "struct_array",
                struct_array.data_type().clone(),
                true,
            )]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        let structs = deserialized_batch
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        assert!(structs.column(0).is_null(0));
        assert!(structs.column(0).is_valid(1));
        assert!(structs.column(1).is_valid(0));
        assert!(structs.column(1).is_null(1));
        assert_eq!(record_batch_slice, deserialized_batch);
    }

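    // An all-empty-string column has an offsets buffer whose entries are all
    // zero; slicing such a batch must still truncate and round-trip cleanly.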
    #[test]
    fn truncate_ipc_string_array_with_all_empty_string() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(0, 1);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );
        assert_eq!(record_batch_slice, deserialized_batch);
    }

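    // A batch built from a sliced array (rather than a sliced batch) must
    // likewise serialize only the sliced values.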
    #[test]
    fn test_stream_writer_writes_array_slice() {
        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
        assert_eq!(
            vec![Some(1), Some(2), Some(3)],
            array.iter().collect::<Vec<_>>()
        );

        let sliced = array.slice(1, 2);
        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
            vec![Arc::new(sliced)],
        )
        .expect("new batch");

        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
        writer.write(&batch).expect("write");
        let outbuf = writer.into_inner().expect("inner");

        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
        let read_batch = reader.next().unwrap().expect("read batch");

        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
        assert_eq!(
            vec![Some(2), Some(3)],
            read_array.iter().collect::<Vec<_>>()
        );
    }

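    // Boolean buffers are bit-packed, so a slice can start and end mid-byte.
    // Each case below exercises a different offset/length alignment.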
    #[test]
    fn encode_bools_slice() {
        // slice of a single bit within the first byte
        assert_bool_roundtrip([true, false], 1, 1);

        // slice starting and ending in the middle of a byte
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, false, false, false, false, true, true, true, true, true, false,
                false, false, false, false,
            ],
            13,
            17,
        );

        // slice starting at a byte boundary, ending mid-byte
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false,
            ],
            8,
            2,
        );

        // slice aligned to byte boundaries at both ends
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, true, false, false, false, false, false,
            ],
            8,
            8,
        );
    }

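    // Round-trips a sliced boolean batch through the stream format and checks
    // it compares equal to the original slice.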
    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
        let val_bool_field = Field::new("val", DataType::Boolean, false);

        let schema = Arc::new(Schema::new(vec![val_bool_field]));

        let bools = BooleanArray::from(bools.to_vec());

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
        let batch = batch.slice(offset, length);

        let data = serialize_stream(&batch);
        let batch2 = deserialize_stream(data);
        assert_eq!(batch, batch2);
    }

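    // `into_zero_offset_run_array` re-encodes a sliced run-end encoded array so
    // that its first run starts at offset zero. Verify it preserves the logical
    // values for every slice length, anchored at both ends of the array.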
    #[test]
    fn test_run_array_unslice() {
        let total_len = 80;
        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
        let repeats: Vec<usize> = vec![3, 4, 1, 2];
        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
        for ix in 0_usize..32 {
            let repeat: usize = repeats[ix % repeats.len()];
            let val: Option<i32> = vals[ix % vals.len()];
            input_array.resize(input_array.len() + repeat, val);
        }

        // Encode the input vector as a run-end encoded array.
        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

        // Test every slice length, anchored at both the front and the back.
        for slice_len in 1..=total_len {
            // Slice with offset = 0, length = slice_len.
            let sliced_run_array: RunArray<Int16Type> =
                run_array.slice(0, slice_len).into_data().into();

            // Re-encode to a zero-offset run array and compare logical values.
            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);

            // Slice with offset = total_len - slice_len, length = slice_len.
            let sliced_run_array: RunArray<Int16Type> = run_array
                .slice(total_len - slice_len, slice_len)
                .into_data()
                .into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array
                .iter()
                .skip(total_len - slice_len)
                .copied()
                .collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);
        }
    }

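    // Generators for large variable-length arrays used by the `encode_*`
    // roundtrip tests below.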
    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());

        for i in 0..100_000 {
            for value in [i, i, i] {
                ls.values().append_value(value);
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..10_000 {
            for j in 0..10 {
                for value in [j, j, j, j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true);
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_map_array_data() -> MapArray {
        let keys_builder = UInt32Builder::new();
        let values_builder = UInt32Builder::new();

        let mut builder = MapBuilder::new(None, keys_builder, values_builder);

        for i in 0..100_000 {
            for _j in 0..3 {
                builder.keys().append_value(i);
                builder.values().append_value(i * 2);
            }
            builder.append(true).unwrap();
        }

        builder.finish()
    }

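    // Shared assertion used by the `encode_*` tests below.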
    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
        // Slice out a single row from the middle of the batch.
        let in_sliced = in_batch.slice(999, 1);

        let bytes_batch = serialize_file(&in_batch);
        let bytes_sliced = serialize_file(&in_sliced);

        // Serializing the one-row slice must be substantially smaller than
        // serializing the whole batch.
        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));

        // Both the full batch and the slice must round-trip losslessly.
        let out_batch = deserialize_file(bytes_batch);
        assert_eq!(in_batch, out_batch);

        let out_sliced = deserialize_file(bytes_sliced);
        assert_eq!(in_sliced, out_sliced);
    }

    #[test]
    fn encode_lists() {
        let val_inner = Field::new("item", DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_empty_list() {
        let val_inner = Field::new("item", DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values])
            .unwrap()
            .slice(999, 0);
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn encode_large_lists() {
        let val_inner = Field::new("item", DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i64>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists() {
        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_map_array() {
        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
        let values = Arc::new(Field::new("values", DataType::UInt32, true));
        let map_field = Field::new_map("map", "entries", keys, values, false, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let values = Arc::new(generate_map_array_data());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

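    // Decimal128 buffers require 16-byte alignment, so writing with an IPC
    // alignment of 16 keeps every buffer aligned no matter the batch shape.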
    #[test]
    fn test_decimal128_alignment16_is_sufficient() {
        const IPC_ALIGNMENT: usize = 16;

        // Test several batch shapes so buffers land at varying offsets.
        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
            // Vary the row count with the column count so each iteration
            // produces a differently shaped batch.
            let num_rows = (num_cols * 7 + 11) % 100;

            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for i in 0..num_cols {
                let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                fields.push(field);
                arrays.push(Arc::new(array) as Arc<dyn Array>);
            }
            let schema = Schema::new(fields);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

            let mut writer = FileWriter::try_new_with_options(
                Vec::new(),
                batch.schema_ref(),
                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();

            let out: Vec<u8> = writer.into_inner().unwrap();

            let buffer = Buffer::from_vec(out);
            let trailer_start = buffer.len() - 10;
            let footer_len =
                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
            let footer =
                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

            let schema = fb_to_schema(footer.schema().unwrap());

            // Require the decoder to error on misaligned buffers instead of
            // copying them into aligned memory.
            let decoder =
                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

            let batches = footer.recordBatches().unwrap();

            let block = batches.get(0);
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);

            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

            assert_eq!(batch, batch2);
        }
    }

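    // With an IPC alignment of only 8, a Decimal128 buffer can end up at an
    // offset that is 8 modulo 16, which a decoder that requires alignment
    // must reject.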
    #[test]
    fn test_decimal128_alignment8_is_unaligned() {
        const IPC_ALIGNMENT: usize = 8;

        let num_cols = 2;
        let num_rows = 1;

        let mut fields = Vec::new();
        let mut arrays = Vec::new();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            fields.push(field);
            arrays.push(Arc::new(array) as Arc<dyn Array>);
        }
        let schema = Schema::new(fields);
        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

        let mut writer = FileWriter::try_new_with_options(
            Vec::new(),
            batch.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let out: Vec<u8> = writer.into_inner().unwrap();

        let buffer = Buffer::from_vec(out);
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

        let schema = fb_to_schema(footer.schema().unwrap());

        // Require the decoder to error on misaligned buffers instead of
        // copying them into aligned memory.
        let decoder =
            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

        let batches = footer.recordBatches().unwrap();

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let result = decoder.read_record_batch(block, &data);

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
             offset from expected alignment of 16 by 8"
        );
    }

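    // `flush` must drain the writer's internal buffering so that everything
    // written so far becomes visible through the inner writer.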
    #[test]
    fn test_flush() {
        // Write a schema small enough to sit in the BufWriter's buffer without
        // being flushed, then force the write with `flush`.
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
        // Finishing a stream writes the end-of-stream marker: the 4-byte
        // continuation sentinel followed by a 4-byte zero length. Those 8
        // bytes land after the flush measured above.
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        // A file writer additionally begins with the "ARROW1" magic padded to
        // 8 bytes, so at this point it has written 8 bytes more than the
        // stream writer.
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }
}