// arrow_buffer/buffer/run.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::buffer::ScalarBuffer;
19use crate::ArrowNativeType;
20
21/// A slice-able buffer of monotonically increasing, positive integers used to store run-ends
22///
23/// # Logical vs Physical
24///
25/// A [`RunEndBuffer`] is used to encode runs of the same value, the index of each run is
26/// called the physical index. The logical index is then the corresponding index in the logical
27/// run-encoded array, i.e. a single run of length `3`, would have the logical indices `0..3`.
28///
29/// Each value in [`RunEndBuffer::values`] is the cumulative length of all runs in the
30/// logical array, up to that physical index.
31///
32/// Consider a [`RunEndBuffer`] containing `[3, 4, 6]`. The maximum physical index is `2`,
33/// as there are `3` values, and the maximum logical index is `5`, as the maximum run end
34/// is `6`. The physical indices are therefore `[0, 0, 0, 1, 2, 2]`
35///
36/// ```text
37///     ┌─────────┐        ┌─────────┐           ┌─────────┐
38///     │    3    │        │    0    │ ─┬──────▶ │    0    │
39///     ├─────────┤        ├─────────┤  │        ├─────────┤
40///     │    4    │        │    1    │ ─┤ ┌────▶ │    1    │
41///     ├─────────┤        ├─────────┤  │ │      ├─────────┤
42///     │    6    │        │    2    │ ─┘ │ ┌──▶ │    2    │
43///     └─────────┘        ├─────────┤    │ │    └─────────┘
44///      run ends          │    3    │ ───┘ │  physical indices
45///                        ├─────────┤      │
46///                        │    4    │ ─────┤
47///                        ├─────────┤      │
48///                        │    5    │ ─────┘
49///                        └─────────┘
50///                      logical indices
51/// ```
52///
53/// # Slicing
54///
55/// In order to provide zero-copy slicing, this container stores a separate offset and length
56///
57/// For example, a [`RunEndBuffer`] containing values `[3, 6, 8]` with offset and length `4` would
58/// describe the physical indices `1, 1, 2, 2`
59///
60/// For example, a [`RunEndBuffer`] containing values `[6, 8, 9]` with offset `2` and length `5`
61/// would describe the physical indices `0, 0, 0, 0, 1`
62///
63/// [Run-End encoded layout]: https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout
64#[derive(Debug, Clone)]
65pub struct RunEndBuffer<E: ArrowNativeType> {
66    run_ends: ScalarBuffer<E>,
67    len: usize,
68    offset: usize,
69}
70
71impl<E> RunEndBuffer<E>
72where
73    E: ArrowNativeType,
74{
75    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], an `offset` and `len`
76    ///
77    /// # Panics
78    ///
79    /// - `buffer` does not contain strictly increasing values greater than zero
80    /// - the last value of `buffer` is less than `offset + len`
81    pub fn new(run_ends: ScalarBuffer<E>, offset: usize, len: usize) -> Self {
82        assert!(
83            run_ends.windows(2).all(|w| w[0] < w[1]),
84            "run-ends not strictly increasing"
85        );
86
87        if len != 0 {
88            assert!(!run_ends.is_empty(), "non-empty slice but empty run-ends");
89            let end = E::from_usize(offset.saturating_add(len)).unwrap();
90            assert!(
91                *run_ends.first().unwrap() > E::usize_as(0),
92                "run-ends not greater than 0"
93            );
94            assert!(
95                *run_ends.last().unwrap() >= end,
96                "slice beyond bounds of run-ends"
97            );
98        }
99
100        Self {
101            run_ends,
102            offset,
103            len,
104        }
105    }
106
107    /// Create a new [`RunEndBuffer`] from an [`ScalarBuffer`], an `offset` and `len`
108    ///
109    /// # Safety
110    ///
111    /// - `buffer` must contain strictly increasing values greater than zero
112    /// - The last value of `buffer` must be greater than or equal to `offset + len`
113    pub unsafe fn new_unchecked(run_ends: ScalarBuffer<E>, offset: usize, len: usize) -> Self {
114        Self {
115            run_ends,
116            offset,
117            len,
118        }
119    }
120
121    /// Returns the logical offset into the run-ends stored by this buffer
122    #[inline]
123    pub fn offset(&self) -> usize {
124        self.offset
125    }
126
127    /// Returns the logical length of the run-ends stored by this buffer
128    #[inline]
129    pub fn len(&self) -> usize {
130        self.len
131    }
132
133    /// Returns true if this buffer is empty
134    #[inline]
135    pub fn is_empty(&self) -> bool {
136        self.len == 0
137    }
138
139    /// Returns the values of this [`RunEndBuffer`] not including any offset
140    #[inline]
141    pub fn values(&self) -> &[E] {
142        &self.run_ends
143    }
144
145    /// Returns the maximum run-end encoded in the underlying buffer
146    #[inline]
147    pub fn max_value(&self) -> usize {
148        self.values().last().copied().unwrap_or_default().as_usize()
149    }
150
151    /// Performs a binary search to find the physical index for the given logical index
152    ///
153    /// The result is arbitrary if `logical_index >= self.len()`
154    pub fn get_physical_index(&self, logical_index: usize) -> usize {
155        let logical_index = E::usize_as(self.offset + logical_index);
156        let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap();
157
158        match self.run_ends.binary_search_by(cmp) {
159            Ok(idx) => idx + 1,
160            Err(idx) => idx,
161        }
162    }
163
164    /// Returns the physical index at which the logical array starts
165    pub fn get_start_physical_index(&self) -> usize {
166        if self.offset == 0 || self.len == 0 {
167            return 0;
168        }
169        // Fallback to binary search
170        self.get_physical_index(0)
171    }
172
173    /// Returns the physical index at which the logical array ends
174    pub fn get_end_physical_index(&self) -> usize {
175        if self.len == 0 {
176            return 0;
177        }
178        if self.max_value() == self.offset + self.len {
179            return self.values().len() - 1;
180        }
181        // Fallback to binary search
182        self.get_physical_index(self.len - 1)
183    }
184
185    /// Slices this [`RunEndBuffer`] by the provided `offset` and `length`
186    pub fn slice(&self, offset: usize, len: usize) -> Self {
187        assert!(
188            offset.saturating_add(len) <= self.len,
189            "the length + offset of the sliced RunEndBuffer cannot exceed the existing length"
190        );
191        Self {
192            run_ends: self.run_ends.clone(),
193            offset: self.offset + offset,
194            len,
195        }
196    }
197
198    /// Returns the inner [`ScalarBuffer`]
199    pub fn inner(&self) -> &ScalarBuffer<E> {
200        &self.run_ends
201    }
202
203    /// Returns the inner [`ScalarBuffer`], consuming self
204    pub fn into_inner(self) -> ScalarBuffer<E> {
205        self.run_ends
206    }
207}
208
#[cfg(test)]
mod tests {
    use crate::buffer::RunEndBuffer;

    /// Zero-length views report physical index `0` for both their start and end,
    /// whether they come from slicing a populated buffer or from empty run-ends
    #[test]
    fn test_zero_length_slice() {
        // Sanity-check the populated buffer before slicing it
        let populated = RunEndBuffer::new(vec![1_i32, 4_i32].into(), 0, 4);
        assert_eq!(populated.get_start_physical_index(), 0);
        assert_eq!(populated.get_end_physical_index(), 1);
        assert_eq!(populated.get_physical_index(3), 1);

        // An empty slice taken at every logical offset
        for empty in (0..4).map(|start| populated.slice(start, 0)) {
            assert_eq!(empty.get_start_physical_index(), 0);
            assert_eq!(empty.get_end_physical_index(), 0);
        }

        // A buffer constructed with no run-ends at all
        let no_runs = RunEndBuffer::new(Vec::<i32>::new().into(), 0, 0);
        assert_eq!(no_runs.get_start_physical_index(), 0);
        assert_eq!(no_runs.get_end_physical_index(), 0);
    }
}