mod stream;

pub use stream::*;

use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow_array::*;
use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, ScalarBuffer};
use arrow_data::ArrayData;
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER};
use DataType::*;

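/// Reads a single IPC buffer: slices `buf.length()` bytes at `buf.offset()`
/// out of the message body `a_data`, decompressing with `compression_codec`
/// when one is set and the buffer is non-empty.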
fn read_buffer(
    buf: &crate::Buffer,
    a_data: &Buffer,
    compression_codec: Option<CompressionCodec>,
) -> Result<Buffer, ArrowError> {
    let start_offset = buf.offset() as usize;
    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
    match (buf_data.is_empty(), compression_codec) {
        (true, _) | (_, None) => Ok(buf_data),
        (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
    }
}

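/// Reads the next array of type `field` from the `reader`, recursing into its
/// children for nested types. `variadic_counts` supplies the number of data
/// buffers for each view-type column (`Utf8View`/`BinaryView`).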
fn create_array(
    reader: &mut ArrayReader,
    field: &Field,
    variadic_counts: &mut VecDeque<i64>,
    require_alignment: bool,
) -> Result<ArrayRef, ArrowError> {
    let data_type = field.data_type();
    match data_type {
        Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
            reader.next_node(field)?,
            data_type,
            &[
                reader.next_buffer()?,
                reader.next_buffer()?,
                reader.next_buffer()?,
            ],
            require_alignment,
        ),
        BinaryView | Utf8View => {
            let count = variadic_counts
                .pop_front()
                .ok_or(ArrowError::IpcError(format!(
                    "Missing variadic count for {data_type} column"
                )))?;
            // Plus two for the validity buffer and the views buffer.
            let count = count + 2;
            let buffers = (0..count)
                .map(|_| reader.next_buffer())
                .collect::<Result<Vec<_>, _>>()?;
            create_primitive_array(
                reader.next_node(field)?,
                data_type,
                &buffers,
                require_alignment,
            )
        }
        FixedSizeBinary(_) => create_primitive_array(
            reader.next_node(field)?,
            data_type,
            &[reader.next_buffer()?, reader.next_buffer()?],
            require_alignment,
        ),
        List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
            let list_node = reader.next_node(field)?;
            let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
            let values = create_array(reader, list_field, variadic_counts, require_alignment)?;
            create_list_array(
                list_node,
                data_type,
                &list_buffers,
                values,
                require_alignment,
            )
        }
        FixedSizeList(ref list_field, _) => {
            let list_node = reader.next_node(field)?;
            let list_buffers = [reader.next_buffer()?];
            let values = create_array(reader, list_field, variadic_counts, require_alignment)?;
            create_list_array(
                list_node,
                data_type,
                &list_buffers,
                values,
                require_alignment,
            )
        }
        Struct(struct_fields) => {
            let struct_node = reader.next_node(field)?;
            let null_buffer = reader.next_buffer()?;

            // Read the arrays for each field in order.
            let mut struct_arrays = vec![];
            for struct_field in struct_fields {
                let child = create_array(reader, struct_field, variadic_counts, require_alignment)?;
                struct_arrays.push(child);
            }
            let null_count = struct_node.null_count() as usize;
            let struct_array = if struct_arrays.is_empty() {
                // A struct with no fields cannot infer its row count from
                // children, so use the length recorded in the field node.
                let len = struct_node.length() as usize;
                StructArray::new_empty_fields(
                    len,
                    (null_count > 0).then(|| BooleanBuffer::new(null_buffer, 0, len).into()),
                )
            } else if null_count > 0 {
                let len = struct_node.length() as usize;
                let nulls = BooleanBuffer::new(null_buffer, 0, len).into();
                StructArray::try_new(struct_fields.clone(), struct_arrays, Some(nulls))?
            } else {
                StructArray::try_new(struct_fields.clone(), struct_arrays, None)?
            };
            Ok(Arc::new(struct_array))
        }
        RunEndEncoded(run_ends_field, values_field) => {
            let run_node = reader.next_node(field)?;
            let run_ends =
                create_array(reader, run_ends_field, variadic_counts, require_alignment)?;
            let values = create_array(reader, values_field, variadic_counts, require_alignment)?;

            let run_array_length = run_node.length() as usize;
            let builder = ArrayData::builder(data_type.clone())
                .len(run_array_length)
                .offset(0)
                .add_child_data(run_ends.into_data())
                .add_child_data(values.into_data());

            let array_data = if require_alignment {
                builder.build()?
            } else {
                builder.build_aligned()?
            };

            Ok(make_array(array_data))
        }
        Dictionary(_, _) => {
            let index_node = reader.next_node(field)?;
            let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];

            let dict_id = field.dict_id().ok_or_else(|| {
                ArrowError::ParseError(format!("Field {field} does not have dict id"))
            })?;

            let value_array = reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
                ArrowError::ParseError(format!(
                    "Cannot find a dictionary batch with dict id: {dict_id}"
                ))
            })?;

            create_dictionary_array(
                index_node,
                data_type,
                &index_buffers,
                value_array.clone(),
                require_alignment,
            )
        }
        Union(fields, mode) => {
            let union_node = reader.next_node(field)?;
            let len = union_node.length() as usize;

            // In V4 and earlier, unions had a validity bitmap; it is no
            // longer part of the format in V5, so skip it when present.
            if reader.version < MetadataVersion::V5 {
                reader.next_buffer()?;
            }

            let type_ids: ScalarBuffer<i8> = reader.next_buffer()?.slice_with_length(0, len).into();

            let value_offsets = match mode {
                UnionMode::Dense => {
                    let offsets: ScalarBuffer<i32> =
                        reader.next_buffer()?.slice_with_length(0, len * 4).into();
                    Some(offsets)
                }
                UnionMode::Sparse => None,
            };

            let mut children = Vec::with_capacity(fields.len());

            for (_id, field) in fields.iter() {
                let child = create_array(reader, field, variadic_counts, require_alignment)?;
                children.push(child);
            }

            let array = UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?;
            Ok(Arc::new(array))
        }
        Null => {
            let node = reader.next_node(field)?;
            let length = node.length();
            let null_count = node.null_count();

            if length != null_count {
                return Err(ArrowError::SchemaError(format!(
                    "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
                )));
            }

            let builder = ArrayData::builder(data_type.clone())
                .len(length as usize)
                .offset(0);

            let array_data = if require_alignment {
                builder.build()?
            } else {
                builder.build_aligned()?
            };

            Ok(Arc::new(NullArray::from(array_data)))
        }
        _ => create_primitive_array(
            reader.next_node(field)?,
            data_type,
            &[reader.next_buffer()?, reader.next_buffer()?],
            require_alignment,
        ),
    }
}

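/// Builds an array with a primitive buffer layout (fixed-width values,
/// booleans, and the byte-array and view types) from its field node and raw
/// buffers. `buffers[0]` is the validity bitmap, attached only when the node
/// reports a non-zero null count.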
fn create_primitive_array(
    field_node: &FieldNode,
    data_type: &DataType,
    buffers: &[Buffer],
    require_alignment: bool,
) -> Result<ArrayRef, ArrowError> {
    let length = field_node.length() as usize;
    let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
    let builder = match data_type {
        Utf8 | Binary | LargeBinary | LargeUtf8 => {
            // Variable-length binary types carry an offsets buffer and a values buffer.
            ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..3].to_vec())
                .null_bit_buffer(null_buffer)
        }
        BinaryView | Utf8View => ArrayData::builder(data_type.clone())
            .len(length)
            .buffers(buffers[1..].to_vec())
            .null_bit_buffer(null_buffer),
        _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
            // Fixed-width types have a single data buffer.
            ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone())
                .null_bit_buffer(null_buffer)
        }
        t => unreachable!("Data type {:?} either unsupported or not primitive", t),
    };

    let array_data = if require_alignment {
        builder.build()?
    } else {
        builder.build_aligned()?
    };

    Ok(make_array(array_data))
}

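/// Builds a list-like array (`List`, `LargeList`, `Map`, or `FixedSizeList`)
/// around an already-decoded child array. The offset-based variants consume a
/// validity bitmap plus an offsets buffer; `FixedSizeList` needs only the
/// validity bitmap.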
fn create_list_array(
    field_node: &FieldNode,
    data_type: &DataType,
    buffers: &[Buffer],
    child_array: ArrayRef,
    require_alignment: bool,
) -> Result<ArrayRef, ArrowError> {
    let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
    let length = field_node.length() as usize;
    let child_data = child_array.into_data();
    let builder = match data_type {
        List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
            .len(length)
            .add_buffer(buffers[1].clone())
            .add_child_data(child_data)
            .null_bit_buffer(null_buffer),

        FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
            .len(length)
            .add_child_data(child_data)
            .null_bit_buffer(null_buffer),

        _ => unreachable!("Cannot create list or map array from {:?}", data_type),
    };

    let array_data = if require_alignment {
        builder.build()?
    } else {
        builder.build_aligned()?
    };

    Ok(make_array(array_data))
}

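/// Builds a dictionary array by combining the keys read from the batch with
/// the value array previously registered by the dictionary batch with the
/// matching dictionary id.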
fn create_dictionary_array(
    field_node: &FieldNode,
    data_type: &DataType,
    buffers: &[Buffer],
    value_array: ArrayRef,
    require_alignment: bool,
) -> Result<ArrayRef, ArrowError> {
    if let Dictionary(_, _) = *data_type {
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let builder = ArrayData::builder(data_type.clone())
            .len(field_node.length() as usize)
            .add_buffer(buffers[1].clone())
            .add_child_data(value_array.into_data())
            .null_bit_buffer(null_buffer);

        let array_data = if require_alignment {
            builder.build()?
        } else {
            builder.build_aligned()?
        };

        Ok(make_array(array_data))
    } else {
        unreachable!("Cannot create dictionary array from {:?}", data_type)
    }
}

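/// State for decoding the arrays of a single record batch message.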
struct ArrayReader<'a> {
    /// Dictionaries indexed by dictionary id
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Optional compression codec for the message body
    compression: Option<CompressionCodec>,
    /// The metadata version of the message
    version: MetadataVersion,
    /// The raw message body
    data: &'a Buffer,
    /// Iterator over the field nodes of the batch
    nodes: VectorIter<'a, FieldNode>,
    /// Iterator over the buffer metadata of the batch
    buffers: VectorIter<'a, crate::Buffer>,
}

impl<'a> ArrayReader<'a> {
    fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
        read_buffer(self.buffers.next().unwrap(), self.data, self.compression)
    }

    fn skip_buffer(&mut self) {
        self.buffers.next().unwrap();
    }

    fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
        self.nodes.next().ok_or_else(|| {
            ArrowError::SchemaError(format!(
                "Invalid data for schema. {} refers to node not found in schema",
                field
            ))
        })
    }

    fn skip_field(
        &mut self,
        field: &Field,
        variadic_count: &mut VecDeque<i64>,
    ) -> Result<(), ArrowError> {
        self.next_node(field)?;

        match field.data_type() {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                for _ in 0..3 {
                    self.skip_buffer()
                }
            }
            Utf8View | BinaryView => {
                let count = variadic_count
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {} column",
                        field.data_type()
                    )))?;
                // Plus two for the validity buffer and the views buffer.
                let count = count + 2;
                for _i in 0..count {
                    self.skip_buffer()
                }
            }
            FixedSizeBinary(_) => {
                self.skip_buffer();
                self.skip_buffer();
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                self.skip_buffer();
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            FixedSizeList(list_field, _) => {
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            Struct(struct_fields) => {
                self.skip_buffer();

                // Skip the arrays of each field in order.
                for struct_field in struct_fields {
                    self.skip_field(struct_field, variadic_count)?
                }
            }
            RunEndEncoded(run_ends_field, values_field) => {
                self.skip_field(run_ends_field, variadic_count)?;
                self.skip_field(values_field, variadic_count)?;
            }
            Dictionary(_, _) => {
                self.skip_buffer(); // validity
                self.skip_buffer(); // keys
            }
            Union(fields, mode) => {
                self.skip_buffer(); // type ids

                match mode {
                    UnionMode::Dense => self.skip_buffer(),
                    UnionMode::Sparse => {}
                };

                for (_, field) in fields.iter() {
                    self.skip_field(field, variadic_count)?
                }
            }
            Null => {} // No buffers
            _ => {
                self.skip_buffer();
                self.skip_buffer();
            }
        };
        Ok(())
    }
}

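/// Creates a record batch from binary data using the `crate::RecordBatch`
/// indexes and the `Schema`.
///
/// `buf` is the message body; `dictionaries_by_id` supplies the values for
/// any dictionary-encoded fields, and `projection` optionally restricts the
/// decoded columns.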
pub fn read_record_batch(
    buf: &Buffer,
    batch: crate::RecordBatch,
    schema: SchemaRef,
    dictionaries_by_id: &HashMap<i64, ArrayRef>,
    projection: Option<&[usize]>,
    metadata: &MetadataVersion,
) -> Result<RecordBatch, ArrowError> {
    read_record_batch_impl(
        buf,
        batch,
        schema,
        dictionaries_by_id,
        projection,
        metadata,
        false,
    )
}

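/// Reads a dictionary batch and registers its values in `dictionaries_by_id`
/// under the batch's dictionary id, for use by later record batches.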
pub fn read_dictionary(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
) -> Result<(), ArrowError> {
    read_dictionary_impl(buf, batch, schema, dictionaries_by_id, metadata, false)
}

fn read_record_batch_impl(
    buf: &Buffer,
    batch: crate::RecordBatch,
    schema: SchemaRef,
    dictionaries_by_id: &HashMap<i64, ArrayRef>,
    projection: Option<&[usize]>,
    metadata: &MetadataVersion,
    require_alignment: bool,
) -> Result<RecordBatch, ArrowError> {
    let buffers = batch.buffers().ok_or_else(|| {
        ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
    })?;
    let field_nodes = batch.nodes().ok_or_else(|| {
        ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
    })?;

    let mut variadic_counts: VecDeque<i64> =
        batch.variadicBufferCounts().into_iter().flatten().collect();

    let batch_compression = batch.compression();
    let compression = batch_compression
        .map(|batch_compression| batch_compression.codec().try_into())
        .transpose()?;

    let mut reader = ArrayReader {
        dictionaries_by_id,
        compression,
        version: *metadata,
        data: buf,
        nodes: field_nodes.iter(),
        buffers: buffers.iter(),
    };

    let options = RecordBatchOptions::new().with_row_count(Some(batch.length() as usize));

    if let Some(projection) = projection {
        let mut arrays = vec![];
        // Decode only the projected fields, skipping the buffers of the rest.
        for (idx, field) in schema.fields().iter().enumerate() {
            if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
                let child =
                    create_array(&mut reader, field, &mut variadic_counts, require_alignment)?;
                arrays.push((proj_idx, child));
            } else {
                reader.skip_field(field, &mut variadic_counts)?;
            }
        }
        assert!(variadic_counts.is_empty());
        arrays.sort_by_key(|t| t.0);
        RecordBatch::try_new_with_options(
            Arc::new(schema.project(projection)?),
            arrays.into_iter().map(|t| t.1).collect(),
            &options,
        )
    } else {
        let mut children = vec![];
        for field in schema.fields() {
            let child = create_array(&mut reader, field, &mut variadic_counts, require_alignment)?;
            children.push(child);
        }
        assert!(variadic_counts.is_empty());
        RecordBatch::try_new_with_options(schema, children, &options)
    }
}

fn read_dictionary_impl(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
    require_alignment: bool,
) -> Result<(), ArrowError> {
    if batch.isDelta() {
        return Err(ArrowError::InvalidArgumentError(
            "delta dictionary batches not supported".to_string(),
        ));
    }

    let id = batch.id();
    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
    })?;

    // The dictionary batch does not carry the type of its values, so look it
    // up from the schema and read the values as a single-column batch.
    let dictionary_values: ArrayRef = match first_field.data_type() {
        DataType::Dictionary(_, ref value_type) => {
            let value = value_type.as_ref().clone();
            let schema = Schema::new(vec![Field::new("", value, true)]);
            let record_batch = read_record_batch_impl(
                buf,
                batch.data().unwrap(),
                Arc::new(schema),
                dictionaries_by_id,
                None,
                metadata,
                require_alignment,
            )?;
            Some(record_batch.column(0).clone())
        }
        _ => None,
    }
    .ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
    })?;

    // A later dictionary batch with the same id replaces the earlier values.
    dictionaries_by_id.insert(id, dictionary_values.clone());

    Ok(())
}

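/// Read the [`Block`] at the given offset from the reader, returning its
/// metadata and body as a single contiguous [`Buffer`].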
fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
    reader.seek(SeekFrom::Start(block.offset() as u64))?;
    let body_len = block.bodyLength().to_usize().unwrap();
    let metadata_len = block.metaDataLength().to_usize().unwrap();
    let total_len = body_len.checked_add(metadata_len).unwrap();

    let mut buf = MutableBuffer::from_len_zeroed(total_len);
    reader.read_exact(&mut buf)?;
    Ok(buf.into())
}

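/// Parse an encapsulated IPC message, skipping the optional continuation
/// marker and the 4-byte length prefix that precede the flatbuffer.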
fn parse_message(buf: &[u8]) -> Result<Message, ArrowError> {
    let buf = match buf[..4] == CONTINUATION_MARKER {
        true => &buf[8..],
        false => &buf[4..],
    };
    crate::root_as_message(buf)
        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
}

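/// Read the footer length from the last 10 bytes of an Arrow IPC file.
///
/// The file trailer consists of the footer length as a little-endian `i32`,
/// followed by the 6-byte magic `ARROW1`.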
pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
    if buf[4..] != super::ARROW_MAGIC {
        return Err(ArrowError::ParseError(
            "Arrow file does not contain correct footer".to_string(),
        ));
    }

    let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
    footer_len
        .try_into()
        .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
}

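/// A low-level, push-based interface for reading an IPC file.
///
/// The caller locates the dictionary and record batch [`Block`]s (typically
/// from the file footer) and feeds their bytes in; dictionaries must be
/// decoded before the batches that reference them. A minimal sketch of the
/// flow, assuming a parsed `footer` and a `reader` as in
/// [`FileReaderBuilder::build`] below:
///
/// ```ignore
/// let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
/// if let Some(blocks) = footer.dictionaries() {
///     for block in blocks {
///         let buf = read_block(&mut reader, block)?;
///         decoder.read_dictionary(block, &buf)?;
///     }
/// }
/// if let Some(blocks) = footer.recordBatches() {
///     for block in blocks {
///         let buf = read_block(&mut reader, block)?;
///         if let Some(batch) = decoder.read_record_batch(block, &buf)? {
///             println!("read batch with {} rows", batch.num_rows());
///         }
///     }
/// }
/// ```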
#[derive(Debug)]
pub struct FileDecoder {
    schema: SchemaRef,
    dictionaries: HashMap<i64, ArrayRef>,
    version: MetadataVersion,
    projection: Option<Vec<usize>>,
    require_alignment: bool,
}

impl FileDecoder {
    /// Create a new [`FileDecoder`] with the given schema and version
    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
        Self {
            schema,
            version,
            dictionaries: Default::default(),
            projection: None,
            require_alignment: false,
        }
    }

    /// Specify a projection
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Specifies whether decoded buffers must already satisfy the alignment
    /// their types require. When `false` (the default), misaligned buffers
    /// are copied into suitably aligned allocations; when `true`, they are
    /// rejected with an error instead.
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message<'a>, ArrowError> {
        let message = parse_message(buf)?;

        // Only enforce a metadata version match for files newer than V1.
        if self.version != MetadataVersion::V1 && message.version() != self.version {
            return Err(ArrowError::IpcError(
                "Could not read IPC message as metadata versions mismatch".to_string(),
            ));
        }
        Ok(message)
    }

    /// Read the dictionary with the given block and data buffer
    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::DictionaryBatch => {
                let batch = message.header_as_dictionary_batch().unwrap();
                read_dictionary_impl(
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    &self.schema,
                    &mut self.dictionaries,
                    &message.version(),
                    self.require_alignment,
                )
            }
            t => Err(ArrowError::ParseError(format!(
                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
            ))),
        }
    }

    /// Read the record batch with the given block and data buffer
    pub fn read_record_batch(
        &self,
        block: &Block,
        buf: &Buffer,
    ) -> Result<Option<RecordBatch>, ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            crate::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;
                // The batch body starts after the metadata
                read_record_batch_impl(
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    self.schema.clone(),
                    &self.dictionaries,
                    self.projection.as_deref(),
                    &message.version(),
                    self.require_alignment,
                )
                .map(Some)
            }
            crate::MessageHeader::NONE => Ok(None),
            t => Err(ArrowError::InvalidArgumentError(format!(
                "Reading types other than record batches not yet supported, unable to read {t:?}"
            ))),
        }
    }
}

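/// Build an Arrow [`FileReader`] with custom options, including limits for
/// the flatbuffers verifier that validates the file footer.
///
/// A sketch of typical usage, assuming `input` implements `Read + Seek`:
///
/// ```ignore
/// let reader = FileReaderBuilder::new()
///     .with_projection(vec![0, 2])
///     .build(input)?;
/// ```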
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection for which columns to load (zero-based column indices)
    projection: Option<Vec<usize>>,
    /// Passed through to construct [`VerifierOptions`]
    max_footer_fb_tables: usize,
    /// Passed through to construct [`VerifierOptions`]
    max_footer_fb_depth: usize,
}

impl Default for FileReaderBuilder {
    fn default() -> Self {
        let verifier_options = VerifierOptions::default();
        Self {
            max_footer_fb_tables: verifier_options.max_tables,
            max_footer_fb_depth: verifier_options.max_depth,
            projection: None,
        }
    }
}

impl FileReaderBuilder {
    /// Create a new [`FileReaderBuilder`] with default options.
    ///
    /// To convert the builder into a reader, call [`FileReaderBuilder::build`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Optional projection for which columns to load (zero-based column indices).
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// The maximum number of flatbuffer tables permitted when verifying the
    /// file footer. Files with a very large number of columns can exceed the
    /// verifier's default limit and require raising this value.
    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
        self.max_footer_fb_tables = max_footer_fb_tables;
        self
    }

    /// The maximum nesting depth permitted when verifying the file footer.
    /// Deeply nested schemas can exceed the verifier's default limit and
    /// require raising this value.
    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
        self.max_footer_fb_depth = max_footer_fb_depth;
        self
    }

    /// Build [`FileReader`] with given reader.
    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
        // Space for the footer length (4 bytes) and ARROW_MAGIC (6 bytes)
        let mut buffer = [0; 10];
        reader.seek(SeekFrom::End(-10))?;
        reader.read_exact(&mut buffer)?;

        let footer_len = read_footer_length(buffer)?;

        // Read the footer
        let mut footer_data = vec![0; footer_len];
        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
        reader.read_exact(&mut footer_data)?;

        let verifier_options = VerifierOptions {
            max_tables: self.max_footer_fb_tables,
            max_depth: self.max_footer_fb_depth,
            ..Default::default()
        };
        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
        )?;

        let blocks = footer.recordBatches().ok_or_else(|| {
            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
        })?;

        let total_blocks = blocks.len();

        let ipc_schema = footer.schema().unwrap();
        if !ipc_schema.endianness().equals_to_target_endianness() {
            return Err(ArrowError::IpcError(
                "the endianness of the source system does not match the endianness of the target system.".to_owned()
            ));
        }

        let schema = crate::convert::fb_to_schema(ipc_schema);

        let mut custom_metadata = HashMap::new();
        if let Some(fb_custom_metadata) = footer.custom_metadata() {
            for kv in fb_custom_metadata.into_iter() {
                custom_metadata.insert(
                    kv.key().unwrap().to_string(),
                    kv.value().unwrap().to_string(),
                );
            }
        }

        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
        if let Some(projection) = self.projection {
            decoder = decoder.with_projection(projection)
        }

        // Read any dictionaries up front so batches can reference them
        if let Some(dictionaries) = footer.dictionaries() {
            for block in dictionaries {
                let buf = read_block(&mut reader, block)?;
                decoder.read_dictionary(block, &buf)?;
            }
        }

        Ok(FileReader {
            reader,
            blocks: blocks.iter().copied().collect(),
            current_block: 0,
            total_blocks,
            decoder,
            custom_metadata,
        })
    }
}

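/// Arrow File Reader: iterates over the record batches of a file in the Arrow
/// IPC file format (the `ARROW1` magic), reading each batch on demand via
/// `Read + Seek`.
///
/// A minimal sketch, assuming `file.arrow` was written with
/// [`crate::writer::FileWriter`]:
///
/// ```ignore
/// let file = std::fs::File::open("file.arrow")?;
/// let reader = FileReader::try_new_buffered(file, None)?;
/// for batch in reader {
///     println!("read batch with {} rows", batch?.num_rows());
/// }
/// ```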
pub struct FileReader<R> {
    /// The underlying reader
    reader: R,

    /// The decoder that parses blocks into record batches
    decoder: FileDecoder,

    /// The blocks in the file
    ///
    /// A block indicates the part of the file data that contains messages
    blocks: Vec<Block>,

    /// A counter to keep track of the current block that should be read
    current_block: usize,

    /// The total number of blocks, which may contain record batches and other types
    total_blocks: usize,

    /// User defined metadata read from the file footer
    custom_metadata: HashMap<String, String>,
}

impl<R> fmt::Debug for FileReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("FileReader<R>")
            .field("decoder", &self.decoder)
            .field("blocks", &self.blocks)
            .field("current_block", &self.current_block)
            .field("total_blocks", &self.total_blocks)
            .finish_non_exhaustive()
    }
}

impl<R: Read + Seek> FileReader<BufReader<R>> {
    /// Try to create a new file reader with the reader wrapped in a
    /// `BufReader<R>`, to reduce the number of system calls for small reads.
    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        Self::try_new(BufReader::new(reader), projection)
    }
}

impl<R: Read + Seek> FileReader<R> {
    /// Try to create a new file reader.
    ///
    /// There is no internal buffering: wrap the reader in a [`BufReader`] or
    /// use [`FileReader::try_new_buffered`] if buffered reads are needed.
    /// Returns an error if the input is not a valid Arrow IPC file.
    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        let builder = FileReaderBuilder {
            projection,
            ..Default::default()
        };
        builder.build(reader)
    }

    /// Return user defined customized metadata from the file footer
    pub fn custom_metadata(&self) -> &HashMap<String, String> {
        &self.custom_metadata
    }

    /// Return the number of batches in the file
    pub fn num_batches(&self) -> usize {
        self.total_blocks
    }

    /// Return the schema of the file
    pub fn schema(&self) -> SchemaRef {
        self.decoder.schema.clone()
    }

    /// Read a specific record batch
    ///
    /// Sets the current block to the index, allowing random reads
    pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
        if index >= self.total_blocks {
            Err(ArrowError::InvalidArgumentError(format!(
                "Cannot set batch to index {} from {} total batches",
                index, self.total_blocks
            )))
        } else {
            self.current_block = index;
            Ok(())
        }
    }

    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let block = &self.blocks[self.current_block];
        self.current_block += 1;

        // Read the block that makes up the record batch into a buffer
        let buffer = read_block(&mut self.reader, block)?;
        self.decoder.read_record_batch(block, &buffer)
    }

    /// Gets a reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_ref(&self) -> &R {
        &self.reader
    }

    /// Gets a mutable reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.reader
    }
}

impl<R: Read + Seek> Iterator for FileReader<R> {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.current_block < self.total_blocks {
            self.maybe_next().transpose()
        } else {
            None
        }
    }
}

impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
    fn schema(&self) -> SchemaRef {
        self.schema()
    }
}

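/// Arrow Stream Reader: reads the length-prefixed messages of the Arrow IPC
/// streaming format sequentially from any `Read`, so the input does not need
/// to be seekable.
///
/// A minimal sketch, assuming `stream.arrows` was written with
/// [`crate::writer::StreamWriter`]:
///
/// ```ignore
/// let file = std::fs::File::open("stream.arrows")?;
/// let reader = StreamReader::try_new_buffered(file, None)?;
/// for batch in reader {
///     println!("read batch with {} rows", batch?.num_rows());
/// }
/// ```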
pub struct StreamReader<R> {
    /// The underlying reader
    reader: R,

    /// The schema that is read from the stream's first message
    schema: SchemaRef,

    /// Dictionaries read so far from the stream, indexed by dictionary id
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// An indicator of whether the stream is complete.
    ///
    /// This value is set to `true` the first time the reader's `next()` returns `None`.
    finished: bool,

    /// Optional projection and its projected schema
    projection: Option<(Vec<usize>, Schema)>,
}

impl<R> fmt::Debug for StreamReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
        f.debug_struct("StreamReader<R>")
            .field("reader", &"R")
            .field("schema", &self.schema)
            .field("dictionaries_by_id", &self.dictionaries_by_id)
            .field("finished", &self.finished)
            .field("projection", &self.projection)
            .finish()
    }
}

impl<R: Read> StreamReader<BufReader<R>> {
    /// Try to create a new stream reader with the reader wrapped in a
    /// `BufReader<R>`, to reduce the number of system calls for small reads.
    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        Self::try_new(BufReader::new(reader), projection)
    }
}

impl<R: Read> StreamReader<R> {
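    /// Try to create a new stream reader.
    ///
    /// The first message in the stream must be the schema; dictionary batches
    /// are consumed transparently as they are encountered while iterating.
    /// There is no internal buffering: wrap the reader in a [`BufReader`] or
    /// use [`StreamReader::try_new_buffered`] if buffered reads are needed.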
    pub fn try_new(
        mut reader: R,
        projection: Option<Vec<usize>>,
    ) -> Result<StreamReader<R>, ArrowError> {
        // Determine the metadata length
        let mut meta_size: [u8; 4] = [0; 4];
        reader.read_exact(&mut meta_size)?;
        let meta_len = {
            // If a continuation marker is encountered, skip over it and read
            // the size from the next four bytes.
            if meta_size == CONTINUATION_MARKER {
                reader.read_exact(&mut meta_size)?;
            }
            i32::from_le_bytes(meta_size)
        };

        let mut meta_buffer = vec![0; meta_len as usize];
        reader.read_exact(&mut meta_buffer)?;

        let message = crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
        })?;
        // The message header is expected to be a schema
        let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| {
            ArrowError::ParseError("Unable to read IPC message as schema".to_string())
        })?;
        let schema = crate::convert::fb_to_schema(ipc_schema);

        // Dictionaries are read lazily as they appear in the stream
        let dictionaries_by_id = HashMap::new();

        let projection = match projection {
            Some(projection_indices) => {
                let schema = schema.project(&projection_indices)?;
                Some((projection_indices, schema))
            }
            _ => None,
        };
        Ok(Self {
            reader,
            schema: Arc::new(schema),
            finished: false,
            dictionaries_by_id,
            projection,
        })
    }

    /// Deprecated, use [`StreamReader::try_new`] instead.
    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
    pub fn try_new_unbuffered(
        reader: R,
        projection: Option<Vec<usize>>,
    ) -> Result<Self, ArrowError> {
        Self::try_new(reader, projection)
    }

    /// Return the schema of the stream
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Check if the stream is finished
    pub fn is_finished(&self) -> bool {
        self.finished
    }

    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        if self.finished {
            return Ok(None);
        }
        // Determine the metadata length
        let mut meta_size: [u8; 4] = [0; 4];

        match self.reader.read_exact(&mut meta_size) {
            Ok(()) => (),
            Err(e) => {
                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
                    // A stream can validly end without the end-of-stream
                    // marker, so treat EOF here as a graceful end of stream.
                    self.finished = true;
                    Ok(None)
                } else {
                    Err(ArrowError::from(e))
                };
            }
        }

        let meta_len = {
            // If a continuation marker is encountered, skip over it and read
            // the size from the next four bytes.
            if meta_size == CONTINUATION_MARKER {
                self.reader.read_exact(&mut meta_size)?;
            }
            i32::from_le_bytes(meta_size)
        };

        if meta_len == 0 {
            // The stream has ended, mark the reader as finished
            self.finished = true;
            return Ok(None);
        }

        let mut meta_buffer = vec![0; meta_len as usize];
        self.reader.read_exact(&mut meta_buffer)?;

        let vecs = &meta_buffer.to_vec();
        let message = crate::root_as_message(vecs).map_err(|err| {
            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
        })?;

        match message.header_type() {
            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            crate::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;
                // Read the block that makes up the record batch into a buffer
                let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                self.reader.read_exact(&mut buf)?;

                read_record_batch_impl(
                    &buf.into(),
                    batch,
                    self.schema(),
                    &self.dictionaries_by_id,
                    self.projection.as_ref().map(|x| x.0.as_ref()),
                    &message.version(),
                    false,
                )
                .map(Some)
            }
            crate::MessageHeader::DictionaryBatch => {
                let batch = message.header_as_dictionary_batch().ok_or_else(|| {
                    ArrowError::IpcError(
                        "Unable to read IPC message as dictionary batch".to_string(),
                    )
                })?;
                // Read the block that makes up the dictionary batch into a buffer
                let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                self.reader.read_exact(&mut buf)?;

                read_dictionary_impl(
                    &buf.into(),
                    batch,
                    &self.schema,
                    &mut self.dictionaries_by_id,
                    &message.version(),
                    false,
                )?;

                // Dictionary batches aren't surfaced to the caller; read the next message
                self.maybe_next()
            }
            crate::MessageHeader::NONE => Ok(None),
            t => Err(ArrowError::InvalidArgumentError(format!(
                "Reading types other than record batches not yet supported, unable to read {t:?} "
            ))),
        }
    }

    /// Gets a reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_ref(&self) -> &R {
        &self.reader
    }

    /// Gets a mutable reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.reader
    }
}

impl<R: Read> Iterator for StreamReader<R> {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.maybe_next().transpose()
    }
}

impl<R: Read> RecordBatchReader for StreamReader<R> {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

#[cfg(test)]
mod tests {
    use crate::writer::{unslice_run_array, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};

    use super::*;

    use crate::root_as_message;
    use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
    use arrow_array::types::*;
    use arrow_buffer::NullBuffer;
    use arrow_data::ArrayDataBuilder;

    fn create_test_projection_schema() -> Schema {
        // define field types
        let list_data_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));

        let fixed_size_list_data_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, false)), 3);

        let union_fields = UnionFields::new(
            vec![0, 1],
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Float64, false),
            ],
        );

        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);

        let struct_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new_list("list", Field::new("item", DataType::Int8, true), false),
        ]);
        let struct_data_type = DataType::Struct(struct_fields);

        let run_encoded_data_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int16, false)),
            Arc::new(Field::new("values", DataType::Int32, true)),
        );

        // define schema
        Schema::new(vec![
            Field::new("f0", DataType::UInt32, false),
            Field::new("f1", DataType::Utf8, false),
            Field::new("f2", DataType::Boolean, false),
            Field::new("f3", union_data_type, true),
            Field::new("f4", DataType::Null, true),
            Field::new("f5", DataType::Float64, true),
            Field::new("f6", list_data_type, false),
            Field::new("f7", DataType::FixedSizeBinary(3), true),
            Field::new("f8", fixed_size_list_data_type, false),
            Field::new("f9", struct_data_type, false),
            Field::new("f10", run_encoded_data_type, false),
            Field::new("f11", DataType::Boolean, false),
            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
            Field::new("f13", DataType::Utf8, false),
        ])
    }

    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
        // set test data for each column
        let array0 = UInt32Array::from(vec![1, 2, 3]);
        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
        let array2 = BooleanArray::from(vec![true, false, true]);

        let mut union_builder = UnionBuilder::new_dense();
        union_builder.append::<Int32Type>("a", 1).unwrap();
        union_builder.append::<Float64Type>("b", 10.1).unwrap();
        union_builder.append_null::<Float64Type>("b").unwrap();
        let array3 = union_builder.build().unwrap();

        let array4 = NullArray::new(3);
        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
        let array6_values = vec![
            Some(vec![Some(10), Some(10), Some(10)]),
            Some(vec![Some(20), Some(20), Some(20)]),
            Some(vec![Some(30), Some(30)]),
        ];
        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();

        let array8_values = ArrayData::builder(DataType::Int32)
            .len(9)
            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
            .build()
            .unwrap();
        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
            .len(3)
            .add_child_data(array8_values)
            .build()
            .unwrap();
        let array8 = FixedSizeListArray::from(array8_data);

        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
        let array9_list: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
                Some(vec![Some(-10)]),
                Some(vec![Some(-20), Some(-20), Some(-20)]),
                Some(vec![Some(-30)]),
            ]));
        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
            .add_child_data(array9_id.into_data())
            .add_child_data(array9_list.into_data())
            .len(3)
            .build()
            .unwrap();
        let array9: ArrayRef = Arc::new(StructArray::from(array9));

        let array10_input = vec![Some(1_i32), None, None];
        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        array10_builder.extend(array10_input);
        let array10 = array10_builder.finish();

        let array11 = BooleanArray::from(vec![false, false, true]);

        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));

        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);

        // create record batch
        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array0),
                Arc::new(array1),
                Arc::new(array2),
                Arc::new(array3),
                Arc::new(array4),
                Arc::new(array5),
                Arc::new(array6),
                Arc::new(array7),
                Arc::new(array8),
                Arc::new(array9),
                Arc::new(array10),
                Arc::new(array11),
                Arc::new(array12),
                Arc::new(array13),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_projection_array_values() {
        // define schema
        let schema = create_test_projection_schema();

        // create record batch with test data
        let batch = create_test_projection_batch_data(&schema);

        // write record batch in IPC format
        let mut buf = Vec::new();
        {
            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        // read the record batch with each single-column projection
        for index in 0..12 {
            let projection = vec![index];
            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
            let read_batch = reader.unwrap().next().unwrap().unwrap();
            let projected_column = read_batch.column(0);
            let expected_column = batch.column(index);

            // check the projected column equals the expected column
            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
        }

        {
            // read record batch with a reordered projection
            let reader =
                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
            let read_batch = reader.unwrap().next().unwrap().unwrap();
            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
            assert_eq!(read_batch, expected_batch);
        }
    }

    #[test]
    fn test_arrow_single_float_row() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
        ]);
        let arrays = vec![
            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
        ];
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
        // create stream writer
        let mut file = tempfile::tempfile().unwrap();
        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
        stream_writer.write(&batch).unwrap();
        stream_writer.finish().unwrap();

        drop(stream_writer);

        file.rewind().unwrap();

        // read stream back
        let reader = StreamReader::try_new(&mut file, None).unwrap();

        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert!(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
            assert!(
                batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
        });

        file.rewind().unwrap();

        // read stream back with projection
        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();

        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().fields().len(), 2);
            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
        });
    }

    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
        reader.next().unwrap().unwrap()
    }

    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
        let mut buf = Vec::new();
        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader =
            crate::reader::StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
        reader.next().unwrap().unwrap()
    }

    #[test]
    fn test_roundtrip_with_custom_metadata() {
        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
        let mut test_metadata = HashMap::new();
        test_metadata.insert("abc".to_string(), "abc".to_string());
        test_metadata.insert("def".to_string(), "def".to_string());
        for (k, v) in &test_metadata {
            writer.write_metadata(k, v);
        }
        writer.finish().unwrap();
        drop(writer);

        let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
        assert_eq!(reader.custom_metadata(), &test_metadata);
    }

    #[test]
    fn test_roundtrip_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        assert_eq!(batch, roundtrip_ipc(&batch));
    }

    #[test]
    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new_with_options(
            &mut buf,
            batch.schema_ref(),
            IpcWriteOptions::default().with_preserve_dict_id(false),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();

        assert_eq!(batch, reader.next().unwrap().unwrap());
    }

    fn check_union_with_builder(mut builder: UnionBuilder) {
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        builder.append::<Int64Type>("d", 11).unwrap();
        let union = builder.build().unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let union_array = Arc::new(union) as ArrayRef;

        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
        let rb2 = roundtrip_ipc(&rb);
        assert_eq!(rb.schema(), rb2.schema());
        assert_eq!(rb.num_columns(), rb2.num_columns());
        assert_eq!(rb.num_rows(), rb2.num_rows());
        let union1 = rb.column(0);
        let union2 = rb2.column(0);

        assert_eq!(union1, union2);
    }

    #[test]
    fn test_roundtrip_dense_union() {
        check_union_with_builder(UnionBuilder::new_dense());
    }

    #[test]
    fn test_roundtrip_sparse_union() {
        check_union_with_builder(UnionBuilder::new_sparse());
    }

    #[test]
    fn test_roundtrip_struct_empty_fields() {
        let nulls = NullBuffer::from(&[true, true, false][..]);
        let rb = RecordBatch::try_from_iter([(
            "",
            Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
        )])
        .unwrap();
        let rb2 = roundtrip_ipc(&rb);
        assert_eq!(rb, rb2);
    }

    #[test]
    fn test_roundtrip_stream_run_array_sliced() {
        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
            .into_iter()
            .collect();
        let run_array_1_sliced = run_array_1.slice(2, 5);

        let run_array_2_input = vec![Some(1_i32), None, None, Some(2), Some(2)];
        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        run_array_2_builder.extend(run_array_2_input);
        let run_array_2 = run_array_2_builder.finish();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "run_array_1_sliced",
                run_array_1_sliced.data_type().clone(),
                false,
            ),
            Field::new("run_array_2", run_array_2.data_type().clone(), false),
        ]));
        let input_batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
        )
        .unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);

        // The sliced run array is unsliced before comparison with the output;
        // the second array is compared as-is.
        assert_eq!(input_batch.column(1), output_batch.column(1));

        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
    }

    #[test]
    fn test_roundtrip_stream_nested_dict() {
        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
        let dict = Arc::new(
            xs.clone()
                .into_iter()
                .collect::<DictionaryArray<Int8Type>>(),
        );
        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
                string_array,
            ),
            (
                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
                dict.clone() as ArrayRef,
            ),
        ]);
        let schema = Arc::new(Schema::new(vec![
            Field::new("f1_string", DataType::Utf8, false),
            Field::new("f2_struct", struct_array.data_type().clone(), false),
        ]));
        let input_batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(xs.clone())),
                Arc::new(struct_array),
            ],
        )
        .unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
        let values = Arc::new(values) as ArrayRef;
        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());

        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, values);

        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        ));
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        ));
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }

    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
        OffsetSize: OffsetSizeTrait,
        U: ArrowNativeType,
    >(
        list_data_type: DataType,
        offsets: &[U; 5],
    ) {
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let value_offsets = Buffer::from_slice_ref(offsets);

        let list_data = ArrayData::builder(list_data_type)
            .len(4)
            .add_buffer(value_offsets)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = GenericListArray::<OffsetSize>::from(list_data);

        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_stream_dict_of_list_of_dict() {
        // list
        let list_data_type = DataType::List(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        )));
        let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
        test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);

        // large list
        let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        )));
        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
        test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
    }

    #[test]
    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.into_data();

        let list_data_type = DataType::FixedSizeList(
            Arc::new(Field::new_dict(
                "item",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
                1,
                false,
            )),
            3,
        );
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = FixedSizeListArray::from(list_data);

        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }

    const LONG_TEST_STRING: &str =
        "This is a long string to make sure binary view array handles it";

    #[test]
    fn test_roundtrip_view_types() {
        let schema = Schema::new(vec![
            Field::new("field_1", DataType::BinaryView, true),
            Field::new("field_2", DataType::Utf8, true),
            Field::new("field_3", DataType::Utf8View, true),
        ]);
        let bin_values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
        ];
        let utf8_values: Vec<Option<&str>> =
            vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
        let bin_view_array = BinaryViewArray::from_iter(bin_values);
        let utf8_array = StringArray::from_iter(utf8_values.iter());
        let utf8_view_array = StringViewArray::from_iter(utf8_values);
        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(bin_view_array),
                Arc::new(utf8_array),
                Arc::new(utf8_view_array),
            ],
        )
        .unwrap();

        assert_eq!(record_batch, roundtrip_ipc(&record_batch));
        assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));

        let sliced_batch = record_batch.slice(1, 2);
        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
    }

    #[test]
    fn test_roundtrip_view_types_nested_dict() {
        let bin_values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
            Some(b"field"),
        ];
        let utf8_values: Vec<Option<&str>> = vec![
            Some("foo"),
            None,
            Some("bar"),
            Some(LONG_TEST_STRING),
            Some("field"),
        ];
        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));

        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
            true,
            1,
            false,
        ));

        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
            true,
            2,
            false,
        ));
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );
        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        assert_eq!(batch, roundtrip_ipc(&batch));
        assert_eq!(batch, roundtrip_ipc_stream(&batch));

        let sliced_batch = batch.slice(1, 2);
        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
    }

    #[test]
    fn test_no_columns_batch() {
        let schema = Arc::new(Schema::empty());
        let options = RecordBatchOptions::new()
            .with_match_field_names(true)
            .with_row_count(Some(10));
        let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_unaligned() {
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
        let (_, encoded) = gen
            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Construct a deliberately misaligned buffer by prepending a byte and slicing
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let roundtrip = read_record_batch_impl(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            None,
            &message.version(),
            false,
        )
        .unwrap();
        assert_eq!(batch, roundtrip);
    }

    #[test]
    fn test_unaligned_throws_error_with_require_alignment() {
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
        let (_, encoded) = gen
            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Construct a deliberately misaligned buffer by prepending a byte and slicing
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let result = read_record_batch_impl(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            None,
            &message.version(),
            true,
        );

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
            offset from expected alignment of 4 by 1"
        );
    }

    #[test]
    fn test_file_with_massive_column_count() {
        // Enough columns to exceed the default flatbuffers verifier table limit
        let limit = 600_000;

        let fields = (0..limit)
            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
            .collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::new_empty(schema);

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReaderBuilder::new()
            .with_max_footer_fb_tables(1_500_000)
            .build(std::io::Cursor::new(buf))
            .unwrap();
        let roundtrip_batch = reader.next().unwrap().unwrap();

        assert_eq!(batch, roundtrip_batch);
    }

    #[test]
    fn test_file_with_deeply_nested_columns() {
        // Deep enough to exceed the default flatbuffers verifier depth limit
        let limit = 61;

        let fields = (0..limit).fold(
            vec![Field::new("leaf", DataType::Boolean, false)],
            |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
        );
        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::new_empty(schema);

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReaderBuilder::new()
            .with_max_footer_fb_depth(65)
            .build(std::io::Cursor::new(buf))
            .unwrap();
        let roundtrip_batch = reader.next().unwrap().unwrap();

        assert_eq!(batch, roundtrip_batch);
    }

    #[test]
    fn test_invalid_struct_array_ipc_read_errors() {
        let a_field = Field::new("a", DataType::Int32, false);
        let b_field = Field::new("b", DataType::Int32, false);

        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "s",
            vec![a_field.clone(), b_field.clone()],
            false,
        )]));

        let a_array_data = ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap();
        let b_array_data = ArrayData::builder(b_field.data_type().clone())
            .len(3)
            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
            .build()
            .unwrap();

        let struct_data_type = schema.field(0).data_type();

        // The second child has fewer rows than the struct claims, which must
        // be rejected when the batch is read back.
        let invalid_struct_arr = unsafe {
            make_array(
                ArrayData::builder(struct_data_type.clone())
                    .len(4)
                    .add_child_data(a_array_data)
                    .add_child_data(b_array_data)
                    .build_unchecked(),
            )
        };

        let batch = RecordBatch::try_new(schema.clone(), vec![invalid_struct_arr]).unwrap();

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, schema.as_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
        let err = reader.next().unwrap().unwrap_err();
        assert!(matches!(err, ArrowError::InvalidArgumentError(_)));
    }

    #[test]
    fn test_same_dict_id_without_preserve() {
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(
                ["a", "b"]
                    .iter()
                    .map(|name| {
                        Field::new_dict(
                            name.to_string(),
                            DataType::Dictionary(
                                Box::new(DataType::Int32),
                                Box::new(DataType::Utf8),
                            ),
                            true,
                            0,
                            false,
                        )
                    })
                    .collect::<Vec<Field>>(),
            )),
            vec![
                Arc::new(
                    vec![Some("c"), Some("d")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
                Arc::new(
                    vec![Some("e"), Some("f")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
            ],
        )
        .expect("Failed to create RecordBatch");

        // serialize the record batch as an IPC stream
        let mut buf = vec![];
        {
            let mut writer = crate::writer::StreamWriter::try_new_with_options(
                &mut buf,
                batch.schema().as_ref(),
                crate::writer::IpcWriteOptions::default().with_preserve_dict_id(false),
            )
            .expect("Failed to create StreamWriter");
            writer.write(&batch).expect("Failed to write RecordBatch");
            writer.finish().expect("Failed to finish StreamWriter");
        }

        StreamReader::try_new(std::io::Cursor::new(buf), None)
            .expect("Failed to create StreamReader")
            .for_each(|decoded_batch| {
                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
            });
    }
}