arrow_select/coalesce.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`BatchCoalescer`] concatenates multiple [`RecordBatch`]es after
//! operations such as [`filter`] and [`take`].
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::filter::filter_record_batch;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
use arrow_schema::{ArrowError, DataType, SchemaRef};
use std::collections::VecDeque;
use std::sync::Arc;
// Originally from DataFusion's coalesce module:
// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25

mod byte_view;
mod generic;

use byte_view::InProgressByteViewArray;
use generic::GenericInProgressArray;
/// Concatenate multiple [`RecordBatch`]es
///
/// Implements the common pattern of incrementally creating output
/// [`RecordBatch`]es of a specific size from an input stream of
/// [`RecordBatch`]es.
///
/// This is useful after operations such as [`filter`] and [`take`], which
/// produce smaller batches that we want to coalesce into larger batches for
/// further processing.
///
/// [`filter`]: crate::filter::filter
/// [`take`]: crate::take::take
///
/// See: <https://github.com/apache/arrow-rs/issues/6692>
///
/// # Example
/// ```
/// use arrow_array::record_batch;
/// use arrow_select::coalesce::BatchCoalescer;
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
///
/// // Create a `BatchCoalescer` that will produce batches with at least 4 rows
/// let target_batch_size = 4;
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), target_batch_size);
///
/// // push the batches
/// coalescer.push_batch(batch1).unwrap();
/// // only 3 rows pushed so far, which is not yet enough to produce a batch
/// assert!(coalescer.next_completed_batch().is_none());
/// coalescer.push_batch(batch2).unwrap();
/// // now we have 5 rows, so we can produce a batch
/// let finished = coalescer.next_completed_batch().unwrap();
/// // 4 rows came out (target batch size is 4)
/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
/// assert_eq!(finished, expected);
///
/// // There is no more input, but there is still an in-progress batch
/// assert!(coalescer.next_completed_batch().is_none());
/// // We can finish the batch, which will produce the remaining rows
/// coalescer.finish_buffered_batch().unwrap();
/// let expected = record_batch!(("a", Int32, [5])).unwrap();
/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
///
/// // The coalescer is now empty
/// assert!(coalescer.next_completed_batch().is_none());
/// ```
///
/// # Background
///
/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
/// there is fixed processing overhead per batch. This coalescer builds up these
/// larger batches incrementally.
///
/// ```text
/// ┌────────────────────┐
/// │    RecordBatch     │
/// │   num_rows = 100   │
/// └────────────────────┘                 ┌────────────────────┐
///                                        │                    │
/// ┌────────────────────┐    Coalesce     │                    │
/// │                    │     Batches     │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 200   │ ─ ─ ─ ─ ─ ─ ▶   │                    │
/// │                    │                 │     RecordBatch    │
/// │                    │                 │    num_rows = 400  │
/// └────────────────────┘                 │                    │
///                                        │                    │
/// ┌────────────────────┐                 │                    │
/// │                    │                 │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 100   │                 └────────────────────┘
/// │                    │
/// └────────────────────┘
/// ```
///
/// # Notes:
///
/// 1. Output rows are produced in the same order as the input rows
///
/// 2. The output is a sequence of batches, with all but the last containing
///    exactly `target_batch_size` rows.
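///
/// # Streaming example
///
/// A minimal sketch of the common streaming pattern: drain completed batches
/// as soon as they become available rather than waiting for all input. The
/// input batches below are illustrative only.
///
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_select::coalesce::BatchCoalescer;
/// let input = vec![
///     record_batch!(("a", Int32, [1, 2, 3])).unwrap(),
///     record_batch!(("a", Int32, [4, 5, 6])).unwrap(),
///     record_batch!(("a", Int32, [7, 8])).unwrap(),
/// ];
/// let mut coalescer = BatchCoalescer::new(input[0].schema(), 4);
/// let mut output = vec![];
/// for batch in input {
///     coalescer.push_batch(batch).unwrap();
///     // drain any batches that have reached the target size
///     while let Some(completed) = coalescer.next_completed_batch() {
///         output.push(completed);
///     }
/// }
/// // flush whatever is still buffered (it may be smaller than the target size)
/// coalescer.finish_buffered_batch().unwrap();
/// while let Some(completed) = coalescer.next_completed_batch() {
///     output.push(completed);
/// }
/// let row_counts: Vec<_> = output.iter().map(|b| b.num_rows()).collect();
/// assert_eq!(row_counts, vec![4, 4]);
/// ```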
#[derive(Debug)]
pub struct BatchCoalescer {
    /// The input schema
    schema: SchemaRef,
    /// output batch size
    batch_size: usize,
    /// In-progress arrays
    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
    /// Buffered row count. Always less than `batch_size`
    buffered_rows: usize,
    /// Completed batches
    completed: VecDeque<RecordBatch>,
}

impl BatchCoalescer {
    /// Create a new `BatchCoalescer`
    ///
    /// # Arguments
    /// - `schema` - the schema of the output batches
    /// - `batch_size` - the number of rows in each output batch.
    ///   Typical values are `4096` or `8192` rows.
    ///
    pub fn new(schema: SchemaRef, batch_size: usize) -> Self {
        let in_progress_arrays = schema
            .fields()
            .iter()
            .map(|field| create_in_progress_array(field.data_type(), batch_size))
            .collect::<Vec<_>>();

        Self {
            schema,
            batch_size,
            in_progress_arrays,
            // We will for sure store at least one completed batch
            completed: VecDeque::with_capacity(1),
            buffered_rows: 0,
        }
    }

    /// Return the schema of the output batches
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Push a batch into the Coalescer after applying a filter
    ///
    /// This is semantically equivalent to calling [`Self::push_batch`]
    /// with the results from [`filter_record_batch`]
    ///
    /// # Example
    /// ```
    /// # use arrow_array::{record_batch, BooleanArray};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // Apply a filter to each batch to pick the first and last row
    /// let filter = BooleanArray::from(vec![true, false, true]);
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch_with_filter(batch1, &filter).unwrap();
    /// coalescer.push_batch_with_filter(batch2, &filter).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// // filtered out 2 and 5:
    /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch_with_filter(
        &mut self,
        batch: RecordBatch,
        filter: &BooleanArray,
    ) -> Result<(), ArrowError> {
        // TODO: optimize this to avoid materializing (copying the results
        // of filter to a new batch)
        let filtered_batch = filter_record_batch(&batch, filter)?;
        self.push_batch(filtered_batch)
    }

    /// Push all the rows from `batch` into the Coalescer
    ///
    /// See [`Self::next_completed_batch()`] to retrieve any completed batches.
    ///
    /// # Example
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch(batch1).unwrap();
    /// coalescer.push_batch(batch2).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
        let (_schema, arrays, mut num_rows) = batch.into_parts();
        if num_rows == 0 {
            return Ok(());
        }

        // setup input rows
        assert_eq!(arrays.len(), self.in_progress_arrays.len());
        self.in_progress_arrays
            .iter_mut()
            .zip(arrays)
            .for_each(|(in_progress, array)| {
                in_progress.set_source(Some(array));
            });

        // If pushing this batch would exceed the target batch size,
        // finish the current batch and start a new one
        let mut offset = 0;
        while num_rows > (self.batch_size - self.buffered_rows) {
            let remaining_rows = self.batch_size - self.buffered_rows;
            debug_assert!(remaining_rows > 0);

            // Copy remaining_rows from each array
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, remaining_rows)?;
            }

            self.buffered_rows += remaining_rows;
            offset += remaining_rows;
            num_rows -= remaining_rows;

            self.finish_buffered_batch()?;
        }

        // Add any remaining rows to the buffer
        self.buffered_rows += num_rows;
        if num_rows > 0 {
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, num_rows)?;
            }
        }

        // If we have reached the target batch size, finalize the buffered batch
        if self.buffered_rows >= self.batch_size {
            self.finish_buffered_batch()?;
        }

        // clear in progress sources (to allow the memory to be freed)
        for in_progress in self.in_progress_arrays.iter_mut() {
            in_progress.set_source(None);
        }

        Ok(())
    }

    /// Concatenates any buffered batches into a single `RecordBatch` and
    /// clears any output buffers
    ///
    /// Normally this is called when the input stream is exhausted, and
    /// we want to finalize the last batch of rows.
    ///
    /// See [`Self::next_completed_batch()`] for the completed batches.
    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
        if self.buffered_rows == 0 {
            return Ok(());
        }
        let new_arrays = self
            .in_progress_arrays
            .iter_mut()
            .map(|array| array.finish())
            .collect::<Result<Vec<_>, ArrowError>>()?;

        for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
            debug_assert_eq!(array.data_type(), field.data_type());
            debug_assert_eq!(array.len(), self.buffered_rows);
        }

        // SAFETY: each array was created of the correct type and length.
        let batch = unsafe {
            RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
        };

        self.buffered_rows = 0;
        self.completed.push_back(batch);
        Ok(())
    }

    /// Returns true if there is no buffered data and no completed batches
    pub fn is_empty(&self) -> bool {
        self.buffered_rows == 0 && self.completed.is_empty()
    }

    /// Returns true if there are any completed batches
    pub fn has_completed_batch(&self) -> bool {
        !self.completed.is_empty()
    }

    /// Returns the next completed batch, if any
    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
        self.completed.pop_front()
    }
}

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
    match data_type {
        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
        DataType::BinaryView => {
            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
        }
        _ => Box::new(GenericInProgressArray::new()),
    }
}

/// Incrementally builds up arrays
///
/// [`GenericInProgressArray`] is the default implementation that buffers
/// arrays and uses other kernels to concatenate them when finished.
///
/// Some array types have specialized implementations (e.g.,
/// [`StringViewArray`]).
///
/// [`StringViewArray`]: arrow_array::StringViewArray
trait InProgressArray: std::fmt::Debug + Send + Sync {
    /// Set the source array.
    ///
    /// Calls to [`Self::copy_rows`] will copy rows from this array into the
    /// current in-progress array
    fn set_source(&mut self, source: Option<ArrayRef>);

    /// Copy rows from the current source array into the in-progress array
    ///
    /// The source array is set by [`Self::set_source`].
    ///
    /// Returns an error if the source array is not set
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

    /// Finish the currently in-progress array and return it as an `ArrayRef`
    fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::concat::concat_batches;
    use arrow_array::builder::StringViewBuilder;
    use arrow_array::cast::AsArray;
    use arrow_array::{BinaryViewArray, RecordBatchOptions, StringViewArray, UInt32Array};
    use arrow_schema::{DataType, Field, Schema};
    use std::ops::Range;

    #[test]
    fn test_coalesce() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 10))
            // expected output is exactly 21 rows (except for the final batch)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![21, 21, 21, 17])
            .run();
    }

    #[test]
    fn test_coalesce_one_by_one() {
        let batch = uint32_batch(0..1); // single row input
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 97))
            // expected output is exactly 20 rows (except for the final batch)
            .with_batch_size(20)
            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
            .run();
    }

    #[test]
    fn test_coalesce_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        Test::new()
            .with_batches(vec![])
            .with_schema(schema)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![])
            .run();
    }

    #[test]
    fn test_single_large_batch_greater_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
            .run();
    }

    #[test]
    fn test_single_large_batch_smaller_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(8192)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equal_to_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(4096)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equally_divisible_in_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
            .run();
    }

    #[test]
    fn test_empty_schema() {
        let schema = Schema::empty();
        let batch = RecordBatch::new_empty(schema.into());
        Test::new()
            .with_batch(batch)
            .with_expected_output_sizes(vec![])
            .run();
    }

    #[test]
    fn test_string_view_no_views() {
        let output_batches = Test::new()
            // both input batches have only short strings (no data buffers), so no need to compact
            .with_batch(stringview_batch([Some("foo"), Some("bar")]))
            .with_batch(stringview_batch([Some("baz"), Some("qux")]))
            .with_expected_output_sizes(vec![4])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![],
        );
    }

    #[test]
    fn test_string_view_batch_small_no_compact() {
        // view with only short strings (no buffers) --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 0);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(gc_array, vec![]);
    }

    #[test]
    fn test_string_view_batch_large_no_compact() {
        // view with large strings (has buffers) but full --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

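        // Each "This string is longer than 12 bytes" value is 35 bytes, so 234
        // strings fill each 8192 byte buffer (234 * 35 = 8190), and the final
        // 64 strings occupy 64 * 35 = 2240 bytes.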
        expect_buffer_layout(
            gc_array,
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_batch_small_with_buffers_no_compact() {
        // view with buffers but only short views
        let short_strings = std::iter::repeat(Some("SmallString"));
        let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
        // 20 short strings, then a long one
        let values = short_strings.take(20).chain(long_strings);
        let batch = stringview_batch_repeated(1000, values)
            // take only 10 short strings (no long ones)
            .slice(5, 10);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![10])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 1); // input has one buffer
        assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers as only short strings
    }

    #[test]
    fn test_string_view_batch_large_slice_compact() {
        // view with large strings (has buffers) and only partially used --> needs compaction
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
            // slice only 22 rows, so most of the buffer is not used
            .slice(11, 22);

        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![22])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);

        expect_buffer_layout(
            gc_array,
            vec![ExpectedLayout {
                len: 770,
                capacity: 8192,
            }],
        );
    }

    #[test]
    fn test_string_view_mixed() {
        let large_view_batch =
            stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
        let mixed_batch = stringview_batch_repeated(
            1000,
            [Some("This string is longer than 12 bytes"), Some("Small")],
        );
        let mixed_batch_nulls = stringview_batch_repeated(
            1000,
            [
                Some("This string is longer than 12 bytes"),
                Some("Small"),
                None,
            ],
        );

        // Several batches with mixed inline / non inline views,
        // about 4k rows of input in total
        let output_batches = Test::new()
            .with_batch(large_view_batch.clone())
            .with_batch(small_view_batch)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch_nulls)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_compact() {
        // The long strings are 28 bytes (the 12 byte "small string" values are
        // inlined), so each batch buffers 200 * 28 = 5600 bytes
        let batch = stringview_batch_repeated(
            400,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        let output_batches = Test::new()
            // The first allocated buffer is 8KB.
            // Appending five batches of 5600 bytes each uses 5600 * 5 = 28KB,
            // spread across an 8KB, a 16KB, and a (partially filled) 32KB buffer
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![2000]) // only 2000 rows total
            .run();

        // expect a nice even distribution of buffers
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16380,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 3444,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_boundary() {
        // The strings are designed to exactly fit into buffers that are powers of 2 long
        let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
        let output_batches = Test::new()
            .with_batches(std::iter::repeat(batch).take(20))
            .with_batch_size(900)
            .with_expected_output_sizes(vec![900, 900, 200])
            .run();

        // expect each buffer to be entirely full except the last one
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8192,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16384,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 4224,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_large_small() {
        // The long strings are 28 bytes, so each mixed batch buffers 200 * 28 = 5600 bytes
        let mixed_batch = stringview_batch_repeated(
            400,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        // These strings aren't copied; this array has its own 8k buffer
        let all_large = stringview_batch_repeated(
            100,
            [Some(
                "This buffer has only large strings in it so there are no buffer copies",
            )],
        );

        let output_batches = Test::new()
            // The first allocated buffer is 8KB. The three mixed batches copy
            // 3 * 5600 = 16800 bytes of string data, while the all_large
            // batches' buffers are appended without copying
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![1400])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                // this buffer was allocated but still only partially used when the all_large batch was pushed
                ExpectedLayout {
                    len: 3024,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 7000,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 5600,
                    capacity: 32768,
                },
                ExpectedLayout {
                    len: 7000,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_binary_view() {
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"A longer string that is more than 12 bytes"),
        ];

        let binary_view =
            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();

        Test::new()
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(512)
            .with_expected_output_sizes(vec![512, 512, 512, 464])
            .run();
    }
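
    /// Sketch of how the internal `InProgressArray` protocol is driven by
    /// `BatchCoalescer`: set a source array, copy row ranges from it, then
    /// finish. The row ranges chosen here are arbitrary.
    #[test]
    fn test_in_progress_array_protocol() {
        use arrow_array::types::UInt32Type;

        let source: ArrayRef = Arc::new(UInt32Array::from_iter_values(0..10));
        // dispatches to `GenericInProgressArray` for primitive types
        let mut in_progress = create_in_progress_array(source.data_type(), 8);

        // copy rows 2..5 and then rows 7..9 from the source
        in_progress.set_source(Some(Arc::clone(&source)));
        in_progress.copy_rows(2, 3).unwrap();
        in_progress.copy_rows(7, 2).unwrap();
        let result = in_progress.finish().unwrap();

        let expected = UInt32Array::from(vec![2, 3, 4, 7, 8]);
        assert_eq!(result.as_primitive::<UInt32Type>(), &expected);
    }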

    #[derive(Debug, Clone, PartialEq)]
    struct ExpectedLayout {
        len: usize,
        capacity: usize,
    }

    /// Asserts that the buffer layout of the specified StringViewArray matches the expected layout
    fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
        let actual = array
            .data_buffers()
            .iter()
            .map(|b| ExpectedLayout {
                len: b.len(),
                capacity: b.capacity(),
            })
            .collect::<Vec<_>>();

        assert_eq!(
            actual, expected,
            "Expected buffer layout {expected:#?} but got {actual:#?}"
        );
    }

    /// Test for [`BatchCoalescer`]
    ///
    /// Pushes the input batches to the coalescer and verifies that the resulting
    /// batches have the expected number of rows and contents.
    #[derive(Debug, Clone)]
    struct Test {
        /// Batches to feed to the coalescer.
        input_batches: Vec<RecordBatch>,
        /// The schema. If not provided, the first batch's schema is used.
        schema: Option<SchemaRef>,
        /// Expected output sizes of the resulting batches
        expected_output_sizes: Vec<usize>,
        /// target batch size (defaults to 1024)
        target_batch_size: usize,
    }

    impl Default for Test {
        fn default() -> Self {
            Self {
                input_batches: vec![],
                schema: None,
                expected_output_sizes: vec![],
                target_batch_size: 1024,
            }
        }
    }

    impl Test {
        fn new() -> Self {
            Self::default()
        }

        /// Set the target batch size
        fn with_batch_size(mut self, target_batch_size: usize) -> Self {
            self.target_batch_size = target_batch_size;
            self
        }

        /// Extend the input batches with `batch`
        fn with_batch(mut self, batch: RecordBatch) -> Self {
            self.input_batches.push(batch);
            self
        }

        /// Extends the input batches with `batches`
        fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
            self.input_batches.extend(batches);
            self
        }

        /// Specifies the schema for the test
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Extends the expected output sizes with `sizes`
        fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
            self.expected_output_sizes.extend(sizes);
            self
        }

        /// Runs the test -- see documentation on [`Test`] for details
        ///
        /// Returns the resulting output batches
        fn run(self) -> Vec<RecordBatch> {
            let Self {
                input_batches,
                schema,
                target_batch_size,
                expected_output_sizes,
            } = self;

            let schema = schema.unwrap_or_else(|| input_batches[0].schema());

            // create a single large input batch for output comparison
            let single_input_batch = concat_batches(&schema, &input_batches).unwrap();

            let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);

            let had_input = input_batches.iter().any(|b| b.num_rows() > 0);
            for batch in input_batches {
                coalescer.push_batch(batch).unwrap();
            }
            assert_eq!(schema, coalescer.schema());

            if had_input {
                assert!(!coalescer.is_empty(), "Coalescer should not be empty");
            } else {
                assert!(coalescer.is_empty(), "Coalescer should be empty");
            }

            coalescer.finish_buffered_batch().unwrap();
            if had_input {
                assert!(
                    coalescer.has_completed_batch(),
                    "Coalescer should have completed batches"
                );
            }

            let mut output_batches = vec![];
            while let Some(batch) = coalescer.next_completed_batch() {
                output_batches.push(batch);
            }

            // make sure we got the expected number of output batches and content
            let mut starting_idx = 0;
            let actual_output_sizes: Vec<usize> =
                output_batches.iter().map(|b| b.num_rows()).collect();
            assert_eq!(
                expected_output_sizes, actual_output_sizes,
                "Unexpected number of rows in output batches\n\
                Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
            );
            let iter = expected_output_sizes
                .iter()
                .zip(output_batches.iter())
                .enumerate();

            for (i, (expected_size, batch)) in iter {
                // compare the contents of the batch after normalization (as
                // `==` also compares the underlying memory layout)
                let expected_batch = single_input_batch.slice(starting_idx, *expected_size);
                let expected_batch = normalize_batch(expected_batch);
                let batch = normalize_batch(batch.clone());
                assert_eq!(
                    expected_batch, batch,
                    "Unexpected content in batch {i}:\
                    \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
                );
                starting_idx += *expected_size;
            }
            output_batches
        }
    }

    /// Return a RecordBatch with a UInt32Array with the specified range
    fn uint32_batch(range: Range<u32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(UInt32Array::from_iter_values(range))],
        )
        .unwrap()
    }

    /// Return a RecordBatch with a StringViewArray with (only) the specified values
    fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "c0",
            DataType::Utf8View,
            false,
        )]));

        RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(StringViewArray::from_iter(values))],
        )
        .unwrap()
    }

    /// Return a RecordBatch with a StringViewArray of `num_rows` rows created
    /// by repeating `values` over and over.
    fn stringview_batch_repeated<'a>(
        num_rows: usize,
        values: impl IntoIterator<Item = Option<&'a str>>,
    ) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "c0",
            DataType::Utf8View,
            true,
        )]));

        // Repeat the values to a total of num_rows
        let values: Vec<_> = values.into_iter().collect();
        let values_iter = std::iter::repeat(values.iter())
            .flatten()
            .cloned()
            .take(num_rows);

        let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
        for val in values_iter {
            builder.append_option(val);
        }

        let array = builder.finish();
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
    }

    /// Returns the named column as a StringViewArray
    fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
        batch
            .column_by_name(name)
            .expect("column not found")
            .as_string_view_opt()
            .expect("column is not a string view")
    }

    /// Normalize the `RecordBatch` so that the memory layout is consistent
    /// (e.g. any StringViewArray is compacted).
    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
        // Only need to normalize StringViews (as == also tests for memory layout)
        let (schema, mut columns, row_count) = batch.into_parts();

        for column in columns.iter_mut() {
            let Some(string_view) = column.as_string_view_opt() else {
                continue;
            };

            // Re-create the StringViewArray to ensure memory layout is
            // consistent
            let mut builder = StringViewBuilder::new();
            for s in string_view.iter() {
                builder.append_option(s);
            }
            // Update the column with the new StringViewArray
            *column = Arc::new(builder.finish());
        }

        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
    }
}