iceberg/scan/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Table scan API.

mod cache;
use cache::*;
mod context;
use context::*;
mod task;

use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::channel::mpsc::{Sender, channel};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;

use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind, Result};

/// A stream of arrow [`RecordBatch`]es.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

/// Builder to create table scan.
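///
/// A [`TableScanBuilder`] is obtained from [`Table::scan`] and configured via the
/// methods below. A minimal sketch (an assumption for illustration: `table` is an
/// already-loaded [`Table`]):
///
/// ```ignore
/// let scan = table
///     .scan()
///     .select(["x", "y"])          // project a subset of columns
///     .with_batch_size(Some(1024)) // optional Arrow batch size
///     .build()?;
/// ```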
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    // Defaults to `None`, which means all columns are selected
    column_names: Option<Vec<String>>,
    snapshot_id: Option<i64>,
    batch_size: Option<usize>,
    case_sensitive: bool,
    filter: Option<Predicate>,
    concurrency_limit_data_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_manifest_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl<'a> TableScanBuilder<'a> {
    pub(crate) fn new(table: &'a Table) -> Self {
        let num_cpus = available_parallelism().get();

        Self {
            table,
            column_names: None,
            snapshot_id: None,
            batch_size: None,
            case_sensitive: true,
            filter: None,
            concurrency_limit_data_files: num_cpus,
            concurrency_limit_manifest_entries: num_cpus,
            concurrency_limit_manifest_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

    /// Sets the desired size of batches in the response
    /// to something other than the default
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets the scan's case sensitivity
    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
        self.case_sensitive = case_sensitive;
        self
    }

    /// Specifies a predicate to use as a filter
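    ///
    /// A sketch of filtering a scan (assumes `table` is an already-loaded [`Table`];
    /// `Reference` and `Datum` are the predicate helpers from [`crate::expr`] and
    /// [`crate::spec`]):
    ///
    /// ```ignore
    /// use iceberg::expr::Reference;
    /// use iceberg::spec::Datum;
    ///
    /// // Plan only files that could contain rows with y < 3.
    /// let scan = table
    ///     .scan()
    ///     .with_filter(Reference::new("y").less_than(Datum::long(3)))
    ///     .build()?;
    /// ```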
    pub fn with_filter(mut self, predicate: Predicate) -> Self {
        // calls rewrite_not to remove Not nodes, which must be absent
        // when applying the manifest evaluator
        self.filter = Some(predicate.rewrite_not());
        self
    }

    /// Select all columns.
    pub fn select_all(mut self) -> Self {
        self.column_names = None;
        self
    }

    /// Select empty columns.
    pub fn select_empty(mut self) -> Self {
        self.column_names = Some(vec![]);
        self
    }

    /// Select some columns of the table.
    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
        self.column_names = Some(
            column_names
                .into_iter()
                .map(|item| item.to_string())
                .collect(),
        );
        self
    }

    /// Set the snapshot to scan. When not set, the current snapshot is used.
    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
        self.snapshot_id = Some(snapshot_id);
        self
    }

    /// Sets the concurrency limit for manifest files, manifest entries,
    /// and data files for this scan
    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_files = limit;
        self.concurrency_limit_manifest_entries = limit;
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the data file concurrency limit for this scan
    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_data_files = limit;
        self
    }

    /// Sets the manifest entry concurrency limit for this scan
    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_entries = limit;
        self
    }

    /// Determines whether to enable row group filtering.
    /// When enabled, if a read is performed with a filter predicate,
    /// then the metadata for each row group in the parquet file is
    /// evaluated against the filter predicate, and row groups
    /// that can't contain matching rows are skipped entirely.
    ///
    /// Defaults to enabled, as it generally improves performance
    /// and is unlikely to degrade it.
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

    /// Determines whether to enable row selection.
    /// When enabled, if a read is performed with a filter predicate,
    /// then (for row groups that have not been skipped) the page index
    /// for each row group in a parquet file is parsed and evaluated
    /// against the filter predicate to determine if ranges of rows
    /// within a row group can be skipped, based upon the page-level
    /// statistics for each column.
    ///
    /// Defaults to disabled. Enabling it requires parsing the parquet page
    /// index, which can be slow enough that the cost outweighs any gains from
    /// the reduced number of rows that need scanning.
    /// It is recommended to experiment with partitioning, sorting, row group size,
    /// page size, and page row limit Iceberg settings on the table being scanned in
    /// order to get the best performance from using row selection.
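    ///
    /// Both page-level toggles are plain builder settings, e.g. (a sketch, assuming
    /// `table` is an already-loaded [`Table`]):
    ///
    /// ```ignore
    /// let scan = table
    ///     .scan()
    ///     .with_row_group_filtering_enabled(true) // the default
    ///     .with_row_selection_enabled(true)       // opt in to page-index pruning
    ///     .build()?;
    /// ```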
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

    /// Build the table scan.
    pub fn build(self) -> Result<TableScan> {
        let snapshot = match self.snapshot_id {
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {snapshot_id} not found"),
                    )
                })?
                .clone(),
            None => {
                let Some(current_snapshot) = self.table.metadata().current_snapshot() else {
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        // Check that all column names exist in the schema (skip reserved columns).
        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                // Skip reserved columns that don't exist in the schema
                if is_metadata_column_name(column_name) {
                    continue;
                }
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Column {column_name} not found in table. Schema: {schema}"),
                    ));
                }
            }
        }

        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            // Handle metadata columns (like "_file")
            if is_metadata_column_name(column_name) {
                field_ids.push(get_metadata_field_id(column_name)?);
                continue;
            }

            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Column {column_name} not found in table. Schema: {schema}"),
                )
            })?;

            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                        "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
                    ),
                )
            })?;

            field_ids.push(field_id);
        }

        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
            Some(predicates.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
}

/// Table scan.
#[derive(Debug)]
pub struct TableScan {
    /// A [PlanContext], if this table has at least one snapshot, otherwise None.
    ///
    /// If this is None, then the scan contains no rows.
    plan_context: Option<PlanContext>,
    batch_size: Option<usize>,
    file_io: FileIO,
    column_names: Option<Vec<String>>,
    /// The maximum number of manifest files that will be
    /// retrieved from [`FileIO`] concurrently
    concurrency_limit_manifest_files: usize,

    /// The maximum number of [`ManifestEntry`]s that will
    /// be processed in parallel
    concurrency_limit_manifest_entries: usize,

    /// The maximum number of data files that will be
    /// processed in parallel
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl TableScan {
    /// Returns a stream of [`FileScanTask`]s.
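    ///
    /// The stream yields one [`FileScanTask`] per live data file that survives
    /// partition and metrics pruning. A sketch of draining it (assuming `scan` is a
    /// built [`TableScan`]):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let tasks: Vec<FileScanTask> = scan.plan_files().await?.try_collect().await?;
    /// ```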
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // used to stream ManifestEntryContexts between stages of the file plan operation
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // used to stream the results back to the caller
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
        // whose partitions cannot match this scan's filter
        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        // Process the delete file [`ManifestEntry`] stream in parallel
        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        // Process the data file [`ManifestEntry`] stream in parallel
        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        Ok(file_scan_task_rx.boxed())
    }

    /// Returns an [`ArrowRecordBatchStream`].
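    ///
    /// A sketch of collecting every batch into memory (assuming `scan` is a built
    /// [`TableScan`]):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;
    /// ```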
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder.build().read(self.plan_files().await?)
    }

    /// Returns a reference to the column names of the table scan.
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

    /// Returns a reference to the snapshot of the table scan.
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry for a delete file
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // skip any data file whose partition data indicates that it can't contain
            // any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // skip any data file whose metrics don't match this scan's filter
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        // congratulations! the manifest entry has made its way through the
        // entire plan without getting filtered out. Create a corresponding
        // FileScanTask and push it to the result stream
        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // skip processing this manifest entry if it has been marked as deleted
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // abort the plan if we encounter a manifest entry that is not for a delete file
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // skip any delete file whose partition data indicates that it can't
            // contain any data that matches this scan's filter
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}

pub(crate) struct BoundPredicates {
    partition_bound_predicate: BoundPredicate,
    snapshot_bound_predicate: BoundPredicate,
}

#[cfg(test)]
pub mod tests {
    //! shared tests for the table scan API
    #![allow(missing_docs)]

    use std::collections::HashMap;
    use std::fs;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{
        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
        StringArray,
    };
    use futures::{TryStreamExt, stream};
    use minijinja::value::Value;
    use minijinja::{AutoEscape, Environment, context};
    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::TableIdent;
    use crate::arrow::ArrowReaderBuilder;
    use crate::expr::{BoundPredicate, Reference};
    use crate::io::{FileIO, OutputFile};
    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
    use crate::scan::FileScanTask;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
    };
    use crate::table::Table;

    fn render_template(template: &str, ctx: Value) -> String {
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::None);
        env.render_str(template, ctx).unwrap()
    }

    pub struct TableTestFixture {
        pub table_location: String,
        pub table: Table,
    }

    impl TableTestFixture {
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        #[allow(clippy::new_without_default)]
        pub fn new_empty() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_empty_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        fn next_manifest_file(&self) -> OutputFile {
            self.table
                .file_io()
                .new_output(format!(
                    "{}/metadata/manifest_{}.avro",
                    self.table_location,
                    Uuid::new_v4()
                ))
                .unwrap()
        }

        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Write data files
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();

            // prepare data
            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };
            // x: [1, 1, 1, 1, ...]
            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());

            // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());

            // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            // dbl:
            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            // i32:
            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            // i64:
            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            // bool:
            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            // Write the Parquet files
            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                // writer must be closed to write footer
                writer.close().unwrap();
            }
        }

        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            // Write data files using an empty partition for unpartitioned tables.
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Create an empty partition value.
            let empty_partition = Struct::empty();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();

            // prepare data for parquet files
            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };

            // Build the arrays for the RecordBatch
            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            // Write the Parquet files
            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                // writer must be closed to write footer
                writer.close().unwrap();
            }
        }

        pub async fn setup_deadlock_manifests(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let _parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // 1. Write DATA manifest with MULTIPLE entries to fill buffer
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            // Add 10 data entries
            for i in 0..10 {
                writer
                    .add_entry(
                        ManifestEntry::builder()
                            .status(ManifestStatus::Added)
                            .data_file(
                                DataFileBuilder::default()
                                    .partition_spec_id(0)
                                    .content(DataContentType::Data)
                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
                                    .file_format(DataFileFormat::Parquet)
                                    .file_size_in_bytes(100)
                                    .record_count(1)
                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
                                    .key_metadata(None)
                                    .build()
                                    .unwrap(),
                            )
                            .build(),
                    )
                    .unwrap();
            }
            let data_manifest = writer.write_manifest_file().await.unwrap();

            // 2. Write DELETE manifest
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_deletes();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::PositionDeletes)
                                .file_path(format!("{}/del.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let delete_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list - DATA FIRST then DELETE
            // This order is crucial for reproduction
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
    }
1265
1266    #[test]
1267    fn test_table_scan_columns() {
1268        let table = TableTestFixture::new().table;
1269
1270        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1271        assert_eq!(
1272            Some(vec!["x".to_string(), "y".to_string()]),
1273            table_scan.column_names
1274        );
1275
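        // A second select() call replaces the earlier projection rather than extending it.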
1276        let table_scan = table
1277            .scan()
1278            .select(["x", "y"])
1279            .select(["z"])
1280            .build()
1281            .unwrap();
1282        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1283    }
1284
1285    #[test]
1286    fn test_select_all() {
1287        let table = TableTestFixture::new().table;
1288
1289        let table_scan = table.scan().select_all().build().unwrap();
1290        assert!(table_scan.column_names.is_none());
1291    }
1292
1293    #[test]
1294    fn test_select_no_exist_column() {
1295        let table = TableTestFixture::new().table;
1296
1297        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
1298        assert!(table_scan.is_err());
1299    }
1300
1301    #[test]
1302    fn test_table_scan_default_snapshot_id() {
1303        let table = TableTestFixture::new().table;
1304
1305        let table_scan = table.scan().build().unwrap();
1306        assert_eq!(
1307            table.metadata().current_snapshot().unwrap().snapshot_id(),
1308            table_scan.snapshot().unwrap().snapshot_id()
1309        );
1310    }
1311
1312    #[test]
1313    fn test_table_scan_non_exist_snapshot_id() {
1314        let table = TableTestFixture::new().table;
1315
1316        let table_scan = table.scan().snapshot_id(1024).build();
1317        assert!(table_scan.is_err());
1318    }
1319
1320    #[test]
1321    fn test_table_scan_with_snapshot_id() {
1322        let table = TableTestFixture::new().table;
1323
1324        let table_scan = table
1325            .scan()
1326            .snapshot_id(3051729675574597004)
1327            .with_row_selection_enabled(true)
1328            .build()
1329            .unwrap();
1330        assert_eq!(
1331            table_scan.snapshot().unwrap().snapshot_id(),
1332            3051729675574597004
1333        );
1334    }
1335
1336    #[tokio::test]
1337    async fn test_plan_files_on_table_without_any_snapshots() {
1338        let table = TableTestFixture::new_empty().table;
1339        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1340        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1341        assert!(batches.is_empty());
1342    }
1343
1344    #[tokio::test]
1345    async fn test_plan_files_no_deletions() {
1346        let mut fixture = TableTestFixture::new();
1347        fixture.setup_manifest_files().await;
1348
1349        // Create table scan for current snapshot and plan files
1350        let table_scan = fixture
1351            .table
1352            .scan()
1353            .with_row_selection_enabled(true)
1354            .build()
1355            .unwrap();
1356
1357        let mut tasks = table_scan
1358            .plan_files()
1359            .await
1360            .unwrap()
1361            .try_fold(vec![], |mut acc, task| async move {
1362                acc.push(task);
1363                Ok(acc)
1364            })
1365            .await
1366            .unwrap();
1367
1368        assert_eq!(tasks.len(), 2);
1369
1370        tasks.sort_by_key(|t| t.data_file_path.to_string());
1371
1372        // Check that the first task is the added data file
1373        assert_eq!(
1374            tasks[0].data_file_path,
1375            format!("{}/1.parquet", &fixture.table_location)
1376        );
1377
1378        // Check that the second task is the existing data file
1379        assert_eq!(
1380            tasks[1].data_file_path,
1381            format!("{}/3.parquet", &fixture.table_location)
1382        );
1383    }
1384
1385    #[tokio::test]
1386    async fn test_open_parquet_no_deletions() {
1387        let mut fixture = TableTestFixture::new();
1388        fixture.setup_manifest_files().await;
1389
1390        // Create table scan for the current snapshot and read it into Arrow batches
1391        let table_scan = fixture
1392            .table
1393            .scan()
1394            .with_row_selection_enabled(true)
1395            .build()
1396            .unwrap();
1397
1398        let batch_stream = table_scan.to_arrow().await.unwrap();
1399
1400        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1401
1402        let col = batches[0].column_by_name("x").unwrap();
1403
1404        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1405        assert_eq!(int64_arr.value(0), 1);
1406    }
1407
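    // Plans the scan once, then reads each of the two resulting tasks with an
    // independently constructed ArrowReader and checks that they yield identical batches.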
1408    #[tokio::test]
1409    async fn test_open_parquet_no_deletions_by_separate_reader() {
1410        let mut fixture = TableTestFixture::new();
1411        fixture.setup_manifest_files().await;
1412
1413        // Create table scan for current snapshot and plan files
1414        let table_scan = fixture
1415            .table
1416            .scan()
1417            .with_row_selection_enabled(true)
1418            .build()
1419            .unwrap();
1420
1421        let mut plan_task: Vec<_> = table_scan
1422            .plan_files()
1423            .await
1424            .unwrap()
1425            .try_collect()
1426            .await
1427            .unwrap();
1428        assert_eq!(plan_task.len(), 2);
1429
1430        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1431        let batch_stream = reader
1432            .clone()
1433            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1434            .unwrap();
1435        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1436
1437        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1438        let batch_stream = reader
1439            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1440            .unwrap();
1441        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1442
1443        assert_eq!(batch_1, batch_2);
1444    }
1445
1446    #[tokio::test]
1447    async fn test_open_parquet_with_projection() {
1448        let mut fixture = TableTestFixture::new();
1449        fixture.setup_manifest_files().await;
1450
1451        // Create table scan for the current snapshot with a column projection
1452        let table_scan = fixture
1453            .table
1454            .scan()
1455            .select(["x", "z"])
1456            .with_row_selection_enabled(true)
1457            .build()
1458            .unwrap();
1459
1460        let batch_stream = table_scan.to_arrow().await.unwrap();
1461
1462        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1463
1464        assert_eq!(batches[0].num_columns(), 2);
1465
1466        let col1 = batches[0].column_by_name("x").unwrap();
1467        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1468        assert_eq!(int64_arr.value(0), 1);
1469
1470        let col2 = batches[0].column_by_name("z").unwrap();
1471        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1472        assert_eq!(int64_arr.value(0), 3);
1473
1474        // An empty projection still yields batches with the full row count but no columns
1475        let table_scan = fixture.table.scan().select_empty().build().unwrap();
1476        let batch_stream = table_scan.to_arrow().await.unwrap();
1477        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1478
1479        assert_eq!(batches[0].num_columns(), 0);
1480        assert_eq!(batches[0].num_rows(), 1024);
1481    }
1482
1483    #[tokio::test]
1484    async fn test_filter_on_arrow_lt() {
1485        let mut fixture = TableTestFixture::new();
1486        fixture.setup_manifest_files().await;
1487
1488        // Filter: y < 3
1489        let mut builder = fixture.table.scan();
1490        let predicate = Reference::new("y").less_than(Datum::long(3));
1491        builder = builder
1492            .with_filter(predicate)
1493            .with_row_selection_enabled(true);
1494        let table_scan = builder.build().unwrap();
1495
1496        let batch_stream = table_scan.to_arrow().await.unwrap();
1497
1498        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1499
1500        assert_eq!(batches[0].num_rows(), 512);
1501
1502        let col = batches[0].column_by_name("x").unwrap();
1503        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1504        assert_eq!(int64_arr.value(0), 1);
1505
1506        let col = batches[0].column_by_name("y").unwrap();
1507        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1508        assert_eq!(int64_arr.value(0), 2);
1509    }
1510
1511    #[tokio::test]
1512    async fn test_filter_on_arrow_gt_eq() {
1513        let mut fixture = TableTestFixture::new();
1514        fixture.setup_manifest_files().await;
1515
1516        // Filter: y >= 5
1517        let mut builder = fixture.table.scan();
1518        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1519        builder = builder
1520            .with_filter(predicate)
1521            .with_row_selection_enabled(true);
1522        let table_scan = builder.build().unwrap();
1523
1524        let batch_stream = table_scan.to_arrow().await.unwrap();
1525
1526        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1527
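        // The fixture writes y == 5 only in the last 12 rows of the file, so y >= 5 matches exactly 12 rows.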
1528        assert_eq!(batches[0].num_rows(), 12);
1529
1530        let col = batches[0].column_by_name("x").unwrap();
1531        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1532        assert_eq!(int64_arr.value(0), 1);
1533
1534        let col = batches[0].column_by_name("y").unwrap();
1535        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1536        assert_eq!(int64_arr.value(0), 5);
1537    }
1538
1539    #[tokio::test]
1540    async fn test_filter_double_eq() {
1541        let mut fixture = TableTestFixture::new();
1542        fixture.setup_manifest_files().await;
1543
1544        // Filter: dbl == 150.0
1545        let mut builder = fixture.table.scan();
1546        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1547        builder = builder
1548            .with_filter(predicate)
1549            .with_row_selection_enabled(true);
1550        let table_scan = builder.build().unwrap();
1551
1552        let batch_stream = table_scan.to_arrow().await.unwrap();
1553
1554        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1555
1556        assert_eq!(batches.len(), 2);
1557        assert_eq!(batches[0].num_rows(), 12);
1558
1559        let col = batches[0].column_by_name("dbl").unwrap();
1560        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1561        assert_eq!(f64_arr.value(1), 150.0f64);
1562    }
1563
1564    #[tokio::test]
1565    async fn test_filter_int_eq() {
1566        let mut fixture = TableTestFixture::new();
1567        fixture.setup_manifest_files().await;
1568
1569        // Filter: i32 == 150
1570        let mut builder = fixture.table.scan();
1571        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1572        builder = builder
1573            .with_filter(predicate)
1574            .with_row_selection_enabled(true);
1575        let table_scan = builder.build().unwrap();
1576
1577        let batch_stream = table_scan.to_arrow().await.unwrap();
1578
1579        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1580
1581        assert_eq!(batches.len(), 2);
1582        assert_eq!(batches[0].num_rows(), 12);
1583
1584        let col = batches[0].column_by_name("i32").unwrap();
1585        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1586        assert_eq!(i32_arr.value(1), 150i32);
1587    }
1588
1589    #[tokio::test]
1590    async fn test_filter_long_eq() {
1591        let mut fixture = TableTestFixture::new();
1592        fixture.setup_manifest_files().await;
1593
1594        // Filter: i64 == 150
1595        let mut builder = fixture.table.scan();
1596        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1597        builder = builder
1598            .with_filter(predicate)
1599            .with_row_selection_enabled(true);
1600        let table_scan = builder.build().unwrap();
1601
1602        let batch_stream = table_scan.to_arrow().await.unwrap();
1603
1604        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1605
1606        assert_eq!(batches.len(), 2);
1607        assert_eq!(batches[0].num_rows(), 12);
1608
1609        let col = batches[0].column_by_name("i64").unwrap();
1610        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1611        assert_eq!(i64_arr.value(1), 150i64);
1612    }
1613
1614    #[tokio::test]
1615    async fn test_filter_bool_eq() {
1616        let mut fixture = TableTestFixture::new();
1617        fixture.setup_manifest_files().await;
1618
1619        // Filter: bool == true
1620        let mut builder = fixture.table.scan();
1621        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1622        builder = builder
1623            .with_filter(predicate)
1624            .with_row_selection_enabled(true);
1625        let table_scan = builder.build().unwrap();
1626
1627        let batch_stream = table_scan.to_arrow().await.unwrap();
1628
1629        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1630
1631        assert_eq!(batches.len(), 2);
1632        assert_eq!(batches[0].num_rows(), 512);
1633
1634        let col = batches[0].column_by_name("bool").unwrap();
1635        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1636        assert!(bool_arr.value(1));
1637    }
1638
1639    #[tokio::test]
1640    async fn test_filter_on_arrow_is_null() {
1641        let mut fixture = TableTestFixture::new();
1642        fixture.setup_manifest_files().await;
1643
1644        // Filter: y is null
1645        let mut builder = fixture.table.scan();
1646        let predicate = Reference::new("y").is_null();
1647        builder = builder
1648            .with_filter(predicate)
1649            .with_row_selection_enabled(true);
1650        let table_scan = builder.build().unwrap();
1651
1652        let batch_stream = table_scan.to_arrow().await.unwrap();
1653
1654        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1655        assert_eq!(batches.len(), 0);
1656    }
1657
1658    #[tokio::test]
1659    async fn test_filter_on_arrow_is_not_null() {
1660        let mut fixture = TableTestFixture::new();
1661        fixture.setup_manifest_files().await;
1662
1663        // Filter: y is not null
1664        let mut builder = fixture.table.scan();
1665        let predicate = Reference::new("y").is_not_null();
1666        builder = builder
1667            .with_filter(predicate)
1668            .with_row_selection_enabled(true);
1669        let table_scan = builder.build().unwrap();
1670
1671        let batch_stream = table_scan.to_arrow().await.unwrap();
1672
1673        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1674        assert_eq!(batches[0].num_rows(), 1024);
1675    }
1676
1677    #[tokio::test]
1678    async fn test_filter_on_arrow_lt_and_gt() {
1679        let mut fixture = TableTestFixture::new();
1680        fixture.setup_manifest_files().await;
1681
1682        // Filter: y < 5 AND z >= 4
1683        let mut builder = fixture.table.scan();
1684        let predicate = Reference::new("y")
1685            .less_than(Datum::long(5))
1686            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1687        builder = builder
1688            .with_filter(predicate)
1689            .with_row_selection_enabled(true);
1690        let table_scan = builder.build().unwrap();
1691
1692        let batch_stream = table_scan.to_arrow().await.unwrap();
1693
1694        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
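        // z >= 4 selects the last 512 rows of the file; of those, the final 12 have
        // y == 5, so the conjunction leaves 500 matching rows.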
1695        assert_eq!(batches[0].num_rows(), 500);
1696
1697        let col = batches[0].column_by_name("x").unwrap();
1698        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1699        assert_eq!(col, &expected_x);
1700
1701        let col = batches[0].column_by_name("y").unwrap();
1702        let mut values = vec![];
1703        values.append(vec![3; 200].as_mut());
1704        values.append(vec![4; 300].as_mut());
1705        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1706        assert_eq!(col, &expected_y);
1707
1708        let col = batches[0].column_by_name("z").unwrap();
1709        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1710        assert_eq!(col, &expected_z);
1711    }
1712
1713    #[tokio::test]
1714    async fn test_filter_on_arrow_lt_or_gt() {
1715        let mut fixture = TableTestFixture::new();
1716        fixture.setup_manifest_files().await;
1717
1718        // Filter: y < 5 OR z >= 4
1719        let mut builder = fixture.table.scan();
1720        let predicate = Reference::new("y")
1721            .less_than(Datum::long(5))
1722            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1723        builder = builder
1724            .with_filter(predicate)
1725            .with_row_selection_enabled(true);
1726        let table_scan = builder.build().unwrap();
1727
1728        let batch_stream = table_scan.to_arrow().await.unwrap();
1729
1730        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
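        // The first 512 rows have y == 2 (< 5) and the last 512 have z == 4 (>= 4),
        // so all 1024 rows satisfy the disjunction.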
1731        assert_eq!(batches[0].num_rows(), 1024);
1732
1733        let col = batches[0].column_by_name("x").unwrap();
1734        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1735        assert_eq!(col, &expected_x);
1736
1737        let col = batches[0].column_by_name("y").unwrap();
1738        let mut values = vec![2; 512];
1739        values.append(vec![3; 200].as_mut());
1740        values.append(vec![4; 300].as_mut());
1741        values.append(vec![5; 12].as_mut());
1742        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1743        assert_eq!(col, &expected_y);
1744
1745        let col = batches[0].column_by_name("z").unwrap();
1746        let mut values = vec![3; 512];
1747        values.append(vec![4; 512].as_mut());
1748        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1749        assert_eq!(col, &expected_z);
1750    }
1751
1752    #[tokio::test]
1753    async fn test_filter_on_arrow_startswith() {
1754        let mut fixture = TableTestFixture::new();
1755        fixture.setup_manifest_files().await;
1756
1757        // Filter: a STARTSWITH "Ice"
1758        let mut builder = fixture.table.scan();
1759        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1760        builder = builder
1761            .with_filter(predicate)
1762            .with_row_selection_enabled(true);
1763        let table_scan = builder.build().unwrap();
1764
1765        let batch_stream = table_scan.to_arrow().await.unwrap();
1766
1767        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1768
1769        assert_eq!(batches[0].num_rows(), 512);
1770
1771        let col = batches[0].column_by_name("a").unwrap();
1772        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1773        assert_eq!(string_arr.value(0), "Iceberg");
1774    }
1775
1776    #[tokio::test]
1777    async fn test_filter_on_arrow_not_startswith() {
1778        let mut fixture = TableTestFixture::new();
1779        fixture.setup_manifest_files().await;
1780
1781        // Filter: a NOT STARTSWITH "Ice"
1782        let mut builder = fixture.table.scan();
1783        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1784        builder = builder
1785            .with_filter(predicate)
1786            .with_row_selection_enabled(true);
1787        let table_scan = builder.build().unwrap();
1788
1789        let batch_stream = table_scan.to_arrow().await.unwrap();
1790
1791        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1792
1793        assert_eq!(batches[0].num_rows(), 512);
1794
1795        let col = batches[0].column_by_name("a").unwrap();
1796        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1797        assert_eq!(string_arr.value(0), "Apache");
1798    }
1799
1800    #[tokio::test]
1801    async fn test_filter_on_arrow_in() {
1802        let mut fixture = TableTestFixture::new();
1803        fixture.setup_manifest_files().await;
1804
1805        // Filter: a IN ("Sioux", "Iceberg")
1806        let mut builder = fixture.table.scan();
1807        let predicate =
1808            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1809        builder = builder
1810            .with_filter(predicate)
1811            .with_row_selection_enabled(true);
1812        let table_scan = builder.build().unwrap();
1813
1814        let batch_stream = table_scan.to_arrow().await.unwrap();
1815
1816        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1817
1818        assert_eq!(batches[0].num_rows(), 512);
1819
1820        let col = batches[0].column_by_name("a").unwrap();
1821        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1822        assert_eq!(string_arr.value(0), "Iceberg");
1823    }
1824
1825    #[tokio::test]
1826    async fn test_filter_on_arrow_not_in() {
1827        let mut fixture = TableTestFixture::new();
1828        fixture.setup_manifest_files().await;
1829
1830        // Filter: a NOT IN ("Sioux", "Iceberg")
1831        let mut builder = fixture.table.scan();
1832        let predicate =
1833            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1834        builder = builder
1835            .with_filter(predicate)
1836            .with_row_selection_enabled(true);
1837        let table_scan = builder.build().unwrap();
1838
1839        let batch_stream = table_scan.to_arrow().await.unwrap();
1840
1841        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1842
1843        assert_eq!(batches[0].num_rows(), 512);
1844
1845        let col = batches[0].column_by_name("a").unwrap();
1846        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1847        assert_eq!(string_arr.value(0), "Apache");
1848    }
1849
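    // FileScanTask round-trips through serde_json, so a planned task can be handed
    // off (e.g. to another process or executor) and reconstructed there.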
1850    #[test]
1851    fn test_file_scan_task_serialize_deserialize() {
1852        let test_fn = |task: FileScanTask| {
1853            let serialized = serde_json::to_string(&task).unwrap();
1854            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
1855
1856            assert_eq!(task.data_file_path, deserialized.data_file_path);
1857            assert_eq!(task.start, deserialized.start);
1858            assert_eq!(task.length, deserialized.length);
1859            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
1860            assert_eq!(task.predicate, deserialized.predicate);
1861            assert_eq!(task.schema, deserialized.schema);
1862        };
1863
1864        // without predicate
1865        let schema = Arc::new(
1866            Schema::builder()
1867                .with_fields(vec![Arc::new(NestedField::required(
1868                    1,
1869                    "x",
1870                    Type::Primitive(PrimitiveType::Binary),
1871                ))])
1872                .build()
1873                .unwrap(),
1874        );
1875        let task = FileScanTask {
1876            data_file_path: "data_file_path".to_string(),
1877            start: 0,
1878            length: 100,
1879            project_field_ids: vec![1, 2, 3],
1880            predicate: None,
1881            schema: schema.clone(),
1882            record_count: Some(100),
1883            data_file_format: DataFileFormat::Parquet,
1884            deletes: vec![],
1885            partition: None,
1886            partition_spec: None,
1887            name_mapping: None,
1888        };
1889        test_fn(task);
1890
1891        // with predicate
1892        let task = FileScanTask {
1893            data_file_path: "data_file_path".to_string(),
1894            start: 0,
1895            length: 100,
1896            project_field_ids: vec![1, 2, 3],
1897            predicate: Some(BoundPredicate::AlwaysTrue),
1898            schema,
1899            record_count: None,
1900            data_file_format: DataFileFormat::Avro,
1901            deletes: vec![],
1902            partition: None,
1903            partition_spec: None,
1904            name_mapping: None,
1905        };
1906        test_fn(task);
1907    }
1908
1909    #[tokio::test]
1910    async fn test_select_with_file_column() {
1911        use arrow_array::cast::AsArray;
1912
1913        let mut fixture = TableTestFixture::new();
1914        fixture.setup_manifest_files().await;
1915
1916        // Select regular columns plus the _file column
1917        let table_scan = fixture
1918            .table
1919            .scan()
1920            .select(["x", RESERVED_COL_NAME_FILE])
1921            .with_row_selection_enabled(true)
1922            .build()
1923            .unwrap();
1924
1925        let batch_stream = table_scan.to_arrow().await.unwrap();
1926        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1927
1928        // Verify we have 2 columns: x and _file
1929        assert_eq!(batches[0].num_columns(), 2);
1930
1931        // Verify the x column exists and has correct data
1932        let x_col = batches[0].column_by_name("x").unwrap();
1933        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
1934        assert_eq!(x_arr.value(0), 1);
1935
1936        // Verify the _file column exists
1937        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
1938        assert!(
1939            file_col.is_some(),
1940            "_file column should be present in the batch"
1941        );
1942
1943        // Verify the _file column contains a file path
1944        let file_col = file_col.unwrap();
1945        assert!(
1946            matches!(
1947                file_col.data_type(),
1948                arrow_schema::DataType::RunEndEncoded(_, _)
1949            ),
1950            "_file column should use RunEndEncoded type"
1951        );
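        // Run-end encoding stores each repeated value only once per run, so a column
        // holding the same file path for every row stays small regardless of row count.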
1952
1953        // Decode the RunArray to verify it contains the file path
1954        let run_array = file_col
1955            .as_any()
1956            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1957            .expect("_file column should be a RunArray");
1958
1959        let values = run_array.values();
1960        let string_values = values.as_string::<i32>();
1961        assert_eq!(string_values.len(), 1, "Should have a single file path");
1962
1963        let file_path = string_values.value(0);
1964        assert!(
1965            file_path.ends_with(".parquet"),
1966            "File path should end with .parquet, got: {file_path}"
1967        );
1968    }
1969
1970    #[tokio::test]
1971    async fn test_select_file_column_position() {
1972        let mut fixture = TableTestFixture::new();
1973        fixture.setup_manifest_files().await;
1974
1975        // Select columns in specific order: x, _file, z
1976        let table_scan = fixture
1977            .table
1978            .scan()
1979            .select(["x", RESERVED_COL_NAME_FILE, "z"])
1980            .with_row_selection_enabled(true)
1981            .build()
1982            .unwrap();
1983
1984        let batch_stream = table_scan.to_arrow().await.unwrap();
1985        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1986
1987        assert_eq!(batches[0].num_columns(), 3);
1988
1989        // Verify column order: x at position 0, _file at position 1, z at position 2
1990        let schema = batches[0].schema();
1991        assert_eq!(schema.field(0).name(), "x");
1992        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
1993        assert_eq!(schema.field(2).name(), "z");
1994
1995        // Verify that column lookup by name also works
1996        assert!(batches[0].column_by_name("x").is_some());
1997        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
1998        assert!(batches[0].column_by_name("z").is_some());
1999    }
2000
2001    #[tokio::test]
2002    async fn test_select_file_column_only() {
2003        let mut fixture = TableTestFixture::new();
2004        fixture.setup_manifest_files().await;
2005
2006        // Select only the _file column
2007        let table_scan = fixture
2008            .table
2009            .scan()
2010            .select([RESERVED_COL_NAME_FILE])
2011            .with_row_selection_enabled(true)
2012            .build()
2013            .unwrap();
2014
2015        let batch_stream = table_scan.to_arrow().await.unwrap();
2016        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2017
2018        // Should have exactly 1 column
2019        assert_eq!(batches[0].num_columns(), 1);
2020
2021        // Verify it's the _file column
2022        let schema = batches[0].schema();
2023        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2024
2025        // Verify the batch has the correct number of rows
2026        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
2027        // Each file has 1024 rows, so total is 2048 rows
2028        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2029        assert_eq!(total_rows, 2048);
2030    }
2031
2032    #[tokio::test]
2033    async fn test_file_column_with_multiple_files() {
2034        use std::collections::HashSet;
2035
2036        let mut fixture = TableTestFixture::new();
2037        fixture.setup_manifest_files().await;
2038
2039        // Select x and _file columns
2040        let table_scan = fixture
2041            .table
2042            .scan()
2043            .select(["x", RESERVED_COL_NAME_FILE])
2044            .with_row_selection_enabled(true)
2045            .build()
2046            .unwrap();
2047
2048        let batch_stream = table_scan.to_arrow().await.unwrap();
2049        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2050
2051        // Collect all unique file paths from the batches
2052        let mut file_paths = HashSet::new();
2053        for batch in &batches {
2054            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
2055            let run_array = file_col
2056                .as_any()
2057                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
2058                .expect("_file column should be a RunArray");
2059
2060            let values = run_array.values();
2061            let string_values = values.as_string::<i32>();
2062            for i in 0..string_values.len() {
2063                file_paths.insert(string_values.value(i).to_string());
2064            }
2065        }
2066
2067        // The scan covers two data files (1.parquet and 3.parquet), so at least one path must be present
2068        assert!(!file_paths.is_empty(), "Should have at least one file path");
2069
2070        // All paths should end with .parquet
2071        for path in &file_paths {
2072            assert!(
2073                path.ends_with(".parquet"),
2074                "All file paths should end with .parquet, got: {path}"
2075            );
2076        }
2077    }
2078
2079    #[tokio::test]
2080    async fn test_file_column_at_start() {
2081        let mut fixture = TableTestFixture::new();
2082        fixture.setup_manifest_files().await;
2083
2084        // Select _file at the start
2085        let table_scan = fixture
2086            .table
2087            .scan()
2088            .select([RESERVED_COL_NAME_FILE, "x", "y"])
2089            .with_row_selection_enabled(true)
2090            .build()
2091            .unwrap();
2092
2093        let batch_stream = table_scan.to_arrow().await.unwrap();
2094        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2095
2096        assert_eq!(batches[0].num_columns(), 3);
2097
2098        // Verify _file is at position 0
2099        let schema = batches[0].schema();
2100        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2101        assert_eq!(schema.field(1).name(), "x");
2102        assert_eq!(schema.field(2).name(), "y");
2103    }
2104
2105    #[tokio::test]
2106    async fn test_file_column_at_end() {
2107        let mut fixture = TableTestFixture::new();
2108        fixture.setup_manifest_files().await;
2109
2110        // Select _file at the end
2111        let table_scan = fixture
2112            .table
2113            .scan()
2114            .select(["x", "y", RESERVED_COL_NAME_FILE])
2115            .with_row_selection_enabled(true)
2116            .build()
2117            .unwrap();
2118
2119        let batch_stream = table_scan.to_arrow().await.unwrap();
2120        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2121
2122        assert_eq!(batches[0].num_columns(), 3);
2123
2124        // Verify _file is at position 2 (the end)
2125        let schema = batches[0].schema();
2126        assert_eq!(schema.field(0).name(), "x");
2127        assert_eq!(schema.field(1).name(), "y");
2128        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2129    }
2130
2131    #[tokio::test]
2132    async fn test_select_with_repeated_column_names() {
2133        let mut fixture = TableTestFixture::new();
2134        fixture.setup_manifest_files().await;
2135
2136        // Select with repeated column names - both regular columns and virtual columns
2137        // Repeated columns should appear multiple times in the result (duplicates are allowed)
2138        let table_scan = fixture
2139            .table
2140            .scan()
2141            .select([
2142                "x",
2143                RESERVED_COL_NAME_FILE,
2144                "x", // x repeated
2145                "y",
2146                RESERVED_COL_NAME_FILE, // _file repeated
2147                "y",                    // y repeated
2148            ])
2149            .with_row_selection_enabled(true)
2150            .build()
2151            .unwrap();
2152
2153        let batch_stream = table_scan.to_arrow().await.unwrap();
2154        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2155
2156        // Verify we have exactly 6 columns (duplicates are allowed and preserved)
2157        assert_eq!(
2158            batches[0].num_columns(),
2159            6,
2160            "Should have exactly 6 columns with duplicates"
2161        );
2162
2163        let schema = batches[0].schema();
2164
2165        // Verify columns appear in the exact order requested: x, _file, x, y, _file, y
2166        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
2167        assert_eq!(
2168            schema.field(1).name(),
2169            RESERVED_COL_NAME_FILE,
2170            "Column 1 should be _file"
2171        );
2172        assert_eq!(
2173            schema.field(2).name(),
2174            "x",
2175            "Column 2 should be x (duplicate)"
2176        );
2177        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
2178        assert_eq!(
2179            schema.field(4).name(),
2180            RESERVED_COL_NAME_FILE,
2181            "Column 4 should be _file (duplicate)"
2182        );
2183        assert_eq!(
2184            schema.field(5).name(),
2185            "y",
2186            "Column 5 should be y (duplicate)"
2187        );
2188
2189        // Verify all columns have correct data types
2190        assert!(
2191            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
2192            "Column x should be Int64"
2193        );
2194        assert!(
2195            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
2196            "Column x (duplicate) should be Int64"
2197        );
2198        assert!(
2199            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
2200            "Column y should be Int64"
2201        );
2202        assert!(
2203            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
2204            "Column y (duplicate) should be Int64"
2205        );
2206        assert!(
2207            matches!(
2208                schema.field(1).data_type(),
2209                arrow_schema::DataType::RunEndEncoded(_, _)
2210            ),
2211            "_file column should use RunEndEncoded type"
2212        );
2213        assert!(
2214            matches!(
2215                schema.field(4).data_type(),
2216                arrow_schema::DataType::RunEndEncoded(_, _)
2217            ),
2218            "_file column (duplicate) should use RunEndEncoded type"
2219        );
2220    }
2221
2222    #[tokio::test]
2223    async fn test_scan_deadlock() {
2224        let mut fixture = TableTestFixture::new();
2225        fixture.setup_deadlock_manifests().await;
2226
2227        // Create a table scan with a concurrency limit of 1, which also sets the
2228        // internal channel size to 1.
2229        // The data manifest has 10 entries, so the producer fills the channel and blocks.
2230        // The delete manifest is second in the list, so it is never processed:
2231        // the data consumer has not started yet and the delete consumer is still
2232        // waiting, leaving both sides blocked.
2233        let table_scan = fixture
2234            .table
2235            .scan()
2236            .with_concurrency_limit(1)
2237            .build()
2238            .unwrap();
2239
2240        // If the deadlock exists, planning hangs indefinitely; wrap it in a
2241        // tokio::time::timeout so the test fails instead of hanging forever.
2242        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2243            table_scan
2244                .plan_files()
2245                .await
2246                .unwrap()
2247                .try_collect::<Vec<_>>()
2248                .await
2249        })
2250        .await;
2251
2252        // Assert it finished (didn't timeout)
2253        assert!(result.is_ok(), "Scan timed out - deadlock detected");
2254    }
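
    // A minimal sketch (not an executable test) of how a caller might raise the
    // planning concurrency instead of pinning it to 1 as the regression test above
    // does; the limit value here is illustrative only:
    //
    //     let table_scan = table
    //         .scan()
    //         .with_concurrency_limit(8)
    //         .build()?;
    //     let tasks: Vec<_> = table_scan.plan_files().await?.try_collect().await?;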
2255}