arrow_select/coalesce.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`BatchCoalescer`] concatenates multiple [`RecordBatch`]es after
//! operations such as [`filter`] and [`take`].
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::filter::filter_record_batch;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
use arrow_schema::{ArrowError, DataType, SchemaRef};
use std::collections::VecDeque;
use std::sync::Arc;
// Originally from DataFusion's coalesce module:
// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25

mod byte_view;
mod generic;

use byte_view::InProgressByteViewArray;
use generic::GenericInProgressArray;

/// Concatenate multiple [`RecordBatch`]es
///
/// Implements the common pattern of incrementally creating output
/// [`RecordBatch`]es of a specific size from an input stream of
/// [`RecordBatch`]es.
///
/// This is useful after operations such as [`filter`] and [`take`] that produce
/// smaller batches, and we want to coalesce them into larger batches for
/// further processing.
///
/// [`filter`]: crate::filter::filter
/// [`take`]: crate::take::take
///
/// See: <https://github.com/apache/arrow-rs/issues/6692>
///
/// # Example
/// ```
/// use arrow_array::record_batch;
/// use arrow_select::coalesce::BatchCoalescer;
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
///
/// // Create a `BatchCoalescer` that will produce batches of 4 rows
/// let target_batch_size = 4;
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), target_batch_size);
///
/// // push the batches
/// coalescer.push_batch(batch1).unwrap();
/// // only 3 rows pushed so far: not yet enough to produce a batch
/// assert!(coalescer.next_completed_batch().is_none());
/// coalescer.push_batch(batch2).unwrap();
/// // now we have 5 rows, so we can produce a batch
/// let finished = coalescer.next_completed_batch().unwrap();
/// // 4 rows came out (target batch size is 4)
/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
/// assert_eq!(finished, expected);
///
/// // There is no more input, but an in-progress batch is still buffered
/// assert!(coalescer.next_completed_batch().is_none());
/// // We can finish the batch, which will produce the remaining rows
/// coalescer.finish_buffered_batch().unwrap();
/// let expected = record_batch!(("a", Int32, [5])).unwrap();
/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
///
/// // The coalescer is now empty
/// assert!(coalescer.next_completed_batch().is_none());
/// ```
///
/// # Background
///
/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
/// there is fixed processing overhead per batch. This coalescer builds up these
/// larger batches incrementally.
///
/// ```text
/// ┌────────────────────┐
/// │    RecordBatch     │
/// │   num_rows = 100   │
/// └────────────────────┘                 ┌────────────────────┐
///                                        │                    │
/// ┌────────────────────┐     Coalesce    │                    │
/// │                    │      Batches    │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
/// │                    │                 │    RecordBatch     │
/// │                    │                 │   num_rows = 400   │
/// └────────────────────┘                 │                    │
///                                        │                    │
/// ┌────────────────────┐                 │                    │
/// │                    │                 │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 100   │                 └────────────────────┘
/// │                    │
/// └────────────────────┘
/// ```
///
/// # Notes:
///
/// 1. Output rows are produced in the same order as the input rows
///
/// 2. The output is a sequence of batches, with all but the last containing
///    exactly `target_batch_size` rows.
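///
/// # Streaming usage (sketch)
///
/// When consuming a stream of input, the typical pattern is to drain completed
/// batches as they become available and flush the buffered remainder once the
/// input is exhausted. In pseudocode (illustrative, using the methods defined
/// below):
///
/// ```text
/// for each input batch:
///     coalescer.push_batch(batch)
///     while let Some(completed) = coalescer.next_completed_batch():
///         process(completed)
/// // input exhausted: emit whatever is still buffered
/// coalescer.finish_buffered_batch()
/// while let Some(completed) = coalescer.next_completed_batch():
///     process(completed)
/// ```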
#[derive(Debug)]
pub struct BatchCoalescer {
    /// The input schema
    schema: SchemaRef,
    /// output batch size
    batch_size: usize,
    /// In-progress arrays
    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
    /// Buffered row count. Always less than `batch_size`
    buffered_rows: usize,
    /// Completed batches
    completed: VecDeque<RecordBatch>,
}

impl BatchCoalescer {
    /// Create a new `BatchCoalescer`
    ///
    /// # Arguments
    /// - `schema` - the schema of the output batches
    /// - `batch_size` - the number of rows in each output batch.
    ///   Typical values are `4096` or `8192` rows.
    ///
    pub fn new(schema: SchemaRef, batch_size: usize) -> Self {
        let in_progress_arrays = schema
            .fields()
            .iter()
            .map(|field| create_in_progress_array(field.data_type(), batch_size))
            .collect::<Vec<_>>();

        Self {
            schema,
            batch_size,
            in_progress_arrays,
            // We will for sure store at least one completed batch
            completed: VecDeque::with_capacity(1),
            buffered_rows: 0,
        }
    }

    /// Return the schema of the output batches
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Push a batch into the Coalescer after applying a filter
    ///
    /// This is semantically equivalent to calling [`Self::push_batch`]
    /// with the results of [`filter_record_batch`]
    ///
    /// # Example
    /// ```
    /// # use arrow_array::{record_batch, BooleanArray};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // Apply a filter to each batch to pick the first and last row
    /// let filter = BooleanArray::from(vec![true, false, true]);
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch_with_filter(batch1, &filter).unwrap();
    /// coalescer.push_batch_with_filter(batch2, &filter).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// // filtered out 2 and 5:
    /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch_with_filter(
        &mut self,
        batch: RecordBatch,
        filter: &BooleanArray,
    ) -> Result<(), ArrowError> {
        // TODO: optimize this to avoid materializing (copying the results
        // of the filter to a new batch)
        let filtered_batch = filter_record_batch(&batch, filter)?;
        self.push_batch(filtered_batch)
    }

    /// Push all the rows from `batch` into the Coalescer
    ///
    /// See [`Self::next_completed_batch()`] to retrieve any completed batches.
    ///
    /// # Example
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch(batch1).unwrap();
    /// coalescer.push_batch(batch2).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
        let (_schema, arrays, mut num_rows) = batch.into_parts();
        if num_rows == 0 {
            return Ok(());
        }

        // setup input rows
        assert_eq!(arrays.len(), self.in_progress_arrays.len());
        self.in_progress_arrays
            .iter_mut()
            .zip(arrays)
            .for_each(|(in_progress, array)| {
                in_progress.set_source(Some(array));
            });

        // If pushing this batch would exceed the target batch size,
        // finish the current batch and start a new one
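        //
        // For example (illustrative numbers, not from the original comments):
        // with `batch_size = 4`, `buffered_rows = 3` and an incoming batch of
        // 5 rows, the loop below copies 1 row to complete the first output
        // batch; the remaining 4 rows are copied after the loop and exactly
        // fill (and finish) a second output batch, leaving `buffered_rows = 0`.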
        let mut offset = 0;
        while num_rows > (self.batch_size - self.buffered_rows) {
            let remaining_rows = self.batch_size - self.buffered_rows;
            debug_assert!(remaining_rows > 0);

            // Copy remaining_rows from each array
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, remaining_rows)?;
            }

            self.buffered_rows += remaining_rows;
            offset += remaining_rows;
            num_rows -= remaining_rows;

            self.finish_buffered_batch()?;
        }

        // Add any remaining rows to the buffer
        self.buffered_rows += num_rows;
        if num_rows > 0 {
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, num_rows)?;
            }
        }

        // If we have reached the target batch size, finalize the buffered batch
        if self.buffered_rows >= self.batch_size {
            self.finish_buffered_batch()?;
        }

        // clear in progress sources (to allow the memory to be freed)
        for in_progress in self.in_progress_arrays.iter_mut() {
            in_progress.set_source(None);
        }

        Ok(())
    }

    /// Concatenates any buffered rows into a single `RecordBatch` and clears
    /// the in-progress buffers
    ///
    /// Normally this is called when the input stream is exhausted, and
    /// we want to finalize the last batch of rows.
    ///
    /// See [`Self::next_completed_batch()`] for the completed batches.
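    ///
    /// # Example
    ///
    /// A minimal sketch (mirroring the usage shown in the [`BatchCoalescer`]
    /// docs) of flushing rows that have not yet reached the target batch size:
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let mut coalescer = BatchCoalescer::new(batch.schema(), 1024);
    /// coalescer.push_batch(batch).unwrap();
    /// // only 3 of the 1024 target rows are buffered, so nothing is completed yet
    /// assert!(coalescer.next_completed_batch().is_none());
    /// // explicitly finishing the buffered rows produces the final (smaller) batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 3);
    /// ```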
    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
        if self.buffered_rows == 0 {
            return Ok(());
        }
        let new_arrays = self
            .in_progress_arrays
            .iter_mut()
            .map(|array| array.finish())
            .collect::<Result<Vec<_>, ArrowError>>()?;

        for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
            debug_assert_eq!(array.data_type(), field.data_type());
            debug_assert_eq!(array.len(), self.buffered_rows);
        }

        // SAFETY: each array was created with the correct type and length.
        let batch = unsafe {
            RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
        };

        self.buffered_rows = 0;
        self.completed.push_back(batch);
        Ok(())
    }

    /// Returns true if there is no buffered data (neither in-progress rows nor
    /// completed batches)
    pub fn is_empty(&self) -> bool {
        self.buffered_rows == 0 && self.completed.is_empty()
    }

    /// Returns true if there are any completed batches
    pub fn has_completed_batch(&self) -> bool {
        !self.completed.is_empty()
    }

    /// Returns the next completed batch, if any
    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
        self.completed.pop_front()
    }
}

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
    match data_type {
        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
        DataType::BinaryView => {
            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
        }
        _ => Box::new(GenericInProgressArray::new()),
    }
}

/// Incrementally builds up arrays
///
/// [`GenericInProgressArray`] is the default implementation, which buffers
/// arrays and concatenates them when finished.
///
/// Some array types have specialized implementations (e.g.,
/// [`StringViewArray`]).
///
/// [`StringViewArray`]: arrow_array::StringViewArray
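///
/// The expected call sequence, as driven by [`BatchCoalescer::push_batch`] and
/// [`BatchCoalescer::finish_buffered_batch`], is roughly (sketch):
///
/// ```text
/// set_source(Some(array))  // make `array` the current source
/// copy_rows(offset, len)   // copy a slice of the source (possibly repeatedly)
/// finish()                 // produce the accumulated rows as an ArrayRef
/// set_source(None)         // release the source so its memory can be freed
/// ```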
trait InProgressArray: std::fmt::Debug + Send + Sync {
    /// Set the source array.
    ///
    /// Calls to [`Self::copy_rows`] will copy rows from this array into the
    /// current in-progress array
    fn set_source(&mut self, source: Option<ArrayRef>);

    /// Copy rows from the current source array into the in-progress array
    ///
    /// The source array is set by [`Self::set_source`].
    ///
    /// Return an error if the source array is not set
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

    /// Finish the currently in-progress array and return it as an `ArrayRef`
    fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::concat::concat_batches;
    use arrow_array::builder::StringViewBuilder;
    use arrow_array::cast::AsArray;
    use arrow_array::{BinaryViewArray, RecordBatchOptions, StringViewArray, UInt32Array};
    use arrow_schema::{DataType, Field, Schema};
    use std::ops::Range;

    #[test]
    fn test_coalesce() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 10))
            // expected output is exactly 21 rows (except for the final batch)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![21, 21, 21, 17])
            .run();
    }

    #[test]
    fn test_coalesce_one_by_one() {
        let batch = uint32_batch(0..1); // single row input
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 97))
            // expected output is exactly 20 rows (except for the final batch)
            .with_batch_size(20)
            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
            .run();
    }

    #[test]
    fn test_coalesce_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        Test::new()
            .with_batches(vec![])
            .with_schema(schema)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![])
            .run();
    }

    #[test]
    fn test_single_large_batch_greater_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
            .run();
    }

    #[test]
    fn test_single_large_batch_smaller_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(8192)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equal_to_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(4096)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equally_divisible_in_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
            .run();
    }

    #[test]
    fn test_empty_schema() {
        let schema = Schema::empty();
        let batch = RecordBatch::new_empty(schema.into());
        Test::new()
            .with_batch(batch)
            .with_expected_output_sizes(vec![])
            .run();
    }

    #[test]
    fn test_string_view_no_views() {
        let output_batches = Test::new()
            // both input batches have no views, so no need to compact
            .with_batch(stringview_batch([Some("foo"), Some("bar")]))
            .with_batch(stringview_batch([Some("baz"), Some("qux")]))
            .with_expected_output_sizes(vec![4])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![],
        );
    }

    #[test]
    fn test_string_view_batch_small_no_compact() {
        // view with only short strings (no buffers) --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 0);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(gc_array, vec![]);
    }

    #[test]
    fn test_string_view_batch_large_no_compact() {
        // view with large strings (has buffers) but full --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(
            gc_array,
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_batch_small_with_buffers_no_compact() {
        // view with buffers but only short views
        let short_strings = std::iter::repeat(Some("SmallString"));
        let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
        // 20 short strings, then a long one
        let values = short_strings.take(20).chain(long_strings);
        let batch = stringview_batch_repeated(1000, values)
            // take only 10 short strings (no long ones)
            .slice(5, 10);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![10])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 1); // input has one buffer
        assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers since it contains only short (inline) strings
    }

    #[test]
    fn test_string_view_batch_large_slice_compact() {
        // view with large strings (has buffers) but only partially used --> needs compaction
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
            // slice only 22 rows, so most of the buffer is not used
            .slice(11, 22);

        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![22])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);

        expect_buffer_layout(
            gc_array,
            vec![ExpectedLayout {
                len: 770,
                capacity: 8192,
            }],
        );
    }

    #[test]
    fn test_string_view_mixed() {
        let large_view_batch =
            stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
        let mixed_batch = stringview_batch_repeated(
            1000,
            [Some("This string is longer than 12 bytes"), Some("Small")],
        );
        let mixed_batch_nulls = stringview_batch_repeated(
            1000,
            [
                Some("This string is longer than 12 bytes"),
                Some("Small"),
                None,
            ],
        );

        // Several batches with mixed inline / non-inline views,
        // ~4k rows in total
        let output_batches = Test::new()
            .with_batch(large_view_batch.clone())
            .with_batch(small_view_batch)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch_nulls)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_compact() {
        // The long strings are 28 bytes each and make up half of the 400 rows,
        // so each batch has 200 * 28 = 5600 bytes of buffered string data
        let batch = stringview_batch_repeated(
            400,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        let output_batches = Test::new()
            // First allocated buffer is 8kb.
            // Appending five batches of 5600 bytes uses 5600 * 5 = 28kb,
            // spread over an 8kb, a 16kb, and a 32kb buffer
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![2000]) // only 2000 rows total
            .run();

        // expect a nice even distribution of buffers
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16380,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 3444,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_boundary() {
        // The strings are designed to exactly fit into buffers that are powers of 2 long
        let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
        let output_batches = Test::new()
            .with_batches(std::iter::repeat(batch).take(20))
            .with_batch_size(900)
            .with_expected_output_sizes(vec![900, 900, 200])
            .run();

        // expect each buffer to be entirely full except the last one
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8192,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16384,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 4224,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_large_small() {
        // The long strings are 28 bytes each and make up half of the 400 rows,
        // so each mixed batch has 200 * 28 = 5600 bytes of buffered string data
        let mixed_batch = stringview_batch_repeated(
            400,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        // These strings aren't copied; this array has a single 8kb buffer
        let all_large = stringview_batch_repeated(
            100,
            [Some(
                "This buffer has only large strings in it so there are no buffer copies",
            )],
        );

        let output_batches = Test::new()
            // First allocated buffer is 8kb. The three mixed batches (5600
            // bytes each) are copied into progressively larger buffers, while
            // the buffers of the all_large batches are appended without copying.
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![1400])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                // this 16kb buffer was only partially filled before the
                // all_large batch's buffer was appended as-is
                ExpectedLayout {
                    len: 3024,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 7000,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 5600,
                    capacity: 32768,
                },
                ExpectedLayout {
                    len: 7000,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_binary_view() {
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"A longer string that is more than 12 bytes"),
        ];

        let binary_view =
            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();

        Test::new()
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(512)
            .with_expected_output_sizes(vec![512, 512, 512, 464])
            .run();
    }

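    // Illustrative test (not part of the original suite): pushing through
    // `push_batch_with_filter` keeps only the rows selected by the predicate,
    // in their original order.
    #[test]
    fn test_push_batch_with_filter() {
        let batch = uint32_batch(0..8);
        // keep the even-valued rows: 0, 2, 4, 6
        let filter = BooleanArray::from(vec![
            true, false, true, false, true, false, true, false,
        ]);

        let mut coalescer = BatchCoalescer::new(batch.schema(), 1024);
        coalescer.push_batch_with_filter(batch, &filter).unwrap();
        coalescer.finish_buffered_batch().unwrap();
        let output = coalescer.next_completed_batch().unwrap();

        let expected = RecordBatch::try_new(
            output.schema(),
            vec![Arc::new(UInt32Array::from_iter_values([0, 2, 4, 6]))],
        )
        .unwrap();
        assert_eq!(output, expected);
    }
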
    #[derive(Debug, Clone, PartialEq)]
    struct ExpectedLayout {
        len: usize,
        capacity: usize,
    }

    /// Asserts that the buffer layout of the specified StringViewArray matches the expected layout
    fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
        let actual = array
            .data_buffers()
            .iter()
            .map(|b| ExpectedLayout {
                len: b.len(),
                capacity: b.capacity(),
            })
            .collect::<Vec<_>>();

        assert_eq!(
            actual, expected,
            "Expected buffer layout {expected:#?} but got {actual:#?}"
        );
    }

    /// Test for [`BatchCoalescer`]
    ///
    /// Pushes the input batches to the coalescer and verifies that the resulting
    /// batches have the expected number of rows and contents.
    #[derive(Debug, Clone)]
    struct Test {
        /// Batches to feed to the coalescer.
        input_batches: Vec<RecordBatch>,
        /// The schema. If not provided, the first batch's schema is used.
        schema: Option<SchemaRef>,
        /// Expected output sizes of the resulting batches
        expected_output_sizes: Vec<usize>,
        /// target batch size (defaults to 1024)
        target_batch_size: usize,
    }

    impl Default for Test {
        fn default() -> Self {
            Self {
                input_batches: vec![],
                schema: None,
                expected_output_sizes: vec![],
                target_batch_size: 1024,
            }
        }
    }

    impl Test {
        fn new() -> Self {
            Self::default()
        }

        /// Set the target batch size
        fn with_batch_size(mut self, target_batch_size: usize) -> Self {
            self.target_batch_size = target_batch_size;
            self
        }

        /// Extend the input batches with `batch`
        fn with_batch(mut self, batch: RecordBatch) -> Self {
            self.input_batches.push(batch);
            self
        }

        /// Extends the input batches with `batches`
        fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
            self.input_batches.extend(batches);
            self
        }

        /// Specifies the schema for the test
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Extends the expected output sizes with `sizes`
        fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
            self.expected_output_sizes.extend(sizes);
            self
        }

        /// Runs the test -- see documentation on [`Test`] for details
        ///
        /// Returns the resulting output batches
        fn run(self) -> Vec<RecordBatch> {
            let Self {
                input_batches,
                schema,
                target_batch_size,
                expected_output_sizes,
            } = self;

            let schema = schema.unwrap_or_else(|| input_batches[0].schema());

            // create a single large input batch for output comparison
            let single_input_batch = concat_batches(&schema, &input_batches).unwrap();

            let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);

            let had_input = input_batches.iter().any(|b| b.num_rows() > 0);
            for batch in input_batches {
                coalescer.push_batch(batch).unwrap();
            }
            assert_eq!(schema, coalescer.schema());

            if had_input {
                assert!(!coalescer.is_empty(), "Coalescer should not be empty");
            } else {
                assert!(coalescer.is_empty(), "Coalescer should be empty");
            }

            coalescer.finish_buffered_batch().unwrap();
            if had_input {
                assert!(
                    coalescer.has_completed_batch(),
                    "Coalescer should have completed batches"
                );
            }

            let mut output_batches = vec![];
            while let Some(batch) = coalescer.next_completed_batch() {
                output_batches.push(batch);
            }

            // make sure we got the expected number of output batches and content
            let mut starting_idx = 0;
            let actual_output_sizes: Vec<usize> =
                output_batches.iter().map(|b| b.num_rows()).collect();
            assert_eq!(
                expected_output_sizes, actual_output_sizes,
                "Unexpected number of rows in output batches\n\
                Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
            );
            let iter = expected_output_sizes
                .iter()
                .zip(output_batches.iter())
                .enumerate();

            for (i, (expected_size, batch)) in iter {
                // compare the contents of the batch after normalization (a
                // direct `==` also compares the underlying memory layout)
                let expected_batch = single_input_batch.slice(starting_idx, *expected_size);
                let expected_batch = normalize_batch(expected_batch);
                let batch = normalize_batch(batch.clone());
                assert_eq!(
                    expected_batch, batch,
                    "Unexpected content in batch {i}:\
                    \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
                );
                starting_idx += *expected_size;
            }
            output_batches
        }
    }

    /// Return a RecordBatch with a UInt32Array with the specified range
    fn uint32_batch(range: Range<u32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(UInt32Array::from_iter_values(range))],
        )
        .unwrap()
    }

    /// Return a RecordBatch with a StringViewArray with (only) the specified values
    fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "c0",
            DataType::Utf8View,
            false,
        )]));

        RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(StringViewArray::from_iter(values))],
        )
        .unwrap()
    }

    /// Return a RecordBatch with a StringViewArray of `num_rows` rows, created
    /// by repeating `values` over and over.
    fn stringview_batch_repeated<'a>(
        num_rows: usize,
        values: impl IntoIterator<Item = Option<&'a str>>,
    ) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "c0",
            DataType::Utf8View,
            true,
        )]));

        // Repeat the values to a total of num_rows
        let values: Vec<_> = values.into_iter().collect();
        let values_iter = std::iter::repeat(values.iter())
            .flatten()
            .cloned()
            .take(num_rows);

        let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
        for val in values_iter {
            builder.append_option(val);
        }

        let array = builder.finish();
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
    }

    /// Returns the named column as a StringViewArray
    fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
        batch
            .column_by_name(name)
            .expect("column not found")
            .as_string_view_opt()
            .expect("column is not a string view")
    }

    /// Normalize the `RecordBatch` so that the memory layout is consistent
    /// (e.g. StringViewArray is compacted).
    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
        // Only need to normalize StringViews (as == also tests for memory layout)
        let (schema, mut columns, row_count) = batch.into_parts();

        for column in columns.iter_mut() {
            let Some(string_view) = column.as_string_view_opt() else {
                continue;
            };

            // Re-create the StringViewArray to ensure memory layout is
            // consistent
            let mut builder = StringViewBuilder::new();
            for s in string_view.iter() {
                builder.append_option(s);
            }
            // Update the column with the new StringViewArray
            *column = Arc::new(builder.finish());
        }

        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
    }
}