1use std::collections::{HashMap, HashSet};
21use std::ops::Range;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
26use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
27use arrow_cast::cast::cast;
28use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
29use arrow_schema::{
30 ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
31};
32use arrow_string::like::starts_with;
33use bytes::Bytes;
34use fnv::FnvHashSet;
35use futures::future::BoxFuture;
36use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
37use parquet::arrow::arrow_reader::{
38 ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
39};
40use parquet::arrow::async_reader::AsyncFileReader;
41use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
42use parquet::file::metadata::{
43 PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
44};
45use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
46
47use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
48use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
49use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
50use crate::delete_vector::DeleteVector;
51use crate::error::Result;
52use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
53use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
54use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
55use crate::expr::{BoundPredicate, BoundReference};
56use crate::io::{FileIO, FileMetadata, FileRead};
57use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
58use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
59use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
60use crate::utils::available_parallelism;
61use crate::{Error, ErrorKind};
62
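/// Builder for [`ArrowReader`]. Configures the batch size, per-file read concurrency,
/// row-group filtering, and page-index-based row selection before building the reader.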
63pub struct ArrowReaderBuilder {
65 batch_size: Option<usize>,
66 file_io: FileIO,
67 concurrency_limit_data_files: usize,
68 row_group_filtering_enabled: bool,
69 row_selection_enabled: bool,
70}
71
72impl ArrowReaderBuilder {
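    /// Create a new `ArrowReaderBuilder` backed by the given `FileIO`. The data file
    /// concurrency limit defaults to the host's available parallelism.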
73 pub fn new(file_io: FileIO) -> Self {
75 let num_cpus = available_parallelism().get();
76
77 ArrowReaderBuilder {
78 batch_size: None,
79 file_io,
80 concurrency_limit_data_files: num_cpus,
81 row_group_filtering_enabled: true,
82 row_selection_enabled: false,
83 }
84 }
85
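    /// Sets the maximum number of data files that are read concurrently.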
86 pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
88 self.concurrency_limit_data_files = val;
89 self
90 }
91
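    /// Sets the number of rows per Arrow record batch; when unset, the Parquet reader's
    /// default batch size is used.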
92 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
95 self.batch_size = Some(batch_size);
96 self
97 }
98
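    /// Enables or disables pruning of entire row groups using row-group statistics and
    /// the scan predicate (enabled by default).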
99 pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
101 self.row_group_filtering_enabled = row_group_filtering_enabled;
102 self
103 }
104
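    /// Enables or disables row selection within row groups based on the Parquet page
    /// index (disabled by default). Requires the data files to carry a page index.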
105 pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
107 self.row_selection_enabled = row_selection_enabled;
108 self
109 }
110
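    /// Build the [`ArrowReader`] from the configured options.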
111 pub fn build(self) -> ArrowReader {
113 ArrowReader {
114 batch_size: self.batch_size,
115 file_io: self.file_io.clone(),
116 delete_file_loader: CachingDeleteFileLoader::new(
117 self.file_io.clone(),
118 self.concurrency_limit_data_files,
119 ),
120 concurrency_limit_data_files: self.concurrency_limit_data_files,
121 row_group_filtering_enabled: self.row_group_filtering_enabled,
122 row_selection_enabled: self.row_selection_enabled,
123 }
124 }
125}
126
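/// Reads the data files referenced by [`FileScanTask`]s and yields Arrow record batches,
/// applying column projection, predicate filtering, and positional/equality deletes.
///
/// Minimal usage sketch (assumes a configured `FileIO` named `file_io` and a
/// `FileScanTaskStream` named `tasks`):
///
/// ```ignore
/// let reader = ArrowReaderBuilder::new(file_io)
///     .with_batch_size(1024)
///     .build();
/// let batches = reader.read(tasks)?;
/// ```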
127#[derive(Clone)]
129pub struct ArrowReader {
130 batch_size: Option<usize>,
131 file_io: FileIO,
132 delete_file_loader: CachingDeleteFileLoader,
133
134 concurrency_limit_data_files: usize,
136
137 row_group_filtering_enabled: bool,
138 row_selection_enabled: bool,
139}
140
141impl ArrowReader {
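    /// Take a stream of [`FileScanTask`]s and read all the files, returning a stream of
    /// Arrow record batches. Tasks are processed concurrently up to the configured data
    /// file concurrency limit.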
142 pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
145 let file_io = self.file_io.clone();
146 let batch_size = self.batch_size;
147 let concurrency_limit_data_files = self.concurrency_limit_data_files;
148 let row_group_filtering_enabled = self.row_group_filtering_enabled;
149 let row_selection_enabled = self.row_selection_enabled;
150
151 let stream = tasks
152 .map_ok(move |task| {
153 let file_io = file_io.clone();
154
155 Self::process_file_scan_task(
156 task,
157 batch_size,
158 file_io,
159 self.delete_file_loader.clone(),
160 row_group_filtering_enabled,
161 row_selection_enabled,
162 )
163 })
164 .map_err(|err| {
                Error::new(ErrorKind::Unexpected, "file scan task generation failed").with_source(err)
166 })
167 .try_buffer_unordered(concurrency_limit_data_files)
168 .try_flatten_unordered(concurrency_limit_data_files);
169
170 Ok(Box::pin(stream) as ArrowRecordBatchStream)
171 }
172
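    // Per-task pipeline: open the Parquet file, ensure the Arrow schema carries field IDs
    // (via name mapping or positional fallback), project the requested columns, apply the
    // filter/delete predicates plus row-group and row selections, and finally transform
    // each batch to match the task's Iceberg schema.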
173 #[allow(clippy::too_many_arguments)]
174 async fn process_file_scan_task(
175 task: FileScanTask,
176 batch_size: Option<usize>,
177 file_io: FileIO,
178 delete_file_loader: CachingDeleteFileLoader,
179 row_group_filtering_enabled: bool,
180 row_selection_enabled: bool,
181 ) -> Result<ArrowRecordBatchStream> {
182 let should_load_page_index =
183 (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
184
185 let delete_filter_rx =
186 delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
187
188 let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
191 &task.data_file_path,
192 file_io.clone(),
193 should_load_page_index,
194 None,
195 )
196 .await?;
197
198 let missing_field_ids = initial_stream_builder
202 .schema()
203 .fields()
204 .iter()
205 .next()
206 .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
207
208 let mut record_batch_stream_builder = if missing_field_ids {
224 let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
226 apply_name_mapping_to_arrow_schema(
231 Arc::clone(initial_stream_builder.schema()),
232 name_mapping,
233 )?
234 } else {
235 add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
238 };
239
240 let options = ArrowReaderOptions::new().with_schema(arrow_schema);
241
242 Self::create_parquet_record_batch_stream_builder(
243 &task.data_file_path,
244 file_io.clone(),
245 should_load_page_index,
246 Some(options),
247 )
248 .await?
249 } else {
250 initial_stream_builder
252 };
253
254 let project_field_ids_without_metadata: Vec<i32> = task
256 .project_field_ids
257 .iter()
258 .filter(|&&id| !is_metadata_field(id))
259 .copied()
260 .collect();
261
262 let projection_mask = Self::get_arrow_projection_mask(
267 &project_field_ids_without_metadata,
268 &task.schema,
269 record_batch_stream_builder.parquet_schema(),
270 record_batch_stream_builder.schema(),
            missing_field_ids,
        )?;
273
274 record_batch_stream_builder =
275 record_batch_stream_builder.with_projection(projection_mask.clone());
276
277 let mut record_batch_transformer_builder =
281 RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
282
283 if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
285 let file_datum = Datum::string(task.data_file_path.clone());
286 record_batch_transformer_builder =
287 record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
288 }
289
290 if let (Some(partition_spec), Some(partition_data)) =
291 (task.partition_spec.clone(), task.partition.clone())
292 {
293 record_batch_transformer_builder =
294 record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
295 }
296
297 let mut record_batch_transformer = record_batch_transformer_builder.build();
298
299 if let Some(batch_size) = batch_size {
300 record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
301 }
302
303 let delete_filter = delete_filter_rx.await.unwrap()?;
304 let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
305
306 let final_predicate = match (&task.predicate, delete_predicate) {
311 (None, None) => None,
312 (Some(predicate), None) => Some(predicate.clone()),
313 (None, Some(ref predicate)) => Some(predicate.clone()),
314 (Some(filter_predicate), Some(delete_predicate)) => {
315 Some(filter_predicate.clone().and(delete_predicate))
316 }
317 };
318
319 let mut selected_row_group_indices = None;
335 let mut row_selection = None;
336
337 if task.start != 0 || task.length != 0 {
340 let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
341 record_batch_stream_builder.metadata(),
342 task.start,
343 task.length,
344 )?;
345 selected_row_group_indices = Some(byte_range_filtered_row_groups);
346 }
347
348 if let Some(predicate) = final_predicate {
349 let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
350 record_batch_stream_builder.parquet_schema(),
351 &predicate,
352 )?;
353
354 let row_filter = Self::get_row_filter(
355 &predicate,
356 record_batch_stream_builder.parquet_schema(),
357 &iceberg_field_ids,
358 &field_id_map,
359 )?;
360 record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
361
362 if row_group_filtering_enabled {
363 let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
364 &predicate,
365 record_batch_stream_builder.metadata(),
366 &field_id_map,
367 &task.schema,
368 )?;
369
370 selected_row_group_indices = match selected_row_group_indices {
373 Some(byte_range_filtered) => {
374 let intersection: Vec<usize> = byte_range_filtered
376 .into_iter()
377 .filter(|idx| predicate_filtered_row_groups.contains(idx))
378 .collect();
379 Some(intersection)
380 }
381 None => Some(predicate_filtered_row_groups),
382 };
383 }
384
385 if row_selection_enabled {
386 row_selection = Some(Self::get_row_selection_for_filter_predicate(
387 &predicate,
388 record_batch_stream_builder.metadata(),
389 &selected_row_group_indices,
390 &field_id_map,
391 &task.schema,
392 )?);
393 }
394 }
395
396 let positional_delete_indexes = delete_filter.get_delete_vector(&task);
397
398 if let Some(positional_delete_indexes) = positional_delete_indexes {
399 let delete_row_selection = {
400 let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
401
402 Self::build_deletes_row_selection(
403 record_batch_stream_builder.metadata().row_groups(),
404 &selected_row_group_indices,
405 &positional_delete_indexes,
406 )
407 }?;
408
409 row_selection = match row_selection {
412 None => Some(delete_row_selection),
413 Some(filter_row_selection) => {
414 Some(filter_row_selection.intersection(&delete_row_selection))
415 }
416 };
417 }
418
419 if let Some(row_selection) = row_selection {
420 record_batch_stream_builder =
421 record_batch_stream_builder.with_row_selection(row_selection);
422 }
423
424 if let Some(selected_row_group_indices) = selected_row_group_indices {
425 record_batch_stream_builder =
426 record_batch_stream_builder.with_row_groups(selected_row_group_indices);
427 }
428
429 let record_batch_stream =
432 record_batch_stream_builder
433 .build()?
434 .map(move |batch| match batch {
435 Ok(batch) => {
436 record_batch_transformer.process_record_batch(batch)
438 }
439 Err(err) => Err(err.into()),
440 });
441
442 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
443 }
444
445 pub(crate) async fn create_parquet_record_batch_stream_builder(
446 data_file_path: &str,
447 file_io: FileIO,
448 should_load_page_index: bool,
449 arrow_reader_options: Option<ArrowReaderOptions>,
450 ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
451 let parquet_file = file_io.new_input(data_file_path)?;
454 let (parquet_metadata, parquet_reader) =
455 try_join!(parquet_file.metadata(), parquet_file.reader())?;
456 let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
457 .with_preload_column_index(true)
458 .with_preload_offset_index(true)
459 .with_preload_page_index(should_load_page_index);
460
461 let options = arrow_reader_options.unwrap_or_default();
463 let record_batch_stream_builder =
464 ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
465 Ok(record_batch_stream_builder)
466 }
467
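    /// Convert a `DeleteVector` of positional deletes into a `RowSelection` of alternating
    /// select/skip runs, restricted to the selected row groups (if any). Delete indices are
    /// file-global, so each row group's base row offset is tracked while iterating.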
468 fn build_deletes_row_selection(
474 row_group_metadata_list: &[RowGroupMetaData],
475 selected_row_groups: &Option<Vec<usize>>,
476 positional_deletes: &DeleteVector,
477 ) -> Result<RowSelection> {
478 let mut results: Vec<RowSelector> = Vec::new();
479 let mut selected_row_groups_idx = 0;
480 let mut current_row_group_base_idx: u64 = 0;
481 let mut delete_vector_iter = positional_deletes.iter();
482 let mut next_deleted_row_idx_opt = delete_vector_iter.next();
483
484 for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
485 let row_group_num_rows = row_group_metadata.num_rows() as u64;
486 let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
487
488 if let Some(selected_row_groups) = selected_row_groups {
490 if selected_row_groups_idx == selected_row_groups.len() {
492 break;
493 }
494
495 if idx == selected_row_groups[selected_row_groups_idx] {
496 selected_row_groups_idx += 1;
500 } else {
501 delete_vector_iter.advance_to(next_row_group_base_idx);
506 if let Some(cached_idx) = next_deleted_row_idx_opt
508 && cached_idx < next_row_group_base_idx
509 {
510 next_deleted_row_idx_opt = delete_vector_iter.next();
511 }
512
513 current_row_group_base_idx += row_group_num_rows;
516 continue;
517 }
518 }
519
520 let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
521 Some(next_deleted_row_idx) => {
522 if next_deleted_row_idx >= next_row_group_base_idx {
525 results.push(RowSelector::select(row_group_num_rows as usize));
526 current_row_group_base_idx += row_group_num_rows;
527 continue;
528 }
529
530 next_deleted_row_idx
531 }
532
533 _ => {
535 results.push(RowSelector::select(row_group_num_rows as usize));
536 current_row_group_base_idx += row_group_num_rows;
537 continue;
538 }
539 };
540
541 let mut current_idx = current_row_group_base_idx;
542 'chunks: while next_deleted_row_idx < next_row_group_base_idx {
543 if current_idx < next_deleted_row_idx {
545 let run_length = next_deleted_row_idx - current_idx;
546 results.push(RowSelector::select(run_length as usize));
547 current_idx += run_length;
548 }
549
550 let mut run_length = 0;
552 while next_deleted_row_idx == current_idx
553 && next_deleted_row_idx < next_row_group_base_idx
554 {
555 run_length += 1;
556 current_idx += 1;
557
558 next_deleted_row_idx_opt = delete_vector_iter.next();
559 next_deleted_row_idx = match next_deleted_row_idx_opt {
560 Some(next_deleted_row_idx) => next_deleted_row_idx,
561 _ => {
562 results.push(RowSelector::skip(run_length));
566 break 'chunks;
567 }
568 };
569 }
570 if run_length > 0 {
571 results.push(RowSelector::skip(run_length));
572 }
573 }
574
575 if current_idx < next_row_group_base_idx {
576 results.push(RowSelector::select(
577 (next_row_group_base_idx - current_idx) as usize,
578 ));
579 }
580
581 current_row_group_base_idx += row_group_num_rows;
582 }
583
584 Ok(results.into())
585 }
586
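    /// Collect the Iceberg field IDs referenced by the predicate and build a map from field
    /// ID to Parquet leaf column index, falling back to positional IDs when the file has no
    /// embedded field IDs.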
587 fn build_field_id_set_and_map(
588 parquet_schema: &SchemaDescriptor,
589 predicate: &BoundPredicate,
590 ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
591 let mut collector = CollectFieldIdVisitor {
593 field_ids: HashSet::default(),
594 };
595 visit(&mut collector, predicate)?;
596
597 let iceberg_field_ids = collector.field_ids();
598
599 let field_id_map = match build_field_id_map(parquet_schema)? {
601 Some(map) => map,
602 None => build_fallback_field_id_map(parquet_schema),
603 };
604
605 Ok((iceberg_field_ids, field_id_map))
606 }
607
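    /// Recursively collect the leaf (primitive) field IDs beneath `field`.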
608 fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
611 match field.field_type.as_ref() {
612 Type::Primitive(_) => {
613 field_ids.push(field.id);
614 }
615 Type::Struct(struct_type) => {
616 for nested_field in struct_type.fields() {
617 Self::include_leaf_field_id(nested_field, field_ids);
618 }
619 }
620 Type::List(list_type) => {
621 Self::include_leaf_field_id(&list_type.element_field, field_ids);
622 }
623 Type::Map(map_type) => {
624 Self::include_leaf_field_id(&map_type.key_field, field_ids);
625 Self::include_leaf_field_id(&map_type.value_field, field_ids);
626 }
627 }
628 }
629
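    /// Build the Parquet `ProjectionMask` for the requested field IDs, validating that any
    /// type promotion between the file schema and the task schema is allowed (e.g. int to
    /// long, float to double, widening a decimal's precision at the same scale).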
630 fn get_arrow_projection_mask(
631 field_ids: &[i32],
632 iceberg_schema_of_task: &Schema,
633 parquet_schema: &SchemaDescriptor,
634 arrow_schema: &ArrowSchemaRef,
        use_fallback: bool,
    ) -> Result<ProjectionMask> {
637 fn type_promotion_is_valid(
638 file_type: Option<&PrimitiveType>,
639 projected_type: Option<&PrimitiveType>,
640 ) -> bool {
641 match (file_type, projected_type) {
642 (Some(lhs), Some(rhs)) if lhs == rhs => true,
643 (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
644 (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
645 (
646 Some(PrimitiveType::Decimal {
647 precision: file_precision,
648 scale: file_scale,
649 }),
650 Some(PrimitiveType::Decimal {
651 precision: requested_precision,
652 scale: requested_scale,
653 }),
654 ) if requested_precision >= file_precision && file_scale == requested_scale => true,
655 (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
657 _ => false,
658 }
659 }
660
661 if field_ids.is_empty() {
662 return Ok(ProjectionMask::all());
663 }
664
665 if use_fallback {
666 Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
668 } else {
669 let mut leaf_field_ids = vec![];
673 for field_id in field_ids {
674 let field = iceberg_schema_of_task.field_by_id(*field_id);
675 if let Some(field) = field {
676 Self::include_leaf_field_id(field, &mut leaf_field_ids);
677 }
678 }
679
680 Self::get_arrow_projection_mask_with_field_ids(
681 &leaf_field_ids,
682 iceberg_schema_of_task,
683 parquet_schema,
684 arrow_schema,
685 type_promotion_is_valid,
686 )
687 }
688 }
689
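    /// Projection path for files whose Arrow schema carries Parquet field-ID metadata:
    /// leaf columns are matched by field ID and checked for type compatibility.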
690 fn get_arrow_projection_mask_with_field_ids(
693 leaf_field_ids: &[i32],
694 iceberg_schema_of_task: &Schema,
695 parquet_schema: &SchemaDescriptor,
696 arrow_schema: &ArrowSchemaRef,
697 type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
698 ) -> Result<ProjectionMask> {
699 let mut column_map = HashMap::new();
700 let fields = arrow_schema.fields();
701
702 let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
705 let projected_arrow_schema = ArrowSchema::new_with_metadata(
706 fields.filter_leaves(|_, f| {
707 f.metadata()
708 .get(PARQUET_FIELD_ID_META_KEY)
709 .and_then(|field_id| i32::from_str(field_id).ok())
710 .is_some_and(|field_id| {
711 projected_fields.insert((*f).clone(), field_id);
712 leaf_field_ids.contains(&field_id)
713 })
714 }),
715 arrow_schema.metadata().clone(),
716 );
717 let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
718
719 fields.filter_leaves(|idx, field| {
720 let Some(field_id) = projected_fields.get(field).cloned() else {
721 return false;
722 };
723
724 let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
725 let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
726
727 if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
728 return false;
729 }
730
731 if !type_promotion_is_valid(
732 parquet_iceberg_field
733 .unwrap()
734 .field_type
735 .as_primitive_type(),
736 iceberg_field.unwrap().field_type.as_primitive_type(),
737 ) {
738 return false;
739 }
740
741 column_map.insert(field_id, idx);
742 true
743 });
744
745 let mut indices = vec![];
748 for field_id in leaf_field_ids {
749 if let Some(col_idx) = column_map.get(field_id) {
750 indices.push(*col_idx);
751 }
752 }
753
754 if indices.is_empty() {
755 Ok(ProjectionMask::all())
758 } else {
759 Ok(ProjectionMask::leaves(parquet_schema, indices))
760 }
761 }
762
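    /// Fallback projection for files without field IDs: IDs are assumed to be 1-based
    /// positions of the root columns, so the mask is built from root column indices.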
763 fn get_arrow_projection_mask_fallback(
767 field_ids: &[i32],
768 parquet_schema: &SchemaDescriptor,
769 ) -> Result<ProjectionMask> {
770 let parquet_root_fields = parquet_schema.root_schema().get_fields();
772 let mut root_indices = vec![];
773
774 for field_id in field_ids.iter() {
775 let parquet_pos = (*field_id - 1) as usize;
776
777 if parquet_pos < parquet_root_fields.len() {
778 root_indices.push(parquet_pos);
779 }
780 }
782
783 if root_indices.is_empty() {
784 Ok(ProjectionMask::all())
785 } else {
786 Ok(ProjectionMask::roots(parquet_schema, root_indices))
787 }
788 }
789
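    /// Convert the bound predicate into an Arrow `RowFilter` evaluated over the referenced
    /// leaf columns.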
790 fn get_row_filter(
791 predicates: &BoundPredicate,
792 parquet_schema: &SchemaDescriptor,
793 iceberg_field_ids: &HashSet<i32>,
794 field_id_map: &HashMap<i32, usize>,
795 ) -> Result<RowFilter> {
796 let mut column_indices = iceberg_field_ids
799 .iter()
800 .filter_map(|field_id| field_id_map.get(field_id).cloned())
801 .collect::<Vec<_>>();
802 column_indices.sort();
803
804 let mut converter = PredicateConverter {
806 parquet_schema,
807 column_map: field_id_map,
808 column_indices: &column_indices,
809 };
810
811 let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
814 let predicate_func = visit(&mut converter, predicates)?;
815 let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
816 Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
817 }
818
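    /// Evaluate the predicate against row-group statistics and return the indices of row
    /// groups that may contain matching rows.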
819 fn get_selected_row_group_indices(
820 predicate: &BoundPredicate,
821 parquet_metadata: &Arc<ParquetMetaData>,
822 field_id_map: &HashMap<i32, usize>,
823 snapshot_schema: &Schema,
824 ) -> Result<Vec<usize>> {
825 let row_groups_metadata = parquet_metadata.row_groups();
826 let mut results = Vec::with_capacity(row_groups_metadata.len());
827
828 for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
829 if RowGroupMetricsEvaluator::eval(
830 predicate,
831 row_group_metadata,
832 field_id_map,
833 snapshot_schema,
834 )? {
835 results.push(idx);
836 }
837 }
838
839 Ok(results)
840 }
841
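    /// Use the Parquet page index to compute a `RowSelection` for the predicate, limited to
    /// the selected row groups. Errors if the file metadata lacks a column or offset index.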
842 fn get_row_selection_for_filter_predicate(
843 predicate: &BoundPredicate,
844 parquet_metadata: &Arc<ParquetMetaData>,
845 selected_row_groups: &Option<Vec<usize>>,
846 field_id_map: &HashMap<i32, usize>,
847 snapshot_schema: &Schema,
848 ) -> Result<RowSelection> {
849 let Some(column_index) = parquet_metadata.column_index() else {
850 return Err(Error::new(
851 ErrorKind::Unexpected,
852 "Parquet file metadata does not contain a column index",
853 ));
854 };
855
856 let Some(offset_index) = parquet_metadata.offset_index() else {
857 return Err(Error::new(
858 ErrorKind::Unexpected,
859 "Parquet file metadata does not contain an offset index",
860 ));
861 };
862
863 if let Some(selected_row_groups) = selected_row_groups
865 && selected_row_groups.is_empty()
866 {
867 return Ok(RowSelection::from(Vec::new()));
868 }
869
870 let mut selected_row_groups_idx = 0;
871
872 let page_index = column_index
873 .iter()
874 .enumerate()
875 .zip(offset_index)
876 .zip(parquet_metadata.row_groups());
877
878 let mut results = Vec::new();
879 for (((idx, column_index), offset_index), row_group_metadata) in page_index {
880 if let Some(selected_row_groups) = selected_row_groups {
881 if idx == selected_row_groups[selected_row_groups_idx] {
883 selected_row_groups_idx += 1;
884 } else {
885 continue;
886 }
887 }
888
889 let selections_for_page = PageIndexEvaluator::eval(
890 predicate,
891 column_index,
892 offset_index,
893 row_group_metadata,
894 field_id_map,
895 snapshot_schema,
896 )?;
897
898 results.push(selections_for_page);
899
900 if let Some(selected_row_groups) = selected_row_groups
901 && selected_row_groups_idx == selected_row_groups.len()
902 {
903 break;
904 }
905 }
906
907 Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
908 }
909
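    /// Select the row groups whose byte ranges overlap `[start, start + length)`. Row-group
    /// data is assumed to begin right after the 4-byte `PAR1` magic header, hence the
    /// running offset starts at 4.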
910 fn filter_row_groups_by_byte_range(
915 parquet_metadata: &Arc<ParquetMetaData>,
916 start: u64,
917 length: u64,
918 ) -> Result<Vec<usize>> {
919 let row_groups = parquet_metadata.row_groups();
920 let mut selected = Vec::new();
921 let end = start + length;
922
923 let mut current_byte_offset = 4u64;
925
926 for (idx, row_group) in row_groups.iter().enumerate() {
927 let row_group_size = row_group.compressed_size() as u64;
928 let row_group_end = current_byte_offset + row_group_size;
929
930 if current_byte_offset < end && start < row_group_end {
931 selected.push(idx);
932 }
933
934 current_byte_offset = row_group_end;
935 }
936
937 Ok(selected)
938 }
939}
940
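/// Build a map from Iceberg field ID to Parquet leaf column index. Returns `Ok(None)` when
/// any leaf column lacks an embedded field ID, signalling that the fallback map should be used.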
941fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
944 let mut column_map = HashMap::new();
945
946 for (idx, field) in parquet_schema.columns().iter().enumerate() {
947 let field_type = field.self_type();
948 match field_type {
949 ParquetType::PrimitiveType { basic_info, .. } => {
950 if !basic_info.has_id() {
951 return Ok(None);
952 }
953 column_map.insert(basic_info.id(), idx);
954 }
955 ParquetType::GroupType { .. } => {
956 return Err(Error::new(
957 ErrorKind::DataInvalid,
958 format!(
959 "Leave column in schema should be primitive type but got {field_type:?}"
960 ),
961 ));
962 }
963 };
964 }
965
966 Ok(Some(column_map))
967}
968
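/// Fallback map used when the file has no field IDs: assign 1-based positional IDs to the
/// Parquet leaf columns.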
969fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
972 let mut column_map = HashMap::new();
973
974 for (idx, _field) in parquet_schema.columns().iter().enumerate() {
976 let field_id = (idx + 1) as i32;
977 column_map.insert(field_id, idx);
978 }
979
980 column_map
981}
982
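/// Apply an Iceberg `NameMapping` to an Arrow schema that lacks field IDs, attaching the
/// mapped field ID to each top-level field's metadata when a mapping entry matches its name.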
983fn apply_name_mapping_to_arrow_schema(
1002 arrow_schema: ArrowSchemaRef,
1003 name_mapping: &NameMapping,
1004) -> Result<Arc<ArrowSchema>> {
1005 debug_assert!(
1006 arrow_schema
1007 .fields()
1008 .iter()
1009 .next()
1010 .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1011 "Schema already has field IDs - name mapping should not be applied"
1012 );
1013
1014 use arrow_schema::Field;
1015
1016 let fields_with_mapped_ids: Vec<_> = arrow_schema
1017 .fields()
1018 .iter()
1019 .map(|field| {
1020 let mapped_field_opt = name_mapping
1028 .fields()
1029 .iter()
1030 .find(|f| f.names().contains(&field.name().to_string()));
1031
1032 let mut metadata = field.metadata().clone();
1033
1034 if let Some(mapped_field) = mapped_field_opt
1035 && let Some(field_id) = mapped_field.field_id()
1036 {
1037 metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1039 }
1040 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1043 .with_metadata(metadata)
1044 })
1045 .collect();
1046
1047 Ok(Arc::new(ArrowSchema::new_with_metadata(
1048 fields_with_mapped_ids,
1049 arrow_schema.metadata().clone(),
1050 )))
1051}
1052
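/// Add positional (1-based) fallback field IDs to an Arrow schema that lacks field IDs.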
1053fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
1060 debug_assert!(
1061 arrow_schema
1062 .fields()
1063 .iter()
1064 .next()
1065 .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1066 "Schema already has field IDs"
1067 );
1068
1069 use arrow_schema::Field;
1070
1071 let fields_with_fallback_ids: Vec<_> = arrow_schema
1072 .fields()
1073 .iter()
1074 .enumerate()
1075 .map(|(pos, field)| {
1076 let mut metadata = field.metadata().clone();
            let field_id = (pos + 1) as i32;
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1079
1080 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1081 .with_metadata(metadata)
1082 })
1083 .collect();
1084
1085 Arc::new(ArrowSchema::new_with_metadata(
1086 fields_with_fallback_ids,
1087 arrow_schema.metadata().clone(),
1088 ))
1089}
1090
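/// Predicate visitor that collects the set of Iceberg field IDs referenced by a bound predicate.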
1091struct CollectFieldIdVisitor {
1093 field_ids: HashSet<i32>,
1094}
1095
1096impl CollectFieldIdVisitor {
1097 fn field_ids(self) -> HashSet<i32> {
1098 self.field_ids
1099 }
1100}
1101
1102impl BoundPredicateVisitor for CollectFieldIdVisitor {
1103 type T = ();
1104
1105 fn always_true(&mut self) -> Result<()> {
1106 Ok(())
1107 }
1108
1109 fn always_false(&mut self) -> Result<()> {
1110 Ok(())
1111 }
1112
1113 fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1114 Ok(())
1115 }
1116
1117 fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1118 Ok(())
1119 }
1120
1121 fn not(&mut self, _inner: ()) -> Result<()> {
1122 Ok(())
1123 }
1124
1125 fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1126 self.field_ids.insert(reference.field().id);
1127 Ok(())
1128 }
1129
1130 fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1131 self.field_ids.insert(reference.field().id);
1132 Ok(())
1133 }
1134
1135 fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1136 self.field_ids.insert(reference.field().id);
1137 Ok(())
1138 }
1139
1140 fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1141 self.field_ids.insert(reference.field().id);
1142 Ok(())
1143 }
1144
1145 fn less_than(
1146 &mut self,
1147 reference: &BoundReference,
1148 _literal: &Datum,
1149 _predicate: &BoundPredicate,
1150 ) -> Result<()> {
1151 self.field_ids.insert(reference.field().id);
1152 Ok(())
1153 }
1154
1155 fn less_than_or_eq(
1156 &mut self,
1157 reference: &BoundReference,
1158 _literal: &Datum,
1159 _predicate: &BoundPredicate,
1160 ) -> Result<()> {
1161 self.field_ids.insert(reference.field().id);
1162 Ok(())
1163 }
1164
1165 fn greater_than(
1166 &mut self,
1167 reference: &BoundReference,
1168 _literal: &Datum,
1169 _predicate: &BoundPredicate,
1170 ) -> Result<()> {
1171 self.field_ids.insert(reference.field().id);
1172 Ok(())
1173 }
1174
1175 fn greater_than_or_eq(
1176 &mut self,
1177 reference: &BoundReference,
1178 _literal: &Datum,
1179 _predicate: &BoundPredicate,
1180 ) -> Result<()> {
1181 self.field_ids.insert(reference.field().id);
1182 Ok(())
1183 }
1184
1185 fn eq(
1186 &mut self,
1187 reference: &BoundReference,
1188 _literal: &Datum,
1189 _predicate: &BoundPredicate,
1190 ) -> Result<()> {
1191 self.field_ids.insert(reference.field().id);
1192 Ok(())
1193 }
1194
1195 fn not_eq(
1196 &mut self,
1197 reference: &BoundReference,
1198 _literal: &Datum,
1199 _predicate: &BoundPredicate,
1200 ) -> Result<()> {
1201 self.field_ids.insert(reference.field().id);
1202 Ok(())
1203 }
1204
1205 fn starts_with(
1206 &mut self,
1207 reference: &BoundReference,
1208 _literal: &Datum,
1209 _predicate: &BoundPredicate,
1210 ) -> Result<()> {
1211 self.field_ids.insert(reference.field().id);
1212 Ok(())
1213 }
1214
1215 fn not_starts_with(
1216 &mut self,
1217 reference: &BoundReference,
1218 _literal: &Datum,
1219 _predicate: &BoundPredicate,
1220 ) -> Result<()> {
1221 self.field_ids.insert(reference.field().id);
1222 Ok(())
1223 }
1224
1225 fn r#in(
1226 &mut self,
1227 reference: &BoundReference,
1228 _literals: &FnvHashSet<Datum>,
1229 _predicate: &BoundPredicate,
1230 ) -> Result<()> {
1231 self.field_ids.insert(reference.field().id);
1232 Ok(())
1233 }
1234
1235 fn not_in(
1236 &mut self,
1237 reference: &BoundReference,
1238 _literals: &FnvHashSet<Datum>,
1239 _predicate: &BoundPredicate,
1240 ) -> Result<()> {
1241 self.field_ids.insert(reference.field().id);
1242 Ok(())
1243 }
1244}
1245
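/// Converts a bound Iceberg predicate into an Arrow row-filter closure, resolving each
/// referenced field to its position within the projected leaf columns.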
1246struct PredicateConverter<'a> {
1248 pub parquet_schema: &'a SchemaDescriptor,
1250 pub column_map: &'a HashMap<i32, usize>,
1252 pub column_indices: &'a Vec<usize>,
1254}
1255
1256impl PredicateConverter<'_> {
1257 fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
1262 if let Some(column_idx) = self.column_map.get(&reference.field().id) {
1264 if self.parquet_schema.get_column_root(*column_idx).is_group() {
1265 return Err(Error::new(
1266 ErrorKind::DataInvalid,
1267 format!(
1268 "Leave column `{}` in predicates isn't a root column in Parquet schema.",
1269 reference.field().name
1270 ),
1271 ));
1272 }
1273
1274 let index = self
1276 .column_indices
1277 .iter()
1278 .position(|&idx| idx == *column_idx)
1279 .ok_or(Error::new(
1280 ErrorKind::DataInvalid,
1281 format!(
1282 "Leave column `{}` in predicates cannot be found in the required column indices.",
1283 reference.field().name
1284 ),
1285 ))?;
1286
1287 Ok(Some(index))
1288 } else {
1289 Ok(None)
1290 }
1291 }
1292
1293 fn build_always_true(&self) -> Result<Box<PredicateResult>> {
1295 Ok(Box::new(|batch| {
1296 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1297 }))
1298 }
1299
1300 fn build_always_false(&self) -> Result<Box<PredicateResult>> {
1302 Ok(Box::new(|batch| {
1303 Ok(BooleanArray::from(vec![false; batch.num_rows()]))
1304 }))
1305 }
1306}
1307
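/// Fetch a column from the record batch by index; struct columns are not yet supported.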
1308fn project_column(
1311 batch: &RecordBatch,
1312 column_idx: usize,
1313) -> std::result::Result<ArrayRef, ArrowError> {
1314 let column = batch.column(column_idx);
1315
1316 match column.data_type() {
1317 DataType::Struct(_) => Err(ArrowError::SchemaError(
1318 "Does not support struct column yet.".to_string(),
1319 )),
1320 _ => Ok(column.clone()),
1321 }
1322}
1323
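/// Row-level predicate closure: takes a `RecordBatch` and returns a boolean mask of matching rows.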
1324type PredicateResult =
1325 dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1326
1327impl BoundPredicateVisitor for PredicateConverter<'_> {
1328 type T = Box<PredicateResult>;
1329
1330 fn always_true(&mut self) -> Result<Box<PredicateResult>> {
1331 self.build_always_true()
1332 }
1333
1334 fn always_false(&mut self) -> Result<Box<PredicateResult>> {
1335 self.build_always_false()
1336 }
1337
1338 fn and(
1339 &mut self,
1340 mut lhs: Box<PredicateResult>,
1341 mut rhs: Box<PredicateResult>,
1342 ) -> Result<Box<PredicateResult>> {
1343 Ok(Box::new(move |batch| {
1344 let left = lhs(batch.clone())?;
1345 let right = rhs(batch)?;
1346 and_kleene(&left, &right)
1347 }))
1348 }
1349
1350 fn or(
1351 &mut self,
1352 mut lhs: Box<PredicateResult>,
1353 mut rhs: Box<PredicateResult>,
1354 ) -> Result<Box<PredicateResult>> {
1355 Ok(Box::new(move |batch| {
1356 let left = lhs(batch.clone())?;
1357 let right = rhs(batch)?;
1358 or_kleene(&left, &right)
1359 }))
1360 }
1361
1362 fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1363 Ok(Box::new(move |batch| {
1364 let pred_ret = inner(batch)?;
1365 not(&pred_ret)
1366 }))
1367 }
1368
1369 fn is_null(
1370 &mut self,
1371 reference: &BoundReference,
1372 _predicate: &BoundPredicate,
1373 ) -> Result<Box<PredicateResult>> {
1374 if let Some(idx) = self.bound_reference(reference)? {
1375 Ok(Box::new(move |batch| {
1376 let column = project_column(&batch, idx)?;
1377 is_null(&column)
1378 }))
1379 } else {
1380 self.build_always_true()
1382 }
1383 }
1384
1385 fn not_null(
1386 &mut self,
1387 reference: &BoundReference,
1388 _predicate: &BoundPredicate,
1389 ) -> Result<Box<PredicateResult>> {
1390 if let Some(idx) = self.bound_reference(reference)? {
1391 Ok(Box::new(move |batch| {
1392 let column = project_column(&batch, idx)?;
1393 is_not_null(&column)
1394 }))
1395 } else {
1396 self.build_always_false()
1398 }
1399 }
1400
1401 fn is_nan(
1402 &mut self,
1403 reference: &BoundReference,
1404 _predicate: &BoundPredicate,
1405 ) -> Result<Box<PredicateResult>> {
1406 if self.bound_reference(reference)?.is_some() {
1407 self.build_always_true()
1408 } else {
1409 self.build_always_false()
1411 }
1412 }
1413
1414 fn not_nan(
1415 &mut self,
1416 reference: &BoundReference,
1417 _predicate: &BoundPredicate,
1418 ) -> Result<Box<PredicateResult>> {
1419 if self.bound_reference(reference)?.is_some() {
1420 self.build_always_false()
1421 } else {
1422 self.build_always_true()
1424 }
1425 }
1426
1427 fn less_than(
1428 &mut self,
1429 reference: &BoundReference,
1430 literal: &Datum,
1431 _predicate: &BoundPredicate,
1432 ) -> Result<Box<PredicateResult>> {
1433 if let Some(idx) = self.bound_reference(reference)? {
1434 let literal = get_arrow_datum(literal)?;
1435
1436 Ok(Box::new(move |batch| {
1437 let left = project_column(&batch, idx)?;
1438 let literal = try_cast_literal(&literal, left.data_type())?;
1439 lt(&left, literal.as_ref())
1440 }))
1441 } else {
1442 self.build_always_true()
1444 }
1445 }
1446
1447 fn less_than_or_eq(
1448 &mut self,
1449 reference: &BoundReference,
1450 literal: &Datum,
1451 _predicate: &BoundPredicate,
1452 ) -> Result<Box<PredicateResult>> {
1453 if let Some(idx) = self.bound_reference(reference)? {
1454 let literal = get_arrow_datum(literal)?;
1455
1456 Ok(Box::new(move |batch| {
1457 let left = project_column(&batch, idx)?;
1458 let literal = try_cast_literal(&literal, left.data_type())?;
1459 lt_eq(&left, literal.as_ref())
1460 }))
1461 } else {
1462 self.build_always_true()
1464 }
1465 }
1466
1467 fn greater_than(
1468 &mut self,
1469 reference: &BoundReference,
1470 literal: &Datum,
1471 _predicate: &BoundPredicate,
1472 ) -> Result<Box<PredicateResult>> {
1473 if let Some(idx) = self.bound_reference(reference)? {
1474 let literal = get_arrow_datum(literal)?;
1475
1476 Ok(Box::new(move |batch| {
1477 let left = project_column(&batch, idx)?;
1478 let literal = try_cast_literal(&literal, left.data_type())?;
1479 gt(&left, literal.as_ref())
1480 }))
1481 } else {
1482 self.build_always_false()
1484 }
1485 }
1486
1487 fn greater_than_or_eq(
1488 &mut self,
1489 reference: &BoundReference,
1490 literal: &Datum,
1491 _predicate: &BoundPredicate,
1492 ) -> Result<Box<PredicateResult>> {
1493 if let Some(idx) = self.bound_reference(reference)? {
1494 let literal = get_arrow_datum(literal)?;
1495
1496 Ok(Box::new(move |batch| {
1497 let left = project_column(&batch, idx)?;
1498 let literal = try_cast_literal(&literal, left.data_type())?;
1499 gt_eq(&left, literal.as_ref())
1500 }))
1501 } else {
1502 self.build_always_false()
1504 }
1505 }
1506
1507 fn eq(
1508 &mut self,
1509 reference: &BoundReference,
1510 literal: &Datum,
1511 _predicate: &BoundPredicate,
1512 ) -> Result<Box<PredicateResult>> {
1513 if let Some(idx) = self.bound_reference(reference)? {
1514 let literal = get_arrow_datum(literal)?;
1515
1516 Ok(Box::new(move |batch| {
1517 let left = project_column(&batch, idx)?;
1518 let literal = try_cast_literal(&literal, left.data_type())?;
1519 eq(&left, literal.as_ref())
1520 }))
1521 } else {
1522 self.build_always_false()
1524 }
1525 }
1526
1527 fn not_eq(
1528 &mut self,
1529 reference: &BoundReference,
1530 literal: &Datum,
1531 _predicate: &BoundPredicate,
1532 ) -> Result<Box<PredicateResult>> {
1533 if let Some(idx) = self.bound_reference(reference)? {
1534 let literal = get_arrow_datum(literal)?;
1535
1536 Ok(Box::new(move |batch| {
1537 let left = project_column(&batch, idx)?;
1538 let literal = try_cast_literal(&literal, left.data_type())?;
1539 neq(&left, literal.as_ref())
1540 }))
1541 } else {
1542 self.build_always_false()
1544 }
1545 }
1546
1547 fn starts_with(
1548 &mut self,
1549 reference: &BoundReference,
1550 literal: &Datum,
1551 _predicate: &BoundPredicate,
1552 ) -> Result<Box<PredicateResult>> {
1553 if let Some(idx) = self.bound_reference(reference)? {
1554 let literal = get_arrow_datum(literal)?;
1555
1556 Ok(Box::new(move |batch| {
1557 let left = project_column(&batch, idx)?;
1558 let literal = try_cast_literal(&literal, left.data_type())?;
1559 starts_with(&left, literal.as_ref())
1560 }))
1561 } else {
1562 self.build_always_false()
1564 }
1565 }
1566
1567 fn not_starts_with(
1568 &mut self,
1569 reference: &BoundReference,
1570 literal: &Datum,
1571 _predicate: &BoundPredicate,
1572 ) -> Result<Box<PredicateResult>> {
1573 if let Some(idx) = self.bound_reference(reference)? {
1574 let literal = get_arrow_datum(literal)?;
1575
1576 Ok(Box::new(move |batch| {
1577 let left = project_column(&batch, idx)?;
1578 let literal = try_cast_literal(&literal, left.data_type())?;
1579 not(&starts_with(&left, literal.as_ref())?)
1581 }))
1582 } else {
1583 self.build_always_true()
1585 }
1586 }
1587
1588 fn r#in(
1589 &mut self,
1590 reference: &BoundReference,
1591 literals: &FnvHashSet<Datum>,
1592 _predicate: &BoundPredicate,
1593 ) -> Result<Box<PredicateResult>> {
1594 if let Some(idx) = self.bound_reference(reference)? {
1595 let literals: Vec<_> = literals
1596 .iter()
1597 .map(|lit| get_arrow_datum(lit).unwrap())
1598 .collect();
1599
1600 Ok(Box::new(move |batch| {
1601 let left = project_column(&batch, idx)?;
1603
1604 let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1605 for literal in &literals {
1606 let literal = try_cast_literal(literal, left.data_type())?;
1607 acc = or(&acc, &eq(&left, literal.as_ref())?)?
1608 }
1609
1610 Ok(acc)
1611 }))
1612 } else {
1613 self.build_always_false()
1615 }
1616 }
1617
1618 fn not_in(
1619 &mut self,
1620 reference: &BoundReference,
1621 literals: &FnvHashSet<Datum>,
1622 _predicate: &BoundPredicate,
1623 ) -> Result<Box<PredicateResult>> {
1624 if let Some(idx) = self.bound_reference(reference)? {
1625 let literals: Vec<_> = literals
1626 .iter()
1627 .map(|lit| get_arrow_datum(lit).unwrap())
1628 .collect();
1629
1630 Ok(Box::new(move |batch| {
1631 let left = project_column(&batch, idx)?;
1633 let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1634 for literal in &literals {
1635 let literal = try_cast_literal(literal, left.data_type())?;
1636 acc = and(&acc, &neq(&left, literal.as_ref())?)?
1637 }
1638
1639 Ok(acc)
1640 }))
1641 } else {
1642 self.build_always_true()
1644 }
1645 }
1646}
1647
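/// `AsyncFileReader` implementation backed by an Iceberg `FileRead`, with optional preloading
/// of the Parquet column, offset, and page indexes.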
1648pub struct ArrowFileReader<R: FileRead> {
1650 meta: FileMetadata,
1651 preload_column_index: bool,
1652 preload_offset_index: bool,
1653 preload_page_index: bool,
1654 metadata_size_hint: Option<usize>,
1655 r: R,
1656}
1657
1658impl<R: FileRead> ArrowFileReader<R> {
1659 pub fn new(meta: FileMetadata, r: R) -> Self {
1661 Self {
1662 meta,
1663 preload_column_index: false,
1664 preload_offset_index: false,
1665 preload_page_index: false,
1666 metadata_size_hint: None,
1667 r,
1668 }
1669 }
1670
1671 pub fn with_preload_column_index(mut self, preload: bool) -> Self {
1673 self.preload_column_index = preload;
1674 self
1675 }
1676
1677 pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
1679 self.preload_offset_index = preload;
1680 self
1681 }
1682
1683 pub fn with_preload_page_index(mut self, preload: bool) -> Self {
1685 self.preload_page_index = preload;
1686 self
1687 }
1688
1689 pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
1694 self.metadata_size_hint = Some(hint);
1695 self
1696 }
1697}
1698
1699impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
1700 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
1701 Box::pin(
1702 self.r
1703 .read(range.start..range.end)
1704 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
1705 )
1706 }
1707
1708 fn get_metadata(
1711 &mut self,
1712 _options: Option<&'_ ArrowReaderOptions>,
1713 ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
1714 async move {
1715 let reader = ParquetMetaDataReader::new()
1716 .with_prefetch_hint(self.metadata_size_hint)
1717 .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
1719 .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
1720 .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
1721 let size = self.meta.size;
1722 let meta = reader.load_and_finish(self, size).await?;
1723
1724 Ok(Arc::new(meta))
1725 }
1726 .boxed()
1727 }
1728}
1729
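/// Cast the literal to the column's Arrow data type when they differ (for example a `Utf8`
/// literal compared against a `LargeUtf8` column), so the comparison kernels accept it.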
1730fn try_cast_literal(
1737 literal: &Arc<dyn ArrowDatum + Send + Sync>,
1738 column_type: &DataType,
1739) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1740 let literal_array = literal.get().0;
1741
1742 if literal_array.data_type() == column_type {
1744 return Ok(Arc::clone(literal));
1745 }
1746
1747 let literal_array = cast(literal_array, column_type)?;
1748 Ok(Arc::new(Scalar::new(literal_array)))
1749}
1750
1751#[cfg(test)]
1752mod tests {
1753 use std::collections::{HashMap, HashSet};
1754 use std::fs::File;
1755 use std::sync::Arc;
1756
1757 use arrow_array::cast::AsArray;
1758 use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1759 use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1760 use futures::TryStreamExt;
1761 use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1762 use parquet::arrow::{ArrowWriter, ProjectionMask};
1763 use parquet::basic::Compression;
1764 use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1765 use parquet::file::properties::WriterProperties;
1766 use parquet::schema::parser::parse_message_type;
1767 use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1768 use roaring::RoaringTreemap;
1769 use tempfile::TempDir;
1770
1771 use crate::ErrorKind;
1772 use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1773 use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1774 use crate::delete_vector::DeleteVector;
1775 use crate::expr::visitors::bound_predicate_visitor::visit;
1776 use crate::expr::{Bind, Predicate, Reference};
1777 use crate::io::FileIO;
1778 use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
1779 use crate::spec::{
1780 DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
1781 };
1782
1783 fn table_schema_simple() -> SchemaRef {
1784 Arc::new(
1785 Schema::builder()
1786 .with_schema_id(1)
1787 .with_identifier_field_ids(vec![2])
1788 .with_fields(vec![
1789 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1790 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1791 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1792 NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1793 ])
1794 .build()
1795 .unwrap(),
1796 )
1797 }
1798
1799 #[test]
1800 fn test_collect_field_id() {
1801 let schema = table_schema_simple();
1802 let expr = Reference::new("qux").is_null();
1803 let bound_expr = expr.bind(schema, true).unwrap();
1804
1805 let mut visitor = CollectFieldIdVisitor {
1806 field_ids: HashSet::default(),
1807 };
1808 visit(&mut visitor, &bound_expr).unwrap();
1809
1810 let mut expected = HashSet::default();
1811 expected.insert(4_i32);
1812
1813 assert_eq!(visitor.field_ids, expected);
1814 }
1815
1816 #[test]
1817 fn test_collect_field_id_with_and() {
1818 let schema = table_schema_simple();
1819 let expr = Reference::new("qux")
1820 .is_null()
1821 .and(Reference::new("baz").is_null());
1822 let bound_expr = expr.bind(schema, true).unwrap();
1823
1824 let mut visitor = CollectFieldIdVisitor {
1825 field_ids: HashSet::default(),
1826 };
1827 visit(&mut visitor, &bound_expr).unwrap();
1828
1829 let mut expected = HashSet::default();
1830 expected.insert(4_i32);
1831 expected.insert(3);
1832
1833 assert_eq!(visitor.field_ids, expected);
1834 }
1835
1836 #[test]
1837 fn test_collect_field_id_with_or() {
1838 let schema = table_schema_simple();
1839 let expr = Reference::new("qux")
1840 .is_null()
1841 .or(Reference::new("baz").is_null());
1842 let bound_expr = expr.bind(schema, true).unwrap();
1843
1844 let mut visitor = CollectFieldIdVisitor {
1845 field_ids: HashSet::default(),
1846 };
1847 visit(&mut visitor, &bound_expr).unwrap();
1848
1849 let mut expected = HashSet::default();
1850 expected.insert(4_i32);
1851 expected.insert(3);
1852
1853 assert_eq!(visitor.field_ids, expected);
1854 }
1855
1856 #[test]
1857 fn test_arrow_projection_mask() {
1858 let schema = Arc::new(
1859 Schema::builder()
1860 .with_schema_id(1)
1861 .with_identifier_field_ids(vec![1])
1862 .with_fields(vec![
1863 NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
1864 NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
1865 NestedField::optional(
1866 3,
1867 "c3",
1868 Type::Primitive(PrimitiveType::Decimal {
1869 precision: 38,
1870 scale: 3,
1871 }),
1872 )
1873 .into(),
1874 ])
1875 .build()
1876 .unwrap(),
1877 );
1878 let arrow_schema = Arc::new(ArrowSchema::new(vec![
1879 Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
1880 PARQUET_FIELD_ID_META_KEY.to_string(),
1881 "1".to_string(),
1882 )])),
1883 Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
1885 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1886 ),
1887 Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
1889 PARQUET_FIELD_ID_META_KEY.to_string(),
1890 "3".to_string(),
1891 )])),
1892 ]));
1893
1894 let message_type = "
1895message schema {
1896 required binary c1 (STRING) = 1;
1897 optional int32 c2 (INTEGER(8,true)) = 2;
1898 optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
1899}
1900 ";
1901 let parquet_type = parse_message_type(message_type).expect("should parse schema");
1902 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
1903
1904 let err = ArrowReader::get_arrow_projection_mask(
1906 &[1, 2, 3],
1907 &schema,
1908 &parquet_schema,
1909 &arrow_schema,
1910 false,
1911 )
1912 .unwrap_err();
1913
1914 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1915 assert_eq!(
1916 err.to_string(),
1917 "DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
1918 );
1919
1920 let err = ArrowReader::get_arrow_projection_mask(
1922 &[1, 3],
1923 &schema,
1924 &parquet_schema,
1925 &arrow_schema,
1926 false,
1927 )
1928 .unwrap_err();
1929
1930 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1931 assert_eq!(
1932 err.to_string(),
1933 "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
1934 );
1935
1936 let mask = ArrowReader::get_arrow_projection_mask(
1938 &[1],
1939 &schema,
1940 &parquet_schema,
1941 &arrow_schema,
1942 false,
1943 )
1944 .expect("Some ProjectionMask");
1945 assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
1946 }
1947
1948 #[tokio::test]
1949 async fn test_kleene_logic_or_behaviour() {
1950 let predicate = Reference::new("a")
1952 .is_null()
1953 .or(Reference::new("a").equal_to(Datum::string("foo")));
1954
1955 let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1957
1958 let expected = vec![None, Some("foo".to_string())];
1960
1961 let (file_io, schema, table_location, _temp_dir) =
1962 setup_kleene_logic(data_for_col_a, DataType::Utf8);
1963 let reader = ArrowReaderBuilder::new(file_io).build();
1964
1965 let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1966
1967 assert_eq!(result_data, expected);
1968 }
1969
1970 #[tokio::test]
1971 async fn test_kleene_logic_and_behaviour() {
1972 let predicate = Reference::new("a")
1974 .is_not_null()
1975 .and(Reference::new("a").not_equal_to(Datum::string("foo")));
1976
1977 let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1979
1980 let expected = vec![Some("bar".to_string())];
1982
1983 let (file_io, schema, table_location, _temp_dir) =
1984 setup_kleene_logic(data_for_col_a, DataType::Utf8);
1985 let reader = ArrowReaderBuilder::new(file_io).build();
1986
1987 let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1988
1989 assert_eq!(result_data, expected);
1990 }
1991
1992 #[tokio::test]
1993 async fn test_predicate_cast_literal() {
1994 let predicates = vec![
1995 (Reference::new("a").equal_to(Datum::string("foo")), vec![
1997 Some("foo".to_string()),
1998 ]),
1999 (
2001 Reference::new("a").not_equal_to(Datum::string("foo")),
2002 vec![Some("bar".to_string())],
2003 ),
2004 (Reference::new("a").starts_with(Datum::string("f")), vec![
2006 Some("foo".to_string()),
2007 ]),
2008 (
2010 Reference::new("a").not_starts_with(Datum::string("f")),
2011 vec![Some("bar".to_string())],
2012 ),
2013 (Reference::new("a").less_than(Datum::string("foo")), vec![
2015 Some("bar".to_string()),
2016 ]),
2017 (
2019 Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
2020 vec![Some("foo".to_string()), Some("bar".to_string())],
2021 ),
2022 (
2024 Reference::new("a").greater_than(Datum::string("bar")),
2025 vec![Some("foo".to_string())],
2026 ),
2027 (
2029 Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
2030 vec![Some("foo".to_string())],
2031 ),
2032 (
2034 Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
2035 vec![Some("foo".to_string())],
2036 ),
2037 (
2039 Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
2040 vec![Some("bar".to_string())],
2041 ),
2042 ];
2043
2044 let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];
2046
2047 let (file_io, schema, table_location, _temp_dir) =
2048 setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
2049 let reader = ArrowReaderBuilder::new(file_io).build();
2050
2051 for (predicate, expected) in predicates {
2052 println!("testing predicate {predicate}");
2053 let result_data = test_perform_read(
2054 predicate.clone(),
2055 schema.clone(),
2056 table_location.clone(),
2057 reader.clone(),
2058 )
2059 .await;
2060
2061 assert_eq!(result_data, expected, "predicate={predicate}");
2062 }
2063 }
2064
2065 async fn test_perform_read(
2066 predicate: Predicate,
2067 schema: SchemaRef,
2068 table_location: String,
2069 reader: ArrowReader,
2070 ) -> Vec<Option<String>> {
2071 let tasks = Box::pin(futures::stream::iter(
2072 vec![Ok(FileScanTask {
2073 start: 0,
2074 length: 0,
2075 record_count: None,
2076 data_file_path: format!("{table_location}/1.parquet"),
2077 data_file_format: DataFileFormat::Parquet,
2078 schema: schema.clone(),
2079 project_field_ids: vec![1],
2080 predicate: Some(predicate.bind(schema, true).unwrap()),
2081 deletes: vec![],
2082 partition: None,
2083 partition_spec: None,
2084 name_mapping: None,
2085 })]
2086 .into_iter(),
2087 )) as FileScanTaskStream;
2088
2089 let result = reader
2090 .read(tasks)
2091 .unwrap()
2092 .try_collect::<Vec<RecordBatch>>()
2093 .await
2094 .unwrap();
2095
2096 result[0].columns()[0]
2097 .as_string_opt::<i32>()
2098 .unwrap()
2099 .iter()
2100 .map(|v| v.map(ToOwned::to_owned))
2101 .collect::<Vec<_>>()
2102 }
2103
2104 fn setup_kleene_logic(
2105 data_for_col_a: Vec<Option<String>>,
2106 col_a_type: DataType,
2107 ) -> (FileIO, SchemaRef, String, TempDir) {
2108 let schema = Arc::new(
2109 Schema::builder()
2110 .with_schema_id(1)
2111 .with_fields(vec![
2112 NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
2113 ])
2114 .build()
2115 .unwrap(),
2116 );
2117
2118 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2119 Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
2120 PARQUET_FIELD_ID_META_KEY.to_string(),
2121 "1".to_string(),
2122 )])),
2123 ]));
2124
2125 let tmp_dir = TempDir::new().unwrap();
2126 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2127
2128 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2129
2130 let col = match col_a_type {
2131 DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
2132 DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
2133 _ => panic!("unexpected col_a_type"),
2134 };
2135
2136 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
2137
2138 let props = WriterProperties::builder()
2140 .set_compression(Compression::SNAPPY)
2141 .build();
2142
2143 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
2144 let mut writer =
2145 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
2146
2147 writer.write(&to_write).expect("Writing batch");
2148
2149 writer.close().unwrap();
2151
2152 (file_io, schema, table_location, tmp_dir)
2153 }
2154
2155 #[test]
2156 fn test_build_deletes_row_selection() {
2157 let schema_descr = get_test_schema_descr();
2158
2159 let mut columns = vec![];
2160 for ptr in schema_descr.columns() {
2161 let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
2162 columns.push(column);
2163 }
2164
2165 let row_groups_metadata = vec![
2166 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
2167 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
2168 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
2169 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
2170 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
2171 ];
2172
2173 let selected_row_groups = Some(vec![1, 3]);
2174
2175 let positional_deletes = RoaringTreemap::from_iter(&[
            1, 3, 4, 5, 998, 999, 1000, 1010, 1011, 1012, 1498, 1499, 1500, 1501, 1600, 1999,
            2000, 2001, 2100, 2200, 2201, 2202, 2999, 3000,
        ]);
2200
2201 let positional_deletes = DeleteVector::new(positional_deletes);
2202
2203 let result = ArrowReader::build_deletes_row_selection(
2205 &row_groups_metadata,
2206 &selected_row_groups,
2207 &positional_deletes,
2208 )
2209 .unwrap();
2210
2211 let expected = RowSelection::from(vec![
2212 RowSelector::skip(1),
2213 RowSelector::select(9),
2214 RowSelector::skip(3),
2215 RowSelector::select(485),
2216 RowSelector::skip(4),
2217 RowSelector::select(98),
2218 RowSelector::skip(1),
2219 RowSelector::select(99),
2220 RowSelector::skip(3),
2221 RowSelector::select(796),
2222 RowSelector::skip(1),
2223 ]);
2224
2225 assert_eq!(result, expected);
2226
2227 let result = ArrowReader::build_deletes_row_selection(
2229 &row_groups_metadata,
2230 &None,
2231 &positional_deletes,
2232 )
2233 .unwrap();
2234
2235 let expected = RowSelection::from(vec![
2236 RowSelector::select(1),
2237 RowSelector::skip(1),
2238 RowSelector::select(1),
2239 RowSelector::skip(3),
2240 RowSelector::select(992),
2241 RowSelector::skip(3),
2242 RowSelector::select(9),
2243 RowSelector::skip(3),
2244 RowSelector::select(485),
2245 RowSelector::skip(4),
2246 RowSelector::select(98),
2247 RowSelector::skip(1),
2248 RowSelector::select(398),
2249 RowSelector::skip(3),
2250 RowSelector::select(98),
2251 RowSelector::skip(1),
2252 RowSelector::select(99),
2253 RowSelector::skip(3),
2254 RowSelector::select(796),
2255 RowSelector::skip(2),
2256 RowSelector::select(499),
2257 ]);
2258
2259 assert_eq!(result, expected);
2260 }
2261
2262 fn build_test_row_group_meta(
2263 schema_descr: SchemaDescPtr,
2264 columns: Vec<ColumnChunkMetaData>,
2265 num_rows: i64,
2266 ordinal: i16,
2267 ) -> RowGroupMetaData {
2268 RowGroupMetaData::builder(schema_descr.clone())
2269 .set_num_rows(num_rows)
2270 .set_total_byte_size(2000)
2271 .set_column_metadata(columns)
2272 .set_ordinal(ordinal)
2273 .build()
2274 .unwrap()
2275 }
2276
2277 fn get_test_schema_descr() -> SchemaDescPtr {
2278 use parquet::schema::types::Type as SchemaType;
2279
2280 let schema = SchemaType::group_type_builder("schema")
2281 .with_fields(vec![
2282 Arc::new(
2283 SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2284 .build()
2285 .unwrap(),
2286 ),
2287 Arc::new(
2288 SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2289 .build()
2290 .unwrap(),
2291 ),
2292 ])
2293 .build()
2294 .unwrap();
2295
2296 Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2297 }
2298
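    // Writes a Parquet file with three 100-row row groups, then issues two scan
    // tasks whose byte ranges cover row group 0 and row groups 1..=2 respectively,
    // asserting that each task returns only the rows of the row groups it spans.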
2299 #[tokio::test]
2301 async fn test_file_splits_respect_byte_ranges() {
2302 use arrow_array::Int32Array;
2303 use parquet::file::reader::{FileReader, SerializedFileReader};
2304
2305 let schema = Arc::new(
2306 Schema::builder()
2307 .with_schema_id(1)
2308 .with_fields(vec![
2309 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2310 ])
2311 .build()
2312 .unwrap(),
2313 );
2314
2315 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2316 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2317 PARQUET_FIELD_ID_META_KEY.to_string(),
2318 "1".to_string(),
2319 )])),
2320 ]));
2321
2322 let tmp_dir = TempDir::new().unwrap();
2323 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2324 let file_path = format!("{table_location}/multi_row_group.parquet");
2325
2326 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2328 (0..100).collect::<Vec<i32>>(),
2329 ))])
2330 .unwrap();
2331 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2332 (100..200).collect::<Vec<i32>>(),
2333 ))])
2334 .unwrap();
2335 let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2336 (200..300).collect::<Vec<i32>>(),
2337 ))])
2338 .unwrap();
2339
2340 let props = WriterProperties::builder()
2341 .set_compression(Compression::SNAPPY)
2342 .set_max_row_group_size(100)
2343 .build();
2344
2345 let file = File::create(&file_path).unwrap();
2346 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2347 writer.write(&batch1).expect("Writing batch 1");
2348 writer.write(&batch2).expect("Writing batch 2");
2349 writer.write(&batch3).expect("Writing batch 3");
2350 writer.close().unwrap();
2351
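        // Re-open the file with a plain Parquet reader to recover the row-group
        // byte offsets that the scan tasks below are built from.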
2352 let file = File::open(&file_path).unwrap();
2354 let reader = SerializedFileReader::new(file).unwrap();
2355 let metadata = reader.metadata();
2356
2357 println!("File has {} row groups", metadata.num_row_groups());
2358 assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
2359
2360 let row_group_0 = metadata.row_group(0);
2362 let row_group_1 = metadata.row_group(1);
2363 let row_group_2 = metadata.row_group(2);
2364
        // Row-group data starts after the 4-byte "PAR1" magic at the head of the file.
        let rg0_start = 4u64;
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
        let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
        let file_end = rg2_start + row_group_2.compressed_size() as u64;
2369
2370 println!(
2371 "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
2372 row_group_0.num_rows(),
2373 rg0_start,
2374 row_group_0.compressed_size()
2375 );
2376 println!(
2377 "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
2378 row_group_1.num_rows(),
2379 rg1_start,
2380 row_group_1.compressed_size()
2381 );
2382 println!(
2383 "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
2384 row_group_2.num_rows(),
2385 rg2_start,
2386 row_group_2.compressed_size()
2387 );
2388
2389 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2390 let reader = ArrowReaderBuilder::new(file_io).build();
2391
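        // Task 1 spans only row group 0's bytes; task 2 spans row groups 1 and 2.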
2392 let task1 = FileScanTask {
2394 start: rg0_start,
2395 length: row_group_0.compressed_size() as u64,
2396 record_count: Some(100),
2397 data_file_path: file_path.clone(),
2398 data_file_format: DataFileFormat::Parquet,
2399 schema: schema.clone(),
2400 project_field_ids: vec![1],
2401 predicate: None,
2402 deletes: vec![],
2403 partition: None,
2404 partition_spec: None,
2405 name_mapping: None,
2406 };
2407
2408 let task2 = FileScanTask {
2410 start: rg1_start,
2411 length: file_end - rg1_start,
2412 record_count: Some(200),
2413 data_file_path: file_path.clone(),
2414 data_file_format: DataFileFormat::Parquet,
2415 schema: schema.clone(),
2416 project_field_ids: vec![1],
2417 predicate: None,
2418 deletes: vec![],
2419 partition: None,
2420 partition_spec: None,
2421 name_mapping: None,
2422 };
2423
2424 let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
2425 let result1 = reader
2426 .clone()
2427 .read(tasks1)
2428 .unwrap()
2429 .try_collect::<Vec<RecordBatch>>()
2430 .await
2431 .unwrap();
2432
2433 let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
2434 println!(
2435 "Task 1 (bytes {}-{}) returned {} rows",
2436 rg0_start,
2437 rg0_start + row_group_0.compressed_size() as u64,
2438 total_rows_task1
2439 );
2440
2441 let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
2442 let result2 = reader
2443 .read(tasks2)
2444 .unwrap()
2445 .try_collect::<Vec<RecordBatch>>()
2446 .await
2447 .unwrap();
2448
2449 let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
2450 println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
2451
2452 assert_eq!(
2453 total_rows_task1, 100,
2454 "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
2455 );
2456
2457 assert_eq!(
2458 total_rows_task2, 200,
2459 "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
2460 );
2461
2462 if total_rows_task1 > 0 {
2464 let first_batch = &result1[0];
2465 let id_col = first_batch
2466 .column(0)
2467 .as_primitive::<arrow_array::types::Int32Type>();
2468 let first_val = id_col.value(0);
2469 let last_val = id_col.value(id_col.len() - 1);
2470 println!("Task 1 data range: {first_val} to {last_val}");
2471
2472 assert_eq!(first_val, 0, "Task 1 should start with id=0");
2473 assert_eq!(last_val, 99, "Task 1 should end with id=99");
2474 }
2475
2476 if total_rows_task2 > 0 {
2477 let first_batch = &result2[0];
2478 let id_col = first_batch
2479 .column(0)
2480 .as_primitive::<arrow_array::types::Int32Type>();
2481 let first_val = id_col.value(0);
2482 println!("Task 2 first value: {first_val}");
2483
2484 assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
2485 }
2486 }
2487
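    // Writes a file containing only column "a" (field id 1) and reads it with a
    // newer schema that adds optional column "b" (field id 2), expecting "b" to
    // come back as all nulls.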
2488 #[tokio::test]
2494 async fn test_schema_evolution_add_column() {
2495 use arrow_array::{Array, Int32Array};
2496
2497 let new_schema = Arc::new(
2499 Schema::builder()
2500 .with_schema_id(2)
2501 .with_fields(vec![
2502 NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2503 NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2504 ])
2505 .build()
2506 .unwrap(),
2507 );
2508
2509 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2511 Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2512 PARQUET_FIELD_ID_META_KEY.to_string(),
2513 "1".to_string(),
2514 )])),
2515 ]));
2516
2517 let tmp_dir = TempDir::new().unwrap();
2519 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2520 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2521
2522 let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2523 let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2524
2525 let props = WriterProperties::builder()
2526 .set_compression(Compression::SNAPPY)
2527 .build();
2528 let file = File::create(format!("{table_location}/old_file.parquet")).unwrap();
2529 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2530 writer.write(&to_write).expect("Writing batch");
2531 writer.close().unwrap();
2532
2533 let reader = ArrowReaderBuilder::new(file_io).build();
2535 let tasks = Box::pin(futures::stream::iter(
2536 vec![Ok(FileScanTask {
2537 start: 0,
2538 length: 0,
2539 record_count: None,
2540 data_file_path: format!("{table_location}/old_file.parquet"),
2541 data_file_format: DataFileFormat::Parquet,
2542 schema: new_schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
2545 deletes: vec![],
2546 partition: None,
2547 partition_spec: None,
2548 name_mapping: None,
2549 })]
2550 .into_iter(),
2551 )) as FileScanTaskStream;
2552
2553 let result = reader
2554 .read(tasks)
2555 .unwrap()
2556 .try_collect::<Vec<RecordBatch>>()
2557 .await
2558 .unwrap();
2559
2560 assert_eq!(result.len(), 1);
2562 let batch = &result[0];
2563
2564 assert_eq!(batch.num_columns(), 2);
2566 assert_eq!(batch.num_rows(), 3);
2567
2568 let col_a = batch
2570 .column(0)
2571 .as_primitive::<arrow_array::types::Int32Type>();
2572 assert_eq!(col_a.values(), &[1, 2, 3]);
2573
2574 let col_b = batch
2576 .column(1)
2577 .as_primitive::<arrow_array::types::Int32Type>();
2578 assert_eq!(col_b.null_count(), 3);
2579 assert!(col_b.is_null(0));
2580 assert!(col_b.is_null(1));
2581 assert!(col_b.is_null(2));
2582 }
2583
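    // Data file with two 100-row row groups (ids 1..=200) plus a positional
    // delete at position 199 (id 200); verifies that a delete landing in the
    // second row group is actually applied.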
2584 #[tokio::test]
2602 async fn test_position_delete_across_multiple_row_groups() {
2603 use arrow_array::{Int32Array, Int64Array};
2604 use parquet::file::reader::{FileReader, SerializedFileReader};
2605
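        // Reserved Iceberg field ids for the file_path and pos columns of
        // positional delete files.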
2606 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2608 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2609
2610 let tmp_dir = TempDir::new().unwrap();
2611 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2612
2613 let table_schema = Arc::new(
2615 Schema::builder()
2616 .with_schema_id(1)
2617 .with_fields(vec![
2618 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2619 ])
2620 .build()
2621 .unwrap(),
2622 );
2623
2624 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2625 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2626 PARQUET_FIELD_ID_META_KEY.to_string(),
2627 "1".to_string(),
2628 )])),
2629 ]));
2630
2631 let data_file_path = format!("{table_location}/data.parquet");
2635
2636 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2637 Int32Array::from_iter_values(1..=100),
2638 )])
2639 .unwrap();
2640
2641 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2642 Int32Array::from_iter_values(101..=200),
2643 )])
2644 .unwrap();
2645
2646 let props = WriterProperties::builder()
2648 .set_compression(Compression::SNAPPY)
2649 .set_max_row_group_size(100)
2650 .build();
2651
2652 let file = File::create(&data_file_path).unwrap();
2653 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2654 writer.write(&batch1).expect("Writing batch 1");
2655 writer.write(&batch2).expect("Writing batch 2");
2656 writer.close().unwrap();
2657
2658 let verify_file = File::open(&data_file_path).unwrap();
2660 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2661 assert_eq!(
2662 verify_reader.metadata().num_row_groups(),
2663 2,
2664 "Should have 2 row groups"
2665 );
2666
2667 let delete_file_path = format!("{table_location}/deletes.parquet");
2669
2670 let delete_schema = Arc::new(ArrowSchema::new(vec![
2671 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2672 PARQUET_FIELD_ID_META_KEY.to_string(),
2673 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2674 )])),
2675 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2676 PARQUET_FIELD_ID_META_KEY.to_string(),
2677 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2678 )])),
2679 ]));
2680
2681 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2683 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2684 Arc::new(Int64Array::from_iter_values(vec![199i64])),
2685 ])
2686 .unwrap();
2687
2688 let delete_props = WriterProperties::builder()
2689 .set_compression(Compression::SNAPPY)
2690 .build();
2691
2692 let delete_file = File::create(&delete_file_path).unwrap();
2693 let mut delete_writer =
2694 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2695 delete_writer.write(&delete_batch).unwrap();
2696 delete_writer.close().unwrap();
2697
2698 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2700 let reader = ArrowReaderBuilder::new(file_io).build();
2701
2702 let task = FileScanTask {
2703 start: 0,
2704 length: 0,
2705 record_count: Some(200),
2706 data_file_path: data_file_path.clone(),
2707 data_file_format: DataFileFormat::Parquet,
2708 schema: table_schema.clone(),
2709 project_field_ids: vec![1],
2710 predicate: None,
2711 deletes: vec![FileScanTaskDeleteFile {
2712 file_path: delete_file_path,
2713 file_type: DataContentType::PositionDeletes,
2714 partition_spec_id: 0,
2715 equality_ids: None,
2716 }],
2717 partition: None,
2718 partition_spec: None,
2719 name_mapping: None,
2720 };
2721
2722 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2723 let result = reader
2724 .read(tasks)
2725 .unwrap()
2726 .try_collect::<Vec<RecordBatch>>()
2727 .await
2728 .unwrap();
2729
2730 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2732
2733 println!("Total rows read: {total_rows}");
2734 println!("Expected: 199 rows (deleted row 199 which had id=200)");
2735
2736 assert_eq!(
2738 total_rows, 199,
2739 "Expected 199 rows after deleting row 199, but got {total_rows} rows. \
2740 The bug causes position deletes in later row groups to be ignored."
2741 );
2742
2743 let all_ids: Vec<i32> = result
2745 .iter()
2746 .flat_map(|batch| {
2747 batch
2748 .column(0)
2749 .as_primitive::<arrow_array::types::Int32Type>()
2750 .values()
2751 .iter()
2752 .copied()
2753 })
2754 .collect();
2755
2756 assert!(
2757 !all_ids.contains(&200),
2758 "Row with id=200 should be deleted but was found in results"
2759 );
2760
2761 let expected_ids: Vec<i32> = (1..=199).collect();
2763 assert_eq!(
2764 all_ids, expected_ids,
2765 "Should have ids 1-199 but got different values"
2766 );
2767 }
2768
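    // Same two-row-group layout as above, but the scan task's byte range covers
    // only row group 1. The positional delete at position 199 must still be
    // applied, leaving ids 101..=199.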
2769 #[tokio::test]
2795 async fn test_position_delete_with_row_group_selection() {
2796 use arrow_array::{Int32Array, Int64Array};
2797 use parquet::file::reader::{FileReader, SerializedFileReader};
2798
2799 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2801 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2802
2803 let tmp_dir = TempDir::new().unwrap();
2804 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2805
2806 let table_schema = Arc::new(
2808 Schema::builder()
2809 .with_schema_id(1)
2810 .with_fields(vec![
2811 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2812 ])
2813 .build()
2814 .unwrap(),
2815 );
2816
2817 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2818 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2819 PARQUET_FIELD_ID_META_KEY.to_string(),
2820 "1".to_string(),
2821 )])),
2822 ]));
2823
2824 let data_file_path = format!("{table_location}/data.parquet");
2828
2829 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2830 Int32Array::from_iter_values(1..=100),
2831 )])
2832 .unwrap();
2833
2834 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2835 Int32Array::from_iter_values(101..=200),
2836 )])
2837 .unwrap();
2838
2839 let props = WriterProperties::builder()
2841 .set_compression(Compression::SNAPPY)
2842 .set_max_row_group_size(100)
2843 .build();
2844
2845 let file = File::create(&data_file_path).unwrap();
2846 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2847 writer.write(&batch1).expect("Writing batch 1");
2848 writer.write(&batch2).expect("Writing batch 2");
2849 writer.close().unwrap();
2850
2851 let verify_file = File::open(&data_file_path).unwrap();
2853 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2854 assert_eq!(
2855 verify_reader.metadata().num_row_groups(),
2856 2,
2857 "Should have 2 row groups"
2858 );
2859
2860 let delete_file_path = format!("{table_location}/deletes.parquet");
2862
2863 let delete_schema = Arc::new(ArrowSchema::new(vec![
2864 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2865 PARQUET_FIELD_ID_META_KEY.to_string(),
2866 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2867 )])),
2868 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2869 PARQUET_FIELD_ID_META_KEY.to_string(),
2870 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2871 )])),
2872 ]));
2873
2874 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2876 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2877 Arc::new(Int64Array::from_iter_values(vec![199i64])),
2878 ])
2879 .unwrap();
2880
2881 let delete_props = WriterProperties::builder()
2882 .set_compression(Compression::SNAPPY)
2883 .build();
2884
2885 let delete_file = File::create(&delete_file_path).unwrap();
2886 let mut delete_writer =
2887 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2888 delete_writer.write(&delete_batch).unwrap();
2889 delete_writer.close().unwrap();
2890
2891 let metadata_file = File::open(&data_file_path).unwrap();
2894 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
2895 let metadata = metadata_reader.metadata();
2896
2897 let row_group_0 = metadata.row_group(0);
2898 let row_group_1 = metadata.row_group(1);
2899
        // Row-group data starts after the 4-byte "PAR1" magic at the head of the file.
        let rg0_start = 4u64;
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
        let rg1_length = row_group_1.compressed_size() as u64;
2903
2904 println!(
2905 "Row group 0: starts at byte {}, {} bytes compressed",
2906 rg0_start,
2907 row_group_0.compressed_size()
2908 );
2909 println!(
2910 "Row group 1: starts at byte {}, {} bytes compressed",
2911 rg1_start,
2912 row_group_1.compressed_size()
2913 );
2914
2915 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2916 let reader = ArrowReaderBuilder::new(file_io).build();
2917
2918 let task = FileScanTask {
2920 start: rg1_start,
2921 length: rg1_length,
            record_count: Some(100),
            data_file_path: data_file_path.clone(),
2924 data_file_format: DataFileFormat::Parquet,
2925 schema: table_schema.clone(),
2926 project_field_ids: vec![1],
2927 predicate: None,
2928 deletes: vec![FileScanTaskDeleteFile {
2929 file_path: delete_file_path,
2930 file_type: DataContentType::PositionDeletes,
2931 partition_spec_id: 0,
2932 equality_ids: None,
2933 }],
2934 partition: None,
2935 partition_spec: None,
2936 name_mapping: None,
2937 };
2938
2939 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2940 let result = reader
2941 .read(tasks)
2942 .unwrap()
2943 .try_collect::<Vec<RecordBatch>>()
2944 .await
2945 .unwrap();
2946
2947 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2950
2951 println!("Total rows read from row group 1: {total_rows}");
2952 println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");
2953
2954 assert_eq!(
2956 total_rows, 99,
2957 "Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
2958 The bug causes position deletes to be lost when advance_to() is followed by next() \
2959 when skipping unselected row groups."
2960 );
2961
2962 let all_ids: Vec<i32> = result
2964 .iter()
2965 .flat_map(|batch| {
2966 batch
2967 .column(0)
2968 .as_primitive::<arrow_array::types::Int32Type>()
2969 .values()
2970 .iter()
2971 .copied()
2972 })
2973 .collect();
2974
2975 assert!(
2976 !all_ids.contains(&200),
2977 "Row with id=200 should be deleted but was found in results"
2978 );
2979
2980 let expected_ids: Vec<i32> = (101..=199).collect();
2982 assert_eq!(
2983 all_ids, expected_ids,
2984 "Should have ids 101-199 but got different values"
2985 );
2986 }
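
    // The positional delete at position 0 falls in row group 0, which lies outside
    // the task's byte range; all 100 rows of row group 1 (ids 101..=200) should
    // survive, and the read must complete without hanging on the skipped delete.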
2987 #[tokio::test]
3016 async fn test_position_delete_in_skipped_row_group() {
3017 use arrow_array::{Int32Array, Int64Array};
3018 use parquet::file::reader::{FileReader, SerializedFileReader};
3019
3020 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
3022 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
3023
3024 let tmp_dir = TempDir::new().unwrap();
3025 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3026
3027 let table_schema = Arc::new(
3029 Schema::builder()
3030 .with_schema_id(1)
3031 .with_fields(vec![
3032 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3033 ])
3034 .build()
3035 .unwrap(),
3036 );
3037
3038 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3039 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3040 PARQUET_FIELD_ID_META_KEY.to_string(),
3041 "1".to_string(),
3042 )])),
3043 ]));
3044
3045 let data_file_path = format!("{table_location}/data.parquet");
3049
3050 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3051 Int32Array::from_iter_values(1..=100),
3052 )])
3053 .unwrap();
3054
3055 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3056 Int32Array::from_iter_values(101..=200),
3057 )])
3058 .unwrap();
3059
3060 let props = WriterProperties::builder()
3062 .set_compression(Compression::SNAPPY)
3063 .set_max_row_group_size(100)
3064 .build();
3065
3066 let file = File::create(&data_file_path).unwrap();
3067 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3068 writer.write(&batch1).expect("Writing batch 1");
3069 writer.write(&batch2).expect("Writing batch 2");
3070 writer.close().unwrap();
3071
3072 let verify_file = File::open(&data_file_path).unwrap();
3074 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
3075 assert_eq!(
3076 verify_reader.metadata().num_row_groups(),
3077 2,
3078 "Should have 2 row groups"
3079 );
3080
3081 let delete_file_path = format!("{table_location}/deletes.parquet");
3083
3084 let delete_schema = Arc::new(ArrowSchema::new(vec![
3085 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
3086 PARQUET_FIELD_ID_META_KEY.to_string(),
3087 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
3088 )])),
3089 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
3090 PARQUET_FIELD_ID_META_KEY.to_string(),
3091 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
3092 )])),
3093 ]));
3094
3095 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
3097 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
3098 Arc::new(Int64Array::from_iter_values(vec![0i64])),
3099 ])
3100 .unwrap();
3101
3102 let delete_props = WriterProperties::builder()
3103 .set_compression(Compression::SNAPPY)
3104 .build();
3105
3106 let delete_file = File::create(&delete_file_path).unwrap();
3107 let mut delete_writer =
3108 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
3109 delete_writer.write(&delete_batch).unwrap();
3110 delete_writer.close().unwrap();
3111
3112 let metadata_file = File::open(&data_file_path).unwrap();
3115 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
3116 let metadata = metadata_reader.metadata();
3117
3118 let row_group_0 = metadata.row_group(0);
3119 let row_group_1 = metadata.row_group(1);
3120
        // Row-group data starts after the 4-byte "PAR1" magic at the head of the file.
        let rg0_start = 4u64;
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
        let rg1_length = row_group_1.compressed_size() as u64;
3124
3125 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3126 let reader = ArrowReaderBuilder::new(file_io).build();
3127
3128 let task = FileScanTask {
3130 start: rg1_start,
3131 length: rg1_length,
            record_count: Some(100),
            data_file_path: data_file_path.clone(),
3134 data_file_format: DataFileFormat::Parquet,
3135 schema: table_schema.clone(),
3136 project_field_ids: vec![1],
3137 predicate: None,
3138 deletes: vec![FileScanTaskDeleteFile {
3139 file_path: delete_file_path,
3140 file_type: DataContentType::PositionDeletes,
3141 partition_spec_id: 0,
3142 equality_ids: None,
3143 }],
3144 partition: None,
3145 partition_spec: None,
3146 name_mapping: None,
3147 };
3148
3149 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3150 let result = reader
3151 .read(tasks)
3152 .unwrap()
3153 .try_collect::<Vec<RecordBatch>>()
3154 .await
3155 .unwrap();
3156
3157 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3160
3161 assert_eq!(
3162 total_rows, 100,
3163 "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
3164 If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
3165 );
3166
3167 let all_ids: Vec<i32> = result
3169 .iter()
3170 .flat_map(|batch| {
3171 batch
3172 .column(0)
3173 .as_primitive::<arrow_array::types::Int32Type>()
3174 .values()
3175 .iter()
3176 .copied()
3177 })
3178 .collect();
3179
3180 let expected_ids: Vec<i32> = (101..=200).collect();
3181 assert_eq!(
3182 all_ids, expected_ids,
3183 "Should have ids 101-200 (all of row group 1)"
3184 );
3185 }
3186
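    // Reads a Parquet file whose Arrow schema carries no PARQUET_FIELD_ID_META_KEY
    // metadata, so the reader has to resolve the projected fields without relying
    // on field-id metadata.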
3187 #[tokio::test]
3193 async fn test_read_parquet_file_without_field_ids() {
3194 let schema = Arc::new(
3195 Schema::builder()
3196 .with_schema_id(1)
3197 .with_fields(vec![
3198 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3199 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3200 ])
3201 .build()
3202 .unwrap(),
3203 );
3204
3205 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3207 Field::new("name", DataType::Utf8, false),
3208 Field::new("age", DataType::Int32, false),
3209 ]));
3210
3211 let tmp_dir = TempDir::new().unwrap();
3212 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3213 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3214
3215 let name_data = vec!["Alice", "Bob", "Charlie"];
3216 let age_data = vec![30, 25, 35];
3217
3218 use arrow_array::Int32Array;
3219 let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3220 let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3221
3222 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3223
3224 let props = WriterProperties::builder()
3225 .set_compression(Compression::SNAPPY)
3226 .build();
3227
3228 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3229 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3230
3231 writer.write(&to_write).expect("Writing batch");
3232 writer.close().unwrap();
3233
3234 let reader = ArrowReaderBuilder::new(file_io).build();
3235
3236 let tasks = Box::pin(futures::stream::iter(
3237 vec![Ok(FileScanTask {
3238 start: 0,
3239 length: 0,
3240 record_count: None,
3241 data_file_path: format!("{table_location}/1.parquet"),
3242 data_file_format: DataFileFormat::Parquet,
3243 schema: schema.clone(),
3244 project_field_ids: vec![1, 2],
3245 predicate: None,
3246 deletes: vec![],
3247 partition: None,
3248 partition_spec: None,
3249 name_mapping: None,
3250 })]
3251 .into_iter(),
3252 )) as FileScanTaskStream;
3253
3254 let result = reader
3255 .read(tasks)
3256 .unwrap()
3257 .try_collect::<Vec<RecordBatch>>()
3258 .await
3259 .unwrap();
3260
3261 assert_eq!(result.len(), 1);
3262 let batch = &result[0];
3263 assert_eq!(batch.num_rows(), 3);
3264 assert_eq!(batch.num_columns(), 2);
3265
3266 let name_array = batch.column(0).as_string::<i32>();
3268 assert_eq!(name_array.value(0), "Alice");
3269 assert_eq!(name_array.value(1), "Bob");
3270 assert_eq!(name_array.value(2), "Charlie");
3271
3272 let age_array = batch
3273 .column(1)
3274 .as_primitive::<arrow_array::types::Int32Type>();
3275 assert_eq!(age_array.value(0), 30);
3276 assert_eq!(age_array.value(1), 25);
3277 assert_eq!(age_array.value(2), 35);
3278 }
3279
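    // Same field-id-less file shape, but only field ids 1 and 3 (col1 and col3)
    // are projected out of the four written columns.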
3280 #[tokio::test]
3284 async fn test_read_parquet_without_field_ids_partial_projection() {
3285 use arrow_array::Int32Array;
3286
3287 let schema = Arc::new(
3288 Schema::builder()
3289 .with_schema_id(1)
3290 .with_fields(vec![
3291 NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3292 NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3293 NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3294 NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3295 ])
3296 .build()
3297 .unwrap(),
3298 );
3299
3300 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3301 Field::new("col1", DataType::Utf8, false),
3302 Field::new("col2", DataType::Int32, false),
3303 Field::new("col3", DataType::Utf8, false),
3304 Field::new("col4", DataType::Int32, false),
3305 ]));
3306
3307 let tmp_dir = TempDir::new().unwrap();
3308 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3309 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3310
3311 let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3312 let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3313 let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3314 let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3315
3316 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3317 col1_data, col2_data, col3_data, col4_data,
3318 ])
3319 .unwrap();
3320
3321 let props = WriterProperties::builder()
3322 .set_compression(Compression::SNAPPY)
3323 .build();
3324
3325 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3326 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3327
3328 writer.write(&to_write).expect("Writing batch");
3329 writer.close().unwrap();
3330
3331 let reader = ArrowReaderBuilder::new(file_io).build();
3332
3333 let tasks = Box::pin(futures::stream::iter(
3334 vec![Ok(FileScanTask {
3335 start: 0,
3336 length: 0,
3337 record_count: None,
3338 data_file_path: format!("{table_location}/1.parquet"),
3339 data_file_format: DataFileFormat::Parquet,
3340 schema: schema.clone(),
3341 project_field_ids: vec![1, 3],
3342 predicate: None,
3343 deletes: vec![],
3344 partition: None,
3345 partition_spec: None,
3346 name_mapping: None,
3347 })]
3348 .into_iter(),
3349 )) as FileScanTaskStream;
3350
3351 let result = reader
3352 .read(tasks)
3353 .unwrap()
3354 .try_collect::<Vec<RecordBatch>>()
3355 .await
3356 .unwrap();
3357
3358 assert_eq!(result.len(), 1);
3359 let batch = &result[0];
3360 assert_eq!(batch.num_rows(), 2);
3361 assert_eq!(batch.num_columns(), 2);
3362
3363 let col1_array = batch.column(0).as_string::<i32>();
3364 assert_eq!(col1_array.value(0), "a");
3365 assert_eq!(col1_array.value(1), "b");
3366
3367 let col3_array = batch.column(1).as_string::<i32>();
3368 assert_eq!(col3_array.value(0), "c");
3369 assert_eq!(col3_array.value(1), "d");
3370 }
3371
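    // Field-id-less file with name/age; the task schema additionally declares an
    // optional "city" column, which should materialize as all nulls.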
3372 #[tokio::test]
3376 async fn test_read_parquet_without_field_ids_schema_evolution() {
3377 use arrow_array::{Array, Int32Array};
3378
3379 let schema = Arc::new(
3381 Schema::builder()
3382 .with_schema_id(1)
3383 .with_fields(vec![
3384 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3385 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3386 NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3387 ])
3388 .build()
3389 .unwrap(),
3390 );
3391
3392 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3393 Field::new("name", DataType::Utf8, false),
3394 Field::new("age", DataType::Int32, false),
3395 ]));
3396
3397 let tmp_dir = TempDir::new().unwrap();
3398 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3399 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3400
3401 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3402 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3403
3404 let to_write =
3405 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3406
3407 let props = WriterProperties::builder()
3408 .set_compression(Compression::SNAPPY)
3409 .build();
3410
3411 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3412 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3413
3414 writer.write(&to_write).expect("Writing batch");
3415 writer.close().unwrap();
3416
3417 let reader = ArrowReaderBuilder::new(file_io).build();
3418
3419 let tasks = Box::pin(futures::stream::iter(
3420 vec![Ok(FileScanTask {
3421 start: 0,
3422 length: 0,
3423 record_count: None,
3424 data_file_path: format!("{table_location}/1.parquet"),
3425 data_file_format: DataFileFormat::Parquet,
3426 schema: schema.clone(),
3427 project_field_ids: vec![1, 2, 3],
3428 predicate: None,
3429 deletes: vec![],
3430 partition: None,
3431 partition_spec: None,
3432 name_mapping: None,
3433 })]
3434 .into_iter(),
3435 )) as FileScanTaskStream;
3436
3437 let result = reader
3438 .read(tasks)
3439 .unwrap()
3440 .try_collect::<Vec<RecordBatch>>()
3441 .await
3442 .unwrap();
3443
3444 assert_eq!(result.len(), 1);
3445 let batch = &result[0];
3446 assert_eq!(batch.num_rows(), 2);
3447 assert_eq!(batch.num_columns(), 3);
3448
3449 let name_array = batch.column(0).as_string::<i32>();
3450 assert_eq!(name_array.value(0), "Alice");
3451 assert_eq!(name_array.value(1), "Bob");
3452
3453 let age_array = batch
3454 .column(1)
3455 .as_primitive::<arrow_array::types::Int32Type>();
3456 assert_eq!(age_array.value(0), 30);
3457 assert_eq!(age_array.value(1), 25);
3458
3459 let city_array = batch.column(2).as_string::<i32>();
3461 assert_eq!(city_array.null_count(), 2);
3462 assert!(city_array.is_null(0));
3463 assert!(city_array.is_null(1));
3464 }
3465
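    // Field-id-less file written with max_row_group_size = 2, producing multiple
    // row groups; all six rows should come back in write order.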
3466 #[tokio::test]
3469 async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3470 use arrow_array::Int32Array;
3471
3472 let schema = Arc::new(
3473 Schema::builder()
3474 .with_schema_id(1)
3475 .with_fields(vec![
3476 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3477 NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3478 ])
3479 .build()
3480 .unwrap(),
3481 );
3482
3483 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3484 Field::new("name", DataType::Utf8, false),
3485 Field::new("value", DataType::Int32, false),
3486 ]));
3487
3488 let tmp_dir = TempDir::new().unwrap();
3489 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3490 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3491
3492 let props = WriterProperties::builder()
3494 .set_compression(Compression::SNAPPY)
3495 .set_write_batch_size(2)
3496 .set_max_row_group_size(2)
3497 .build();
3498
3499 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3500 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3501
3502 for batch_num in 0..3 {
3504 let name_data = Arc::new(StringArray::from(vec![
3505 format!("name_{}", batch_num * 2),
3506 format!("name_{}", batch_num * 2 + 1),
3507 ])) as ArrayRef;
3508 let value_data =
3509 Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3510
3511 let batch =
3512 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3513 writer.write(&batch).expect("Writing batch");
3514 }
3515 writer.close().unwrap();
3516
3517 let reader = ArrowReaderBuilder::new(file_io).build();
3518
3519 let tasks = Box::pin(futures::stream::iter(
3520 vec![Ok(FileScanTask {
3521 start: 0,
3522 length: 0,
3523 record_count: None,
3524 data_file_path: format!("{table_location}/1.parquet"),
3525 data_file_format: DataFileFormat::Parquet,
3526 schema: schema.clone(),
3527 project_field_ids: vec![1, 2],
3528 predicate: None,
3529 deletes: vec![],
3530 partition: None,
3531 partition_spec: None,
3532 name_mapping: None,
3533 })]
3534 .into_iter(),
3535 )) as FileScanTaskStream;
3536
3537 let result = reader
3538 .read(tasks)
3539 .unwrap()
3540 .try_collect::<Vec<RecordBatch>>()
3541 .await
3542 .unwrap();
3543
3544 assert!(!result.is_empty());
3545
3546 let mut all_names = Vec::new();
3547 let mut all_values = Vec::new();
3548
3549 for batch in &result {
3550 let name_array = batch.column(0).as_string::<i32>();
3551 let value_array = batch
3552 .column(1)
3553 .as_primitive::<arrow_array::types::Int32Type>();
3554
3555 for i in 0..batch.num_rows() {
3556 all_names.push(name_array.value(i).to_string());
3557 all_values.push(value_array.value(i));
3558 }
3559 }
3560
3561 assert_eq!(all_names.len(), 6);
3562 assert_eq!(all_values.len(), 6);
3563
3564 for i in 0..6 {
3565 assert_eq!(all_names[i], format!("name_{i}"));
3566 assert_eq!(all_values[i], i as i32);
3567 }
3568 }
3569
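    // Field-id-less file containing a nested struct column ("person" with name
    // and age); verifies the nested fields are projected correctly.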
3570 #[tokio::test]
3574 async fn test_read_parquet_without_field_ids_with_struct() {
3575 use arrow_array::{Int32Array, StructArray};
3576 use arrow_schema::Fields;
3577
3578 let schema = Arc::new(
3579 Schema::builder()
3580 .with_schema_id(1)
3581 .with_fields(vec![
3582 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3583 NestedField::required(
3584 2,
3585 "person",
3586 Type::Struct(crate::spec::StructType::new(vec![
3587 NestedField::required(
3588 3,
3589 "name",
3590 Type::Primitive(PrimitiveType::String),
3591 )
3592 .into(),
3593 NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3594 .into(),
3595 ])),
3596 )
3597 .into(),
3598 ])
3599 .build()
3600 .unwrap(),
3601 );
3602
3603 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3604 Field::new("id", DataType::Int32, false),
3605 Field::new(
3606 "person",
3607 DataType::Struct(Fields::from(vec![
3608 Field::new("name", DataType::Utf8, false),
3609 Field::new("age", DataType::Int32, false),
3610 ])),
3611 false,
3612 ),
3613 ]));
3614
3615 let tmp_dir = TempDir::new().unwrap();
3616 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3617 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3618
3619 let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3620 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3621 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3622 let person_data = Arc::new(StructArray::from(vec![
3623 (
3624 Arc::new(Field::new("name", DataType::Utf8, false)),
3625 name_data,
3626 ),
3627 (
3628 Arc::new(Field::new("age", DataType::Int32, false)),
3629 age_data,
3630 ),
3631 ])) as ArrayRef;
3632
3633 let to_write =
3634 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3635
3636 let props = WriterProperties::builder()
3637 .set_compression(Compression::SNAPPY)
3638 .build();
3639
3640 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3641 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3642
3643 writer.write(&to_write).expect("Writing batch");
3644 writer.close().unwrap();
3645
3646 let reader = ArrowReaderBuilder::new(file_io).build();
3647
3648 let tasks = Box::pin(futures::stream::iter(
3649 vec![Ok(FileScanTask {
3650 start: 0,
3651 length: 0,
3652 record_count: None,
3653 data_file_path: format!("{table_location}/1.parquet"),
3654 data_file_format: DataFileFormat::Parquet,
3655 schema: schema.clone(),
3656 project_field_ids: vec![1, 2],
3657 predicate: None,
3658 deletes: vec![],
3659 partition: None,
3660 partition_spec: None,
3661 name_mapping: None,
3662 })]
3663 .into_iter(),
3664 )) as FileScanTaskStream;
3665
3666 let result = reader
3667 .read(tasks)
3668 .unwrap()
3669 .try_collect::<Vec<RecordBatch>>()
3670 .await
3671 .unwrap();
3672
3673 assert_eq!(result.len(), 1);
3674 let batch = &result[0];
3675 assert_eq!(batch.num_rows(), 2);
3676 assert_eq!(batch.num_columns(), 2);
3677
3678 let id_array = batch
3679 .column(0)
3680 .as_primitive::<arrow_array::types::Int32Type>();
3681 assert_eq!(id_array.value(0), 1);
3682 assert_eq!(id_array.value(1), 2);
3683
3684 let person_array = batch.column(1).as_struct();
3685 assert_eq!(person_array.num_columns(), 2);
3686
3687 let name_array = person_array.column(0).as_string::<i32>();
3688 assert_eq!(name_array.value(0), "Alice");
3689 assert_eq!(name_array.value(1), "Bob");
3690
3691 let age_array = person_array
3692 .column(1)
3693 .as_primitive::<arrow_array::types::Int32Type>();
3694 assert_eq!(age_array.value(0), 30);
3695 assert_eq!(age_array.value(1), 25);
3696 }
3697
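    // The task schema inserts a new optional column ("newCol", field id 5) between
    // the file's two existing columns; it should surface as nulls while col0 and
    // col1 keep their original values and order.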
3698 #[tokio::test]
3702 async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3703 use arrow_array::{Array, Int32Array};
3704
3705 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3706 Field::new("col0", DataType::Int32, true),
3707 Field::new("col1", DataType::Int32, true),
3708 ]));
3709
3710 let schema = Arc::new(
3712 Schema::builder()
3713 .with_schema_id(1)
3714 .with_fields(vec![
3715 NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3716 NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3717 NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3718 ])
3719 .build()
3720 .unwrap(),
3721 );
3722
3723 let tmp_dir = TempDir::new().unwrap();
3724 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3725 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3726
3727 let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3728 let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3729
3730 let to_write =
3731 RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3732
3733 let props = WriterProperties::builder()
3734 .set_compression(Compression::SNAPPY)
3735 .build();
3736
3737 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3738 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3739 writer.write(&to_write).expect("Writing batch");
3740 writer.close().unwrap();
3741
3742 let reader = ArrowReaderBuilder::new(file_io).build();
3743
3744 let tasks = Box::pin(futures::stream::iter(
3745 vec![Ok(FileScanTask {
3746 start: 0,
3747 length: 0,
3748 record_count: None,
3749 data_file_path: format!("{table_location}/1.parquet"),
3750 data_file_format: DataFileFormat::Parquet,
3751 schema: schema.clone(),
3752 project_field_ids: vec![1, 5, 2],
3753 predicate: None,
3754 deletes: vec![],
3755 partition: None,
3756 partition_spec: None,
3757 name_mapping: None,
3758 })]
3759 .into_iter(),
3760 )) as FileScanTaskStream;
3761
3762 let result = reader
3763 .read(tasks)
3764 .unwrap()
3765 .try_collect::<Vec<RecordBatch>>()
3766 .await
3767 .unwrap();
3768
3769 assert_eq!(result.len(), 1);
3770 let batch = &result[0];
3771 assert_eq!(batch.num_rows(), 2);
3772 assert_eq!(batch.num_columns(), 3);
3773
3774 let result_col0 = batch
3775 .column(0)
3776 .as_primitive::<arrow_array::types::Int32Type>();
3777 assert_eq!(result_col0.value(0), 1);
3778 assert_eq!(result_col0.value(1), 2);
3779
3780 let result_newcol = batch
3782 .column(1)
3783 .as_primitive::<arrow_array::types::Int32Type>();
3784 assert_eq!(result_newcol.null_count(), 2);
3785 assert!(result_newcol.is_null(0));
3786 assert!(result_newcol.is_null(1));
3787
3788 let result_col1 = batch
3789 .column(2)
3790 .as_primitive::<arrow_array::types::Int32Type>();
3791 assert_eq!(result_col1.value(0), 10);
3792 assert_eq!(result_col1.value(1), 20);
3793 }
3794
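    // With row-group filtering and row selection enabled, a predicate (id < 5)
    // that matches none of the written rows (ids 10..=12) should yield an empty
    // result rather than erroring on a field-id-less file.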
3795 #[tokio::test]
3799 async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
3800 use arrow_array::{Float64Array, Int32Array};
3801
3802 let schema = Arc::new(
3804 Schema::builder()
3805 .with_schema_id(1)
3806 .with_fields(vec![
3807 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3808 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3809 NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
3810 .into(),
3811 ])
3812 .build()
3813 .unwrap(),
3814 );
3815
3816 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3817 Field::new("id", DataType::Int32, false),
3818 Field::new("name", DataType::Utf8, false),
3819 Field::new("value", DataType::Float64, false),
3820 ]));
3821
3822 let tmp_dir = TempDir::new().unwrap();
3823 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3824 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3825
3826 let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
3828 let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3829 let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
3830
3831 let to_write =
3832 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
3833 .unwrap();
3834
3835 let props = WriterProperties::builder()
3836 .set_compression(Compression::SNAPPY)
3837 .build();
3838
3839 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3840 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3841 writer.write(&to_write).expect("Writing batch");
3842 writer.close().unwrap();
3843
3844 let predicate = Reference::new("id").less_than(Datum::int(5));
3846
3847 let reader = ArrowReaderBuilder::new(file_io)
3849 .with_row_group_filtering_enabled(true)
3850 .with_row_selection_enabled(true)
3851 .build();
3852
3853 let tasks = Box::pin(futures::stream::iter(
3854 vec![Ok(FileScanTask {
3855 start: 0,
3856 length: 0,
3857 record_count: None,
3858 data_file_path: format!("{table_location}/1.parquet"),
3859 data_file_format: DataFileFormat::Parquet,
3860 schema: schema.clone(),
3861 project_field_ids: vec![1, 2, 3],
3862 predicate: Some(predicate.bind(schema, true).unwrap()),
3863 deletes: vec![],
3864 partition: None,
3865 partition_spec: None,
3866 name_mapping: None,
3867 })]
3868 .into_iter(),
3869 )) as FileScanTaskStream;
3870
3871 let result = reader
3873 .read(tasks)
3874 .unwrap()
3875 .try_collect::<Vec<RecordBatch>>()
3876 .await
3877 .unwrap();
3878
3879 assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
3881 }
3882
3883 #[tokio::test]
3928 async fn test_bucket_partitioning_reads_source_column_from_file() {
3929 use arrow_array::Int32Array;
3930
3931 use crate::spec::{Literal, PartitionSpec, Struct, Transform};
3932
3933 let schema = Arc::new(
3935 Schema::builder()
3936 .with_schema_id(0)
3937 .with_fields(vec![
3938 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3939 NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3940 ])
3941 .build()
3942 .unwrap(),
3943 );
3944
3945 let partition_spec = Arc::new(
3947 PartitionSpec::builder(schema.clone())
3948 .with_spec_id(0)
3949 .add_partition_field("id", "id_bucket", Transform::Bucket(4))
3950 .unwrap()
3951 .build()
3952 .unwrap(),
3953 );
3954
3955 let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
3957
3958 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3960 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3961 PARQUET_FIELD_ID_META_KEY.to_string(),
3962 "1".to_string(),
3963 )])),
3964 Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
3965 PARQUET_FIELD_ID_META_KEY.to_string(),
3966 "2".to_string(),
3967 )])),
3968 ]));
3969
3970 let tmp_dir = TempDir::new().unwrap();
3972 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3973 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3974
3975 let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
3976 let name_data =
3977 Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;
3978
3979 let to_write =
3980 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
3981
3982 let props = WriterProperties::builder()
3983 .set_compression(Compression::SNAPPY)
3984 .build();
3985 let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
3986 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3987 writer.write(&to_write).expect("Writing batch");
3988 writer.close().unwrap();
3989
3990 let reader = ArrowReaderBuilder::new(file_io).build();
3992 let tasks = Box::pin(futures::stream::iter(
3993 vec![Ok(FileScanTask {
3994 start: 0,
3995 length: 0,
3996 record_count: None,
3997 data_file_path: format!("{table_location}/data.parquet"),
3998 data_file_format: DataFileFormat::Parquet,
3999 schema: schema.clone(),
4000 project_field_ids: vec![1, 2],
4001 predicate: None,
4002 deletes: vec![],
4003 partition: Some(partition_data),
4004 partition_spec: Some(partition_spec),
4005 name_mapping: None,
4006 })]
4007 .into_iter(),
4008 )) as FileScanTaskStream;
4009
4010 let result = reader
4011 .read(tasks)
4012 .unwrap()
4013 .try_collect::<Vec<RecordBatch>>()
4014 .await
4015 .unwrap();
4016
4017 assert_eq!(result.len(), 1);
4019 let batch = &result[0];
4020
4021 assert_eq!(batch.num_columns(), 2);
4022 assert_eq!(batch.num_rows(), 4);
4023
4024 let id_col = batch
4027 .column(0)
4028 .as_primitive::<arrow_array::types::Int32Type>();
4029 assert_eq!(id_col.value(0), 1);
4030 assert_eq!(id_col.value(1), 5);
4031 assert_eq!(id_col.value(2), 9);
4032 assert_eq!(id_col.value(3), 13);
4033
4034 let name_col = batch.column(1).as_string::<i32>();
4035 assert_eq!(name_col.value(0), "Alice");
4036 assert_eq!(name_col.value(1), "Bob");
4037 assert_eq!(name_col.value(2), "Charlie");
4038 assert_eq!(name_col.value(3), "Dave");
4039 }
4040}