arrow_array/array/
run_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, RunEndBuffer};
22use arrow_data::{ArrayData, ArrayDataBuilder};
23use arrow_schema::{ArrowError, DataType, Field};
24
25use crate::{
26    builder::StringRunBuilder,
27    make_array,
28    run_iterator::RunArrayIter,
29    types::{Int16Type, Int32Type, Int64Type, RunEndIndexType},
30    Array, ArrayAccessor, ArrayRef, PrimitiveArray,
31};
32
33/// An array of [run-end encoded values](https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout)
34///
35/// This encoding is variation on [run-length encoding (RLE)](https://en.wikipedia.org/wiki/Run-length_encoding)
36/// and is good for representing data containing same values repeated consecutively.
37///
38/// [`RunArray`] contains `run_ends` array and `values` array of same length.
39/// The `run_ends` array stores the indexes at which the run ends. The `values` array
40/// stores the value of each run. Below example illustrates how a logical array is represented in
41/// [`RunArray`]
42///
43///
44/// ```text
45/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┐
46///   ┌─────────────────┐  ┌─────────┐       ┌─────────────────┐
47/// │ │        A        │  │    2    │ │     │        A        │
48///   ├─────────────────┤  ├─────────┤       ├─────────────────┤
49/// │ │        D        │  │    3    │ │     │        A        │    run length of 'A' = runs_ends[0] - 0 = 2
50///   ├─────────────────┤  ├─────────┤       ├─────────────────┤
51/// │ │        B        │  │    6    │ │     │        D        │    run length of 'D' = run_ends[1] - run_ends[0] = 1
52///   └─────────────────┘  └─────────┘       ├─────────────────┤
53/// │        values          run_ends  │     │        B        │
54///                                          ├─────────────────┤
55/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘     │        B        │
56///                                          ├─────────────────┤
57///                RunArray                  │        B        │    run length of 'B' = run_ends[2] - run_ends[1] = 3
58///               length = 3                 └─────────────────┘
59///
60///                                             Logical array
61///                                                Contents
62/// ```
63pub struct RunArray<R: RunEndIndexType> {
64    data_type: DataType,
65    run_ends: RunEndBuffer<R::Native>,
66    values: ArrayRef,
67}
68
69impl<R: RunEndIndexType> Clone for RunArray<R> {
70    fn clone(&self) -> Self {
71        Self {
72            data_type: self.data_type.clone(),
73            run_ends: self.run_ends.clone(),
74            values: self.values.clone(),
75        }
76    }
77}
78
79impl<R: RunEndIndexType> RunArray<R> {
80    /// Calculates the logical length of the array encoded
81    /// by the given run_ends array.
82    pub fn logical_len(run_ends: &PrimitiveArray<R>) -> usize {
83        let len = run_ends.len();
84        if len == 0 {
85            return 0;
86        }
87        run_ends.value(len - 1).as_usize()
88    }
89
90    /// Attempts to create RunArray using given run_ends (index where a run ends)
91    /// and the values (value of the run). Returns an error if the given data is not compatible
92    /// with RunEndEncoded specification.
93    pub fn try_new(run_ends: &PrimitiveArray<R>, values: &dyn Array) -> Result<Self, ArrowError> {
94        let run_ends_type = run_ends.data_type().clone();
95        let values_type = values.data_type().clone();
96        let ree_array_type = DataType::RunEndEncoded(
97            Arc::new(Field::new("run_ends", run_ends_type, false)),
98            Arc::new(Field::new("values", values_type, true)),
99        );
100        let len = RunArray::logical_len(run_ends);
101        let builder = ArrayDataBuilder::new(ree_array_type)
102            .len(len)
103            .add_child_data(run_ends.to_data())
104            .add_child_data(values.to_data());
105
106        // `build_unchecked` is used to avoid recursive validation of child arrays.
107        let array_data = unsafe { builder.build_unchecked() };
108
109        // Safety: `validate_data` checks below
110        //    1. The given array data has exactly two child arrays.
111        //    2. The first child array (run_ends) has valid data type.
112        //    3. run_ends array does not have null values
113        //    4. run_ends array has non-zero and strictly increasing values.
114        //    5. The length of run_ends array and values array are the same.
115        array_data.validate_data()?;
116
117        Ok(array_data.into())
118    }
119
120    /// Returns a reference to [`RunEndBuffer`]
121    pub fn run_ends(&self) -> &RunEndBuffer<R::Native> {
122        &self.run_ends
123    }
124
125    /// Returns a reference to values array
126    ///
127    /// Note: any slicing of this [`RunArray`] array is not applied to the returned array
128    /// and must be handled separately
129    pub fn values(&self) -> &ArrayRef {
130        &self.values
131    }
132
133    /// Returns the physical index at which the array slice starts.
134    pub fn get_start_physical_index(&self) -> usize {
135        self.run_ends.get_start_physical_index()
136    }
137
138    /// Returns the physical index at which the array slice ends.
139    pub fn get_end_physical_index(&self) -> usize {
140        self.run_ends.get_end_physical_index()
141    }
142
143    /// Downcast this [`RunArray`] to a [`TypedRunArray`]
144    ///
145    /// ```
146    /// use arrow_array::{Array, ArrayAccessor, RunArray, StringArray, types::Int32Type};
147    ///
148    /// let orig = [Some("a"), Some("b"), None];
149    /// let run_array = RunArray::<Int32Type>::from_iter(orig);
150    /// let typed = run_array.downcast::<StringArray>().unwrap();
151    /// assert_eq!(typed.value(0), "a");
152    /// assert_eq!(typed.value(1), "b");
153    /// assert!(typed.values().is_null(2));
154    /// ```
155    ///
156    pub fn downcast<V: 'static>(&self) -> Option<TypedRunArray<'_, R, V>> {
157        let values = self.values.as_any().downcast_ref()?;
158        Some(TypedRunArray {
159            run_array: self,
160            values,
161        })
162    }
163
164    /// Returns index to the physical array for the given index to the logical array.
165    /// This function adjusts the input logical index based on `ArrayData::offset`
166    /// Performs a binary search on the run_ends array for the input index.
167    ///
168    /// The result is arbitrary if `logical_index >= self.len()`
169    pub fn get_physical_index(&self, logical_index: usize) -> usize {
170        self.run_ends.get_physical_index(logical_index)
171    }
172
173    /// Returns the physical indices of the input logical indices. Returns error if any of the logical
174    /// index cannot be converted to physical index. The logical indices are sorted and iterated along
175    /// with run_ends array to find matching physical index. The approach used here was chosen over
176    /// finding physical index for each logical index using binary search using the function
177    /// `get_physical_index`. Running benchmarks on both approaches showed that the approach used here
178    /// scaled well for larger inputs.
179    /// See <https://github.com/apache/arrow-rs/pull/3622#issuecomment-1407753727> for more details.
180    #[inline]
181    pub fn get_physical_indices<I>(&self, logical_indices: &[I]) -> Result<Vec<usize>, ArrowError>
182    where
183        I: ArrowNativeType,
184    {
185        let len = self.run_ends().len();
186        let offset = self.run_ends().offset();
187
188        let indices_len = logical_indices.len();
189
190        if indices_len == 0 {
191            return Ok(vec![]);
192        }
193
194        // `ordered_indices` store index into `logical_indices` and can be used
195        // to iterate `logical_indices` in sorted order.
196        let mut ordered_indices: Vec<usize> = (0..indices_len).collect();
197
198        // Instead of sorting `logical_indices` directly, sort the `ordered_indices`
199        // whose values are index of `logical_indices`
200        ordered_indices.sort_unstable_by(|lhs, rhs| {
201            logical_indices[*lhs]
202                .partial_cmp(&logical_indices[*rhs])
203                .unwrap()
204        });
205
206        // Return early if all the logical indices cannot be converted to physical indices.
207        let largest_logical_index = logical_indices[*ordered_indices.last().unwrap()].as_usize();
208        if largest_logical_index >= len {
209            return Err(ArrowError::InvalidArgumentError(format!(
210                "Cannot convert all logical indices to physical indices. The logical index cannot be converted is {largest_logical_index}.",
211            )));
212        }
213
214        // Skip some physical indices based on offset.
215        let skip_value = self.get_start_physical_index();
216
217        let mut physical_indices = vec![0; indices_len];
218
219        let mut ordered_index = 0_usize;
220        for (physical_index, run_end) in self.run_ends.values().iter().enumerate().skip(skip_value)
221        {
222            // Get the run end index (relative to offset) of current physical index
223            let run_end_value = run_end.as_usize() - offset;
224
225            // All the `logical_indices` that are less than current run end index
226            // belongs to current physical index.
227            while ordered_index < indices_len
228                && logical_indices[ordered_indices[ordered_index]].as_usize() < run_end_value
229            {
230                physical_indices[ordered_indices[ordered_index]] = physical_index;
231                ordered_index += 1;
232            }
233        }
234
235        // If there are input values >= run_ends.last_value then we'll not be able to convert
236        // all logical indices to physical indices.
237        if ordered_index < logical_indices.len() {
238            let logical_index = logical_indices[ordered_indices[ordered_index]].as_usize();
239            return Err(ArrowError::InvalidArgumentError(format!(
240                "Cannot convert all logical indices to physical indices. The logical index cannot be converted is {logical_index}.",
241            )));
242        }
243        Ok(physical_indices)
244    }
245
246    /// Returns a zero-copy slice of this array with the indicated offset and length.
247    pub fn slice(&self, offset: usize, length: usize) -> Self {
248        Self {
249            data_type: self.data_type.clone(),
250            run_ends: self.run_ends.slice(offset, length),
251            values: self.values.clone(),
252        }
253    }
254}
255
256impl<R: RunEndIndexType> From<ArrayData> for RunArray<R> {
257    // The method assumes the caller already validated the data using `ArrayData::validate_data()`
258    fn from(data: ArrayData) -> Self {
259        match data.data_type() {
260            DataType::RunEndEncoded(_, _) => {}
261            _ => {
262                panic!("Invalid data type for RunArray. The data type should be DataType::RunEndEncoded");
263            }
264        }
265
266        // Safety
267        // ArrayData is valid
268        let child = &data.child_data()[0];
269        assert_eq!(child.data_type(), &R::DATA_TYPE, "Incorrect run ends type");
270        let run_ends = unsafe {
271            let scalar = child.buffers()[0].clone().into();
272            RunEndBuffer::new_unchecked(scalar, data.offset(), data.len())
273        };
274
275        let values = make_array(data.child_data()[1].clone());
276        Self {
277            data_type: data.data_type().clone(),
278            run_ends,
279            values,
280        }
281    }
282}
283
284impl<R: RunEndIndexType> From<RunArray<R>> for ArrayData {
285    fn from(array: RunArray<R>) -> Self {
286        let len = array.run_ends.len();
287        let offset = array.run_ends.offset();
288
289        let run_ends = ArrayDataBuilder::new(R::DATA_TYPE)
290            .len(array.run_ends.values().len())
291            .buffers(vec![array.run_ends.into_inner().into_inner()]);
292
293        let run_ends = unsafe { run_ends.build_unchecked() };
294
295        let builder = ArrayDataBuilder::new(array.data_type)
296            .len(len)
297            .offset(offset)
298            .child_data(vec![run_ends, array.values.to_data()]);
299
300        unsafe { builder.build_unchecked() }
301    }
302}
303
304impl<T: RunEndIndexType> Array for RunArray<T> {
305    fn as_any(&self) -> &dyn Any {
306        self
307    }
308
309    fn to_data(&self) -> ArrayData {
310        self.clone().into()
311    }
312
313    fn into_data(self) -> ArrayData {
314        self.into()
315    }
316
317    fn data_type(&self) -> &DataType {
318        &self.data_type
319    }
320
321    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
322        Arc::new(self.slice(offset, length))
323    }
324
325    fn len(&self) -> usize {
326        self.run_ends.len()
327    }
328
329    fn is_empty(&self) -> bool {
330        self.run_ends.is_empty()
331    }
332
333    fn offset(&self) -> usize {
334        self.run_ends.offset()
335    }
336
337    fn nulls(&self) -> Option<&NullBuffer> {
338        None
339    }
340
341    fn logical_nulls(&self) -> Option<NullBuffer> {
342        let len = self.len();
343        let nulls = self.values.logical_nulls()?;
344        let mut out = BooleanBufferBuilder::new(len);
345        let offset = self.run_ends.offset();
346        let mut valid_start = 0;
347        let mut last_end = 0;
348        for (idx, end) in self.run_ends.values().iter().enumerate() {
349            let end = end.as_usize();
350            if end < offset {
351                continue;
352            }
353            let end = (end - offset).min(len);
354            if nulls.is_null(idx) {
355                if valid_start < last_end {
356                    out.append_n(last_end - valid_start, true);
357                }
358                out.append_n(end - last_end, false);
359                valid_start = end;
360            }
361            last_end = end;
362            if end == len {
363                break;
364            }
365        }
366        if valid_start < len {
367            out.append_n(len - valid_start, true)
368        }
369        // Sanity check
370        assert_eq!(out.len(), len);
371        Some(out.finish().into())
372    }
373
374    fn is_nullable(&self) -> bool {
375        !self.is_empty() && self.values.is_nullable()
376    }
377
378    fn get_buffer_memory_size(&self) -> usize {
379        self.run_ends.inner().inner().capacity() + self.values.get_buffer_memory_size()
380    }
381
382    fn get_array_memory_size(&self) -> usize {
383        std::mem::size_of::<Self>()
384            + self.run_ends.inner().inner().capacity()
385            + self.values.get_array_memory_size()
386    }
387}
388
389impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
390    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
391        writeln!(
392            f,
393            "RunArray {{run_ends: {:?}, values: {:?}}}",
394            self.run_ends.values(),
395            self.values
396        )
397    }
398}
399
400/// Constructs a `RunArray` from an iterator of optional strings.
401///
402/// # Example:
403/// ```
404/// use arrow_array::{RunArray, PrimitiveArray, StringArray, types::Int16Type};
405///
406/// let test = vec!["a", "a", "b", "c", "c"];
407/// let array: RunArray<Int16Type> = test
408///     .iter()
409///     .map(|&x| if x == "b" { None } else { Some(x) })
410///     .collect();
411/// assert_eq!(
412///     "RunArray {run_ends: [2, 3, 5], values: StringArray\n[\n  \"a\",\n  null,\n  \"c\",\n]}\n",
413///     format!("{:?}", array)
414/// );
415/// ```
416impl<'a, T: RunEndIndexType> FromIterator<Option<&'a str>> for RunArray<T> {
417    fn from_iter<I: IntoIterator<Item = Option<&'a str>>>(iter: I) -> Self {
418        let it = iter.into_iter();
419        let (lower, _) = it.size_hint();
420        let mut builder = StringRunBuilder::with_capacity(lower, 256);
421        it.for_each(|i| {
422            builder.append_option(i);
423        });
424
425        builder.finish()
426    }
427}
428
429/// Constructs a `RunArray` from an iterator of strings.
430///
431/// # Example:
432///
433/// ```
434/// use arrow_array::{RunArray, PrimitiveArray, StringArray, types::Int16Type};
435///
436/// let test = vec!["a", "a", "b", "c"];
437/// let array: RunArray<Int16Type> = test.into_iter().collect();
438/// assert_eq!(
439///     "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n  \"a\",\n  \"b\",\n  \"c\",\n]}\n",
440///     format!("{:?}", array)
441/// );
442/// ```
443impl<'a, T: RunEndIndexType> FromIterator<&'a str> for RunArray<T> {
444    fn from_iter<I: IntoIterator<Item = &'a str>>(iter: I) -> Self {
445        let it = iter.into_iter();
446        let (lower, _) = it.size_hint();
447        let mut builder = StringRunBuilder::with_capacity(lower, 256);
448        it.for_each(|i| {
449            builder.append_value(i);
450        });
451
452        builder.finish()
453    }
454}
455
456///
457/// A [`RunArray`] with `i16` run ends
458///
459/// # Example: Using `collect`
460/// ```
461/// # use arrow_array::{Array, Int16RunArray, Int16Array, StringArray};
462/// # use std::sync::Arc;
463///
464/// let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
465/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
466/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
467/// assert_eq!(array.values(), &values);
468/// ```
469pub type Int16RunArray = RunArray<Int16Type>;
470
471///
472/// A [`RunArray`] with `i32` run ends
473///
474/// # Example: Using `collect`
475/// ```
476/// # use arrow_array::{Array, Int32RunArray, Int32Array, StringArray};
477/// # use std::sync::Arc;
478///
479/// let array: Int32RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
480/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
481/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
482/// assert_eq!(array.values(), &values);
483/// ```
484pub type Int32RunArray = RunArray<Int32Type>;
485
486///
487/// A [`RunArray`] with `i64` run ends
488///
489/// # Example: Using `collect`
490/// ```
491/// # use arrow_array::{Array, Int64RunArray, Int64Array, StringArray};
492/// # use std::sync::Arc;
493///
494/// let array: Int64RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
495/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
496/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
497/// assert_eq!(array.values(), &values);
498/// ```
499pub type Int64RunArray = RunArray<Int64Type>;
500
501/// A [`RunArray`] typed typed on its child values array
502///
503/// Implements [`ArrayAccessor`] and [`IntoIterator`] allowing fast access to its elements
504///
505/// ```
506/// use arrow_array::{RunArray, StringArray, types::Int32Type};
507///
508/// let orig = ["a", "b", "a", "b"];
509/// let ree_array = RunArray::<Int32Type>::from_iter(orig);
510///
511/// // `TypedRunArray` allows you to access the values directly
512/// let typed = ree_array.downcast::<StringArray>().unwrap();
513///
514/// for (maybe_val, orig) in typed.into_iter().zip(orig) {
515///     assert_eq!(maybe_val.unwrap(), orig)
516/// }
517/// ```
518pub struct TypedRunArray<'a, R: RunEndIndexType, V> {
519    /// The run array
520    run_array: &'a RunArray<R>,
521
522    /// The values of the run_array
523    values: &'a V,
524}
525
526// Manually implement `Clone` to avoid `V: Clone` type constraint
527impl<R: RunEndIndexType, V> Clone for TypedRunArray<'_, R, V> {
528    fn clone(&self) -> Self {
529        *self
530    }
531}
532
533impl<R: RunEndIndexType, V> Copy for TypedRunArray<'_, R, V> {}
534
535impl<R: RunEndIndexType, V> std::fmt::Debug for TypedRunArray<'_, R, V> {
536    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
537        writeln!(f, "TypedRunArray({:?})", self.run_array)
538    }
539}
540
541impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> {
542    /// Returns the run_ends of this [`TypedRunArray`]
543    pub fn run_ends(&self) -> &'a RunEndBuffer<R::Native> {
544        self.run_array.run_ends()
545    }
546
547    /// Returns the values of this [`TypedRunArray`]
548    pub fn values(&self) -> &'a V {
549        self.values
550    }
551
552    /// Returns the run array of this [`TypedRunArray`]
553    pub fn run_array(&self) -> &'a RunArray<R> {
554        self.run_array
555    }
556}
557
558impl<R: RunEndIndexType, V: Sync> Array for TypedRunArray<'_, R, V> {
559    fn as_any(&self) -> &dyn Any {
560        self.run_array
561    }
562
563    fn to_data(&self) -> ArrayData {
564        self.run_array.to_data()
565    }
566
567    fn into_data(self) -> ArrayData {
568        self.run_array.into_data()
569    }
570
571    fn data_type(&self) -> &DataType {
572        self.run_array.data_type()
573    }
574
575    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
576        Arc::new(self.run_array.slice(offset, length))
577    }
578
579    fn len(&self) -> usize {
580        self.run_array.len()
581    }
582
583    fn is_empty(&self) -> bool {
584        self.run_array.is_empty()
585    }
586
587    fn offset(&self) -> usize {
588        self.run_array.offset()
589    }
590
591    fn nulls(&self) -> Option<&NullBuffer> {
592        self.run_array.nulls()
593    }
594
595    fn logical_nulls(&self) -> Option<NullBuffer> {
596        self.run_array.logical_nulls()
597    }
598
599    fn logical_null_count(&self) -> usize {
600        self.run_array.logical_null_count()
601    }
602
603    fn is_nullable(&self) -> bool {
604        self.run_array.is_nullable()
605    }
606
607    fn get_buffer_memory_size(&self) -> usize {
608        self.run_array.get_buffer_memory_size()
609    }
610
611    fn get_array_memory_size(&self) -> usize {
612        self.run_array.get_array_memory_size()
613    }
614}
615
616// Array accessor converts the index of logical array to the index of the physical array
617// using binary search. The time complexity is O(log N) where N is number of runs.
618impl<'a, R, V> ArrayAccessor for TypedRunArray<'a, R, V>
619where
620    R: RunEndIndexType,
621    V: Sync + Send,
622    &'a V: ArrayAccessor,
623    <&'a V as ArrayAccessor>::Item: Default,
624{
625    type Item = <&'a V as ArrayAccessor>::Item;
626
627    fn value(&self, logical_index: usize) -> Self::Item {
628        assert!(
629            logical_index < self.len(),
630            "Trying to access an element at index {} from a TypedRunArray of length {}",
631            logical_index,
632            self.len()
633        );
634        unsafe { self.value_unchecked(logical_index) }
635    }
636
637    unsafe fn value_unchecked(&self, logical_index: usize) -> Self::Item {
638        let physical_index = self.run_array.get_physical_index(logical_index);
639        self.values().value_unchecked(physical_index)
640    }
641}
642
643impl<'a, R, V> IntoIterator for TypedRunArray<'a, R, V>
644where
645    R: RunEndIndexType,
646    V: Sync + Send,
647    &'a V: ArrayAccessor,
648    <&'a V as ArrayAccessor>::Item: Default,
649{
650    type Item = Option<<&'a V as ArrayAccessor>::Item>;
651    type IntoIter = RunArrayIter<'a, R, V>;
652
653    fn into_iter(self) -> Self::IntoIter {
654        RunArrayIter::new(self)
655    }
656}
657
658#[cfg(test)]
659mod tests {
660    use rand::seq::SliceRandom;
661    use rand::thread_rng;
662    use rand::Rng;
663
664    use super::*;
665    use crate::builder::PrimitiveRunBuilder;
666    use crate::cast::AsArray;
667    use crate::types::{Int8Type, UInt32Type};
668    use crate::{Int32Array, StringArray};
669
670    fn build_input_array(size: usize) -> Vec<Option<i32>> {
671        // The input array is created by shuffling and repeating
672        // the seed values random number of times.
673        let mut seed: Vec<Option<i32>> = vec![
674            None,
675            None,
676            None,
677            Some(1),
678            Some(2),
679            Some(3),
680            Some(4),
681            Some(5),
682            Some(6),
683            Some(7),
684            Some(8),
685            Some(9),
686        ];
687        let mut result: Vec<Option<i32>> = Vec::with_capacity(size);
688        let mut ix = 0;
689        let mut rng = thread_rng();
690        // run length can go up to 8. Cap the max run length for smaller arrays to size / 2.
691        let max_run_length = 8_usize.min(1_usize.max(size / 2));
692        while result.len() < size {
693            // shuffle the seed array if all the values are iterated.
694            if ix == 0 {
695                seed.shuffle(&mut rng);
696            }
697            // repeat the items between 1 and 8 times. Cap the length for smaller sized arrays
698            let num = max_run_length.min(rand::thread_rng().gen_range(1..=max_run_length));
699            for _ in 0..num {
700                result.push(seed[ix]);
701            }
702            ix += 1;
703            if ix == seed.len() {
704                ix = 0
705            }
706        }
707        result.resize(size, None);
708        result
709    }
710
711    // Asserts that `logical_array[logical_indices[*]] == physical_array[physical_indices[*]]`
712    fn compare_logical_and_physical_indices(
713        logical_indices: &[u32],
714        logical_array: &[Option<i32>],
715        physical_indices: &[usize],
716        physical_array: &PrimitiveArray<Int32Type>,
717    ) {
718        assert_eq!(logical_indices.len(), physical_indices.len());
719
720        // check value in logical index in the logical_array matches physical index in physical_array
721        logical_indices
722            .iter()
723            .map(|f| f.as_usize())
724            .zip(physical_indices.iter())
725            .for_each(|(logical_ix, physical_ix)| {
726                let expected = logical_array[logical_ix];
727                match expected {
728                    Some(val) => {
729                        assert!(physical_array.is_valid(*physical_ix));
730                        let actual = physical_array.value(*physical_ix);
731                        assert_eq!(val, actual);
732                    }
733                    None => {
734                        assert!(physical_array.is_null(*physical_ix))
735                    }
736                };
737            });
738    }
739    #[test]
740    fn test_run_array() {
741        // Construct a value array
742        let value_data =
743            PrimitiveArray::<Int8Type>::from_iter_values([10_i8, 11, 12, 13, 14, 15, 16, 17]);
744
745        // Construct a run_ends array:
746        let run_ends_values = [4_i16, 6, 7, 9, 13, 18, 20, 22];
747        let run_ends_data =
748            PrimitiveArray::<Int16Type>::from_iter_values(run_ends_values.iter().copied());
749
750        // Construct a run ends encoded array from the above two
751        let ree_array = RunArray::<Int16Type>::try_new(&run_ends_data, &value_data).unwrap();
752
753        assert_eq!(ree_array.len(), 22);
754        assert_eq!(ree_array.null_count(), 0);
755
756        let values = ree_array.values();
757        assert_eq!(value_data.into_data(), values.to_data());
758        assert_eq!(&DataType::Int8, values.data_type());
759
760        let run_ends = ree_array.run_ends();
761        assert_eq!(run_ends.values(), &run_ends_values);
762    }
763
764    #[test]
765    fn test_run_array_fmt_debug() {
766        let mut builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::with_capacity(3);
767        builder.append_value(12345678);
768        builder.append_null();
769        builder.append_value(22345678);
770        let array = builder.finish();
771        assert_eq!(
772            "RunArray {run_ends: [1, 2, 3], values: PrimitiveArray<UInt32>\n[\n  12345678,\n  null,\n  22345678,\n]}\n",
773            format!("{array:?}")
774        );
775
776        let mut builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::with_capacity(20);
777        for _ in 0..20 {
778            builder.append_value(1);
779        }
780        let array = builder.finish();
781
782        assert_eq!(array.len(), 20);
783        assert_eq!(array.null_count(), 0);
784        assert_eq!(array.logical_null_count(), 0);
785
786        assert_eq!(
787            "RunArray {run_ends: [20], values: PrimitiveArray<UInt32>\n[\n  1,\n]}\n",
788            format!("{array:?}")
789        );
790    }
791
792    #[test]
793    fn test_run_array_from_iter() {
794        let test = vec!["a", "a", "b", "c"];
795        let array: RunArray<Int16Type> = test
796            .iter()
797            .map(|&x| if x == "b" { None } else { Some(x) })
798            .collect();
799        assert_eq!(
800            "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n  \"a\",\n  null,\n  \"c\",\n]}\n",
801            format!("{array:?}")
802        );
803
804        assert_eq!(array.len(), 4);
805        assert_eq!(array.null_count(), 0);
806        assert_eq!(array.logical_null_count(), 1);
807
808        let array: RunArray<Int16Type> = test.into_iter().collect();
809        assert_eq!(
810            "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n  \"a\",\n  \"b\",\n  \"c\",\n]}\n",
811            format!("{array:?}")
812        );
813    }
814
815    #[test]
816    fn test_run_array_run_ends_as_primitive_array() {
817        let test = vec!["a", "b", "c", "a"];
818        let array: RunArray<Int16Type> = test.into_iter().collect();
819
820        assert_eq!(array.len(), 4);
821        assert_eq!(array.null_count(), 0);
822        assert_eq!(array.logical_null_count(), 0);
823
824        let run_ends = array.run_ends();
825        assert_eq!(&[1, 2, 3, 4], run_ends.values());
826    }
827
828    #[test]
829    fn test_run_array_as_primitive_array_with_null() {
830        let test = vec![Some("a"), None, Some("b"), None, None, Some("a")];
831        let array: RunArray<Int32Type> = test.into_iter().collect();
832
833        assert_eq!(array.len(), 6);
834        assert_eq!(array.null_count(), 0);
835        assert_eq!(array.logical_null_count(), 3);
836
837        let run_ends = array.run_ends();
838        assert_eq!(&[1, 2, 3, 5, 6], run_ends.values());
839
840        let values_data = array.values();
841        assert_eq!(2, values_data.null_count());
842        assert_eq!(5, values_data.len());
843    }
844
845    #[test]
846    fn test_run_array_all_nulls() {
847        let test = vec![None, None, None];
848        let array: RunArray<Int32Type> = test.into_iter().collect();
849
850        assert_eq!(array.len(), 3);
851        assert_eq!(array.null_count(), 0);
852        assert_eq!(array.logical_null_count(), 3);
853
854        let run_ends = array.run_ends();
855        assert_eq!(3, run_ends.len());
856        assert_eq!(&[3], run_ends.values());
857
858        let values_data = array.values();
859        assert_eq!(1, values_data.null_count());
860    }
861
862    #[test]
863    fn test_run_array_try_new() {
864        let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")]
865            .into_iter()
866            .collect();
867        let run_ends: Int32Array = [Some(1), Some(2), Some(3), Some(4)].into_iter().collect();
868
869        let array = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
870        assert_eq!(array.values().data_type(), &DataType::Utf8);
871
872        assert_eq!(array.null_count(), 0);
873        assert_eq!(array.logical_null_count(), 1);
874        assert_eq!(array.len(), 4);
875        assert_eq!(array.values().null_count(), 1);
876
877        assert_eq!(
878            "RunArray {run_ends: [1, 2, 3, 4], values: StringArray\n[\n  \"foo\",\n  \"bar\",\n  null,\n  \"baz\",\n]}\n",
879            format!("{array:?}")
880        );
881    }
882
883    #[test]
884    fn test_run_array_int16_type_definition() {
885        let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
886        let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
887        assert_eq!(array.run_ends().values(), &[2, 3, 5]);
888        assert_eq!(array.values(), &values);
889    }
890
891    #[test]
892    fn test_run_array_empty_string() {
893        let array: Int16RunArray = vec!["a", "a", "", "", "c"].into_iter().collect();
894        let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "", "c"]));
895        assert_eq!(array.run_ends().values(), &[2, 4, 5]);
896        assert_eq!(array.values(), &values);
897    }
898
899    #[test]
900    fn test_run_array_length_mismatch() {
901        let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")]
902            .into_iter()
903            .collect();
904        let run_ends: Int32Array = [Some(1), Some(2), Some(3)].into_iter().collect();
905
906        let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
907        let expected = ArrowError::InvalidArgumentError("The run_ends array length should be the same as values array length. Run_ends array length is 3, values array length is 4".to_string());
908        assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
909    }
910
911    #[test]
912    fn test_run_array_run_ends_with_null() {
913        let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
914            .into_iter()
915            .collect();
916        let run_ends: Int32Array = [Some(1), None, Some(3)].into_iter().collect();
917
918        let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
919        let expected = ArrowError::InvalidArgumentError(
920            "Found null values in run_ends array. The run_ends array should not have null values."
921                .to_string(),
922        );
923        assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
924    }
925
926    #[test]
927    fn test_run_array_run_ends_with_zeroes() {
928        let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
929            .into_iter()
930            .collect();
931        let run_ends: Int32Array = [Some(0), Some(1), Some(3)].into_iter().collect();
932
933        let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
934        let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly positive. Found value 0 at index 0 that does not match the criteria.".to_string());
935        assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
936    }
937
938    #[test]
939    fn test_run_array_run_ends_non_increasing() {
940        let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
941            .into_iter()
942            .collect();
943        let run_ends: Int32Array = [Some(1), Some(4), Some(4)].into_iter().collect();
944
945        let actual = RunArray::<Int32Type>::try_new(&run_ends, &values);
946        let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly increasing. Found value 4 at index 2 with previous value 4 that does not match the criteria.".to_string());
947        assert_eq!(expected.to_string(), actual.err().unwrap().to_string());
948    }
949
950    #[test]
951    #[should_panic(expected = "Incorrect run ends type")]
952    fn test_run_array_run_ends_data_type_mismatch() {
953        let a = RunArray::<Int32Type>::from_iter(["32"]);
954        let _ = RunArray::<Int64Type>::from(a.into_data());
955    }
956
957    #[test]
958    fn test_ree_array_accessor() {
959        let input_array = build_input_array(256);
960
961        // Encode the input_array to ree_array
962        let mut builder =
963            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
964        builder.extend(input_array.iter().copied());
965        let run_array = builder.finish();
966        let typed = run_array.downcast::<PrimitiveArray<Int32Type>>().unwrap();
967
968        // Access every index and check if the value in the input array matches returned value.
969        for (i, inp_val) in input_array.iter().enumerate() {
970            if let Some(val) = inp_val {
971                let actual = typed.value(i);
972                assert_eq!(*val, actual)
973            } else {
974                let physical_ix = run_array.get_physical_index(i);
975                assert!(typed.values().is_null(physical_ix));
976            };
977        }
978    }
979
980    #[test]
981    #[cfg_attr(miri, ignore)] // Takes too long
982    fn test_get_physical_indices() {
983        // Test for logical lengths starting from 10 to 250 increasing by 10
984        for logical_len in (0..250).step_by(10) {
985            let input_array = build_input_array(logical_len);
986
987            // create run array using input_array
988            let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
989            builder.extend(input_array.clone().into_iter());
990
991            let run_array = builder.finish();
992            let physical_values_array = run_array.values().as_primitive::<Int32Type>();
993
994            // create an array consisting of all the indices repeated twice and shuffled.
995            let mut logical_indices: Vec<u32> = (0_u32..(logical_len as u32)).collect();
996            // add same indices once more
997            logical_indices.append(&mut logical_indices.clone());
998            let mut rng = thread_rng();
999            logical_indices.shuffle(&mut rng);
1000
1001            let physical_indices = run_array.get_physical_indices(&logical_indices).unwrap();
1002
1003            assert_eq!(logical_indices.len(), physical_indices.len());
1004
1005            // check value in logical index in the input_array matches physical index in typed_run_array
1006            compare_logical_and_physical_indices(
1007                &logical_indices,
1008                &input_array,
1009                &physical_indices,
1010                physical_values_array,
1011            );
1012        }
1013    }
1014
1015    #[test]
1016    #[cfg_attr(miri, ignore)] // Takes too long
1017    fn test_get_physical_indices_sliced() {
1018        let total_len = 80;
1019        let input_array = build_input_array(total_len);
1020
1021        // Encode the input_array to run array
1022        let mut builder =
1023            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
1024        builder.extend(input_array.iter().copied());
1025        let run_array = builder.finish();
1026        let physical_values_array = run_array.values().as_primitive::<Int32Type>();
1027
1028        // test for all slice lengths.
1029        for slice_len in 1..=total_len {
1030            // create an array consisting of all the indices repeated twice and shuffled.
1031            let mut logical_indices: Vec<u32> = (0_u32..(slice_len as u32)).collect();
1032            // add same indices once more
1033            logical_indices.append(&mut logical_indices.clone());
1034            let mut rng = thread_rng();
1035            logical_indices.shuffle(&mut rng);
1036
1037            // test for offset = 0 and slice length = slice_len
1038            // slice the input array using which the run array was built.
1039            let sliced_input_array = &input_array[0..slice_len];
1040
1041            // slice the run array
1042            let sliced_run_array: RunArray<Int16Type> =
1043                run_array.slice(0, slice_len).into_data().into();
1044
1045            // Get physical indices.
1046            let physical_indices = sliced_run_array
1047                .get_physical_indices(&logical_indices)
1048                .unwrap();
1049
1050            compare_logical_and_physical_indices(
1051                &logical_indices,
1052                sliced_input_array,
1053                &physical_indices,
1054                physical_values_array,
1055            );
1056
1057            // test for offset = total_len - slice_len and slice length = slice_len
1058            // slice the input array using which the run array was built.
1059            let sliced_input_array = &input_array[total_len - slice_len..total_len];
1060
1061            // slice the run array
1062            let sliced_run_array: RunArray<Int16Type> = run_array
1063                .slice(total_len - slice_len, slice_len)
1064                .into_data()
1065                .into();
1066
1067            // Get physical indices
1068            let physical_indices = sliced_run_array
1069                .get_physical_indices(&logical_indices)
1070                .unwrap();
1071
1072            compare_logical_and_physical_indices(
1073                &logical_indices,
1074                sliced_input_array,
1075                &physical_indices,
1076                physical_values_array,
1077            );
1078        }
1079    }
1080
1081    #[test]
1082    fn test_logical_nulls() {
1083        let run = Int32Array::from(vec![3, 6, 9, 12]);
1084        let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
1085        let array = RunArray::try_new(&run, &values).unwrap();
1086
1087        let expected = [
1088            true, true, true, false, false, false, true, true, true, false, false, false,
1089        ];
1090
1091        let n = array.logical_nulls().unwrap();
1092        assert_eq!(n.null_count(), 6);
1093
1094        let slices = [(0, 12), (0, 2), (2, 5), (3, 0), (3, 3), (3, 4), (4, 8)];
1095        for (offset, length) in slices {
1096            let a = array.slice(offset, length);
1097            let n = a.logical_nulls().unwrap();
1098            let n = n.into_iter().collect::<Vec<_>>();
1099            assert_eq!(&n, &expected[offset..offset + length], "{offset} {length}");
1100        }
1101    }
1102}