arrow_ipc/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Readers
19//!
20//! The `FileReader` and `StreamReader` have similar interfaces,
21//! however the `FileReader` expects a reader that supports `Seek`ing
22
23mod stream;
24
25pub use stream::*;
26
27use flatbuffers::{VectorIter, VerifierOptions};
28use std::collections::{HashMap, VecDeque};
29use std::fmt;
30use std::io::{BufReader, Read, Seek, SeekFrom};
31use std::sync::Arc;
32
33use arrow_array::*;
34use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, ScalarBuffer};
35use arrow_data::ArrayData;
36use arrow_schema::*;
37
38use crate::compression::CompressionCodec;
39use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER};
40use DataType::*;
41
42/// Read a buffer based on offset and length
43/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
44/// Each constituent buffer is first compressed with the indicated
45/// compressor, and then written with the uncompressed length in the first 8
46/// bytes as a 64-bit little-endian signed integer followed by the compressed
47/// buffer bytes (and then padding as required by the protocol). The
48/// uncompressed length may be set to -1 to indicate that the data that
49/// follows is not compressed, which can be useful for cases where
50/// compression does not yield appreciable savings.
51fn read_buffer(
52    buf: &crate::Buffer,
53    a_data: &Buffer,
54    compression_codec: Option<CompressionCodec>,
55) -> Result<Buffer, ArrowError> {
56    let start_offset = buf.offset() as usize;
57    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
58    // corner case: empty buffer
59    match (buf_data.is_empty(), compression_codec) {
60        (true, _) | (_, None) => Ok(buf_data),
61        (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
62    }
63}
64
65/// Coordinates reading arrays based on data types.
66///
67/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView)
68/// When encounter such types, we pop from the front of the queue to get the number of buffers to read.
69///
70/// Notes:
71/// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
72/// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
73///   We thus:
74///     - check if the bit width of non-64-bit numbers is 64, and
75///     - read the buffer as 64-bit (signed integer or float), and
76///     - cast the 64-bit array to the appropriate data type
77fn create_array(
78    reader: &mut ArrayReader,
79    field: &Field,
80    variadic_counts: &mut VecDeque<i64>,
81    require_alignment: bool,
82) -> Result<ArrayRef, ArrowError> {
83    let data_type = field.data_type();
84    match data_type {
85        Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
86            reader.next_node(field)?,
87            data_type,
88            &[
89                reader.next_buffer()?,
90                reader.next_buffer()?,
91                reader.next_buffer()?,
92            ],
93            require_alignment,
94        ),
95        BinaryView | Utf8View => {
96            let count = variadic_counts
97                .pop_front()
98                .ok_or(ArrowError::IpcError(format!(
99                    "Missing variadic count for {data_type} column"
100                )))?;
101            let count = count + 2; // view and null buffer.
102            let buffers = (0..count)
103                .map(|_| reader.next_buffer())
104                .collect::<Result<Vec<_>, _>>()?;
105            create_primitive_array(
106                reader.next_node(field)?,
107                data_type,
108                &buffers,
109                require_alignment,
110            )
111        }
112        FixedSizeBinary(_) => create_primitive_array(
113            reader.next_node(field)?,
114            data_type,
115            &[reader.next_buffer()?, reader.next_buffer()?],
116            require_alignment,
117        ),
118        List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
119            let list_node = reader.next_node(field)?;
120            let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
121            let values = create_array(reader, list_field, variadic_counts, require_alignment)?;
122            create_list_array(
123                list_node,
124                data_type,
125                &list_buffers,
126                values,
127                require_alignment,
128            )
129        }
130        FixedSizeList(ref list_field, _) => {
131            let list_node = reader.next_node(field)?;
132            let list_buffers = [reader.next_buffer()?];
133            let values = create_array(reader, list_field, variadic_counts, require_alignment)?;
134            create_list_array(
135                list_node,
136                data_type,
137                &list_buffers,
138                values,
139                require_alignment,
140            )
141        }
142        Struct(struct_fields) => {
143            let struct_node = reader.next_node(field)?;
144            let null_buffer = reader.next_buffer()?;
145
146            // read the arrays for each field
147            let mut struct_arrays = vec![];
148            // TODO investigate whether just knowing the number of buffers could
149            // still work
150            for struct_field in struct_fields {
151                let child = create_array(reader, struct_field, variadic_counts, require_alignment)?;
152                struct_arrays.push(child);
153            }
154            let null_count = struct_node.null_count() as usize;
155            let struct_array = if struct_arrays.is_empty() {
156                // `StructArray::from` can't infer the correct row count
157                // if we have zero fields
158                let len = struct_node.length() as usize;
159                StructArray::new_empty_fields(
160                    len,
161                    (null_count > 0).then(|| BooleanBuffer::new(null_buffer, 0, len).into()),
162                )
163            } else if null_count > 0 {
164                // create struct array from fields, arrays and null data
165                let len = struct_node.length() as usize;
166                let nulls = BooleanBuffer::new(null_buffer, 0, len).into();
167                StructArray::try_new(struct_fields.clone(), struct_arrays, Some(nulls))?
168            } else {
169                StructArray::try_new(struct_fields.clone(), struct_arrays, None)?
170            };
171            Ok(Arc::new(struct_array))
172        }
173        RunEndEncoded(run_ends_field, values_field) => {
174            let run_node = reader.next_node(field)?;
175            let run_ends =
176                create_array(reader, run_ends_field, variadic_counts, require_alignment)?;
177            let values = create_array(reader, values_field, variadic_counts, require_alignment)?;
178
179            let run_array_length = run_node.length() as usize;
180            let builder = ArrayData::builder(data_type.clone())
181                .len(run_array_length)
182                .offset(0)
183                .add_child_data(run_ends.into_data())
184                .add_child_data(values.into_data());
185
186            let array_data = if require_alignment {
187                builder.build()?
188            } else {
189                builder.build_aligned()?
190            };
191
192            Ok(make_array(array_data))
193        }
194        // Create dictionary array from RecordBatch
195        Dictionary(_, _) => {
196            let index_node = reader.next_node(field)?;
197            let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];
198
199            let dict_id = field.dict_id().ok_or_else(|| {
200                ArrowError::ParseError(format!("Field {field} does not have dict id"))
201            })?;
202
203            let value_array = reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
204                ArrowError::ParseError(format!(
205                    "Cannot find a dictionary batch with dict id: {dict_id}"
206                ))
207            })?;
208
209            create_dictionary_array(
210                index_node,
211                data_type,
212                &index_buffers,
213                value_array.clone(),
214                require_alignment,
215            )
216        }
217        Union(fields, mode) => {
218            let union_node = reader.next_node(field)?;
219            let len = union_node.length() as usize;
220
221            // In V4, union types has validity bitmap
222            // In V5 and later, union types have no validity bitmap
223            if reader.version < MetadataVersion::V5 {
224                reader.next_buffer()?;
225            }
226
227            let type_ids: ScalarBuffer<i8> = reader.next_buffer()?.slice_with_length(0, len).into();
228
229            let value_offsets = match mode {
230                UnionMode::Dense => {
231                    let offsets: ScalarBuffer<i32> =
232                        reader.next_buffer()?.slice_with_length(0, len * 4).into();
233                    Some(offsets)
234                }
235                UnionMode::Sparse => None,
236            };
237
238            let mut children = Vec::with_capacity(fields.len());
239
240            for (_id, field) in fields.iter() {
241                let child = create_array(reader, field, variadic_counts, require_alignment)?;
242                children.push(child);
243            }
244
245            let array = UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?;
246            Ok(Arc::new(array))
247        }
248        Null => {
249            let node = reader.next_node(field)?;
250            let length = node.length();
251            let null_count = node.null_count();
252
253            if length != null_count {
254                return Err(ArrowError::SchemaError(format!(
255                    "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
256                )));
257            }
258
259            let builder = ArrayData::builder(data_type.clone())
260                .len(length as usize)
261                .offset(0);
262
263            let array_data = if require_alignment {
264                builder.build()?
265            } else {
266                builder.build_aligned()?
267            };
268
269            // no buffer increases
270            Ok(Arc::new(NullArray::from(array_data)))
271        }
272        _ => create_primitive_array(
273            reader.next_node(field)?,
274            data_type,
275            &[reader.next_buffer()?, reader.next_buffer()?],
276            require_alignment,
277        ),
278    }
279}
280
281/// Reads the correct number of buffers based on data type and null_count, and creates a
282/// primitive array ref
283fn create_primitive_array(
284    field_node: &FieldNode,
285    data_type: &DataType,
286    buffers: &[Buffer],
287    require_alignment: bool,
288) -> Result<ArrayRef, ArrowError> {
289    let length = field_node.length() as usize;
290    let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
291    let builder = match data_type {
292        Utf8 | Binary | LargeBinary | LargeUtf8 => {
293            // read 3 buffers: null buffer (optional), offsets buffer and data buffer
294            ArrayData::builder(data_type.clone())
295                .len(length)
296                .buffers(buffers[1..3].to_vec())
297                .null_bit_buffer(null_buffer)
298        }
299        BinaryView | Utf8View => ArrayData::builder(data_type.clone())
300            .len(length)
301            .buffers(buffers[1..].to_vec())
302            .null_bit_buffer(null_buffer),
303        _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
304            // read 2 buffers: null buffer (optional) and data buffer
305            ArrayData::builder(data_type.clone())
306                .len(length)
307                .add_buffer(buffers[1].clone())
308                .null_bit_buffer(null_buffer)
309        }
310        t => unreachable!("Data type {:?} either unsupported or not primitive", t),
311    };
312
313    let array_data = if require_alignment {
314        builder.build()?
315    } else {
316        builder.build_aligned()?
317    };
318
319    Ok(make_array(array_data))
320}
321
322/// Reads the correct number of buffers based on list type and null_count, and creates a
323/// list array ref
324fn create_list_array(
325    field_node: &FieldNode,
326    data_type: &DataType,
327    buffers: &[Buffer],
328    child_array: ArrayRef,
329    require_alignment: bool,
330) -> Result<ArrayRef, ArrowError> {
331    let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
332    let length = field_node.length() as usize;
333    let child_data = child_array.into_data();
334    let builder = match data_type {
335        List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
336            .len(length)
337            .add_buffer(buffers[1].clone())
338            .add_child_data(child_data)
339            .null_bit_buffer(null_buffer),
340
341        FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
342            .len(length)
343            .add_child_data(child_data)
344            .null_bit_buffer(null_buffer),
345
346        _ => unreachable!("Cannot create list or map array from {:?}", data_type),
347    };
348
349    let array_data = if require_alignment {
350        builder.build()?
351    } else {
352        builder.build_aligned()?
353    };
354
355    Ok(make_array(array_data))
356}
357
358/// Reads the correct number of buffers based on list type and null_count, and creates a
359/// list array ref
360fn create_dictionary_array(
361    field_node: &FieldNode,
362    data_type: &DataType,
363    buffers: &[Buffer],
364    value_array: ArrayRef,
365    require_alignment: bool,
366) -> Result<ArrayRef, ArrowError> {
367    if let Dictionary(_, _) = *data_type {
368        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
369        let builder = ArrayData::builder(data_type.clone())
370            .len(field_node.length() as usize)
371            .add_buffer(buffers[1].clone())
372            .add_child_data(value_array.into_data())
373            .null_bit_buffer(null_buffer);
374
375        let array_data = if require_alignment {
376            builder.build()?
377        } else {
378            builder.build_aligned()?
379        };
380
381        Ok(make_array(array_data))
382    } else {
383        unreachable!("Cannot create dictionary array from {:?}", data_type)
384    }
385}
386
/// State for decoding arrays from an encoded [`RecordBatch`]
///
/// `nodes` and `buffers` are forward-only iterators over the batch metadata. Decoding or
/// skipping a field advances them, so fields have to be visited in schema order.
struct ArrayReader<'a> {
    /// Decoded dictionaries indexed by dictionary id
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Optional compression codec, handed to `read_buffer` for every buffer that is read
    compression: Option<CompressionCodec>,
    /// The format version (consulted when decoding unions, which carry an extra
    /// validity buffer before V5)
    version: MetadataVersion,
    /// The raw data buffer that the buffer descriptors' offsets and lengths index into
    data: &'a Buffer,
    /// Iterator over the remaining field nodes of the batch
    nodes: VectorIter<'a, FieldNode>,
    /// Iterator over the remaining buffer descriptors of the batch
    buffers: VectorIter<'a, crate::Buffer>,
}
402
403impl<'a> ArrayReader<'a> {
404    fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
405        read_buffer(self.buffers.next().unwrap(), self.data, self.compression)
406    }
407
408    fn skip_buffer(&mut self) {
409        self.buffers.next().unwrap();
410    }
411
412    fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
413        self.nodes.next().ok_or_else(|| {
414            ArrowError::SchemaError(format!(
415                "Invalid data for schema. {} refers to node not found in schema",
416                field
417            ))
418        })
419    }
420
421    fn skip_field(
422        &mut self,
423        field: &Field,
424        variadic_count: &mut VecDeque<i64>,
425    ) -> Result<(), ArrowError> {
426        self.next_node(field)?;
427
428        match field.data_type() {
429            Utf8 | Binary | LargeBinary | LargeUtf8 => {
430                for _ in 0..3 {
431                    self.skip_buffer()
432                }
433            }
434            Utf8View | BinaryView => {
435                let count = variadic_count
436                    .pop_front()
437                    .ok_or(ArrowError::IpcError(format!(
438                        "Missing variadic count for {} column",
439                        field.data_type()
440                    )))?;
441                let count = count + 2; // view and null buffer.
442                for _i in 0..count {
443                    self.skip_buffer()
444                }
445            }
446            FixedSizeBinary(_) => {
447                self.skip_buffer();
448                self.skip_buffer();
449            }
450            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
451                self.skip_buffer();
452                self.skip_buffer();
453                self.skip_field(list_field, variadic_count)?;
454            }
455            FixedSizeList(list_field, _) => {
456                self.skip_buffer();
457                self.skip_field(list_field, variadic_count)?;
458            }
459            Struct(struct_fields) => {
460                self.skip_buffer();
461
462                // skip for each field
463                for struct_field in struct_fields {
464                    self.skip_field(struct_field, variadic_count)?
465                }
466            }
467            RunEndEncoded(run_ends_field, values_field) => {
468                self.skip_field(run_ends_field, variadic_count)?;
469                self.skip_field(values_field, variadic_count)?;
470            }
471            Dictionary(_, _) => {
472                self.skip_buffer(); // Nulls
473                self.skip_buffer(); // Indices
474            }
475            Union(fields, mode) => {
476                self.skip_buffer(); // Nulls
477
478                match mode {
479                    UnionMode::Dense => self.skip_buffer(),
480                    UnionMode::Sparse => {}
481                };
482
483                for (_, field) in fields.iter() {
484                    self.skip_field(field, variadic_count)?
485                }
486            }
487            Null => {} // No buffer increases
488            _ => {
489                self.skip_buffer();
490                self.skip_buffer();
491            }
492        };
493        Ok(())
494    }
495}
496
497/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`.
498///
499/// If `require_alignment` is true, this function will return an error if any array data in the
500/// input `buf` is not properly aligned.
501/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct [`arrow_data::ArrayData`].
502///
503/// If `require_alignment` is false, this function will automatically allocate a new aligned buffer
504/// and copy over the data if any array data in the input `buf` is not properly aligned.
505/// (Properly aligned array data will remain zero-copy.)
506/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct [`arrow_data::ArrayData`].
507pub fn read_record_batch(
508    buf: &Buffer,
509    batch: crate::RecordBatch,
510    schema: SchemaRef,
511    dictionaries_by_id: &HashMap<i64, ArrayRef>,
512    projection: Option<&[usize]>,
513    metadata: &MetadataVersion,
514) -> Result<RecordBatch, ArrowError> {
515    read_record_batch_impl(
516        buf,
517        batch,
518        schema,
519        dictionaries_by_id,
520        projection,
521        metadata,
522        false,
523    )
524}
525
526/// Read the dictionary from the buffer and provided metadata,
527/// updating the `dictionaries_by_id` with the resulting dictionary
528pub fn read_dictionary(
529    buf: &Buffer,
530    batch: crate::DictionaryBatch,
531    schema: &Schema,
532    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
533    metadata: &MetadataVersion,
534) -> Result<(), ArrowError> {
535    read_dictionary_impl(buf, batch, schema, dictionaries_by_id, metadata, false)
536}
537
538fn read_record_batch_impl(
539    buf: &Buffer,
540    batch: crate::RecordBatch,
541    schema: SchemaRef,
542    dictionaries_by_id: &HashMap<i64, ArrayRef>,
543    projection: Option<&[usize]>,
544    metadata: &MetadataVersion,
545    require_alignment: bool,
546) -> Result<RecordBatch, ArrowError> {
547    let buffers = batch.buffers().ok_or_else(|| {
548        ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
549    })?;
550    let field_nodes = batch.nodes().ok_or_else(|| {
551        ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
552    })?;
553
554    let mut variadic_counts: VecDeque<i64> =
555        batch.variadicBufferCounts().into_iter().flatten().collect();
556
557    let batch_compression = batch.compression();
558    let compression = batch_compression
559        .map(|batch_compression| batch_compression.codec().try_into())
560        .transpose()?;
561
562    let mut reader = ArrayReader {
563        dictionaries_by_id,
564        compression,
565        version: *metadata,
566        data: buf,
567        nodes: field_nodes.iter(),
568        buffers: buffers.iter(),
569    };
570
571    let options = RecordBatchOptions::new().with_row_count(Some(batch.length() as usize));
572
573    if let Some(projection) = projection {
574        let mut arrays = vec![];
575        // project fields
576        for (idx, field) in schema.fields().iter().enumerate() {
577            // Create array for projected field
578            if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
579                let child =
580                    create_array(&mut reader, field, &mut variadic_counts, require_alignment)?;
581                arrays.push((proj_idx, child));
582            } else {
583                reader.skip_field(field, &mut variadic_counts)?;
584            }
585        }
586        assert!(variadic_counts.is_empty());
587        arrays.sort_by_key(|t| t.0);
588        RecordBatch::try_new_with_options(
589            Arc::new(schema.project(projection)?),
590            arrays.into_iter().map(|t| t.1).collect(),
591            &options,
592        )
593    } else {
594        let mut children = vec![];
595        // keep track of index as lists require more than one node
596        for field in schema.fields() {
597            let child = create_array(&mut reader, field, &mut variadic_counts, require_alignment)?;
598            children.push(child);
599        }
600        assert!(variadic_counts.is_empty());
601        RecordBatch::try_new_with_options(schema, children, &options)
602    }
603}
604
605fn read_dictionary_impl(
606    buf: &Buffer,
607    batch: crate::DictionaryBatch,
608    schema: &Schema,
609    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
610    metadata: &MetadataVersion,
611    require_alignment: bool,
612) -> Result<(), ArrowError> {
613    if batch.isDelta() {
614        return Err(ArrowError::InvalidArgumentError(
615            "delta dictionary batches not supported".to_string(),
616        ));
617    }
618
619    let id = batch.id();
620    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
621    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
622        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
623    })?;
624
625    // As the dictionary batch does not contain the type of the
626    // values array, we need to retrieve this from the schema.
627    // Get an array representing this dictionary's values.
628    let dictionary_values: ArrayRef = match first_field.data_type() {
629        DataType::Dictionary(_, ref value_type) => {
630            // Make a fake schema for the dictionary batch.
631            let value = value_type.as_ref().clone();
632            let schema = Schema::new(vec![Field::new("", value, true)]);
633            // Read a single column
634            let record_batch = read_record_batch_impl(
635                buf,
636                batch.data().unwrap(),
637                Arc::new(schema),
638                dictionaries_by_id,
639                None,
640                metadata,
641                require_alignment,
642            )?;
643            Some(record_batch.column(0).clone())
644        }
645        _ => None,
646    }
647    .ok_or_else(|| {
648        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
649    })?;
650
651    // We don't currently record the isOrdered field. This could be general
652    // attributes of arrays.
653    // Add (possibly multiple) array refs to the dictionaries array.
654    dictionaries_by_id.insert(id, dictionary_values.clone());
655
656    Ok(())
657}
658
659/// Read the data for a given block
660fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
661    reader.seek(SeekFrom::Start(block.offset() as u64))?;
662    let body_len = block.bodyLength().to_usize().unwrap();
663    let metadata_len = block.metaDataLength().to_usize().unwrap();
664    let total_len = body_len.checked_add(metadata_len).unwrap();
665
666    let mut buf = MutableBuffer::from_len_zeroed(total_len);
667    reader.read_exact(&mut buf)?;
668    Ok(buf.into())
669}
670
671/// Parse an encapsulated message
672///
673/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
674fn parse_message(buf: &[u8]) -> Result<Message, ArrowError> {
675    let buf = match buf[..4] == CONTINUATION_MARKER {
676        true => &buf[8..],
677        false => &buf[4..],
678    };
679    crate::root_as_message(buf)
680        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
681}
682
683/// Read the footer length from the last 10 bytes of an Arrow IPC file
684///
685/// Expects a 4 byte footer length followed by `b"ARROW1"`
686pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
687    if buf[4..] != super::ARROW_MAGIC {
688        return Err(ArrowError::ParseError(
689            "Arrow file does not contain correct footer".to_string(),
690        ));
691    }
692
693    // read footer length
694    let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
695    footer_len
696        .try_into()
697        .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
698}
699
/// A low-level, push-based interface for reading an IPC file
///
/// For a higher-level interface see [`FileReader`]
///
/// The caller locates each block in the file and hands its bytes to the decoder.
/// Dictionaries are accumulated inside the decoder as they are read.
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_array::types::Int32Type;
/// # use arrow_buffer::Buffer;
/// # use arrow_ipc::convert::fb_to_schema;
/// # use arrow_ipc::reader::{FileDecoder, read_footer_length};
/// # use arrow_ipc::root_as_footer;
/// # use arrow_ipc::writer::FileWriter;
/// // Write an IPC file
///
/// let batch = RecordBatch::try_from_iter([
///     ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
///     ("b", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
///     ("c", Arc::new(DictionaryArray::<Int32Type>::from_iter(["hello", "hello", "world"])) as _),
/// ]).unwrap();
///
/// let schema = batch.schema();
///
/// let mut out = Vec::with_capacity(1024);
/// let mut writer = FileWriter::try_new(&mut out, schema.as_ref()).unwrap();
/// writer.write(&batch).unwrap();
/// writer.finish().unwrap();
///
/// drop(writer);
///
/// // Read IPC file
///
/// let buffer = Buffer::from_vec(out);
/// let trailer_start = buffer.len() - 10;
/// let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
/// let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
///
/// let back = fb_to_schema(footer.schema().unwrap());
/// assert_eq!(&back, schema.as_ref());
///
/// let mut decoder = FileDecoder::new(schema, footer.version());
///
/// // Read dictionaries
/// for block in footer.dictionaries().iter().flatten() {
///     let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
///     let data = buffer.slice_with_length(block.offset() as _, block_len);
///     decoder.read_dictionary(&block, &data).unwrap();
/// }
///
/// // Read record batch
/// let batches = footer.recordBatches().unwrap();
/// assert_eq!(batches.len(), 1); // Only wrote a single batch
///
/// let block = batches.get(0);
/// let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
/// let data = buffer.slice_with_length(block.offset() as _, block_len);
/// let back = decoder.read_record_batch(block, &data).unwrap().unwrap();
///
/// assert_eq!(batch, back);
/// ```
#[derive(Debug)]
pub struct FileDecoder {
    /// Schema passed to `new`
    schema: SchemaRef,
    /// Dictionaries decoded so far via `read_dictionary`, keyed by dictionary id
    dictionaries: HashMap<i64, ArrayRef>,
    /// Metadata version passed to `new`; `read_message` compares each message against it
    version: MetadataVersion,
    /// Optional projection set through `with_projection`; `None` by default
    projection: Option<Vec<usize>>,
    /// Whether unaligned input is an error; set through `with_require_alignment`,
    /// `false` by default
    require_alignment: bool,
}
768
769impl FileDecoder {
770    /// Create a new [`FileDecoder`] with the given schema and version
771    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
772        Self {
773            schema,
774            version,
775            dictionaries: Default::default(),
776            projection: None,
777            require_alignment: false,
778        }
779    }
780
781    /// Specify a projection
782    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
783        self.projection = Some(projection);
784        self
785    }
786
787    /// Specifies whether or not array data in input buffers is required to be properly aligned.
788    ///
789    /// If `require_alignment` is true, this decoder will return an error if any array data in the
790    /// input `buf` is not properly aligned.
791    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct
792    /// [`arrow_data::ArrayData`].
793    ///
794    /// If `require_alignment` is false (the default), this decoder will automatically allocate a
795    /// new aligned buffer and copy over the data if any array data in the input `buf` is not
796    /// properly aligned. (Properly aligned array data will remain zero-copy.)
797    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct
798    /// [`arrow_data::ArrayData`].
799    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
800        self.require_alignment = require_alignment;
801        self
802    }
803
804    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message<'a>, ArrowError> {
805        let message = parse_message(buf)?;
806
807        // some old test data's footer metadata is not set, so we account for that
808        if self.version != MetadataVersion::V1 && message.version() != self.version {
809            return Err(ArrowError::IpcError(
810                "Could not read IPC message as metadata versions mismatch".to_string(),
811            ));
812        }
813        Ok(message)
814    }
815
816    /// Read the dictionary with the given block and data buffer
817    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
818        let message = self.read_message(buf)?;
819        match message.header_type() {
820            crate::MessageHeader::DictionaryBatch => {
821                let batch = message.header_as_dictionary_batch().unwrap();
822                read_dictionary_impl(
823                    &buf.slice(block.metaDataLength() as _),
824                    batch,
825                    &self.schema,
826                    &mut self.dictionaries,
827                    &message.version(),
828                    self.require_alignment,
829                )
830            }
831            t => Err(ArrowError::ParseError(format!(
832                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
833            ))),
834        }
835    }
836
837    /// Read the RecordBatch with the given block and data buffer
838    pub fn read_record_batch(
839        &self,
840        block: &Block,
841        buf: &Buffer,
842    ) -> Result<Option<RecordBatch>, ArrowError> {
843        let message = self.read_message(buf)?;
844        match message.header_type() {
845            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
846                "Not expecting a schema when messages are read".to_string(),
847            )),
848            crate::MessageHeader::RecordBatch => {
849                let batch = message.header_as_record_batch().ok_or_else(|| {
850                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
851                })?;
852                // read the block that makes up the record batch into a buffer
853                read_record_batch_impl(
854                    &buf.slice(block.metaDataLength() as _),
855                    batch,
856                    self.schema.clone(),
857                    &self.dictionaries,
858                    self.projection.as_deref(),
859                    &message.version(),
860                    self.require_alignment,
861                )
862                .map(Some)
863            }
864            crate::MessageHeader::NONE => Ok(None),
865            t => Err(ArrowError::InvalidArgumentError(format!(
866                "Reading types other than record batches not yet supported, unable to read {t:?}"
867            ))),
868        }
869    }
870}
871
/// Build an Arrow [`FileReader`] with custom options.
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection for which columns to load (zero-based column indices)
    projection: Option<Vec<usize>>,
    /// Used as `max_tables` of the [`VerifierOptions`] the footer is verified with
    max_footer_fb_tables: usize,
    /// Used as `max_depth` of the [`VerifierOptions`] the footer is verified with
    max_footer_fb_depth: usize,
}
882
883impl Default for FileReaderBuilder {
884    fn default() -> Self {
885        let verifier_options = VerifierOptions::default();
886        Self {
887            max_footer_fb_tables: verifier_options.max_tables,
888            max_footer_fb_depth: verifier_options.max_depth,
889            projection: None,
890        }
891    }
892}
893
894impl FileReaderBuilder {
895    /// Options for creating a new [`FileReader`].
896    ///
897    /// To convert a builder into a reader, call [`FileReaderBuilder::build`].
898    pub fn new() -> Self {
899        Self::default()
900    }
901
902    /// Optional projection for which columns to load (zero-based column indices).
903    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
904        self.projection = Some(projection);
905        self
906    }
907
908    /// Flatbuffers option for parsing the footer. Controls the max number of fields and
909    /// metadata key-value pairs that can be parsed from the schema of the footer.
910    ///
911    /// By default this is set to `1_000_000` which roughly translates to a schema with
912    /// no metadata key-value pairs but 499,999 fields.
913    ///
914    /// This default limit is enforced to protect against malicious files with a massive
915    /// amount of flatbuffer tables which could cause a denial of service attack.
916    ///
917    /// If you need to ingest a trusted file with a massive number of fields and/or
918    /// metadata key-value pairs and are facing the error `"Unable to get root as
919    /// footer: TooManyTables"` then increase this parameter as necessary.
920    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
921        self.max_footer_fb_tables = max_footer_fb_tables;
922        self
923    }
924
925    /// Flatbuffers option for parsing the footer. Controls the max depth for schemas with
926    /// nested fields parsed from the footer.
927    ///
928    /// By default this is set to `64` which roughly translates to a schema with
929    /// a field nested 60 levels down through other struct fields.
930    ///
931    /// This default limit is enforced to protect against malicious files with a extremely
932    /// deep flatbuffer structure which could cause a denial of service attack.
933    ///
934    /// If you need to ingest a trusted file with a deeply nested field and are facing the
935    /// error `"Unable to get root as footer: DepthLimitReached"` then increase this
936    /// parameter as necessary.
937    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
938        self.max_footer_fb_depth = max_footer_fb_depth;
939        self
940    }
941
942    /// Build [`FileReader`] with given reader.
943    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
944        // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
945        let mut buffer = [0; 10];
946        reader.seek(SeekFrom::End(-10))?;
947        reader.read_exact(&mut buffer)?;
948
949        let footer_len = read_footer_length(buffer)?;
950
951        // read footer
952        let mut footer_data = vec![0; footer_len];
953        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
954        reader.read_exact(&mut footer_data)?;
955
956        let verifier_options = VerifierOptions {
957            max_tables: self.max_footer_fb_tables,
958            max_depth: self.max_footer_fb_depth,
959            ..Default::default()
960        };
961        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
962            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
963        )?;
964
965        let blocks = footer.recordBatches().ok_or_else(|| {
966            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
967        })?;
968
969        let total_blocks = blocks.len();
970
971        let ipc_schema = footer.schema().unwrap();
972        if !ipc_schema.endianness().equals_to_target_endianness() {
973            return Err(ArrowError::IpcError(
974                "the endianness of the source system does not match the endianness of the target system.".to_owned()
975            ));
976        }
977
978        let schema = crate::convert::fb_to_schema(ipc_schema);
979
980        let mut custom_metadata = HashMap::new();
981        if let Some(fb_custom_metadata) = footer.custom_metadata() {
982            for kv in fb_custom_metadata.into_iter() {
983                custom_metadata.insert(
984                    kv.key().unwrap().to_string(),
985                    kv.value().unwrap().to_string(),
986                );
987            }
988        }
989
990        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
991        if let Some(projection) = self.projection {
992            decoder = decoder.with_projection(projection)
993        }
994
995        // Create an array of optional dictionary value arrays, one per field.
996        if let Some(dictionaries) = footer.dictionaries() {
997            for block in dictionaries {
998                let buf = read_block(&mut reader, block)?;
999                decoder.read_dictionary(block, &buf)?;
1000            }
1001        }
1002
1003        Ok(FileReader {
1004            reader,
1005            blocks: blocks.iter().copied().collect(),
1006            current_block: 0,
1007            total_blocks,
1008            decoder,
1009            custom_metadata,
1010        })
1011    }
1012}
1013
/// Arrow File reader
///
/// Yields the file's record batches through its [`Iterator`] implementation.
pub struct FileReader<R> {
    /// File reader that supports reading and seeking
    reader: R,

    /// The decoder used to turn each block's bytes into a record batch
    decoder: FileDecoder,

    /// The record batch blocks listed in the file footer
    ///
    /// A block indicates the regions in the file to read to get data
    blocks: Vec<Block>,

    /// A counter to keep track of the current block that should be read
    current_block: usize,

    /// The total number of blocks, which may contain record batches and other types
    total_blocks: usize,

    /// User defined metadata, collected from the file footer
    custom_metadata: HashMap<String, String>,
}
1036
1037impl<R> fmt::Debug for FileReader<R> {
1038    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1039        f.debug_struct("FileReader<R>")
1040            .field("decoder", &self.decoder)
1041            .field("blocks", &self.blocks)
1042            .field("current_block", &self.current_block)
1043            .field("total_blocks", &self.total_blocks)
1044            .finish_non_exhaustive()
1045    }
1046}
1047
1048impl<R: Read + Seek> FileReader<BufReader<R>> {
1049    /// Try to create a new file reader with the reader wrapped in a BufReader.
1050    ///
1051    /// See [`FileReader::try_new`] for an unbuffered version.
1052    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1053        Self::try_new(BufReader::new(reader), projection)
1054    }
1055}
1056
1057impl<R: Read + Seek> FileReader<R> {
1058    /// Try to create a new file reader.
1059    ///
1060    /// There is no internal buffering. If buffered reads are needed you likely want to use
1061    /// [`FileReader::try_new_buffered`] instead.    
1062    ///
1063    /// # Errors
1064    ///
1065    /// An ['Err'](Result::Err) may be returned if:
1066    /// - the file does not meet the Arrow Format footer requirements, or
1067    /// - file endianness does not match the target endianness.
1068    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1069        let builder = FileReaderBuilder {
1070            projection,
1071            ..Default::default()
1072        };
1073        builder.build(reader)
1074    }
1075
    /// Return the user defined custom metadata held by this reader
    pub fn custom_metadata(&self) -> &HashMap<String, String> {
        &self.custom_metadata
    }
1080
    /// Return the number of batches in the file
    ///
    /// This is the stored total block count, not the number of batches still to be read.
    pub fn num_batches(&self) -> usize {
        self.total_blocks
    }
1085
1086    /// Return the schema of the file
1087    pub fn schema(&self) -> SchemaRef {
1088        self.decoder.schema.clone()
1089    }
1090
1091    /// Read a specific record batch
1092    ///
1093    /// Sets the current block to the index, allowing random reads
1094    pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
1095        if index >= self.total_blocks {
1096            Err(ArrowError::InvalidArgumentError(format!(
1097                "Cannot set batch to index {} from {} total batches",
1098                index, self.total_blocks
1099            )))
1100        } else {
1101            self.current_block = index;
1102            Ok(())
1103        }
1104    }
1105
1106    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1107        let block = &self.blocks[self.current_block];
1108        self.current_block += 1;
1109
1110        // read length
1111        let buffer = read_block(&mut self.reader, block)?;
1112        self.decoder.read_record_batch(block, &buffer)
1113    }
1114
    /// Gets a shared reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_ref(&self) -> &R {
        &self.reader
    }
1121
    /// Gets a mutable reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.reader
    }
1128}
1129
1130impl<R: Read + Seek> Iterator for FileReader<R> {
1131    type Item = Result<RecordBatch, ArrowError>;
1132
1133    fn next(&mut self) -> Option<Self::Item> {
1134        // get current block
1135        if self.current_block < self.total_blocks {
1136            self.maybe_next().transpose()
1137        } else {
1138            None
1139        }
1140    }
1141}
1142
1143impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
1144    fn schema(&self) -> SchemaRef {
1145        self.schema()
1146    }
1147}
1148
/// Arrow Stream reader
pub struct StreamReader<R> {
    /// Stream reader
    reader: R,

    /// The schema that is read from the stream's first message
    schema: SchemaRef,

    /// Dictionaries read from the stream so far, keyed by an `i64` (presumably the
    /// dictionary id; confirm against `read_dictionary_impl`).
    ///
    /// Dictionaries may be appended to in the streaming format.
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// An indicator of whether the stream is complete.
    ///
    /// This value is set to `true` when the underlying reader reaches end-of-file where a
    /// new message would start, or when a metadata length of zero is read.
    finished: bool,

    /// Optional projection: the selected column indices together with the schema
    /// projected onto those columns
    projection: Option<(Vec<usize>, Schema)>,
}
1170
1171impl<R> fmt::Debug for StreamReader<R> {
1172    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1173        f.debug_struct("StreamReader<R>")
1174            .field("reader", &"R")
1175            .field("schema", &self.schema)
1176            .field("dictionaries_by_id", &self.dictionaries_by_id)
1177            .field("finished", &self.finished)
1178            .field("projection", &self.projection)
1179            .finish()
1180    }
1181}
1182
1183impl<R: Read> StreamReader<BufReader<R>> {
1184    /// Try to create a new stream reader with the reader wrapped in a BufReader.
1185    ///
1186    /// See [`StreamReader::try_new`] for an unbuffered version.
1187    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1188        Self::try_new(BufReader::new(reader), projection)
1189    }
1190}
1191
1192impl<R: Read> StreamReader<R> {
1193    /// Try to create a new stream reader.
1194    ///
1195    /// To check if the reader is done, use [`is_finished(self)`](StreamReader::is_finished).
1196    ///
1197    /// There is no internal buffering. If buffered reads are needed you likely want to use
1198    /// [`StreamReader::try_new_buffered`] instead.
1199    ///
1200    /// # Errors
1201    ///
1202    /// An ['Err'](Result::Err) may be returned if the reader does not encounter a schema
1203    /// as the first message in the stream.
1204    pub fn try_new(
1205        mut reader: R,
1206        projection: Option<Vec<usize>>,
1207    ) -> Result<StreamReader<R>, ArrowError> {
1208        // determine metadata length
1209        let mut meta_size: [u8; 4] = [0; 4];
1210        reader.read_exact(&mut meta_size)?;
1211        let meta_len = {
1212            // If a continuation marker is encountered, skip over it and read
1213            // the size from the next four bytes.
1214            if meta_size == CONTINUATION_MARKER {
1215                reader.read_exact(&mut meta_size)?;
1216            }
1217            i32::from_le_bytes(meta_size)
1218        };
1219
1220        let mut meta_buffer = vec![0; meta_len as usize];
1221        reader.read_exact(&mut meta_buffer)?;
1222
1223        let message = crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
1224            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1225        })?;
1226        // message header is a Schema, so read it
1227        let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| {
1228            ArrowError::ParseError("Unable to read IPC message as schema".to_string())
1229        })?;
1230        let schema = crate::convert::fb_to_schema(ipc_schema);
1231
1232        // Create an array of optional dictionary value arrays, one per field.
1233        let dictionaries_by_id = HashMap::new();
1234
1235        let projection = match projection {
1236            Some(projection_indices) => {
1237                let schema = schema.project(&projection_indices)?;
1238                Some((projection_indices, schema))
1239            }
1240            _ => None,
1241        };
1242        Ok(Self {
1243            reader,
1244            schema: Arc::new(schema),
1245            finished: false,
1246            dictionaries_by_id,
1247            projection,
1248        })
1249    }
1250
1251    /// Deprecated, use [`StreamReader::try_new`] instead.
1252    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
1253    pub fn try_new_unbuffered(
1254        reader: R,
1255        projection: Option<Vec<usize>>,
1256    ) -> Result<Self, ArrowError> {
1257        Self::try_new(reader, projection)
1258    }
1259
1260    /// Return the schema of the stream
1261    pub fn schema(&self) -> SchemaRef {
1262        self.schema.clone()
1263    }
1264
    /// Check if the stream is finished
    ///
    /// Returns the current value of the reader's `finished` flag.
    pub fn is_finished(&self) -> bool {
        self.finished
    }
1269
1270    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1271        if self.finished {
1272            return Ok(None);
1273        }
1274        // determine metadata length
1275        let mut meta_size: [u8; 4] = [0; 4];
1276
1277        match self.reader.read_exact(&mut meta_size) {
1278            Ok(()) => (),
1279            Err(e) => {
1280                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1281                    // Handle EOF without the "0xFFFFFFFF 0x00000000"
1282                    // valid according to:
1283                    // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1284                    self.finished = true;
1285                    Ok(None)
1286                } else {
1287                    Err(ArrowError::from(e))
1288                };
1289            }
1290        }
1291
1292        let meta_len = {
1293            // If a continuation marker is encountered, skip over it and read
1294            // the size from the next four bytes.
1295            if meta_size == CONTINUATION_MARKER {
1296                self.reader.read_exact(&mut meta_size)?;
1297            }
1298            i32::from_le_bytes(meta_size)
1299        };
1300
1301        if meta_len == 0 {
1302            // the stream has ended, mark the reader as finished
1303            self.finished = true;
1304            return Ok(None);
1305        }
1306
1307        let mut meta_buffer = vec![0; meta_len as usize];
1308        self.reader.read_exact(&mut meta_buffer)?;
1309
1310        let vecs = &meta_buffer.to_vec();
1311        let message = crate::root_as_message(vecs).map_err(|err| {
1312            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1313        })?;
1314
1315        match message.header_type() {
1316            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
1317                "Not expecting a schema when messages are read".to_string(),
1318            )),
1319            crate::MessageHeader::RecordBatch => {
1320                let batch = message.header_as_record_batch().ok_or_else(|| {
1321                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1322                })?;
1323                // read the block that makes up the record batch into a buffer
1324                let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1325                self.reader.read_exact(&mut buf)?;
1326
1327                read_record_batch_impl(
1328                    &buf.into(),
1329                    batch,
1330                    self.schema(),
1331                    &self.dictionaries_by_id,
1332                    self.projection.as_ref().map(|x| x.0.as_ref()),
1333                    &message.version(),
1334                    false,
1335                )
1336                .map(Some)
1337            }
1338            crate::MessageHeader::DictionaryBatch => {
1339                let batch = message.header_as_dictionary_batch().ok_or_else(|| {
1340                    ArrowError::IpcError(
1341                        "Unable to read IPC message as dictionary batch".to_string(),
1342                    )
1343                })?;
1344                // read the block that makes up the dictionary batch into a buffer
1345                let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1346                self.reader.read_exact(&mut buf)?;
1347
1348                read_dictionary_impl(
1349                    &buf.into(),
1350                    batch,
1351                    &self.schema,
1352                    &mut self.dictionaries_by_id,
1353                    &message.version(),
1354                    false,
1355                )?;
1356
1357                // read the next message until we encounter a RecordBatch
1358                self.maybe_next()
1359            }
1360            crate::MessageHeader::NONE => Ok(None),
1361            t => Err(ArrowError::InvalidArgumentError(format!(
1362                "Reading types other than record batches not yet supported, unable to read {t:?} "
1363            ))),
1364        }
1365    }
1366
    /// Gets a shared reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_ref(&self) -> &R {
        &self.reader
    }
1373
    /// Gets a mutable reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.reader
    }
1380}
1381
1382impl<R: Read> Iterator for StreamReader<R> {
1383    type Item = Result<RecordBatch, ArrowError>;
1384
1385    fn next(&mut self) -> Option<Self::Item> {
1386        self.maybe_next().transpose()
1387    }
1388}
1389
1390impl<R: Read> RecordBatchReader for StreamReader<R> {
1391    fn schema(&self) -> SchemaRef {
1392        self.schema.clone()
1393    }
1394}
1395
1396#[cfg(test)]
1397mod tests {
1398    use crate::writer::{unslice_run_array, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
1399
1400    use super::*;
1401
1402    use crate::root_as_message;
1403    use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1404    use arrow_array::types::*;
1405    use arrow_buffer::NullBuffer;
1406    use arrow_data::ArrayDataBuilder;
1407
    /// Builds the 14-field schema (`f0` to `f13`) shared by the projection tests.
    ///
    /// It mixes primitive, string, boolean, null, list, fixed-size, union, struct,
    /// run-end encoded and dictionary fields.
    fn create_test_projection_schema() -> Schema {
        // define field types
        let list_data_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));

        let fixed_size_list_data_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, false)), 3);

        // dense union with two children, "a" and "b"
        let union_fields = UnionFields::new(
            vec![0, 1],
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Float64, false),
            ],
        );

        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);

        // struct containing a nested list child
        let struct_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new_list("list", Field::new("item", DataType::Int8, true), false),
        ]);
        let struct_data_type = DataType::Struct(struct_fields);

        let run_encoded_data_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int16, false)),
            Arc::new(Field::new("values", DataType::Int32, true)),
        );

        // define schema; the field order must match the column order of the test batch
        Schema::new(vec![
            Field::new("f0", DataType::UInt32, false),
            Field::new("f1", DataType::Utf8, false),
            Field::new("f2", DataType::Boolean, false),
            Field::new("f3", union_data_type, true),
            Field::new("f4", DataType::Null, true),
            Field::new("f5", DataType::Float64, true),
            Field::new("f6", list_data_type, false),
            Field::new("f7", DataType::FixedSizeBinary(3), true),
            Field::new("f8", fixed_size_list_data_type, false),
            Field::new("f9", struct_data_type, false),
            Field::new("f10", run_encoded_data_type, false),
            Field::new("f11", DataType::Boolean, false),
            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
            Field::new("f13", DataType::Utf8, false),
        ])
    }
1454
    /// Builds a three-row record batch with one column per field of `schema`.
    ///
    /// `schema` is expected to be the one from `create_test_projection_schema`: columns 8
    /// and 9 take their data types from `schema.field(8)` and `schema.field(9)`.
    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
        // set test data for each column
        let array0 = UInt32Array::from(vec![1, 2, 3]);
        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
        let array2 = BooleanArray::from(vec![true, false, true]);

        // dense union: one Int32 value, one Float64 value and one null
        let mut union_builder = UnionBuilder::new_dense();
        union_builder.append::<Int32Type>("a", 1).unwrap();
        union_builder.append::<Float64Type>("b", 10.1).unwrap();
        union_builder.append_null::<Float64Type>("b").unwrap();
        let array3 = union_builder.build().unwrap();

        let array4 = NullArray::new(3);
        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
        let array6_values = vec![
            Some(vec![Some(10), Some(10), Some(10)]),
            Some(vec![Some(20), Some(20), Some(20)]),
            Some(vec![Some(30), Some(30)]),
        ];
        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();

        // fixed size list: 3 lists of 3 values each, backed by 9 child values
        let array8_values = ArrayData::builder(DataType::Int32)
            .len(9)
            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
            .build()
            .unwrap();
        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
            .len(3)
            .add_child_data(array8_values)
            .build()
            .unwrap();
        let array8 = FixedSizeListArray::from(array8_data);

        // struct column assembled from an id child and a list child
        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
        let array9_list: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
                Some(vec![Some(-10)]),
                Some(vec![Some(-20), Some(-20), Some(-20)]),
                Some(vec![Some(-30)]),
            ]));
        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
            .add_child_data(array9_id.into_data())
            .add_child_data(array9_list.into_data())
            .len(3)
            .build()
            .unwrap();
        let array9: ArrayRef = Arc::new(StructArray::from(array9));

        // run-end encoded column built from one value followed by two nulls
        let array10_input = vec![Some(1_i32), None, None];
        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        array10_builder.extend(array10_input);
        let array10 = array10_builder.finish();

        let array11 = BooleanArray::from(vec![false, false, true]);

        // dictionary column: Int8 keys into three string values
        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));

        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);

        // create record batch
        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array0),
                Arc::new(array1),
                Arc::new(array2),
                Arc::new(array3),
                Arc::new(array4),
                Arc::new(array5),
                Arc::new(array6),
                Arc::new(array7),
                Arc::new(array8),
                Arc::new(array9),
                Arc::new(array10),
                Arc::new(array11),
                Arc::new(array12),
                Arc::new(array13),
            ],
        )
        .unwrap()
    }
1540
1541    #[test]
1542    fn test_projection_array_values() {
1543        // define schema
1544        let schema = create_test_projection_schema();
1545
1546        // create record batch with test data
1547        let batch = create_test_projection_batch_data(&schema);
1548
1549        // write record batch in IPC format
1550        let mut buf = Vec::new();
1551        {
1552            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
1553            writer.write(&batch).unwrap();
1554            writer.finish().unwrap();
1555        }
1556
1557        // read record batch with projection
1558        for index in 0..12 {
1559            let projection = vec![index];
1560            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
1561            let read_batch = reader.unwrap().next().unwrap().unwrap();
1562            let projected_column = read_batch.column(0);
1563            let expected_column = batch.column(index);
1564
1565            // check the projected column equals the expected column
1566            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
1567        }
1568
1569        {
1570            // read record batch with reversed projection
1571            let reader =
1572                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
1573            let read_batch = reader.unwrap().next().unwrap().unwrap();
1574            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
1575            assert_eq!(read_batch, expected_batch);
1576        }
1577    }
1578
1579    #[test]
1580    fn test_arrow_single_float_row() {
1581        let schema = Schema::new(vec![
1582            Field::new("a", DataType::Float32, false),
1583            Field::new("b", DataType::Float32, false),
1584            Field::new("c", DataType::Int32, false),
1585            Field::new("d", DataType::Int32, false),
1586        ]);
1587        let arrays = vec![
1588            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
1589            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
1590            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
1591            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
1592        ];
1593        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
1594        // create stream writer
1595        let mut file = tempfile::tempfile().unwrap();
1596        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
1597        stream_writer.write(&batch).unwrap();
1598        stream_writer.finish().unwrap();
1599
1600        drop(stream_writer);
1601
1602        file.rewind().unwrap();
1603
1604        // read stream back
1605        let reader = StreamReader::try_new(&mut file, None).unwrap();
1606
1607        reader.for_each(|batch| {
1608            let batch = batch.unwrap();
1609            assert!(
1610                batch
1611                    .column(0)
1612                    .as_any()
1613                    .downcast_ref::<Float32Array>()
1614                    .unwrap()
1615                    .value(0)
1616                    != 0.0
1617            );
1618            assert!(
1619                batch
1620                    .column(1)
1621                    .as_any()
1622                    .downcast_ref::<Float32Array>()
1623                    .unwrap()
1624                    .value(0)
1625                    != 0.0
1626            );
1627        });
1628
1629        file.rewind().unwrap();
1630
1631        // Read with projection
1632        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();
1633
1634        reader.for_each(|batch| {
1635            let batch = batch.unwrap();
1636            assert_eq!(batch.schema().fields().len(), 2);
1637            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
1638            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
1639        });
1640    }
1641
1642    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
1643        let mut buf = Vec::new();
1644        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
1645        writer.write(rb).unwrap();
1646        writer.finish().unwrap();
1647        drop(writer);
1648
1649        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
1650        reader.next().unwrap().unwrap()
1651    }
1652
1653    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
1654        let mut buf = Vec::new();
1655        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
1656        writer.write(rb).unwrap();
1657        writer.finish().unwrap();
1658        drop(writer);
1659
1660        let mut reader =
1661            crate::reader::StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
1662        reader.next().unwrap().unwrap()
1663    }
1664
1665    #[test]
1666    fn test_roundtrip_with_custom_metadata() {
1667        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
1668        let mut buf = Vec::new();
1669        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
1670        let mut test_metadata = HashMap::new();
1671        test_metadata.insert("abc".to_string(), "abc".to_string());
1672        test_metadata.insert("def".to_string(), "def".to_string());
1673        for (k, v) in &test_metadata {
1674            writer.write_metadata(k, v);
1675        }
1676        writer.finish().unwrap();
1677        drop(writer);
1678
1679        let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
1680        assert_eq!(reader.custom_metadata(), &test_metadata);
1681    }
1682
1683    #[test]
1684    fn test_roundtrip_nested_dict() {
1685        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
1686
1687        let array = Arc::new(inner) as ArrayRef;
1688
1689        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
1690
1691        let s = StructArray::from(vec![(dctfield, array)]);
1692        let struct_array = Arc::new(s) as ArrayRef;
1693
1694        let schema = Arc::new(Schema::new(vec![Field::new(
1695            "struct",
1696            struct_array.data_type().clone(),
1697            false,
1698        )]));
1699
1700        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
1701
1702        assert_eq!(batch, roundtrip_ipc(&batch));
1703    }
1704
1705    #[test]
1706    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
1707        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
1708
1709        let array = Arc::new(inner) as ArrayRef;
1710
1711        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
1712
1713        let s = StructArray::from(vec![(dctfield, array)]);
1714        let struct_array = Arc::new(s) as ArrayRef;
1715
1716        let schema = Arc::new(Schema::new(vec![Field::new(
1717            "struct",
1718            struct_array.data_type().clone(),
1719            false,
1720        )]));
1721
1722        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
1723
1724        let mut buf = Vec::new();
1725        let mut writer = crate::writer::FileWriter::try_new_with_options(
1726            &mut buf,
1727            batch.schema_ref(),
1728            IpcWriteOptions::default().with_preserve_dict_id(false),
1729        )
1730        .unwrap();
1731        writer.write(&batch).unwrap();
1732        writer.finish().unwrap();
1733        drop(writer);
1734
1735        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
1736
1737        assert_eq!(batch, reader.next().unwrap().unwrap());
1738    }
1739
1740    fn check_union_with_builder(mut builder: UnionBuilder) {
1741        builder.append::<Int32Type>("a", 1).unwrap();
1742        builder.append_null::<Int32Type>("a").unwrap();
1743        builder.append::<Float64Type>("c", 3.0).unwrap();
1744        builder.append::<Int32Type>("a", 4).unwrap();
1745        builder.append::<Int64Type>("d", 11).unwrap();
1746        let union = builder.build().unwrap();
1747
1748        let schema = Arc::new(Schema::new(vec![Field::new(
1749            "union",
1750            union.data_type().clone(),
1751            false,
1752        )]));
1753
1754        let union_array = Arc::new(union) as ArrayRef;
1755
1756        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
1757        let rb2 = roundtrip_ipc(&rb);
1758        // TODO: equality not yet implemented for union, so we check that the length of the array is
1759        // the same and that all of the buffers are the same instead.
1760        assert_eq!(rb.schema(), rb2.schema());
1761        assert_eq!(rb.num_columns(), rb2.num_columns());
1762        assert_eq!(rb.num_rows(), rb2.num_rows());
1763        let union1 = rb.column(0);
1764        let union2 = rb2.column(0);
1765
1766        assert_eq!(union1, union2);
1767    }
1768
1769    #[test]
1770    fn test_roundtrip_dense_union() {
1771        check_union_with_builder(UnionBuilder::new_dense());
1772    }
1773
1774    #[test]
1775    fn test_roundtrip_sparse_union() {
1776        check_union_with_builder(UnionBuilder::new_sparse());
1777    }
1778
1779    #[test]
1780    fn test_roundtrip_struct_empty_fields() {
1781        let nulls = NullBuffer::from(&[true, true, false][..]);
1782        let rb = RecordBatch::try_from_iter([(
1783            "",
1784            Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
1785        )])
1786        .unwrap();
1787        let rb2 = roundtrip_ipc(&rb);
1788        assert_eq!(rb, rb2);
1789    }
1790
1791    #[test]
1792    fn test_roundtrip_stream_run_array_sliced() {
1793        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
1794            .into_iter()
1795            .collect();
1796        let run_array_1_sliced = run_array_1.slice(2, 5);
1797
1798        let run_array_2_inupt = vec![Some(1_i32), None, None, Some(2), Some(2)];
1799        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1800        run_array_2_builder.extend(run_array_2_inupt);
1801        let run_array_2 = run_array_2_builder.finish();
1802
1803        let schema = Arc::new(Schema::new(vec![
1804            Field::new(
1805                "run_array_1_sliced",
1806                run_array_1_sliced.data_type().clone(),
1807                false,
1808            ),
1809            Field::new("run_array_2", run_array_2.data_type().clone(), false),
1810        ]));
1811        let input_batch = RecordBatch::try_new(
1812            schema,
1813            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
1814        )
1815        .unwrap();
1816        let output_batch = roundtrip_ipc_stream(&input_batch);
1817
1818        // As partial comparison not yet supported for run arrays, the sliced run array
1819        // has to be unsliced before comparing with the output. the second run array
1820        // can be compared as such.
1821        assert_eq!(input_batch.column(1), output_batch.column(1));
1822
1823        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
1824        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
1825    }
1826
1827    #[test]
1828    fn test_roundtrip_stream_nested_dict() {
1829        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
1830        let dict = Arc::new(
1831            xs.clone()
1832                .into_iter()
1833                .collect::<DictionaryArray<Int8Type>>(),
1834        );
1835        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
1836        let struct_array = StructArray::from(vec![
1837            (
1838                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
1839                string_array,
1840            ),
1841            (
1842                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
1843                dict.clone() as ArrayRef,
1844            ),
1845        ]);
1846        let schema = Arc::new(Schema::new(vec![
1847            Field::new("f1_string", DataType::Utf8, false),
1848            Field::new("f2_struct", struct_array.data_type().clone(), false),
1849        ]));
1850        let input_batch = RecordBatch::try_new(
1851            schema,
1852            vec![
1853                Arc::new(StringArray::from(xs.clone())),
1854                Arc::new(struct_array),
1855            ],
1856        )
1857        .unwrap();
1858        let output_batch = roundtrip_ipc_stream(&input_batch);
1859        assert_eq!(input_batch, output_batch);
1860    }
1861
1862    #[test]
1863    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
1864        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
1865        let values = Arc::new(values) as ArrayRef;
1866        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
1867        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());
1868
1869        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
1870        let key_dict_array = DictionaryArray::new(key_dict_keys, values);
1871
1872        let keys_field = Arc::new(Field::new_dict(
1873            "keys",
1874            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
1875            true, // It is technically not legal for this field to be null.
1876            1,
1877            false,
1878        ));
1879        let values_field = Arc::new(Field::new_dict(
1880            "values",
1881            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
1882            true,
1883            2,
1884            false,
1885        ));
1886        let entry_struct = StructArray::from(vec![
1887            (keys_field, make_array(key_dict_array.into_data())),
1888            (values_field, make_array(value_dict_array.into_data())),
1889        ]);
1890        let map_data_type = DataType::Map(
1891            Arc::new(Field::new(
1892                "entries",
1893                entry_struct.data_type().clone(),
1894                false,
1895            )),
1896            false,
1897        );
1898
1899        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
1900        let map_data = ArrayData::builder(map_data_type)
1901            .len(3)
1902            .add_buffer(entry_offsets)
1903            .add_child_data(entry_struct.into_data())
1904            .build()
1905            .unwrap();
1906        let map_array = MapArray::from(map_data);
1907
1908        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
1909        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
1910
1911        let schema = Arc::new(Schema::new(vec![Field::new(
1912            "f1",
1913            dict_dict_array.data_type().clone(),
1914            false,
1915        )]));
1916        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
1917        let output_batch = roundtrip_ipc_stream(&input_batch);
1918        assert_eq!(input_batch, output_batch);
1919    }
1920
1921    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
1922        OffsetSize: OffsetSizeTrait,
1923        U: ArrowNativeType,
1924    >(
1925        list_data_type: DataType,
1926        offsets: &[U; 5],
1927    ) {
1928        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
1929        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
1930        let dict_array = DictionaryArray::new(keys, Arc::new(values));
1931        let dict_data = dict_array.to_data();
1932
1933        let value_offsets = Buffer::from_slice_ref(offsets);
1934
1935        let list_data = ArrayData::builder(list_data_type)
1936            .len(4)
1937            .add_buffer(value_offsets)
1938            .add_child_data(dict_data)
1939            .build()
1940            .unwrap();
1941        let list_array = GenericListArray::<OffsetSize>::from(list_data);
1942
1943        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
1944        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
1945
1946        let schema = Arc::new(Schema::new(vec![Field::new(
1947            "f1",
1948            dict_dict_array.data_type().clone(),
1949            false,
1950        )]));
1951        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
1952        let output_batch = roundtrip_ipc_stream(&input_batch);
1953        assert_eq!(input_batch, output_batch);
1954    }
1955
1956    #[test]
1957    fn test_roundtrip_stream_dict_of_list_of_dict() {
1958        // list
1959        let list_data_type = DataType::List(Arc::new(Field::new_dict(
1960            "item",
1961            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
1962            true,
1963            1,
1964            false,
1965        )));
1966        let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
1967        test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);
1968
1969        // large list
1970        let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
1971            "item",
1972            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
1973            true,
1974            1,
1975            false,
1976        )));
1977        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
1978        test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
1979    }
1980
1981    #[test]
1982    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
1983        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
1984        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
1985        let dict_array = DictionaryArray::new(keys, Arc::new(values));
1986        let dict_data = dict_array.into_data();
1987
1988        let list_data_type = DataType::FixedSizeList(
1989            Arc::new(Field::new_dict(
1990                "item",
1991                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
1992                true,
1993                1,
1994                false,
1995            )),
1996            3,
1997        );
1998        let list_data = ArrayData::builder(list_data_type)
1999            .len(3)
2000            .add_child_data(dict_data)
2001            .build()
2002            .unwrap();
2003        let list_array = FixedSizeListArray::from(list_data);
2004
2005        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2006        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2007
2008        let schema = Arc::new(Schema::new(vec![Field::new(
2009            "f1",
2010            dict_dict_array.data_type().clone(),
2011            false,
2012        )]));
2013        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2014        let output_batch = roundtrip_ipc_stream(&input_batch);
2015        assert_eq!(input_batch, output_batch);
2016    }
2017
2018    const LONG_TEST_STRING: &str =
2019        "This is a long string to make sure binary view array handles it";
2020
2021    #[test]
2022    fn test_roundtrip_view_types() {
2023        let schema = Schema::new(vec![
2024            Field::new("field_1", DataType::BinaryView, true),
2025            Field::new("field_2", DataType::Utf8, true),
2026            Field::new("field_3", DataType::Utf8View, true),
2027        ]);
2028        let bin_values: Vec<Option<&[u8]>> = vec![
2029            Some(b"foo"),
2030            None,
2031            Some(b"bar"),
2032            Some(LONG_TEST_STRING.as_bytes()),
2033        ];
2034        let utf8_values: Vec<Option<&str>> =
2035            vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
2036        let bin_view_array = BinaryViewArray::from_iter(bin_values);
2037        let utf8_array = StringArray::from_iter(utf8_values.iter());
2038        let utf8_view_array = StringViewArray::from_iter(utf8_values);
2039        let record_batch = RecordBatch::try_new(
2040            Arc::new(schema.clone()),
2041            vec![
2042                Arc::new(bin_view_array),
2043                Arc::new(utf8_array),
2044                Arc::new(utf8_view_array),
2045            ],
2046        )
2047        .unwrap();
2048
2049        assert_eq!(record_batch, roundtrip_ipc(&record_batch));
2050        assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
2051
2052        let sliced_batch = record_batch.slice(1, 2);
2053        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2054        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2055    }
2056
2057    #[test]
2058    fn test_roundtrip_view_types_nested_dict() {
2059        let bin_values: Vec<Option<&[u8]>> = vec![
2060            Some(b"foo"),
2061            None,
2062            Some(b"bar"),
2063            Some(LONG_TEST_STRING.as_bytes()),
2064            Some(b"field"),
2065        ];
2066        let utf8_values: Vec<Option<&str>> = vec![
2067            Some("foo"),
2068            None,
2069            Some("bar"),
2070            Some(LONG_TEST_STRING),
2071            Some("field"),
2072        ];
2073        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
2074        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));
2075
2076        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2077        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
2078        let keys_field = Arc::new(Field::new_dict(
2079            "keys",
2080            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
2081            true,
2082            1,
2083            false,
2084        ));
2085
2086        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
2087        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
2088        let values_field = Arc::new(Field::new_dict(
2089            "values",
2090            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
2091            true,
2092            2,
2093            false,
2094        ));
2095        let entry_struct = StructArray::from(vec![
2096            (keys_field, make_array(key_dict_array.into_data())),
2097            (values_field, make_array(value_dict_array.into_data())),
2098        ]);
2099
2100        let map_data_type = DataType::Map(
2101            Arc::new(Field::new(
2102                "entries",
2103                entry_struct.data_type().clone(),
2104                false,
2105            )),
2106            false,
2107        );
2108        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
2109        let map_data = ArrayData::builder(map_data_type)
2110            .len(3)
2111            .add_buffer(entry_offsets)
2112            .add_child_data(entry_struct.into_data())
2113            .build()
2114            .unwrap();
2115        let map_array = MapArray::from(map_data);
2116
2117        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2118        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2119        let schema = Arc::new(Schema::new(vec![Field::new(
2120            "f1",
2121            dict_dict_array.data_type().clone(),
2122            false,
2123        )]));
2124        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2125        assert_eq!(batch, roundtrip_ipc(&batch));
2126        assert_eq!(batch, roundtrip_ipc_stream(&batch));
2127
2128        let sliced_batch = batch.slice(1, 2);
2129        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2130        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2131    }
2132
2133    #[test]
2134    fn test_no_columns_batch() {
2135        let schema = Arc::new(Schema::empty());
2136        let options = RecordBatchOptions::new()
2137            .with_match_field_names(true)
2138            .with_row_count(Some(10));
2139        let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2140        let output_batch = roundtrip_ipc_stream(&input_batch);
2141        assert_eq!(input_batch, output_batch);
2142    }
2143
2144    #[test]
2145    fn test_unaligned() {
2146        let batch = RecordBatch::try_from_iter(vec![(
2147            "i32",
2148            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2149        )])
2150        .unwrap();
2151
2152        let gen = IpcDataGenerator {};
2153        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2154        let (_, encoded) = gen
2155            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
2156            .unwrap();
2157
2158        let message = root_as_message(&encoded.ipc_message).unwrap();
2159
2160        // Construct an unaligned buffer
2161        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2162        buffer.push(0_u8);
2163        buffer.extend_from_slice(&encoded.arrow_data);
2164        let b = Buffer::from(buffer).slice(1);
2165        assert_ne!(b.as_ptr().align_offset(8), 0);
2166
2167        let ipc_batch = message.header_as_record_batch().unwrap();
2168        let roundtrip = read_record_batch_impl(
2169            &b,
2170            ipc_batch,
2171            batch.schema(),
2172            &Default::default(),
2173            None,
2174            &message.version(),
2175            false,
2176        )
2177        .unwrap();
2178        assert_eq!(batch, roundtrip);
2179    }
2180
2181    #[test]
2182    fn test_unaligned_throws_error_with_require_alignment() {
2183        let batch = RecordBatch::try_from_iter(vec![(
2184            "i32",
2185            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2186        )])
2187        .unwrap();
2188
2189        let gen = IpcDataGenerator {};
2190        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2191        let (_, encoded) = gen
2192            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
2193            .unwrap();
2194
2195        let message = root_as_message(&encoded.ipc_message).unwrap();
2196
2197        // Construct an unaligned buffer
2198        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2199        buffer.push(0_u8);
2200        buffer.extend_from_slice(&encoded.arrow_data);
2201        let b = Buffer::from(buffer).slice(1);
2202        assert_ne!(b.as_ptr().align_offset(8), 0);
2203
2204        let ipc_batch = message.header_as_record_batch().unwrap();
2205        let result = read_record_batch_impl(
2206            &b,
2207            ipc_batch,
2208            batch.schema(),
2209            &Default::default(),
2210            None,
2211            &message.version(),
2212            true,
2213        );
2214
2215        let error = result.unwrap_err();
2216        assert_eq!(
2217            error.to_string(),
2218            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
2219             offset from expected alignment of 4 by 1"
2220        );
2221    }
2222
2223    #[test]
2224    fn test_file_with_massive_column_count() {
2225        // 499_999 is upper limit for default settings (1_000_000)
2226        let limit = 600_000;
2227
2228        let fields = (0..limit)
2229            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
2230            .collect::<Vec<_>>();
2231        let schema = Arc::new(Schema::new(fields));
2232        let batch = RecordBatch::new_empty(schema);
2233
2234        let mut buf = Vec::new();
2235        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
2236        writer.write(&batch).unwrap();
2237        writer.finish().unwrap();
2238        drop(writer);
2239
2240        let mut reader = FileReaderBuilder::new()
2241            .with_max_footer_fb_tables(1_500_000)
2242            .build(std::io::Cursor::new(buf))
2243            .unwrap();
2244        let roundtrip_batch = reader.next().unwrap().unwrap();
2245
2246        assert_eq!(batch, roundtrip_batch);
2247    }
2248
2249    #[test]
2250    fn test_file_with_deeply_nested_columns() {
2251        // 60 is upper limit for default settings (64)
2252        let limit = 61;
2253
2254        let fields = (0..limit).fold(
2255            vec![Field::new("leaf", DataType::Boolean, false)],
2256            |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
2257        );
2258        let schema = Arc::new(Schema::new(fields));
2259        let batch = RecordBatch::new_empty(schema);
2260
2261        let mut buf = Vec::new();
2262        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
2263        writer.write(&batch).unwrap();
2264        writer.finish().unwrap();
2265        drop(writer);
2266
2267        let mut reader = FileReaderBuilder::new()
2268            .with_max_footer_fb_depth(65)
2269            .build(std::io::Cursor::new(buf))
2270            .unwrap();
2271        let roundtrip_batch = reader.next().unwrap().unwrap();
2272
2273        assert_eq!(batch, roundtrip_batch);
2274    }
2275
2276    #[test]
2277    fn test_invalid_struct_array_ipc_read_errors() {
2278        let a_field = Field::new("a", DataType::Int32, false);
2279        let b_field = Field::new("b", DataType::Int32, false);
2280
2281        let schema = Arc::new(Schema::new(vec![Field::new_struct(
2282            "s",
2283            vec![a_field.clone(), b_field.clone()],
2284            false,
2285        )]));
2286
2287        let a_array_data = ArrayData::builder(a_field.data_type().clone())
2288            .len(4)
2289            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
2290            .build()
2291            .unwrap();
2292        let b_array_data = ArrayData::builder(b_field.data_type().clone())
2293            .len(3)
2294            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
2295            .build()
2296            .unwrap();
2297
2298        let struct_data_type = schema.field(0).data_type();
2299
2300        let invalid_struct_arr = unsafe {
2301            make_array(
2302                ArrayData::builder(struct_data_type.clone())
2303                    .len(4)
2304                    .add_child_data(a_array_data)
2305                    .add_child_data(b_array_data)
2306                    .build_unchecked(),
2307            )
2308        };
2309
2310        let batch = RecordBatch::try_new(schema.clone(), vec![invalid_struct_arr]).unwrap();
2311
2312        let mut buf = Vec::new();
2313        let mut writer = crate::writer::FileWriter::try_new(&mut buf, schema.as_ref()).unwrap();
2314        writer.write(&batch).unwrap();
2315        writer.finish().unwrap();
2316
2317        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2318        let err = reader.next().unwrap().unwrap_err();
2319        assert!(matches!(err, ArrowError::InvalidArgumentError(_)));
2320    }
2321
2322    #[test]
2323    fn test_same_dict_id_without_preserve() {
2324        let batch = RecordBatch::try_new(
2325            Arc::new(Schema::new(
2326                ["a", "b"]
2327                    .iter()
2328                    .map(|name| {
2329                        Field::new_dict(
2330                            name.to_string(),
2331                            DataType::Dictionary(
2332                                Box::new(DataType::Int32),
2333                                Box::new(DataType::Utf8),
2334                            ),
2335                            true,
2336                            0,
2337                            false,
2338                        )
2339                    })
2340                    .collect::<Vec<Field>>(),
2341            )),
2342            vec![
2343                Arc::new(
2344                    vec![Some("c"), Some("d")]
2345                        .into_iter()
2346                        .collect::<DictionaryArray<Int32Type>>(),
2347                ) as ArrayRef,
2348                Arc::new(
2349                    vec![Some("e"), Some("f")]
2350                        .into_iter()
2351                        .collect::<DictionaryArray<Int32Type>>(),
2352                ) as ArrayRef,
2353            ],
2354        )
2355        .expect("Failed to create RecordBatch");
2356
2357        // serialize the record batch as an IPC stream
2358        let mut buf = vec![];
2359        {
2360            let mut writer = crate::writer::StreamWriter::try_new_with_options(
2361                &mut buf,
2362                batch.schema().as_ref(),
2363                crate::writer::IpcWriteOptions::default().with_preserve_dict_id(false),
2364            )
2365            .expect("Failed to create StreamWriter");
2366            writer.write(&batch).expect("Failed to write RecordBatch");
2367            writer.finish().expect("Failed to finish StreamWriter");
2368        }
2369
2370        StreamReader::try_new(std::io::Cursor::new(buf), None)
2371            .expect("Failed to create StreamReader")
2372            .for_each(|decoded_batch| {
2373                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
2374            });
2375    }
2376}