1mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35use crate::delete_file_index::DeleteFileIndex;
36use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
37use crate::expr::{Bind, BoundPredicate, Predicate};
38use crate::io::FileIO;
39use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
40use crate::runtime::spawn;
41use crate::spec::{DataContentType, SnapshotRef};
42use crate::table::Table;
43use crate::utils::available_parallelism;
44use crate::{Error, ErrorKind, Result};
45
46pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
48
49pub struct TableScanBuilder<'a> {
51 table: &'a Table,
52 column_names: Option<Vec<String>>,
54 snapshot_id: Option<i64>,
55 batch_size: Option<usize>,
56 case_sensitive: bool,
57 filter: Option<Predicate>,
58 concurrency_limit_data_files: usize,
59 concurrency_limit_manifest_entries: usize,
60 concurrency_limit_manifest_files: usize,
61 row_group_filtering_enabled: bool,
62 row_selection_enabled: bool,
63}
64
65impl<'a> TableScanBuilder<'a> {
66 pub(crate) fn new(table: &'a Table) -> Self {
67 let num_cpus = available_parallelism().get();
68
69 Self {
70 table,
71 column_names: None,
72 snapshot_id: None,
73 batch_size: None,
74 case_sensitive: true,
75 filter: None,
76 concurrency_limit_data_files: num_cpus,
77 concurrency_limit_manifest_entries: num_cpus,
78 concurrency_limit_manifest_files: num_cpus,
79 row_group_filtering_enabled: true,
80 row_selection_enabled: false,
81 }
82 }
83
84 pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
87 self.batch_size = batch_size;
88 self
89 }
90
91 pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
93 self.case_sensitive = case_sensitive;
94 self
95 }
96
97 pub fn with_filter(mut self, predicate: Predicate) -> Self {
99 self.filter = Some(predicate.rewrite_not());
102 self
103 }
104
105 pub fn select_all(mut self) -> Self {
107 self.column_names = None;
108 self
109 }
110
111 pub fn select_empty(mut self) -> Self {
113 self.column_names = Some(vec![]);
114 self
115 }
116
117 pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
119 self.column_names = Some(
120 column_names
121 .into_iter()
122 .map(|item| item.to_string())
123 .collect(),
124 );
125 self
126 }
127
128 pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
130 self.snapshot_id = Some(snapshot_id);
131 self
132 }
133
134 pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
137 self.concurrency_limit_manifest_files = limit;
138 self.concurrency_limit_manifest_entries = limit;
139 self.concurrency_limit_data_files = limit;
140 self
141 }
142
143 pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
145 self.concurrency_limit_data_files = limit;
146 self
147 }
148
149 pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
151 self.concurrency_limit_manifest_entries = limit;
152 self
153 }
154
155 pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
164 self.row_group_filtering_enabled = row_group_filtering_enabled;
165 self
166 }
167
168 pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
183 self.row_selection_enabled = row_selection_enabled;
184 self
185 }
186
187 pub fn build(self) -> Result<TableScan> {
189 let snapshot = match self.snapshot_id {
190 Some(snapshot_id) => self
191 .table
192 .metadata()
193 .snapshot_by_id(snapshot_id)
194 .ok_or_else(|| {
195 Error::new(
196 ErrorKind::DataInvalid,
197 format!("Snapshot with id {snapshot_id} not found"),
198 )
199 })?
200 .clone(),
201 None => {
202 let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
203 return Ok(TableScan {
204 batch_size: self.batch_size,
205 column_names: self.column_names,
206 file_io: self.table.file_io().clone(),
207 plan_context: None,
208 concurrency_limit_data_files: self.concurrency_limit_data_files,
209 concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
210 concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
211 row_group_filtering_enabled: self.row_group_filtering_enabled,
212 row_selection_enabled: self.row_selection_enabled,
213 });
214 };
215 current_snapshot_id.clone()
216 }
217 };
218
219 let schema = snapshot.schema(self.table.metadata())?;
220
221 if let Some(column_names) = self.column_names.as_ref() {
223 for column_name in column_names {
224 if is_metadata_column_name(column_name) {
226 continue;
227 }
228 if schema.field_by_name(column_name).is_none() {
229 return Err(Error::new(
230 ErrorKind::DataInvalid,
231 format!("Column {column_name} not found in table. Schema: {schema}"),
232 ));
233 }
234 }
235 }
236
237 let mut field_ids = vec![];
238 let column_names = self.column_names.clone().unwrap_or_else(|| {
239 schema
240 .as_struct()
241 .fields()
242 .iter()
243 .map(|f| f.name.clone())
244 .collect()
245 });
246
247 for column_name in column_names.iter() {
248 if is_metadata_column_name(column_name) {
250 field_ids.push(get_metadata_field_id(column_name)?);
251 continue;
252 }
253
254 let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
255 Error::new(
256 ErrorKind::DataInvalid,
257 format!("Column {column_name} not found in table. Schema: {schema}"),
258 )
259 })?;
260
261 schema
262 .as_struct()
263 .field_by_id(field_id)
264 .ok_or_else(|| {
265 Error::new(
266 ErrorKind::FeatureUnsupported,
267 format!(
268 "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
269 ),
270 )
271 })?;
272
273 field_ids.push(field_id);
274 }
275
276 let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
277 Some(predicates.bind(schema.clone(), true)?)
278 } else {
279 None
280 };
281
282 let plan_context = PlanContext {
283 snapshot,
284 table_metadata: self.table.metadata_ref(),
285 snapshot_schema: schema,
286 case_sensitive: self.case_sensitive,
287 predicate: self.filter.map(Arc::new),
288 snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
289 object_cache: self.table.object_cache(),
290 field_ids: Arc::new(field_ids),
291 partition_filter_cache: Arc::new(PartitionFilterCache::new()),
292 manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
293 expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
294 };
295
296 Ok(TableScan {
297 batch_size: self.batch_size,
298 column_names: self.column_names,
299 file_io: self.table.file_io().clone(),
300 plan_context: Some(plan_context),
301 concurrency_limit_data_files: self.concurrency_limit_data_files,
302 concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
303 concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
304 row_group_filtering_enabled: self.row_group_filtering_enabled,
305 row_selection_enabled: self.row_selection_enabled,
306 })
307 }
308}
309
310#[derive(Debug)]
312pub struct TableScan {
313 plan_context: Option<PlanContext>,
317 batch_size: Option<usize>,
318 file_io: FileIO,
319 column_names: Option<Vec<String>>,
320 concurrency_limit_manifest_files: usize,
323
324 concurrency_limit_manifest_entries: usize,
327
328 concurrency_limit_data_files: usize,
331
332 row_group_filtering_enabled: bool,
333 row_selection_enabled: bool,
334}
335
336impl TableScan {
337 pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
339 let Some(plan_context) = self.plan_context.as_ref() else {
340 return Ok(Box::pin(futures::stream::empty()));
341 };
342
343 let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
344 let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;
345
346 let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
348 channel(concurrency_limit_manifest_files);
349 let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
350 channel(concurrency_limit_manifest_files);
351
352 let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
354
355 let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
356
357 let manifest_list = plan_context.get_manifest_list().await?;
358
359 let manifest_file_contexts = plan_context.build_manifest_file_contexts(
363 manifest_list,
364 manifest_entry_data_ctx_tx,
365 delete_file_idx.clone(),
366 manifest_entry_delete_ctx_tx,
367 )?;
368
369 let mut channel_for_manifest_error = file_scan_task_tx.clone();
370
371 spawn(async move {
373 let result = futures::stream::iter(manifest_file_contexts)
374 .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
375 ctx.fetch_manifest_and_stream_manifest_entries().await
376 })
377 .await;
378
379 if let Err(error) = result {
380 let _ = channel_for_manifest_error.send(Err(error)).await;
381 }
382 });
383
384 let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
385 let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
386
387 spawn(async move {
389 let result = manifest_entry_delete_ctx_rx
390 .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
391 .try_for_each_concurrent(
392 concurrency_limit_manifest_entries,
393 |(manifest_entry_context, tx)| async move {
394 spawn(async move {
395 Self::process_delete_manifest_entry(manifest_entry_context, tx).await
396 })
397 .await
398 },
399 )
400 .await;
401
402 if let Err(error) = result {
403 let _ = channel_for_delete_manifest_entry_error
404 .send(Err(error))
405 .await;
406 }
407 })
408 .await;
409
410 spawn(async move {
412 let result = manifest_entry_data_ctx_rx
413 .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
414 .try_for_each_concurrent(
415 concurrency_limit_manifest_entries,
416 |(manifest_entry_context, tx)| async move {
417 spawn(async move {
418 Self::process_data_manifest_entry(manifest_entry_context, tx).await
419 })
420 .await
421 },
422 )
423 .await;
424
425 if let Err(error) = result {
426 let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
427 }
428 });
429
430 Ok(file_scan_task_rx.boxed())
431 }
432
433 pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
435 let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
436 .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
437 .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
438 .with_row_selection_enabled(self.row_selection_enabled);
439
440 if let Some(batch_size) = self.batch_size {
441 arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
442 }
443
444 arrow_reader_builder.build().read(self.plan_files().await?)
445 }
446
447 pub fn column_names(&self) -> Option<&[String]> {
449 self.column_names.as_deref()
450 }
451
452 pub fn snapshot(&self) -> Option<&SnapshotRef> {
454 self.plan_context.as_ref().map(|x| &x.snapshot)
455 }
456
457 async fn process_data_manifest_entry(
458 manifest_entry_context: ManifestEntryContext,
459 mut file_scan_task_tx: Sender<Result<FileScanTask>>,
460 ) -> Result<()> {
461 if !manifest_entry_context.manifest_entry.is_alive() {
463 return Ok(());
464 }
465
466 if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
468 return Err(Error::new(
469 ErrorKind::FeatureUnsupported,
470 "Encountered an entry for a delete file in a data file manifest",
471 ));
472 }
473
474 if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
475 let BoundPredicates {
476 snapshot_bound_predicate,
477 partition_bound_predicate,
478 } = bound_predicates.as_ref();
479
480 let expression_evaluator_cache =
481 manifest_entry_context.expression_evaluator_cache.as_ref();
482
483 let expression_evaluator = expression_evaluator_cache.get(
484 manifest_entry_context.partition_spec_id,
485 partition_bound_predicate,
486 )?;
487
488 if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
491 return Ok(());
492 }
493
494 if !InclusiveMetricsEvaluator::eval(
496 snapshot_bound_predicate,
497 manifest_entry_context.manifest_entry.data_file(),
498 false,
499 )? {
500 return Ok(());
501 }
502 }
503
504 file_scan_task_tx
508 .send(Ok(manifest_entry_context.into_file_scan_task().await?))
509 .await?;
510
511 Ok(())
512 }
513
514 async fn process_delete_manifest_entry(
515 manifest_entry_context: ManifestEntryContext,
516 mut delete_file_ctx_tx: Sender<DeleteFileContext>,
517 ) -> Result<()> {
518 if !manifest_entry_context.manifest_entry.is_alive() {
520 return Ok(());
521 }
522
523 if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
525 return Err(Error::new(
526 ErrorKind::FeatureUnsupported,
527 "Encountered an entry for a data file in a delete manifest",
528 ));
529 }
530
531 if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
532 let expression_evaluator_cache =
533 manifest_entry_context.expression_evaluator_cache.as_ref();
534
535 let expression_evaluator = expression_evaluator_cache.get(
536 manifest_entry_context.partition_spec_id,
537 &bound_predicates.partition_bound_predicate,
538 )?;
539
540 if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
543 return Ok(());
544 }
545 }
546
547 delete_file_ctx_tx
548 .send(DeleteFileContext {
549 manifest_entry: manifest_entry_context.manifest_entry.clone(),
550 partition_spec_id: manifest_entry_context.partition_spec_id,
551 })
552 .await?;
553
554 Ok(())
555 }
556}
557
558pub(crate) struct BoundPredicates {
559 partition_bound_predicate: BoundPredicate,
560 snapshot_bound_predicate: BoundPredicate,
561}
562
563#[cfg(test)]
564pub mod tests {
565 #![allow(missing_docs)]
567
568 use std::collections::HashMap;
569 use std::fs;
570 use std::fs::File;
571 use std::sync::Arc;
572
573 use arrow_array::cast::AsArray;
574 use arrow_array::{
575 Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
576 StringArray,
577 };
578 use futures::{TryStreamExt, stream};
579 use minijinja::value::Value;
580 use minijinja::{AutoEscape, Environment, context};
581 use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
582 use parquet::basic::Compression;
583 use parquet::file::properties::WriterProperties;
584 use tempfile::TempDir;
585 use uuid::Uuid;
586
587 use crate::TableIdent;
588 use crate::arrow::ArrowReaderBuilder;
589 use crate::expr::{BoundPredicate, Reference};
590 use crate::io::{FileIO, OutputFile};
591 use crate::metadata_columns::RESERVED_COL_NAME_FILE;
592 use crate::scan::FileScanTask;
593 use crate::spec::{
594 DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
595 ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
596 PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
597 };
598 use crate::table::Table;
599
600 fn render_template(template: &str, ctx: Value) -> String {
601 let mut env = Environment::new();
602 env.set_auto_escape_callback(|_| AutoEscape::None);
603 env.render_str(template, ctx).unwrap()
604 }
605
606 pub struct TableTestFixture {
607 pub table_location: String,
608 pub table: Table,
609 }
610
611 impl TableTestFixture {
612 #[allow(clippy::new_without_default)]
613 pub fn new() -> Self {
614 let tmp_dir = TempDir::new().unwrap();
615 let table_location = tmp_dir.path().join("table1");
616 let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
617 let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
618 let table_metadata1_location = table_location.join("metadata/v1.json");
619
620 let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
621 .unwrap()
622 .build()
623 .unwrap();
624
625 let table_metadata = {
626 let template_json_str = fs::read_to_string(format!(
627 "{}/testdata/example_table_metadata_v2.json",
628 env!("CARGO_MANIFEST_DIR")
629 ))
630 .unwrap();
631 let metadata_json = render_template(&template_json_str, context! {
632 table_location => &table_location,
633 manifest_list_1_location => &manifest_list1_location,
634 manifest_list_2_location => &manifest_list2_location,
635 table_metadata_1_location => &table_metadata1_location,
636 });
637 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
638 };
639
640 let table = Table::builder()
641 .metadata(table_metadata)
642 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
643 .file_io(file_io.clone())
644 .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
645 .build()
646 .unwrap();
647
648 Self {
649 table_location: table_location.to_str().unwrap().to_string(),
650 table,
651 }
652 }
653
654 #[allow(clippy::new_without_default)]
655 pub fn new_empty() -> Self {
656 let tmp_dir = TempDir::new().unwrap();
657 let table_location = tmp_dir.path().join("table1");
658 let table_metadata1_location = table_location.join("metadata/v1.json");
659
660 let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
661 .unwrap()
662 .build()
663 .unwrap();
664
665 let table_metadata = {
666 let template_json_str = fs::read_to_string(format!(
667 "{}/testdata/example_empty_table_metadata_v2.json",
668 env!("CARGO_MANIFEST_DIR")
669 ))
670 .unwrap();
671 let metadata_json = render_template(&template_json_str, context! {
672 table_location => &table_location,
673 table_metadata_1_location => &table_metadata1_location,
674 });
675 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
676 };
677
678 let table = Table::builder()
679 .metadata(table_metadata)
680 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
681 .file_io(file_io.clone())
682 .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
683 .build()
684 .unwrap();
685
686 Self {
687 table_location: table_location.to_str().unwrap().to_string(),
688 table,
689 }
690 }
691
692 pub fn new_unpartitioned() -> Self {
693 let tmp_dir = TempDir::new().unwrap();
694 let table_location = tmp_dir.path().join("table1");
695 let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
696 let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
697 let table_metadata1_location = table_location.join("metadata/v1.json");
698
699 let file_io = FileIO::from_path(table_location.to_str().unwrap())
700 .unwrap()
701 .build()
702 .unwrap();
703
704 let mut table_metadata = {
705 let template_json_str = fs::read_to_string(format!(
706 "{}/testdata/example_table_metadata_v2.json",
707 env!("CARGO_MANIFEST_DIR")
708 ))
709 .unwrap();
710 let metadata_json = render_template(&template_json_str, context! {
711 table_location => &table_location,
712 manifest_list_1_location => &manifest_list1_location,
713 manifest_list_2_location => &manifest_list2_location,
714 table_metadata_1_location => &table_metadata1_location,
715 });
716 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
717 };
718
719 table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
720 table_metadata.partition_specs.clear();
721 table_metadata.default_partition_type = StructType::new(vec![]);
722 table_metadata
723 .partition_specs
724 .insert(0, table_metadata.default_spec.clone());
725
726 let table = Table::builder()
727 .metadata(table_metadata)
728 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
729 .file_io(file_io.clone())
730 .metadata_location(table_metadata1_location.to_str().unwrap())
731 .build()
732 .unwrap();
733
734 Self {
735 table_location: table_location.to_str().unwrap().to_string(),
736 table,
737 }
738 }
739
740 fn next_manifest_file(&self) -> OutputFile {
741 self.table
742 .file_io()
743 .new_output(format!(
744 "{}/metadata/manifest_{}.avro",
745 self.table_location,
746 Uuid::new_v4()
747 ))
748 .unwrap()
749 }
750
751 pub async fn setup_manifest_files(&mut self) {
752 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
753 let parent_snapshot = current_snapshot
754 .parent_snapshot(self.table.metadata())
755 .unwrap();
756 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
757 let current_partition_spec = self.table.metadata().default_partition_spec();
758
759 let mut writer = ManifestWriterBuilder::new(
761 self.next_manifest_file(),
762 Some(current_snapshot.snapshot_id()),
763 None,
764 current_schema.clone(),
765 current_partition_spec.as_ref().clone(),
766 )
767 .build_v2_data();
768 writer
769 .add_entry(
770 ManifestEntry::builder()
771 .status(ManifestStatus::Added)
772 .data_file(
773 DataFileBuilder::default()
774 .partition_spec_id(0)
775 .content(DataContentType::Data)
776 .file_path(format!("{}/1.parquet", &self.table_location))
777 .file_format(DataFileFormat::Parquet)
778 .file_size_in_bytes(100)
779 .record_count(1)
780 .partition(Struct::from_iter([Some(Literal::long(100))]))
781 .key_metadata(None)
782 .build()
783 .unwrap(),
784 )
785 .build(),
786 )
787 .unwrap();
788 writer
789 .add_delete_entry(
790 ManifestEntry::builder()
791 .status(ManifestStatus::Deleted)
792 .snapshot_id(parent_snapshot.snapshot_id())
793 .sequence_number(parent_snapshot.sequence_number())
794 .file_sequence_number(parent_snapshot.sequence_number())
795 .data_file(
796 DataFileBuilder::default()
797 .partition_spec_id(0)
798 .content(DataContentType::Data)
799 .file_path(format!("{}/2.parquet", &self.table_location))
800 .file_format(DataFileFormat::Parquet)
801 .file_size_in_bytes(100)
802 .record_count(1)
803 .partition(Struct::from_iter([Some(Literal::long(200))]))
804 .build()
805 .unwrap(),
806 )
807 .build(),
808 )
809 .unwrap();
810 writer
811 .add_existing_entry(
812 ManifestEntry::builder()
813 .status(ManifestStatus::Existing)
814 .snapshot_id(parent_snapshot.snapshot_id())
815 .sequence_number(parent_snapshot.sequence_number())
816 .file_sequence_number(parent_snapshot.sequence_number())
817 .data_file(
818 DataFileBuilder::default()
819 .partition_spec_id(0)
820 .content(DataContentType::Data)
821 .file_path(format!("{}/3.parquet", &self.table_location))
822 .file_format(DataFileFormat::Parquet)
823 .file_size_in_bytes(100)
824 .record_count(1)
825 .partition(Struct::from_iter([Some(Literal::long(300))]))
826 .build()
827 .unwrap(),
828 )
829 .build(),
830 )
831 .unwrap();
832 let data_file_manifest = writer.write_manifest_file().await.unwrap();
833
834 let mut manifest_list_write = ManifestListWriter::v2(
836 self.table
837 .file_io()
838 .new_output(current_snapshot.manifest_list())
839 .unwrap(),
840 current_snapshot.snapshot_id(),
841 current_snapshot.parent_snapshot_id(),
842 current_snapshot.sequence_number(),
843 );
844 manifest_list_write
845 .add_manifests(vec![data_file_manifest].into_iter())
846 .unwrap();
847 manifest_list_write.close().await.unwrap();
848
849 let schema = {
851 let fields = vec![
852 arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
853 .with_metadata(HashMap::from([(
854 PARQUET_FIELD_ID_META_KEY.to_string(),
855 "1".to_string(),
856 )])),
857 arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
858 .with_metadata(HashMap::from([(
859 PARQUET_FIELD_ID_META_KEY.to_string(),
860 "2".to_string(),
861 )])),
862 arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
863 .with_metadata(HashMap::from([(
864 PARQUET_FIELD_ID_META_KEY.to_string(),
865 "3".to_string(),
866 )])),
867 arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
868 .with_metadata(HashMap::from([(
869 PARQUET_FIELD_ID_META_KEY.to_string(),
870 "4".to_string(),
871 )])),
872 arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
873 .with_metadata(HashMap::from([(
874 PARQUET_FIELD_ID_META_KEY.to_string(),
875 "5".to_string(),
876 )])),
877 arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
878 .with_metadata(HashMap::from([(
879 PARQUET_FIELD_ID_META_KEY.to_string(),
880 "6".to_string(),
881 )])),
882 arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
883 .with_metadata(HashMap::from([(
884 PARQUET_FIELD_ID_META_KEY.to_string(),
885 "7".to_string(),
886 )])),
887 arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
888 .with_metadata(HashMap::from([(
889 PARQUET_FIELD_ID_META_KEY.to_string(),
890 "8".to_string(),
891 )])),
892 ];
893 Arc::new(arrow_schema::Schema::new(fields))
894 };
895 let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
897
898 let mut values = vec![2; 512];
899 values.append(vec![3; 200].as_mut());
900 values.append(vec![4; 300].as_mut());
901 values.append(vec![5; 12].as_mut());
902
903 let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
905
906 let mut values = vec![3; 512];
907 values.append(vec![4; 512].as_mut());
908
909 let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
911
912 let mut values = vec!["Apache"; 512];
914 values.append(vec!["Iceberg"; 512].as_mut());
915 let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
916
917 let mut values = vec![100.0f64; 512];
919 values.append(vec![150.0f64; 12].as_mut());
920 values.append(vec![200.0f64; 500].as_mut());
921 let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
922
923 let mut values = vec![100i32; 512];
925 values.append(vec![150i32; 12].as_mut());
926 values.append(vec![200i32; 500].as_mut());
927 let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
928
929 let mut values = vec![100i64; 512];
931 values.append(vec![150i64; 12].as_mut());
932 values.append(vec![200i64; 500].as_mut());
933 let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
934
935 let mut values = vec![false; 512];
937 values.append(vec![true; 512].as_mut());
938 let values: BooleanArray = values.into();
939 let col8 = Arc::new(values) as ArrayRef;
940
941 let to_write = RecordBatch::try_new(schema.clone(), vec![
942 col1, col2, col3, col4, col5, col6, col7, col8,
943 ])
944 .unwrap();
945
946 let props = WriterProperties::builder()
948 .set_compression(Compression::SNAPPY)
949 .build();
950
951 for n in 1..=3 {
952 let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
953 let mut writer =
954 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
955
956 writer.write(&to_write).expect("Writing batch");
957
958 writer.close().unwrap();
960 }
961 }
962
963 pub async fn setup_unpartitioned_manifest_files(&mut self) {
964 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
965 let parent_snapshot = current_snapshot
966 .parent_snapshot(self.table.metadata())
967 .unwrap();
968 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
969 let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
970
971 let mut writer = ManifestWriterBuilder::new(
973 self.next_manifest_file(),
974 Some(current_snapshot.snapshot_id()),
975 None,
976 current_schema.clone(),
977 current_partition_spec.as_ref().clone(),
978 )
979 .build_v2_data();
980
981 let empty_partition = Struct::empty();
983
984 writer
985 .add_entry(
986 ManifestEntry::builder()
987 .status(ManifestStatus::Added)
988 .data_file(
989 DataFileBuilder::default()
990 .partition_spec_id(0)
991 .content(DataContentType::Data)
992 .file_path(format!("{}/1.parquet", &self.table_location))
993 .file_format(DataFileFormat::Parquet)
994 .file_size_in_bytes(100)
995 .record_count(1)
996 .partition(empty_partition.clone())
997 .key_metadata(None)
998 .build()
999 .unwrap(),
1000 )
1001 .build(),
1002 )
1003 .unwrap();
1004
1005 writer
1006 .add_delete_entry(
1007 ManifestEntry::builder()
1008 .status(ManifestStatus::Deleted)
1009 .snapshot_id(parent_snapshot.snapshot_id())
1010 .sequence_number(parent_snapshot.sequence_number())
1011 .file_sequence_number(parent_snapshot.sequence_number())
1012 .data_file(
1013 DataFileBuilder::default()
1014 .partition_spec_id(0)
1015 .content(DataContentType::Data)
1016 .file_path(format!("{}/2.parquet", &self.table_location))
1017 .file_format(DataFileFormat::Parquet)
1018 .file_size_in_bytes(100)
1019 .record_count(1)
1020 .partition(empty_partition.clone())
1021 .build()
1022 .unwrap(),
1023 )
1024 .build(),
1025 )
1026 .unwrap();
1027
1028 writer
1029 .add_existing_entry(
1030 ManifestEntry::builder()
1031 .status(ManifestStatus::Existing)
1032 .snapshot_id(parent_snapshot.snapshot_id())
1033 .sequence_number(parent_snapshot.sequence_number())
1034 .file_sequence_number(parent_snapshot.sequence_number())
1035 .data_file(
1036 DataFileBuilder::default()
1037 .partition_spec_id(0)
1038 .content(DataContentType::Data)
1039 .file_path(format!("{}/3.parquet", &self.table_location))
1040 .file_format(DataFileFormat::Parquet)
1041 .file_size_in_bytes(100)
1042 .record_count(1)
1043 .partition(empty_partition.clone())
1044 .build()
1045 .unwrap(),
1046 )
1047 .build(),
1048 )
1049 .unwrap();
1050
1051 let data_file_manifest = writer.write_manifest_file().await.unwrap();
1052
1053 let mut manifest_list_write = ManifestListWriter::v2(
1055 self.table
1056 .file_io()
1057 .new_output(current_snapshot.manifest_list())
1058 .unwrap(),
1059 current_snapshot.snapshot_id(),
1060 current_snapshot.parent_snapshot_id(),
1061 current_snapshot.sequence_number(),
1062 );
1063 manifest_list_write
1064 .add_manifests(vec![data_file_manifest].into_iter())
1065 .unwrap();
1066 manifest_list_write.close().await.unwrap();
1067
1068 let schema = {
1070 let fields = vec![
1071 arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
1072 .with_metadata(HashMap::from([(
1073 PARQUET_FIELD_ID_META_KEY.to_string(),
1074 "1".to_string(),
1075 )])),
1076 arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
1077 .with_metadata(HashMap::from([(
1078 PARQUET_FIELD_ID_META_KEY.to_string(),
1079 "2".to_string(),
1080 )])),
1081 arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
1082 .with_metadata(HashMap::from([(
1083 PARQUET_FIELD_ID_META_KEY.to_string(),
1084 "3".to_string(),
1085 )])),
1086 arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
1087 .with_metadata(HashMap::from([(
1088 PARQUET_FIELD_ID_META_KEY.to_string(),
1089 "4".to_string(),
1090 )])),
1091 arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
1092 .with_metadata(HashMap::from([(
1093 PARQUET_FIELD_ID_META_KEY.to_string(),
1094 "5".to_string(),
1095 )])),
1096 arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
1097 .with_metadata(HashMap::from([(
1098 PARQUET_FIELD_ID_META_KEY.to_string(),
1099 "6".to_string(),
1100 )])),
1101 arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
1102 .with_metadata(HashMap::from([(
1103 PARQUET_FIELD_ID_META_KEY.to_string(),
1104 "7".to_string(),
1105 )])),
1106 arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
1107 .with_metadata(HashMap::from([(
1108 PARQUET_FIELD_ID_META_KEY.to_string(),
1109 "8".to_string(),
1110 )])),
1111 ];
1112 Arc::new(arrow_schema::Schema::new(fields))
1113 };
1114
1115 let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1117
1118 let mut values = vec![2; 512];
1119 values.append(vec![3; 200].as_mut());
1120 values.append(vec![4; 300].as_mut());
1121 values.append(vec![5; 12].as_mut());
1122 let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1123
1124 let mut values = vec![3; 512];
1125 values.append(vec![4; 512].as_mut());
1126 let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1127
1128 let mut values = vec!["Apache"; 512];
1129 values.append(vec!["Iceberg"; 512].as_mut());
1130 let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
1131
1132 let mut values = vec![100.0f64; 512];
1133 values.append(vec![150.0f64; 12].as_mut());
1134 values.append(vec![200.0f64; 500].as_mut());
1135 let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
1136
1137 let mut values = vec![100i32; 512];
1138 values.append(vec![150i32; 12].as_mut());
1139 values.append(vec![200i32; 500].as_mut());
1140 let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
1141
1142 let mut values = vec![100i64; 512];
1143 values.append(vec![150i64; 12].as_mut());
1144 values.append(vec![200i64; 500].as_mut());
1145 let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1146
1147 let mut values = vec![false; 512];
1148 values.append(vec![true; 512].as_mut());
1149 let values: BooleanArray = values.into();
1150 let col8 = Arc::new(values) as ArrayRef;
1151
1152 let to_write = RecordBatch::try_new(schema.clone(), vec![
1153 col1, col2, col3, col4, col5, col6, col7, col8,
1154 ])
1155 .unwrap();
1156
1157 let props = WriterProperties::builder()
1159 .set_compression(Compression::SNAPPY)
1160 .build();
1161
1162 for n in 1..=3 {
1163 let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
1164 let mut writer =
1165 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
1166
1167 writer.write(&to_write).expect("Writing batch");
1168
1169 writer.close().unwrap();
1171 }
1172 }
1173
1174 pub async fn setup_deadlock_manifests(&mut self) {
1175 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1176 let _parent_snapshot = current_snapshot
1177 .parent_snapshot(self.table.metadata())
1178 .unwrap();
1179 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1180 let current_partition_spec = self.table.metadata().default_partition_spec();
1181
1182 let mut writer = ManifestWriterBuilder::new(
1184 self.next_manifest_file(),
1185 Some(current_snapshot.snapshot_id()),
1186 None,
1187 current_schema.clone(),
1188 current_partition_spec.as_ref().clone(),
1189 )
1190 .build_v2_data();
1191
1192 for i in 0..10 {
1194 writer
1195 .add_entry(
1196 ManifestEntry::builder()
1197 .status(ManifestStatus::Added)
1198 .data_file(
1199 DataFileBuilder::default()
1200 .partition_spec_id(0)
1201 .content(DataContentType::Data)
1202 .file_path(format!("{}/{}.parquet", &self.table_location, i))
1203 .file_format(DataFileFormat::Parquet)
1204 .file_size_in_bytes(100)
1205 .record_count(1)
1206 .partition(Struct::from_iter([Some(Literal::long(100))]))
1207 .key_metadata(None)
1208 .build()
1209 .unwrap(),
1210 )
1211 .build(),
1212 )
1213 .unwrap();
1214 }
1215 let data_manifest = writer.write_manifest_file().await.unwrap();
1216
1217 let mut writer = ManifestWriterBuilder::new(
1219 self.next_manifest_file(),
1220 Some(current_snapshot.snapshot_id()),
1221 None,
1222 current_schema.clone(),
1223 current_partition_spec.as_ref().clone(),
1224 )
1225 .build_v2_deletes();
1226
1227 writer
1228 .add_entry(
1229 ManifestEntry::builder()
1230 .status(ManifestStatus::Added)
1231 .data_file(
1232 DataFileBuilder::default()
1233 .partition_spec_id(0)
1234 .content(DataContentType::PositionDeletes)
1235 .file_path(format!("{}/del.parquet", &self.table_location))
1236 .file_format(DataFileFormat::Parquet)
1237 .file_size_in_bytes(100)
1238 .record_count(1)
1239 .partition(Struct::from_iter([Some(Literal::long(100))]))
1240 .build()
1241 .unwrap(),
1242 )
1243 .build(),
1244 )
1245 .unwrap();
1246 let delete_manifest = writer.write_manifest_file().await.unwrap();
1247
1248 let mut manifest_list_write = ManifestListWriter::v2(
1251 self.table
1252 .file_io()
1253 .new_output(current_snapshot.manifest_list())
1254 .unwrap(),
1255 current_snapshot.snapshot_id(),
1256 current_snapshot.parent_snapshot_id(),
1257 current_snapshot.sequence_number(),
1258 );
1259 manifest_list_write
1260 .add_manifests(vec![data_manifest, delete_manifest].into_iter())
1261 .unwrap();
1262 manifest_list_write.close().await.unwrap();
1263 }
1264 }
1265
1266 #[test]
1267 fn test_table_scan_columns() {
1268 let table = TableTestFixture::new().table;
1269
1270 let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1271 assert_eq!(
1272 Some(vec!["x".to_string(), "y".to_string()]),
1273 table_scan.column_names
1274 );
1275
1276 let table_scan = table
1277 .scan()
1278 .select(["x", "y"])
1279 .select(["z"])
1280 .build()
1281 .unwrap();
1282 assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1283 }
1284
1285 #[test]
1286 fn test_select_all() {
1287 let table = TableTestFixture::new().table;
1288
1289 let table_scan = table.scan().select_all().build().unwrap();
1290 assert!(table_scan.column_names.is_none());
1291 }
1292
1293 #[test]
1294 fn test_select_no_exist_column() {
1295 let table = TableTestFixture::new().table;
1296
1297 let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
1298 assert!(table_scan.is_err());
1299 }
1300
1301 #[test]
1302 fn test_table_scan_default_snapshot_id() {
1303 let table = TableTestFixture::new().table;
1304
1305 let table_scan = table.scan().build().unwrap();
1306 assert_eq!(
1307 table.metadata().current_snapshot().unwrap().snapshot_id(),
1308 table_scan.snapshot().unwrap().snapshot_id()
1309 );
1310 }
1311
1312 #[test]
1313 fn test_table_scan_non_exist_snapshot_id() {
1314 let table = TableTestFixture::new().table;
1315
1316 let table_scan = table.scan().snapshot_id(1024).build();
1317 assert!(table_scan.is_err());
1318 }
1319
1320 #[test]
1321 fn test_table_scan_with_snapshot_id() {
1322 let table = TableTestFixture::new().table;
1323
1324 let table_scan = table
1325 .scan()
1326 .snapshot_id(3051729675574597004)
1327 .with_row_selection_enabled(true)
1328 .build()
1329 .unwrap();
1330 assert_eq!(
1331 table_scan.snapshot().unwrap().snapshot_id(),
1332 3051729675574597004
1333 );
1334 }
1335
1336 #[tokio::test]
1337 async fn test_plan_files_on_table_without_any_snapshots() {
1338 let table = TableTestFixture::new_empty().table;
1339 let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1340 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1341 assert!(batches.is_empty());
1342 }
1343
1344 #[tokio::test]
1345 async fn test_plan_files_no_deletions() {
1346 let mut fixture = TableTestFixture::new();
1347 fixture.setup_manifest_files().await;
1348
1349 let table_scan = fixture
1351 .table
1352 .scan()
1353 .with_row_selection_enabled(true)
1354 .build()
1355 .unwrap();
1356
1357 let mut tasks = table_scan
1358 .plan_files()
1359 .await
1360 .unwrap()
1361 .try_fold(vec![], |mut acc, task| async move {
1362 acc.push(task);
1363 Ok(acc)
1364 })
1365 .await
1366 .unwrap();
1367
1368 assert_eq!(tasks.len(), 2);
1369
1370 tasks.sort_by_key(|t| t.data_file_path.to_string());
1371
1372 assert_eq!(
1374 tasks[0].data_file_path,
1375 format!("{}/1.parquet", &fixture.table_location)
1376 );
1377
1378 assert_eq!(
1380 tasks[1].data_file_path,
1381 format!("{}/3.parquet", &fixture.table_location)
1382 );
1383 }
1384
1385 #[tokio::test]
1386 async fn test_open_parquet_no_deletions() {
1387 let mut fixture = TableTestFixture::new();
1388 fixture.setup_manifest_files().await;
1389
1390 let table_scan = fixture
1392 .table
1393 .scan()
1394 .with_row_selection_enabled(true)
1395 .build()
1396 .unwrap();
1397
1398 let batch_stream = table_scan.to_arrow().await.unwrap();
1399
1400 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1401
1402 let col = batches[0].column_by_name("x").unwrap();
1403
1404 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1405 assert_eq!(int64_arr.value(0), 1);
1406 }
1407
1408 #[tokio::test]
1409 async fn test_open_parquet_no_deletions_by_separate_reader() {
1410 let mut fixture = TableTestFixture::new();
1411 fixture.setup_manifest_files().await;
1412
1413 let table_scan = fixture
1415 .table
1416 .scan()
1417 .with_row_selection_enabled(true)
1418 .build()
1419 .unwrap();
1420
1421 let mut plan_task: Vec<_> = table_scan
1422 .plan_files()
1423 .await
1424 .unwrap()
1425 .try_collect()
1426 .await
1427 .unwrap();
1428 assert_eq!(plan_task.len(), 2);
1429
1430 let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1431 let batch_stream = reader
1432 .clone()
1433 .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1434 .unwrap();
1435 let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1436
1437 let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
1438 let batch_stream = reader
1439 .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1440 .unwrap();
1441 let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1442
1443 assert_eq!(batch_1, batch_2);
1444 }
1445
1446 #[tokio::test]
1447 async fn test_open_parquet_with_projection() {
1448 let mut fixture = TableTestFixture::new();
1449 fixture.setup_manifest_files().await;
1450
1451 let table_scan = fixture
1453 .table
1454 .scan()
1455 .select(["x", "z"])
1456 .with_row_selection_enabled(true)
1457 .build()
1458 .unwrap();
1459
1460 let batch_stream = table_scan.to_arrow().await.unwrap();
1461
1462 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1463
1464 assert_eq!(batches[0].num_columns(), 2);
1465
1466 let col1 = batches[0].column_by_name("x").unwrap();
1467 let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1468 assert_eq!(int64_arr.value(0), 1);
1469
1470 let col2 = batches[0].column_by_name("z").unwrap();
1471 let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1472 assert_eq!(int64_arr.value(0), 3);
1473
1474 let table_scan = fixture.table.scan().select_empty().build().unwrap();
1476 let batch_stream = table_scan.to_arrow().await.unwrap();
1477 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1478
1479 assert_eq!(batches[0].num_columns(), 0);
1480 assert_eq!(batches[0].num_rows(), 1024);
1481 }
1482
1483 #[tokio::test]
1484 async fn test_filter_on_arrow_lt() {
1485 let mut fixture = TableTestFixture::new();
1486 fixture.setup_manifest_files().await;
1487
1488 let mut builder = fixture.table.scan();
1490 let predicate = Reference::new("y").less_than(Datum::long(3));
1491 builder = builder
1492 .with_filter(predicate)
1493 .with_row_selection_enabled(true);
1494 let table_scan = builder.build().unwrap();
1495
1496 let batch_stream = table_scan.to_arrow().await.unwrap();
1497
1498 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1499
1500 assert_eq!(batches[0].num_rows(), 512);
1501
1502 let col = batches[0].column_by_name("x").unwrap();
1503 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1504 assert_eq!(int64_arr.value(0), 1);
1505
1506 let col = batches[0].column_by_name("y").unwrap();
1507 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1508 assert_eq!(int64_arr.value(0), 2);
1509 }
1510
1511 #[tokio::test]
1512 async fn test_filter_on_arrow_gt_eq() {
1513 let mut fixture = TableTestFixture::new();
1514 fixture.setup_manifest_files().await;
1515
1516 let mut builder = fixture.table.scan();
1518 let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1519 builder = builder
1520 .with_filter(predicate)
1521 .with_row_selection_enabled(true);
1522 let table_scan = builder.build().unwrap();
1523
1524 let batch_stream = table_scan.to_arrow().await.unwrap();
1525
1526 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1527
1528 assert_eq!(batches[0].num_rows(), 12);
1529
1530 let col = batches[0].column_by_name("x").unwrap();
1531 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1532 assert_eq!(int64_arr.value(0), 1);
1533
1534 let col = batches[0].column_by_name("y").unwrap();
1535 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1536 assert_eq!(int64_arr.value(0), 5);
1537 }
1538
1539 #[tokio::test]
1540 async fn test_filter_double_eq() {
1541 let mut fixture = TableTestFixture::new();
1542 fixture.setup_manifest_files().await;
1543
1544 let mut builder = fixture.table.scan();
1546 let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1547 builder = builder
1548 .with_filter(predicate)
1549 .with_row_selection_enabled(true);
1550 let table_scan = builder.build().unwrap();
1551
1552 let batch_stream = table_scan.to_arrow().await.unwrap();
1553
1554 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1555
1556 assert_eq!(batches.len(), 2);
1557 assert_eq!(batches[0].num_rows(), 12);
1558
1559 let col = batches[0].column_by_name("dbl").unwrap();
1560 let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1561 assert_eq!(f64_arr.value(1), 150.0f64);
1562 }
1563
1564 #[tokio::test]
1565 async fn test_filter_int_eq() {
1566 let mut fixture = TableTestFixture::new();
1567 fixture.setup_manifest_files().await;
1568
1569 let mut builder = fixture.table.scan();
1571 let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1572 builder = builder
1573 .with_filter(predicate)
1574 .with_row_selection_enabled(true);
1575 let table_scan = builder.build().unwrap();
1576
1577 let batch_stream = table_scan.to_arrow().await.unwrap();
1578
1579 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1580
1581 assert_eq!(batches.len(), 2);
1582 assert_eq!(batches[0].num_rows(), 12);
1583
1584 let col = batches[0].column_by_name("i32").unwrap();
1585 let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1586 assert_eq!(i32_arr.value(1), 150i32);
1587 }
1588
1589 #[tokio::test]
1590 async fn test_filter_long_eq() {
1591 let mut fixture = TableTestFixture::new();
1592 fixture.setup_manifest_files().await;
1593
1594 let mut builder = fixture.table.scan();
1596 let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1597 builder = builder
1598 .with_filter(predicate)
1599 .with_row_selection_enabled(true);
1600 let table_scan = builder.build().unwrap();
1601
1602 let batch_stream = table_scan.to_arrow().await.unwrap();
1603
1604 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1605
1606 assert_eq!(batches.len(), 2);
1607 assert_eq!(batches[0].num_rows(), 12);
1608
1609 let col = batches[0].column_by_name("i64").unwrap();
1610 let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1611 assert_eq!(i64_arr.value(1), 150i64);
1612 }
1613
1614 #[tokio::test]
1615 async fn test_filter_bool_eq() {
1616 let mut fixture = TableTestFixture::new();
1617 fixture.setup_manifest_files().await;
1618
1619 let mut builder = fixture.table.scan();
1621 let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1622 builder = builder
1623 .with_filter(predicate)
1624 .with_row_selection_enabled(true);
1625 let table_scan = builder.build().unwrap();
1626
1627 let batch_stream = table_scan.to_arrow().await.unwrap();
1628
1629 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1630
1631 assert_eq!(batches.len(), 2);
1632 assert_eq!(batches[0].num_rows(), 512);
1633
1634 let col = batches[0].column_by_name("bool").unwrap();
1635 let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1636 assert!(bool_arr.value(1));
1637 }
1638
1639 #[tokio::test]
1640 async fn test_filter_on_arrow_is_null() {
1641 let mut fixture = TableTestFixture::new();
1642 fixture.setup_manifest_files().await;
1643
1644 let mut builder = fixture.table.scan();
1646 let predicate = Reference::new("y").is_null();
1647 builder = builder
1648 .with_filter(predicate)
1649 .with_row_selection_enabled(true);
1650 let table_scan = builder.build().unwrap();
1651
1652 let batch_stream = table_scan.to_arrow().await.unwrap();
1653
1654 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1655 assert_eq!(batches.len(), 0);
1656 }
1657
1658 #[tokio::test]
1659 async fn test_filter_on_arrow_is_not_null() {
1660 let mut fixture = TableTestFixture::new();
1661 fixture.setup_manifest_files().await;
1662
1663 let mut builder = fixture.table.scan();
1665 let predicate = Reference::new("y").is_not_null();
1666 builder = builder
1667 .with_filter(predicate)
1668 .with_row_selection_enabled(true);
1669 let table_scan = builder.build().unwrap();
1670
1671 let batch_stream = table_scan.to_arrow().await.unwrap();
1672
1673 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1674 assert_eq!(batches[0].num_rows(), 1024);
1675 }
1676
1677 #[tokio::test]
1678 async fn test_filter_on_arrow_lt_and_gt() {
1679 let mut fixture = TableTestFixture::new();
1680 fixture.setup_manifest_files().await;
1681
1682 let mut builder = fixture.table.scan();
1684 let predicate = Reference::new("y")
1685 .less_than(Datum::long(5))
1686 .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1687 builder = builder
1688 .with_filter(predicate)
1689 .with_row_selection_enabled(true);
1690 let table_scan = builder.build().unwrap();
1691
1692 let batch_stream = table_scan.to_arrow().await.unwrap();
1693
1694 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1695 assert_eq!(batches[0].num_rows(), 500);
1696
1697 let col = batches[0].column_by_name("x").unwrap();
1698 let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1699 assert_eq!(col, &expected_x);
1700
1701 let col = batches[0].column_by_name("y").unwrap();
1702 let mut values = vec![];
1703 values.append(vec![3; 200].as_mut());
1704 values.append(vec![4; 300].as_mut());
1705 let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1706 assert_eq!(col, &expected_y);
1707
1708 let col = batches[0].column_by_name("z").unwrap();
1709 let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1710 assert_eq!(col, &expected_z);
1711 }
1712
1713 #[tokio::test]
1714 async fn test_filter_on_arrow_lt_or_gt() {
1715 let mut fixture = TableTestFixture::new();
1716 fixture.setup_manifest_files().await;
1717
1718 let mut builder = fixture.table.scan();
1720 let predicate = Reference::new("y")
1721 .less_than(Datum::long(5))
1722 .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1723 builder = builder
1724 .with_filter(predicate)
1725 .with_row_selection_enabled(true);
1726 let table_scan = builder.build().unwrap();
1727
1728 let batch_stream = table_scan.to_arrow().await.unwrap();
1729
1730 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1731 assert_eq!(batches[0].num_rows(), 1024);
1732
1733 let col = batches[0].column_by_name("x").unwrap();
1734 let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1735 assert_eq!(col, &expected_x);
1736
1737 let col = batches[0].column_by_name("y").unwrap();
1738 let mut values = vec![2; 512];
1739 values.append(vec![3; 200].as_mut());
1740 values.append(vec![4; 300].as_mut());
1741 values.append(vec![5; 12].as_mut());
1742 let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1743 assert_eq!(col, &expected_y);
1744
1745 let col = batches[0].column_by_name("z").unwrap();
1746 let mut values = vec![3; 512];
1747 values.append(vec![4; 512].as_mut());
1748 let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1749 assert_eq!(col, &expected_z);
1750 }
1751
1752 #[tokio::test]
1753 async fn test_filter_on_arrow_startswith() {
1754 let mut fixture = TableTestFixture::new();
1755 fixture.setup_manifest_files().await;
1756
1757 let mut builder = fixture.table.scan();
1759 let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1760 builder = builder
1761 .with_filter(predicate)
1762 .with_row_selection_enabled(true);
1763 let table_scan = builder.build().unwrap();
1764
1765 let batch_stream = table_scan.to_arrow().await.unwrap();
1766
1767 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1768
1769 assert_eq!(batches[0].num_rows(), 512);
1770
1771 let col = batches[0].column_by_name("a").unwrap();
1772 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1773 assert_eq!(string_arr.value(0), "Iceberg");
1774 }
1775
1776 #[tokio::test]
1777 async fn test_filter_on_arrow_not_startswith() {
1778 let mut fixture = TableTestFixture::new();
1779 fixture.setup_manifest_files().await;
1780
1781 let mut builder = fixture.table.scan();
1783 let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1784 builder = builder
1785 .with_filter(predicate)
1786 .with_row_selection_enabled(true);
1787 let table_scan = builder.build().unwrap();
1788
1789 let batch_stream = table_scan.to_arrow().await.unwrap();
1790
1791 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1792
1793 assert_eq!(batches[0].num_rows(), 512);
1794
1795 let col = batches[0].column_by_name("a").unwrap();
1796 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1797 assert_eq!(string_arr.value(0), "Apache");
1798 }
1799
1800 #[tokio::test]
1801 async fn test_filter_on_arrow_in() {
1802 let mut fixture = TableTestFixture::new();
1803 fixture.setup_manifest_files().await;
1804
1805 let mut builder = fixture.table.scan();
1807 let predicate =
1808 Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1809 builder = builder
1810 .with_filter(predicate)
1811 .with_row_selection_enabled(true);
1812 let table_scan = builder.build().unwrap();
1813
1814 let batch_stream = table_scan.to_arrow().await.unwrap();
1815
1816 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1817
1818 assert_eq!(batches[0].num_rows(), 512);
1819
1820 let col = batches[0].column_by_name("a").unwrap();
1821 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1822 assert_eq!(string_arr.value(0), "Iceberg");
1823 }
1824
1825 #[tokio::test]
1826 async fn test_filter_on_arrow_not_in() {
1827 let mut fixture = TableTestFixture::new();
1828 fixture.setup_manifest_files().await;
1829
1830 let mut builder = fixture.table.scan();
1832 let predicate =
1833 Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1834 builder = builder
1835 .with_filter(predicate)
1836 .with_row_selection_enabled(true);
1837 let table_scan = builder.build().unwrap();
1838
1839 let batch_stream = table_scan.to_arrow().await.unwrap();
1840
1841 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1842
1843 assert_eq!(batches[0].num_rows(), 512);
1844
1845 let col = batches[0].column_by_name("a").unwrap();
1846 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1847 assert_eq!(string_arr.value(0), "Apache");
1848 }
1849
1850 #[test]
1851 fn test_file_scan_task_serialize_deserialize() {
1852 let test_fn = |task: FileScanTask| {
1853 let serialized = serde_json::to_string(&task).unwrap();
1854 let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
1855
1856 assert_eq!(task.data_file_path, deserialized.data_file_path);
1857 assert_eq!(task.start, deserialized.start);
1858 assert_eq!(task.length, deserialized.length);
1859 assert_eq!(task.project_field_ids, deserialized.project_field_ids);
1860 assert_eq!(task.predicate, deserialized.predicate);
1861 assert_eq!(task.schema, deserialized.schema);
1862 };
1863
1864 let schema = Arc::new(
1866 Schema::builder()
1867 .with_fields(vec![Arc::new(NestedField::required(
1868 1,
1869 "x",
1870 Type::Primitive(PrimitiveType::Binary),
1871 ))])
1872 .build()
1873 .unwrap(),
1874 );
1875 let task = FileScanTask {
1876 data_file_path: "data_file_path".to_string(),
1877 start: 0,
1878 length: 100,
1879 project_field_ids: vec![1, 2, 3],
1880 predicate: None,
1881 schema: schema.clone(),
1882 record_count: Some(100),
1883 data_file_format: DataFileFormat::Parquet,
1884 deletes: vec![],
1885 partition: None,
1886 partition_spec: None,
1887 name_mapping: None,
1888 };
1889 test_fn(task);
1890
1891 let task = FileScanTask {
1893 data_file_path: "data_file_path".to_string(),
1894 start: 0,
1895 length: 100,
1896 project_field_ids: vec![1, 2, 3],
1897 predicate: Some(BoundPredicate::AlwaysTrue),
1898 schema,
1899 record_count: None,
1900 data_file_format: DataFileFormat::Avro,
1901 deletes: vec![],
1902 partition: None,
1903 partition_spec: None,
1904 name_mapping: None,
1905 };
1906 test_fn(task);
1907 }
1908
    #[tokio::test]
    async fn test_select_with_file_column() {
        use arrow_array::cast::AsArray;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Select a regular column plus the reserved _file metadata column.
        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        // The regular column still reads correctly.
        let x_col = batches[0].column_by_name("x").unwrap();
        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
        assert_eq!(x_arr.value(0), 1);

        // The _file column must be present...
        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
        assert!(
            file_col.is_some(),
            "_file column should be present in the batch"
        );

        // ...and run-end encoded.
        let file_col = file_col.unwrap();
        assert!(
            matches!(
                file_col.data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );

        // Decode the run array and check the single underlying file path.
        let run_array = file_col
            .as_any()
            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
            .expect("_file column should be a RunArray");

        let values = run_array.values();
        let string_values = values.as_string::<i32>();
        assert_eq!(string_values.len(), 1, "Should have a single file path");

        let file_path = string_values.value(0);
        assert!(
            file_path.ends_with(".parquet"),
            "File path should end with .parquet, got: {file_path}"
        );
    }

    #[tokio::test]
    async fn test_select_file_column_position() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE, "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(2).name(), "z");

        assert!(batches[0].column_by_name("x").is_some());
        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
        assert!(batches[0].column_by_name("z").is_some());
    }

    #[tokio::test]
    async fn test_select_file_column_only() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Select only the reserved _file metadata column, no data columns.
        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 1);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);

        // Every row is still returned even though only a metadata column was selected.
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2048);
    }

    #[tokio::test]
    async fn test_file_column_with_multiple_files() {
        use std::collections::HashSet;

        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Collect the distinct file paths reported through the _file column.
        let mut file_paths = HashSet::new();
        for batch in &batches {
            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
            let run_array = file_col
                .as_any()
                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
                .expect("_file column should be a RunArray");

            let values = run_array.values();
            let string_values = values.as_string::<i32>();
            for i in 0..string_values.len() {
                file_paths.insert(string_values.value(i).to_string());
            }
        }

        assert!(!file_paths.is_empty(), "Should have at least one file path");

        // Every reported path should point at a Parquet data file.
        for path in &file_paths {
            assert!(
                path.ends_with(".parquet"),
                "All file paths should end with .parquet, got: {path}"
            );
        }
    }

    #[tokio::test]
    async fn test_file_column_at_start() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select([RESERVED_COL_NAME_FILE, "x", "y"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
        assert_eq!(schema.field(1).name(), "x");
        assert_eq!(schema.field(2).name(), "y");
    }

    #[tokio::test]
    async fn test_file_column_at_end() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "y", RESERVED_COL_NAME_FILE])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 3);

        let schema = batches[0].schema();
        assert_eq!(schema.field(0).name(), "x");
        assert_eq!(schema.field(1).name(), "y");
        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
    }

    #[tokio::test]
    async fn test_select_with_repeated_column_names() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        // Select with repeated regular and metadata column names.
        let table_scan = fixture
            .table
            .scan()
            .select([
                "x",
                RESERVED_COL_NAME_FILE,
                "x", // duplicate
                "y",
                RESERVED_COL_NAME_FILE, // duplicate
                "y",                    // duplicate
            ])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        // Duplicates are preserved, so the batch carries all six selected columns.
        assert_eq!(
            batches[0].num_columns(),
            6,
            "Should have exactly 6 columns with duplicates"
        );

        let schema = batches[0].schema();

        // Column order matches the selection order, including duplicates.
        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
        assert_eq!(
            schema.field(1).name(),
            RESERVED_COL_NAME_FILE,
            "Column 1 should be _file"
        );
        assert_eq!(
            schema.field(2).name(),
            "x",
            "Column 2 should be x (duplicate)"
        );
        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
        assert_eq!(
            schema.field(4).name(),
            RESERVED_COL_NAME_FILE,
            "Column 4 should be _file (duplicate)"
        );
        assert_eq!(
            schema.field(5).name(),
            "y",
            "Column 5 should be y (duplicate)"
        );

        // Data types stay consistent across duplicated columns.
        assert!(
            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
            "Column x should be Int64"
        );
        assert!(
            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
            "Column x (duplicate) should be Int64"
        );
        assert!(
            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
            "Column y should be Int64"
        );
        assert!(
            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
            "Column y (duplicate) should be Int64"
        );
        assert!(
            matches!(
                schema.field(1).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column should use RunEndEncoded type"
        );
        assert!(
            matches!(
                schema.field(4).data_type(),
                arrow_schema::DataType::RunEndEncoded(_, _)
            ),
            "_file column (duplicate) should use RunEndEncoded type"
        );
    }

    #[tokio::test]
    async fn test_scan_deadlock() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_deadlock_manifests().await;

        // With every concurrency limit forced to 1, file planning must still make progress.
        let table_scan = fixture
            .table
            .scan()
            .with_concurrency_limit(1)
            .build()
            .unwrap();

        // Guard with a timeout: a regression here would otherwise hang the test forever.
        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
            table_scan
                .plan_files()
                .await
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
        })
        .await;

        assert!(result.is_ok(), "Scan timed out - deadlock detected");
    }
}