iceberg/scan/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Table scan API.
19
20mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35use crate::delete_file_index::DeleteFileIndex;
36use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
37use crate::expr::{Bind, BoundPredicate, Predicate};
38use crate::io::FileIO;
39use crate::runtime::spawn;
40use crate::spec::{DataContentType, SnapshotRef};
41use crate::table::Table;
42use crate::utils::available_parallelism;
43use crate::{Error, ErrorKind, Result};
44
45/// A stream of arrow [`RecordBatch`]es.
46pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
47
48/// Builder to create table scan.
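///
/// # Example
///
/// A minimal sketch of the builder flow, assuming `table` is an already-loaded
/// [`Table`] that has columns named `x` and `y`:
///
/// ```ignore
/// let scan = table
///     .scan()
///     .select(["x", "y"])
///     .with_batch_size(Some(1024))
///     .build()?;
/// ```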
49pub struct TableScanBuilder<'a> {
50    table: &'a Table,
    // Defaults to None, which means all columns are selected
52    column_names: Option<Vec<String>>,
53    snapshot_id: Option<i64>,
54    batch_size: Option<usize>,
55    case_sensitive: bool,
56    filter: Option<Predicate>,
57    concurrency_limit_data_files: usize,
58    concurrency_limit_manifest_entries: usize,
59    concurrency_limit_manifest_files: usize,
60    row_group_filtering_enabled: bool,
61    row_selection_enabled: bool,
62}
63
64impl<'a> TableScanBuilder<'a> {
65    pub(crate) fn new(table: &'a Table) -> Self {
66        let num_cpus = available_parallelism().get();
67
68        Self {
69            table,
70            column_names: None,
71            snapshot_id: None,
72            batch_size: None,
73            case_sensitive: true,
74            filter: None,
75            concurrency_limit_data_files: num_cpus,
76            concurrency_limit_manifest_entries: num_cpus,
77            concurrency_limit_manifest_files: num_cpus,
78            row_group_filtering_enabled: true,
79            row_selection_enabled: false,
80        }
81    }
82
    /// Sets the desired size of the record batches returned by the scan
    /// to something other than the default
85    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
86        self.batch_size = batch_size;
87        self
88    }
89
90    /// Sets the scan's case sensitivity
91    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
92        self.case_sensitive = case_sensitive;
93        self
94    }
95
96    /// Specifies a predicate to use as a filter
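    ///
    /// A minimal sketch, assuming `table` is a loaded [`Table`] with a column
    /// named `x`:
    ///
    /// ```ignore
    /// use iceberg::expr::Reference;
    /// use iceberg::spec::Datum;
    ///
    /// let scan = table
    ///     .scan()
    ///     .with_filter(Reference::new("x").less_than(Datum::long(10)))
    ///     .build()?;
    /// ```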
97    pub fn with_filter(mut self, predicate: Predicate) -> Self {
98        // calls rewrite_not to remove Not nodes, which must be absent
99        // when applying the manifest evaluator
100        self.filter = Some(predicate.rewrite_not());
101        self
102    }
103
104    /// Select all columns.
105    pub fn select_all(mut self) -> Self {
106        self.column_names = None;
107        self
108    }
109
110    /// Select empty columns.
111    pub fn select_empty(mut self) -> Self {
112        self.column_names = Some(vec![]);
113        self
114    }
115
116    /// Select some columns of the table.
117    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
118        self.column_names = Some(
119            column_names
120                .into_iter()
121                .map(|item| item.to_string())
122                .collect(),
123        );
124        self
125    }
126
    /// Sets the snapshot to scan. When not set, the current snapshot is used.
128    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
129        self.snapshot_id = Some(snapshot_id);
130        self
131    }
132
    /// Sets the concurrency limit for manifest files, manifest entries,
    /// and data files for this scan
135    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
136        self.concurrency_limit_manifest_files = limit;
137        self.concurrency_limit_manifest_entries = limit;
138        self.concurrency_limit_data_files = limit;
139        self
140    }
141
142    /// Sets the data file concurrency limit for this scan
143    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
144        self.concurrency_limit_data_files = limit;
145        self
146    }
147
148    /// Sets the manifest entry concurrency limit for this scan
149    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
150        self.concurrency_limit_manifest_entries = limit;
151        self
152    }
153
    /// Determines whether to enable row group filtering.
    /// When enabled, if a read is performed with a filter predicate,
    /// then the metadata for each row group in the Parquet file is
    /// evaluated against the filter predicate, and row groups
    /// that can't contain matching rows are skipped entirely.
    ///
    /// Defaults to enabled, as it generally improves performance or
    /// leaves it unchanged, and degradation is unlikely.
162    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
163        self.row_group_filtering_enabled = row_group_filtering_enabled;
164        self
165    }
166
    /// Determines whether to enable row selection.
    /// When enabled, if a read is performed with a filter predicate,
    /// then (for row groups that have not been skipped) the page index
    /// for each row group in a Parquet file is parsed and evaluated
    /// against the filter predicate to determine whether ranges of rows
    /// within a row group can be skipped, based on the page-level
    /// statistics for each column.
    ///
    /// Defaults to disabled. Enabling this requires parsing the Parquet page
    /// index, which can be slow enough that its cost outweighs any gains from
    /// the reduced number of rows that need scanning.
    /// It is recommended to experiment with the table's partitioning, sorting,
    /// row group size, page size, and page row limit settings in order to get
    /// the best performance from row selection.
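    ///
    /// A minimal sketch of enabling both row group filtering and row selection,
    /// assuming `table` is a loaded [`Table`] (how much this helps depends
    /// heavily on the table's layout):
    ///
    /// ```ignore
    /// let scan = table
    ///     .scan()
    ///     .with_row_group_filtering_enabled(true)
    ///     .with_row_selection_enabled(true)
    ///     .build()?;
    /// ```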
181    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
182        self.row_selection_enabled = row_selection_enabled;
183        self
184    }
185
186    /// Build the table scan.
187    pub fn build(self) -> Result<TableScan> {
188        let snapshot = match self.snapshot_id {
189            Some(snapshot_id) => self
190                .table
191                .metadata()
192                .snapshot_by_id(snapshot_id)
193                .ok_or_else(|| {
194                    Error::new(
195                        ErrorKind::DataInvalid,
196                        format!("Snapshot with id {} not found", snapshot_id),
197                    )
198                })?
199                .clone(),
200            None => {
                let Some(current_snapshot) = self.table.metadata().current_snapshot() else {
202                    return Ok(TableScan {
203                        batch_size: self.batch_size,
204                        column_names: self.column_names,
205                        file_io: self.table.file_io().clone(),
206                        plan_context: None,
207                        concurrency_limit_data_files: self.concurrency_limit_data_files,
208                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
209                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
210                        row_group_filtering_enabled: self.row_group_filtering_enabled,
211                        row_selection_enabled: self.row_selection_enabled,
212                    });
213                };
                current_snapshot.clone()
215            }
216        };
217
218        let schema = snapshot.schema(self.table.metadata())?;
219
220        // Check that all column names exist in the schema.
221        if let Some(column_names) = self.column_names.as_ref() {
222            for column_name in column_names {
223                if schema.field_by_name(column_name).is_none() {
224                    return Err(Error::new(
225                        ErrorKind::DataInvalid,
226                        format!(
227                            "Column {} not found in table. Schema: {}",
228                            column_name, schema
229                        ),
230                    ));
231                }
232            }
233        }
234
235        let mut field_ids = vec![];
236        let column_names = self.column_names.clone().unwrap_or_else(|| {
237            schema
238                .as_struct()
239                .fields()
240                .iter()
241                .map(|f| f.name.clone())
242                .collect()
243        });
244
245        for column_name in column_names.iter() {
246            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
247                Error::new(
248                    ErrorKind::DataInvalid,
249                    format!(
250                        "Column {} not found in table. Schema: {}",
251                        column_name, schema
252                    ),
253                )
254            })?;
255
256            schema
257                .as_struct()
258                .field_by_id(field_id)
259                .ok_or_else(|| {
260                    Error::new(
261                        ErrorKind::FeatureUnsupported,
262                        format!(
263                            "Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}",
264                            column_name, schema
265                        ),
266                    )
267                })?;
268
269            field_ids.push(field_id);
270        }
271
272        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
273            Some(predicates.bind(schema.clone(), true)?)
274        } else {
275            None
276        };
277
278        let plan_context = PlanContext {
279            snapshot,
280            table_metadata: self.table.metadata_ref(),
281            snapshot_schema: schema,
282            case_sensitive: self.case_sensitive,
283            predicate: self.filter.map(Arc::new),
284            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
285            object_cache: self.table.object_cache(),
286            field_ids: Arc::new(field_ids),
287            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
288            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
289            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
290        };
291
292        Ok(TableScan {
293            batch_size: self.batch_size,
294            column_names: self.column_names,
295            file_io: self.table.file_io().clone(),
296            plan_context: Some(plan_context),
297            concurrency_limit_data_files: self.concurrency_limit_data_files,
298            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
299            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
300            row_group_filtering_enabled: self.row_group_filtering_enabled,
301            row_selection_enabled: self.row_selection_enabled,
302        })
303    }
304}
305
306/// Table scan.
307#[derive(Debug)]
308pub struct TableScan {
309    /// A [PlanContext], if this table has at least one snapshot, otherwise None.
310    ///
311    /// If this is None, then the scan contains no rows.
312    plan_context: Option<PlanContext>,
313    batch_size: Option<usize>,
314    file_io: FileIO,
315    column_names: Option<Vec<String>>,
316    /// The maximum number of manifest files that will be
317    /// retrieved from [`FileIO`] concurrently
318    concurrency_limit_manifest_files: usize,
319
320    /// The maximum number of [`ManifestEntry`]s that will
321    /// be processed in parallel
322    concurrency_limit_manifest_entries: usize,
323
    /// The maximum number of data files that will
    /// be fetched and processed concurrently
326    concurrency_limit_data_files: usize,
327
328    row_group_filtering_enabled: bool,
329    row_selection_enabled: bool,
330}
331
332impl TableScan {
333    /// Returns a stream of [`FileScanTask`]s.
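    ///
    /// A minimal sketch of collecting the planned tasks, assuming `scan` is a
    /// [`TableScan`] built via [`TableScanBuilder::build`]:
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let tasks: Vec<FileScanTask> = scan.plan_files().await?.try_collect().await?;
    /// ```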
334    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
335        let Some(plan_context) = self.plan_context.as_ref() else {
336            return Ok(Box::pin(futures::stream::empty()));
337        };
338
339        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
340        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;
341
342        // used to stream ManifestEntryContexts between stages of the file plan operation
343        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
344            channel(concurrency_limit_manifest_files);
345        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
346            channel(concurrency_limit_manifest_files);
347
348        // used to stream the results back to the caller
349        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
350
351        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
352
353        let manifest_list = plan_context.get_manifest_list().await?;
354
        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out
        // any whose partitions cannot match this scan's filter
358        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
359            manifest_list,
360            manifest_entry_data_ctx_tx,
361            delete_file_idx.clone(),
362            manifest_entry_delete_ctx_tx,
363        )?;
364
365        let mut channel_for_manifest_error = file_scan_task_tx.clone();
366
367        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
368        spawn(async move {
369            let result = futures::stream::iter(manifest_file_contexts)
370                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
371                    ctx.fetch_manifest_and_stream_manifest_entries().await
372                })
373                .await;
374
375            if let Err(error) = result {
376                let _ = channel_for_manifest_error.send(Err(error)).await;
377            }
378        });
379
380        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
381        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
382
383        // Process the delete file [`ManifestEntry`] stream in parallel
384        spawn(async move {
385            let result = manifest_entry_delete_ctx_rx
386                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
387                .try_for_each_concurrent(
388                    concurrency_limit_manifest_entries,
389                    |(manifest_entry_context, tx)| async move {
390                        spawn(async move {
391                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
392                        })
393                        .await
394                    },
395                )
396                .await;
397
398            if let Err(error) = result {
399                let _ = channel_for_delete_manifest_entry_error
400                    .send(Err(error))
401                    .await;
402            }
403        })
404        .await;
405
406        // Process the data file [`ManifestEntry`] stream in parallel
407        spawn(async move {
408            let result = manifest_entry_data_ctx_rx
409                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
410                .try_for_each_concurrent(
411                    concurrency_limit_manifest_entries,
412                    |(manifest_entry_context, tx)| async move {
413                        spawn(async move {
414                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
415                        })
416                        .await
417                    },
418                )
419                .await;
420
421            if let Err(error) = result {
422                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
423            }
424        });
425
426        Ok(file_scan_task_rx.boxed())
427    }
428
429    /// Returns an [`ArrowRecordBatchStream`].
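    ///
    /// A minimal sketch of draining the stream into memory, assuming `scan` is a
    /// [`TableScan`]; large scans should process batches incrementally instead:
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;
    /// ```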
430    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
431        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
432            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
433            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
434            .with_row_selection_enabled(self.row_selection_enabled);
435
436        if let Some(batch_size) = self.batch_size {
437            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
438        }
439
440        arrow_reader_builder.build().read(self.plan_files().await?)
441    }
442
443    /// Returns a reference to the column names of the table scan.
444    pub fn column_names(&self) -> Option<&[String]> {
445        self.column_names.as_deref()
446    }
447
448    /// Returns a reference to the snapshot of the table scan.
449    pub fn snapshot(&self) -> Option<&SnapshotRef> {
450        self.plan_context.as_ref().map(|x| &x.snapshot)
451    }
452
453    async fn process_data_manifest_entry(
454        manifest_entry_context: ManifestEntryContext,
455        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
456    ) -> Result<()> {
457        // skip processing this manifest entry if it has been marked as deleted
458        if !manifest_entry_context.manifest_entry.is_alive() {
459            return Ok(());
460        }
461
462        // abort the plan if we encounter a manifest entry for a delete file
463        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
464            return Err(Error::new(
465                ErrorKind::FeatureUnsupported,
466                "Encountered an entry for a delete file in a data file manifest",
467            ));
468        }
469
470        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
471            let BoundPredicates {
472                snapshot_bound_predicate,
473                partition_bound_predicate,
474            } = bound_predicates.as_ref();
475
476            let expression_evaluator_cache =
477                manifest_entry_context.expression_evaluator_cache.as_ref();
478
479            let expression_evaluator = expression_evaluator_cache.get(
480                manifest_entry_context.partition_spec_id,
481                partition_bound_predicate,
482            )?;
483
484            // skip any data file whose partition data indicates that it can't contain
485            // any data that matches this scan's filter
486            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
487                return Ok(());
488            }
489
490            // skip any data file whose metrics don't match this scan's filter
491            if !InclusiveMetricsEvaluator::eval(
492                snapshot_bound_predicate,
493                manifest_entry_context.manifest_entry.data_file(),
494                false,
495            )? {
496                return Ok(());
497            }
498        }
499
500        // congratulations! the manifest entry has made its way through the
501        // entire plan without getting filtered out. Create a corresponding
502        // FileScanTask and push it to the result stream
503        file_scan_task_tx
504            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
505            .await?;
506
507        Ok(())
508    }
509
510    async fn process_delete_manifest_entry(
511        manifest_entry_context: ManifestEntryContext,
512        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
513    ) -> Result<()> {
514        // skip processing this manifest entry if it has been marked as deleted
515        if !manifest_entry_context.manifest_entry.is_alive() {
516            return Ok(());
517        }
518
519        // abort the plan if we encounter a manifest entry that is not for a delete file
520        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
521            return Err(Error::new(
522                ErrorKind::FeatureUnsupported,
523                "Encountered an entry for a data file in a delete manifest",
524            ));
525        }
526
527        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
528            let expression_evaluator_cache =
529                manifest_entry_context.expression_evaluator_cache.as_ref();
530
531            let expression_evaluator = expression_evaluator_cache.get(
532                manifest_entry_context.partition_spec_id,
533                &bound_predicates.partition_bound_predicate,
534            )?;
535
            // skip any delete file whose partition data indicates that it can't
            // contain any rows that match this scan's filter
538            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
539                return Ok(());
540            }
541        }
542
543        delete_file_ctx_tx
544            .send(DeleteFileContext {
545                manifest_entry: manifest_entry_context.manifest_entry.clone(),
546                partition_spec_id: manifest_entry_context.partition_spec_id,
547            })
548            .await?;
549
550        Ok(())
551    }
552}
553
554pub(crate) struct BoundPredicates {
555    partition_bound_predicate: BoundPredicate,
556    snapshot_bound_predicate: BoundPredicate,
557}
558
559#[cfg(test)]
560pub mod tests {
561    //! shared tests for the table scan API
562    #![allow(missing_docs)]
563
564    use std::collections::HashMap;
565    use std::fs;
566    use std::fs::File;
567    use std::sync::Arc;
568
569    use arrow_array::{
570        ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
571    };
572    use futures::{TryStreamExt, stream};
573    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
574    use parquet::basic::Compression;
575    use parquet::file::properties::WriterProperties;
576    use tempfile::TempDir;
577    use tera::{Context, Tera};
578    use uuid::Uuid;
579
580    use crate::TableIdent;
581    use crate::arrow::ArrowReaderBuilder;
582    use crate::expr::{BoundPredicate, Reference};
583    use crate::io::{FileIO, OutputFile};
584    use crate::scan::FileScanTask;
585    use crate::spec::{
586        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
587        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
588        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
589    };
590    use crate::table::Table;
591
592    pub struct TableTestFixture {
593        pub table_location: String,
594        pub table: Table,
595    }
596
597    impl TableTestFixture {
598        #[allow(clippy::new_without_default)]
599        pub fn new() -> Self {
600            let tmp_dir = TempDir::new().unwrap();
601            let table_location = tmp_dir.path().join("table1");
602            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
603            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
604            let table_metadata1_location = table_location.join("metadata/v1.json");
605
606            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
607                .unwrap()
608                .build()
609                .unwrap();
610
611            let table_metadata = {
612                let template_json_str = fs::read_to_string(format!(
613                    "{}/testdata/example_table_metadata_v2.json",
614                    env!("CARGO_MANIFEST_DIR")
615                ))
616                .unwrap();
617                let mut context = Context::new();
618                context.insert("table_location", &table_location);
619                context.insert("manifest_list_1_location", &manifest_list1_location);
620                context.insert("manifest_list_2_location", &manifest_list2_location);
621                context.insert("table_metadata_1_location", &table_metadata1_location);
622
623                let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
624                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
625            };
626
627            let table = Table::builder()
628                .metadata(table_metadata)
629                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
630                .file_io(file_io.clone())
631                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
632                .build()
633                .unwrap();
634
635            Self {
636                table_location: table_location.to_str().unwrap().to_string(),
637                table,
638            }
639        }
640
641        #[allow(clippy::new_without_default)]
642        pub fn new_empty() -> Self {
643            let tmp_dir = TempDir::new().unwrap();
644            let table_location = tmp_dir.path().join("table1");
645            let table_metadata1_location = table_location.join("metadata/v1.json");
646
647            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
648                .unwrap()
649                .build()
650                .unwrap();
651
652            let table_metadata = {
653                let template_json_str = fs::read_to_string(format!(
654                    "{}/testdata/example_empty_table_metadata_v2.json",
655                    env!("CARGO_MANIFEST_DIR")
656                ))
657                .unwrap();
658                let mut context = Context::new();
659                context.insert("table_location", &table_location);
660                context.insert("table_metadata_1_location", &table_metadata1_location);
661
662                let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
663                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
664            };
665
666            let table = Table::builder()
667                .metadata(table_metadata)
668                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
669                .file_io(file_io.clone())
670                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
671                .build()
672                .unwrap();
673
674            Self {
675                table_location: table_location.to_str().unwrap().to_string(),
676                table,
677            }
678        }
679
680        pub fn new_unpartitioned() -> Self {
681            let tmp_dir = TempDir::new().unwrap();
682            let table_location = tmp_dir.path().join("table1");
683            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
684            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
685            let table_metadata1_location = table_location.join("metadata/v1.json");
686
687            let file_io = FileIO::from_path(table_location.to_str().unwrap())
688                .unwrap()
689                .build()
690                .unwrap();
691
692            let mut table_metadata = {
693                let template_json_str = fs::read_to_string(format!(
694                    "{}/testdata/example_table_metadata_v2.json",
695                    env!("CARGO_MANIFEST_DIR")
696                ))
697                .unwrap();
698                let mut context = Context::new();
699                context.insert("table_location", &table_location);
700                context.insert("manifest_list_1_location", &manifest_list1_location);
701                context.insert("manifest_list_2_location", &manifest_list2_location);
702                context.insert("table_metadata_1_location", &table_metadata1_location);
703
704                let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
705                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
706            };
707
708            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
709            table_metadata.partition_specs.clear();
710            table_metadata.default_partition_type = StructType::new(vec![]);
711            table_metadata
712                .partition_specs
713                .insert(0, table_metadata.default_spec.clone());
714
715            let table = Table::builder()
716                .metadata(table_metadata)
717                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
718                .file_io(file_io.clone())
719                .metadata_location(table_metadata1_location.to_str().unwrap())
720                .build()
721                .unwrap();
722
723            Self {
724                table_location: table_location.to_str().unwrap().to_string(),
725                table,
726            }
727        }
728
729        fn next_manifest_file(&self) -> OutputFile {
730            self.table
731                .file_io()
732                .new_output(format!(
733                    "{}/metadata/manifest_{}.avro",
734                    self.table_location,
735                    Uuid::new_v4()
736                ))
737                .unwrap()
738        }
739
740        pub async fn setup_manifest_files(&mut self) {
741            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
742            let parent_snapshot = current_snapshot
743                .parent_snapshot(self.table.metadata())
744                .unwrap();
745            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
746            let current_partition_spec = self.table.metadata().default_partition_spec();
747
748            // Write data files
749            let mut writer = ManifestWriterBuilder::new(
750                self.next_manifest_file(),
751                Some(current_snapshot.snapshot_id()),
752                None,
753                current_schema.clone(),
754                current_partition_spec.as_ref().clone(),
755            )
756            .build_v2_data();
757            writer
758                .add_entry(
759                    ManifestEntry::builder()
760                        .status(ManifestStatus::Added)
761                        .data_file(
762                            DataFileBuilder::default()
763                                .partition_spec_id(0)
764                                .content(DataContentType::Data)
765                                .file_path(format!("{}/1.parquet", &self.table_location))
766                                .file_format(DataFileFormat::Parquet)
767                                .file_size_in_bytes(100)
768                                .record_count(1)
769                                .partition(Struct::from_iter([Some(Literal::long(100))]))
770                                .key_metadata(None)
771                                .build()
772                                .unwrap(),
773                        )
774                        .build(),
775                )
776                .unwrap();
777            writer
778                .add_delete_entry(
779                    ManifestEntry::builder()
780                        .status(ManifestStatus::Deleted)
781                        .snapshot_id(parent_snapshot.snapshot_id())
782                        .sequence_number(parent_snapshot.sequence_number())
783                        .file_sequence_number(parent_snapshot.sequence_number())
784                        .data_file(
785                            DataFileBuilder::default()
786                                .partition_spec_id(0)
787                                .content(DataContentType::Data)
788                                .file_path(format!("{}/2.parquet", &self.table_location))
789                                .file_format(DataFileFormat::Parquet)
790                                .file_size_in_bytes(100)
791                                .record_count(1)
792                                .partition(Struct::from_iter([Some(Literal::long(200))]))
793                                .build()
794                                .unwrap(),
795                        )
796                        .build(),
797                )
798                .unwrap();
799            writer
800                .add_existing_entry(
801                    ManifestEntry::builder()
802                        .status(ManifestStatus::Existing)
803                        .snapshot_id(parent_snapshot.snapshot_id())
804                        .sequence_number(parent_snapshot.sequence_number())
805                        .file_sequence_number(parent_snapshot.sequence_number())
806                        .data_file(
807                            DataFileBuilder::default()
808                                .partition_spec_id(0)
809                                .content(DataContentType::Data)
810                                .file_path(format!("{}/3.parquet", &self.table_location))
811                                .file_format(DataFileFormat::Parquet)
812                                .file_size_in_bytes(100)
813                                .record_count(1)
814                                .partition(Struct::from_iter([Some(Literal::long(300))]))
815                                .build()
816                                .unwrap(),
817                        )
818                        .build(),
819                )
820                .unwrap();
821            let data_file_manifest = writer.write_manifest_file().await.unwrap();
822
823            // Write to manifest list
824            let mut manifest_list_write = ManifestListWriter::v2(
825                self.table
826                    .file_io()
827                    .new_output(current_snapshot.manifest_list())
828                    .unwrap(),
829                current_snapshot.snapshot_id(),
830                current_snapshot.parent_snapshot_id(),
831                current_snapshot.sequence_number(),
832            );
833            manifest_list_write
834                .add_manifests(vec![data_file_manifest].into_iter())
835                .unwrap();
836            manifest_list_write.close().await.unwrap();
837
838            // prepare data
839            let schema = {
840                let fields = vec![
841                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
842                        .with_metadata(HashMap::from([(
843                            PARQUET_FIELD_ID_META_KEY.to_string(),
844                            "1".to_string(),
845                        )])),
846                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
847                        .with_metadata(HashMap::from([(
848                            PARQUET_FIELD_ID_META_KEY.to_string(),
849                            "2".to_string(),
850                        )])),
851                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
852                        .with_metadata(HashMap::from([(
853                            PARQUET_FIELD_ID_META_KEY.to_string(),
854                            "3".to_string(),
855                        )])),
856                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
857                        .with_metadata(HashMap::from([(
858                            PARQUET_FIELD_ID_META_KEY.to_string(),
859                            "4".to_string(),
860                        )])),
861                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
862                        .with_metadata(HashMap::from([(
863                            PARQUET_FIELD_ID_META_KEY.to_string(),
864                            "5".to_string(),
865                        )])),
866                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
867                        .with_metadata(HashMap::from([(
868                            PARQUET_FIELD_ID_META_KEY.to_string(),
869                            "6".to_string(),
870                        )])),
871                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
872                        .with_metadata(HashMap::from([(
873                            PARQUET_FIELD_ID_META_KEY.to_string(),
874                            "7".to_string(),
875                        )])),
876                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
877                        .with_metadata(HashMap::from([(
878                            PARQUET_FIELD_ID_META_KEY.to_string(),
879                            "8".to_string(),
880                        )])),
881                ];
882                Arc::new(arrow_schema::Schema::new(fields))
883            };
884            // x: [1, 1, 1, 1, ...]
885            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
886
887            let mut values = vec![2; 512];
888            values.append(vec![3; 200].as_mut());
889            values.append(vec![4; 300].as_mut());
890            values.append(vec![5; 12].as_mut());
891
892            // y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
893            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
894
895            let mut values = vec![3; 512];
896            values.append(vec![4; 512].as_mut());
897
898            // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
899            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
900
901            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
902            let mut values = vec!["Apache"; 512];
903            values.append(vec!["Iceberg"; 512].as_mut());
904            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
905
906            // dbl:
907            let mut values = vec![100.0f64; 512];
908            values.append(vec![150.0f64; 12].as_mut());
909            values.append(vec![200.0f64; 500].as_mut());
910            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
911
912            // i32:
913            let mut values = vec![100i32; 512];
914            values.append(vec![150i32; 12].as_mut());
915            values.append(vec![200i32; 500].as_mut());
916            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
917
918            // i64:
919            let mut values = vec![100i64; 512];
920            values.append(vec![150i64; 12].as_mut());
921            values.append(vec![200i64; 500].as_mut());
922            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
923
924            // bool:
925            let mut values = vec![false; 512];
926            values.append(vec![true; 512].as_mut());
927            let values: BooleanArray = values.into();
928            let col8 = Arc::new(values) as ArrayRef;
929
930            let to_write = RecordBatch::try_new(schema.clone(), vec![
931                col1, col2, col3, col4, col5, col6, col7, col8,
932            ])
933            .unwrap();
934
935            // Write the Parquet files
936            let props = WriterProperties::builder()
937                .set_compression(Compression::SNAPPY)
938                .build();
939
940            for n in 1..=3 {
941                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
942                let mut writer =
943                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
944
945                writer.write(&to_write).expect("Writing batch");
946
947                // writer must be closed to write footer
948                writer.close().unwrap();
949            }
950        }
951
952        pub async fn setup_unpartitioned_manifest_files(&mut self) {
953            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
954            let parent_snapshot = current_snapshot
955                .parent_snapshot(self.table.metadata())
956                .unwrap();
957            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
958            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
959
960            // Write data files using an empty partition for unpartitioned tables.
961            let mut writer = ManifestWriterBuilder::new(
962                self.next_manifest_file(),
963                Some(current_snapshot.snapshot_id()),
964                None,
965                current_schema.clone(),
966                current_partition_spec.as_ref().clone(),
967            )
968            .build_v2_data();
969
970            // Create an empty partition value.
971            let empty_partition = Struct::empty();
972
973            writer
974                .add_entry(
975                    ManifestEntry::builder()
976                        .status(ManifestStatus::Added)
977                        .data_file(
978                            DataFileBuilder::default()
979                                .partition_spec_id(0)
980                                .content(DataContentType::Data)
981                                .file_path(format!("{}/1.parquet", &self.table_location))
982                                .file_format(DataFileFormat::Parquet)
983                                .file_size_in_bytes(100)
984                                .record_count(1)
985                                .partition(empty_partition.clone())
986                                .key_metadata(None)
987                                .build()
988                                .unwrap(),
989                        )
990                        .build(),
991                )
992                .unwrap();
993
994            writer
995                .add_delete_entry(
996                    ManifestEntry::builder()
997                        .status(ManifestStatus::Deleted)
998                        .snapshot_id(parent_snapshot.snapshot_id())
999                        .sequence_number(parent_snapshot.sequence_number())
1000                        .file_sequence_number(parent_snapshot.sequence_number())
1001                        .data_file(
1002                            DataFileBuilder::default()
1003                                .partition_spec_id(0)
1004                                .content(DataContentType::Data)
1005                                .file_path(format!("{}/2.parquet", &self.table_location))
1006                                .file_format(DataFileFormat::Parquet)
1007                                .file_size_in_bytes(100)
1008                                .record_count(1)
1009                                .partition(empty_partition.clone())
1010                                .build()
1011                                .unwrap(),
1012                        )
1013                        .build(),
1014                )
1015                .unwrap();
1016
1017            writer
1018                .add_existing_entry(
1019                    ManifestEntry::builder()
1020                        .status(ManifestStatus::Existing)
1021                        .snapshot_id(parent_snapshot.snapshot_id())
1022                        .sequence_number(parent_snapshot.sequence_number())
1023                        .file_sequence_number(parent_snapshot.sequence_number())
1024                        .data_file(
1025                            DataFileBuilder::default()
1026                                .partition_spec_id(0)
1027                                .content(DataContentType::Data)
1028                                .file_path(format!("{}/3.parquet", &self.table_location))
1029                                .file_format(DataFileFormat::Parquet)
1030                                .file_size_in_bytes(100)
1031                                .record_count(1)
1032                                .partition(empty_partition.clone())
1033                                .build()
1034                                .unwrap(),
1035                        )
1036                        .build(),
1037                )
1038                .unwrap();
1039
1040            let data_file_manifest = writer.write_manifest_file().await.unwrap();
1041
1042            // Write to manifest list
1043            let mut manifest_list_write = ManifestListWriter::v2(
1044                self.table
1045                    .file_io()
1046                    .new_output(current_snapshot.manifest_list())
1047                    .unwrap(),
1048                current_snapshot.snapshot_id(),
1049                current_snapshot.parent_snapshot_id(),
1050                current_snapshot.sequence_number(),
1051            );
1052            manifest_list_write
1053                .add_manifests(vec![data_file_manifest].into_iter())
1054                .unwrap();
1055            manifest_list_write.close().await.unwrap();
1056
1057            // prepare data for parquet files
1058            let schema = {
1059                let fields = vec![
1060                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
1061                        .with_metadata(HashMap::from([(
1062                            PARQUET_FIELD_ID_META_KEY.to_string(),
1063                            "1".to_string(),
1064                        )])),
1065                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
1066                        .with_metadata(HashMap::from([(
1067                            PARQUET_FIELD_ID_META_KEY.to_string(),
1068                            "2".to_string(),
1069                        )])),
1070                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
1071                        .with_metadata(HashMap::from([(
1072                            PARQUET_FIELD_ID_META_KEY.to_string(),
1073                            "3".to_string(),
1074                        )])),
1075                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
1076                        .with_metadata(HashMap::from([(
1077                            PARQUET_FIELD_ID_META_KEY.to_string(),
1078                            "4".to_string(),
1079                        )])),
1080                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
1081                        .with_metadata(HashMap::from([(
1082                            PARQUET_FIELD_ID_META_KEY.to_string(),
1083                            "5".to_string(),
1084                        )])),
1085                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
1086                        .with_metadata(HashMap::from([(
1087                            PARQUET_FIELD_ID_META_KEY.to_string(),
1088                            "6".to_string(),
1089                        )])),
1090                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
1091                        .with_metadata(HashMap::from([(
1092                            PARQUET_FIELD_ID_META_KEY.to_string(),
1093                            "7".to_string(),
1094                        )])),
1095                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
1096                        .with_metadata(HashMap::from([(
1097                            PARQUET_FIELD_ID_META_KEY.to_string(),
1098                            "8".to_string(),
1099                        )])),
1100                ];
1101                Arc::new(arrow_schema::Schema::new(fields))
1102            };
1103
1104            // Build the arrays for the RecordBatch
1105            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1106
1107            let mut values = vec![2; 512];
1108            values.append(vec![3; 200].as_mut());
1109            values.append(vec![4; 300].as_mut());
1110            values.append(vec![5; 12].as_mut());
1111            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1112
1113            let mut values = vec![3; 512];
1114            values.append(vec![4; 512].as_mut());
1115            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1116
1117            let mut values = vec!["Apache"; 512];
1118            values.append(vec!["Iceberg"; 512].as_mut());
1119            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
1120
1121            let mut values = vec![100.0f64; 512];
1122            values.append(vec![150.0f64; 12].as_mut());
1123            values.append(vec![200.0f64; 500].as_mut());
1124            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
1125
1126            let mut values = vec![100i32; 512];
1127            values.append(vec![150i32; 12].as_mut());
1128            values.append(vec![200i32; 500].as_mut());
1129            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
1130
1131            let mut values = vec![100i64; 512];
1132            values.append(vec![150i64; 12].as_mut());
1133            values.append(vec![200i64; 500].as_mut());
1134            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1135
1136            let mut values = vec![false; 512];
1137            values.append(vec![true; 512].as_mut());
1138            let values: BooleanArray = values.into();
1139            let col8 = Arc::new(values) as ArrayRef;
1140
1141            let to_write = RecordBatch::try_new(schema.clone(), vec![
1142                col1, col2, col3, col4, col5, col6, col7, col8,
1143            ])
1144            .unwrap();
1145
1146            // Write the Parquet files
1147            let props = WriterProperties::builder()
1148                .set_compression(Compression::SNAPPY)
1149                .build();
1150
1151            for n in 1..=3 {
1152                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
1153                let mut writer =
1154                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
1155
1156                writer.write(&to_write).expect("Writing batch");
1157
1158                // writer must be closed to write footer
1159                writer.close().unwrap();
1160            }
1161        }
1162    }
1163
1164    #[test]
1165    fn test_table_scan_columns() {
1166        let table = TableTestFixture::new().table;
1167
1168        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1169        assert_eq!(
1170            Some(vec!["x".to_string(), "y".to_string()]),
1171            table_scan.column_names
1172        );
1173
1174        let table_scan = table
1175            .scan()
1176            .select(["x", "y"])
1177            .select(["z"])
1178            .build()
1179            .unwrap();
1180        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1181    }
1182
1183    #[test]
1184    fn test_select_all() {
1185        let table = TableTestFixture::new().table;
1186
1187        let table_scan = table.scan().select_all().build().unwrap();
1188        assert!(table_scan.column_names.is_none());
1189    }
1190
1191    #[test]
1192    fn test_select_no_exist_column() {
1193        let table = TableTestFixture::new().table;
1194
1195        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
1196        assert!(table_scan.is_err());
1197    }
1198
1199    #[test]
1200    fn test_table_scan_default_snapshot_id() {
1201        let table = TableTestFixture::new().table;
1202
1203        let table_scan = table.scan().build().unwrap();
1204        assert_eq!(
1205            table.metadata().current_snapshot().unwrap().snapshot_id(),
1206            table_scan.snapshot().unwrap().snapshot_id()
1207        );
1208    }
1209
1210    #[test]
1211    fn test_table_scan_non_exist_snapshot_id() {
1212        let table = TableTestFixture::new().table;
1213
1214        let table_scan = table.scan().snapshot_id(1024).build();
1215        assert!(table_scan.is_err());
1216    }
1217
1218    #[test]
1219    fn test_table_scan_with_snapshot_id() {
1220        let table = TableTestFixture::new().table;
1221
1222        let table_scan = table
1223            .scan()
1224            .snapshot_id(3051729675574597004)
1225            .with_row_selection_enabled(true)
1226            .build()
1227            .unwrap();
1228        assert_eq!(
1229            table_scan.snapshot().unwrap().snapshot_id(),
1230            3051729675574597004
1231        );
1232    }
1233
1234    #[tokio::test]
1235    async fn test_plan_files_on_table_without_any_snapshots() {
1236        let table = TableTestFixture::new_empty().table;
1237        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1238        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1239        assert!(batches.is_empty());
1240    }
1241
1242    #[tokio::test]
1243    async fn test_plan_files_no_deletions() {
1244        let mut fixture = TableTestFixture::new();
1245        fixture.setup_manifest_files().await;
1246
1247        // Create table scan for current snapshot and plan files
1248        let table_scan = fixture
1249            .table
1250            .scan()
1251            .with_row_selection_enabled(true)
1252            .build()
1253            .unwrap();
1254
1255        let mut tasks = table_scan
1256            .plan_files()
1257            .await
1258            .unwrap()
1259            .try_fold(vec![], |mut acc, task| async move {
1260                acc.push(task);
1261                Ok(acc)
1262            })
1263            .await
1264            .unwrap();
1265
1266        assert_eq!(tasks.len(), 2);
1267
1268        tasks.sort_by_key(|t| t.data_file_path.to_string());
1269
1270        // Check first task is added data file
1271        assert_eq!(
1272            tasks[0].data_file_path,
1273            format!("{}/1.parquet", &fixture.table_location)
1274        );
1275
1276        // Check second task is existing data file
1277        assert_eq!(
1278            tasks[1].data_file_path,
1279            format!("{}/3.parquet", &fixture.table_location)
1280        );
1281    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Create table scan for current snapshot and plan files
        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        let col = batches[0].column_by_name("x").unwrap();

        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions_by_separate_reader() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Create table scan for current snapshot and plan files
        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut plan_task: Vec<_> = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        assert_eq!(plan_task.len(), 2);

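        // Read each of the two planned tasks with its own reader and check that they
        // produce identical record batches.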
        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .clone()
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batch_1, batch_2);
    }

    #[tokio::test]
    async fn test_open_parquet_with_projection() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Create table scan for current snapshot and plan files
        let table_scan = fixture
            .table
            .scan()
            .select(["x", "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        let col1 = batches[0].column_by_name("x").unwrap();
        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col2 = batches[0].column_by_name("z").unwrap();
        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 3);

        // test empty scan
        let table_scan = fixture.table.scan().select_empty().build().unwrap();
        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 0);
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y < 3
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").less_than(Datum::long(3));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 2);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_gt_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y >= 5
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 5);
    }

    #[tokio::test]
    async fn test_filter_double_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: dbl == 150.0
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("dbl").unwrap();
        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
        assert_eq!(f64_arr.value(1), 150.0f64);
    }

    #[tokio::test]
    async fn test_filter_int_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: i32 == 150
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i32").unwrap();
        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(i32_arr.value(1), 150i32);
    }

    #[tokio::test]
    async fn test_filter_long_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: i64 == 150
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i64").unwrap();
        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(i64_arr.value(1), 150i64);
    }

    #[tokio::test]
    async fn test_filter_bool_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: bool == true
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("bool").unwrap();
        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
        assert!(bool_arr.value(1));
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y is null
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_not_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y is not null
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_not_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_and_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y < 5 AND z >= 4
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 500);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_or_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: y < 5 OR z >= 4
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![2; 512];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        values.append(vec![5; 12].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let mut values = vec![3; 512];
        values.append(vec![4; 512].as_mut());
        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a STARTSWITH "Ice"
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a NOT STARTSWITH "Ice"
        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a IN ("Sioux", "Iceberg")
        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Filter: a NOT IN ("Sioux", "Iceberg")
        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[test]
    fn test_file_scan_task_serialize_deserialize() {
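        // Helper: serialize a FileScanTask to JSON and back, then check that the
        // compared fields survive the round trip.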
        let test_fn = |task: FileScanTask| {
            let serialized = serde_json::to_string(&task).unwrap();
            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();

            assert_eq!(task.data_file_path, deserialized.data_file_path);
            assert_eq!(task.start, deserialized.start);
            assert_eq!(task.length, deserialized.length);
            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
            assert_eq!(task.predicate, deserialized.predicate);
            assert_eq!(task.schema, deserialized.schema);
        };

        // without predicate
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Binary),
                ))])
                .build()
                .unwrap(),
        );
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: None,
            schema: schema.clone(),
            record_count: Some(100),
            data_file_format: DataFileFormat::Parquet,
            deletes: vec![],
        };
        test_fn(task);

        // with predicate
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: Some(BoundPredicate::AlwaysTrue),
            schema,
            record_count: None,
            data_file_format: DataFileFormat::Avro,
            deletes: vec![],
        };
        test_fn(task);
    }
}