arrow_ipc/writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Writers
19//!
20//! The `FileWriter` and `StreamWriter` have similar interfaces,
21//! however the `FileWriter` writes the file format, which is designed for readers that support `Seek`ing
22
23use std::cmp::min;
24use std::collections::HashMap;
25use std::io::{BufWriter, Write};
26use std::sync::Arc;
27
28use flatbuffers::FlatBufferBuilder;
29
30use arrow_array::builder::BufferBuilder;
31use arrow_array::cast::*;
32use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
33use arrow_array::*;
34use arrow_buffer::bit_util;
35use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
36use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
37use arrow_schema::*;
38
39use crate::compression::CompressionCodec;
40use crate::convert::IpcSchemaEncoder;
41use crate::CONTINUATION_MARKER;
42
43/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
44#[derive(Debug, Clone)]
45pub struct IpcWriteOptions {
46    /// Write padding after memory buffers to this multiple of bytes.
47    /// Must be 8, 16, 32, or 64 - defaults to 64.
48    alignment: u8,
49    /// The legacy format is for releases before 0.15.0, and uses metadata V4
50    write_legacy_ipc_format: bool,
51    /// The metadata version to write. The Rust IPC writer supports V4+
52    ///
53    /// *Default versions per crate*
54    ///
55    /// When creating the default IpcWriteOptions, the following metadata versions are used:
56    ///
57    /// version 2.0.0: V4, with legacy format enabled
58    /// version 4.0.0: V5
59    metadata_version: crate::MetadataVersion,
60    /// Compression, if desired. Will result in a runtime error
61    /// if the corresponding feature is not enabled
62    batch_compression_type: Option<crate::CompressionType>,
63    /// Flag indicating whether the writer should preserve the dictionary IDs defined in the
64    /// schema or generate unique dictionary IDs internally during encoding.
65    ///
66    /// Defaults to `true`
67    preserve_dict_id: bool,
68}
69
70impl IpcWriteOptions {
71    /// Configures compression when writing IPC files.
72    ///
73    /// Will result in a runtime error if the corresponding feature
74    /// is not enabled
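    ///
    /// # Example
    ///
    /// A minimal sketch enabling LZ4 frame compression; actually writing
    /// compressed batches additionally requires the corresponding `lz4` feature.
    ///
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::CompressionType;
    /// let options = IpcWriteOptions::default()
    ///     .try_with_compression(Some(CompressionType::LZ4_FRAME))
    ///     .unwrap();
    /// ```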
75    pub fn try_with_compression(
76        mut self,
77        batch_compression_type: Option<crate::CompressionType>,
78    ) -> Result<Self, ArrowError> {
79        self.batch_compression_type = batch_compression_type;
80
81        if self.batch_compression_type.is_some()
82            && self.metadata_version < crate::MetadataVersion::V5
83        {
84            return Err(ArrowError::InvalidArgumentError(
85                "Compression only supported in metadata v5 and above".to_string(),
86            ));
87        }
88        Ok(self)
89    }
90    /// Try to create IpcWriteOptions, checking for incompatible settings
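    ///
    /// # Example
    ///
    /// A small sketch requesting 8-byte alignment, the modern (non-legacy) format,
    /// and metadata version V5:
    ///
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::MetadataVersion;
    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
    /// ```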
91    pub fn try_new(
92        alignment: usize,
93        write_legacy_ipc_format: bool,
94        metadata_version: crate::MetadataVersion,
95    ) -> Result<Self, ArrowError> {
96        let is_alignment_valid =
97            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
98        if !is_alignment_valid {
99            return Err(ArrowError::InvalidArgumentError(
100                "Alignment should be 8, 16, 32, or 64.".to_string(),
101            ));
102        }
103        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
104        match metadata_version {
105            crate::MetadataVersion::V1
106            | crate::MetadataVersion::V2
107            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
108                "Writing IPC metadata version 3 and lower not supported".to_string(),
109            )),
110            crate::MetadataVersion::V4 => Ok(Self {
111                alignment,
112                write_legacy_ipc_format,
113                metadata_version,
114                batch_compression_type: None,
115                preserve_dict_id: true,
116            }),
117            crate::MetadataVersion::V5 => {
118                if write_legacy_ipc_format {
119                    Err(ArrowError::InvalidArgumentError(
120                        "Legacy IPC format only supported on metadata version 4".to_string(),
121                    ))
122                } else {
123                    Ok(Self {
124                        alignment,
125                        write_legacy_ipc_format,
126                        metadata_version,
127                        batch_compression_type: None,
128                        preserve_dict_id: true,
129                    })
130                }
131            }
132            z => Err(ArrowError::InvalidArgumentError(format!(
133                "Unsupported crate::MetadataVersion {z:?}"
134            ))),
135        }
136    }
137
138    /// Return whether the writer is configured to preserve the dictionary IDs
139    /// defined in the schema
140    pub fn preserve_dict_id(&self) -> bool {
141        self.preserve_dict_id
142    }
143
144    /// Set whether the IPC writer should preserve the dictionary IDs in the schema
145    /// or auto-assign unique dictionary IDs during encoding (defaults to true)
146    ///
147    /// If this option is true, the application must handle assigning IDs
148    /// to the dictionary batches in order to encode them correctly
149    ///
150    /// The default will change to `false` in future releases
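    ///
    /// # Example
    ///
    /// A minimal sketch asking the writer to auto-assign dictionary IDs instead of
    /// preserving those defined in the schema:
    ///
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// let options = IpcWriteOptions::default().with_preserve_dict_id(false);
    /// assert!(!options.preserve_dict_id());
    /// ```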
151    pub fn with_preserve_dict_id(mut self, preserve_dict_id: bool) -> Self {
152        self.preserve_dict_id = preserve_dict_id;
153        self
154    }
155}
156
157impl Default for IpcWriteOptions {
158    fn default() -> Self {
159        Self {
160            alignment: 64,
161            write_legacy_ipc_format: false,
162            metadata_version: crate::MetadataVersion::V5,
163            batch_compression_type: None,
164            preserve_dict_id: true,
165        }
166    }
167}
168
169#[derive(Debug, Default)]
170/// Handles low level details of encoding [`Array`] and [`Schema`] into the
171/// [Arrow IPC Format].
172///
173/// # Example:
174/// ```
175/// # fn run() {
176/// # use std::sync::Arc;
177/// # use arrow_array::UInt64Array;
178/// # use arrow_array::RecordBatch;
179/// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
180///
181/// // Create a record batch
182/// let batch = RecordBatch::try_from_iter(vec![
183///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
184/// ]).unwrap();
185///
186/// // Error if dictionary ids are replaced.
187/// let error_on_replacement = true;
188/// let options = IpcWriteOptions::default();
189/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
190///
191/// // encode the batch into zero or more encoded dictionaries
192/// // and the data for the actual array.
193/// let data_gen = IpcDataGenerator::default();
194/// let (encoded_dictionaries, encoded_message) = data_gen
195///   .encoded_batch(&batch, &mut dictionary_tracker, &options)
196///   .unwrap();
197/// # }
198/// ```
199///
200/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
201pub struct IpcDataGenerator {}
202
203impl IpcDataGenerator {
204    /// Converts a schema to an IPC message along with `dictionary_tracker`
205    /// and returns it encoded inside [EncodedData] as a flatbuffer
206    ///
207    /// Prefer this method over [IpcDataGenerator::schema_to_bytes], which has been
208    /// deprecated since Arrow v54.0.0
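    ///
    /// # Example
    ///
    /// A minimal sketch encoding the schema message for a one-column schema:
    ///
    /// ```
    /// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let mut dictionary_tracker = DictionaryTracker::new(false);
    /// let encoded = IpcDataGenerator::default().schema_to_bytes_with_dictionary_tracker(
    ///     &schema,
    ///     &mut dictionary_tracker,
    ///     &IpcWriteOptions::default(),
    /// );
    /// assert!(!encoded.ipc_message.is_empty());
    /// ```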
209    pub fn schema_to_bytes_with_dictionary_tracker(
210        &self,
211        schema: &Schema,
212        dictionary_tracker: &mut DictionaryTracker,
213        write_options: &IpcWriteOptions,
214    ) -> EncodedData {
215        let mut fbb = FlatBufferBuilder::new();
216        let schema = {
217            let fb = IpcSchemaEncoder::new()
218                .with_dictionary_tracker(dictionary_tracker)
219                .schema_to_fb_offset(&mut fbb, schema);
220            fb.as_union_value()
221        };
222
223        let mut message = crate::MessageBuilder::new(&mut fbb);
224        message.add_version(write_options.metadata_version);
225        message.add_header_type(crate::MessageHeader::Schema);
226        message.add_bodyLength(0);
227        message.add_header(schema);
228        // TODO: custom metadata
229        let data = message.finish();
230        fbb.finish(data, None);
231
232        let data = fbb.finished_data();
233        EncodedData {
234            ipc_message: data.to_vec(),
235            arrow_data: vec![],
236        }
237    }
238
239    #[deprecated(
240        since = "54.0.0",
241        note = "Use `schema_to_bytes_with_dictionary_tracker` instead. This function will adopt the signature of `schema_to_bytes_with_dictionary_tracker` in the next release."
242    )]
243    /// Converts a schema to an IPC message and returns it encoded inside [EncodedData] as a flatbuffer
244    pub fn schema_to_bytes(&self, schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData {
245        let mut fbb = FlatBufferBuilder::new();
246        let schema = {
247            #[allow(deprecated)]
248            // This will be replaced with the IpcSchemaConverter in the next release.
249            let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema);
250            fb.as_union_value()
251        };
252
253        let mut message = crate::MessageBuilder::new(&mut fbb);
254        message.add_version(write_options.metadata_version);
255        message.add_header_type(crate::MessageHeader::Schema);
256        message.add_bodyLength(0);
257        message.add_header(schema);
258        // TODO: custom metadata
259        let data = message.finish();
260        fbb.finish(data, None);
261
262        let data = fbb.finished_data();
263        EncodedData {
264            ipc_message: data.to_vec(),
265            arrow_data: vec![],
266        }
267    }
268
269    fn _encode_dictionaries<I: Iterator<Item = i64>>(
270        &self,
271        column: &ArrayRef,
272        encoded_dictionaries: &mut Vec<EncodedData>,
273        dictionary_tracker: &mut DictionaryTracker,
274        write_options: &IpcWriteOptions,
275        dict_id: &mut I,
276    ) -> Result<(), ArrowError> {
277        match column.data_type() {
278            DataType::Struct(fields) => {
279                let s = as_struct_array(column);
280                for (field, column) in fields.iter().zip(s.columns()) {
281                    self.encode_dictionaries(
282                        field,
283                        column,
284                        encoded_dictionaries,
285                        dictionary_tracker,
286                        write_options,
287                        dict_id,
288                    )?;
289                }
290            }
291            DataType::RunEndEncoded(_, values) => {
292                let data = column.to_data();
293                if data.child_data().len() != 2 {
294                    return Err(ArrowError::InvalidArgumentError(format!(
295                        "The run encoded array should have exactly two child arrays. Found {}",
296                        data.child_data().len()
297                    )));
298                }
299                // The run_ends array is not expected to be dictionary encoded, hence dictionaries
300                // are encoded only for the values array.
301                let values_array = make_array(data.child_data()[1].clone());
302                self.encode_dictionaries(
303                    values,
304                    &values_array,
305                    encoded_dictionaries,
306                    dictionary_tracker,
307                    write_options,
308                    dict_id,
309                )?;
310            }
311            DataType::List(field) => {
312                let list = as_list_array(column);
313                self.encode_dictionaries(
314                    field,
315                    list.values(),
316                    encoded_dictionaries,
317                    dictionary_tracker,
318                    write_options,
319                    dict_id,
320                )?;
321            }
322            DataType::LargeList(field) => {
323                let list = as_large_list_array(column);
324                self.encode_dictionaries(
325                    field,
326                    list.values(),
327                    encoded_dictionaries,
328                    dictionary_tracker,
329                    write_options,
330                    dict_id,
331                )?;
332            }
333            DataType::FixedSizeList(field, _) => {
334                let list = column
335                    .as_any()
336                    .downcast_ref::<FixedSizeListArray>()
337                    .expect("Unable to downcast to fixed size list array");
338                self.encode_dictionaries(
339                    field,
340                    list.values(),
341                    encoded_dictionaries,
342                    dictionary_tracker,
343                    write_options,
344                    dict_id,
345                )?;
346            }
347            DataType::Map(field, _) => {
348                let map_array = as_map_array(column);
349
350                let (keys, values) = match field.data_type() {
351                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
352                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
353                };
354
355                // keys
356                self.encode_dictionaries(
357                    keys,
358                    map_array.keys(),
359                    encoded_dictionaries,
360                    dictionary_tracker,
361                    write_options,
362                    dict_id,
363                )?;
364
365                // values
366                self.encode_dictionaries(
367                    values,
368                    map_array.values(),
369                    encoded_dictionaries,
370                    dictionary_tracker,
371                    write_options,
372                    dict_id,
373                )?;
374            }
375            DataType::Union(fields, _) => {
376                let union = as_union_array(column);
377                for (type_id, field) in fields.iter() {
378                    let column = union.child(type_id);
379                    self.encode_dictionaries(
380                        field,
381                        column,
382                        encoded_dictionaries,
383                        dictionary_tracker,
384                        write_options,
385                        dict_id,
386                    )?;
387                }
388            }
389            _ => (),
390        }
391
392        Ok(())
393    }
394
395    fn encode_dictionaries<I: Iterator<Item = i64>>(
396        &self,
397        field: &Field,
398        column: &ArrayRef,
399        encoded_dictionaries: &mut Vec<EncodedData>,
400        dictionary_tracker: &mut DictionaryTracker,
401        write_options: &IpcWriteOptions,
402        dict_id_seq: &mut I,
403    ) -> Result<(), ArrowError> {
404        match column.data_type() {
405            DataType::Dictionary(_key_type, _value_type) => {
406                let dict_data = column.to_data();
407                let dict_values = &dict_data.child_data()[0];
408
409                let values = make_array(dict_data.child_data()[0].clone());
410
411                self._encode_dictionaries(
412                    &values,
413                    encoded_dictionaries,
414                    dictionary_tracker,
415                    write_options,
416                    dict_id_seq,
417                )?;
418
419                // It's important to only take the dict_id at this point, because the dict ID
420                // sequence is assigned depth-first, so we need to first encode children and have
421                // them take their assigned dict IDs before we take the dict ID for this field.
422                let dict_id = dict_id_seq
423                    .next()
424                    .or_else(|| field.dict_id())
425                    .ok_or_else(|| {
426                        ArrowError::IpcError(format!("no dict id for field {}", field.name()))
427                    })?;
428
429                let emit = dictionary_tracker.insert(dict_id, column)?;
430
431                if emit {
432                    encoded_dictionaries.push(self.dictionary_batch_to_bytes(
433                        dict_id,
434                        dict_values,
435                        write_options,
436                    )?);
437                }
438            }
439            _ => self._encode_dictionaries(
440                column,
441                encoded_dictionaries,
442                dictionary_tracker,
443                write_options,
444                dict_id_seq,
445            )?,
446        }
447
448        Ok(())
449    }
450
451    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
452    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once)
453    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
454    pub fn encoded_batch(
455        &self,
456        batch: &RecordBatch,
457        dictionary_tracker: &mut DictionaryTracker,
458        write_options: &IpcWriteOptions,
459    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
460        let schema = batch.schema();
461        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
462
463        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
464
465        for (i, field) in schema.fields().iter().enumerate() {
466            let column = batch.column(i);
467            self.encode_dictionaries(
468                field,
469                column,
470                &mut encoded_dictionaries,
471                dictionary_tracker,
472                write_options,
473                &mut dict_id,
474            )?;
475        }
476
477        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
478        Ok((encoded_dictionaries, encoded_message))
479    }
480
481    /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
482    /// other for the batch's data
483    fn record_batch_to_bytes(
484        &self,
485        batch: &RecordBatch,
486        write_options: &IpcWriteOptions,
487    ) -> Result<EncodedData, ArrowError> {
488        let mut fbb = FlatBufferBuilder::new();
489
490        let mut nodes: Vec<crate::FieldNode> = vec![];
491        let mut buffers: Vec<crate::Buffer> = vec![];
492        let mut arrow_data: Vec<u8> = vec![];
493        let mut offset = 0;
494
495        // get the type of compression
496        let batch_compression_type = write_options.batch_compression_type;
497
498        let compression = batch_compression_type.map(|batch_compression_type| {
499            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
500            c.add_method(crate::BodyCompressionMethod::BUFFER);
501            c.add_codec(batch_compression_type);
502            c.finish()
503        });
504
505        let compression_codec: Option<CompressionCodec> =
506            batch_compression_type.map(TryInto::try_into).transpose()?;
507
508        let mut variadic_buffer_counts = vec![];
509
510        for array in batch.columns() {
511            let array_data = array.to_data();
512            offset = write_array_data(
513                &array_data,
514                &mut buffers,
515                &mut arrow_data,
516                &mut nodes,
517                offset,
518                array.len(),
519                array.null_count(),
520                compression_codec,
521                write_options,
522            )?;
523
524            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
525        }
526        // pad the tail of body data
527        let len = arrow_data.len();
528        let pad_len = pad_to_alignment(write_options.alignment, len);
529        arrow_data.extend_from_slice(&PADDING[..pad_len]);
530
531        // write data
532        let buffers = fbb.create_vector(&buffers);
533        let nodes = fbb.create_vector(&nodes);
534        let variadic_buffer = if variadic_buffer_counts.is_empty() {
535            None
536        } else {
537            Some(fbb.create_vector(&variadic_buffer_counts))
538        };
539
540        let root = {
541            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
542            batch_builder.add_length(batch.num_rows() as i64);
543            batch_builder.add_nodes(nodes);
544            batch_builder.add_buffers(buffers);
545            if let Some(c) = compression {
546                batch_builder.add_compression(c);
547            }
548
549            if let Some(v) = variadic_buffer {
550                batch_builder.add_variadicBufferCounts(v);
551            }
552            let b = batch_builder.finish();
553            b.as_union_value()
554        };
555        // create a crate::Message
556        let mut message = crate::MessageBuilder::new(&mut fbb);
557        message.add_version(write_options.metadata_version);
558        message.add_header_type(crate::MessageHeader::RecordBatch);
559        message.add_bodyLength(arrow_data.len() as i64);
560        message.add_header(root);
561        let root = message.finish();
562        fbb.finish(root, None);
563        let finished_data = fbb.finished_data();
564
565        Ok(EncodedData {
566            ipc_message: finished_data.to_vec(),
567            arrow_data,
568        })
569    }
570
571    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
572    /// other for the data
573    fn dictionary_batch_to_bytes(
574        &self,
575        dict_id: i64,
576        array_data: &ArrayData,
577        write_options: &IpcWriteOptions,
578    ) -> Result<EncodedData, ArrowError> {
579        let mut fbb = FlatBufferBuilder::new();
580
581        let mut nodes: Vec<crate::FieldNode> = vec![];
582        let mut buffers: Vec<crate::Buffer> = vec![];
583        let mut arrow_data: Vec<u8> = vec![];
584
585        // get the type of compression
586        let batch_compression_type = write_options.batch_compression_type;
587
588        let compression = batch_compression_type.map(|batch_compression_type| {
589            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
590            c.add_method(crate::BodyCompressionMethod::BUFFER);
591            c.add_codec(batch_compression_type);
592            c.finish()
593        });
594
595        let compression_codec: Option<CompressionCodec> = batch_compression_type
596            .map(|batch_compression_type| batch_compression_type.try_into())
597            .transpose()?;
598
599        write_array_data(
600            array_data,
601            &mut buffers,
602            &mut arrow_data,
603            &mut nodes,
604            0,
605            array_data.len(),
606            array_data.null_count(),
607            compression_codec,
608            write_options,
609        )?;
610
611        let mut variadic_buffer_counts = vec![];
612        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
613
614        // pad the tail of body data
615        let len = arrow_data.len();
616        let pad_len = pad_to_alignment(write_options.alignment, len);
617        arrow_data.extend_from_slice(&PADDING[..pad_len]);
618
619        // write data
620        let buffers = fbb.create_vector(&buffers);
621        let nodes = fbb.create_vector(&nodes);
622        let variadic_buffer = if variadic_buffer_counts.is_empty() {
623            None
624        } else {
625            Some(fbb.create_vector(&variadic_buffer_counts))
626        };
627
628        let root = {
629            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
630            batch_builder.add_length(array_data.len() as i64);
631            batch_builder.add_nodes(nodes);
632            batch_builder.add_buffers(buffers);
633            if let Some(c) = compression {
634                batch_builder.add_compression(c);
635            }
636            if let Some(v) = variadic_buffer {
637                batch_builder.add_variadicBufferCounts(v);
638            }
639            batch_builder.finish()
640        };
641
642        let root = {
643            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
644            batch_builder.add_id(dict_id);
645            batch_builder.add_data(root);
646            batch_builder.finish().as_union_value()
647        };
648
649        let root = {
650            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
651            message_builder.add_version(write_options.metadata_version);
652            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
653            message_builder.add_bodyLength(arrow_data.len() as i64);
654            message_builder.add_header(root);
655            message_builder.finish()
656        };
657
658        fbb.finish(root, None);
659        let finished_data = fbb.finished_data();
660
661        Ok(EncodedData {
662            ipc_message: finished_data.to_vec(),
663            arrow_data,
664        })
665    }
666}
667
668fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
669    match array.data_type() {
670        DataType::BinaryView | DataType::Utf8View => {
671            // The spec documents that the counts only include the variadic buffers, not the view/null buffers.
672            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
673            counts.push(array.buffers().len() as i64 - 1);
674        }
675        DataType::Dictionary(_, _) => {
676            // Do nothing
677            // Dictionary types are handled in `encode_dictionaries`.
678        }
679        _ => {
680            for child in array.child_data() {
681                append_variadic_buffer_counts(counts, child)
682            }
683        }
684    }
685}
686
687pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
688    match arr.data_type() {
689        DataType::RunEndEncoded(k, _) => match k.data_type() {
690            DataType::Int16 => {
691                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
692            }
693            DataType::Int32 => {
694                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
695            }
696            DataType::Int64 => {
697                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
698            }
699            d => unreachable!("Unexpected data type {d}"),
700        },
701        d => Err(ArrowError::InvalidArgumentError(format!(
702            "The given array is not a run array. Data type of given array: {d}"
703        ))),
704    }
705}
706
707// Returns a `RunArray` with zero offset and length matching the last value
708// in run_ends array.
709fn into_zero_offset_run_array<R: RunEndIndexType>(
710    run_array: RunArray<R>,
711) -> Result<RunArray<R>, ArrowError> {
712    let run_ends = run_array.run_ends();
713    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
714        return Ok(run_array);
715    }
716
717    // The physical index of the original run_ends array from which the `ArrayData` is sliced.
718    let start_physical_index = run_ends.get_start_physical_index();
719
720    // The physical index of the original run_ends array until which the `ArrayData` is sliced.
721    let end_physical_index = run_ends.get_end_physical_index();
722
723    let physical_length = end_physical_index - start_physical_index + 1;
724
725    // build new run_ends array by subtracting offset from run ends.
726    let offset = R::Native::usize_as(run_ends.offset());
727    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
728    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
729        builder.append(run_end_value.sub_wrapping(offset));
730    }
731    builder.append(R::Native::from_usize(run_array.len()).unwrap());
732    let new_run_ends = unsafe {
733        // Safety:
734        // The function builds a valid run_ends array and hence need not be validated.
735        ArrayDataBuilder::new(R::DATA_TYPE)
736            .len(physical_length)
737            .add_buffer(builder.finish())
738            .build_unchecked()
739    };
740
741    // build new values by slicing physical indices.
742    let new_values = run_array
743        .values()
744        .slice(start_physical_index, physical_length)
745        .into_data();
746
747    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
748        .len(run_array.len())
749        .add_child_data(new_run_ends)
750        .add_child_data(new_values);
751    let array_data = unsafe {
752        // Safety:
753        //  This function builds a valid run array and hence can skip validation.
754        builder.build_unchecked()
755    };
756    Ok(array_data.into())
757}
758
759/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
760/// multiple times.
761///
762/// Can optionally error if an update to an existing dictionary is attempted, which
763/// isn't allowed in the `FileWriter`.
764#[derive(Debug)]
765pub struct DictionaryTracker {
766    written: HashMap<i64, ArrayData>,
767    dict_ids: Vec<i64>,
768    error_on_replacement: bool,
769    preserve_dict_id: bool,
770}
771
772impl DictionaryTracker {
773    /// Create a new [`DictionaryTracker`].
774    ///
775    /// If `error_on_replacement`
776    /// is true, an error will be generated if an update to an
777    /// existing dictionary is attempted.
778    ///
779    /// This constructor always preserves the dictionary IDs defined in the schema
780    /// (`preserve_dict_id = true`). Use [`DictionaryTracker::new_with_preserve_dict_id`]
781    /// to instead assign unique dictionary IDs by incrementing the last seen ID
782    /// (or using `0` if no other dictionary IDs have been seen)
783    pub fn new(error_on_replacement: bool) -> Self {
784        Self {
785            written: HashMap::new(),
786            dict_ids: Vec::new(),
787            error_on_replacement,
788            preserve_dict_id: true,
789        }
790    }
791
792    /// Create a new [`DictionaryTracker`].
793    ///
794    /// If `error_on_replacement`
795    /// is true, an error will be generated if an update to an
796    /// existing dictionary is attempted.
797    pub fn new_with_preserve_dict_id(error_on_replacement: bool, preserve_dict_id: bool) -> Self {
798        Self {
799            written: HashMap::new(),
800            dict_ids: Vec::new(),
801            error_on_replacement,
802            preserve_dict_id,
803        }
804    }
805
806    /// Set the dictionary ID for `field`.
807    ///
808    /// If `preserve_dict_id` is true, this will return the `dict_id` in `field` (or panic if `field` does
809    /// not have a `dict_id` defined).
810    ///
811    /// If `preserve_dict_id` is false, this will return the value of the last `dict_id` assigned incremented by 1
812    /// or 0 in the case where no dictionary IDs have yet been assigned
813    pub fn set_dict_id(&mut self, field: &Field) -> i64 {
814        let next = if self.preserve_dict_id {
815            field.dict_id().expect("no dict_id in field")
816        } else {
817            self.dict_ids
818                .last()
819                .copied()
820                .map(|i| i + 1)
821                .unwrap_or_default()
822        };
823
824        self.dict_ids.push(next);
825        next
826    }
827
828    /// Return the sequence of dictionary IDs in the order they should be observed while
829    /// traversing the schema
830    pub fn dict_id(&mut self) -> &[i64] {
831        &self.dict_ids
832    }
833
834    /// Keep track of the dictionary with the given ID and values. Behavior:
835    ///
836    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
837    ///   that the dictionary was not actually inserted (because it's already been seen).
838    /// * If this ID has been written already but with different data, and this tracker is
839    ///   configured to return an error, return an error.
840    /// * If the tracker has not been configured to error on replacement or this dictionary
841    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
842    ///   inserted.
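    ///
    /// # Example
    ///
    /// A small sketch showing that identical dictionary values are only emitted once:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, DictionaryArray};
    /// # use arrow_array::types::Int32Type;
    /// # use arrow_ipc::writer::DictionaryTracker;
    /// let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    /// let column: ArrayRef = Arc::new(dict);
    /// let mut tracker = DictionaryTracker::new(true);
    /// assert!(tracker.insert(0, &column).unwrap()); // first occurrence is emitted
    /// assert!(!tracker.insert(0, &column).unwrap()); // identical values are skipped
    /// ```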
843    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
844        let dict_data = column.to_data();
845        let dict_values = &dict_data.child_data()[0];
846
847        // If a dictionary with this id was already emitted, check if it was the same.
848        if let Some(last) = self.written.get(&dict_id) {
849            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
850                // Same dictionary values => no need to emit it again
851                return Ok(false);
852            }
853            if self.error_on_replacement {
854                // If error on replacement perform a logical comparison
855                if last.child_data()[0] == *dict_values {
856                    // Same dictionary values => no need to emit it again
857                    return Ok(false);
858                }
859                return Err(ArrowError::InvalidArgumentError(
860                    "Dictionary replacement detected when writing IPC file format. \
861                     Arrow IPC files only support a single dictionary for a given field \
862                     across all batches."
863                        .to_string(),
864                ));
865            }
866        }
867
868        self.written.insert(dict_id, dict_data);
869        Ok(true)
870    }
871}
872
873/// Writer for an IPC file
874pub struct FileWriter<W> {
875    /// The object to write to
876    writer: W,
877    /// IPC write options
878    write_options: IpcWriteOptions,
879    /// A reference to the schema, used in validating record batches
880    schema: SchemaRef,
881    /// The current byte offset within the file, used to record the position of each block for random access
882    block_offsets: usize,
883    /// Dictionary blocks that will be written as part of the IPC footer
884    dictionary_blocks: Vec<crate::Block>,
885    /// Record blocks that will be written as part of the IPC footer
886    record_blocks: Vec<crate::Block>,
887    /// Whether the file footer has been written, marking the writer as finished
888    finished: bool,
889    /// Keeps track of dictionaries that have been written
890    dictionary_tracker: DictionaryTracker,
891    /// User level customized metadata
892    custom_metadata: HashMap<String, String>,
893
894    data_gen: IpcDataGenerator,
895}
896
897impl<W: Write> FileWriter<BufWriter<W>> {
898    /// Try to create a new file writer with the writer wrapped in a BufWriter.
899    ///
900    /// See [`FileWriter::try_new`] for an unbuffered version.
901    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
902        Self::try_new(BufWriter::new(writer), schema)
903    }
904}
905
906impl<W: Write> FileWriter<W> {
907    /// Try to create a new writer, with the schema written as part of the header
908    ///
909    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
910    ///
911    /// # Errors
912    ///
913    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
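    ///
    /// # Example
    ///
    /// A minimal sketch writing a single batch to an in-memory buffer:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_ipc::writer::FileWriter;
    /// let batch = RecordBatch::try_from_iter([
    ///     ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
    /// ]).unwrap();
    /// let buffer: Vec<u8> = Vec::new();
    /// let mut writer = FileWriter::try_new(buffer, &batch.schema()).unwrap();
    /// writer.write(&batch).unwrap();
    /// let bytes = writer.into_inner().unwrap();
    /// assert!(!bytes.is_empty());
    /// ```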
914    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
915        let write_options = IpcWriteOptions::default();
916        Self::try_new_with_options(writer, schema, write_options)
917    }
918
919    /// Try to create a new writer with IpcWriteOptions
920    ///
921    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
922    ///
923    /// # Errors
924    ///
925    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
926    pub fn try_new_with_options(
927        mut writer: W,
928        schema: &Schema,
929        write_options: IpcWriteOptions,
930    ) -> Result<Self, ArrowError> {
931        let data_gen = IpcDataGenerator::default();
932        // write magic to header aligned on alignment boundary
933        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
934        let header_size = super::ARROW_MAGIC.len() + pad_len;
935        writer.write_all(&super::ARROW_MAGIC)?;
936        writer.write_all(&PADDING[..pad_len])?;
937        // write the schema, set the written bytes to the schema + header
938        let preserve_dict_id = write_options.preserve_dict_id;
939        let mut dictionary_tracker =
940            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
941        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
942            schema,
943            &mut dictionary_tracker,
944            &write_options,
945        );
946        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
947        Ok(Self {
948            writer,
949            write_options,
950            schema: Arc::new(schema.clone()),
951            block_offsets: meta + data + header_size,
952            dictionary_blocks: vec![],
953            record_blocks: vec![],
954            finished: false,
955            dictionary_tracker,
956            custom_metadata: HashMap::new(),
957            data_gen,
958        })
959    }
960
961    /// Adds a key-value pair to the [FileWriter]'s custom metadata
962    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
963        self.custom_metadata.insert(key.into(), value.into());
964    }
965
966    /// Write a record batch to the file
967    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
968        if self.finished {
969            return Err(ArrowError::IpcError(
970                "Cannot write record batch to file writer as it is closed".to_string(),
971            ));
972        }
973
974        let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch(
975            batch,
976            &mut self.dictionary_tracker,
977            &self.write_options,
978        )?;
979
980        for encoded_dictionary in encoded_dictionaries {
981            let (meta, data) =
982                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
983
984            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
985            self.dictionary_blocks.push(block);
986            self.block_offsets += meta + data;
987        }
988
989        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
990        // add a record block for the footer
991        let block = crate::Block::new(
992            self.block_offsets as i64,
993            meta as i32, // TODO: is this still applicable?
994            data as i64,
995        );
996        self.record_blocks.push(block);
997        self.block_offsets += meta + data;
998        Ok(())
999    }
1000
1001    /// Write footer and closing tag, then mark the writer as done
1002    pub fn finish(&mut self) -> Result<(), ArrowError> {
1003        if self.finished {
1004            return Err(ArrowError::IpcError(
1005                "Cannot write footer to file writer as it is closed".to_string(),
1006            ));
1007        }
1008
1009        // write EOS
1010        write_continuation(&mut self.writer, &self.write_options, 0)?;
1011
1012        let mut fbb = FlatBufferBuilder::new();
1013        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
1014        let record_batches = fbb.create_vector(&self.record_blocks);
1015        let preserve_dict_id = self.write_options.preserve_dict_id;
1016        let mut dictionary_tracker =
1017            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
1018        let schema = IpcSchemaEncoder::new()
1019            .with_dictionary_tracker(&mut dictionary_tracker)
1020            .schema_to_fb_offset(&mut fbb, &self.schema);
1021        let fb_custom_metadata = (!self.custom_metadata.is_empty())
1022            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
1023
1024        let root = {
1025            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
1026            footer_builder.add_version(self.write_options.metadata_version);
1027            footer_builder.add_schema(schema);
1028            footer_builder.add_dictionaries(dictionaries);
1029            footer_builder.add_recordBatches(record_batches);
1030            if let Some(fb_custom_metadata) = fb_custom_metadata {
1031                footer_builder.add_custom_metadata(fb_custom_metadata);
1032            }
1033            footer_builder.finish()
1034        };
1035        fbb.finish(root, None);
1036        let footer_data = fbb.finished_data();
1037        self.writer.write_all(footer_data)?;
1038        self.writer
1039            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
1040        self.writer.write_all(&super::ARROW_MAGIC)?;
1041        self.writer.flush()?;
1042        self.finished = true;
1043
1044        Ok(())
1045    }
1046
1047    /// Returns the arrow [`SchemaRef`] for this arrow file.
1048    pub fn schema(&self) -> &SchemaRef {
1049        &self.schema
1050    }
1051
1052    /// Gets a reference to the underlying writer.
1053    pub fn get_ref(&self) -> &W {
1054        &self.writer
1055    }
1056
1057    /// Gets a mutable reference to the underlying writer.
1058    ///
1059    /// It is inadvisable to directly write to the underlying writer.
1060    pub fn get_mut(&mut self) -> &mut W {
1061        &mut self.writer
1062    }
1063
1064    /// Flush the underlying writer.
1065    ///
1066    /// Both the BufWriter and the underlying writer are flushed.
1067    pub fn flush(&mut self) -> Result<(), ArrowError> {
1068        self.writer.flush()?;
1069        Ok(())
1070    }
1071
1072    /// Unwraps the underlying writer.
1073    ///
1074    /// The writer is flushed and the FileWriter is finished before returning.
1075    ///
1076    /// # Errors
1077    ///
1078    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the FileWriter
1079    /// or while flushing the writer.
1080    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1081        if !self.finished {
1082            // `finish` flushes the writer.
1083            self.finish()?;
1084        }
1085        Ok(self.writer)
1086    }
1087}
1088
1089impl<W: Write> RecordBatchWriter for FileWriter<W> {
1090    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1091        self.write(batch)
1092    }
1093
1094    fn close(mut self) -> Result<(), ArrowError> {
1095        self.finish()
1096    }
1097}
1098
1099/// Writer for an IPC stream
1100pub struct StreamWriter<W> {
1101    /// The object to write to
1102    writer: W,
1103    /// IPC write options
1104    write_options: IpcWriteOptions,
1105    /// Whether the end-of-stream marker has been written, marking the writer as finished
1106    finished: bool,
1107    /// Keeps track of dictionaries that have been written
1108    dictionary_tracker: DictionaryTracker,
1109
1110    data_gen: IpcDataGenerator,
1111}
1112
1113impl<W: Write> StreamWriter<BufWriter<W>> {
1114    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
1115    ///
1116    /// See [`StreamWriter::try_new`] for an unbuffered version.
1117    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1118        Self::try_new(BufWriter::new(writer), schema)
1119    }
1120}
1121
1122impl<W: Write> StreamWriter<W> {
1123    /// Try to create a new writer, with the schema written as part of the header.
1124    ///
1125    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
1126    ///
1127    /// # Errors
1128    ///
1129    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
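    ///
    /// # Example
    ///
    /// A minimal sketch writing a single batch to an in-memory stream:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_ipc::writer::StreamWriter;
    /// let batch = RecordBatch::try_from_iter([
    ///     ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
    /// ]).unwrap();
    /// let buffer: Vec<u8> = Vec::new();
    /// let mut writer = StreamWriter::try_new(buffer, &batch.schema()).unwrap();
    /// writer.write(&batch).unwrap();
    /// writer.finish().unwrap();
    /// ```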
1130    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1131        let write_options = IpcWriteOptions::default();
1132        Self::try_new_with_options(writer, schema, write_options)
1133    }
1134
1135    /// Try to create a new writer with [`IpcWriteOptions`].
1136    ///
1137    /// # Errors
1138    ///
1139    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
1140    pub fn try_new_with_options(
1141        mut writer: W,
1142        schema: &Schema,
1143        write_options: IpcWriteOptions,
1144    ) -> Result<Self, ArrowError> {
1145        let data_gen = IpcDataGenerator::default();
1146        let preserve_dict_id = write_options.preserve_dict_id;
1147        let mut dictionary_tracker =
1148            DictionaryTracker::new_with_preserve_dict_id(false, preserve_dict_id);
1149
1150        // write the schema, set the written bytes to the schema
1151        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1152            schema,
1153            &mut dictionary_tracker,
1154            &write_options,
1155        );
1156        write_message(&mut writer, encoded_message, &write_options)?;
1157        Ok(Self {
1158            writer,
1159            write_options,
1160            finished: false,
1161            dictionary_tracker,
1162            data_gen,
1163        })
1164    }
1165
1166    /// Write a record batch to the stream
1167    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1168        if self.finished {
1169            return Err(ArrowError::IpcError(
1170                "Cannot write record batch to stream writer as it is closed".to_string(),
1171            ));
1172        }
1173
1174        let (encoded_dictionaries, encoded_message) = self
1175            .data_gen
1176            .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)
1177            .expect("StreamWriter is configured to not error on dictionary replacement");
1178
1179        for encoded_dictionary in encoded_dictionaries {
1180            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
1181        }
1182
1183        write_message(&mut self.writer, encoded_message, &self.write_options)?;
1184        Ok(())
1185    }
1186
1187    /// Write continuation bytes, and mark the stream as done
1188    pub fn finish(&mut self) -> Result<(), ArrowError> {
1189        if self.finished {
1190            return Err(ArrowError::IpcError(
1191                "Cannot write footer to stream writer as it is closed".to_string(),
1192            ));
1193        }
1194
1195        write_continuation(&mut self.writer, &self.write_options, 0)?;
1196
1197        self.finished = true;
1198
1199        Ok(())
1200    }
1201
1202    /// Gets a reference to the underlying writer.
1203    pub fn get_ref(&self) -> &W {
1204        &self.writer
1205    }
1206
1207    /// Gets a mutable reference to the underlying writer.
1208    ///
1209    /// It is inadvisable to directly write to the underlying writer.
1210    pub fn get_mut(&mut self) -> &mut W {
1211        &mut self.writer
1212    }
1213
1214    /// Flush the underlying writer.
1215    ///
1216    /// Both the BufWriter and the underlying writer are flushed.
1217    pub fn flush(&mut self) -> Result<(), ArrowError> {
1218        self.writer.flush()?;
1219        Ok(())
1220    }
1221
1222    /// Unwraps the underlying writer.
1223    ///
1224    /// The writer is flushed and the StreamWriter is finished before returning.
1225    ///
1226    /// # Errors
1227    ///
1228    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1229    /// or while flushing the writer.
1230    ///
1231    /// # Example
1232    ///
1233    /// ```
1234    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1235    /// # use arrow_ipc::MetadataVersion;
1236    /// # use arrow_schema::{ArrowError, Schema};
1237    /// # fn main() -> Result<(), ArrowError> {
1238    /// // The result we expect from an empty schema
1239    /// let expected = vec![
1240    ///     255, 255, 255, 255,  48,   0,   0,   0,
1241    ///      16,   0,   0,   0,   0,   0,  10,   0,
1242    ///      12,   0,  10,   0,   9,   0,   4,   0,
1243    ///      10,   0,   0,   0,  16,   0,   0,   0,
1244    ///       0,   1,   4,   0,   8,   0,   8,   0,
1245    ///       0,   0,   4,   0,   8,   0,   0,   0,
1246    ///       4,   0,   0,   0,   0,   0,   0,   0,
1247    ///     255, 255, 255, 255,   0,   0,   0,   0
1248    /// ];
1249    ///
1250    /// let schema = Schema::empty();
1251    /// let buffer: Vec<u8> = Vec::new();
1252    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
1253    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
1254    ///
1255    /// assert_eq!(stream_writer.into_inner()?, expected);
1256    /// # Ok(())
1257    /// # }
1258    /// ```
1259    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1260        if !self.finished {
1261            // `finish` flushes.
1262            self.finish()?;
1263        }
1264        Ok(self.writer)
1265    }
1266}
1267
1268impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1269    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1270        self.write(batch)
1271    }
1272
1273    fn close(mut self) -> Result<(), ArrowError> {
1274        self.finish()
1275    }
1276}
1277
1278/// Stores the encoded data, which is a `crate::Message`, and optional Arrow data
1279pub struct EncodedData {
1280    /// An encoded crate::Message
1281    pub ipc_message: Vec<u8>,
1282    /// Arrow buffers to be written, should be an empty vec for schema messages
1283    pub arrow_data: Vec<u8>,
1284}
1285/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
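///
/// # Example
///
/// A small sketch writing an encoded schema message into an in-memory buffer:
///
/// ```
/// # use arrow_ipc::writer::{write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
/// # use arrow_schema::{DataType, Field, Schema};
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let options = IpcWriteOptions::default();
/// let mut tracker = DictionaryTracker::new(false);
/// let encoded = IpcDataGenerator::default()
///     .schema_to_bytes_with_dictionary_tracker(&schema, &mut tracker, &options);
/// let mut buffer: Vec<u8> = Vec::new();
/// let (_meta_len, data_len) = write_message(&mut buffer, encoded, &options).unwrap();
/// assert_eq!(data_len, 0); // schema messages carry no body data
/// ```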
1286pub fn write_message<W: Write>(
1287    mut writer: W,
1288    encoded: EncodedData,
1289    write_options: &IpcWriteOptions,
1290) -> Result<(usize, usize), ArrowError> {
1291    let arrow_data_len = encoded.arrow_data.len();
1292    if arrow_data_len % usize::from(write_options.alignment) != 0 {
1293        return Err(ArrowError::MemoryError(
1294            "Arrow data not aligned".to_string(),
1295        ));
1296    }
1297
1298    let a = usize::from(write_options.alignment - 1);
1299    let buffer = encoded.ipc_message;
1300    let flatbuf_size = buffer.len();
1301    let prefix_size = if write_options.write_legacy_ipc_format {
1302        4
1303    } else {
1304        8
1305    };
1306    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1307    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1308
1309    write_continuation(
1310        &mut writer,
1311        write_options,
1312        (aligned_size - prefix_size) as i32,
1313    )?;
1314
1315    // write the flatbuf
1316    if flatbuf_size > 0 {
1317        writer.write_all(&buffer)?;
1318    }
1319    // write padding
1320    writer.write_all(&PADDING[..padding_bytes])?;
1321
1322    // write arrow data
1323    let body_len = if arrow_data_len > 0 {
1324        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1325    } else {
1326        0
1327    };
1328
1329    Ok((aligned_size, body_len))
1330}
1331
1332fn write_body_buffers<W: Write>(
1333    mut writer: W,
1334    data: &[u8],
1335    alignment: u8,
1336) -> Result<usize, ArrowError> {
1337    let len = data.len();
1338    let pad_len = pad_to_alignment(alignment, len);
1339    let total_len = len + pad_len;
1340
1341    // write body buffer
1342    writer.write_all(data)?;
1343    if pad_len > 0 {
1344        writer.write_all(&PADDING[..pad_len])?;
1345    }
1346
1347    writer.flush()?;
1348    Ok(total_len)
1349}
1350
1351/// Write the message length to the writer, preceded by a continuation marker
1352/// unless the legacy (pre-0.15.0) IPC format is being written
1353fn write_continuation<W: Write>(
1354    mut writer: W,
1355    write_options: &IpcWriteOptions,
1356    total_len: i32,
1357) -> Result<usize, ArrowError> {
1358    let mut written = 8;
1359
1360    // the version of the writer determines whether continuation markers should be added
1361    match write_options.metadata_version {
1362        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1363            unreachable!("Options with this metadata version cannot be created")
1364        }
1365        crate::MetadataVersion::V4 => {
1366            if !write_options.write_legacy_ipc_format {
1367                // v0.15.0 format
1368                writer.write_all(&CONTINUATION_MARKER)?;
1369                written = 4;
1370            }
1371            writer.write_all(&total_len.to_le_bytes()[..])?;
1372        }
1373        crate::MetadataVersion::V5 => {
1374            // write continuation marker and message length
1375            writer.write_all(&CONTINUATION_MARKER)?;
1376            writer.write_all(&total_len.to_le_bytes()[..])?;
1377        }
1378        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1379    };
1380
1381    writer.flush()?;
1382
1383    Ok(written)
1384}
1385
1386/// In V4, null types have no validity bitmap
1387/// In V5 and later, null and union types have no validity bitmap
1388/// Run end encoded type has no validity bitmap.
1389fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1390    if write_options.metadata_version < crate::MetadataVersion::V5 {
1391        !matches!(data_type, DataType::Null)
1392    } else {
1393        !matches!(
1394            data_type,
1395            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1396        )
1397    }
1398}
1399
1400/// Whether to truncate the buffer
1401#[inline]
1402fn buffer_need_truncate(
1403    array_offset: usize,
1404    buffer: &Buffer,
1405    spec: &BufferSpec,
1406    min_length: usize,
1407) -> bool {
1408    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1409}
1410
1411/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
1412#[inline]
1413fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1414    match spec {
1415        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1416        _ => 0,
1417    }
1418}
1419
1420/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1421/// original start offset and length for use in slicing child data.
1422fn reencode_offsets<O: OffsetSizeTrait>(
1423    offsets: &Buffer,
1424    data: &ArrayData,
1425) -> (Buffer, usize, usize) {
1426    let offsets_slice: &[O] = offsets.typed_data::<O>();
1427    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1428
1429    let start_offset = offset_slice.first().unwrap();
1430    let end_offset = offset_slice.last().unwrap();
1431
1432    let offsets = match start_offset.as_usize() {
1433        0 => offsets.clone(),
1434        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1435    };
1436
1437    let start_offset = start_offset.as_usize();
1438    let end_offset = end_offset.as_usize();
1439
1440    (offsets, start_offset, end_offset - start_offset)
1441}
1442
1443/// Returns the offsets and values [`Buffer`]s for a ByteArray with offset type `O`
1444///
1445/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1446/// slicing the values buffer as appropriate. This helps reduce the encoded
1447/// size of sliced arrays, as values that have been sliced away are not encoded
1448fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1449    if data.is_empty() {
1450        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
1451    }
1452
1453    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1454    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1455    (offsets, values)
1456}
1457
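// Illustrative sketch of the re-encoding above: for a sliced string array the serialized
// offsets are rebased to start at 0 and the values buffer is trimmed to the referenced
// range. The concrete input values are assumptions for illustration only.
#[cfg(test)]
mod byte_array_truncation_sketch {
    use super::*;

    #[test]
    fn sliced_utf8_offsets_are_rebased() {
        let array = StringArray::from(vec!["ab", "cde", "fg"]);
        let data = array.slice(1, 2).into_data();
        let (offsets, values) = get_byte_array_buffers::<i32>(&data);

        // offsets restart at 0 and cover only "cde" and "fg"
        assert_eq!(offsets.typed_data::<i32>(), &[0, 3, 5]);
        assert_eq!(values.as_slice(), b"cdefg");
    }
}
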
1458/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
1459/// of a values buffer.
1460fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1461    if data.is_empty() {
1462        return (
1463            MutableBuffer::new(0).into(),
1464            data.child_data()[0].slice(0, 0),
1465        );
1466    }
1467
1468    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1469    let child_data = data.child_data()[0].slice(original_start_offset, len);
1470    (offsets, child_data)
1471}
1472
1473/// Write array data to a vector of bytes
1474#[allow(clippy::too_many_arguments)]
1475fn write_array_data(
1476    array_data: &ArrayData,
1477    buffers: &mut Vec<crate::Buffer>,
1478    arrow_data: &mut Vec<u8>,
1479    nodes: &mut Vec<crate::FieldNode>,
1480    offset: i64,
1481    num_rows: usize,
1482    null_count: usize,
1483    compression_codec: Option<CompressionCodec>,
1484    write_options: &IpcWriteOptions,
1485) -> Result<i64, ArrowError> {
1486    let mut offset = offset;
1487    if !matches!(array_data.data_type(), DataType::Null) {
1488        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
1489    } else {
1490        // A NullArray's null_count equals its len, but the `null_count` passed in comes from
1491        // ArrayData, where it is always 0 for the Null type.
1492        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1493    }
1494    if has_validity_bitmap(array_data.data_type(), write_options) {
1495        // write null buffer if exists
1496        let null_buffer = match array_data.nulls() {
1497            None => {
1498                // create a buffer and fill it with valid bits
1499                let num_bytes = bit_util::ceil(num_rows, 8);
1500                let buffer = MutableBuffer::new(num_bytes);
1501                let buffer = buffer.with_bitset(num_bytes, true);
1502                buffer.into()
1503            }
1504            Some(buffer) => buffer.inner().sliced(),
1505        };
1506
1507        offset = write_buffer(
1508            null_buffer.as_slice(),
1509            buffers,
1510            arrow_data,
1511            offset,
1512            compression_codec,
1513            write_options.alignment,
1514        )?;
1515    }
1516
1517    let data_type = array_data.data_type();
1518    if matches!(data_type, DataType::Binary | DataType::Utf8) {
1519        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
1520        for buffer in [offsets, values] {
1521            offset = write_buffer(
1522                buffer.as_slice(),
1523                buffers,
1524                arrow_data,
1525                offset,
1526                compression_codec,
1527                write_options.alignment,
1528            )?;
1529        }
1530    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
1531        // Slicing the views buffer is safe and easy,
1532        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
1533        //
1534        // The current implementation just serializes the raw arrays as given and does not try to optimize anything.
1535        // Users who want to "compact" the arrays prior to sending them over IPC
1536        // should consider the gc API suggested in #5513
1537        for buffer in array_data.buffers() {
1538            offset = write_buffer(
1539                buffer.as_slice(),
1540                buffers,
1541                arrow_data,
1542                offset,
1543                compression_codec,
1544                write_options.alignment,
1545            )?;
1546        }
1547    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
1548        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
1549        for buffer in [offsets, values] {
1550            offset = write_buffer(
1551                buffer.as_slice(),
1552                buffers,
1553                arrow_data,
1554                offset,
1555                compression_codec,
1556                write_options.alignment,
1557            )?;
1558        }
1559    } else if DataType::is_numeric(data_type)
1560        || DataType::is_temporal(data_type)
1561        || matches!(
1562            array_data.data_type(),
1563            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
1564        )
1565    {
1566        // Truncate values
1567        assert_eq!(array_data.buffers().len(), 1);
1568
1569        let buffer = &array_data.buffers()[0];
1570        let layout = layout(data_type);
1571        let spec = &layout.buffers[0];
1572
1573        let byte_width = get_buffer_element_width(spec);
1574        let min_length = array_data.len() * byte_width;
1575        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1576            let byte_offset = array_data.offset() * byte_width;
1577            let buffer_length = min(min_length, buffer.len() - byte_offset);
1578            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1579        } else {
1580            buffer.as_slice()
1581        };
1582        offset = write_buffer(
1583            buffer_slice,
1584            buffers,
1585            arrow_data,
1586            offset,
1587            compression_codec,
1588            write_options.alignment,
1589        )?;
1590    } else if matches!(data_type, DataType::Boolean) {
1591        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
1592        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
1593        assert_eq!(array_data.buffers().len(), 1);
1594
1595        let buffer = &array_data.buffers()[0];
1596        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
1597        offset = write_buffer(
1598            &buffer,
1599            buffers,
1600            arrow_data,
1601            offset,
1602            compression_codec,
1603            write_options.alignment,
1604        )?;
1605    } else if matches!(
1606        data_type,
1607        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
1608    ) {
1609        assert_eq!(array_data.buffers().len(), 1);
1610        assert_eq!(array_data.child_data().len(), 1);
1611
1612        // Truncate offsets and the child data to avoid writing unnecessary data
1613        let (offsets, sliced_child_data) = match data_type {
1614            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
1615            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
1616            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
1617            _ => unreachable!(),
1618        };
1619        offset = write_buffer(
1620            offsets.as_slice(),
1621            buffers,
1622            arrow_data,
1623            offset,
1624            compression_codec,
1625            write_options.alignment,
1626        )?;
1627        offset = write_array_data(
1628            &sliced_child_data,
1629            buffers,
1630            arrow_data,
1631            nodes,
1632            offset,
1633            sliced_child_data.len(),
1634            sliced_child_data.null_count(),
1635            compression_codec,
1636            write_options,
1637        )?;
1638        return Ok(offset);
1639    } else {
1640        for buffer in array_data.buffers() {
1641            offset = write_buffer(
1642                buffer,
1643                buffers,
1644                arrow_data,
1645                offset,
1646                compression_codec,
1647                write_options.alignment,
1648            )?;
1649        }
1650    }
1651
1652    match array_data.data_type() {
1653        DataType::Dictionary(_, _) => {}
1654        DataType::RunEndEncoded(_, _) => {
1655            // unslice the run encoded array.
1656            let arr = unslice_run_array(array_data.clone())?;
1657            // recursively write out nested structures
1658            for data_ref in arr.child_data() {
1659                // write the nested data (e.g list data)
1660                offset = write_array_data(
1661                    data_ref,
1662                    buffers,
1663                    arrow_data,
1664                    nodes,
1665                    offset,
1666                    data_ref.len(),
1667                    data_ref.null_count(),
1668                    compression_codec,
1669                    write_options,
1670                )?;
1671            }
1672        }
1673        _ => {
1674            // recursively write out nested structures
1675            for data_ref in array_data.child_data() {
1676                // write the nested data (e.g list data)
1677                offset = write_array_data(
1678                    data_ref,
1679                    buffers,
1680                    arrow_data,
1681                    nodes,
1682                    offset,
1683                    data_ref.len(),
1684                    data_ref.null_count(),
1685                    compression_codec,
1686                    write_options,
1687                )?;
1688            }
1689        }
1690    }
1691    Ok(offset)
1692}
1693
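// Illustrative sketch of `write_array_data` on a simple nullable Int32 array: it records one
// field node and two buffer descriptors (validity bitmap, then values) and returns the new
// body offset. The concrete array and the default options are assumptions for illustration only.
#[cfg(test)]
mod write_array_data_sketch {
    use super::*;

    #[test]
    fn nullable_int32_writes_two_buffers() {
        let data = Int32Array::from(vec![Some(1), None, Some(3)]).into_data();

        let mut buffers = Vec::new();
        let mut arrow_data = Vec::new();
        let mut nodes = Vec::new();

        let offset = write_array_data(
            &data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            data.len(),
            data.null_count(),
            None,
            &IpcWriteOptions::default(),
        )
        .unwrap();

        assert_eq!(nodes.len(), 1);
        assert_eq!(buffers.len(), 2); // validity bitmap + values
        assert_eq!(offset as usize, arrow_data.len());
    }
}
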
1694/// Writes a buffer into `arrow_data`, a vector of bytes, and adds its
1695/// [`crate::Buffer`] descriptor to `buffers`. Returns the new offset in `arrow_data`.
1696///
1698/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
1699/// Each constituent buffer is first compressed with the indicated
1700/// compressor, and then written with the uncompressed length in the first 8
1701/// bytes as a 64-bit little-endian signed integer followed by the compressed
1702/// buffer bytes (and then padding as required by the protocol). The
1703/// uncompressed length may be set to -1 to indicate that the data that
1704/// follows is not compressed, which can be useful for cases where
1705/// compression does not yield appreciable savings.
1706fn write_buffer(
1707    buffer: &[u8],                    // input
1708    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
1709    arrow_data: &mut Vec<u8>,         // output stream
1710    offset: i64,                      // current output stream offset
1711    compression_codec: Option<CompressionCodec>,
1712    alignment: u8,
1713) -> Result<i64, ArrowError> {
1714    let len: i64 = match compression_codec {
1715        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
1716        None => {
1717            arrow_data.extend_from_slice(buffer);
1718            buffer.len()
1719        }
1720    }
1721    .try_into()
1722    .map_err(|e| {
1723        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
1724    })?;
1725
1726    // make new index entry
1727    buffers.push(crate::Buffer::new(offset, len));
1728    // pad the data so the next buffer offset stays aligned
1729    let pad_len = pad_to_alignment(alignment, len as usize);
1730    arrow_data.extend_from_slice(&PADDING[..pad_len]);
1731
1732    Ok(offset + len + (pad_len as i64))
1733}
1734
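// Illustrative sketch of `write_buffer` without compression: the bytes are appended verbatim,
// one (offset, length) descriptor is recorded, and the returned offset is padded up to the
// requested alignment. The 5-byte input and 8-byte alignment are assumptions for illustration.
#[cfg(test)]
mod write_buffer_sketch {
    use super::*;

    #[test]
    fn uncompressed_buffer_is_padded_to_alignment() {
        let mut buffers = Vec::new();
        let mut arrow_data = Vec::new();

        let next_offset =
            write_buffer(&[1u8, 2, 3, 4, 5], &mut buffers, &mut arrow_data, 0, None, 8).unwrap();

        assert_eq!(buffers.len(), 1); // a single descriptor: offset 0, length 5
        assert_eq!(next_offset, 8); // 5 data bytes + 3 padding bytes
        assert_eq!(arrow_data.len(), 8);
    }
}
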
1735const PADDING: [u8; 64] = [0; 64];
1736
1737/// Returns the number of padding bytes needed to round `len` up to the next multiple of `alignment`
1738#[inline]
1739fn pad_to_alignment(alignment: u8, len: usize) -> usize {
1740    let a = usize::from(alignment - 1);
1741    ((len + a) & !a) - len
1742}
1743
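// Illustrative sketch of the padding arithmetic above: `pad_to_alignment` rounds `len` up to
// the next multiple of `alignment` (a power of two) and returns only the extra byte count.
#[cfg(test)]
mod pad_to_alignment_sketch {
    use super::pad_to_alignment;

    #[test]
    fn pads_up_to_the_next_multiple() {
        assert_eq!(pad_to_alignment(8, 0), 0);
        assert_eq!(pad_to_alignment(8, 5), 3);
        assert_eq!(pad_to_alignment(8, 8), 0);
        assert_eq!(pad_to_alignment(64, 65), 63);
    }
}
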
1744#[cfg(test)]
1745mod tests {
1746    use std::io::Cursor;
1747    use std::io::Seek;
1748
1749    use arrow_array::builder::GenericListBuilder;
1750    use arrow_array::builder::MapBuilder;
1751    use arrow_array::builder::UnionBuilder;
1752    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
1753    use arrow_array::types::*;
1754    use arrow_buffer::ScalarBuffer;
1755
1756    use crate::convert::fb_to_schema;
1757    use crate::reader::*;
1758    use crate::root_as_footer;
1759    use crate::MetadataVersion;
1760
1761    use super::*;
1762
1763    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
1764        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
1765        writer.write(rb).unwrap();
1766        writer.finish().unwrap();
1767        writer.into_inner().unwrap()
1768    }
1769
1770    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
1771        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
1772        reader.next().unwrap().unwrap()
1773    }
1774
1775    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
1776        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
1777        // without needing to construct a giant array to spill over the 64-byte default alignment
1778        // boundary.
1779        const IPC_ALIGNMENT: usize = 8;
1780
1781        let mut stream_writer = StreamWriter::try_new_with_options(
1782            vec![],
1783            record.schema_ref(),
1784            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
1785        )
1786        .unwrap();
1787        stream_writer.write(record).unwrap();
1788        stream_writer.finish().unwrap();
1789        stream_writer.into_inner().unwrap()
1790    }
1791
1792    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
1793        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
1794        stream_reader.next().unwrap().unwrap()
1795    }
1796
1797    #[test]
1798    #[cfg(feature = "lz4")]
1799    fn test_write_empty_record_batch_lz4_compression() {
1800        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1801        let values: Vec<Option<i32>> = vec![];
1802        let array = Int32Array::from(values);
1803        let record_batch =
1804            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1805
1806        let mut file = tempfile::tempfile().unwrap();
1807
1808        {
1809            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1810                .unwrap()
1811                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
1812                .unwrap();
1813
1814            let mut writer =
1815                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1816            writer.write(&record_batch).unwrap();
1817            writer.finish().unwrap();
1818        }
1819        file.rewind().unwrap();
1820        {
1821            // read file
1822            let reader = FileReader::try_new(file, None).unwrap();
1823            for read_batch in reader {
1824                read_batch
1825                    .unwrap()
1826                    .columns()
1827                    .iter()
1828                    .zip(record_batch.columns())
1829                    .for_each(|(a, b)| {
1830                        assert_eq!(a.data_type(), b.data_type());
1831                        assert_eq!(a.len(), b.len());
1832                        assert_eq!(a.null_count(), b.null_count());
1833                    });
1834            }
1835        }
1836    }
1837
1838    #[test]
1839    #[cfg(feature = "lz4")]
1840    fn test_write_file_with_lz4_compression() {
1841        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1842        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
1843        let array = Int32Array::from(values);
1844        let record_batch =
1845            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1846
1847        let mut file = tempfile::tempfile().unwrap();
1848        {
1849            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1850                .unwrap()
1851                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
1852                .unwrap();
1853
1854            let mut writer =
1855                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1856            writer.write(&record_batch).unwrap();
1857            writer.finish().unwrap();
1858        }
1859        file.rewind().unwrap();
1860        {
1861            // read file
1862            let reader = FileReader::try_new(file, None).unwrap();
1863            for read_batch in reader {
1864                read_batch
1865                    .unwrap()
1866                    .columns()
1867                    .iter()
1868                    .zip(record_batch.columns())
1869                    .for_each(|(a, b)| {
1870                        assert_eq!(a.data_type(), b.data_type());
1871                        assert_eq!(a.len(), b.len());
1872                        assert_eq!(a.null_count(), b.null_count());
1873                    });
1874            }
1875        }
1876    }
1877
1878    #[test]
1879    #[cfg(feature = "zstd")]
1880    fn test_write_file_with_zstd_compression() {
1881        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1882        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
1883        let array = Int32Array::from(values);
1884        let record_batch =
1885            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1886        let mut file = tempfile::tempfile().unwrap();
1887        {
1888            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1889                .unwrap()
1890                .try_with_compression(Some(crate::CompressionType::ZSTD))
1891                .unwrap();
1892
1893            let mut writer =
1894                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1895            writer.write(&record_batch).unwrap();
1896            writer.finish().unwrap();
1897        }
1898        file.rewind().unwrap();
1899        {
1900            // read file
1901            let reader = FileReader::try_new(file, None).unwrap();
1902            for read_batch in reader {
1903                read_batch
1904                    .unwrap()
1905                    .columns()
1906                    .iter()
1907                    .zip(record_batch.columns())
1908                    .for_each(|(a, b)| {
1909                        assert_eq!(a.data_type(), b.data_type());
1910                        assert_eq!(a.len(), b.len());
1911                        assert_eq!(a.null_count(), b.null_count());
1912                    });
1913            }
1914        }
1915    }
1916
1917    #[test]
1918    fn test_write_file() {
1919        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
1920        let values: Vec<Option<u32>> = vec![
1921            Some(999),
1922            None,
1923            Some(235),
1924            Some(123),
1925            None,
1926            None,
1927            None,
1928            None,
1929            None,
1930        ];
1931        let array1 = UInt32Array::from(values);
1932        let batch =
1933            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
1934                .unwrap();
1935        let mut file = tempfile::tempfile().unwrap();
1936        {
1937            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
1938
1939            writer.write(&batch).unwrap();
1940            writer.finish().unwrap();
1941        }
1942        file.rewind().unwrap();
1943
1944        {
1945            let mut reader = FileReader::try_new(file, None).unwrap();
1946            while let Some(Ok(read_batch)) = reader.next() {
1947                read_batch
1948                    .columns()
1949                    .iter()
1950                    .zip(batch.columns())
1951                    .for_each(|(a, b)| {
1952                        assert_eq!(a.data_type(), b.data_type());
1953                        assert_eq!(a.len(), b.len());
1954                        assert_eq!(a.null_count(), b.null_count());
1955                    });
1956            }
1957        }
1958    }
1959
1960    fn write_null_file(options: IpcWriteOptions) {
1961        let schema = Schema::new(vec![
1962            Field::new("nulls", DataType::Null, true),
1963            Field::new("int32s", DataType::Int32, false),
1964            Field::new("nulls2", DataType::Null, true),
1965            Field::new("f64s", DataType::Float64, false),
1966        ]);
1967        let array1 = NullArray::new(32);
1968        let array2 = Int32Array::from(vec![1; 32]);
1969        let array3 = NullArray::new(32);
1970        let array4 = Float64Array::from(vec![f64::NAN; 32]);
1971        let batch = RecordBatch::try_new(
1972            Arc::new(schema.clone()),
1973            vec![
1974                Arc::new(array1) as ArrayRef,
1975                Arc::new(array2) as ArrayRef,
1976                Arc::new(array3) as ArrayRef,
1977                Arc::new(array4) as ArrayRef,
1978            ],
1979        )
1980        .unwrap();
1981        let mut file = tempfile::tempfile().unwrap();
1982        {
1983            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
1984
1985            writer.write(&batch).unwrap();
1986            writer.finish().unwrap();
1987        }
1988
1989        file.rewind().unwrap();
1990
1991        {
1992            let reader = FileReader::try_new(file, None).unwrap();
1993            reader.for_each(|maybe_batch| {
1994                maybe_batch
1995                    .unwrap()
1996                    .columns()
1997                    .iter()
1998                    .zip(batch.columns())
1999                    .for_each(|(a, b)| {
2000                        assert_eq!(a.data_type(), b.data_type());
2001                        assert_eq!(a.len(), b.len());
2002                        assert_eq!(a.null_count(), b.null_count());
2003                    });
2004            });
2005        }
2006    }
2007    #[test]
2008    fn test_write_null_file_v4() {
2009        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2010        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2011        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2012        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2013    }
2014
2015    #[test]
2016    fn test_write_null_file_v5() {
2017        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2018        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2019    }
2020
2021    #[test]
2022    fn track_union_nested_dict() {
2023        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2024
2025        let array = Arc::new(inner) as ArrayRef;
2026
2027        // Dict field with id 2
2028        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 2, false);
2029        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2030
2031        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2032        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2033
2034        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2035
2036        let schema = Arc::new(Schema::new(vec![Field::new(
2037            "union",
2038            union.data_type().clone(),
2039            false,
2040        )]));
2041
2042        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2043
2044        let gen = IpcDataGenerator {};
2045        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2046        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2047            .unwrap();
2048
2049        // The tracker was created with `preserve_dict_id = true`, so the encoder keeps the
2050        // dict ID declared in the schema, and we expect the dictionary to be keyed by 2
2051        assert!(dict_tracker.written.contains_key(&2));
2052    }
2053
2054    #[test]
2055    fn track_struct_nested_dict() {
2056        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2057
2058        let array = Arc::new(inner) as ArrayRef;
2059
2060        // Dict field with id 2
2061        let dctfield = Arc::new(Field::new_dict(
2062            "dict",
2063            array.data_type().clone(),
2064            false,
2065            2,
2066            false,
2067        ));
2068
2069        let s = StructArray::from(vec![(dctfield, array)]);
2070        let struct_array = Arc::new(s) as ArrayRef;
2071
2072        let schema = Arc::new(Schema::new(vec![Field::new(
2073            "struct",
2074            struct_array.data_type().clone(),
2075            false,
2076        )]));
2077
2078        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2079
2080        let gen = IpcDataGenerator {};
2081        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2082        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2083            .unwrap();
2084
2085        assert!(dict_tracker.written.contains_key(&2));
2086    }
2087
2088    fn write_union_file(options: IpcWriteOptions) {
2089        let schema = Schema::new(vec![Field::new_union(
2090            "union",
2091            vec![0, 1],
2092            vec![
2093                Field::new("a", DataType::Int32, false),
2094                Field::new("c", DataType::Float64, false),
2095            ],
2096            UnionMode::Sparse,
2097        )]);
2098        let mut builder = UnionBuilder::with_capacity_sparse(5);
2099        builder.append::<Int32Type>("a", 1).unwrap();
2100        builder.append_null::<Int32Type>("a").unwrap();
2101        builder.append::<Float64Type>("c", 3.0).unwrap();
2102        builder.append_null::<Float64Type>("c").unwrap();
2103        builder.append::<Int32Type>("a", 4).unwrap();
2104        let union = builder.build().unwrap();
2105
2106        let batch =
2107            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2108                .unwrap();
2109
2110        let mut file = tempfile::tempfile().unwrap();
2111        {
2112            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2113
2114            writer.write(&batch).unwrap();
2115            writer.finish().unwrap();
2116        }
2117        file.rewind().unwrap();
2118
2119        {
2120            let reader = FileReader::try_new(file, None).unwrap();
2121            reader.for_each(|maybe_batch| {
2122                maybe_batch
2123                    .unwrap()
2124                    .columns()
2125                    .iter()
2126                    .zip(batch.columns())
2127                    .for_each(|(a, b)| {
2128                        assert_eq!(a.data_type(), b.data_type());
2129                        assert_eq!(a.len(), b.len());
2130                        assert_eq!(a.null_count(), b.null_count());
2131                    });
2132            });
2133        }
2134    }
2135
2136    #[test]
2137    fn test_write_union_file_v4_v5() {
2138        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2139        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2140    }
2141
2142    #[test]
2143    fn test_write_view_types() {
2144        const LONG_TEST_STRING: &str =
2145            "This is a long string to make sure binary view array handles it";
2146        let schema = Schema::new(vec![
2147            Field::new("field1", DataType::BinaryView, true),
2148            Field::new("field2", DataType::Utf8View, true),
2149        ]);
2150        let values: Vec<Option<&[u8]>> = vec![
2151            Some(b"foo"),
2152            Some(b"bar"),
2153            Some(LONG_TEST_STRING.as_bytes()),
2154        ];
2155        let binary_array = BinaryViewArray::from_iter(values);
2156        let utf8_array =
2157            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2158        let record_batch = RecordBatch::try_new(
2159            Arc::new(schema.clone()),
2160            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2161        )
2162        .unwrap();
2163
2164        let mut file = tempfile::tempfile().unwrap();
2165        {
2166            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2167            writer.write(&record_batch).unwrap();
2168            writer.finish().unwrap();
2169        }
2170        file.rewind().unwrap();
2171        {
2172            let mut reader = FileReader::try_new(&file, None).unwrap();
2173            let read_batch = reader.next().unwrap().unwrap();
2174            read_batch
2175                .columns()
2176                .iter()
2177                .zip(record_batch.columns())
2178                .for_each(|(a, b)| {
2179                    assert_eq!(a, b);
2180                });
2181        }
2182        file.rewind().unwrap();
2183        {
2184            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2185            let read_batch = reader.next().unwrap().unwrap();
2186            assert_eq!(read_batch.num_columns(), 1);
2187            let read_array = read_batch.column(0);
2188            let write_array = record_batch.column(0);
2189            assert_eq!(read_array, write_array);
2190        }
2191    }
2192
2193    #[test]
2194    fn truncate_ipc_record_batch() {
2195        fn create_batch(rows: usize) -> RecordBatch {
2196            let schema = Schema::new(vec![
2197                Field::new("a", DataType::Int32, false),
2198                Field::new("b", DataType::Utf8, false),
2199            ]);
2200
2201            let a = Int32Array::from_iter_values(0..rows as i32);
2202            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2203
2204            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2205        }
2206
2207        let big_record_batch = create_batch(65536);
2208
2209        let length = 5;
2210        let small_record_batch = create_batch(length);
2211
2212        let offset = 2;
2213        let record_batch_slice = big_record_batch.slice(offset, length);
2214        assert!(
2215            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2216        );
2217        assert_eq!(
2218            serialize_stream(&small_record_batch).len(),
2219            serialize_stream(&record_batch_slice).len()
2220        );
2221
2222        assert_eq!(
2223            deserialize_stream(serialize_stream(&record_batch_slice)),
2224            record_batch_slice
2225        );
2226    }
2227
2228    #[test]
2229    fn truncate_ipc_record_batch_with_nulls() {
2230        fn create_batch() -> RecordBatch {
2231            let schema = Schema::new(vec![
2232                Field::new("a", DataType::Int32, true),
2233                Field::new("b", DataType::Utf8, true),
2234            ]);
2235
2236            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2237            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2238
2239            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2240        }
2241
2242        let record_batch = create_batch();
2243        let record_batch_slice = record_batch.slice(1, 2);
2244        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2245
2246        assert!(
2247            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2248        );
2249
2250        assert!(deserialized_batch.column(0).is_null(0));
2251        assert!(deserialized_batch.column(0).is_valid(1));
2252        assert!(deserialized_batch.column(1).is_valid(0));
2253        assert!(deserialized_batch.column(1).is_valid(1));
2254
2255        assert_eq!(record_batch_slice, deserialized_batch);
2256    }
2257
2258    #[test]
2259    fn truncate_ipc_dictionary_array() {
2260        fn create_batch() -> RecordBatch {
2261            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2262                .into_iter()
2263                .collect();
2264            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2265
2266            let array = DictionaryArray::new(keys, Arc::new(values));
2267
2268            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2269
2270            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2271        }
2272
2273        let record_batch = create_batch();
2274        let record_batch_slice = record_batch.slice(1, 2);
2275        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2276
2277        assert!(
2278            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2279        );
2280
2281        assert!(deserialized_batch.column(0).is_valid(0));
2282        assert!(deserialized_batch.column(0).is_null(1));
2283
2284        assert_eq!(record_batch_slice, deserialized_batch);
2285    }
2286
2287    #[test]
2288    fn truncate_ipc_struct_array() {
2289        fn create_batch() -> RecordBatch {
2290            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2291                .into_iter()
2292                .collect();
2293            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2294
2295            let struct_array = StructArray::from(vec![
2296                (
2297                    Arc::new(Field::new("s", DataType::Utf8, true)),
2298                    Arc::new(strings) as ArrayRef,
2299                ),
2300                (
2301                    Arc::new(Field::new("c", DataType::Int32, true)),
2302                    Arc::new(ints) as ArrayRef,
2303                ),
2304            ]);
2305
2306            let schema = Schema::new(vec![Field::new(
2307                "struct_array",
2308                struct_array.data_type().clone(),
2309                true,
2310            )]);
2311
2312            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
2313        }
2314
2315        let record_batch = create_batch();
2316        let record_batch_slice = record_batch.slice(1, 2);
2317        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2318
2319        assert!(
2320            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2321        );
2322
2323        let structs = deserialized_batch
2324            .column(0)
2325            .as_any()
2326            .downcast_ref::<StructArray>()
2327            .unwrap();
2328
2329        assert!(structs.column(0).is_null(0));
2330        assert!(structs.column(0).is_valid(1));
2331        assert!(structs.column(1).is_valid(0));
2332        assert!(structs.column(1).is_null(1));
2333        assert_eq!(record_batch_slice, deserialized_batch);
2334    }
2335
2336    #[test]
2337    fn truncate_ipc_string_array_with_all_empty_string() {
2338        fn create_batch() -> RecordBatch {
2339            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
2340            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
2341            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
2342        }
2343
2344        let record_batch = create_batch();
2345        let record_batch_slice = record_batch.slice(0, 1);
2346        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2347
2348        assert!(
2349            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2350        );
2351        assert_eq!(record_batch_slice, deserialized_batch);
2352    }
2353
2354    #[test]
2355    fn test_stream_writer_writes_array_slice() {
2356        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
2357        assert_eq!(
2358            vec![Some(1), Some(2), Some(3)],
2359            array.iter().collect::<Vec<_>>()
2360        );
2361
2362        let sliced = array.slice(1, 2);
2363        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
2364
2365        let batch = RecordBatch::try_new(
2366            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
2367            vec![Arc::new(sliced)],
2368        )
2369        .expect("new batch");
2370
2371        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
2372        writer.write(&batch).expect("write");
2373        let outbuf = writer.into_inner().expect("inner");
2374
2375        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
2376        let read_batch = reader.next().unwrap().expect("read batch");
2377
2378        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
2379        assert_eq!(
2380            vec![Some(2), Some(3)],
2381            read_array.iter().collect::<Vec<_>>()
2382        );
2383    }
2384
2385    #[test]
2386    fn encode_bools_slice() {
2387        // Test case for https://github.com/apache/arrow-rs/issues/3496
2388        assert_bool_roundtrip([true, false], 1, 1);
2389
2390        // slice somewhere in the middle
2391        assert_bool_roundtrip(
2392            [
2393                true, false, true, true, false, false, true, true, true, false, false, false, true,
2394                true, true, true, false, false, false, false, true, true, true, true, true, false,
2395                false, false, false, false,
2396            ],
2397            13,
2398            17,
2399        );
2400
2401        // start at byte boundary, end in the middle
2402        assert_bool_roundtrip(
2403            [
2404                true, false, true, true, false, false, true, true, true, false, false, false,
2405            ],
2406            8,
2407            2,
2408        );
2409
2410        // start and stop at a byte boundary
2411        assert_bool_roundtrip(
2412            [
2413                true, false, true, true, false, false, true, true, true, false, false, false, true,
2414                true, true, true, true, false, false, false, false, false,
2415            ],
2416            8,
2417            8,
2418        );
2419    }
2420
2421    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
2422        let val_bool_field = Field::new("val", DataType::Boolean, false);
2423
2424        let schema = Arc::new(Schema::new(vec![val_bool_field]));
2425
2426        let bools = BooleanArray::from(bools.to_vec());
2427
2428        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
2429        let batch = batch.slice(offset, length);
2430
2431        let data = serialize_stream(&batch);
2432        let batch2 = deserialize_stream(data);
2433        assert_eq!(batch, batch2);
2434    }
2435
2436    #[test]
2437    fn test_run_array_unslice() {
2438        let total_len = 80;
2439        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
2440        let repeats: Vec<usize> = vec![3, 4, 1, 2];
2441        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
2442        for ix in 0_usize..32 {
2443            let repeat: usize = repeats[ix % repeats.len()];
2444            let val: Option<i32> = vals[ix % vals.len()];
2445            input_array.resize(input_array.len() + repeat, val);
2446        }
2447
2448        // Encode the input_array to run array
2449        let mut builder =
2450            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
2451        builder.extend(input_array.iter().copied());
2452        let run_array = builder.finish();
2453
2454        // test for all slice lengths.
2455        for slice_len in 1..=total_len {
2456            // test for offset = 0, slice length = slice_len
2457            let sliced_run_array: RunArray<Int16Type> =
2458                run_array.slice(0, slice_len).into_data().into();
2459
2460            // Create unsliced run array.
2461            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2462            let typed = unsliced_run_array
2463                .downcast::<PrimitiveArray<Int32Type>>()
2464                .unwrap();
2465            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
2466            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2467            assert_eq!(expected, actual);
2468
2469            // test for offset = total_len - slice_len, length = slice_len
2470            let sliced_run_array: RunArray<Int16Type> = run_array
2471                .slice(total_len - slice_len, slice_len)
2472                .into_data()
2473                .into();
2474
2475            // Create unsliced run array.
2476            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2477            let typed = unsliced_run_array
2478                .downcast::<PrimitiveArray<Int32Type>>()
2479                .unwrap();
2480            let expected: Vec<Option<i32>> = input_array
2481                .iter()
2482                .skip(total_len - slice_len)
2483                .copied()
2484                .collect();
2485            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2486            assert_eq!(expected, actual);
2487        }
2488    }
2489
2490    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2491        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
2492
2493        for i in 0..100_000 {
2494            for value in [i, i, i] {
2495                ls.values().append_value(value);
2496            }
2497            ls.append(true)
2498        }
2499
2500        ls.finish()
2501    }
2502
2503    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2504        let mut ls =
2505            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2506
2507        for _i in 0..10_000 {
2508            for j in 0..10 {
2509                for value in [j, j, j, j] {
2510                    ls.values().values().append_value(value);
2511                }
2512                ls.values().append(true)
2513            }
2514            ls.append(true);
2515        }
2516
2517        ls.finish()
2518    }
2519
2520    fn generate_map_array_data() -> MapArray {
2521        let keys_builder = UInt32Builder::new();
2522        let values_builder = UInt32Builder::new();
2523
2524        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
2525
2526        for i in 0..100_000 {
2527            for _j in 0..3 {
2528                builder.keys().append_value(i);
2529                builder.values().append_value(i * 2);
2530            }
2531            builder.append(true).unwrap();
2532        }
2533
2534        builder.finish()
2535    }
2536
2537    /// Ensure that both the full and sliced versions round-trip (serialize then deserialize) to the original input.
2538    /// Also ensure the serialized sliced version is significantly smaller than the serialized full version.
2539    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
2540        // test both full and sliced versions
2541        let in_sliced = in_batch.slice(999, 1);
2542
2543        let bytes_batch = serialize_file(&in_batch);
2544        let bytes_sliced = serialize_file(&in_sliced);
2545
2546        // serializing 1 row should be significantly smaller than serializing 100,000
2547        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
2548
2549        // ensure both are still valid and equal to originals
2550        let out_batch = deserialize_file(bytes_batch);
2551        assert_eq!(in_batch, out_batch);
2552
2553        let out_sliced = deserialize_file(bytes_sliced);
2554        assert_eq!(in_sliced, out_sliced);
2555    }
2556
2557    #[test]
2558    fn encode_lists() {
2559        let val_inner = Field::new("item", DataType::UInt32, true);
2560        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
2561        let schema = Arc::new(Schema::new(vec![val_list_field]));
2562
2563        let values = Arc::new(generate_list_data::<i32>());
2564
2565        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2566        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2567    }
2568
2569    #[test]
2570    fn encode_empty_list() {
2571        let val_inner = Field::new("item", DataType::UInt32, true);
2572        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
2573        let schema = Arc::new(Schema::new(vec![val_list_field]));
2574
2575        let values = Arc::new(generate_list_data::<i32>());
2576
2577        let in_batch = RecordBatch::try_new(schema, vec![values])
2578            .unwrap()
2579            .slice(999, 0);
2580        let out_batch = deserialize_file(serialize_file(&in_batch));
2581        assert_eq!(in_batch, out_batch);
2582    }
2583
2584    #[test]
2585    fn encode_large_lists() {
2586        let val_inner = Field::new("item", DataType::UInt32, true);
2587        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
2588        let schema = Arc::new(Schema::new(vec![val_list_field]));
2589
2590        let values = Arc::new(generate_list_data::<i64>());
2591
2592        // ensure both the full and sliced versions round-trip to the original input
2593        // and that the serialized sliced version is significantly smaller than the serialized full version
2594        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2595        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2596    }
2597
2598    #[test]
2599    fn encode_nested_lists() {
2600        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
2601        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
2602        let list_field = Field::new("val", DataType::List(inner_list_field), true);
2603        let schema = Arc::new(Schema::new(vec![list_field]));
2604
2605        let values = Arc::new(generate_nested_list_data::<i32>());
2606
2607        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2608        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2609    }
2610
2611    #[test]
2612    fn encode_map_array() {
2613        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
2614        let values = Arc::new(Field::new("values", DataType::UInt32, true));
2615        let map_field = Field::new_map("map", "entries", keys, values, false, true);
2616        let schema = Arc::new(Schema::new(vec![map_field]));
2617
2618        let values = Arc::new(generate_map_array_data());
2619
2620        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2621        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2622    }
2623
2624    #[test]
2625    fn test_decimal128_alignment16_is_sufficient() {
2626        const IPC_ALIGNMENT: usize = 16;
2627
2628        // Test a bunch of different dimensions to ensure alignment is never an issue.
2629        // For example, if we only test `num_cols = 1` then even with alignment 8 this
2630        // test would _happen_ to pass, even though for different dimensions like
2631        // `num_cols = 2` it would fail.
2632        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
2633            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
2634
2635            let mut fields = Vec::new();
2636            let mut arrays = Vec::new();
2637            for i in 0..num_cols {
2638                let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
2639                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
2640                fields.push(field);
2641                arrays.push(Arc::new(array) as Arc<dyn Array>);
2642            }
2643            let schema = Schema::new(fields);
2644            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
2645
2646            let mut writer = FileWriter::try_new_with_options(
2647                Vec::new(),
2648                batch.schema_ref(),
2649                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2650            )
2651            .unwrap();
2652            writer.write(&batch).unwrap();
2653            writer.finish().unwrap();
2654
2655            let out: Vec<u8> = writer.into_inner().unwrap();
2656
2657            let buffer = Buffer::from_vec(out);
2658            let trailer_start = buffer.len() - 10;
2659            let footer_len =
2660                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
2661            let footer =
2662                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
2663
2664            let schema = fb_to_schema(footer.schema().unwrap());
2665
2666            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
2667            // for `read_record_batch` later on to read the data in a zero-copy manner.
2668            let decoder =
2669                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
2670
2671            let batches = footer.recordBatches().unwrap();
2672
2673            let block = batches.get(0);
2674            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2675            let data = buffer.slice_with_length(block.offset() as _, block_len);
2676
2677            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
2678
2679            assert_eq!(batch, batch2);
2680        }
2681    }
2682
2683    #[test]
2684    fn test_decimal128_alignment8_is_unaligned() {
2685        const IPC_ALIGNMENT: usize = 8;
2686
2687        let num_cols = 2;
2688        let num_rows = 1;
2689
2690        let mut fields = Vec::new();
2691        let mut arrays = Vec::new();
2692        for i in 0..num_cols {
2693            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
2694            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
2695            fields.push(field);
2696            arrays.push(Arc::new(array) as Arc<dyn Array>);
2697        }
2698        let schema = Schema::new(fields);
2699        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
2700
2701        let mut writer = FileWriter::try_new_with_options(
2702            Vec::new(),
2703            batch.schema_ref(),
2704            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2705        )
2706        .unwrap();
2707        writer.write(&batch).unwrap();
2708        writer.finish().unwrap();
2709
2710        let out: Vec<u8> = writer.into_inner().unwrap();
2711
2712        let buffer = Buffer::from_vec(out);
2713        let trailer_start = buffer.len() - 10;
2714        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
2715        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
2716
2717        let schema = fb_to_schema(footer.schema().unwrap());
2718
2719        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
2720        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
2721        let decoder =
2722            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
2723
2724        let batches = footer.recordBatches().unwrap();
2725
2726        let block = batches.get(0);
2727        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2728        let data = buffer.slice_with_length(block.offset() as _, block_len);
2729
2730        let result = decoder.read_record_batch(block, &data);
2731
2732        let error = result.unwrap_err();
2733        assert_eq!(
2734            error.to_string(),
2735            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
2736             offset from expected alignment of 16 by 8"
2737        );
2738    }
2739
2740    #[test]
2741    fn test_flush() {
2742        // We write a schema which is small enough to fit into a buffer and not get flushed,
2743        // and then force the write with .flush().
2744        let num_cols = 2;
2745        let mut fields = Vec::new();
2746        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
2747        for i in 0..num_cols {
2748            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
2749            fields.push(field);
2750        }
2751        let schema = Schema::new(fields);
2752        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
2753        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
2754        let mut stream_writer =
2755            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
2756                .unwrap();
2757        let mut file_writer =
2758            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
2759
2760        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
2761        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
2762        stream_writer.flush().unwrap();
2763        file_writer.flush().unwrap();
2764        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
2765        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
2766        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
2767        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
2768        // and then a length of 0 (4 bytes) for a total of 8 bytes.
2769        // Everything before that should have been flushed in the .flush() call.
2770        let expected_stream_flushed_bytes = stream_out.len() - 8;
2771        // A file write is the same as the stream write except for the leading magic string
2772        // ARROW1 plus padding, which is 8 bytes.
2773        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
2774
2775        assert!(
2776            stream_bytes_written_on_new < stream_bytes_written_on_flush,
2777            "this test makes no sense if flush is not actually required"
2778        );
2779        assert!(
2780            file_bytes_written_on_new < file_bytes_written_on_flush,
2781            "this test makes no sense if flush is not actually required"
2782        );
2783        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
2784        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
2785    }
2786}