iceberg/arrow/reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parquet file data reader

use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;

use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
use arrow_cast::cast::cast;
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{
    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use arrow_string::like::starts_with;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
};
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::delete_vector::DeleteVector;
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

/// Builder to create ArrowReader
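///
/// A minimal usage sketch (assumes a `FileIO` handle has already been constructed
/// elsewhere; not a definitive example of how to obtain one):
///
/// ```ignore
/// let reader = ArrowReaderBuilder::new(file_io)
///     .with_batch_size(1024)
///     .with_data_file_concurrency_limit(4)
///     .with_row_group_filtering_enabled(true)
///     .build();
/// ```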
pub struct ArrowReaderBuilder {
    batch_size: Option<usize>,
    file_io: FileIO,
    concurrency_limit_data_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl ArrowReaderBuilder {
    /// Create a new ArrowReaderBuilder
    pub fn new(file_io: FileIO) -> Self {
        let num_cpus = available_parallelism().get();

        ArrowReaderBuilder {
            batch_size: None,
            file_io,
            concurrency_limit_data_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

    /// Sets the maximum number of data files that may be fetched concurrently
    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
        self.concurrency_limit_data_files = val;
        self
    }

    /// Sets the desired size of batches in the response
    /// to something other than the default
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = Some(batch_size);
        self
    }

    /// Determines whether to enable row group filtering.
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

    /// Determines whether to enable row selection.
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

    /// Build the ArrowReader.
    pub fn build(self) -> ArrowReader {
        ArrowReader {
            batch_size: self.batch_size,
            file_io: self.file_io.clone(),
            delete_file_loader: CachingDeleteFileLoader::new(
                self.file_io.clone(),
                self.concurrency_limit_data_files,
            ),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        }
    }
}

/// Reads data from Parquet files
#[derive(Clone)]
pub struct ArrowReader {
    batch_size: Option<usize>,
    file_io: FileIO,
    delete_file_loader: CachingDeleteFileLoader,

    /// the maximum number of data files that can be fetched at the same time
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl ArrowReader {
    /// Takes a stream of FileScanTasks and reads all the files.
    /// Returns a stream of Arrow RecordBatches containing the data from the files.
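    ///
    /// A sketch of consuming the resulting stream (hypothetical `tasks` value;
    /// relies on `futures::TryStreamExt` for `try_next`):
    ///
    /// ```ignore
    /// let mut batches = reader.read(tasks)?;
    /// while let Some(batch) = batches.try_next().await? {
    ///     println!("read {} rows", batch.num_rows());
    /// }
    /// ```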
    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
        let file_io = self.file_io.clone();
        let batch_size = self.batch_size;
        let concurrency_limit_data_files = self.concurrency_limit_data_files;
        let row_group_filtering_enabled = self.row_group_filtering_enabled;
        let row_selection_enabled = self.row_selection_enabled;

        let stream = tasks
            .map_ok(move |task| {
                let file_io = file_io.clone();

                Self::process_file_scan_task(
                    task,
                    batch_size,
                    file_io,
                    self.delete_file_loader.clone(),
                    row_group_filtering_enabled,
                    row_selection_enabled,
                )
            })
            .map_err(|err| {
                Error::new(ErrorKind::Unexpected, "file scan task generation failed").with_source(err)
            })
            .try_buffer_unordered(concurrency_limit_data_files)
            .try_flatten_unordered(concurrency_limit_data_files);

        Ok(Box::pin(stream) as ArrowRecordBatchStream)
    }

    #[allow(clippy::too_many_arguments)]
    async fn process_file_scan_task(
        task: FileScanTask,
        batch_size: Option<usize>,
        file_io: FileIO,
        delete_file_loader: CachingDeleteFileLoader,
        row_group_filtering_enabled: bool,
        row_selection_enabled: bool,
    ) -> Result<ArrowRecordBatchStream> {
        let should_load_page_index =
            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();

        let delete_filter_rx =
            delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));

        // Migrated tables lack field IDs, requiring us to inspect the schema to choose
        // between field-ID-based or position-based projection
        let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
            &task.data_file_path,
            file_io.clone(),
            should_load_page_index,
            None,
        )
        .await?;

        // Check if Parquet file has embedded field IDs
        // Corresponds to Java's ParquetSchemaUtil.hasIds()
        // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
        let missing_field_ids = initial_stream_builder
            .schema()
            .fields()
            .iter()
            .next()
            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());

        // Three-branch schema resolution strategy matching Java's ReadConf constructor
        //
        // Per Iceberg spec Column Projection rules:
        // "Columns in Iceberg data files are selected by field id. The table schema's column
        //  names and order may change after a data file is written, and projection must be done
        //  using field ids."
        // https://iceberg.apache.org/spec/#column-projection
        //
        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
        // we must assign field IDs BEFORE reading data to enable correct projection.
        //
        // Java's ReadConf determines field ID strategy:
        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
        // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
        let mut record_batch_stream_builder = if missing_field_ids {
            // Parquet file lacks field IDs - must assign them before reading
            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
                // Branch 2: Apply name mapping to assign correct Iceberg field IDs
                // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
                // to columns without field id"
                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
                apply_name_mapping_to_arrow_schema(
                    Arc::clone(initial_stream_builder.schema()),
                    name_mapping,
                )?
            } else {
                // Branch 3: No name mapping - use position-based fallback IDs
                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
                add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
            };

            let options = ArrowReaderOptions::new().with_schema(arrow_schema);

            Self::create_parquet_record_batch_stream_builder(
                &task.data_file_path,
                file_io.clone(),
                should_load_page_index,
                Some(options),
            )
            .await?
        } else {
            // Branch 1: File has embedded field IDs - trust them
            initial_stream_builder
        };

        // Filter out metadata fields for Parquet projection (they don't exist in files)
        let project_field_ids_without_metadata: Vec<i32> = task
            .project_field_ids
            .iter()
            .filter(|&&id| !is_metadata_field(id))
            .copied()
            .collect();

        // Create projection mask based on field IDs
        // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
        // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
        // - If fallback IDs: position-based projection (missing_field_ids=true)
        let projection_mask = Self::get_arrow_projection_mask(
            &project_field_ids_without_metadata,
            &task.schema,
            record_batch_stream_builder.parquet_schema(),
            record_batch_stream_builder.schema(),
            missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection
        )?;

        record_batch_stream_builder =
            record_batch_stream_builder.with_projection(projection_mask.clone());

        // RecordBatchTransformer performs any transformations required on the RecordBatches
        // that come back from the file, such as type promotion, default column insertion,
        // column re-ordering, partition constants, and virtual field addition (like _file)
        let mut record_batch_transformer_builder =
            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());

        // Add the _file metadata column if it's in the projected fields
        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
            let file_datum = Datum::string(task.data_file_path.clone());
            record_batch_transformer_builder =
                record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
        }

        if let (Some(partition_spec), Some(partition_data)) =
            (task.partition_spec.clone(), task.partition.clone())
        {
            record_batch_transformer_builder =
                record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
        }

        let mut record_batch_transformer = record_batch_transformer_builder.build();

        if let Some(batch_size) = batch_size {
            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
        }

        let delete_filter = delete_filter_rx.await.unwrap()?;
        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;

        // In addition to the optional predicate supplied in the `FileScanTask`,
        // we also have an optional predicate resulting from equality delete files.
        // If both are present, we logical-AND them together to form a single filter
        // predicate that we can pass to the `RecordBatchStreamBuilder`.
        let final_predicate = match (&task.predicate, delete_predicate) {
            (None, None) => None,
            (Some(predicate), None) => Some(predicate.clone()),
            (None, Some(ref predicate)) => Some(predicate.clone()),
            (Some(filter_predicate), Some(delete_predicate)) => {
                Some(filter_predicate.clone().and(delete_predicate))
            }
        };

        // Selected RowGroup index lists can come from three sources:
        //   * When task.start and task.length specify a byte range (file splitting);
        //   * When there are equality delete files that are applicable;
        //   * When there is a scan predicate and row_group_filtering_enabled = true.
        // `RowSelection`s can be created in either or both of the following cases:
        //   * When there are positional delete files that are applicable;
        //   * When there is a scan predicate and row_selection_enabled = true
        // Note that row group filtering from predicates only happens when
        // there is a scan predicate AND row_group_filtering_enabled = true,
        // but we perform row selection filtering if there are applicable
        // equality delete files OR (there is a scan predicate AND row_selection_enabled),
        // since the only implemented method of applying positional deletes is
        // by using a `RowSelection`.
        let mut selected_row_group_indices = None;
        let mut row_selection = None;

        // Filter row groups based on byte range from task.start and task.length.
        // If both start and length are 0, read the entire file (backwards compatibility).
        if task.start != 0 || task.length != 0 {
            let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
                record_batch_stream_builder.metadata(),
                task.start,
                task.length,
            )?;
            selected_row_group_indices = Some(byte_range_filtered_row_groups);
        }

        if let Some(predicate) = final_predicate {
            let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                record_batch_stream_builder.parquet_schema(),
                &predicate,
            )?;

            let row_filter = Self::get_row_filter(
                &predicate,
                record_batch_stream_builder.parquet_schema(),
                &iceberg_field_ids,
                &field_id_map,
            )?;
            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);

            if row_group_filtering_enabled {
                let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &field_id_map,
                    &task.schema,
                )?;

                // Merge predicate-based filtering with byte range filtering (if present)
                // by taking the intersection of both filters
                selected_row_group_indices = match selected_row_group_indices {
                    Some(byte_range_filtered) => {
                        // Keep only row groups that are in both filters
                        let intersection: Vec<usize> = byte_range_filtered
                            .into_iter()
                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
                            .collect();
                        Some(intersection)
                    }
                    None => Some(predicate_filtered_row_groups),
                };
            }

            if row_selection_enabled {
                row_selection = Some(Self::get_row_selection_for_filter_predicate(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &selected_row_group_indices,
                    &field_id_map,
                    &task.schema,
                )?);
            }
        }

        let positional_delete_indexes = delete_filter.get_delete_vector(&task);

        if let Some(positional_delete_indexes) = positional_delete_indexes {
            let delete_row_selection = {
                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();

                Self::build_deletes_row_selection(
                    record_batch_stream_builder.metadata().row_groups(),
                    &selected_row_group_indices,
                    &positional_delete_indexes,
                )
            }?;

            // merge the row selection from the delete files with the row selection
            // from the filter predicate, if there is one from the filter predicate
            row_selection = match row_selection {
                None => Some(delete_row_selection),
                Some(filter_row_selection) => {
                    Some(filter_row_selection.intersection(&delete_row_selection))
                }
            };
        }

        if let Some(row_selection) = row_selection {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_selection(row_selection);
        }

        if let Some(selected_row_group_indices) = selected_row_group_indices {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
        }

        // Build the batch stream and send all the RecordBatches that it generates
        // to the requester.
        let record_batch_stream =
            record_batch_stream_builder
                .build()?
                .map(move |batch| match batch {
                    Ok(batch) => {
                        // Process the record batch (type promotion, column reordering, virtual fields, etc.)
                        record_batch_transformer.process_record_batch(batch)
                    }
                    Err(err) => Err(err.into()),
                });

        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }

    pub(crate) async fn create_parquet_record_batch_stream_builder(
        data_file_path: &str,
        file_io: FileIO,
        should_load_page_index: bool,
        arrow_reader_options: Option<ArrowReaderOptions>,
    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
        // Get the metadata for the Parquet file we need to read and build
        // a reader for the data within
        let parquet_file = file_io.new_input(data_file_path)?;
        let (parquet_metadata, parquet_reader) =
            try_join!(parquet_file.metadata(), parquet_file.reader())?;
        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
            .with_preload_column_index(true)
            .with_preload_offset_index(true)
            .with_preload_page_index(should_load_page_index);

        // Create the record batch stream builder, which wraps the parquet file reader
        let options = arrow_reader_options.unwrap_or_default();
        let record_batch_stream_builder =
            ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
        Ok(record_batch_stream_builder)
    }

    /// Computes a `RowSelection` from positional delete indices.
    ///
    /// Using the row group metadata, we build a `RowSelection` that rejects rows that are indicated
    /// as having been deleted by a positional delete, taking into account any row groups that have
    /// been skipped entirely by the filter predicate.
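    ///
    /// Illustrative sketch (hypothetical values): a single 100-row row group with
    /// positional deletes at rows 2 and 3 produces the selection
    ///
    /// ```text
    /// select(2), skip(2), select(96)
    /// ```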
    fn build_deletes_row_selection(
        row_group_metadata_list: &[RowGroupMetaData],
        selected_row_groups: &Option<Vec<usize>>,
        positional_deletes: &DeleteVector,
    ) -> Result<RowSelection> {
        let mut results: Vec<RowSelector> = Vec::new();
        let mut selected_row_groups_idx = 0;
        let mut current_row_group_base_idx: u64 = 0;
        let mut delete_vector_iter = positional_deletes.iter();
        let mut next_deleted_row_idx_opt = delete_vector_iter.next();

        for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
            let row_group_num_rows = row_group_metadata.num_rows() as u64;
            let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;

            // if row group selection is enabled,
            if let Some(selected_row_groups) = selected_row_groups {
                // if we've consumed all the selected row groups, we're done
                if selected_row_groups_idx == selected_row_groups.len() {
                    break;
                }

                if idx == selected_row_groups[selected_row_groups_idx] {
                    // we're in a selected row group. Increment selected_row_groups_idx
                    // so that next time around the for loop we're looking for the next
                    // selected row group
                    selected_row_groups_idx += 1;
                } else {
                    // Advance iterator past all deletes in the skipped row group.
                    // advance_to() positions the iterator to the first delete >= next_row_group_base_idx.
                    // However, if our cached next_deleted_row_idx_opt is in the skipped range,
                    // we need to call next() to update the cache with the newly positioned value.
                    delete_vector_iter.advance_to(next_row_group_base_idx);
                    // Only update the cache if the cached value is stale (in the skipped range)
                    if let Some(cached_idx) = next_deleted_row_idx_opt
                        && cached_idx < next_row_group_base_idx
                    {
                        next_deleted_row_idx_opt = delete_vector_iter.next();
                    }

                    // still increment the current row group base index but then skip to the next
                    // row group in the file
                    current_row_group_base_idx += row_group_num_rows;
                    continue;
                }
            }

            let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
                Some(next_deleted_row_idx) => {
                    // if the index of the next deleted row is beyond this row group, add a selection for
                    // the remainder of this row group and skip to the next row group
                    if next_deleted_row_idx >= next_row_group_base_idx {
                        results.push(RowSelector::select(row_group_num_rows as usize));
                        current_row_group_base_idx += row_group_num_rows;
                        continue;
                    }

                    next_deleted_row_idx
                }

                // If there are no more positional deletes, add a selector for the entirety of this row group.
                _ => {
                    results.push(RowSelector::select(row_group_num_rows as usize));
                    current_row_group_base_idx += row_group_num_rows;
                    continue;
                }
            };

            let mut current_idx = current_row_group_base_idx;
            'chunks: while next_deleted_row_idx < next_row_group_base_idx {
                // `select` all rows that precede the next delete index
                if current_idx < next_deleted_row_idx {
                    let run_length = next_deleted_row_idx - current_idx;
                    results.push(RowSelector::select(run_length as usize));
                    current_idx += run_length;
                }

                // `skip` all consecutive deleted rows in the current row group
                let mut run_length = 0;
                while next_deleted_row_idx == current_idx
                    && next_deleted_row_idx < next_row_group_base_idx
                {
                    run_length += 1;
                    current_idx += 1;

                    next_deleted_row_idx_opt = delete_vector_iter.next();
                    next_deleted_row_idx = match next_deleted_row_idx_opt {
                        Some(next_deleted_row_idx) => next_deleted_row_idx,
                        _ => {
                            // We've processed the final positional delete.
                            // Conclude the skip and then break so that we select the remaining
                            // rows in the row group and move on to the next row group
                            results.push(RowSelector::skip(run_length));
                            break 'chunks;
                        }
                    };
                }
                if run_length > 0 {
                    results.push(RowSelector::skip(run_length));
                }
            }

            if current_idx < next_row_group_base_idx {
                results.push(RowSelector::select(
                    (next_row_group_base_idx - current_idx) as usize,
                ));
            }

            current_row_group_base_idx += row_group_num_rows;
        }

        Ok(results.into())
    }

    fn build_field_id_set_and_map(
        parquet_schema: &SchemaDescriptor,
        predicate: &BoundPredicate,
    ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
        // Collects all Iceberg field IDs referenced in the filter predicate
        let mut collector = CollectFieldIdVisitor {
            field_ids: HashSet::default(),
        };
        visit(&mut collector, predicate)?;

        let iceberg_field_ids = collector.field_ids();

        // Without embedded field IDs, we fall back to position-based mapping for compatibility
        let field_id_map = match build_field_id_map(parquet_schema)? {
            Some(map) => map,
            None => build_fallback_field_id_map(parquet_schema),
        };

        Ok((iceberg_field_ids, field_id_map))
    }

    /// Recursively extract leaf field IDs because Parquet projection works at the leaf column level.
    /// Nested types (struct/list/map) are flattened in Parquet's columnar format.
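    ///
    /// For example (hypothetical field IDs): requesting a struct field `location (id=3)`
    /// with children `lat (id=4)` and `long (id=5)` pushes the leaf IDs `4` and `5`
    /// rather than `3`.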
    fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
        match field.field_type.as_ref() {
            Type::Primitive(_) => {
                field_ids.push(field.id);
            }
            Type::Struct(struct_type) => {
                for nested_field in struct_type.fields() {
                    Self::include_leaf_field_id(nested_field, field_ids);
                }
            }
            Type::List(list_type) => {
                Self::include_leaf_field_id(&list_type.element_field, field_ids);
            }
            Type::Map(map_type) => {
                Self::include_leaf_field_id(&map_type.key_field, field_ids);
                Self::include_leaf_field_id(&map_type.value_field, field_ids);
            }
        }
    }

    fn get_arrow_projection_mask(
        field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
        use_fallback: bool, // Whether file lacks embedded field IDs (e.g., migrated from Hive/Spark)
    ) -> Result<ProjectionMask> {
        fn type_promotion_is_valid(
            file_type: Option<&PrimitiveType>,
            projected_type: Option<&PrimitiveType>,
        ) -> bool {
            match (file_type, projected_type) {
                (Some(lhs), Some(rhs)) if lhs == rhs => true,
                (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
                (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
                (
                    Some(PrimitiveType::Decimal {
                        precision: file_precision,
                        scale: file_scale,
                    }),
                    Some(PrimitiveType::Decimal {
                        precision: requested_precision,
                        scale: requested_scale,
                    }),
                ) if requested_precision >= file_precision && file_scale == requested_scale => true,
                // Uuid is stored as Fixed(16) in the Parquet file, so the read-back type will be Fixed(16).
                (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
                _ => false,
            }
        }

        if field_ids.is_empty() {
            return Ok(ProjectionMask::all());
        }

        if use_fallback {
            // Position-based projection necessary because file lacks embedded field IDs
            Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
        } else {
            // Field-ID-based projection using embedded field IDs from Parquet metadata

            // Parquet's columnar format requires leaf-level (not top-level struct/list/map) projection
            let mut leaf_field_ids = vec![];
            for field_id in field_ids {
                let field = iceberg_schema_of_task.field_by_id(*field_id);
                if let Some(field) = field {
                    Self::include_leaf_field_id(field, &mut leaf_field_ids);
                }
            }

            Self::get_arrow_projection_mask_with_field_ids(
                &leaf_field_ids,
                iceberg_schema_of_task,
                parquet_schema,
                arrow_schema,
                type_promotion_is_valid,
            )
        }
    }

    /// Standard projection using embedded field IDs from Parquet metadata.
    /// For iceberg-java compatibility with ParquetSchemaUtil.pruneColumns().
    fn get_arrow_projection_mask_with_field_ids(
        leaf_field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
        type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
    ) -> Result<ProjectionMask> {
        let mut column_map = HashMap::new();
        let fields = arrow_schema.fields();

        // Pre-project only the fields that have been selected, possibly avoiding converting
        // some Arrow types that are not yet supported.
        let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
        let projected_arrow_schema = ArrowSchema::new_with_metadata(
            fields.filter_leaves(|_, f| {
                f.metadata()
                    .get(PARQUET_FIELD_ID_META_KEY)
                    .and_then(|field_id| i32::from_str(field_id).ok())
                    .is_some_and(|field_id| {
                        projected_fields.insert((*f).clone(), field_id);
                        leaf_field_ids.contains(&field_id)
                    })
            }),
            arrow_schema.metadata().clone(),
        );
        let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;

        fields.filter_leaves(|idx, field| {
            let Some(field_id) = projected_fields.get(field).cloned() else {
                return false;
            };

            let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
            let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);

            if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
                return false;
            }

            if !type_promotion_is_valid(
                parquet_iceberg_field
                    .unwrap()
                    .field_type
                    .as_primitive_type(),
                iceberg_field.unwrap().field_type.as_primitive_type(),
            ) {
                return false;
            }

            column_map.insert(field_id, idx);
            true
        });

        // Schema evolution: New columns may not exist in old Parquet files.
        // We only project existing columns; RecordBatchTransformer adds default/NULL values.
        let mut indices = vec![];
        for field_id in leaf_field_ids {
            if let Some(col_idx) = column_map.get(field_id) {
                indices.push(*col_idx);
            }
        }

        if indices.is_empty() {
            // Edge case: All requested columns are new (don't exist in file).
            // Project all columns so RecordBatchTransformer has a batch to transform.
            Ok(ProjectionMask::all())
        } else {
            Ok(ProjectionMask::leaves(parquet_schema, indices))
        }
    }

    /// Fallback projection for Parquet files without field IDs.
    /// Uses position-based matching: field ID N → column position N-1.
    /// Projects entire top-level columns (including nested content) for iceberg-java compatibility.
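    ///
    /// For example (hypothetical IDs): requesting field IDs `[1, 3]` against a file
    /// with three root columns projects root column indices `[0, 2]`.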
    fn get_arrow_projection_mask_fallback(
        field_ids: &[i32],
        parquet_schema: &SchemaDescriptor,
    ) -> Result<ProjectionMask> {
        // Position-based: field_id N → column N-1 (field IDs are 1-indexed)
        let parquet_root_fields = parquet_schema.root_schema().get_fields();
        let mut root_indices = vec![];

        for field_id in field_ids.iter() {
            let parquet_pos = (*field_id - 1) as usize;

            if parquet_pos < parquet_root_fields.len() {
                root_indices.push(parquet_pos);
            }
            // RecordBatchTransformer adds missing columns with NULL values
        }

        if root_indices.is_empty() {
            Ok(ProjectionMask::all())
        } else {
            Ok(ProjectionMask::roots(parquet_schema, root_indices))
        }
    }

    fn get_row_filter(
        predicates: &BoundPredicate,
        parquet_schema: &SchemaDescriptor,
        iceberg_field_ids: &HashSet<i32>,
        field_id_map: &HashMap<i32, usize>,
    ) -> Result<RowFilter> {
        // Collect Parquet column indices from field ids.
        // If a field id is not found in the Parquet schema, it is ignored due to schema evolution.
        let mut column_indices = iceberg_field_ids
            .iter()
            .filter_map(|field_id| field_id_map.get(field_id).cloned())
            .collect::<Vec<_>>();
        column_indices.sort();

        // The converter that converts `BoundPredicates` to `ArrowPredicates`
        let mut converter = PredicateConverter {
            parquet_schema,
            column_map: field_id_map,
            column_indices: &column_indices,
        };

        // After collecting required leaf column indices used in the predicate,
        // creates the projection mask for the Arrow predicates.
        let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
        let predicate_func = visit(&mut converter, predicates)?;
        let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
        Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
    }

    fn get_selected_row_group_indices(
        predicate: &BoundPredicate,
        parquet_metadata: &Arc<ParquetMetaData>,
        field_id_map: &HashMap<i32, usize>,
        snapshot_schema: &Schema,
    ) -> Result<Vec<usize>> {
        let row_groups_metadata = parquet_metadata.row_groups();
        let mut results = Vec::with_capacity(row_groups_metadata.len());

        for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
            if RowGroupMetricsEvaluator::eval(
                predicate,
                row_group_metadata,
                field_id_map,
                snapshot_schema,
            )? {
                results.push(idx);
            }
        }

        Ok(results)
    }

    fn get_row_selection_for_filter_predicate(
        predicate: &BoundPredicate,
        parquet_metadata: &Arc<ParquetMetaData>,
        selected_row_groups: &Option<Vec<usize>>,
        field_id_map: &HashMap<i32, usize>,
        snapshot_schema: &Schema,
    ) -> Result<RowSelection> {
        let Some(column_index) = parquet_metadata.column_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain a column index",
            ));
        };

        let Some(offset_index) = parquet_metadata.offset_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain an offset index",
            ));
        };

        // If all row groups were filtered out, return an empty RowSelection (select no rows)
        if let Some(selected_row_groups) = selected_row_groups
            && selected_row_groups.is_empty()
        {
            return Ok(RowSelection::from(Vec::new()));
        }

        let mut selected_row_groups_idx = 0;

        let page_index = column_index
            .iter()
            .enumerate()
            .zip(offset_index)
            .zip(parquet_metadata.row_groups());

        let mut results = Vec::new();
        for (((idx, column_index), offset_index), row_group_metadata) in page_index {
            if let Some(selected_row_groups) = selected_row_groups {
                // skip row groups that aren't present in selected_row_groups
                if idx == selected_row_groups[selected_row_groups_idx] {
                    selected_row_groups_idx += 1;
                } else {
                    continue;
                }
            }

            let selections_for_page = PageIndexEvaluator::eval(
                predicate,
                column_index,
                offset_index,
                row_group_metadata,
                field_id_map,
                snapshot_schema,
            )?;

            results.push(selections_for_page);

            if let Some(selected_row_groups) = selected_row_groups
                && selected_row_groups_idx == selected_row_groups.len()
            {
                break;
            }
        }

        Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
    }

    /// Filters row groups by byte range to support Iceberg's file splitting.
    ///
    /// Iceberg splits large files at row group boundaries, so we only read row groups
    /// whose byte ranges overlap with [start, start+length).
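    ///
    /// Worked example (hypothetical sizes): with two row groups of 1000 compressed bytes
    /// each, row group 0 spans bytes [4, 1004) and row group 1 spans [1004, 2004);
    /// a split with `start = 1004` and `length = 1000` selects only row group 1.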
    fn filter_row_groups_by_byte_range(
        parquet_metadata: &Arc<ParquetMetaData>,
        start: u64,
        length: u64,
    ) -> Result<Vec<usize>> {
        let row_groups = parquet_metadata.row_groups();
        let mut selected = Vec::new();
        let end = start + length;

        // Row groups are stored sequentially after the 4-byte magic header.
        let mut current_byte_offset = 4u64;

        for (idx, row_group) in row_groups.iter().enumerate() {
            let row_group_size = row_group.compressed_size() as u64;
            let row_group_end = current_byte_offset + row_group_size;

            if current_byte_offset < end && start < row_group_end {
                selected.push(idx);
            }

            current_byte_offset = row_group_end;
        }

        Ok(selected)
    }
}

/// Build the map of Parquet field id to Parquet column index in the schema.
/// Returns None if the Parquet file doesn't have field IDs embedded (e.g., migrated tables).
fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
    let mut column_map = HashMap::new();

    for (idx, field) in parquet_schema.columns().iter().enumerate() {
        let field_type = field.self_type();
        match field_type {
            ParquetType::PrimitiveType { basic_info, .. } => {
                if !basic_info.has_id() {
                    return Ok(None);
                }
                column_map.insert(basic_info.id(), idx);
            }
            ParquetType::GroupType { .. } => {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Leaf column in schema should be a primitive type but got {field_type:?}"
                    ),
                ));
            }
        };
    }

    Ok(Some(column_map))
}

/// Build a fallback field ID map for Parquet files without embedded field IDs.
/// Position-based (1, 2, 3, ...) for compatibility with iceberg-java migrations.
fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
    let mut column_map = HashMap::new();

    // 1-indexed to match iceberg-java's convention
    for (idx, _field) in parquet_schema.columns().iter().enumerate() {
        let field_id = (idx + 1) as i32;
        column_map.insert(field_id, idx);
    }

    column_map
}

/// Apply name mapping to Arrow schema for Parquet files lacking field IDs.
///
/// Assigns Iceberg field IDs based on column names using the name mapping,
/// enabling correct projection on migrated files (e.g., from Hive/Spark via add_files).
///
/// Per Iceberg spec Column Projection rule #2:
/// "Use schema.name-mapping.default metadata to map field id to columns without field id"
/// https://iceberg.apache.org/spec/#column-projection
///
/// Corresponds to Java's ParquetSchemaUtil.applyNameMapping() and ApplyNameMapping visitor.
/// The key difference is that Java operates on a Parquet MessageType, while we operate on an Arrow Schema.
///
/// # Arguments
/// * `arrow_schema` - Arrow schema from Parquet file (without field IDs)
/// * `name_mapping` - Name mapping from table metadata (TableProperties.DEFAULT_NAME_MAPPING)
///
/// # Returns
/// Arrow schema with field IDs assigned based on name mapping
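///
/// # Example
///
/// A sketch with hypothetical names and IDs: given a file column `name` that has no
/// field ID and a name mapping containing
///
/// ```text
/// [ {"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["name", "full_name"]} ]
/// ```
///
/// the returned schema carries `PARQUET:field_id = "2"` in the metadata of the `name`
/// column, while unmapped columns are left without an ID.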
fn apply_name_mapping_to_arrow_schema(
    arrow_schema: ArrowSchemaRef,
    name_mapping: &NameMapping,
) -> Result<Arc<ArrowSchema>> {
    debug_assert!(
        arrow_schema
            .fields()
            .iter()
            .next()
            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
        "Schema already has field IDs - name mapping should not be applied"
    );

    use arrow_schema::Field;

    let fields_with_mapped_ids: Vec<_> = arrow_schema
        .fields()
        .iter()
        .map(|field| {
            // Look up this column name in name mapping to get the Iceberg field ID.
            // Corresponds to Java's ApplyNameMapping visitor which calls
            // nameMapping.find(currentPath()) and returns field.withId() if found.
            //
            // If the field isn't in the mapping, leave it WITHOUT assigning an ID
            // (matching Java's behavior of returning the field unchanged).
            // Later, during projection, fields without IDs are filtered out.
            let mapped_field_opt = name_mapping
                .fields()
                .iter()
                .find(|f| f.names().contains(&field.name().to_string()));

            let mut metadata = field.metadata().clone();

            if let Some(mapped_field) = mapped_field_opt
                && let Some(field_id) = mapped_field.field_id()
            {
                // Field found in mapping with a field_id → assign it
                metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
            }
            // If field_id is None, leave the field without an ID (will be filtered by projection)

            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();

    Ok(Arc::new(ArrowSchema::new_with_metadata(
        fields_with_mapped_ids,
        arrow_schema.metadata().clone(),
    )))
}

/// Add position-based fallback field IDs to Arrow schema for Parquet files lacking them.
/// Enables projection on migrated files (e.g., from Hive/Spark).
///
/// Why at schema level (not per-batch): Efficiency - avoids repeated schema modification.
/// Why only top-level: Nested projection uses leaf column indices, not parent struct IDs.
/// Why 1-indexed: Compatibility with iceberg-java's ParquetSchemaUtil.addFallbackIds().
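///
/// For example (hypothetical columns): a file with top-level columns `["id", "data"]`
/// gets `PARQUET:field_id` metadata values `"1"` and `"2"` respectively.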
fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
    debug_assert!(
        arrow_schema
            .fields()
            .iter()
            .next()
            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
        "Schema already has field IDs"
    );

    use arrow_schema::Field;

    let fields_with_fallback_ids: Vec<_> = arrow_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(pos, field)| {
            let mut metadata = field.metadata().clone();
            let field_id = (pos + 1) as i32; // 1-indexed for Java compatibility
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());

            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();

    Arc::new(ArrowSchema::new_with_metadata(
        fields_with_fallback_ids,
        arrow_schema.metadata().clone(),
    ))
}

/// A visitor to collect field ids from bound predicates.
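///
/// For example (hypothetical schema): visiting the bound predicate
/// `x < 5 AND y IS NOT NULL` collects the field IDs of `x` and `y`.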
1092struct CollectFieldIdVisitor {
1093    field_ids: HashSet<i32>,
1094}
1095
1096impl CollectFieldIdVisitor {
1097    fn field_ids(self) -> HashSet<i32> {
1098        self.field_ids
1099    }
1100}
1101
1102impl BoundPredicateVisitor for CollectFieldIdVisitor {
1103    type T = ();
1104
1105    fn always_true(&mut self) -> Result<()> {
1106        Ok(())
1107    }
1108
1109    fn always_false(&mut self) -> Result<()> {
1110        Ok(())
1111    }
1112
1113    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1114        Ok(())
1115    }
1116
1117    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1118        Ok(())
1119    }
1120
1121    fn not(&mut self, _inner: ()) -> Result<()> {
1122        Ok(())
1123    }
1124
1125    fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1126        self.field_ids.insert(reference.field().id);
1127        Ok(())
1128    }
1129
1130    fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1131        self.field_ids.insert(reference.field().id);
1132        Ok(())
1133    }
1134
1135    fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1136        self.field_ids.insert(reference.field().id);
1137        Ok(())
1138    }
1139
1140    fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1141        self.field_ids.insert(reference.field().id);
1142        Ok(())
1143    }
1144
1145    fn less_than(
1146        &mut self,
1147        reference: &BoundReference,
1148        _literal: &Datum,
1149        _predicate: &BoundPredicate,
1150    ) -> Result<()> {
1151        self.field_ids.insert(reference.field().id);
1152        Ok(())
1153    }
1154
1155    fn less_than_or_eq(
1156        &mut self,
1157        reference: &BoundReference,
1158        _literal: &Datum,
1159        _predicate: &BoundPredicate,
1160    ) -> Result<()> {
1161        self.field_ids.insert(reference.field().id);
1162        Ok(())
1163    }
1164
1165    fn greater_than(
1166        &mut self,
1167        reference: &BoundReference,
1168        _literal: &Datum,
1169        _predicate: &BoundPredicate,
1170    ) -> Result<()> {
1171        self.field_ids.insert(reference.field().id);
1172        Ok(())
1173    }
1174
1175    fn greater_than_or_eq(
1176        &mut self,
1177        reference: &BoundReference,
1178        _literal: &Datum,
1179        _predicate: &BoundPredicate,
1180    ) -> Result<()> {
1181        self.field_ids.insert(reference.field().id);
1182        Ok(())
1183    }
1184
1185    fn eq(
1186        &mut self,
1187        reference: &BoundReference,
1188        _literal: &Datum,
1189        _predicate: &BoundPredicate,
1190    ) -> Result<()> {
1191        self.field_ids.insert(reference.field().id);
1192        Ok(())
1193    }
1194
1195    fn not_eq(
1196        &mut self,
1197        reference: &BoundReference,
1198        _literal: &Datum,
1199        _predicate: &BoundPredicate,
1200    ) -> Result<()> {
1201        self.field_ids.insert(reference.field().id);
1202        Ok(())
1203    }
1204
1205    fn starts_with(
1206        &mut self,
1207        reference: &BoundReference,
1208        _literal: &Datum,
1209        _predicate: &BoundPredicate,
1210    ) -> Result<()> {
1211        self.field_ids.insert(reference.field().id);
1212        Ok(())
1213    }
1214
1215    fn not_starts_with(
1216        &mut self,
1217        reference: &BoundReference,
1218        _literal: &Datum,
1219        _predicate: &BoundPredicate,
1220    ) -> Result<()> {
1221        self.field_ids.insert(reference.field().id);
1222        Ok(())
1223    }
1224
1225    fn r#in(
1226        &mut self,
1227        reference: &BoundReference,
1228        _literals: &FnvHashSet<Datum>,
1229        _predicate: &BoundPredicate,
1230    ) -> Result<()> {
1231        self.field_ids.insert(reference.field().id);
1232        Ok(())
1233    }
1234
1235    fn not_in(
1236        &mut self,
1237        reference: &BoundReference,
1238        _literals: &FnvHashSet<Datum>,
1239        _predicate: &BoundPredicate,
1240    ) -> Result<()> {
1241        self.field_ids.insert(reference.field().id);
1242        Ok(())
1243    }
1244}
1245
1246/// A visitor to convert Iceberg bound predicates to Arrow predicates.
1247struct PredicateConverter<'a> {
1248    /// The Parquet schema descriptor.
1249    pub parquet_schema: &'a SchemaDescriptor,
1250    /// The map between field id and leaf column index in Parquet schema.
1251    pub column_map: &'a HashMap<i32, usize>,
1252    /// The required column indices in Parquet schema for the predicates.
1253    pub column_indices: &'a Vec<usize>,
1254}
1255
1256impl PredicateConverter<'_> {
1257    /// When visiting a bound reference, we return index of the leaf column in the
1258    /// required column indices which is used to project the column in the record batch.
1259    /// Return None if the field id is not found in the column map, which is possible
1260    /// due to schema evolution.
1261    fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
1262        // The leaf column's index in Parquet schema.
1263        if let Some(column_idx) = self.column_map.get(&reference.field().id) {
1264            if self.parquet_schema.get_column_root(*column_idx).is_group() {
1265                return Err(Error::new(
1266                    ErrorKind::DataInvalid,
1267                    format!(
1268                        "Leave column `{}` in predicates isn't a root column in Parquet schema.",
1269                        reference.field().name
1270                    ),
1271                ));
1272            }
1273
1274            // The leaf column's index in the required column indices.
1275            let index = self
1276                .column_indices
1277                .iter()
1278                .position(|&idx| idx == *column_idx)
1279                .ok_or(Error::new(
1280                    ErrorKind::DataInvalid,
1281                    format!(
1282                "Leave column `{}` in predicates cannot be found in the required column indices.",
1283                reference.field().name
1284            ),
1285                ))?;
1286
1287            Ok(Some(index))
1288        } else {
1289            Ok(None)
1290        }
1291    }
1292
1293    /// Build an Arrow predicate that always returns true.
1294    fn build_always_true(&self) -> Result<Box<PredicateResult>> {
1295        Ok(Box::new(|batch| {
1296            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1297        }))
1298    }
1299
1300    /// Build an Arrow predicate that always returns false.
1301    fn build_always_false(&self) -> Result<Box<PredicateResult>> {
1302        Ok(Box::new(|batch| {
1303            Ok(BooleanArray::from(vec![false; batch.num_rows()]))
1304        }))
1305    }
1306}
1307
1308/// Gets the leaf column from the record batch for the required column index. Only
1309/// supports top-level columns for now.
1310fn project_column(
1311    batch: &RecordBatch,
1312    column_idx: usize,
1313) -> std::result::Result<ArrayRef, ArrowError> {
1314    let column = batch.column(column_idx);
1315
1316    match column.data_type() {
1317        DataType::Struct(_) => Err(ArrowError::SchemaError(
1318            "Does not support struct column yet.".to_string(),
1319        )),
1320        _ => Ok(column.clone()),
1321    }
1322}
1323
1324type PredicateResult =
1325    dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1326
1327impl BoundPredicateVisitor for PredicateConverter<'_> {
1328    type T = Box<PredicateResult>;
1329
1330    fn always_true(&mut self) -> Result<Box<PredicateResult>> {
1331        self.build_always_true()
1332    }
1333
1334    fn always_false(&mut self) -> Result<Box<PredicateResult>> {
1335        self.build_always_false()
1336    }
1337
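    // `and` and `or` combine child predicates with Kleene (three-valued) logic
    // (`and_kleene` / `or_kleene`), so NULL interacts with true/false the way SQL and
    // Iceberg expect, e.g. `NULL OR true = true` and `NULL AND false = false`;
    // see the `test_kleene_logic_*` tests below.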
1338    fn and(
1339        &mut self,
1340        mut lhs: Box<PredicateResult>,
1341        mut rhs: Box<PredicateResult>,
1342    ) -> Result<Box<PredicateResult>> {
1343        Ok(Box::new(move |batch| {
1344            let left = lhs(batch.clone())?;
1345            let right = rhs(batch)?;
1346            and_kleene(&left, &right)
1347        }))
1348    }
1349
1350    fn or(
1351        &mut self,
1352        mut lhs: Box<PredicateResult>,
1353        mut rhs: Box<PredicateResult>,
1354    ) -> Result<Box<PredicateResult>> {
1355        Ok(Box::new(move |batch| {
1356            let left = lhs(batch.clone())?;
1357            let right = rhs(batch)?;
1358            or_kleene(&left, &right)
1359        }))
1360    }
1361
1362    fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1363        Ok(Box::new(move |batch| {
1364            let pred_ret = inner(batch)?;
1365            not(&pred_ret)
1366        }))
1367    }
1368
1369    fn is_null(
1370        &mut self,
1371        reference: &BoundReference,
1372        _predicate: &BoundPredicate,
1373    ) -> Result<Box<PredicateResult>> {
1374        if let Some(idx) = self.bound_reference(reference)? {
1375            Ok(Box::new(move |batch| {
1376                let column = project_column(&batch, idx)?;
1377                is_null(&column)
1378            }))
1379        } else {
1380            // A missing column, treating it as null.
1381            self.build_always_true()
1382        }
1383    }
1384
1385    fn not_null(
1386        &mut self,
1387        reference: &BoundReference,
1388        _predicate: &BoundPredicate,
1389    ) -> Result<Box<PredicateResult>> {
1390        if let Some(idx) = self.bound_reference(reference)? {
1391            Ok(Box::new(move |batch| {
1392                let column = project_column(&batch, idx)?;
1393                is_not_null(&column)
1394            }))
1395        } else {
1396            // A missing column, treating it as null.
1397            self.build_always_false()
1398        }
1399    }
1400
1401    fn is_nan(
1402        &mut self,
1403        reference: &BoundReference,
1404        _predicate: &BoundPredicate,
1405    ) -> Result<Box<PredicateResult>> {
1406        if self.bound_reference(reference)?.is_some() {
1407            self.build_always_true()
1408        } else {
1409            // A missing column, treating it as null.
1410            self.build_always_false()
1411        }
1412    }
1413
1414    fn not_nan(
1415        &mut self,
1416        reference: &BoundReference,
1417        _predicate: &BoundPredicate,
1418    ) -> Result<Box<PredicateResult>> {
1419        if self.bound_reference(reference)?.is_some() {
1420            self.build_always_false()
1421        } else {
1422            // A missing column, treating it as null.
1423            self.build_always_true()
1424        }
1425    }
1426
1427    fn less_than(
1428        &mut self,
1429        reference: &BoundReference,
1430        literal: &Datum,
1431        _predicate: &BoundPredicate,
1432    ) -> Result<Box<PredicateResult>> {
1433        if let Some(idx) = self.bound_reference(reference)? {
1434            let literal = get_arrow_datum(literal)?;
1435
1436            Ok(Box::new(move |batch| {
1437                let left = project_column(&batch, idx)?;
1438                let literal = try_cast_literal(&literal, left.data_type())?;
1439                lt(&left, literal.as_ref())
1440            }))
1441        } else {
1442            // A missing column, treating it as null.
1443            self.build_always_true()
1444        }
1445    }
1446
1447    fn less_than_or_eq(
1448        &mut self,
1449        reference: &BoundReference,
1450        literal: &Datum,
1451        _predicate: &BoundPredicate,
1452    ) -> Result<Box<PredicateResult>> {
1453        if let Some(idx) = self.bound_reference(reference)? {
1454            let literal = get_arrow_datum(literal)?;
1455
1456            Ok(Box::new(move |batch| {
1457                let left = project_column(&batch, idx)?;
1458                let literal = try_cast_literal(&literal, left.data_type())?;
1459                lt_eq(&left, literal.as_ref())
1460            }))
1461        } else {
1462            // A missing column, treating it as null.
1463            self.build_always_true()
1464        }
1465    }
1466
1467    fn greater_than(
1468        &mut self,
1469        reference: &BoundReference,
1470        literal: &Datum,
1471        _predicate: &BoundPredicate,
1472    ) -> Result<Box<PredicateResult>> {
1473        if let Some(idx) = self.bound_reference(reference)? {
1474            let literal = get_arrow_datum(literal)?;
1475
1476            Ok(Box::new(move |batch| {
1477                let left = project_column(&batch, idx)?;
1478                let literal = try_cast_literal(&literal, left.data_type())?;
1479                gt(&left, literal.as_ref())
1480            }))
1481        } else {
1482            // A missing column, treating it as null.
1483            self.build_always_false()
1484        }
1485    }
1486
1487    fn greater_than_or_eq(
1488        &mut self,
1489        reference: &BoundReference,
1490        literal: &Datum,
1491        _predicate: &BoundPredicate,
1492    ) -> Result<Box<PredicateResult>> {
1493        if let Some(idx) = self.bound_reference(reference)? {
1494            let literal = get_arrow_datum(literal)?;
1495
1496            Ok(Box::new(move |batch| {
1497                let left = project_column(&batch, idx)?;
1498                let literal = try_cast_literal(&literal, left.data_type())?;
1499                gt_eq(&left, literal.as_ref())
1500            }))
1501        } else {
1502            // A missing column, treating it as null.
1503            self.build_always_false()
1504        }
1505    }
1506
1507    fn eq(
1508        &mut self,
1509        reference: &BoundReference,
1510        literal: &Datum,
1511        _predicate: &BoundPredicate,
1512    ) -> Result<Box<PredicateResult>> {
1513        if let Some(idx) = self.bound_reference(reference)? {
1514            let literal = get_arrow_datum(literal)?;
1515
1516            Ok(Box::new(move |batch| {
1517                let left = project_column(&batch, idx)?;
1518                let literal = try_cast_literal(&literal, left.data_type())?;
1519                eq(&left, literal.as_ref())
1520            }))
1521        } else {
1522            // A missing column, treating it as null.
1523            self.build_always_false()
1524        }
1525    }
1526
1527    fn not_eq(
1528        &mut self,
1529        reference: &BoundReference,
1530        literal: &Datum,
1531        _predicate: &BoundPredicate,
1532    ) -> Result<Box<PredicateResult>> {
1533        if let Some(idx) = self.bound_reference(reference)? {
1534            let literal = get_arrow_datum(literal)?;
1535
1536            Ok(Box::new(move |batch| {
1537                let left = project_column(&batch, idx)?;
1538                let literal = try_cast_literal(&literal, left.data_type())?;
1539                neq(&left, literal.as_ref())
1540            }))
1541        } else {
1542            // A missing column, treating it as null.
1543            self.build_always_false()
1544        }
1545    }
1546
1547    fn starts_with(
1548        &mut self,
1549        reference: &BoundReference,
1550        literal: &Datum,
1551        _predicate: &BoundPredicate,
1552    ) -> Result<Box<PredicateResult>> {
1553        if let Some(idx) = self.bound_reference(reference)? {
1554            let literal = get_arrow_datum(literal)?;
1555
1556            Ok(Box::new(move |batch| {
1557                let left = project_column(&batch, idx)?;
1558                let literal = try_cast_literal(&literal, left.data_type())?;
1559                starts_with(&left, literal.as_ref())
1560            }))
1561        } else {
1562            // A missing column, treating it as null.
1563            self.build_always_false()
1564        }
1565    }
1566
1567    fn not_starts_with(
1568        &mut self,
1569        reference: &BoundReference,
1570        literal: &Datum,
1571        _predicate: &BoundPredicate,
1572    ) -> Result<Box<PredicateResult>> {
1573        if let Some(idx) = self.bound_reference(reference)? {
1574            let literal = get_arrow_datum(literal)?;
1575
1576            Ok(Box::new(move |batch| {
1577                let left = project_column(&batch, idx)?;
1578                let literal = try_cast_literal(&literal, left.data_type())?;
1579                // update here if arrow ever adds a native not_starts_with
1580                not(&starts_with(&left, literal.as_ref())?)
1581            }))
1582        } else {
1583            // A missing column, treating it as null.
1584            self.build_always_true()
1585        }
1586    }
1587
1588    fn r#in(
1589        &mut self,
1590        reference: &BoundReference,
1591        literals: &FnvHashSet<Datum>,
1592        _predicate: &BoundPredicate,
1593    ) -> Result<Box<PredicateResult>> {
1594        if let Some(idx) = self.bound_reference(reference)? {
1595            let literals: Vec<_> = literals
1596                .iter()
1597                .map(get_arrow_datum)
1598                .collect::<Result<Vec<_>>>()?;
1599
1600            Ok(Box::new(move |batch| {
1601                // update this if arrow ever adds a native is_in kernel
1602                let left = project_column(&batch, idx)?;
1603
1604                let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1605                for literal in &literals {
1606                    let literal = try_cast_literal(literal, left.data_type())?;
1607                    acc = or(&acc, &eq(&left, literal.as_ref())?)?
1608                }
1609
1610                Ok(acc)
1611            }))
1612        } else {
1613            // A missing column, treating it as null.
1614            self.build_always_false()
1615        }
1616    }
1617
1618    fn not_in(
1619        &mut self,
1620        reference: &BoundReference,
1621        literals: &FnvHashSet<Datum>,
1622        _predicate: &BoundPredicate,
1623    ) -> Result<Box<PredicateResult>> {
1624        if let Some(idx) = self.bound_reference(reference)? {
1625            let literals: Vec<_> = literals
1626                .iter()
1627                .map(get_arrow_datum)
1628                .collect::<Result<Vec<_>>>()?;
1629
1630            Ok(Box::new(move |batch| {
1631                // update this if arrow ever adds a native not_in kernel
1632                let left = project_column(&batch, idx)?;
1633                let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1634                for literal in &literals {
1635                    let literal = try_cast_literal(literal, left.data_type())?;
1636                    acc = and(&acc, &neq(&left, literal.as_ref())?)?
1637                }
1638
1639                Ok(acc)
1640            }))
1641        } else {
1642            // A missing column, treating it as null.
1643            self.build_always_true()
1644        }
1645    }
1646}
1647
1648/// ArrowFileReader is a wrapper around a `FileRead` that implements parquet's `AsyncFileReader`.
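///
/// An illustrative sketch of wiring a `FileRead` into parquet's async reader; the
/// `file_metadata` and `file_read` values are assumed to come from `FileIO`, and the
/// snippet is not compiled as a doctest:
///
/// ```ignore
/// let parquet_file_reader = ArrowFileReader::new(file_metadata, file_read)
///     // Preload the page index so page-level pruning can be applied later.
///     .with_preload_page_index(true)
///     // Prefetch enough trailing bytes to usually fetch the footer in one request.
///     .with_metadata_size_hint(64 * 1024);
/// let stream = ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
///     .await?
///     .with_batch_size(1024)
///     .build()?;
/// ```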
1649pub struct ArrowFileReader<R: FileRead> {
1650    meta: FileMetadata,
1651    preload_column_index: bool,
1652    preload_offset_index: bool,
1653    preload_page_index: bool,
1654    metadata_size_hint: Option<usize>,
1655    r: R,
1656}
1657
1658impl<R: FileRead> ArrowFileReader<R> {
1659    /// Create a new ArrowFileReader
1660    pub fn new(meta: FileMetadata, r: R) -> Self {
1661        Self {
1662            meta,
1663            preload_column_index: false,
1664            preload_offset_index: false,
1665            preload_page_index: false,
1666            metadata_size_hint: None,
1667            r,
1668        }
1669    }
1670
1671    /// Enable or disable preloading of the column index
1672    pub fn with_preload_column_index(mut self, preload: bool) -> Self {
1673        self.preload_column_index = preload;
1674        self
1675    }
1676
1677    /// Enable or disable preloading of the offset index
1678    pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
1679        self.preload_offset_index = preload;
1680        self
1681    }
1682
1683    /// Enable or disable preloading of the page index
1684    pub fn with_preload_page_index(mut self, preload: bool) -> Self {
1685        self.preload_page_index = preload;
1686        self
1687    }
1688
1689    /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
1690    ///
1691    /// This hint can help reduce the number of fetch requests. For more details see the
1692    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
1693    pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
1694        self.metadata_size_hint = Some(hint);
1695        self
1696    }
1697}
1698
1699impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
1700    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
1701        Box::pin(
1702            self.r
1703                .read(range.start..range.end)
1704                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
1705        )
1706    }
1707
1708    // TODO: currently we don't respect `ArrowReaderOptions` because it doesn't expose any method to access its option fields;
1709    // we will fix this once `v55.1.0` is released, see https://github.com/apache/arrow-rs/issues/7393
1710    fn get_metadata(
1711        &mut self,
1712        _options: Option<&'_ ArrowReaderOptions>,
1713    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
1714        async move {
1715            let reader = ParquetMetaDataReader::new()
1716                .with_prefetch_hint(self.metadata_size_hint)
1717                // Set the page policy first because it updates both column and offset policies.
1718                .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
1719                .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
1720                .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
1721            let size = self.meta.size;
1722            let meta = reader.load_and_finish(self, size).await?;
1723
1724            Ok(Arc::new(meta))
1725        }
1726        .boxed()
1727    }
1728}
1729
1730/// The Arrow type of an array that the Parquet reader reads may not match the exact Arrow type
1731/// that Iceberg uses for literals, even though they are effectively the same logical type,
1732/// e.g. `LargeUtf8` vs `Utf8`, or `Utf8View` vs `Utf8` or `LargeUtf8`.
1733///
1734/// The Arrow compute kernels that we use must match the type exactly, so first cast the literal
1735/// into the type of the batch we read from Parquet before sending it to the compute kernel.
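///
/// An illustrative sketch, assuming a `Utf8` literal compared against a `LargeUtf8`
/// column read from Parquet (not compiled as a doctest):
///
/// ```ignore
/// use arrow_array::StringArray;
///
/// // The Iceberg literal arrives as a Utf8 scalar...
/// let literal: Arc<dyn ArrowDatum + Send + Sync> =
///     Arc::new(Scalar::new(StringArray::from(vec!["foo"])));
/// // ...while the batch column is LargeUtf8, so cast the literal to match.
/// let casted = try_cast_literal(&literal, &DataType::LargeUtf8)?;
/// assert_eq!(casted.get().0.data_type(), &DataType::LargeUtf8);
/// ```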
1736fn try_cast_literal(
1737    literal: &Arc<dyn ArrowDatum + Send + Sync>,
1738    column_type: &DataType,
1739) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1740    let literal_array = literal.get().0;
1741
1742    // No cast required
1743    if literal_array.data_type() == column_type {
1744        return Ok(Arc::clone(literal));
1745    }
1746
1747    let literal_array = cast(literal_array, column_type)?;
1748    Ok(Arc::new(Scalar::new(literal_array)))
1749}
1750
1751#[cfg(test)]
1752mod tests {
1753    use std::collections::{HashMap, HashSet};
1754    use std::fs::File;
1755    use std::sync::Arc;
1756
1757    use arrow_array::cast::AsArray;
1758    use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1759    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1760    use futures::TryStreamExt;
1761    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1762    use parquet::arrow::{ArrowWriter, ProjectionMask};
1763    use parquet::basic::Compression;
1764    use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1765    use parquet::file::properties::WriterProperties;
1766    use parquet::schema::parser::parse_message_type;
1767    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1768    use roaring::RoaringTreemap;
1769    use tempfile::TempDir;
1770
1771    use crate::ErrorKind;
1772    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1773    use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1774    use crate::delete_vector::DeleteVector;
1775    use crate::expr::visitors::bound_predicate_visitor::visit;
1776    use crate::expr::{Bind, Predicate, Reference};
1777    use crate::io::FileIO;
1778    use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
1779    use crate::spec::{
1780        DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
1781    };
1782
1783    fn table_schema_simple() -> SchemaRef {
1784        Arc::new(
1785            Schema::builder()
1786                .with_schema_id(1)
1787                .with_identifier_field_ids(vec![2])
1788                .with_fields(vec![
1789                    NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1790                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1791                    NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1792                    NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1793                ])
1794                .build()
1795                .unwrap(),
1796        )
1797    }
1798
1799    #[test]
1800    fn test_collect_field_id() {
1801        let schema = table_schema_simple();
1802        let expr = Reference::new("qux").is_null();
1803        let bound_expr = expr.bind(schema, true).unwrap();
1804
1805        let mut visitor = CollectFieldIdVisitor {
1806            field_ids: HashSet::default(),
1807        };
1808        visit(&mut visitor, &bound_expr).unwrap();
1809
1810        let mut expected = HashSet::default();
1811        expected.insert(4_i32);
1812
1813        assert_eq!(visitor.field_ids, expected);
1814    }
1815
1816    #[test]
1817    fn test_collect_field_id_with_and() {
1818        let schema = table_schema_simple();
1819        let expr = Reference::new("qux")
1820            .is_null()
1821            .and(Reference::new("baz").is_null());
1822        let bound_expr = expr.bind(schema, true).unwrap();
1823
1824        let mut visitor = CollectFieldIdVisitor {
1825            field_ids: HashSet::default(),
1826        };
1827        visit(&mut visitor, &bound_expr).unwrap();
1828
1829        let mut expected = HashSet::default();
1830        expected.insert(4_i32);
1831        expected.insert(3);
1832
1833        assert_eq!(visitor.field_ids, expected);
1834    }
1835
1836    #[test]
1837    fn test_collect_field_id_with_or() {
1838        let schema = table_schema_simple();
1839        let expr = Reference::new("qux")
1840            .is_null()
1841            .or(Reference::new("baz").is_null());
1842        let bound_expr = expr.bind(schema, true).unwrap();
1843
1844        let mut visitor = CollectFieldIdVisitor {
1845            field_ids: HashSet::default(),
1846        };
1847        visit(&mut visitor, &bound_expr).unwrap();
1848
1849        let mut expected = HashSet::default();
1850        expected.insert(4_i32);
1851        expected.insert(3);
1852
1853        assert_eq!(visitor.field_ids, expected);
1854    }
1855
1856    #[test]
1857    fn test_arrow_projection_mask() {
1858        let schema = Arc::new(
1859            Schema::builder()
1860                .with_schema_id(1)
1861                .with_identifier_field_ids(vec![1])
1862                .with_fields(vec![
1863                    NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
1864                    NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
1865                    NestedField::optional(
1866                        3,
1867                        "c3",
1868                        Type::Primitive(PrimitiveType::Decimal {
1869                            precision: 38,
1870                            scale: 3,
1871                        }),
1872                    )
1873                    .into(),
1874                ])
1875                .build()
1876                .unwrap(),
1877        );
1878        let arrow_schema = Arc::new(ArrowSchema::new(vec![
1879            Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
1880                PARQUET_FIELD_ID_META_KEY.to_string(),
1881                "1".to_string(),
1882            )])),
1883            // Type not supported
1884            Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
1885                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1886            ),
1887            // Precision is beyond the supported range
1888            Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
1889                PARQUET_FIELD_ID_META_KEY.to_string(),
1890                "3".to_string(),
1891            )])),
1892        ]));
1893
1894        let message_type = "
1895message schema {
1896  required binary c1 (STRING) = 1;
1897  optional int32 c2 (INTEGER(8,true)) = 2;
1898  optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
1899}
1900    ";
1901        let parquet_type = parse_message_type(message_type).expect("should parse schema");
1902        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
1903
1904        // Try projecting the fields c2 and c3 with the unsupported data types
1905        let err = ArrowReader::get_arrow_projection_mask(
1906            &[1, 2, 3],
1907            &schema,
1908            &parquet_schema,
1909            &arrow_schema,
1910            false,
1911        )
1912        .unwrap_err();
1913
1914        assert_eq!(err.kind(), ErrorKind::DataInvalid);
1915        assert_eq!(
1916            err.to_string(),
1917            "DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
1918        );
1919
1920        // Omitting field c2, we still get an error due to c3 being selected
1921        let err = ArrowReader::get_arrow_projection_mask(
1922            &[1, 3],
1923            &schema,
1924            &parquet_schema,
1925            &arrow_schema,
1926            false,
1927        )
1928        .unwrap_err();
1929
1930        assert_eq!(err.kind(), ErrorKind::DataInvalid);
1931        assert_eq!(
1932            err.to_string(),
1933            "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
1934        );
1935
1936        // Finally avoid selecting fields with unsupported data types
1937        let mask = ArrowReader::get_arrow_projection_mask(
1938            &[1],
1939            &schema,
1940            &parquet_schema,
1941            &arrow_schema,
1942            false,
1943        )
1944        .expect("Some ProjectionMask");
1945        assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
1946    }
1947
1948    #[tokio::test]
1949    async fn test_kleene_logic_or_behaviour() {
1950        // a IS NULL OR a = 'foo'
1951        let predicate = Reference::new("a")
1952            .is_null()
1953            .or(Reference::new("a").equal_to(Datum::string("foo")));
1954
1955        // Table data: [NULL, "foo", "bar"]
1956        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1957
1958        // Expected: [NULL, "foo"].
1959        let expected = vec![None, Some("foo".to_string())];
1960
1961        let (file_io, schema, table_location, _temp_dir) =
1962            setup_kleene_logic(data_for_col_a, DataType::Utf8);
1963        let reader = ArrowReaderBuilder::new(file_io).build();
1964
1965        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1966
1967        assert_eq!(result_data, expected);
1968    }
1969
1970    #[tokio::test]
1971    async fn test_kleene_logic_and_behaviour() {
1972        // a IS NOT NULL AND a != 'foo'
1973        let predicate = Reference::new("a")
1974            .is_not_null()
1975            .and(Reference::new("a").not_equal_to(Datum::string("foo")));
1976
1977        // Table data: [NULL, "foo", "bar"]
1978        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1979
1980        // Expected: ["bar"].
1981        let expected = vec![Some("bar".to_string())];
1982
1983        let (file_io, schema, table_location, _temp_dir) =
1984            setup_kleene_logic(data_for_col_a, DataType::Utf8);
1985        let reader = ArrowReaderBuilder::new(file_io).build();
1986
1987        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1988
1989        assert_eq!(result_data, expected);
1990    }
1991
1992    #[tokio::test]
1993    async fn test_predicate_cast_literal() {
1994        let predicates = vec![
1995            // a == 'foo'
1996            (Reference::new("a").equal_to(Datum::string("foo")), vec![
1997                Some("foo".to_string()),
1998            ]),
1999            // a != 'foo'
2000            (
2001                Reference::new("a").not_equal_to(Datum::string("foo")),
2002                vec![Some("bar".to_string())],
2003            ),
2004            // STARTS_WITH(a, 'f')
2005            (Reference::new("a").starts_with(Datum::string("f")), vec![
2006                Some("foo".to_string()),
2007            ]),
2008            // NOT STARTS_WITH(a, 'f')
2009            (
2010                Reference::new("a").not_starts_with(Datum::string("f")),
2011                vec![Some("bar".to_string())],
2012            ),
2013            // a < 'foo'
2014            (Reference::new("a").less_than(Datum::string("foo")), vec![
2015                Some("bar".to_string()),
2016            ]),
2017            // a <= 'foo'
2018            (
2019                Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
2020                vec![Some("foo".to_string()), Some("bar".to_string())],
2021            ),
2022            // a > 'bar'
2023            (
2024                Reference::new("a").greater_than(Datum::string("bar")),
2025                vec![Some("foo".to_string())],
2026            ),
2027            // a >= 'foo'
2028            (
2029                Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
2030                vec![Some("foo".to_string())],
2031            ),
2032            // a IN ('foo', 'baz')
2033            (
2034                Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
2035                vec![Some("foo".to_string())],
2036            ),
2037            // a NOT IN ('foo', 'baz')
2038            (
2039                Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
2040                vec![Some("bar".to_string())],
2041            ),
2042        ];
2043
2044        // Table data: ["foo", "bar"]
2045        let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];
2046
2047        let (file_io, schema, table_location, _temp_dir) =
2048            setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
2049        let reader = ArrowReaderBuilder::new(file_io).build();
2050
2051        for (predicate, expected) in predicates {
2052            println!("testing predicate {predicate}");
2053            let result_data = test_perform_read(
2054                predicate.clone(),
2055                schema.clone(),
2056                table_location.clone(),
2057                reader.clone(),
2058            )
2059            .await;
2060
2061            assert_eq!(result_data, expected, "predicate={predicate}");
2062        }
2063    }
2064
2065    async fn test_perform_read(
2066        predicate: Predicate,
2067        schema: SchemaRef,
2068        table_location: String,
2069        reader: ArrowReader,
2070    ) -> Vec<Option<String>> {
2071        let tasks = Box::pin(futures::stream::iter(
2072            vec![Ok(FileScanTask {
2073                start: 0,
2074                length: 0,
2075                record_count: None,
2076                data_file_path: format!("{table_location}/1.parquet"),
2077                data_file_format: DataFileFormat::Parquet,
2078                schema: schema.clone(),
2079                project_field_ids: vec![1],
2080                predicate: Some(predicate.bind(schema, true).unwrap()),
2081                deletes: vec![],
2082                partition: None,
2083                partition_spec: None,
2084                name_mapping: None,
2085            })]
2086            .into_iter(),
2087        )) as FileScanTaskStream;
2088
2089        let result = reader
2090            .read(tasks)
2091            .unwrap()
2092            .try_collect::<Vec<RecordBatch>>()
2093            .await
2094            .unwrap();
2095
2096        result[0].columns()[0]
2097            .as_string_opt::<i32>()
2098            .unwrap()
2099            .iter()
2100            .map(|v| v.map(ToOwned::to_owned))
2101            .collect::<Vec<_>>()
2102    }
2103
2104    fn setup_kleene_logic(
2105        data_for_col_a: Vec<Option<String>>,
2106        col_a_type: DataType,
2107    ) -> (FileIO, SchemaRef, String, TempDir) {
2108        let schema = Arc::new(
2109            Schema::builder()
2110                .with_schema_id(1)
2111                .with_fields(vec![
2112                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
2113                ])
2114                .build()
2115                .unwrap(),
2116        );
2117
2118        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2119            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
2120                PARQUET_FIELD_ID_META_KEY.to_string(),
2121                "1".to_string(),
2122            )])),
2123        ]));
2124
2125        let tmp_dir = TempDir::new().unwrap();
2126        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2127
2128        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2129
2130        let col = match col_a_type {
2131            DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
2132            DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
2133            _ => panic!("unexpected col_a_type"),
2134        };
2135
2136        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
2137
2138        // Write the Parquet files
2139        let props = WriterProperties::builder()
2140            .set_compression(Compression::SNAPPY)
2141            .build();
2142
2143        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
2144        let mut writer =
2145            ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
2146
2147        writer.write(&to_write).expect("Writing batch");
2148
2149        // writer must be closed to write footer
2150        writer.close().unwrap();
2151
2152        (file_io, schema, table_location, tmp_dir)
2153    }
2154
2155    #[test]
2156    fn test_build_deletes_row_selection() {
2157        let schema_descr = get_test_schema_descr();
2158
2159        let mut columns = vec![];
2160        for ptr in schema_descr.columns() {
2161            let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
2162            columns.push(column);
2163        }
2164
2165        let row_groups_metadata = vec![
2166            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
2167            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
2168            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
2169            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
2170            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
2171        ];
2172
2173        let selected_row_groups = Some(vec![1, 3]);
2174
2175        /* cases to cover:
2176           * {skip|select} {first|intermediate|last} {one row|multiple rows} in
2177             {first|intermediate|last} {skipped|selected} row group
2178           * row group selection disabled
2179        */
2180
2181        let positional_deletes = RoaringTreemap::from_iter(&[
2182            1, // in skipped rg 0, should be ignored
2183            3, // run of three consecutive items in skipped rg0
2184            4, 5, 998, // two consecutive items at end of skipped rg0
2185            999, 1000, // solitary row at start of selected rg1 (1, 9)
2186            1010, // run of 3 rows in selected rg1
2187            1011, 1012, // (3, 485)
2188            1498, // run of two items at end of selected rg1
2189            1499, 1500, // run of two items at start of skipped rg2
2190            1501, 1600, // should ignore, in skipped rg2
2191            1999, // single row at end of skipped rg2
2192            2000, // run of two items at start of selected rg3
2193            2001, // (4, 98)
2194            2100, // single row in selected row group 3 (1, 99)
2195            2200, // run of 3 consecutive rows in selected row group 3
2196            2201, 2202, // (3, 796)
2197            2999, // single item at end of selected rg3 (1)
2198            3000, // single item at start of skipped rg4
2199        ]);
2200
2201        let positional_deletes = DeleteVector::new(positional_deletes);
2202
2203        // using selected row groups 1 and 3
2204        let result = ArrowReader::build_deletes_row_selection(
2205            &row_groups_metadata,
2206            &selected_row_groups,
2207            &positional_deletes,
2208        )
2209        .unwrap();
2210
2211        let expected = RowSelection::from(vec![
2212            RowSelector::skip(1),
2213            RowSelector::select(9),
2214            RowSelector::skip(3),
2215            RowSelector::select(485),
2216            RowSelector::skip(4),
2217            RowSelector::select(98),
2218            RowSelector::skip(1),
2219            RowSelector::select(99),
2220            RowSelector::skip(3),
2221            RowSelector::select(796),
2222            RowSelector::skip(1),
2223        ]);
2224
2225        assert_eq!(result, expected);
2226
2227        // selecting all row groups
2228        let result = ArrowReader::build_deletes_row_selection(
2229            &row_groups_metadata,
2230            &None,
2231            &positional_deletes,
2232        )
2233        .unwrap();
2234
2235        let expected = RowSelection::from(vec![
2236            RowSelector::select(1),
2237            RowSelector::skip(1),
2238            RowSelector::select(1),
2239            RowSelector::skip(3),
2240            RowSelector::select(992),
2241            RowSelector::skip(3),
2242            RowSelector::select(9),
2243            RowSelector::skip(3),
2244            RowSelector::select(485),
2245            RowSelector::skip(4),
2246            RowSelector::select(98),
2247            RowSelector::skip(1),
2248            RowSelector::select(398),
2249            RowSelector::skip(3),
2250            RowSelector::select(98),
2251            RowSelector::skip(1),
2252            RowSelector::select(99),
2253            RowSelector::skip(3),
2254            RowSelector::select(796),
2255            RowSelector::skip(2),
2256            RowSelector::select(499),
2257        ]);
2258
2259        assert_eq!(result, expected);
2260    }
2261
2262    fn build_test_row_group_meta(
2263        schema_descr: SchemaDescPtr,
2264        columns: Vec<ColumnChunkMetaData>,
2265        num_rows: i64,
2266        ordinal: i16,
2267    ) -> RowGroupMetaData {
2268        RowGroupMetaData::builder(schema_descr.clone())
2269            .set_num_rows(num_rows)
2270            .set_total_byte_size(2000)
2271            .set_column_metadata(columns)
2272            .set_ordinal(ordinal)
2273            .build()
2274            .unwrap()
2275    }
2276
2277    fn get_test_schema_descr() -> SchemaDescPtr {
2278        use parquet::schema::types::Type as SchemaType;
2279
2280        let schema = SchemaType::group_type_builder("schema")
2281            .with_fields(vec![
2282                Arc::new(
2283                    SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2284                        .build()
2285                        .unwrap(),
2286                ),
2287                Arc::new(
2288                    SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2289                        .build()
2290                        .unwrap(),
2291                ),
2292            ])
2293            .build()
2294            .unwrap();
2295
2296        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2297    }
2298
2299    /// Verifies that file splits respect byte ranges and only read specific row groups.
2300    #[tokio::test]
2301    async fn test_file_splits_respect_byte_ranges() {
2302        use arrow_array::Int32Array;
2303        use parquet::file::reader::{FileReader, SerializedFileReader};
2304
2305        let schema = Arc::new(
2306            Schema::builder()
2307                .with_schema_id(1)
2308                .with_fields(vec![
2309                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2310                ])
2311                .build()
2312                .unwrap(),
2313        );
2314
2315        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2316            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2317                PARQUET_FIELD_ID_META_KEY.to_string(),
2318                "1".to_string(),
2319            )])),
2320        ]));
2321
2322        let tmp_dir = TempDir::new().unwrap();
2323        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2324        let file_path = format!("{table_location}/multi_row_group.parquet");
2325
2326        // Force each batch into its own row group for testing byte range filtering.
2327        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2328            (0..100).collect::<Vec<i32>>(),
2329        ))])
2330        .unwrap();
2331        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2332            (100..200).collect::<Vec<i32>>(),
2333        ))])
2334        .unwrap();
2335        let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2336            (200..300).collect::<Vec<i32>>(),
2337        ))])
2338        .unwrap();
2339
2340        let props = WriterProperties::builder()
2341            .set_compression(Compression::SNAPPY)
2342            .set_max_row_group_size(100)
2343            .build();
2344
2345        let file = File::create(&file_path).unwrap();
2346        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2347        writer.write(&batch1).expect("Writing batch 1");
2348        writer.write(&batch2).expect("Writing batch 2");
2349        writer.write(&batch3).expect("Writing batch 3");
2350        writer.close().unwrap();
2351
2352        // Read the file metadata to get row group byte positions
2353        let file = File::open(&file_path).unwrap();
2354        let reader = SerializedFileReader::new(file).unwrap();
2355        let metadata = reader.metadata();
2356
2357        println!("File has {} row groups", metadata.num_row_groups());
2358        assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
2359
2360        // Get byte positions for each row group
2361        let row_group_0 = metadata.row_group(0);
2362        let row_group_1 = metadata.row_group(1);
2363        let row_group_2 = metadata.row_group(2);
2364
2365        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
2366        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2367        let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
2368        let file_end = rg2_start + row_group_2.compressed_size() as u64;
2369
2370        println!(
2371            "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
2372            row_group_0.num_rows(),
2373            rg0_start,
2374            row_group_0.compressed_size()
2375        );
2376        println!(
2377            "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
2378            row_group_1.num_rows(),
2379            rg1_start,
2380            row_group_1.compressed_size()
2381        );
2382        println!(
2383            "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
2384            row_group_2.num_rows(),
2385            rg2_start,
2386            row_group_2.compressed_size()
2387        );
2388
2389        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2390        let reader = ArrowReaderBuilder::new(file_io).build();
2391
2392        // Task 1: read only the first row group
2393        let task1 = FileScanTask {
2394            start: rg0_start,
2395            length: row_group_0.compressed_size() as u64,
2396            record_count: Some(100),
2397            data_file_path: file_path.clone(),
2398            data_file_format: DataFileFormat::Parquet,
2399            schema: schema.clone(),
2400            project_field_ids: vec![1],
2401            predicate: None,
2402            deletes: vec![],
2403            partition: None,
2404            partition_spec: None,
2405            name_mapping: None,
2406        };
2407
2408        // Task 2: read the second and third row groups
2409        let task2 = FileScanTask {
2410            start: rg1_start,
2411            length: file_end - rg1_start,
2412            record_count: Some(200),
2413            data_file_path: file_path.clone(),
2414            data_file_format: DataFileFormat::Parquet,
2415            schema: schema.clone(),
2416            project_field_ids: vec![1],
2417            predicate: None,
2418            deletes: vec![],
2419            partition: None,
2420            partition_spec: None,
2421            name_mapping: None,
2422        };
2423
2424        let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
2425        let result1 = reader
2426            .clone()
2427            .read(tasks1)
2428            .unwrap()
2429            .try_collect::<Vec<RecordBatch>>()
2430            .await
2431            .unwrap();
2432
2433        let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
2434        println!(
2435            "Task 1 (bytes {}-{}) returned {} rows",
2436            rg0_start,
2437            rg0_start + row_group_0.compressed_size() as u64,
2438            total_rows_task1
2439        );
2440
2441        let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
2442        let result2 = reader
2443            .read(tasks2)
2444            .unwrap()
2445            .try_collect::<Vec<RecordBatch>>()
2446            .await
2447            .unwrap();
2448
2449        let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
2450        println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
2451
2452        assert_eq!(
2453            total_rows_task1, 100,
2454            "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
2455        );
2456
2457        assert_eq!(
2458            total_rows_task2, 200,
2459            "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
2460        );
2461
2462        // Verify the actual data values are correct (not just the row count)
2463        if total_rows_task1 > 0 {
2464            let first_batch = &result1[0];
2465            let id_col = first_batch
2466                .column(0)
2467                .as_primitive::<arrow_array::types::Int32Type>();
2468            let first_val = id_col.value(0);
2469            let last_val = id_col.value(id_col.len() - 1);
2470            println!("Task 1 data range: {first_val} to {last_val}");
2471
2472            assert_eq!(first_val, 0, "Task 1 should start with id=0");
2473            assert_eq!(last_val, 99, "Task 1 should end with id=99");
2474        }
2475
2476        if total_rows_task2 > 0 {
2477            let first_batch = &result2[0];
2478            let id_col = first_batch
2479                .column(0)
2480                .as_primitive::<arrow_array::types::Int32Type>();
2481            let first_val = id_col.value(0);
2482            println!("Task 2 first value: {first_val}");
2483
2484            assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
2485        }
2486    }
2487
2488    /// Test schema evolution: reading old Parquet file (with only column 'a')
2489    /// using a newer table schema (with columns 'a' and 'b').
2490    /// This tests that:
2491    /// 1. get_arrow_projection_mask allows missing columns
2492    /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
2493    #[tokio::test]
2494    async fn test_schema_evolution_add_column() {
2495        use arrow_array::{Array, Int32Array};
2496
2497        // New table schema: columns 'a' and 'b' (b was added later, file only has 'a')
2498        let new_schema = Arc::new(
2499            Schema::builder()
2500                .with_schema_id(2)
2501                .with_fields(vec![
2502                    NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2503                    NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2504                ])
2505                .build()
2506                .unwrap(),
2507        );
2508
2509        // Create Arrow schema for old Parquet file (only has column 'a')
2510        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2511            Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2512                PARQUET_FIELD_ID_META_KEY.to_string(),
2513                "1".to_string(),
2514            )])),
2515        ]));
2516
2517        // Write old Parquet file with only column 'a'
2518        let tmp_dir = TempDir::new().unwrap();
2519        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2520        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2521
2522        let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2523        let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2524
2525        let props = WriterProperties::builder()
2526            .set_compression(Compression::SNAPPY)
2527            .build();
2528        let file = File::create(format!("{table_location}/old_file.parquet")).unwrap();
2529        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2530        writer.write(&to_write).expect("Writing batch");
2531        writer.close().unwrap();
2532
2533        // Read the old Parquet file using the NEW schema (with column 'b')
2534        let reader = ArrowReaderBuilder::new(file_io).build();
2535        let tasks = Box::pin(futures::stream::iter(
2536            vec![Ok(FileScanTask {
2537                start: 0,
2538                length: 0,
2539                record_count: None,
2540                data_file_path: format!("{table_location}/old_file.parquet"),
2541                data_file_format: DataFileFormat::Parquet,
2542                schema: new_schema.clone(),
2543                project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
2544                predicate: None,
2545                deletes: vec![],
2546                partition: None,
2547                partition_spec: None,
2548                name_mapping: None,
2549            })]
2550            .into_iter(),
2551        )) as FileScanTaskStream;
2552
2553        let result = reader
2554            .read(tasks)
2555            .unwrap()
2556            .try_collect::<Vec<RecordBatch>>()
2557            .await
2558            .unwrap();
2559
2560        // Verify we got the correct data
2561        assert_eq!(result.len(), 1);
2562        let batch = &result[0];
2563
2564        // Should have 2 columns now
2565        assert_eq!(batch.num_columns(), 2);
2566        assert_eq!(batch.num_rows(), 3);
2567
2568        // Column 'a' should have the original data
2569        let col_a = batch
2570            .column(0)
2571            .as_primitive::<arrow_array::types::Int32Type>();
2572        assert_eq!(col_a.values(), &[1, 2, 3]);
2573
2574        // Column 'b' should be all NULLs (it didn't exist in the old file)
2575        let col_b = batch
2576            .column(1)
2577            .as_primitive::<arrow_array::types::Int32Type>();
2578        assert_eq!(col_b.null_count(), 3);
2579        assert!(col_b.is_null(0));
2580        assert!(col_b.is_null(1));
2581        assert!(col_b.is_null(2));
2582    }
2583
2584    /// Test for bug where position deletes in later row groups are not applied correctly.
2585    ///
2586    /// When a file has multiple row groups and a position delete targets a row in a later
2587    /// row group, the `build_deletes_row_selection` function had a bug where it would
2588    /// fail to increment `current_row_group_base_idx` when skipping row groups.
2589    ///
2590    /// This test creates:
2591    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
2592    /// - A position delete file that deletes row 199 (last row in second row group)
2593    ///
2594    /// Expected behavior: Should return 199 rows (with id=200 deleted)
2595    /// Bug behavior: Returns 200 rows (delete is not applied)
2596    ///
2597    /// This bug was discovered while running Apache Spark + Apache Iceberg integration tests
2598    /// through DataFusion Comet. The following Iceberg Java tests failed due to this bug:
2599    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadDelete::testDeleteWithMultipleRowGroupsParquet`
2600    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate::testUpdateWithMultipleRowGroupsParquet`
2601    #[tokio::test]
2602    async fn test_position_delete_across_multiple_row_groups() {
2603        use arrow_array::{Int32Array, Int64Array};
2604        use parquet::file::reader::{FileReader, SerializedFileReader};
2605
2606        // Field IDs for positional delete schema
2607        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2608        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2609
2610        let tmp_dir = TempDir::new().unwrap();
2611        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2612
2613        // Create table schema with a single 'id' column
2614        let table_schema = Arc::new(
2615            Schema::builder()
2616                .with_schema_id(1)
2617                .with_fields(vec![
2618                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2619                ])
2620                .build()
2621                .unwrap(),
2622        );
2623
2624        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2625            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2626                PARQUET_FIELD_ID_META_KEY.to_string(),
2627                "1".to_string(),
2628            )])),
2629        ]));
2630
2631        // Step 1: Create data file with 200 rows in 2 row groups
2632        // Row group 0: rows 0-99 (ids 1-100)
2633        // Row group 1: rows 100-199 (ids 101-200)
2634        let data_file_path = format!("{table_location}/data.parquet");
2635
2636        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2637            Int32Array::from_iter_values(1..=100),
2638        )])
2639        .unwrap();
2640
2641        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2642            Int32Array::from_iter_values(101..=200),
2643        )])
2644        .unwrap();
2645
2646        // Force each batch into its own row group
2647        let props = WriterProperties::builder()
2648            .set_compression(Compression::SNAPPY)
2649            .set_max_row_group_size(100)
2650            .build();
2651
2652        let file = File::create(&data_file_path).unwrap();
2653        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2654        writer.write(&batch1).expect("Writing batch 1");
2655        writer.write(&batch2).expect("Writing batch 2");
2656        writer.close().unwrap();
2657
2658        // Verify we created 2 row groups
2659        let verify_file = File::open(&data_file_path).unwrap();
2660        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2661        assert_eq!(
2662            verify_reader.metadata().num_row_groups(),
2663            2,
2664            "Should have 2 row groups"
2665        );
2666
2667        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
2668        let delete_file_path = format!("{table_location}/deletes.parquet");
2669
2670        let delete_schema = Arc::new(ArrowSchema::new(vec![
2671            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2672                PARQUET_FIELD_ID_META_KEY.to_string(),
2673                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2674            )])),
2675            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2676                PARQUET_FIELD_ID_META_KEY.to_string(),
2677                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2678            )])),
2679        ]));
2680
2681        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
2682        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2683            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2684            Arc::new(Int64Array::from_iter_values(vec![199i64])),
2685        ])
2686        .unwrap();
2687
2688        let delete_props = WriterProperties::builder()
2689            .set_compression(Compression::SNAPPY)
2690            .build();
2691
2692        let delete_file = File::create(&delete_file_path).unwrap();
2693        let mut delete_writer =
2694            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2695        delete_writer.write(&delete_batch).unwrap();
2696        delete_writer.close().unwrap();
2697
2698        // Step 3: Read the data file with the delete applied
2699        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2700        let reader = ArrowReaderBuilder::new(file_io).build();
2701
2702        let task = FileScanTask {
2703            start: 0,
2704            length: 0,
2705            record_count: Some(200),
2706            data_file_path: data_file_path.clone(),
2707            data_file_format: DataFileFormat::Parquet,
2708            schema: table_schema.clone(),
2709            project_field_ids: vec![1],
2710            predicate: None,
2711            deletes: vec![FileScanTaskDeleteFile {
2712                file_path: delete_file_path,
2713                file_type: DataContentType::PositionDeletes,
2714                partition_spec_id: 0,
2715                equality_ids: None,
2716            }],
2717            partition: None,
2718            partition_spec: None,
2719            name_mapping: None,
2720        };
2721
2722        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2723        let result = reader
2724            .read(tasks)
2725            .unwrap()
2726            .try_collect::<Vec<RecordBatch>>()
2727            .await
2728            .unwrap();
2729
2730        // Step 4: Verify we got 199 rows (not 200)
2731        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2732
2733        println!("Total rows read: {total_rows}");
2734        println!("Expected: 199 rows (deleted row 199 which had id=200)");
2735
2736        // This assertion will FAIL before the fix and PASS after the fix
2737        assert_eq!(
2738            total_rows, 199,
2739            "Expected 199 rows after deleting row 199, but got {total_rows} rows. \
2740             The bug causes position deletes in later row groups to be ignored."
2741        );
2742
2743        // Verify the deleted row (id=200) is not present
2744        let all_ids: Vec<i32> = result
2745            .iter()
2746            .flat_map(|batch| {
2747                batch
2748                    .column(0)
2749                    .as_primitive::<arrow_array::types::Int32Type>()
2750                    .values()
2751                    .iter()
2752                    .copied()
2753            })
2754            .collect();
2755
2756        assert!(
2757            !all_ids.contains(&200),
2758            "Row with id=200 should be deleted but was found in results"
2759        );
2760
2761        // Verify we have all other ids (1-199)
2762        let expected_ids: Vec<i32> = (1..=199).collect();
2763        assert_eq!(
2764            all_ids, expected_ids,
2765            "Should have ids 1-199 but got different values"
2766        );
2767    }
2768
2769    /// Test for bug where position deletes are lost when skipping unselected row groups.
2770    ///
2771    /// This is a variant of `test_position_delete_across_multiple_row_groups` that exercises
2772    /// the row group selection code path (`selected_row_groups: Some([...])`).
2773    ///
2774    /// When a file has multiple row groups and only some are selected for reading,
2775    /// the `build_deletes_row_selection` function must correctly skip over deletes in
2776    /// unselected row groups WITHOUT consuming deletes that belong to selected row groups.
2777    ///
2778    /// This test creates:
2779    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
2780    /// - A position delete file that deletes row 199 (last row in second row group)
2781    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
2782    ///
2783    /// Expected behavior: Should return 99 rows (with row 199 deleted)
2784    /// Bug behavior: Returns 100 rows (delete is lost when skipping row group 0)
2785    ///
2786    /// The bug occurs when processing row group 0 (unselected):
2787    /// ```rust
2788    /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at first delete >= 100
2789    /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes delete at 199!
2790    /// ```
2791    ///
2792    /// The fix is to NOT call `next()` after `advance_to()` when skipping unselected row groups,
2793    /// because `advance_to()` already positions the iterator correctly without consuming elements.
2794    #[tokio::test]
2795    async fn test_position_delete_with_row_group_selection() {
2796        use arrow_array::{Int32Array, Int64Array};
2797        use parquet::file::reader::{FileReader, SerializedFileReader};
2798
2799        // Field IDs for positional delete schema
2800        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2801        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2802
2803        let tmp_dir = TempDir::new().unwrap();
2804        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2805
2806        // Create table schema with a single 'id' column
2807        let table_schema = Arc::new(
2808            Schema::builder()
2809                .with_schema_id(1)
2810                .with_fields(vec![
2811                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2812                ])
2813                .build()
2814                .unwrap(),
2815        );
2816
2817        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2818            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2819                PARQUET_FIELD_ID_META_KEY.to_string(),
2820                "1".to_string(),
2821            )])),
2822        ]));
2823
2824        // Step 1: Create data file with 200 rows in 2 row groups
2825        // Row group 0: rows 0-99 (ids 1-100)
2826        // Row group 1: rows 100-199 (ids 101-200)
2827        let data_file_path = format!("{table_location}/data.parquet");
2828
2829        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2830            Int32Array::from_iter_values(1..=100),
2831        )])
2832        .unwrap();
2833
2834        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2835            Int32Array::from_iter_values(101..=200),
2836        )])
2837        .unwrap();
2838
2839        // Force each batch into its own row group
2840        let props = WriterProperties::builder()
2841            .set_compression(Compression::SNAPPY)
2842            .set_max_row_group_size(100)
2843            .build();
2844
2845        let file = File::create(&data_file_path).unwrap();
2846        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2847        writer.write(&batch1).expect("Writing batch 1");
2848        writer.write(&batch2).expect("Writing batch 2");
2849        writer.close().unwrap();
2850
2851        // Verify we created 2 row groups
2852        let verify_file = File::open(&data_file_path).unwrap();
2853        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2854        assert_eq!(
2855            verify_reader.metadata().num_row_groups(),
2856            2,
2857            "Should have 2 row groups"
2858        );
2859
2860        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
2861        let delete_file_path = format!("{table_location}/deletes.parquet");
2862
2863        let delete_schema = Arc::new(ArrowSchema::new(vec![
2864            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2865                PARQUET_FIELD_ID_META_KEY.to_string(),
2866                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2867            )])),
2868            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2869                PARQUET_FIELD_ID_META_KEY.to_string(),
2870                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2871            )])),
2872        ]));
2873
2874        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
2875        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2876            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2877            Arc::new(Int64Array::from_iter_values(vec![199i64])),
2878        ])
2879        .unwrap();
2880
2881        let delete_props = WriterProperties::builder()
2882            .set_compression(Compression::SNAPPY)
2883            .build();
2884
2885        let delete_file = File::create(&delete_file_path).unwrap();
2886        let mut delete_writer =
2887            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2888        delete_writer.write(&delete_batch).unwrap();
2889        delete_writer.close().unwrap();
2890
2891        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
2892        // This exercises the row group selection code path where row group 0 is skipped
2893        let metadata_file = File::open(&data_file_path).unwrap();
2894        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
2895        let metadata = metadata_reader.metadata();
2896
2897        let row_group_0 = metadata.row_group(0);
2898        let row_group_1 = metadata.row_group(1);
2899
2900        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
2901        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2902        let rg1_length = row_group_1.compressed_size() as u64;
2903
2904        println!(
2905            "Row group 0: starts at byte {}, {} bytes compressed",
2906            rg0_start,
2907            row_group_0.compressed_size()
2908        );
2909        println!(
2910            "Row group 1: starts at byte {}, {} bytes compressed",
2911            rg1_start,
2912            row_group_1.compressed_size()
2913        );
2914
2915        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2916        let reader = ArrowReaderBuilder::new(file_io).build();
2917
2918        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
2919        let task = FileScanTask {
2920            start: rg1_start,
2921            length: rg1_length,
2922            record_count: Some(100), // Row group 1 has 100 rows
2923            data_file_path: data_file_path.clone(),
2924            data_file_format: DataFileFormat::Parquet,
2925            schema: table_schema.clone(),
2926            project_field_ids: vec![1],
2927            predicate: None,
2928            deletes: vec![FileScanTaskDeleteFile {
2929                file_path: delete_file_path,
2930                file_type: DataContentType::PositionDeletes,
2931                partition_spec_id: 0,
2932                equality_ids: None,
2933            }],
2934            partition: None,
2935            partition_spec: None,
2936            name_mapping: None,
2937        };
2938
2939        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2940        let result = reader
2941            .read(tasks)
2942            .unwrap()
2943            .try_collect::<Vec<RecordBatch>>()
2944            .await
2945            .unwrap();
2946
2947        // Step 4: Verify we got 99 rows (not 100)
2948        // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 99 rows
2949        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2950
2951        println!("Total rows read from row group 1: {total_rows}");
2952        println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");
2953
2954        // This assertion will FAIL before the fix and PASS after the fix
2955        assert_eq!(
2956            total_rows, 99,
2957            "Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
2958             The bug causes position deletes to be lost when advance_to() is followed by next() \
2959             when skipping unselected row groups."
2960        );
2961
2962        // Verify the deleted row (id=200) is not present
2963        let all_ids: Vec<i32> = result
2964            .iter()
2965            .flat_map(|batch| {
2966                batch
2967                    .column(0)
2968                    .as_primitive::<arrow_array::types::Int32Type>()
2969                    .values()
2970                    .iter()
2971                    .copied()
2972            })
2973            .collect();
2974
2975        assert!(
2976            !all_ids.contains(&200),
2977            "Row with id=200 should be deleted but was found in results"
2978        );
2979
2980        // Verify we have ids 101-199 (not 101-200)
2981        let expected_ids: Vec<i32> = (101..=199).collect();
2982        assert_eq!(
2983            all_ids, expected_ids,
2984            "Should have ids 101-199 but got different values"
2985        );
2986    }

2987    /// Test for bug where stale cached delete causes infinite loop when skipping row groups.
2988    ///
2989    /// This test exposes the inverse scenario of `test_position_delete_with_row_group_selection`:
2990    /// - Position delete targets a row in the SKIPPED row group (not the selected one)
2991    /// - After calling advance_to(), the cached delete index is stale
2992    /// - Without updating the cache, the code enters an infinite loop
2993    ///
2994    /// This test creates:
2995    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
2996    /// - A position delete file that deletes row 0 (first row in SKIPPED row group 0)
2997    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
2998    ///
2999    /// The bug occurs when skipping row group 0:
3000    /// ```rust
3001    /// let mut next_deleted_row_idx_opt = delete_vector_iter.next(); // Some(0)
3002    /// // ... skip to row group 1 ...
3003    /// delete_vector_iter.advance_to(100); // Iterator advances past delete at 0
3004    /// // BUG: next_deleted_row_idx_opt is still Some(0) - STALE!
3005    /// // When processing row group 1:
3006    /// //   current_idx = 100, next_deleted_row_idx = 0, next_row_group_base_idx = 200
3007    /// //   Loop condition: 0 < 200 (true)
3008    /// //   But: current_idx (100) > next_deleted_row_idx (0)
3009    /// //   And: current_idx (100) != next_deleted_row_idx (0)
3010    /// //   Neither branch executes -> INFINITE LOOP!
3011    /// ```
3012    ///
3013    /// Expected behavior: Should return 100 rows (delete at 0 doesn't affect row group 1)
3014    /// Bug behavior: Infinite loop in build_deletes_row_selection
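    ///
    /// For this scenario the expected row selection over the selected row group is simply
    /// "keep all 100 rows"; a sketch with the parquet crate's types (illustrative only):
    ///
    /// ```rust
    /// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    ///
    /// // The only delete (file position 0) falls inside skipped row group 0, so every row of
    /// // the selected row group 1 survives.
    /// let expected: RowSelection = vec![RowSelector::select(100)].into();
    /// ```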
3015    #[tokio::test]
3016    async fn test_position_delete_in_skipped_row_group() {
3017        use arrow_array::{Int32Array, Int64Array};
3018        use parquet::file::reader::{FileReader, SerializedFileReader};
3019
3020        // Field IDs for positional delete schema
3021        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
3022        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
3023
3024        let tmp_dir = TempDir::new().unwrap();
3025        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3026
3027        // Create table schema with a single 'id' column
3028        let table_schema = Arc::new(
3029            Schema::builder()
3030                .with_schema_id(1)
3031                .with_fields(vec![
3032                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3033                ])
3034                .build()
3035                .unwrap(),
3036        );
3037
3038        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3039            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3040                PARQUET_FIELD_ID_META_KEY.to_string(),
3041                "1".to_string(),
3042            )])),
3043        ]));
3044
3045        // Step 1: Create data file with 200 rows in 2 row groups
3046        // Row group 0: rows 0-99 (ids 1-100)
3047        // Row group 1: rows 100-199 (ids 101-200)
3048        let data_file_path = format!("{table_location}/data.parquet");
3049
3050        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3051            Int32Array::from_iter_values(1..=100),
3052        )])
3053        .unwrap();
3054
3055        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3056            Int32Array::from_iter_values(101..=200),
3057        )])
3058        .unwrap();
3059
3060        // Force each batch into its own row group
3061        let props = WriterProperties::builder()
3062            .set_compression(Compression::SNAPPY)
3063            .set_max_row_group_size(100)
3064            .build();
3065
3066        let file = File::create(&data_file_path).unwrap();
3067        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3068        writer.write(&batch1).expect("Writing batch 1");
3069        writer.write(&batch2).expect("Writing batch 2");
3070        writer.close().unwrap();
3071
3072        // Verify we created 2 row groups
3073        let verify_file = File::open(&data_file_path).unwrap();
3074        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
3075        assert_eq!(
3076            verify_reader.metadata().num_row_groups(),
3077            2,
3078            "Should have 2 row groups"
3079        );
3080
3081        // Step 2: Create position delete file that deletes row 0 (id=1, first row in row group 0)
3082        let delete_file_path = format!("{table_location}/deletes.parquet");
3083
3084        let delete_schema = Arc::new(ArrowSchema::new(vec![
3085            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
3086                PARQUET_FIELD_ID_META_KEY.to_string(),
3087                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
3088            )])),
3089            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
3090                PARQUET_FIELD_ID_META_KEY.to_string(),
3091                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
3092            )])),
3093        ]));
3094
3095        // Delete row at position 0 (0-indexed, so it's the first row: id=1)
3096        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
3097            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
3098            Arc::new(Int64Array::from_iter_values(vec![0i64])),
3099        ])
3100        .unwrap();
3101
3102        let delete_props = WriterProperties::builder()
3103            .set_compression(Compression::SNAPPY)
3104            .build();
3105
3106        let delete_file = File::create(&delete_file_path).unwrap();
3107        let mut delete_writer =
3108            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
3109        delete_writer.write(&delete_batch).unwrap();
3110        delete_writer.close().unwrap();
3111
3112        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
3113        // This exercises the row group selection code path where row group 0 is skipped
3114        let metadata_file = File::open(&data_file_path).unwrap();
3115        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
3116        let metadata = metadata_reader.metadata();
3117
3118        let row_group_0 = metadata.row_group(0);
3119        let row_group_1 = metadata.row_group(1);
3120
3121        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
3122        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
3123        let rg1_length = row_group_1.compressed_size() as u64;
3124
3125        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3126        let reader = ArrowReaderBuilder::new(file_io).build();
3127
3128        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
3129        let task = FileScanTask {
3130            start: rg1_start,
3131            length: rg1_length,
3132            record_count: Some(100), // Row group 1 has 100 rows
3133            data_file_path: data_file_path.clone(),
3134            data_file_format: DataFileFormat::Parquet,
3135            schema: table_schema.clone(),
3136            project_field_ids: vec![1],
3137            predicate: None,
3138            deletes: vec![FileScanTaskDeleteFile {
3139                file_path: delete_file_path,
3140                file_type: DataContentType::PositionDeletes,
3141                partition_spec_id: 0,
3142                equality_ids: None,
3143            }],
3144            partition: None,
3145            partition_spec: None,
3146            name_mapping: None,
3147        };
3148
3149        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3150        let result = reader
3151            .read(tasks)
3152            .unwrap()
3153            .try_collect::<Vec<RecordBatch>>()
3154            .await
3155            .unwrap();
3156
3157        // Step 4: Verify we got 100 rows (all of row group 1)
3158        // The delete at position 0 is in row group 0, which is skipped, so it must not affect the rows returned
3159        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3160
3161        assert_eq!(
3162            total_rows, 100,
3163            "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
3164             A hang before this assertion, or a failure here, indicates the cached delete index was not updated after advance_to()."
3165        );
3166
3167        // Verify we have all ids from row group 1 (101-200)
3168        let all_ids: Vec<i32> = result
3169            .iter()
3170            .flat_map(|batch| {
3171                batch
3172                    .column(0)
3173                    .as_primitive::<arrow_array::types::Int32Type>()
3174                    .values()
3175                    .iter()
3176                    .copied()
3177            })
3178            .collect();
3179
3180        let expected_ids: Vec<i32> = (101..=200).collect();
3181        assert_eq!(
3182            all_ids, expected_ids,
3183            "Should have ids 101-200 (all of row group 1)"
3184        );
3185    }
3186
3187    /// Test reading Parquet files without field ID metadata (e.g., migrated tables).
3188    /// This exercises the position-based fallback path.
3189    ///
3190    /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + pruneColumnsFallback()
3191    /// in /parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
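    ///
    /// Conceptually, the fallback assigns field IDs 1..=N to the file's top-level columns in
    /// order. A sketch of that assignment (illustrative only; the reader performs the
    /// equivalent mapping internally rather than rewriting the Arrow schema):
    ///
    /// ```rust
    /// use std::collections::HashMap;
    ///
    /// use arrow_schema::{Field, Schema as ArrowSchema};
    /// use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    ///
    /// fn add_fallback_ids(schema: &ArrowSchema) -> ArrowSchema {
    ///     let fields: Vec<Field> = schema
    ///         .fields()
    ///         .iter()
    ///         .enumerate()
    ///         .map(|(pos, field)| {
    ///             // Position-based fallback: first column gets id 1, second gets id 2, ...
    ///             field.as_ref().clone().with_metadata(HashMap::from([(
    ///                 PARQUET_FIELD_ID_META_KEY.to_string(),
    ///                 (pos as i32 + 1).to_string(),
    ///             )]))
    ///         })
    ///         .collect();
    ///     ArrowSchema::new(fields)
    /// }
    /// ```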
3192    #[tokio::test]
3193    async fn test_read_parquet_file_without_field_ids() {
3194        let schema = Arc::new(
3195            Schema::builder()
3196                .with_schema_id(1)
3197                .with_fields(vec![
3198                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3199                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3200                ])
3201                .build()
3202                .unwrap(),
3203        );
3204
3205        // Parquet file from a migrated table - no field ID metadata
3206        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3207            Field::new("name", DataType::Utf8, false),
3208            Field::new("age", DataType::Int32, false),
3209        ]));
3210
3211        let tmp_dir = TempDir::new().unwrap();
3212        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3213        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3214
3215        let name_data = vec!["Alice", "Bob", "Charlie"];
3216        let age_data = vec![30, 25, 35];
3217
3218        use arrow_array::Int32Array;
3219        let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3220        let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3221
3222        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3223
3224        let props = WriterProperties::builder()
3225            .set_compression(Compression::SNAPPY)
3226            .build();
3227
3228        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3229        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3230
3231        writer.write(&to_write).expect("Writing batch");
3232        writer.close().unwrap();
3233
3234        let reader = ArrowReaderBuilder::new(file_io).build();
3235
3236        let tasks = Box::pin(futures::stream::iter(
3237            vec![Ok(FileScanTask {
3238                start: 0,
3239                length: 0,
3240                record_count: None,
3241                data_file_path: format!("{table_location}/1.parquet"),
3242                data_file_format: DataFileFormat::Parquet,
3243                schema: schema.clone(),
3244                project_field_ids: vec![1, 2],
3245                predicate: None,
3246                deletes: vec![],
3247                partition: None,
3248                partition_spec: None,
3249                name_mapping: None,
3250            })]
3251            .into_iter(),
3252        )) as FileScanTaskStream;
3253
3254        let result = reader
3255            .read(tasks)
3256            .unwrap()
3257            .try_collect::<Vec<RecordBatch>>()
3258            .await
3259            .unwrap();
3260
3261        assert_eq!(result.len(), 1);
3262        let batch = &result[0];
3263        assert_eq!(batch.num_rows(), 3);
3264        assert_eq!(batch.num_columns(), 2);
3265
3266        // Verify position-based mapping: field_id 1 → position 0, field_id 2 → position 1
3267        let name_array = batch.column(0).as_string::<i32>();
3268        assert_eq!(name_array.value(0), "Alice");
3269        assert_eq!(name_array.value(1), "Bob");
3270        assert_eq!(name_array.value(2), "Charlie");
3271
3272        let age_array = batch
3273            .column(1)
3274            .as_primitive::<arrow_array::types::Int32Type>();
3275        assert_eq!(age_array.value(0), 30);
3276        assert_eq!(age_array.value(1), 25);
3277        assert_eq!(age_array.value(2), 35);
3278    }
3279
3280    /// Test reading Parquet files without field IDs with partial projection.
3281    /// Only a subset of the columns is requested, verifying that the position-based
3282    /// fallback handles column selection correctly.
3283    #[tokio::test]
3284    async fn test_read_parquet_without_field_ids_partial_projection() {
3285        use arrow_array::Int32Array;
3286
3287        let schema = Arc::new(
3288            Schema::builder()
3289                .with_schema_id(1)
3290                .with_fields(vec![
3291                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3292                    NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3293                    NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3294                    NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3295                ])
3296                .build()
3297                .unwrap(),
3298        );
3299
3300        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3301            Field::new("col1", DataType::Utf8, false),
3302            Field::new("col2", DataType::Int32, false),
3303            Field::new("col3", DataType::Utf8, false),
3304            Field::new("col4", DataType::Int32, false),
3305        ]));
3306
3307        let tmp_dir = TempDir::new().unwrap();
3308        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3309        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3310
3311        let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3312        let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3313        let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3314        let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3315
3316        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3317            col1_data, col2_data, col3_data, col4_data,
3318        ])
3319        .unwrap();
3320
3321        let props = WriterProperties::builder()
3322            .set_compression(Compression::SNAPPY)
3323            .build();
3324
3325        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3326        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3327
3328        writer.write(&to_write).expect("Writing batch");
3329        writer.close().unwrap();
3330
3331        let reader = ArrowReaderBuilder::new(file_io).build();
3332
3333        let tasks = Box::pin(futures::stream::iter(
3334            vec![Ok(FileScanTask {
3335                start: 0,
3336                length: 0,
3337                record_count: None,
3338                data_file_path: format!("{table_location}/1.parquet"),
3339                data_file_format: DataFileFormat::Parquet,
3340                schema: schema.clone(),
3341                project_field_ids: vec![1, 3],
3342                predicate: None,
3343                deletes: vec![],
3344                partition: None,
3345                partition_spec: None,
3346                name_mapping: None,
3347            })]
3348            .into_iter(),
3349        )) as FileScanTaskStream;
3350
3351        let result = reader
3352            .read(tasks)
3353            .unwrap()
3354            .try_collect::<Vec<RecordBatch>>()
3355            .await
3356            .unwrap();
3357
3358        assert_eq!(result.len(), 1);
3359        let batch = &result[0];
3360        assert_eq!(batch.num_rows(), 2);
3361        assert_eq!(batch.num_columns(), 2);
3362
3363        let col1_array = batch.column(0).as_string::<i32>();
3364        assert_eq!(col1_array.value(0), "a");
3365        assert_eq!(col1_array.value(1), "b");
3366
3367        let col3_array = batch.column(1).as_string::<i32>();
3368        assert_eq!(col3_array.value(0), "c");
3369        assert_eq!(col3_array.value(1), "d");
3370    }
3371
3372    /// Test reading Parquet files without field IDs with schema evolution.
3373    /// The Iceberg schema has more fields than the Parquet file, testing that
3374    /// missing columns are filled with NULLs.
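    ///
    /// Conceptually, the reader supplies an all-null column for any projected field that the
    /// file does not contain, along the lines of this sketch (illustrative only; the real
    /// work happens in the record batch transformer):
    ///
    /// ```rust
    /// use arrow_array::{Array, ArrayRef, new_null_array};
    /// use arrow_schema::DataType;
    ///
    /// // Field 3 ("city") is missing from the file, so build an all-null Utf8 column with the
    /// // same row count as the batch that was read.
    /// let city: ArrayRef = new_null_array(&DataType::Utf8, 2);
    /// assert_eq!(city.null_count(), 2);
    /// ```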
3375    #[tokio::test]
3376    async fn test_read_parquet_without_field_ids_schema_evolution() {
3377        use arrow_array::{Array, Int32Array};
3378
3379        // Schema with field 3 added after the file was written
3380        let schema = Arc::new(
3381            Schema::builder()
3382                .with_schema_id(1)
3383                .with_fields(vec![
3384                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3385                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3386                    NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3387                ])
3388                .build()
3389                .unwrap(),
3390        );
3391
3392        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3393            Field::new("name", DataType::Utf8, false),
3394            Field::new("age", DataType::Int32, false),
3395        ]));
3396
3397        let tmp_dir = TempDir::new().unwrap();
3398        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3399        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3400
3401        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3402        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3403
3404        let to_write =
3405            RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3406
3407        let props = WriterProperties::builder()
3408            .set_compression(Compression::SNAPPY)
3409            .build();
3410
3411        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3412        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3413
3414        writer.write(&to_write).expect("Writing batch");
3415        writer.close().unwrap();
3416
3417        let reader = ArrowReaderBuilder::new(file_io).build();
3418
3419        let tasks = Box::pin(futures::stream::iter(
3420            vec![Ok(FileScanTask {
3421                start: 0,
3422                length: 0,
3423                record_count: None,
3424                data_file_path: format!("{table_location}/1.parquet"),
3425                data_file_format: DataFileFormat::Parquet,
3426                schema: schema.clone(),
3427                project_field_ids: vec![1, 2, 3],
3428                predicate: None,
3429                deletes: vec![],
3430                partition: None,
3431                partition_spec: None,
3432                name_mapping: None,
3433            })]
3434            .into_iter(),
3435        )) as FileScanTaskStream;
3436
3437        let result = reader
3438            .read(tasks)
3439            .unwrap()
3440            .try_collect::<Vec<RecordBatch>>()
3441            .await
3442            .unwrap();
3443
3444        assert_eq!(result.len(), 1);
3445        let batch = &result[0];
3446        assert_eq!(batch.num_rows(), 2);
3447        assert_eq!(batch.num_columns(), 3);
3448
3449        let name_array = batch.column(0).as_string::<i32>();
3450        assert_eq!(name_array.value(0), "Alice");
3451        assert_eq!(name_array.value(1), "Bob");
3452
3453        let age_array = batch
3454            .column(1)
3455            .as_primitive::<arrow_array::types::Int32Type>();
3456        assert_eq!(age_array.value(0), 30);
3457        assert_eq!(age_array.value(1), 25);
3458
3459        // Verify the missing column is filled with NULLs
3460        let city_array = batch.column(2).as_string::<i32>();
3461        assert_eq!(city_array.null_count(), 2);
3462        assert!(city_array.is_null(0));
3463        assert!(city_array.is_null(1));
3464    }
3465
3466    /// Test reading Parquet files without field IDs that have multiple row groups.
3467    /// This ensures the position-based fallback works correctly across row group boundaries.
3468    #[tokio::test]
3469    async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3470        use arrow_array::Int32Array;
3471
3472        let schema = Arc::new(
3473            Schema::builder()
3474                .with_schema_id(1)
3475                .with_fields(vec![
3476                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3477                    NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3478                ])
3479                .build()
3480                .unwrap(),
3481        );
3482
3483        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3484            Field::new("name", DataType::Utf8, false),
3485            Field::new("value", DataType::Int32, false),
3486        ]));
3487
3488        let tmp_dir = TempDir::new().unwrap();
3489        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3490        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3491
3492        // Small row group size to create multiple row groups
3493        let props = WriterProperties::builder()
3494            .set_compression(Compression::SNAPPY)
3495            .set_write_batch_size(2)
3496            .set_max_row_group_size(2)
3497            .build();
3498
3499        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3500        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3501
3502        // Write 6 rows in 3 batches (will create 3 row groups)
3503        for batch_num in 0..3 {
3504            let name_data = Arc::new(StringArray::from(vec![
3505                format!("name_{}", batch_num * 2),
3506                format!("name_{}", batch_num * 2 + 1),
3507            ])) as ArrayRef;
3508            let value_data =
3509                Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3510
3511            let batch =
3512                RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3513            writer.write(&batch).expect("Writing batch");
3514        }
3515        writer.close().unwrap();
3516
3517        let reader = ArrowReaderBuilder::new(file_io).build();
3518
3519        let tasks = Box::pin(futures::stream::iter(
3520            vec![Ok(FileScanTask {
3521                start: 0,
3522                length: 0,
3523                record_count: None,
3524                data_file_path: format!("{table_location}/1.parquet"),
3525                data_file_format: DataFileFormat::Parquet,
3526                schema: schema.clone(),
3527                project_field_ids: vec![1, 2],
3528                predicate: None,
3529                deletes: vec![],
3530                partition: None,
3531                partition_spec: None,
3532                name_mapping: None,
3533            })]
3534            .into_iter(),
3535        )) as FileScanTaskStream;
3536
3537        let result = reader
3538            .read(tasks)
3539            .unwrap()
3540            .try_collect::<Vec<RecordBatch>>()
3541            .await
3542            .unwrap();
3543
3544        assert!(!result.is_empty());
3545
3546        let mut all_names = Vec::new();
3547        let mut all_values = Vec::new();
3548
3549        for batch in &result {
3550            let name_array = batch.column(0).as_string::<i32>();
3551            let value_array = batch
3552                .column(1)
3553                .as_primitive::<arrow_array::types::Int32Type>();
3554
3555            for i in 0..batch.num_rows() {
3556                all_names.push(name_array.value(i).to_string());
3557                all_values.push(value_array.value(i));
3558            }
3559        }
3560
3561        assert_eq!(all_names.len(), 6);
3562        assert_eq!(all_values.len(), 6);
3563
3564        for i in 0..6 {
3565            assert_eq!(all_names[i], format!("name_{i}"));
3566            assert_eq!(all_values[i], i as i32);
3567        }
3568    }
3569
3570    /// Test reading Parquet files without field IDs with nested types (struct).
3571    /// Java's pruneColumnsFallback() projects entire top-level columns including nested content.
3572    /// This test verifies that a top-level struct field is projected correctly with all its nested fields.
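    ///
    /// With the parquet crate this corresponds to projecting by root column: selecting a root
    /// keeps every leaf nested under it. A sketch (illustrative only, not the reader's exact
    /// projection code):
    ///
    /// ```rust
    /// use parquet::arrow::ProjectionMask;
    /// use parquet::schema::types::SchemaDescriptor;
    ///
    /// // Selecting roots 0 ("id") and 1 ("person") also projects the nested leaves
    /// // "person.name" and "person.age", because their root column is selected.
    /// fn project_top_level(descriptor: &SchemaDescriptor) -> ProjectionMask {
    ///     ProjectionMask::roots(descriptor, [0, 1])
    /// }
    /// ```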
3573    #[tokio::test]
3574    async fn test_read_parquet_without_field_ids_with_struct() {
3575        use arrow_array::{Int32Array, StructArray};
3576        use arrow_schema::Fields;
3577
3578        let schema = Arc::new(
3579            Schema::builder()
3580                .with_schema_id(1)
3581                .with_fields(vec![
3582                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3583                    NestedField::required(
3584                        2,
3585                        "person",
3586                        Type::Struct(crate::spec::StructType::new(vec![
3587                            NestedField::required(
3588                                3,
3589                                "name",
3590                                Type::Primitive(PrimitiveType::String),
3591                            )
3592                            .into(),
3593                            NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3594                                .into(),
3595                        ])),
3596                    )
3597                    .into(),
3598                ])
3599                .build()
3600                .unwrap(),
3601        );
3602
3603        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3604            Field::new("id", DataType::Int32, false),
3605            Field::new(
3606                "person",
3607                DataType::Struct(Fields::from(vec![
3608                    Field::new("name", DataType::Utf8, false),
3609                    Field::new("age", DataType::Int32, false),
3610                ])),
3611                false,
3612            ),
3613        ]));
3614
3615        let tmp_dir = TempDir::new().unwrap();
3616        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3617        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3618
3619        let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3620        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3621        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3622        let person_data = Arc::new(StructArray::from(vec![
3623            (
3624                Arc::new(Field::new("name", DataType::Utf8, false)),
3625                name_data,
3626            ),
3627            (
3628                Arc::new(Field::new("age", DataType::Int32, false)),
3629                age_data,
3630            ),
3631        ])) as ArrayRef;
3632
3633        let to_write =
3634            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3635
3636        let props = WriterProperties::builder()
3637            .set_compression(Compression::SNAPPY)
3638            .build();
3639
3640        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3641        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3642
3643        writer.write(&to_write).expect("Writing batch");
3644        writer.close().unwrap();
3645
3646        let reader = ArrowReaderBuilder::new(file_io).build();
3647
3648        let tasks = Box::pin(futures::stream::iter(
3649            vec![Ok(FileScanTask {
3650                start: 0,
3651                length: 0,
3652                record_count: None,
3653                data_file_path: format!("{table_location}/1.parquet"),
3654                data_file_format: DataFileFormat::Parquet,
3655                schema: schema.clone(),
3656                project_field_ids: vec![1, 2],
3657                predicate: None,
3658                deletes: vec![],
3659                partition: None,
3660                partition_spec: None,
3661                name_mapping: None,
3662            })]
3663            .into_iter(),
3664        )) as FileScanTaskStream;
3665
3666        let result = reader
3667            .read(tasks)
3668            .unwrap()
3669            .try_collect::<Vec<RecordBatch>>()
3670            .await
3671            .unwrap();
3672
3673        assert_eq!(result.len(), 1);
3674        let batch = &result[0];
3675        assert_eq!(batch.num_rows(), 2);
3676        assert_eq!(batch.num_columns(), 2);
3677
3678        let id_array = batch
3679            .column(0)
3680            .as_primitive::<arrow_array::types::Int32Type>();
3681        assert_eq!(id_array.value(0), 1);
3682        assert_eq!(id_array.value(1), 2);
3683
3684        let person_array = batch.column(1).as_struct();
3685        assert_eq!(person_array.num_columns(), 2);
3686
3687        let name_array = person_array.column(0).as_string::<i32>();
3688        assert_eq!(name_array.value(0), "Alice");
3689        assert_eq!(name_array.value(1), "Bob");
3690
3691        let age_array = person_array
3692            .column(1)
3693            .as_primitive::<arrow_array::types::Int32Type>();
3694        assert_eq!(age_array.value(0), 30);
3695        assert_eq!(age_array.value(1), 25);
3696    }
3697
3698    /// Test reading Parquet files without field IDs with schema evolution: a column added in the middle.
3699    /// When a new column is inserted between existing columns in the schema order,
3700    /// the fallback projection must correctly map field IDs to output positions.
3701    #[tokio::test]
3702    async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3703        use arrow_array::{Array, Int32Array};
3704
3705        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3706            Field::new("col0", DataType::Int32, true),
3707            Field::new("col1", DataType::Int32, true),
3708        ]));
3709
3710        // New column added between existing columns: col0 (id=1), newCol (id=5), col1 (id=2)
3711        let schema = Arc::new(
3712            Schema::builder()
3713                .with_schema_id(1)
3714                .with_fields(vec![
3715                    NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3716                    NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3717                    NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3718                ])
3719                .build()
3720                .unwrap(),
3721        );
3722
3723        let tmp_dir = TempDir::new().unwrap();
3724        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3725        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3726
3727        let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3728        let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3729
3730        let to_write =
3731            RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3732
3733        let props = WriterProperties::builder()
3734            .set_compression(Compression::SNAPPY)
3735            .build();
3736
3737        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3738        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3739        writer.write(&to_write).expect("Writing batch");
3740        writer.close().unwrap();
3741
3742        let reader = ArrowReaderBuilder::new(file_io).build();
3743
3744        let tasks = Box::pin(futures::stream::iter(
3745            vec![Ok(FileScanTask {
3746                start: 0,
3747                length: 0,
3748                record_count: None,
3749                data_file_path: format!("{table_location}/1.parquet"),
3750                data_file_format: DataFileFormat::Parquet,
3751                schema: schema.clone(),
3752                project_field_ids: vec![1, 5, 2],
3753                predicate: None,
3754                deletes: vec![],
3755                partition: None,
3756                partition_spec: None,
3757                name_mapping: None,
3758            })]
3759            .into_iter(),
3760        )) as FileScanTaskStream;
3761
3762        let result = reader
3763            .read(tasks)
3764            .unwrap()
3765            .try_collect::<Vec<RecordBatch>>()
3766            .await
3767            .unwrap();
3768
3769        assert_eq!(result.len(), 1);
3770        let batch = &result[0];
3771        assert_eq!(batch.num_rows(), 2);
3772        assert_eq!(batch.num_columns(), 3);
3773
3774        let result_col0 = batch
3775            .column(0)
3776            .as_primitive::<arrow_array::types::Int32Type>();
3777        assert_eq!(result_col0.value(0), 1);
3778        assert_eq!(result_col0.value(1), 2);
3779
3780        // New column should be NULL (doesn't exist in old file)
3781        let result_newcol = batch
3782            .column(1)
3783            .as_primitive::<arrow_array::types::Int32Type>();
3784        assert_eq!(result_newcol.null_count(), 2);
3785        assert!(result_newcol.is_null(0));
3786        assert!(result_newcol.is_null(1));
3787
3788        let result_col1 = batch
3789            .column(2)
3790            .as_primitive::<arrow_array::types::Int32Type>();
3791        assert_eq!(result_col1.value(0), 10);
3792        assert_eq!(result_col1.value(1), 20);
3793    }
3794
3795    /// Test reading Parquet files without field IDs with a filter that eliminates all row groups.
3796    /// During development of the field ID fallback this panicked when row_selection_enabled=true
3797    /// and all row groups were filtered out.
3798    #[tokio::test]
3799    async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
3800        use arrow_array::{Float64Array, Int32Array};
3801
3802        // Schema with fields that will use fallback IDs 1, 2, 3
3803        let schema = Arc::new(
3804            Schema::builder()
3805                .with_schema_id(1)
3806                .with_fields(vec![
3807                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3808                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3809                    NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
3810                        .into(),
3811                ])
3812                .build()
3813                .unwrap(),
3814        );
3815
3816        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3817            Field::new("id", DataType::Int32, false),
3818            Field::new("name", DataType::Utf8, false),
3819            Field::new("value", DataType::Float64, false),
3820        ]));
3821
3822        let tmp_dir = TempDir::new().unwrap();
3823        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3824        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3825
3826        // Write data where all ids are >= 10
3827        let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
3828        let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3829        let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
3830
3831        let to_write =
3832            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
3833                .unwrap();
3834
3835        let props = WriterProperties::builder()
3836            .set_compression(Compression::SNAPPY)
3837            .build();
3838
3839        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3840        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3841        writer.write(&to_write).expect("Writing batch");
3842        writer.close().unwrap();
3843
3844        // Filter that eliminates all row groups: id < 5
3845        let predicate = Reference::new("id").less_than(Datum::int(5));
3846
3847        // Enable both row_group_filtering and row_selection; this combination previously triggered the panic
3848        let reader = ArrowReaderBuilder::new(file_io)
3849            .with_row_group_filtering_enabled(true)
3850            .with_row_selection_enabled(true)
3851            .build();
3852
3853        let tasks = Box::pin(futures::stream::iter(
3854            vec![Ok(FileScanTask {
3855                start: 0,
3856                length: 0,
3857                record_count: None,
3858                data_file_path: format!("{table_location}/1.parquet"),
3859                data_file_format: DataFileFormat::Parquet,
3860                schema: schema.clone(),
3861                project_field_ids: vec![1, 2, 3],
3862                predicate: Some(predicate.bind(schema, true).unwrap()),
3863                deletes: vec![],
3864                partition: None,
3865                partition_spec: None,
3866                name_mapping: None,
3867            })]
3868            .into_iter(),
3869        )) as FileScanTaskStream;
3870
3871        // Should no longer panic
3872        let result = reader
3873            .read(tasks)
3874            .unwrap()
3875            .try_collect::<Vec<RecordBatch>>()
3876            .await
3877            .unwrap();
3878
3879        // Should return empty results
3880        assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
3881    }
3882
3883    /// Test bucket partitioning reads source column from data file (not partition metadata).
3884    ///
3885    /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.
3886    /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable).
3887    ///
3888    /// # Iceberg Spec Requirements
3889    ///
3890    /// Per the Iceberg spec "Column Projection" section:
3891    /// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
3892    ///
3893    /// This means:
3894    /// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata
3895    /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files
3896    /// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values
3897    ///
3898    /// Java's PartitionUtil.constantsMap() implements this via:
3899    /// ```java
3900    /// if (field.transform().isIdentity()) {
3901    ///     idToConstant.put(field.sourceId(), converted);
3902    /// }
3903    /// ```
3904    ///
3905    /// # What This Test Verifies
3906    ///
3907    /// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles
3908    /// bucket partitioning when FileScanTask provides partition_spec and partition_data:
3909    ///
3910    /// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13]
3911    /// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1
3912    /// - RecordBatchTransformer.constants_map() excludes bucket-partitioned field from constants
3913    /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
3914    /// - Values are NOT replaced with constant 1 from partition metadata
3915    ///
3916    /// # Why This Matters
3917    ///
3918    /// Without correct handling:
3919    /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
3920    /// - Query results would be incorrect (all rows would have id=1)
3921    /// - Bucket partitioning would be unusable for query optimization
3922    ///
3923    /// # References
3924    /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
3925    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
3926    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
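    ///
    /// A Rust sketch of the Java rule quoted above (hypothetical helper; `Transform` and
    /// `Literal` are this crate's types, everything else is illustrative):
    ///
    /// ```rust
    /// use std::collections::HashMap;
    ///
    /// use crate::spec::{Literal, Transform};
    ///
    /// // Collect per-source-field constants from partition metadata, but only for identity
    /// // transforms; bucket/truncate/etc. values describe the transform output, not the
    /// // source column, so those columns must still be read from the data file.
    /// fn constants_map(partition_fields: &[(i32, Transform, Literal)]) -> HashMap<i32, Literal> {
    ///     partition_fields
    ///         .iter()
    ///         .filter(|(_, transform, _)| matches!(transform, Transform::Identity))
    ///         .map(|(source_id, _, value)| (*source_id, value.clone()))
    ///         .collect()
    /// }
    /// ```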
3927    #[tokio::test]
3928    async fn test_bucket_partitioning_reads_source_column_from_file() {
3929        use arrow_array::Int32Array;
3930
3931        use crate::spec::{Literal, PartitionSpec, Struct, Transform};
3932
3933        // Iceberg schema with id and name columns
3934        let schema = Arc::new(
3935            Schema::builder()
3936                .with_schema_id(0)
3937                .with_fields(vec![
3938                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3939                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3940                ])
3941                .build()
3942                .unwrap(),
3943        );
3944
3945        // Partition spec: bucket(4, id)
3946        let partition_spec = Arc::new(
3947            PartitionSpec::builder(schema.clone())
3948                .with_spec_id(0)
3949                .add_partition_field("id", "id_bucket", Transform::Bucket(4))
3950                .unwrap()
3951                .build()
3952                .unwrap(),
3953        );
3954
3955        // Partition data: bucket value is 1
3956        let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
3957
3958        // Create Arrow schema with field IDs for Parquet file
3959        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3960            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3961                PARQUET_FIELD_ID_META_KEY.to_string(),
3962                "1".to_string(),
3963            )])),
3964            Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
3965                PARQUET_FIELD_ID_META_KEY.to_string(),
3966                "2".to_string(),
3967            )])),
3968        ]));
3969
3970        // Write Parquet file with data
3971        let tmp_dir = TempDir::new().unwrap();
3972        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3973        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3974
3975        let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
3976        let name_data =
3977            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;
3978
3979        let to_write =
3980            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
3981
3982        let props = WriterProperties::builder()
3983            .set_compression(Compression::SNAPPY)
3984            .build();
3985        let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
3986        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3987        writer.write(&to_write).expect("Writing batch");
3988        writer.close().unwrap();
3989
3990        // Read the Parquet file with partition spec and data
3991        let reader = ArrowReaderBuilder::new(file_io).build();
3992        let tasks = Box::pin(futures::stream::iter(
3993            vec![Ok(FileScanTask {
3994                start: 0,
3995                length: 0,
3996                record_count: None,
3997                data_file_path: format!("{table_location}/data.parquet"),
3998                data_file_format: DataFileFormat::Parquet,
3999                schema: schema.clone(),
4000                project_field_ids: vec![1, 2],
4001                predicate: None,
4002                deletes: vec![],
4003                partition: Some(partition_data),
4004                partition_spec: Some(partition_spec),
4005                name_mapping: None,
4006            })]
4007            .into_iter(),
4008        )) as FileScanTaskStream;
4009
4010        let result = reader
4011            .read(tasks)
4012            .unwrap()
4013            .try_collect::<Vec<RecordBatch>>()
4014            .await
4015            .unwrap();
4016
4017        // Verify we got the correct data
4018        assert_eq!(result.len(), 1);
4019        let batch = &result[0];
4020
4021        assert_eq!(batch.num_columns(), 2);
4022        assert_eq!(batch.num_rows(), 4);
4023
4024        // The id column MUST contain actual values from the Parquet file [1, 5, 9, 13],
4025        // NOT the constant partition value 1
4026        let id_col = batch
4027            .column(0)
4028            .as_primitive::<arrow_array::types::Int32Type>();
4029        assert_eq!(id_col.value(0), 1);
4030        assert_eq!(id_col.value(1), 5);
4031        assert_eq!(id_col.value(2), 9);
4032        assert_eq!(id_col.value(3), 13);
4033
4034        let name_col = batch.column(1).as_string::<i32>();
4035        assert_eq!(name_col.value(0), "Alice");
4036        assert_eq!(name_col.value(1), "Bob");
4037        assert_eq!(name_col.value(2), "Charlie");
4038        assert_eq!(name_col.value(3), "Dave");
4039    }
4040}