iceberg/arrow/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet file data reader
19
20use std::collections::{HashMap, HashSet};
21use std::ops::Range;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
26use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
27use arrow_cast::cast::cast;
28use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
29use arrow_schema::{
30    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
31};
32use arrow_string::like::starts_with;
33use bytes::Bytes;
34use fnv::FnvHashSet;
35use futures::future::BoxFuture;
36use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
37use parquet::arrow::arrow_reader::{
38    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
39};
40use parquet::arrow::async_reader::AsyncFileReader;
41use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
42use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
43use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
44
45use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
46use crate::arrow::record_batch_transformer::RecordBatchTransformer;
47use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
48use crate::delete_vector::DeleteVector;
49use crate::error::Result;
50use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
51use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
52use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
53use crate::expr::{BoundPredicate, BoundReference};
54use crate::io::{FileIO, FileMetadata, FileRead};
55use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
56use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
57use crate::utils::available_parallelism;
58use crate::{Error, ErrorKind};
59
60/// Builder to create ArrowReader
61pub struct ArrowReaderBuilder {
62    batch_size: Option<usize>,
63    file_io: FileIO,
64    concurrency_limit_data_files: usize,
65    row_group_filtering_enabled: bool,
66    row_selection_enabled: bool,
67}
68
69impl ArrowReaderBuilder {
70    /// Create a new ArrowReaderBuilder
71    pub(crate) fn new(file_io: FileIO) -> Self {
72        let num_cpus = available_parallelism().get();
73
74        ArrowReaderBuilder {
75            batch_size: None,
76            file_io,
77            concurrency_limit_data_files: num_cpus,
78            row_group_filtering_enabled: true,
79            row_selection_enabled: false,
80        }
81    }
82
83    /// Sets the max number of in flight data files that are being fetched
84    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
85        self.concurrency_limit_data_files = val;
86        self
87    }
88
89    /// Sets the desired size of batches in the response
90    /// to something other than the default
91    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
92        self.batch_size = Some(batch_size);
93        self
94    }
95
96    /// Determines whether to enable row group filtering.
97    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
98        self.row_group_filtering_enabled = row_group_filtering_enabled;
99        self
100    }
101
102    /// Determines whether to enable row selection.
103    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
104        self.row_selection_enabled = row_selection_enabled;
105        self
106    }
107
108    /// Build the ArrowReader.
109    pub fn build(self) -> ArrowReader {
110        ArrowReader {
111            batch_size: self.batch_size,
112            file_io: self.file_io.clone(),
113            delete_file_loader: CachingDeleteFileLoader::new(
114                self.file_io.clone(),
115                self.concurrency_limit_data_files,
116            ),
117            concurrency_limit_data_files: self.concurrency_limit_data_files,
118            row_group_filtering_enabled: self.row_group_filtering_enabled,
119            row_selection_enabled: self.row_selection_enabled,
120        }
121    }
122}
123
124/// Reads data from Parquet files
125#[derive(Clone)]
126pub struct ArrowReader {
127    batch_size: Option<usize>,
128    file_io: FileIO,
129    delete_file_loader: CachingDeleteFileLoader,
130
131    /// the maximum number of data files that can be fetched at the same time
132    concurrency_limit_data_files: usize,
133
134    row_group_filtering_enabled: bool,
135    row_selection_enabled: bool,
136}
137
138impl ArrowReader {
139    /// Take a stream of FileScanTasks and reads all the files.
140    /// Returns a stream of Arrow RecordBatches containing the data from the files
141    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
142        let file_io = self.file_io.clone();
143        let batch_size = self.batch_size;
144        let concurrency_limit_data_files = self.concurrency_limit_data_files;
145        let row_group_filtering_enabled = self.row_group_filtering_enabled;
146        let row_selection_enabled = self.row_selection_enabled;
147
148        let stream = tasks
149            .map_ok(move |task| {
150                let file_io = file_io.clone();
151
152                Self::process_file_scan_task(
153                    task,
154                    batch_size,
155                    file_io,
156                    self.delete_file_loader.clone(),
157                    row_group_filtering_enabled,
158                    row_selection_enabled,
159                )
160            })
161            .map_err(|err| {
162                Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
163            })
164            .try_buffer_unordered(concurrency_limit_data_files)
165            .try_flatten_unordered(concurrency_limit_data_files);
166
167        Ok(Box::pin(stream) as ArrowRecordBatchStream)
168    }
169
170    #[allow(clippy::too_many_arguments)]
171    async fn process_file_scan_task(
172        task: FileScanTask,
173        batch_size: Option<usize>,
174        file_io: FileIO,
175        delete_file_loader: CachingDeleteFileLoader,
176        row_group_filtering_enabled: bool,
177        row_selection_enabled: bool,
178    ) -> Result<ArrowRecordBatchStream> {
179        let should_load_page_index =
180            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
181
182        let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone());
183
184        let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder(
185            &task.data_file_path,
186            file_io.clone(),
187            should_load_page_index,
188        )
189        .await?;
190
191        // Create a projection mask for the batch stream to select which columns in the
192        // Parquet file that we want in the response
193        let projection_mask = Self::get_arrow_projection_mask(
194            &task.project_field_ids,
195            &task.schema,
196            record_batch_stream_builder.parquet_schema(),
197            record_batch_stream_builder.schema(),
198        )?;
199        record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);
200
201        // RecordBatchTransformer performs any transformations required on the RecordBatches
202        // that come back from the file, such as type promotion, default column insertion
203        // and column re-ordering
204        let mut record_batch_transformer =
205            RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
206
207        if let Some(batch_size) = batch_size {
208            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
209        }
210
211        let delete_filter = delete_filter_rx.await.unwrap()?;
212        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
213
214        // In addition to the optional predicate supplied in the `FileScanTask`,
215        // we also have an optional predicate resulting from equality delete files.
216        // If both are present, we logical-AND them together to form a single filter
217        // predicate that we can pass to the `RecordBatchStreamBuilder`.
218        let final_predicate = match (&task.predicate, delete_predicate) {
219            (None, None) => None,
220            (Some(predicate), None) => Some(predicate.clone()),
221            (None, Some(ref predicate)) => Some(predicate.clone()),
222            (Some(filter_predicate), Some(delete_predicate)) => {
223                Some(filter_predicate.clone().and(delete_predicate))
224            }
225        };
226
227        // There are two possible sources both for potential lists of selected RowGroup indices,
228        // and for `RowSelection`s.
229        // Selected RowGroup index lists can come from two sources:
230        //   * When there are equality delete files that are applicable;
231        //   * When there is a scan predicate and row_group_filtering_enabled = true.
232        // `RowSelection`s can be created in either or both of the following cases:
233        //   * When there are positional delete files that are applicable;
234        //   * When there is a scan predicate and row_selection_enabled = true
235        // Note that, in the former case we only perform row group filtering when
236        // there is a scan predicate AND row_group_filtering_enabled = true,
237        // but we perform row selection filtering if there are applicable
238        // equality delete files OR (there is a scan predicate AND row_selection_enabled),
239        // since the only implemented method of applying positional deletes is
240        // by using a `RowSelection`.
241        let mut selected_row_group_indices = None;
242        let mut row_selection = None;
243
244        if let Some(predicate) = final_predicate {
245            let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
246                record_batch_stream_builder.parquet_schema(),
247                &predicate,
248            )?;
249
250            let row_filter = Self::get_row_filter(
251                &predicate,
252                record_batch_stream_builder.parquet_schema(),
253                &iceberg_field_ids,
254                &field_id_map,
255            )?;
256            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
257
258            if row_group_filtering_enabled {
259                let result = Self::get_selected_row_group_indices(
260                    &predicate,
261                    record_batch_stream_builder.metadata(),
262                    &field_id_map,
263                    &task.schema,
264                )?;
265
266                selected_row_group_indices = Some(result);
267            }
268
269            if row_selection_enabled {
270                row_selection = Some(Self::get_row_selection_for_filter_predicate(
271                    &predicate,
272                    record_batch_stream_builder.metadata(),
273                    &selected_row_group_indices,
274                    &field_id_map,
275                    &task.schema,
276                )?);
277            }
278        }
279
280        let positional_delete_indexes = delete_filter.get_delete_vector(&task);
281
282        if let Some(positional_delete_indexes) = positional_delete_indexes {
283            let delete_row_selection = {
284                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
285
286                Self::build_deletes_row_selection(
287                    record_batch_stream_builder.metadata().row_groups(),
288                    &selected_row_group_indices,
289                    &positional_delete_indexes,
290                )
291            }?;
292
293            // merge the row selection from the delete files with the row selection
294            // from the filter predicate, if there is one from the filter predicate
295            row_selection = match row_selection {
296                None => Some(delete_row_selection),
297                Some(filter_row_selection) => {
298                    Some(filter_row_selection.intersection(&delete_row_selection))
299                }
300            };
301        }
302
303        if let Some(row_selection) = row_selection {
304            record_batch_stream_builder =
305                record_batch_stream_builder.with_row_selection(row_selection);
306        }
307
308        if let Some(selected_row_group_indices) = selected_row_group_indices {
309            record_batch_stream_builder =
310                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
311        }
312
313        // Build the batch stream and send all the RecordBatches that it generates
314        // to the requester.
315        let record_batch_stream =
316            record_batch_stream_builder
317                .build()?
318                .map(move |batch| match batch {
319                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
320                    Err(err) => Err(err.into()),
321                });
322
323        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
324    }
325
326    pub(crate) async fn create_parquet_record_batch_stream_builder(
327        data_file_path: &str,
328        file_io: FileIO,
329        should_load_page_index: bool,
330    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
331        // Get the metadata for the Parquet file we need to read and build
332        // a reader for the data within
333        let parquet_file = file_io.new_input(data_file_path)?;
334        let (parquet_metadata, parquet_reader) =
335            try_join!(parquet_file.metadata(), parquet_file.reader())?;
336        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
337            .with_preload_column_index(true)
338            .with_preload_offset_index(true)
339            .with_preload_page_index(should_load_page_index);
340
341        // Create the record batch stream builder, which wraps the parquet file reader
342        let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
343            parquet_file_reader,
344            ArrowReaderOptions::new(),
345        )
346        .await?;
347        Ok(record_batch_stream_builder)
348    }
349
350    /// computes a `RowSelection` from positional delete indices.
351    ///
352    /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
353    /// as having been deleted by a positional delete, taking into account any row groups that have
354    /// been skipped entirely by the filter predicate
355    fn build_deletes_row_selection(
356        row_group_metadata_list: &[RowGroupMetaData],
357        selected_row_groups: &Option<Vec<usize>>,
358        positional_deletes: &DeleteVector,
359    ) -> Result<RowSelection> {
360        let mut results: Vec<RowSelector> = Vec::new();
361        let mut selected_row_groups_idx = 0;
362        let mut current_row_group_base_idx: u64 = 0;
363        let mut delete_vector_iter = positional_deletes.iter();
364        let mut next_deleted_row_idx_opt = delete_vector_iter.next();
365
366        for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
367            let row_group_num_rows = row_group_metadata.num_rows() as u64;
368            let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
369
370            // if row group selection is enabled,
371            if let Some(selected_row_groups) = selected_row_groups {
372                // if we've consumed all the selected row groups, we're done
373                if selected_row_groups_idx == selected_row_groups.len() {
374                    break;
375                }
376
377                if idx == selected_row_groups[selected_row_groups_idx] {
378                    // we're in a selected row group. Increment selected_row_groups_idx
379                    // so that next time around the for loop we're looking for the next
380                    // selected row group
381                    selected_row_groups_idx += 1;
382                } else {
383                    // remove any positional deletes from the skipped page so that
384                    // `positional.deletes.min()` can be used
385                    delete_vector_iter.advance_to(next_row_group_base_idx);
386                    next_deleted_row_idx_opt = delete_vector_iter.next();
387
388                    // still increment the current page base index but then skip to the next row group
389                    // in the file
390                    current_row_group_base_idx += row_group_num_rows;
391                    continue;
392                }
393            }
394
395            let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
396                Some(next_deleted_row_idx) => {
397                    // if the index of the next deleted row is beyond this row group, add a selection for
398                    // the remainder of this row group and skip to the next row group
399                    if next_deleted_row_idx >= next_row_group_base_idx {
400                        results.push(RowSelector::select(row_group_num_rows as usize));
401                        continue;
402                    }
403
404                    next_deleted_row_idx
405                }
406
407                // If there are no more pos deletes, add a selector for the entirety of this row group.
408                _ => {
409                    results.push(RowSelector::select(row_group_num_rows as usize));
410                    continue;
411                }
412            };
413
414            let mut current_idx = current_row_group_base_idx;
415            'chunks: while next_deleted_row_idx < next_row_group_base_idx {
416                // `select` all rows that precede the next delete index
417                if current_idx < next_deleted_row_idx {
418                    let run_length = next_deleted_row_idx - current_idx;
419                    results.push(RowSelector::select(run_length as usize));
420                    current_idx += run_length;
421                }
422
423                // `skip` all consecutive deleted rows in the current row group
424                let mut run_length = 0;
425                while next_deleted_row_idx == current_idx
426                    && next_deleted_row_idx < next_row_group_base_idx
427                {
428                    run_length += 1;
429                    current_idx += 1;
430
431                    next_deleted_row_idx_opt = delete_vector_iter.next();
432                    next_deleted_row_idx = match next_deleted_row_idx_opt {
433                        Some(next_deleted_row_idx) => next_deleted_row_idx,
434                        _ => {
435                            // We've processed the final positional delete.
436                            // Conclude the skip and then break so that we select the remaining
437                            // rows in the row group and move on to the next row group
438                            results.push(RowSelector::skip(run_length));
439                            break 'chunks;
440                        }
441                    };
442                }
443                if run_length > 0 {
444                    results.push(RowSelector::skip(run_length));
445                }
446            }
447
448            if current_idx < next_row_group_base_idx {
449                results.push(RowSelector::select(
450                    (next_row_group_base_idx - current_idx) as usize,
451                ));
452            }
453
454            current_row_group_base_idx += row_group_num_rows;
455        }
456
457        Ok(results.into())
458    }
459
460    fn build_field_id_set_and_map(
461        parquet_schema: &SchemaDescriptor,
462        predicate: &BoundPredicate,
463    ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
464        // Collects all Iceberg field IDs referenced in the filter predicate
465        let mut collector = CollectFieldIdVisitor {
466            field_ids: HashSet::default(),
467        };
468        visit(&mut collector, predicate)?;
469
470        let iceberg_field_ids = collector.field_ids();
471        let field_id_map = build_field_id_map(parquet_schema)?;
472
473        Ok((iceberg_field_ids, field_id_map))
474    }
475
476    /// Insert the leaf field id into the field_ids using for projection.
477    /// For nested type, it will recursively insert the leaf field id.
478    fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
479        match field.field_type.as_ref() {
480            Type::Primitive(_) => {
481                field_ids.push(field.id);
482            }
483            Type::Struct(struct_type) => {
484                for nested_field in struct_type.fields() {
485                    Self::include_leaf_field_id(nested_field, field_ids);
486                }
487            }
488            Type::List(list_type) => {
489                Self::include_leaf_field_id(&list_type.element_field, field_ids);
490            }
491            Type::Map(map_type) => {
492                Self::include_leaf_field_id(&map_type.key_field, field_ids);
493                Self::include_leaf_field_id(&map_type.value_field, field_ids);
494            }
495        }
496    }
497
498    fn get_arrow_projection_mask(
499        field_ids: &[i32],
500        iceberg_schema_of_task: &Schema,
501        parquet_schema: &SchemaDescriptor,
502        arrow_schema: &ArrowSchemaRef,
503    ) -> Result<ProjectionMask> {
504        fn type_promotion_is_valid(
505            file_type: Option<&PrimitiveType>,
506            projected_type: Option<&PrimitiveType>,
507        ) -> bool {
508            match (file_type, projected_type) {
509                (Some(lhs), Some(rhs)) if lhs == rhs => true,
510                (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
511                (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
512                (
513                    Some(PrimitiveType::Decimal {
514                        precision: file_precision,
515                        scale: file_scale,
516                    }),
517                    Some(PrimitiveType::Decimal {
518                        precision: requested_precision,
519                        scale: requested_scale,
520                    }),
521                ) if requested_precision >= file_precision && file_scale == requested_scale => true,
522                // Uuid will be store as Fixed(16) in parquet file, so the read back type will be Fixed(16).
523                (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
524                _ => false,
525            }
526        }
527
528        let mut leaf_field_ids = vec![];
529        for field_id in field_ids {
530            let field = iceberg_schema_of_task.field_by_id(*field_id);
531            if let Some(field) = field {
532                Self::include_leaf_field_id(field, &mut leaf_field_ids);
533            }
534        }
535
536        if leaf_field_ids.is_empty() {
537            Ok(ProjectionMask::all())
538        } else {
539            // Build the map between field id and column index in Parquet schema.
540            let mut column_map = HashMap::new();
541
542            let fields = arrow_schema.fields();
543            // Pre-project only the fields that have been selected, possibly avoiding converting
544            // some Arrow types that are not yet supported.
545            let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
546            let projected_arrow_schema = ArrowSchema::new_with_metadata(
547                fields.filter_leaves(|_, f| {
548                    f.metadata()
549                        .get(PARQUET_FIELD_ID_META_KEY)
550                        .and_then(|field_id| i32::from_str(field_id).ok())
551                        .is_some_and(|field_id| {
552                            projected_fields.insert((*f).clone(), field_id);
553                            leaf_field_ids.contains(&field_id)
554                        })
555                }),
556                arrow_schema.metadata().clone(),
557            );
558            let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
559
560            fields.filter_leaves(|idx, field| {
561                let Some(field_id) = projected_fields.get(field).cloned() else {
562                    return false;
563                };
564
565                let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
566                let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
567
568                if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
569                    return false;
570                }
571
572                if !type_promotion_is_valid(
573                    parquet_iceberg_field
574                        .unwrap()
575                        .field_type
576                        .as_primitive_type(),
577                    iceberg_field.unwrap().field_type.as_primitive_type(),
578                ) {
579                    return false;
580                }
581
582                column_map.insert(field_id, idx);
583                true
584            });
585
586            if column_map.len() != leaf_field_ids.len() {
587                let missing_fields = leaf_field_ids
588                    .iter()
589                    .filter(|field_id| !column_map.contains_key(field_id))
590                    .collect::<Vec<_>>();
591                return Err(Error::new(
592                    ErrorKind::DataInvalid,
593                    format!(
594                        "Parquet schema {} and Iceberg schema {} do not match.",
595                        iceberg_schema, iceberg_schema_of_task
596                    ),
597                )
598                .with_context("column_map", format! {"{:?}", column_map})
599                .with_context("field_ids", format! {"{:?}", leaf_field_ids})
600                .with_context("missing_fields", format! {"{:?}", missing_fields}));
601            }
602
603            let mut indices = vec![];
604            for field_id in leaf_field_ids {
605                if let Some(col_idx) = column_map.get(&field_id) {
606                    indices.push(*col_idx);
607                } else {
608                    return Err(Error::new(
609                        ErrorKind::DataInvalid,
610                        format!("Field {} is not found in Parquet schema.", field_id),
611                    ));
612                }
613            }
614            Ok(ProjectionMask::leaves(parquet_schema, indices))
615        }
616    }
617
618    fn get_row_filter(
619        predicates: &BoundPredicate,
620        parquet_schema: &SchemaDescriptor,
621        iceberg_field_ids: &HashSet<i32>,
622        field_id_map: &HashMap<i32, usize>,
623    ) -> Result<RowFilter> {
624        // Collect Parquet column indices from field ids.
625        // If the field id is not found in Parquet schema, it will be ignored due to schema evolution.
626        let mut column_indices = iceberg_field_ids
627            .iter()
628            .filter_map(|field_id| field_id_map.get(field_id).cloned())
629            .collect::<Vec<_>>();
630        column_indices.sort();
631
632        // The converter that converts `BoundPredicates` to `ArrowPredicates`
633        let mut converter = PredicateConverter {
634            parquet_schema,
635            column_map: field_id_map,
636            column_indices: &column_indices,
637        };
638
639        // After collecting required leaf column indices used in the predicate,
640        // creates the projection mask for the Arrow predicates.
641        let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
642        let predicate_func = visit(&mut converter, predicates)?;
643        let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
644        Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
645    }
646
647    fn get_selected_row_group_indices(
648        predicate: &BoundPredicate,
649        parquet_metadata: &Arc<ParquetMetaData>,
650        field_id_map: &HashMap<i32, usize>,
651        snapshot_schema: &Schema,
652    ) -> Result<Vec<usize>> {
653        let row_groups_metadata = parquet_metadata.row_groups();
654        let mut results = Vec::with_capacity(row_groups_metadata.len());
655
656        for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
657            if RowGroupMetricsEvaluator::eval(
658                predicate,
659                row_group_metadata,
660                field_id_map,
661                snapshot_schema,
662            )? {
663                results.push(idx);
664            }
665        }
666
667        Ok(results)
668    }
669
670    fn get_row_selection_for_filter_predicate(
671        predicate: &BoundPredicate,
672        parquet_metadata: &Arc<ParquetMetaData>,
673        selected_row_groups: &Option<Vec<usize>>,
674        field_id_map: &HashMap<i32, usize>,
675        snapshot_schema: &Schema,
676    ) -> Result<RowSelection> {
677        let Some(column_index) = parquet_metadata.column_index() else {
678            return Err(Error::new(
679                ErrorKind::Unexpected,
680                "Parquet file metadata does not contain a column index",
681            ));
682        };
683
684        let Some(offset_index) = parquet_metadata.offset_index() else {
685            return Err(Error::new(
686                ErrorKind::Unexpected,
687                "Parquet file metadata does not contain an offset index",
688            ));
689        };
690
691        let mut selected_row_groups_idx = 0;
692
693        let page_index = column_index
694            .iter()
695            .enumerate()
696            .zip(offset_index)
697            .zip(parquet_metadata.row_groups());
698
699        let mut results = Vec::new();
700        for (((idx, column_index), offset_index), row_group_metadata) in page_index {
701            if let Some(selected_row_groups) = selected_row_groups {
702                // skip row groups that aren't present in selected_row_groups
703                if idx == selected_row_groups[selected_row_groups_idx] {
704                    selected_row_groups_idx += 1;
705                } else {
706                    continue;
707                }
708            }
709
710            let selections_for_page = PageIndexEvaluator::eval(
711                predicate,
712                column_index,
713                offset_index,
714                row_group_metadata,
715                field_id_map,
716                snapshot_schema,
717            )?;
718
719            results.push(selections_for_page);
720
721            if let Some(selected_row_groups) = selected_row_groups {
722                if selected_row_groups_idx == selected_row_groups.len() {
723                    break;
724                }
725            }
726        }
727
728        Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
729    }
730}
731
732/// Build the map of parquet field id to Parquet column index in the schema.
733fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> {
734    let mut column_map = HashMap::new();
735    for (idx, field) in parquet_schema.columns().iter().enumerate() {
736        let field_type = field.self_type();
737        match field_type {
738            ParquetType::PrimitiveType { basic_info, .. } => {
739                if !basic_info.has_id() {
740                    return Err(Error::new(
741                        ErrorKind::DataInvalid,
742                        format!(
743                            "Leave column idx: {}, name: {}, type {:?} in schema doesn't have field id",
744                            idx,
745                            basic_info.name(),
746                            field_type
747                        ),
748                    ));
749                }
750                column_map.insert(basic_info.id(), idx);
751            }
752            ParquetType::GroupType { .. } => {
753                return Err(Error::new(
754                    ErrorKind::DataInvalid,
755                    format!(
756                        "Leave column in schema should be primitive type but got {:?}",
757                        field_type
758                    ),
759                ));
760            }
761        };
762    }
763
764    Ok(column_map)
765}
766
767/// A visitor to collect field ids from bound predicates.
768struct CollectFieldIdVisitor {
769    field_ids: HashSet<i32>,
770}
771
772impl CollectFieldIdVisitor {
773    fn field_ids(self) -> HashSet<i32> {
774        self.field_ids
775    }
776}
777
778impl BoundPredicateVisitor for CollectFieldIdVisitor {
779    type T = ();
780
781    fn always_true(&mut self) -> Result<()> {
782        Ok(())
783    }
784
785    fn always_false(&mut self) -> Result<()> {
786        Ok(())
787    }
788
789    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
790        Ok(())
791    }
792
793    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
794        Ok(())
795    }
796
797    fn not(&mut self, _inner: ()) -> Result<()> {
798        Ok(())
799    }
800
801    fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
802        self.field_ids.insert(reference.field().id);
803        Ok(())
804    }
805
806    fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
807        self.field_ids.insert(reference.field().id);
808        Ok(())
809    }
810
811    fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
812        self.field_ids.insert(reference.field().id);
813        Ok(())
814    }
815
816    fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
817        self.field_ids.insert(reference.field().id);
818        Ok(())
819    }
820
821    fn less_than(
822        &mut self,
823        reference: &BoundReference,
824        _literal: &Datum,
825        _predicate: &BoundPredicate,
826    ) -> Result<()> {
827        self.field_ids.insert(reference.field().id);
828        Ok(())
829    }
830
831    fn less_than_or_eq(
832        &mut self,
833        reference: &BoundReference,
834        _literal: &Datum,
835        _predicate: &BoundPredicate,
836    ) -> Result<()> {
837        self.field_ids.insert(reference.field().id);
838        Ok(())
839    }
840
841    fn greater_than(
842        &mut self,
843        reference: &BoundReference,
844        _literal: &Datum,
845        _predicate: &BoundPredicate,
846    ) -> Result<()> {
847        self.field_ids.insert(reference.field().id);
848        Ok(())
849    }
850
851    fn greater_than_or_eq(
852        &mut self,
853        reference: &BoundReference,
854        _literal: &Datum,
855        _predicate: &BoundPredicate,
856    ) -> Result<()> {
857        self.field_ids.insert(reference.field().id);
858        Ok(())
859    }
860
861    fn eq(
862        &mut self,
863        reference: &BoundReference,
864        _literal: &Datum,
865        _predicate: &BoundPredicate,
866    ) -> Result<()> {
867        self.field_ids.insert(reference.field().id);
868        Ok(())
869    }
870
871    fn not_eq(
872        &mut self,
873        reference: &BoundReference,
874        _literal: &Datum,
875        _predicate: &BoundPredicate,
876    ) -> Result<()> {
877        self.field_ids.insert(reference.field().id);
878        Ok(())
879    }
880
881    fn starts_with(
882        &mut self,
883        reference: &BoundReference,
884        _literal: &Datum,
885        _predicate: &BoundPredicate,
886    ) -> Result<()> {
887        self.field_ids.insert(reference.field().id);
888        Ok(())
889    }
890
891    fn not_starts_with(
892        &mut self,
893        reference: &BoundReference,
894        _literal: &Datum,
895        _predicate: &BoundPredicate,
896    ) -> Result<()> {
897        self.field_ids.insert(reference.field().id);
898        Ok(())
899    }
900
901    fn r#in(
902        &mut self,
903        reference: &BoundReference,
904        _literals: &FnvHashSet<Datum>,
905        _predicate: &BoundPredicate,
906    ) -> Result<()> {
907        self.field_ids.insert(reference.field().id);
908        Ok(())
909    }
910
911    fn not_in(
912        &mut self,
913        reference: &BoundReference,
914        _literals: &FnvHashSet<Datum>,
915        _predicate: &BoundPredicate,
916    ) -> Result<()> {
917        self.field_ids.insert(reference.field().id);
918        Ok(())
919    }
920}
921
922/// A visitor to convert Iceberg bound predicates to Arrow predicates.
923struct PredicateConverter<'a> {
924    /// The Parquet schema descriptor.
925    pub parquet_schema: &'a SchemaDescriptor,
926    /// The map between field id and leaf column index in Parquet schema.
927    pub column_map: &'a HashMap<i32, usize>,
928    /// The required column indices in Parquet schema for the predicates.
929    pub column_indices: &'a Vec<usize>,
930}
931
932impl PredicateConverter<'_> {
933    /// When visiting a bound reference, we return index of the leaf column in the
934    /// required column indices which is used to project the column in the record batch.
935    /// Return None if the field id is not found in the column map, which is possible
936    /// due to schema evolution.
937    fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
938        // The leaf column's index in Parquet schema.
939        if let Some(column_idx) = self.column_map.get(&reference.field().id) {
940            if self.parquet_schema.get_column_root(*column_idx).is_group() {
941                return Err(Error::new(
942                    ErrorKind::DataInvalid,
943                    format!(
944                        "Leave column `{}` in predicates isn't a root column in Parquet schema.",
945                        reference.field().name
946                    ),
947                ));
948            }
949
950            // The leaf column's index in the required column indices.
951            let index = self
952                .column_indices
953                .iter()
954                .position(|&idx| idx == *column_idx)
955                .ok_or(Error::new(
956                    ErrorKind::DataInvalid,
957                    format!(
958                "Leave column `{}` in predicates cannot be found in the required column indices.",
959                reference.field().name
960            ),
961                ))?;
962
963            Ok(Some(index))
964        } else {
965            Ok(None)
966        }
967    }
968
969    /// Build an Arrow predicate that always returns true.
970    fn build_always_true(&self) -> Result<Box<PredicateResult>> {
971        Ok(Box::new(|batch| {
972            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
973        }))
974    }
975
976    /// Build an Arrow predicate that always returns false.
977    fn build_always_false(&self) -> Result<Box<PredicateResult>> {
978        Ok(Box::new(|batch| {
979            Ok(BooleanArray::from(vec![false; batch.num_rows()]))
980        }))
981    }
982}
983
984/// Gets the leaf column from the record batch for the required column index. Only
985/// supports top-level columns for now.
986fn project_column(
987    batch: &RecordBatch,
988    column_idx: usize,
989) -> std::result::Result<ArrayRef, ArrowError> {
990    let column = batch.column(column_idx);
991
992    match column.data_type() {
993        DataType::Struct(_) => Err(ArrowError::SchemaError(
994            "Does not support struct column yet.".to_string(),
995        )),
996        _ => Ok(column.clone()),
997    }
998}
999
1000type PredicateResult =
1001    dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1002
1003impl BoundPredicateVisitor for PredicateConverter<'_> {
1004    type T = Box<PredicateResult>;
1005
1006    fn always_true(&mut self) -> Result<Box<PredicateResult>> {
1007        self.build_always_true()
1008    }
1009
1010    fn always_false(&mut self) -> Result<Box<PredicateResult>> {
1011        self.build_always_false()
1012    }
1013
1014    fn and(
1015        &mut self,
1016        mut lhs: Box<PredicateResult>,
1017        mut rhs: Box<PredicateResult>,
1018    ) -> Result<Box<PredicateResult>> {
1019        Ok(Box::new(move |batch| {
1020            let left = lhs(batch.clone())?;
1021            let right = rhs(batch)?;
1022            and_kleene(&left, &right)
1023        }))
1024    }
1025
1026    fn or(
1027        &mut self,
1028        mut lhs: Box<PredicateResult>,
1029        mut rhs: Box<PredicateResult>,
1030    ) -> Result<Box<PredicateResult>> {
1031        Ok(Box::new(move |batch| {
1032            let left = lhs(batch.clone())?;
1033            let right = rhs(batch)?;
1034            or_kleene(&left, &right)
1035        }))
1036    }
1037
1038    fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1039        Ok(Box::new(move |batch| {
1040            let pred_ret = inner(batch)?;
1041            not(&pred_ret)
1042        }))
1043    }
1044
1045    fn is_null(
1046        &mut self,
1047        reference: &BoundReference,
1048        _predicate: &BoundPredicate,
1049    ) -> Result<Box<PredicateResult>> {
1050        if let Some(idx) = self.bound_reference(reference)? {
1051            Ok(Box::new(move |batch| {
1052                let column = project_column(&batch, idx)?;
1053                is_null(&column)
1054            }))
1055        } else {
1056            // A missing column, treating it as null.
1057            self.build_always_true()
1058        }
1059    }
1060
1061    fn not_null(
1062        &mut self,
1063        reference: &BoundReference,
1064        _predicate: &BoundPredicate,
1065    ) -> Result<Box<PredicateResult>> {
1066        if let Some(idx) = self.bound_reference(reference)? {
1067            Ok(Box::new(move |batch| {
1068                let column = project_column(&batch, idx)?;
1069                is_not_null(&column)
1070            }))
1071        } else {
1072            // A missing column, treating it as null.
1073            self.build_always_false()
1074        }
1075    }
1076
1077    fn is_nan(
1078        &mut self,
1079        reference: &BoundReference,
1080        _predicate: &BoundPredicate,
1081    ) -> Result<Box<PredicateResult>> {
1082        if self.bound_reference(reference)?.is_some() {
1083            self.build_always_true()
1084        } else {
1085            // A missing column, treating it as null.
1086            self.build_always_false()
1087        }
1088    }
1089
1090    fn not_nan(
1091        &mut self,
1092        reference: &BoundReference,
1093        _predicate: &BoundPredicate,
1094    ) -> Result<Box<PredicateResult>> {
1095        if self.bound_reference(reference)?.is_some() {
1096            self.build_always_false()
1097        } else {
1098            // A missing column, treating it as null.
1099            self.build_always_true()
1100        }
1101    }
1102
1103    fn less_than(
1104        &mut self,
1105        reference: &BoundReference,
1106        literal: &Datum,
1107        _predicate: &BoundPredicate,
1108    ) -> Result<Box<PredicateResult>> {
1109        if let Some(idx) = self.bound_reference(reference)? {
1110            let literal = get_arrow_datum(literal)?;
1111
1112            Ok(Box::new(move |batch| {
1113                let left = project_column(&batch, idx)?;
1114                let literal = try_cast_literal(&literal, left.data_type())?;
1115                lt(&left, literal.as_ref())
1116            }))
1117        } else {
1118            // A missing column, treating it as null.
1119            self.build_always_true()
1120        }
1121    }
1122
1123    fn less_than_or_eq(
1124        &mut self,
1125        reference: &BoundReference,
1126        literal: &Datum,
1127        _predicate: &BoundPredicate,
1128    ) -> Result<Box<PredicateResult>> {
1129        if let Some(idx) = self.bound_reference(reference)? {
1130            let literal = get_arrow_datum(literal)?;
1131
1132            Ok(Box::new(move |batch| {
1133                let left = project_column(&batch, idx)?;
1134                let literal = try_cast_literal(&literal, left.data_type())?;
1135                lt_eq(&left, literal.as_ref())
1136            }))
1137        } else {
1138            // A missing column, treating it as null.
1139            self.build_always_true()
1140        }
1141    }
1142
1143    fn greater_than(
1144        &mut self,
1145        reference: &BoundReference,
1146        literal: &Datum,
1147        _predicate: &BoundPredicate,
1148    ) -> Result<Box<PredicateResult>> {
1149        if let Some(idx) = self.bound_reference(reference)? {
1150            let literal = get_arrow_datum(literal)?;
1151
1152            Ok(Box::new(move |batch| {
1153                let left = project_column(&batch, idx)?;
1154                let literal = try_cast_literal(&literal, left.data_type())?;
1155                gt(&left, literal.as_ref())
1156            }))
1157        } else {
1158            // A missing column, treating it as null.
1159            self.build_always_false()
1160        }
1161    }
1162
1163    fn greater_than_or_eq(
1164        &mut self,
1165        reference: &BoundReference,
1166        literal: &Datum,
1167        _predicate: &BoundPredicate,
1168    ) -> Result<Box<PredicateResult>> {
1169        if let Some(idx) = self.bound_reference(reference)? {
1170            let literal = get_arrow_datum(literal)?;
1171
1172            Ok(Box::new(move |batch| {
1173                let left = project_column(&batch, idx)?;
1174                let literal = try_cast_literal(&literal, left.data_type())?;
1175                gt_eq(&left, literal.as_ref())
1176            }))
1177        } else {
1178            // A missing column, treating it as null.
1179            self.build_always_false()
1180        }
1181    }
1182
1183    fn eq(
1184        &mut self,
1185        reference: &BoundReference,
1186        literal: &Datum,
1187        _predicate: &BoundPredicate,
1188    ) -> Result<Box<PredicateResult>> {
1189        if let Some(idx) = self.bound_reference(reference)? {
1190            let literal = get_arrow_datum(literal)?;
1191
1192            Ok(Box::new(move |batch| {
1193                let left = project_column(&batch, idx)?;
1194                let literal = try_cast_literal(&literal, left.data_type())?;
1195                eq(&left, literal.as_ref())
1196            }))
1197        } else {
1198            // A missing column, treating it as null.
1199            self.build_always_false()
1200        }
1201    }
1202
1203    fn not_eq(
1204        &mut self,
1205        reference: &BoundReference,
1206        literal: &Datum,
1207        _predicate: &BoundPredicate,
1208    ) -> Result<Box<PredicateResult>> {
1209        if let Some(idx) = self.bound_reference(reference)? {
1210            let literal = get_arrow_datum(literal)?;
1211
1212            Ok(Box::new(move |batch| {
1213                let left = project_column(&batch, idx)?;
1214                let literal = try_cast_literal(&literal, left.data_type())?;
1215                neq(&left, literal.as_ref())
1216            }))
1217        } else {
1218            // A missing column, treating it as null.
1219            self.build_always_false()
1220        }
1221    }
1222
1223    fn starts_with(
1224        &mut self,
1225        reference: &BoundReference,
1226        literal: &Datum,
1227        _predicate: &BoundPredicate,
1228    ) -> Result<Box<PredicateResult>> {
1229        if let Some(idx) = self.bound_reference(reference)? {
1230            let literal = get_arrow_datum(literal)?;
1231
1232            Ok(Box::new(move |batch| {
1233                let left = project_column(&batch, idx)?;
1234                let literal = try_cast_literal(&literal, left.data_type())?;
1235                starts_with(&left, literal.as_ref())
1236            }))
1237        } else {
1238            // A missing column, treating it as null.
1239            self.build_always_false()
1240        }
1241    }
1242
1243    fn not_starts_with(
1244        &mut self,
1245        reference: &BoundReference,
1246        literal: &Datum,
1247        _predicate: &BoundPredicate,
1248    ) -> Result<Box<PredicateResult>> {
1249        if let Some(idx) = self.bound_reference(reference)? {
1250            let literal = get_arrow_datum(literal)?;
1251
1252            Ok(Box::new(move |batch| {
1253                let left = project_column(&batch, idx)?;
1254                let literal = try_cast_literal(&literal, left.data_type())?;
1255                // update here if arrow ever adds a native not_starts_with
1256                not(&starts_with(&left, literal.as_ref())?)
1257            }))
1258        } else {
1259            // A missing column, treating it as null.
1260            self.build_always_true()
1261        }
1262    }
1263
1264    fn r#in(
1265        &mut self,
1266        reference: &BoundReference,
1267        literals: &FnvHashSet<Datum>,
1268        _predicate: &BoundPredicate,
1269    ) -> Result<Box<PredicateResult>> {
1270        if let Some(idx) = self.bound_reference(reference)? {
1271            let literals: Vec<_> = literals
1272                .iter()
1273                .map(|lit| get_arrow_datum(lit).unwrap())
1274                .collect();
1275
1276            Ok(Box::new(move |batch| {
1277                // update this if arrow ever adds a native is_in kernel
1278                let left = project_column(&batch, idx)?;
1279
1280                let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1281                for literal in &literals {
1282                    let literal = try_cast_literal(literal, left.data_type())?;
1283                    acc = or(&acc, &eq(&left, literal.as_ref())?)?
1284                }
1285
1286                Ok(acc)
1287            }))
1288        } else {
1289            // A missing column, treating it as null.
1290            self.build_always_false()
1291        }
1292    }
1293
1294    fn not_in(
1295        &mut self,
1296        reference: &BoundReference,
1297        literals: &FnvHashSet<Datum>,
1298        _predicate: &BoundPredicate,
1299    ) -> Result<Box<PredicateResult>> {
1300        if let Some(idx) = self.bound_reference(reference)? {
1301            let literals: Vec<_> = literals
1302                .iter()
1303                .map(|lit| get_arrow_datum(lit).unwrap())
1304                .collect();
1305
1306            Ok(Box::new(move |batch| {
1307                // update this if arrow ever adds a native not_in kernel
1308                let left = project_column(&batch, idx)?;
1309                let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1310                for literal in &literals {
1311                    let literal = try_cast_literal(literal, left.data_type())?;
1312                    acc = and(&acc, &neq(&left, literal.as_ref())?)?
1313                }
1314
1315                Ok(acc)
1316            }))
1317        } else {
1318            // A missing column, treating it as null.
1319            self.build_always_true()
1320        }
1321    }
1322}
1323
1324/// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
1325pub struct ArrowFileReader<R: FileRead> {
1326    meta: FileMetadata,
1327    preload_column_index: bool,
1328    preload_offset_index: bool,
1329    preload_page_index: bool,
1330    metadata_size_hint: Option<usize>,
1331    r: R,
1332}
1333
1334impl<R: FileRead> ArrowFileReader<R> {
1335    /// Create a new ArrowFileReader
1336    pub fn new(meta: FileMetadata, r: R) -> Self {
1337        Self {
1338            meta,
1339            preload_column_index: false,
1340            preload_offset_index: false,
1341            preload_page_index: false,
1342            metadata_size_hint: None,
1343            r,
1344        }
1345    }
1346
1347    /// Enable or disable preloading of the column index
1348    pub fn with_preload_column_index(mut self, preload: bool) -> Self {
1349        self.preload_column_index = preload;
1350        self
1351    }
1352
1353    /// Enable or disable preloading of the offset index
1354    pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
1355        self.preload_offset_index = preload;
1356        self
1357    }
1358
1359    /// Enable or disable preloading of the page index
1360    pub fn with_preload_page_index(mut self, preload: bool) -> Self {
1361        self.preload_page_index = preload;
1362        self
1363    }
1364
1365    /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
1366    ///
1367    /// This hint can help reduce the number of fetch requests. For more details see the
1368    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
1369    pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
1370        self.metadata_size_hint = Some(hint);
1371        self
1372    }
1373}
1374
1375impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
1376    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
1377        Box::pin(
1378            self.r
1379                .read(range.start..range.end)
1380                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
1381        )
1382    }
1383
1384    // TODO: currently we don't respect `ArrowReaderOptions` cause it don't expose any method to access the option field
1385    // we will fix it after `v55.1.0` is released in https://github.com/apache/arrow-rs/issues/7393
1386    fn get_metadata(
1387        &mut self,
1388        _options: Option<&'_ ArrowReaderOptions>,
1389    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
1390        async move {
1391            let reader = ParquetMetaDataReader::new()
1392                .with_prefetch_hint(self.metadata_size_hint)
1393                .with_column_indexes(self.preload_column_index)
1394                .with_page_indexes(self.preload_page_index)
1395                .with_offset_indexes(self.preload_offset_index);
1396            let size = self.meta.size;
1397            let meta = reader.load_and_finish(self, size).await?;
1398
1399            Ok(Arc::new(meta))
1400        }
1401        .boxed()
1402    }
1403}
1404
1405/// The Arrow type of an array that the Parquet reader reads may not match the exact Arrow type
1406/// that Iceberg uses for literals - but they are effectively the same logical type,
1407/// i.e. LargeUtf8 and Utf8 or Utf8View and Utf8 or Utf8View and LargeUtf8.
1408///
1409/// The Arrow compute kernels that we use must match the type exactly, so first cast the literal
1410/// into the type of the batch we read from Parquet before sending it to the compute kernel.
1411fn try_cast_literal(
1412    literal: &Arc<dyn ArrowDatum + Send + Sync>,
1413    column_type: &DataType,
1414) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1415    let literal_array = literal.get().0;
1416
1417    // No cast required
1418    if literal_array.data_type() == column_type {
1419        return Ok(Arc::clone(literal));
1420    }
1421
1422    let literal_array = cast(literal_array, column_type)?;
1423    Ok(Arc::new(Scalar::new(literal_array)))
1424}
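
// A minimal sketch of the mismatch this guards against (illustrative only, not called by the
// reader): arrow's comparison kernels reject operands of differing types, so a Utf8 literal has
// to be cast before it can be compared with a LargeUtf8 column.
//
//     use arrow_array::{LargeStringArray, Scalar, StringArray};
//     use arrow_ord::cmp::eq;
//
//     let column = LargeStringArray::from(vec!["foo", "bar"]); // LargeUtf8 batch column
//     let literal: Arc<dyn ArrowDatum + Send + Sync> =
//         Arc::new(Scalar::new(StringArray::from(vec!["foo"]))); // Utf8 literal
//
//     assert!(eq(&column, literal.as_ref()).is_err()); // type mismatch: LargeUtf8 vs Utf8
//
//     let literal = try_cast_literal(&literal, column.data_type())?;
//     let mask = eq(&column, literal.as_ref())?; // succeeds after the cast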
1425
1426#[cfg(test)]
1427mod tests {
1428    use std::collections::{HashMap, HashSet};
1429    use std::fs::File;
1430    use std::sync::Arc;
1431
1432    use arrow_array::cast::AsArray;
1433    use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1434    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1435    use futures::TryStreamExt;
1436    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1437    use parquet::arrow::{ArrowWriter, ProjectionMask};
1438    use parquet::basic::Compression;
1439    use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1440    use parquet::file::properties::WriterProperties;
1441    use parquet::schema::parser::parse_message_type;
1442    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1443    use roaring::RoaringTreemap;
1444    use tempfile::TempDir;
1445
1446    use crate::ErrorKind;
1447    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1448    use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1449    use crate::delete_vector::DeleteVector;
1450    use crate::expr::visitors::bound_predicate_visitor::visit;
1451    use crate::expr::{Bind, Predicate, Reference};
1452    use crate::io::FileIO;
1453    use crate::scan::{FileScanTask, FileScanTaskStream};
1454    use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type};
1455
1456    fn table_schema_simple() -> SchemaRef {
1457        Arc::new(
1458            Schema::builder()
1459                .with_schema_id(1)
1460                .with_identifier_field_ids(vec![2])
1461                .with_fields(vec![
1462                    NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1463                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1464                    NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1465                    NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1466                ])
1467                .build()
1468                .unwrap(),
1469        )
1470    }
1471
1472    #[test]
1473    fn test_collect_field_id() {
1474        let schema = table_schema_simple();
1475        let expr = Reference::new("qux").is_null();
1476        let bound_expr = expr.bind(schema, true).unwrap();
1477
1478        let mut visitor = CollectFieldIdVisitor {
1479            field_ids: HashSet::default(),
1480        };
1481        visit(&mut visitor, &bound_expr).unwrap();
1482
1483        let mut expected = HashSet::default();
1484        expected.insert(4_i32);
1485
1486        assert_eq!(visitor.field_ids, expected);
1487    }
1488
1489    #[test]
1490    fn test_collect_field_id_with_and() {
1491        let schema = table_schema_simple();
1492        let expr = Reference::new("qux")
1493            .is_null()
1494            .and(Reference::new("baz").is_null());
1495        let bound_expr = expr.bind(schema, true).unwrap();
1496
1497        let mut visitor = CollectFieldIdVisitor {
1498            field_ids: HashSet::default(),
1499        };
1500        visit(&mut visitor, &bound_expr).unwrap();
1501
1502        let mut expected = HashSet::default();
1503        expected.insert(4_i32);
1504        expected.insert(3);
1505
1506        assert_eq!(visitor.field_ids, expected);
1507    }
1508
1509    #[test]
1510    fn test_collect_field_id_with_or() {
1511        let schema = table_schema_simple();
1512        let expr = Reference::new("qux")
1513            .is_null()
1514            .or(Reference::new("baz").is_null());
1515        let bound_expr = expr.bind(schema, true).unwrap();
1516
1517        let mut visitor = CollectFieldIdVisitor {
1518            field_ids: HashSet::default(),
1519        };
1520        visit(&mut visitor, &bound_expr).unwrap();
1521
1522        let mut expected = HashSet::default();
1523        expected.insert(4_i32);
1524        expected.insert(3);
1525
1526        assert_eq!(visitor.field_ids, expected);
1527    }
1528
1529    #[test]
1530    fn test_arrow_projection_mask() {
1531        let schema = Arc::new(
1532            Schema::builder()
1533                .with_schema_id(1)
1534                .with_identifier_field_ids(vec![1])
1535                .with_fields(vec![
1536                    NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
1537                    NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
1538                    NestedField::optional(
1539                        3,
1540                        "c3",
1541                        Type::Primitive(PrimitiveType::Decimal {
1542                            precision: 38,
1543                            scale: 3,
1544                        }),
1545                    )
1546                    .into(),
1547                ])
1548                .build()
1549                .unwrap(),
1550        );
1551        let arrow_schema = Arc::new(ArrowSchema::new(vec![
1552            Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
1553                PARQUET_FIELD_ID_META_KEY.to_string(),
1554                "1".to_string(),
1555            )])),
1556            // Type not supported
1557            Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
1558                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1559            ),
1560            // Precision is beyond the supported range
1561            Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
1562                PARQUET_FIELD_ID_META_KEY.to_string(),
1563                "3".to_string(),
1564            )])),
1565        ]));
1566
1567        let message_type = "
1568message schema {
1569  required binary c1 (STRING) = 1;
1570  optional int32 c2 (INTEGER(8,true)) = 2;
1571  optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
1572}
1573    ";
1574        let parquet_type = parse_message_type(message_type).expect("should parse schema");
1575        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
1576
1577        // Try projecting the fields c2 and c3 with the unsupported data types
1578        let err = ArrowReader::get_arrow_projection_mask(
1579            &[1, 2, 3],
1580            &schema,
1581            &parquet_schema,
1582            &arrow_schema,
1583        )
1584        .unwrap_err();
1585
1586        assert_eq!(err.kind(), ErrorKind::DataInvalid);
1587        assert_eq!(
1588            err.to_string(),
1589            "DataInvalid => Unsupported Arrow data type: Duration(Microsecond)".to_string()
1590        );
1591
1592        // Omitting field c2, we still get an error due to c3 being selected
1593        let err = ArrowReader::get_arrow_projection_mask(
1594            &[1, 3],
1595            &schema,
1596            &parquet_schema,
1597            &arrow_schema,
1598        )
1599        .unwrap_err();
1600
1601        assert_eq!(err.kind(), ErrorKind::DataInvalid);
1602        assert_eq!(
1603            err.to_string(),
1604            "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
1605        );
1606
1607        // Finally, avoid selecting the fields with unsupported data types so the projection succeeds
1608        let mask =
1609            ArrowReader::get_arrow_projection_mask(&[1], &schema, &parquet_schema, &arrow_schema)
1610                .expect("Some ProjectionMask");
1611        assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
1612    }
1613
1614    #[tokio::test]
1615    async fn test_kleene_logic_or_behaviour() {
1616        // a IS NULL OR a = 'foo'
1617        let predicate = Reference::new("a")
1618            .is_null()
1619            .or(Reference::new("a").equal_to(Datum::string("foo")));
1620
1621        // Table data: [NULL, "foo", "bar"]
1622        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1623
1624        // Expected: [NULL, "foo"].
1625        let expected = vec![None, Some("foo".to_string())];
1626
1627        let (file_io, schema, table_location, _temp_dir) =
1628            setup_kleene_logic(data_for_col_a, DataType::Utf8);
1629        let reader = ArrowReaderBuilder::new(file_io).build();
1630
1631        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1632
1633        assert_eq!(result_data, expected);
1634    }
1635
1636    #[tokio::test]
1637    async fn test_kleene_logic_and_behaviour() {
1638        // a IS NOT NULL AND a != 'foo'
1639        let predicate = Reference::new("a")
1640            .is_not_null()
1641            .and(Reference::new("a").not_equal_to(Datum::string("foo")));
1642
1643        // Table data: [NULL, "foo", "bar"]
1644        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1645
1646        // Expected: ["bar"].
1647        let expected = vec![Some("bar".to_string())];
1648
1649        let (file_io, schema, table_location, _temp_dir) =
1650            setup_kleene_logic(data_for_col_a, DataType::Utf8);
1651        let reader = ArrowReaderBuilder::new(file_io).build();
1652
1653        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1654
1655        assert_eq!(result_data, expected);
1656    }
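
    // A minimal sketch of the three-valued (Kleene) logic the two tests above exercise, assuming
    // the per-row predicate masks are combined with arrow's `or_kleene`/`and_kleene` kernels
    // rather than the null-propagating `or`/`and`:
    //
    //     use arrow_arith::boolean::{or, or_kleene};
    //     use arrow_array::BooleanArray;
    //
    //     let is_null = BooleanArray::from(vec![Some(true), Some(false), Some(false)]);
    //     let eq_foo = BooleanArray::from(vec![None, Some(true), Some(false)]); // NULL = 'foo' is NULL
    //
    //     // Plain OR propagates the null, which would drop the NULL row from the filter...
    //     assert_eq!(or(&is_null, &eq_foo).unwrap(), BooleanArray::from(vec![None, Some(true), Some(false)]));
    //     // ...whereas Kleene OR yields TRUE OR NULL = TRUE, keeping the NULL row as asserted above.
    //     assert_eq!(or_kleene(&is_null, &eq_foo).unwrap(), BooleanArray::from(vec![Some(true), Some(true), Some(false)]));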
1657
1658    #[tokio::test]
1659    async fn test_predicate_cast_literal() {
1660        let predicates = vec![
1661            // a == 'foo'
1662            (Reference::new("a").equal_to(Datum::string("foo")), vec![
1663                Some("foo".to_string()),
1664            ]),
1665            // a != 'foo'
1666            (
1667                Reference::new("a").not_equal_to(Datum::string("foo")),
1668                vec![Some("bar".to_string())],
1669            ),
1670            // STARTS_WITH(a, 'f')
1671            (Reference::new("a").starts_with(Datum::string("f")), vec![
1672                Some("foo".to_string()),
1673            ]),
1674            // NOT STARTS_WITH(a, 'f')
1675            (
1676                Reference::new("a").not_starts_with(Datum::string("f")),
1677                vec![Some("bar".to_string())],
1678            ),
1679            // a < 'foo'
1680            (Reference::new("a").less_than(Datum::string("foo")), vec![
1681                Some("bar".to_string()),
1682            ]),
1683            // a <= 'foo'
1684            (
1685                Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
1686                vec![Some("foo".to_string()), Some("bar".to_string())],
1687            ),
1688            // a > 'bar'
1689            (
1690                Reference::new("a").greater_than(Datum::string("bar")),
1691                vec![Some("foo".to_string())],
1692            ),
1693            // a >= 'foo'
1694            (
1695                Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
1696                vec![Some("foo".to_string())],
1697            ),
1698            // a IN ('foo', 'baz')
1699            (
1700                Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
1701                vec![Some("foo".to_string())],
1702            ),
1703            // a NOT IN ('foo', 'baz')
1704            (
1705                Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
1706                vec![Some("bar".to_string())],
1707            ),
1708        ];
1709
1710        // Table data: ["foo", "bar"]
1711        let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];
1712
1713        let (file_io, schema, table_location, _temp_dir) =
1714            setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
1715        let reader = ArrowReaderBuilder::new(file_io).build();
1716
1717        for (predicate, expected) in predicates {
1718            println!("testing predicate {predicate}");
1719            let result_data = test_perform_read(
1720                predicate.clone(),
1721                schema.clone(),
1722                table_location.clone(),
1723                reader.clone(),
1724            )
1725            .await;
1726
1727            assert_eq!(result_data, expected, "predicate={predicate}");
1728        }
1729    }
1730
1731    async fn test_perform_read(
1732        predicate: Predicate,
1733        schema: SchemaRef,
1734        table_location: String,
1735        reader: ArrowReader,
1736    ) -> Vec<Option<String>> {
1737        let tasks = Box::pin(futures::stream::iter(
1738            vec![Ok(FileScanTask {
1739                start: 0,
1740                length: 0,
1741                record_count: None,
1742                data_file_path: format!("{}/1.parquet", table_location),
1743                data_file_format: DataFileFormat::Parquet,
1744                schema: schema.clone(),
1745                project_field_ids: vec![1],
1746                predicate: Some(predicate.bind(schema, true).unwrap()),
1747                deletes: vec![],
1748            })]
1749            .into_iter(),
1750        )) as FileScanTaskStream;
1751
1752        let result = reader
1753            .read(tasks)
1754            .unwrap()
1755            .try_collect::<Vec<RecordBatch>>()
1756            .await
1757            .unwrap();
1758
1759        result[0].columns()[0]
1760            .as_string_opt::<i32>()
1761            .unwrap()
1762            .iter()
1763            .map(|v| v.map(ToOwned::to_owned))
1764            .collect::<Vec<_>>()
1765    }
1766
1767    fn setup_kleene_logic(
1768        data_for_col_a: Vec<Option<String>>,
1769        col_a_type: DataType,
1770    ) -> (FileIO, SchemaRef, String, TempDir) {
1771        let schema = Arc::new(
1772            Schema::builder()
1773                .with_schema_id(1)
1774                .with_fields(vec![
1775                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
1776                ])
1777                .build()
1778                .unwrap(),
1779        );
1780
1781        let arrow_schema = Arc::new(ArrowSchema::new(vec![
1782            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
1783                PARQUET_FIELD_ID_META_KEY.to_string(),
1784                "1".to_string(),
1785            )])),
1786        ]));
1787
1788        let tmp_dir = TempDir::new().unwrap();
1789        let table_location = tmp_dir.path().to_str().unwrap().to_string();
1790
1791        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
1792
1793        let col = match col_a_type {
1794            DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
1795            DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
1796            _ => panic!("unexpected col_a_type"),
1797        };
1798
1799        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
1800
1801        // Write the Parquet file
1802        let props = WriterProperties::builder()
1803            .set_compression(Compression::SNAPPY)
1804            .build();
1805
1806        let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
1807        let mut writer =
1808            ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
1809
1810        writer.write(&to_write).expect("Writing batch");
1811
1812        // writer must be closed to write footer
1813        writer.close().unwrap();
1814
1815        (file_io, schema, table_location, tmp_dir)
1816    }
1817
1818    #[test]
1819    fn test_build_deletes_row_selection() {
1820        let schema_descr = get_test_schema_descr();
1821
1822        let mut columns = vec![];
1823        for ptr in schema_descr.columns() {
1824            let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
1825            columns.push(column);
1826        }
1827
1828        let row_groups_metadata = vec![
1829            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
1830            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
1831            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
1832            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
1833            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
1834        ];
1835
1836        let selected_row_groups = Some(vec![1, 3]);
1837
1838        /* cases to cover:
1839           * {skip|select} {first|intermediate|last} {one row|multiple rows} in
1840             {first|intermediate|last} {skipped|selected} row group
1841           * row group selection disabled
1842        */
1843
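        // Note: the resulting RowSelection addresses only the rows of the *selected* row groups,
        // concatenated in order (rg1's 500 rows followed by rg3's 1000 rows); deletes that fall
        // in skipped row groups never produce selectors. For example, the delete at global
        // position 1000 is the first row of selected rg1 and the next delete inside rg1 is at
        // 1010, which yields the leading `skip(1), select(9)` pair in the first `expected`
        // selection below.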
1844        let positional_deletes = RoaringTreemap::from_iter(&[
1845            1, // in skipped rg 0, should be ignored
1846            3, // run of three consecutive items in skipped rg0
1847            4, 5, 998, // two consecutive items at end of skipped rg0
1848            999, 1000, // solitary row at start of selected rg1 (1, 9)
1849            1010, // run of 3 rows in selected rg1
1850            1011, 1012, // (3, 485)
1851            1498, // run of two items at end of selected rg1
1852            1499, 1500, // run of two items at start of skipped rg2
1853            1501, 1600, // should ignore, in skipped rg2
1854            1999, // single row at end of skipped rg2
1855            2000, // run of two items at start of selected rg3
1856            2001, // (4, 98)
1857            2100, // single row in selected row group 3 (1, 99)
1858            2200, // run of 3 consecutive rows in selected row group 3
1859            2201, 2202, // (3, 796)
1860            2999, // single item at end of selected rg3 (1)
1861            3000, // single item at start of skipped rg4
1862        ]);
1863
1864        let positional_deletes = DeleteVector::new(positional_deletes);
1865
1866        // using selected row groups 1 and 3
1867        let result = ArrowReader::build_deletes_row_selection(
1868            &row_groups_metadata,
1869            &selected_row_groups,
1870            &positional_deletes,
1871        )
1872        .unwrap();
1873
1874        let expected = RowSelection::from(vec![
1875            RowSelector::skip(1),
1876            RowSelector::select(9),
1877            RowSelector::skip(3),
1878            RowSelector::select(485),
1879            RowSelector::skip(4),
1880            RowSelector::select(98),
1881            RowSelector::skip(1),
1882            RowSelector::select(99),
1883            RowSelector::skip(3),
1884            RowSelector::select(796),
1885            RowSelector::skip(1),
1886        ]);
1887
1888        assert_eq!(result, expected);
1889
1890        // selecting all row groups
1891        let result = ArrowReader::build_deletes_row_selection(
1892            &row_groups_metadata,
1893            &None,
1894            &positional_deletes,
1895        )
1896        .unwrap();
1897
1898        let expected = RowSelection::from(vec![
1899            RowSelector::select(1),
1900            RowSelector::skip(1),
1901            RowSelector::select(1),
1902            RowSelector::skip(3),
1903            RowSelector::select(992),
1904            RowSelector::skip(3),
1905            RowSelector::select(9),
1906            RowSelector::skip(3),
1907            RowSelector::select(485),
1908            RowSelector::skip(4),
1909            RowSelector::select(98),
1910            RowSelector::skip(1),
1911            RowSelector::select(398),
1912            RowSelector::skip(3),
1913            RowSelector::select(98),
1914            RowSelector::skip(1),
1915            RowSelector::select(99),
1916            RowSelector::skip(3),
1917            RowSelector::select(796),
1918            RowSelector::skip(2),
1919            RowSelector::select(499),
1920        ]);
1921
1922        assert_eq!(result, expected);
1923    }
1924
1925    fn build_test_row_group_meta(
1926        schema_descr: SchemaDescPtr,
1927        columns: Vec<ColumnChunkMetaData>,
1928        num_rows: i64,
1929        ordinal: i16,
1930    ) -> RowGroupMetaData {
1931        RowGroupMetaData::builder(schema_descr.clone())
1932            .set_num_rows(num_rows)
1933            .set_total_byte_size(2000)
1934            .set_column_metadata(columns)
1935            .set_ordinal(ordinal)
1936            .build()
1937            .unwrap()
1938    }
1939
1940    fn get_test_schema_descr() -> SchemaDescPtr {
1941        use parquet::schema::types::Type as SchemaType;
1942
1943        let schema = SchemaType::group_type_builder("schema")
1944            .with_fields(vec![
1945                Arc::new(
1946                    SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
1947                        .build()
1948                        .unwrap(),
1949                ),
1950                Arc::new(
1951                    SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
1952                        .build()
1953                        .unwrap(),
1954                ),
1955            ])
1956            .build()
1957            .unwrap();
1958
1959        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
1960    }
1961}