use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;

use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
use arrow_cast::cast::cast;
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{
    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use arrow_string::like::starts_with;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::delete_vector::DeleteVector;
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

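/// Builds [`ArrowReader`]s that stream Iceberg data files as Arrow record
/// batches.
///
/// A minimal usage sketch (assumes `file_io` is an already-configured
/// [`FileIO`] and `tasks` is a [`FileScanTaskStream`] obtained from a table
/// scan):
///
/// ```ignore
/// let reader = ArrowReaderBuilder::new(file_io)
///     .with_batch_size(1024)
///     .with_row_group_filtering_enabled(true)
///     .with_row_selection_enabled(true)
///     .build();
/// let record_batch_stream = reader.read(tasks)?;
/// ```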
pub struct ArrowReaderBuilder {
    batch_size: Option<usize>,
    file_io: FileIO,
    concurrency_limit_data_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl ArrowReaderBuilder {
    pub(crate) fn new(file_io: FileIO) -> Self {
        let num_cpus = available_parallelism().get();

        ArrowReaderBuilder {
            batch_size: None,
            file_io,
            concurrency_limit_data_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
        self.concurrency_limit_data_files = val;
        self
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = Some(batch_size);
        self
    }

    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

    pub fn build(self) -> ArrowReader {
        ArrowReader {
            batch_size: self.batch_size,
            file_io: self.file_io.clone(),
            delete_file_loader: CachingDeleteFileLoader::new(
                self.file_io.clone(),
                self.concurrency_limit_data_files,
            ),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        }
    }
}

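/// Reads the data files referenced by [`FileScanTask`]s as streams of Arrow
/// [`RecordBatch`]es, applying column projection, predicate filtering, and
/// delete files along the way.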
#[derive(Clone)]
pub struct ArrowReader {
    batch_size: Option<usize>,
    file_io: FileIO,
    delete_file_loader: CachingDeleteFileLoader,

    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl ArrowReader {
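    /// Takes a stream of [`FileScanTask`]s and reads all the files, returning
    /// a stream of Arrow [`RecordBatch`]es; up to
    /// `concurrency_limit_data_files` data files are processed concurrently.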
    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
        let file_io = self.file_io.clone();
        let batch_size = self.batch_size;
        let concurrency_limit_data_files = self.concurrency_limit_data_files;
        let row_group_filtering_enabled = self.row_group_filtering_enabled;
        let row_selection_enabled = self.row_selection_enabled;

        let stream = tasks
            .map_ok(move |task| {
                let file_io = file_io.clone();

                Self::process_file_scan_task(
                    task,
                    batch_size,
                    file_io,
                    self.delete_file_loader.clone(),
                    row_group_filtering_enabled,
                    row_selection_enabled,
                )
            })
            .map_err(|err| {
                Error::new(ErrorKind::Unexpected, "file scan task generation failed")
                    .with_source(err)
            })
            .try_buffer_unordered(concurrency_limit_data_files)
            .try_flatten_unordered(concurrency_limit_data_files);

        Ok(Box::pin(stream) as ArrowRecordBatchStream)
    }

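    /// Processes a single [`FileScanTask`]: loads any delete files, opens the
    /// data file, applies the field-id projection, installs a `RowFilter`
    /// combining the scan predicate with any equality-delete predicate, then
    /// optionally prunes row groups via their metrics, prunes rows via the
    /// page index, and turns positional deletes into a `RowSelection`.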
    #[allow(clippy::too_many_arguments)]
    async fn process_file_scan_task(
        task: FileScanTask,
        batch_size: Option<usize>,
        file_io: FileIO,
        delete_file_loader: CachingDeleteFileLoader,
        row_group_filtering_enabled: bool,
        row_selection_enabled: bool,
    ) -> Result<ArrowRecordBatchStream> {
        let should_load_page_index =
            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();

        let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone());

        let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder(
            &task.data_file_path,
            file_io.clone(),
            should_load_page_index,
        )
        .await?;

        let projection_mask = Self::get_arrow_projection_mask(
            &task.project_field_ids,
            &task.schema,
            record_batch_stream_builder.parquet_schema(),
            record_batch_stream_builder.schema(),
        )?;
        record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

        let mut record_batch_transformer =
            RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());

        if let Some(batch_size) = batch_size {
            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
        }

        let delete_filter = delete_filter_rx.await.unwrap()?;
        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;

        let final_predicate = match (&task.predicate, delete_predicate) {
            (None, None) => None,
            (Some(predicate), None) => Some(predicate.clone()),
            (None, Some(ref predicate)) => Some(predicate.clone()),
            (Some(filter_predicate), Some(delete_predicate)) => {
                Some(filter_predicate.clone().and(delete_predicate))
            }
        };

        let mut selected_row_group_indices = None;
        let mut row_selection = None;

        if let Some(predicate) = final_predicate {
            let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                record_batch_stream_builder.parquet_schema(),
                &predicate,
            )?;

            let row_filter = Self::get_row_filter(
                &predicate,
                record_batch_stream_builder.parquet_schema(),
                &iceberg_field_ids,
                &field_id_map,
            )?;
            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);

            if row_group_filtering_enabled {
                let result = Self::get_selected_row_group_indices(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &field_id_map,
                    &task.schema,
                )?;

                selected_row_group_indices = Some(result);
            }

            if row_selection_enabled {
                row_selection = Some(Self::get_row_selection_for_filter_predicate(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &selected_row_group_indices,
                    &field_id_map,
                    &task.schema,
                )?);
            }
        }

        let positional_delete_indexes = delete_filter.get_delete_vector(&task);

        if let Some(positional_delete_indexes) = positional_delete_indexes {
            let delete_row_selection = {
                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();

                Self::build_deletes_row_selection(
                    record_batch_stream_builder.metadata().row_groups(),
                    &selected_row_group_indices,
                    &positional_delete_indexes,
                )
            }?;

            row_selection = match row_selection {
                None => Some(delete_row_selection),
                Some(filter_row_selection) => {
                    Some(filter_row_selection.intersection(&delete_row_selection))
                }
            };
        }

        if let Some(row_selection) = row_selection {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_selection(row_selection);
        }

        if let Some(selected_row_group_indices) = selected_row_group_indices {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
        }

        let record_batch_stream =
            record_batch_stream_builder
                .build()?
                .map(move |batch| match batch {
                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
                    Err(err) => Err(err.into()),
                });

        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }

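    /// Opens `data_file_path` through [`FileIO`] and constructs a
    /// [`ParquetRecordBatchStreamBuilder`], preloading the column and offset
    /// indexes (and, when requested, the page index) alongside the file
    /// metadata.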
    pub(crate) async fn create_parquet_record_batch_stream_builder(
        data_file_path: &str,
        file_io: FileIO,
        should_load_page_index: bool,
    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
        let parquet_file = file_io.new_input(data_file_path)?;
        let (parquet_metadata, parquet_reader) =
            try_join!(parquet_file.metadata(), parquet_file.reader())?;
        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
            .with_preload_column_index(true)
            .with_preload_offset_index(true)
            .with_preload_page_index(should_load_page_index);

        let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
            parquet_file_reader,
            ArrowReaderOptions::new(),
        )
        .await?;
        Ok(record_batch_stream_builder)
    }

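    /// Builds a `RowSelection` that removes the rows marked as deleted by the
    /// positional deletes in `positional_deletes`, limited to the row groups in
    /// `selected_row_groups` (all row groups when `None`). Delete indices are
    /// absolute within the file, so skipped row groups still advance the
    /// running base row index.
    ///
    /// As a sketch of the expected output (not tied to any particular file): a
    /// single 10-row row group with positional deletes at indices 2 and 3
    /// produces the selectors `select(2), skip(2), select(6)`.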
    fn build_deletes_row_selection(
        row_group_metadata_list: &[RowGroupMetaData],
        selected_row_groups: &Option<Vec<usize>>,
        positional_deletes: &DeleteVector,
    ) -> Result<RowSelection> {
        let mut results: Vec<RowSelector> = Vec::new();
        let mut selected_row_groups_idx = 0;
        let mut current_row_group_base_idx: u64 = 0;
        let mut delete_vector_iter = positional_deletes.iter();
        let mut next_deleted_row_idx_opt = delete_vector_iter.next();

        for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
            let row_group_num_rows = row_group_metadata.num_rows() as u64;
            let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;

            if let Some(selected_row_groups) = selected_row_groups {
                if selected_row_groups_idx == selected_row_groups.len() {
                    break;
                }

                if idx == selected_row_groups[selected_row_groups_idx] {
                    selected_row_groups_idx += 1;
                } else {
                    delete_vector_iter.advance_to(next_row_group_base_idx);
                    next_deleted_row_idx_opt = delete_vector_iter.next();

                    current_row_group_base_idx += row_group_num_rows;
                    continue;
                }
            }

            let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
                Some(next_deleted_row_idx) => {
                    // The next deleted row is beyond this row group: select the
                    // whole row group. The base index must still be advanced
                    // here, since `continue` skips the increment at the bottom
                    // of the loop.
                    if next_deleted_row_idx >= next_row_group_base_idx {
                        results.push(RowSelector::select(row_group_num_rows as usize));
                        current_row_group_base_idx += row_group_num_rows;
                        continue;
                    }

                    next_deleted_row_idx
                }

                _ => {
                    results.push(RowSelector::select(row_group_num_rows as usize));
                    current_row_group_base_idx += row_group_num_rows;
                    continue;
                }
            };

            let mut current_idx = current_row_group_base_idx;
            'chunks: while next_deleted_row_idx < next_row_group_base_idx {
                if current_idx < next_deleted_row_idx {
                    let run_length = next_deleted_row_idx - current_idx;
                    results.push(RowSelector::select(run_length as usize));
                    current_idx += run_length;
                }

                let mut run_length = 0;
                while next_deleted_row_idx == current_idx
                    && next_deleted_row_idx < next_row_group_base_idx
                {
                    run_length += 1;
                    current_idx += 1;

                    next_deleted_row_idx_opt = delete_vector_iter.next();
                    next_deleted_row_idx = match next_deleted_row_idx_opt {
                        Some(next_deleted_row_idx) => next_deleted_row_idx,
                        _ => {
                            results.push(RowSelector::skip(run_length));
                            break 'chunks;
                        }
                    };
                }
                if run_length > 0 {
                    results.push(RowSelector::skip(run_length));
                }
            }

            if current_idx < next_row_group_base_idx {
                results.push(RowSelector::select(
                    (next_row_group_base_idx - current_idx) as usize,
                ));
            }

            current_row_group_base_idx += row_group_num_rows;
        }

        Ok(results.into())
    }

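    /// Collects the Iceberg field ids referenced by the predicate and builds
    /// the map from field id to leaf column index in the Parquet schema.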
    fn build_field_id_set_and_map(
        parquet_schema: &SchemaDescriptor,
        predicate: &BoundPredicate,
    ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
        let mut collector = CollectFieldIdVisitor {
            field_ids: HashSet::default(),
        };
        visit(&mut collector, predicate)?;

        let iceberg_field_ids = collector.field_ids();
        let field_id_map = build_field_id_map(parquet_schema)?;

        Ok((iceberg_field_ids, field_id_map))
    }

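    /// Recursively appends the ids of all primitive leaf fields beneath
    /// `field` to `field_ids`, descending through structs, lists, and maps.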
    fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
        match field.field_type.as_ref() {
            Type::Primitive(_) => {
                field_ids.push(field.id);
            }
            Type::Struct(struct_type) => {
                for nested_field in struct_type.fields() {
                    Self::include_leaf_field_id(nested_field, field_ids);
                }
            }
            Type::List(list_type) => {
                Self::include_leaf_field_id(&list_type.element_field, field_ids);
            }
            Type::Map(map_type) => {
                Self::include_leaf_field_id(&map_type.key_field, field_ids);
                Self::include_leaf_field_id(&map_type.value_field, field_ids);
            }
        }
    }

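    /// Computes the Parquet [`ProjectionMask`] for the requested field ids,
    /// checking that every projected leaf column is present in the file and
    /// that any type difference is a valid promotion (for example `int` to
    /// `long`, `float` to `double`, or widening a decimal's precision at the
    /// same scale).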
    fn get_arrow_projection_mask(
        field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
    ) -> Result<ProjectionMask> {
        fn type_promotion_is_valid(
            file_type: Option<&PrimitiveType>,
            projected_type: Option<&PrimitiveType>,
        ) -> bool {
            match (file_type, projected_type) {
                (Some(lhs), Some(rhs)) if lhs == rhs => true,
                (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
                (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
                (
                    Some(PrimitiveType::Decimal {
                        precision: file_precision,
                        scale: file_scale,
                    }),
                    Some(PrimitiveType::Decimal {
                        precision: requested_precision,
                        scale: requested_scale,
                    }),
                ) if requested_precision >= file_precision && file_scale == requested_scale => true,
                (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
                _ => false,
            }
        }

        let mut leaf_field_ids = vec![];
        for field_id in field_ids {
            let field = iceberg_schema_of_task.field_by_id(*field_id);
            if let Some(field) = field {
                Self::include_leaf_field_id(field, &mut leaf_field_ids);
            }
        }

        if leaf_field_ids.is_empty() {
            Ok(ProjectionMask::all())
        } else {
            let mut column_map = HashMap::new();

            let fields = arrow_schema.fields();
            let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
            let projected_arrow_schema = ArrowSchema::new_with_metadata(
                fields.filter_leaves(|_, f| {
                    f.metadata()
                        .get(PARQUET_FIELD_ID_META_KEY)
                        .and_then(|field_id| i32::from_str(field_id).ok())
                        .is_some_and(|field_id| {
                            projected_fields.insert((*f).clone(), field_id);
                            leaf_field_ids.contains(&field_id)
                        })
                }),
                arrow_schema.metadata().clone(),
            );
            let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;

            fields.filter_leaves(|idx, field| {
                let Some(field_id) = projected_fields.get(field).cloned() else {
                    return false;
                };

                let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
                let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);

                if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
                    return false;
                }

                if !type_promotion_is_valid(
                    parquet_iceberg_field
                        .unwrap()
                        .field_type
                        .as_primitive_type(),
                    iceberg_field.unwrap().field_type.as_primitive_type(),
                ) {
                    return false;
                }

                column_map.insert(field_id, idx);
                true
            });

            if column_map.len() != leaf_field_ids.len() {
                let missing_fields = leaf_field_ids
                    .iter()
                    .filter(|field_id| !column_map.contains_key(field_id))
                    .collect::<Vec<_>>();
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Parquet schema {} and Iceberg schema {} do not match.",
                        iceberg_schema, iceberg_schema_of_task
                    ),
                )
                .with_context("column_map", format!("{:?}", column_map))
                .with_context("field_ids", format!("{:?}", leaf_field_ids))
                .with_context("missing_fields", format!("{:?}", missing_fields)));
            }

            let mut indices = vec![];
            for field_id in leaf_field_ids {
                if let Some(col_idx) = column_map.get(&field_id) {
                    indices.push(*col_idx);
                } else {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Field {} is not found in Parquet schema.", field_id),
                    ));
                }
            }
            Ok(ProjectionMask::leaves(parquet_schema, indices))
        }
    }

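    /// Converts the bound predicate into an Arrow [`RowFilter`] whose
    /// projection covers only the leaf columns that the predicate references.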
    fn get_row_filter(
        predicates: &BoundPredicate,
        parquet_schema: &SchemaDescriptor,
        iceberg_field_ids: &HashSet<i32>,
        field_id_map: &HashMap<i32, usize>,
    ) -> Result<RowFilter> {
        let mut column_indices = iceberg_field_ids
            .iter()
            .filter_map(|field_id| field_id_map.get(field_id).cloned())
            .collect::<Vec<_>>();
        column_indices.sort();

        let mut converter = PredicateConverter {
            parquet_schema,
            column_map: field_id_map,
            column_indices: &column_indices,
        };

        let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
        let predicate_func = visit(&mut converter, predicates)?;
        let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
        Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
    }

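    /// Evaluates the predicate against each row group's statistics and returns
    /// the indices of those row groups that might contain matching rows.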
    fn get_selected_row_group_indices(
        predicate: &BoundPredicate,
        parquet_metadata: &Arc<ParquetMetaData>,
        field_id_map: &HashMap<i32, usize>,
        snapshot_schema: &Schema,
    ) -> Result<Vec<usize>> {
        let row_groups_metadata = parquet_metadata.row_groups();
        let mut results = Vec::with_capacity(row_groups_metadata.len());

        for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
            if RowGroupMetricsEvaluator::eval(
                predicate,
                row_group_metadata,
                field_id_map,
                snapshot_schema,
            )? {
                results.push(idx);
            }
        }

        Ok(results)
    }

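    /// Uses the Parquet column and offset indexes to build a page-level
    /// `RowSelection` for the filter predicate, visiting only the row groups
    /// in `selected_row_groups` when row group filtering has already run.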
    fn get_row_selection_for_filter_predicate(
        predicate: &BoundPredicate,
        parquet_metadata: &Arc<ParquetMetaData>,
        selected_row_groups: &Option<Vec<usize>>,
        field_id_map: &HashMap<i32, usize>,
        snapshot_schema: &Schema,
    ) -> Result<RowSelection> {
        let Some(column_index) = parquet_metadata.column_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain a column index",
            ));
        };

        let Some(offset_index) = parquet_metadata.offset_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain an offset index",
            ));
        };

        let mut selected_row_groups_idx = 0;

        let page_index = column_index
            .iter()
            .enumerate()
            .zip(offset_index)
            .zip(parquet_metadata.row_groups());

        let mut results = Vec::new();
        for (((idx, column_index), offset_index), row_group_metadata) in page_index {
            if let Some(selected_row_groups) = selected_row_groups {
                if idx == selected_row_groups[selected_row_groups_idx] {
                    selected_row_groups_idx += 1;
                } else {
                    continue;
                }
            }

            let selections_for_page = PageIndexEvaluator::eval(
                predicate,
                column_index,
                offset_index,
                row_group_metadata,
                field_id_map,
                snapshot_schema,
            )?;

            results.push(selections_for_page);

            if let Some(selected_row_groups) = selected_row_groups {
                if selected_row_groups_idx == selected_row_groups.len() {
                    break;
                }
            }
        }

        Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
    }
}

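/// Builds a map of Iceberg field id to Parquet leaf column index, using the
/// field ids embedded in the Parquet schema. Errors if a leaf column has no
/// field id or if a non-primitive type appears where a leaf is expected.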
fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> {
    let mut column_map = HashMap::new();
    for (idx, field) in parquet_schema.columns().iter().enumerate() {
        let field_type = field.self_type();
        match field_type {
            ParquetType::PrimitiveType { basic_info, .. } => {
                if !basic_info.has_id() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Leaf column idx: {}, name: {}, type {:?} in schema doesn't have a field id",
                            idx,
                            basic_info.name(),
                            field_type
                        ),
                    ));
                }
                column_map.insert(basic_info.id(), idx);
            }
            ParquetType::GroupType { .. } => {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Leaf columns in the schema should be primitive types, but got {:?}",
                        field_type
                    ),
                ));
            }
        };
    }

    Ok(column_map)
}

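/// A [`BoundPredicateVisitor`] that collects the ids of all fields referenced
/// by a predicate; the logical operators (and/or/not) contribute no ids
/// themselves.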
struct CollectFieldIdVisitor {
    field_ids: HashSet<i32>,
}

impl CollectFieldIdVisitor {
    fn field_ids(self) -> HashSet<i32> {
        self.field_ids
    }
}

impl BoundPredicateVisitor for CollectFieldIdVisitor {
    type T = ();

    fn always_true(&mut self) -> Result<()> {
        Ok(())
    }

    fn always_false(&mut self) -> Result<()> {
        Ok(())
    }

    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
        Ok(())
    }

    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
        Ok(())
    }

    fn not(&mut self, _inner: ()) -> Result<()> {
        Ok(())
    }

    fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn less_than(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn less_than_or_eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn greater_than(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn greater_than_or_eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn starts_with(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_starts_with(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn r#in(
        &mut self,
        reference: &BoundReference,
        _literals: &FnvHashSet<Datum>,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_in(
        &mut self,
        reference: &BoundReference,
        _literals: &FnvHashSet<Datum>,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }
}

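/// Converts a bound Iceberg [`BoundPredicate`] into a boxed closure over Arrow
/// `RecordBatch`es (see [`PredicateResult`]). `column_map` maps Iceberg field
/// ids to leaf column indices in the full Parquet schema, while
/// `column_indices` lists the leaf columns actually projected for predicate
/// evaluation, in ascending order.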
struct PredicateConverter<'a> {
    pub parquet_schema: &'a SchemaDescriptor,
    pub column_map: &'a HashMap<i32, usize>,
    pub column_indices: &'a Vec<usize>,
}

impl PredicateConverter<'_> {
    fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
        if let Some(column_idx) = self.column_map.get(&reference.field().id) {
            if self.parquet_schema.get_column_root(*column_idx).is_group() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Leaf column `{}` in predicates isn't a root column in the Parquet schema.",
                        reference.field().name
                    ),
                ));
            }

            let index = self
                .column_indices
                .iter()
                .position(|&idx| idx == *column_idx)
                .ok_or(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Leaf column `{}` in predicates cannot be found in the required column indices.",
                        reference.field().name
                    ),
                ))?;

            Ok(Some(index))
        } else {
            Ok(None)
        }
    }

    fn build_always_true(&self) -> Result<Box<PredicateResult>> {
        Ok(Box::new(|batch| {
            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
        }))
    }

    fn build_always_false(&self) -> Result<Box<PredicateResult>> {
        Ok(Box::new(|batch| {
            Ok(BooleanArray::from(vec![false; batch.num_rows()]))
        }))
    }
}

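/// Returns the array for the column at `column_idx` of the batch so a
/// predicate closure can evaluate it; struct columns are not yet supported.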
fn project_column(
    batch: &RecordBatch,
    column_idx: usize,
) -> std::result::Result<ArrayRef, ArrowError> {
    let column = batch.column(column_idx);

    match column.data_type() {
        DataType::Struct(_) => Err(ArrowError::SchemaError(
            "Does not support struct column yet.".to_string(),
        )),
        _ => Ok(column.clone()),
    }
}

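/// The closures produced by [`PredicateConverter`]: given a `RecordBatch`,
/// compute the boolean mask of rows that satisfy the predicate.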
type PredicateResult =
    dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;

impl BoundPredicateVisitor for PredicateConverter<'_> {
    type T = Box<PredicateResult>;

    fn always_true(&mut self) -> Result<Box<PredicateResult>> {
        self.build_always_true()
    }

    fn always_false(&mut self) -> Result<Box<PredicateResult>> {
        self.build_always_false()
    }

    fn and(
        &mut self,
        mut lhs: Box<PredicateResult>,
        mut rhs: Box<PredicateResult>,
    ) -> Result<Box<PredicateResult>> {
        Ok(Box::new(move |batch| {
            let left = lhs(batch.clone())?;
            let right = rhs(batch)?;
            and_kleene(&left, &right)
        }))
    }

    fn or(
        &mut self,
        mut lhs: Box<PredicateResult>,
        mut rhs: Box<PredicateResult>,
    ) -> Result<Box<PredicateResult>> {
        Ok(Box::new(move |batch| {
            let left = lhs(batch.clone())?;
            let right = rhs(batch)?;
            or_kleene(&left, &right)
        }))
    }

    fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
        Ok(Box::new(move |batch| {
            let pred_ret = inner(batch)?;
            not(&pred_ret)
        }))
    }

    fn is_null(
        &mut self,
        reference: &BoundReference,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            Ok(Box::new(move |batch| {
                let column = project_column(&batch, idx)?;
                is_null(&column)
            }))
        } else {
            self.build_always_true()
        }
    }

    fn not_null(
        &mut self,
        reference: &BoundReference,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            Ok(Box::new(move |batch| {
                let column = project_column(&batch, idx)?;
                is_not_null(&column)
            }))
        } else {
            self.build_always_false()
        }
    }

    fn is_nan(
        &mut self,
        reference: &BoundReference,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if self.bound_reference(reference)?.is_some() {
            self.build_always_true()
        } else {
            self.build_always_false()
        }
    }

    fn not_nan(
        &mut self,
        reference: &BoundReference,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if self.bound_reference(reference)?.is_some() {
            self.build_always_false()
        } else {
            self.build_always_true()
        }
    }

    fn less_than(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                lt(&left, literal.as_ref())
            }))
        } else {
            self.build_always_true()
        }
    }

    fn less_than_or_eq(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                lt_eq(&left, literal.as_ref())
            }))
        } else {
            self.build_always_true()
        }
    }

    fn greater_than(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                gt(&left, literal.as_ref())
            }))
        } else {
            self.build_always_false()
        }
    }

    fn greater_than_or_eq(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                gt_eq(&left, literal.as_ref())
            }))
        } else {
            self.build_always_false()
        }
    }

    fn eq(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                eq(&left, literal.as_ref())
            }))
        } else {
            self.build_always_false()
        }
    }

    fn not_eq(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                neq(&left, literal.as_ref())
            }))
        } else {
            self.build_always_false()
        }
    }

    fn starts_with(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                starts_with(&left, literal.as_ref())
            }))
        } else {
            self.build_always_false()
        }
    }

    fn not_starts_with(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                not(&starts_with(&left, literal.as_ref())?)
            }))
        } else {
            self.build_always_true()
        }
    }

    fn r#in(
        &mut self,
        reference: &BoundReference,
        literals: &FnvHashSet<Datum>,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literals: Vec<_> = literals
                .iter()
                .map(|lit| get_arrow_datum(lit).unwrap())
                .collect();

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;

                let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
                for literal in &literals {
                    let literal = try_cast_literal(literal, left.data_type())?;
                    acc = or(&acc, &eq(&left, literal.as_ref())?)?
                }

                Ok(acc)
            }))
        } else {
            self.build_always_false()
        }
    }

    fn not_in(
        &mut self,
        reference: &BoundReference,
        literals: &FnvHashSet<Datum>,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literals: Vec<_> = literals
                .iter()
                .map(|lit| get_arrow_datum(lit).unwrap())
                .collect();

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
                for literal in &literals {
                    let literal = try_cast_literal(literal, left.data_type())?;
                    acc = and(&acc, &neq(&left, literal.as_ref())?)?
                }

                Ok(acc)
            }))
        } else {
            self.build_always_true()
        }
    }
}

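/// An [`AsyncFileReader`] adapter that lets the Parquet crate read Iceberg
/// files through [`FileIO`], optionally preloading the column, offset, and
/// page indexes when the metadata is first loaded.
///
/// A minimal construction sketch (assumes `meta` and `input` come from a
/// [`FileIO`] input file's `metadata()` and `reader()` calls):
///
/// ```ignore
/// let reader = ArrowFileReader::new(meta, input)
///     .with_preload_column_index(true)
///     .with_preload_offset_index(true);
/// ```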
pub struct ArrowFileReader<R: FileRead> {
    meta: FileMetadata,
    preload_column_index: bool,
    preload_offset_index: bool,
    preload_page_index: bool,
    metadata_size_hint: Option<usize>,
    r: R,
}

impl<R: FileRead> ArrowFileReader<R> {
    pub fn new(meta: FileMetadata, r: R) -> Self {
        Self {
            meta,
            preload_column_index: false,
            preload_offset_index: false,
            preload_page_index: false,
            metadata_size_hint: None,
            r,
        }
    }

    pub fn with_preload_column_index(mut self, preload: bool) -> Self {
        self.preload_column_index = preload;
        self
    }

    pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
        self.preload_offset_index = preload;
        self
    }

    pub fn with_preload_page_index(mut self, preload: bool) -> Self {
        self.preload_page_index = preload;
        self
    }

    pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
        self.metadata_size_hint = Some(hint);
        self
    }
}

impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start..range.end)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    fn get_metadata(
        &mut self,
        _options: Option<&'_ ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new()
                .with_prefetch_hint(self.metadata_size_hint)
                .with_column_indexes(self.preload_column_index)
                .with_page_indexes(self.preload_page_index)
                .with_offset_indexes(self.preload_offset_index);
            let size = self.meta.size;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}

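/// Casts the literal to the Arrow data type of the column it is compared
/// against when the two differ (for example, an `Int32` literal compared
/// against an `Int64` column is cast to `Int64`); returns the literal
/// unchanged when the types already match.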
fn try_cast_literal(
    literal: &Arc<dyn ArrowDatum + Send + Sync>,
    column_type: &DataType,
) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
    let literal_array = literal.get().0;

    if literal_array.data_type() == column_type {
        return Ok(Arc::clone(literal));
    }

    let literal_array = cast(literal_array, column_type)?;
    Ok(Arc::new(Scalar::new(literal_array)))
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
    use futures::TryStreamExt;
    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    use parquet::arrow::{ArrowWriter, ProjectionMask};
    use parquet::basic::Compression;
    use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
    use parquet::file::properties::WriterProperties;
    use parquet::schema::parser::parse_message_type;
    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
    use roaring::RoaringTreemap;
    use tempfile::TempDir;

    use crate::ErrorKind;
    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
    use crate::arrow::{ArrowReader, ArrowReaderBuilder};
    use crate::delete_vector::DeleteVector;
    use crate::expr::visitors::bound_predicate_visitor::visit;
    use crate::expr::{Bind, Predicate, Reference};
    use crate::io::FileIO;
    use crate::scan::{FileScanTask, FileScanTaskStream};
    use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type};

    fn table_schema_simple() -> SchemaRef {
        Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_identifier_field_ids(vec![2])
                .with_fields(vec![
                    NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
                    NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
                ])
                .build()
                .unwrap(),
        )
    }

    #[test]
    fn test_collect_field_id() {
        let schema = table_schema_simple();
        let expr = Reference::new("qux").is_null();
        let bound_expr = expr.bind(schema, true).unwrap();

        let mut visitor = CollectFieldIdVisitor {
            field_ids: HashSet::default(),
        };
        visit(&mut visitor, &bound_expr).unwrap();

        let mut expected = HashSet::default();
        expected.insert(4_i32);

        assert_eq!(visitor.field_ids, expected);
    }

    #[test]
    fn test_collect_field_id_with_and() {
        let schema = table_schema_simple();
        let expr = Reference::new("qux")
            .is_null()
            .and(Reference::new("baz").is_null());
        let bound_expr = expr.bind(schema, true).unwrap();

        let mut visitor = CollectFieldIdVisitor {
            field_ids: HashSet::default(),
        };
        visit(&mut visitor, &bound_expr).unwrap();

        let mut expected = HashSet::default();
        expected.insert(4_i32);
        expected.insert(3);

        assert_eq!(visitor.field_ids, expected);
    }

    #[test]
    fn test_collect_field_id_with_or() {
        let schema = table_schema_simple();
        let expr = Reference::new("qux")
            .is_null()
            .or(Reference::new("baz").is_null());
        let bound_expr = expr.bind(schema, true).unwrap();

        let mut visitor = CollectFieldIdVisitor {
            field_ids: HashSet::default(),
        };
        visit(&mut visitor, &bound_expr).unwrap();

        let mut expected = HashSet::default();
        expected.insert(4_i32);
        expected.insert(3);

        assert_eq!(visitor.field_ids, expected);
    }

    #[test]
    fn test_arrow_projection_mask() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_identifier_field_ids(vec![1])
                .with_fields(vec![
                    NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
                    NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::optional(
                        3,
                        "c3",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 38,
                            scale: 3,
                        }),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
            ),
            Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "3".to_string(),
            )])),
        ]));

        let message_type = "
message schema {
  required binary c1 (STRING) = 1;
  optional int32 c2 (INTEGER(8,true)) = 2;
  optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
}
    ";
        let parquet_type = parse_message_type(message_type).expect("should parse schema");
        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));

        let err = ArrowReader::get_arrow_projection_mask(
            &[1, 2, 3],
            &schema,
            &parquet_schema,
            &arrow_schema,
        )
        .unwrap_err();

        assert_eq!(err.kind(), ErrorKind::DataInvalid);
        assert_eq!(
            err.to_string(),
            "DataInvalid => Unsupported Arrow data type: Duration(Microsecond)".to_string()
        );

        let err = ArrowReader::get_arrow_projection_mask(
            &[1, 3],
            &schema,
            &parquet_schema,
            &arrow_schema,
        )
        .unwrap_err();

        assert_eq!(err.kind(), ErrorKind::DataInvalid);
        assert_eq!(
            err.to_string(),
            "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
        );

        let mask =
            ArrowReader::get_arrow_projection_mask(&[1], &schema, &parquet_schema, &arrow_schema)
                .expect("Some ProjectionMask");
        assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
    }

    #[tokio::test]
    async fn test_kleene_logic_or_behaviour() {
        let predicate = Reference::new("a")
            .is_null()
            .or(Reference::new("a").equal_to(Datum::string("foo")));

        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];

        let expected = vec![None, Some("foo".to_string())];

        let (file_io, schema, table_location, _temp_dir) =
            setup_kleene_logic(data_for_col_a, DataType::Utf8);
        let reader = ArrowReaderBuilder::new(file_io).build();

        let result_data = test_perform_read(predicate, schema, table_location, reader).await;

        assert_eq!(result_data, expected);
    }

    #[tokio::test]
    async fn test_kleene_logic_and_behaviour() {
        let predicate = Reference::new("a")
            .is_not_null()
            .and(Reference::new("a").not_equal_to(Datum::string("foo")));

        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];

        let expected = vec![Some("bar".to_string())];

        let (file_io, schema, table_location, _temp_dir) =
            setup_kleene_logic(data_for_col_a, DataType::Utf8);
        let reader = ArrowReaderBuilder::new(file_io).build();

        let result_data = test_perform_read(predicate, schema, table_location, reader).await;

        assert_eq!(result_data, expected);
    }

    #[tokio::test]
    async fn test_predicate_cast_literal() {
        let predicates = vec![
            (
                Reference::new("a").equal_to(Datum::string("foo")),
                vec![Some("foo".to_string())],
            ),
            (
                Reference::new("a").not_equal_to(Datum::string("foo")),
                vec![Some("bar".to_string())],
            ),
            (
                Reference::new("a").starts_with(Datum::string("f")),
                vec![Some("foo".to_string())],
            ),
            (
                Reference::new("a").not_starts_with(Datum::string("f")),
                vec![Some("bar".to_string())],
            ),
            (
                Reference::new("a").less_than(Datum::string("foo")),
                vec![Some("bar".to_string())],
            ),
            (
                Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
                vec![Some("foo".to_string()), Some("bar".to_string())],
            ),
            (
                Reference::new("a").greater_than(Datum::string("bar")),
                vec![Some("foo".to_string())],
            ),
            (
                Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
                vec![Some("foo".to_string())],
            ),
            (
                Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
                vec![Some("foo".to_string())],
            ),
            (
                Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
                vec![Some("bar".to_string())],
            ),
        ];

        let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];

        let (file_io, schema, table_location, _temp_dir) =
            setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
        let reader = ArrowReaderBuilder::new(file_io).build();

        for (predicate, expected) in predicates {
            println!("testing predicate {predicate}");
            let result_data = test_perform_read(
                predicate.clone(),
                schema.clone(),
                table_location.clone(),
                reader.clone(),
            )
            .await;

            assert_eq!(result_data, expected, "predicate={predicate}");
        }
    }

    async fn test_perform_read(
        predicate: Predicate,
        schema: SchemaRef,
        table_location: String,
        reader: ArrowReader,
    ) -> Vec<Option<String>> {
        let tasks = Box::pin(futures::stream::iter(
            vec![Ok(FileScanTask {
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{}/1.parquet", table_location),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1],
                predicate: Some(predicate.bind(schema, true).unwrap()),
                deletes: vec![],
            })]
            .into_iter(),
        )) as FileScanTaskStream;

        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        result[0].columns()[0]
            .as_string_opt::<i32>()
            .unwrap()
            .iter()
            .map(|v| v.map(ToOwned::to_owned))
            .collect::<Vec<_>>()
    }

    fn setup_kleene_logic(
        data_for_col_a: Vec<Option<String>>,
        col_a_type: DataType,
    ) -> (FileIO, SchemaRef, String, TempDir) {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
        ]));

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();

        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();

        let col = match col_a_type {
            DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
            DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
            _ => panic!("unexpected col_a_type"),
        };

        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();

        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
        let mut writer =
            ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

        writer.write(&to_write).expect("Writing batch");

        writer.close().unwrap();

        (file_io, schema, table_location, tmp_dir)
    }

    #[test]
    fn test_build_deletes_row_selection() {
        let schema_descr = get_test_schema_descr();

        let mut columns = vec![];
        for ptr in schema_descr.columns() {
            let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
            columns.push(column);
        }

        let row_groups_metadata = vec![
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
        ];

        let selected_row_groups = Some(vec![1, 3]);

        let positional_deletes = RoaringTreemap::from_iter(&[
            1, 3, 4, 5, 998, 999, 1000, 1010, 1011, 1012, 1498, 1499, 1500, 1501, 1600, 1999,
            2000, 2001, 2100, 2200, 2201, 2202, 2999, 3000,
        ]);

        let positional_deletes = DeleteVector::new(positional_deletes);

        let result = ArrowReader::build_deletes_row_selection(
            &row_groups_metadata,
            &selected_row_groups,
            &positional_deletes,
        )
        .unwrap();

        let expected = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(9),
            RowSelector::skip(3),
            RowSelector::select(485),
            RowSelector::skip(4),
            RowSelector::select(98),
            RowSelector::skip(1),
            RowSelector::select(99),
            RowSelector::skip(3),
            RowSelector::select(796),
            RowSelector::skip(1),
        ]);

        assert_eq!(result, expected);

        let result = ArrowReader::build_deletes_row_selection(
            &row_groups_metadata,
            &None,
            &positional_deletes,
        )
        .unwrap();

        let expected = RowSelection::from(vec![
            RowSelector::select(1),
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(3),
            RowSelector::select(992),
            RowSelector::skip(3),
            RowSelector::select(9),
            RowSelector::skip(3),
            RowSelector::select(485),
            RowSelector::skip(4),
            RowSelector::select(98),
            RowSelector::skip(1),
            RowSelector::select(398),
            RowSelector::skip(3),
            RowSelector::select(98),
            RowSelector::skip(1),
            RowSelector::select(99),
            RowSelector::skip(3),
            RowSelector::select(796),
            RowSelector::skip(2),
            RowSelector::select(499),
        ]);

        assert_eq!(result, expected);
    }

    fn build_test_row_group_meta(
        schema_descr: SchemaDescPtr,
        columns: Vec<ColumnChunkMetaData>,
        num_rows: i64,
        ordinal: i16,
    ) -> RowGroupMetaData {
        RowGroupMetaData::builder(schema_descr.clone())
            .set_num_rows(num_rows)
            .set_total_byte_size(2000)
            .set_column_metadata(columns)
            .set_ordinal(ordinal)
            .build()
            .unwrap()
    }

    fn get_test_schema_descr() -> SchemaDescPtr {
        use parquet::schema::types::Type as SchemaType;

        let schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![
                Arc::new(
                    SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
                        .build()
                        .unwrap(),
                ),
                Arc::new(
                    SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
                        .build()
                        .unwrap(),
                ),
            ])
            .build()
            .unwrap();

        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
    }
}