mod cache;
use cache::*;
mod context;
use context::*;
mod task;

use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::channel::mpsc::{Sender, channel};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;

use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind, Result};

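/// A stream of Arrow [`RecordBatch`]es, as returned by [`TableScan::to_arrow`].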
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

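/// Builder used to configure and create a [`TableScan`]; obtained from [`Table::scan`].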
pub struct TableScanBuilder<'a> {
    table: &'a Table,
    column_names: Option<Vec<String>>,
    snapshot_id: Option<i64>,
    batch_size: Option<usize>,
    case_sensitive: bool,
    filter: Option<Predicate>,
    concurrency_limit_data_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_manifest_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl<'a> TableScanBuilder<'a> {
    pub(crate) fn new(table: &'a Table) -> Self {
        let num_cpus = available_parallelism().get();

        Self {
            table,
            column_names: None,
            snapshot_id: None,
            batch_size: None,
            case_sensitive: true,
            filter: None,
            concurrency_limit_data_files: num_cpus,
            concurrency_limit_manifest_entries: num_cpus,
            concurrency_limit_manifest_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
        self.case_sensitive = case_sensitive;
        self
    }

    pub fn with_filter(mut self, predicate: Predicate) -> Self {
        self.filter = Some(predicate.rewrite_not());
        self
    }

    pub fn select_all(mut self) -> Self {
        self.column_names = None;
        self
    }

    pub fn select_empty(mut self) -> Self {
        self.column_names = Some(vec![]);
        self
    }

    pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
        self.column_names = Some(
            column_names
                .into_iter()
                .map(|item| item.to_string())
                .collect(),
        );
        self
    }

    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
        self.snapshot_id = Some(snapshot_id);
        self
    }

    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_files = limit;
        self.concurrency_limit_manifest_entries = limit;
        self.concurrency_limit_data_files = limit;
        self
    }

    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_data_files = limit;
        self
    }

    pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
        self.concurrency_limit_manifest_entries = limit;
        self
    }

    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

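    /// Determines whether to enable row selection when reading data files: with row
    /// selection enabled, individual rows may be skipped within a row group instead
    /// of only filtering whole row groups. Disabled by default.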
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

    pub fn build(self) -> Result<TableScan> {
        let snapshot = match self.snapshot_id {
            Some(snapshot_id) => self
                .table
                .metadata()
                .snapshot_by_id(snapshot_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Snapshot with id {} not found", snapshot_id),
                    )
                })?
                .clone(),
            None => {
                let Some(current_snapshot) = self.table.metadata().current_snapshot() else {
                    return Ok(TableScan {
                        batch_size: self.batch_size,
                        column_names: self.column_names,
                        file_io: self.table.file_io().clone(),
                        plan_context: None,
                        concurrency_limit_data_files: self.concurrency_limit_data_files,
                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                        row_group_filtering_enabled: self.row_group_filtering_enabled,
                        row_selection_enabled: self.row_selection_enabled,
                    });
                };
                current_snapshot.clone()
            }
        };

        let schema = snapshot.schema(self.table.metadata())?;

        if let Some(column_names) = self.column_names.as_ref() {
            for column_name in column_names {
                if schema.field_by_name(column_name).is_none() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Column {} not found in table. Schema: {}",
                            column_name, schema
                        ),
                    ));
                }
            }
        }

        let mut field_ids = vec![];
        let column_names = self.column_names.clone().unwrap_or_else(|| {
            schema
                .as_struct()
                .fields()
                .iter()
                .map(|f| f.name.clone())
                .collect()
        });

        for column_name in column_names.iter() {
            let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Column {} not found in table. Schema: {}",
                        column_name, schema
                    ),
                )
            })?;

            schema
                .as_struct()
                .field_by_id(field_id)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::FeatureUnsupported,
                        format!(
                            "Column {} is not a direct child of the schema but a nested field, which is not yet supported. Schema: {}",
                            column_name, schema
                        ),
                    )
                })?;

            field_ids.push(field_id);
        }

        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
            Some(predicates.bind(schema.clone(), true)?)
        } else {
            None
        };

        let plan_context = PlanContext {
            snapshot,
            table_metadata: self.table.metadata_ref(),
            snapshot_schema: schema,
            case_sensitive: self.case_sensitive,
            predicate: self.filter.map(Arc::new),
            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
            object_cache: self.table.object_cache(),
            field_ids: Arc::new(field_ids),
            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
        };

        Ok(TableScan {
            batch_size: self.batch_size,
            column_names: self.column_names,
            file_io: self.table.file_io().clone(),
            plan_context: Some(plan_context),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        })
    }
}

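/// An execution-ready table scan, created with [`TableScanBuilder::build`].
///
/// A minimal usage sketch (hypothetical caller code; assumes an already-loaded
/// `table: Table` and an async runtime):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let scan = table.scan().select(["x", "y"]).build()?;
/// let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
/// ```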
#[derive(Debug)]
pub struct TableScan {
    plan_context: Option<PlanContext>,
    batch_size: Option<usize>,
    file_io: FileIO,
    column_names: Option<Vec<String>>,
    concurrency_limit_manifest_files: usize,
    concurrency_limit_manifest_entries: usize,
    concurrency_limit_data_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl TableScan {
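    /// Returns a stream of [`FileScanTask`]s: the data files that remain after
    /// pruning manifests and entries with the scan's snapshot, partition filter,
    /// and file metrics.
    ///
    /// A small sketch of collecting the planned tasks, mirroring the tests below:
    ///
    /// ```ignore
    /// let tasks: Vec<FileScanTask> = scan.plan_files().await?.try_collect().await?;
    /// ```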
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

        let manifest_list = plan_context.get_manifest_list().await?;

        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        let mut channel_for_manifest_error = file_scan_task_tx.clone();

        spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        spawn(async move {
            let result = manifest_entry_delete_ctx_rx
                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_delete_manifest_entry_error
                    .send(Err(error))
                    .await;
            }
        })
        .await;

        spawn(async move {
            let result = manifest_entry_data_ctx_rx
                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                .try_for_each_concurrent(
                    concurrency_limit_manifest_entries,
                    |(manifest_entry_context, tx)| async move {
                        spawn(async move {
                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
                        })
                        .await
                    },
                )
                .await;

            if let Err(error) = result {
                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
            }
        });

        Ok(file_scan_task_rx.boxed())
    }

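    /// Reads the planned files and returns a stream of Arrow [`RecordBatch`]es,
    /// honoring the configured batch size, data file concurrency limit,
    /// row group filtering, and row selection settings.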
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
            .with_row_selection_enabled(self.row_selection_enabled);

        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder.build().read(self.plan_files().await?)
    }

    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

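    /// Evaluates a single data-file manifest entry against the bound predicates
    /// (partition expression first, then file metrics) and, when it cannot be
    /// skipped, emits a [`FileScanTask`] for it.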
    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

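    /// Evaluates a single delete-file manifest entry against the partition
    /// predicate and, when it cannot be skipped, forwards it to the
    /// [`DeleteFileIndex`] being built for this scan.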
    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}

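/// The scan filter in both of its bound forms: one used for partition pruning
/// and one, bound to the snapshot schema, used for metrics evaluation.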
pub(crate) struct BoundPredicates {
    partition_bound_predicate: BoundPredicate,
    snapshot_bound_predicate: BoundPredicate,
}

#[cfg(test)]
pub mod tests {
    #![allow(missing_docs)]

    use std::collections::HashMap;
    use std::fs;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::{
        ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
    };
    use futures::{TryStreamExt, stream};
    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;
    use tera::{Context, Tera};
    use uuid::Uuid;

    use crate::TableIdent;
    use crate::arrow::ArrowReaderBuilder;
    use crate::expr::{BoundPredicate, Reference};
    use crate::io::{FileIO, OutputFile};
    use crate::scan::FileScanTask;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
    };
    use crate::table::Table;

    pub struct TableTestFixture {
        pub table_location: String,
        pub table: Table,
    }

    impl TableTestFixture {
        #[allow(clippy::new_without_default)]
        pub fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let mut context = Context::new();
                context.insert("table_location", &table_location);
                context.insert("manifest_list_1_location", &manifest_list1_location);
                context.insert("manifest_list_2_location", &manifest_list2_location);
                context.insert("table_metadata_1_location", &table_metadata1_location);

                let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        #[allow(clippy::new_without_default)]
        pub fn new_empty() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_empty_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let mut context = Context::new();
                context.insert("table_location", &table_location);
                context.insert("table_metadata_1_location", &table_metadata1_location);

                let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        pub fn new_unpartitioned() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let mut table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let mut context = Context::new();
                context.insert("table_location", &table_location);
                context.insert("manifest_list_1_location", &manifest_list1_location);
                context.insert("manifest_list_2_location", &manifest_list2_location);
                context.insert("table_metadata_1_location", &table_metadata1_location);

                let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
            table_metadata.partition_specs.clear();
            table_metadata.default_partition_type = StructType::new(vec![]);
            table_metadata
                .partition_specs
                .insert(0, table_metadata.default_spec.clone());

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        fn next_manifest_file(&self) -> OutputFile {
            self.table
                .file_io()
                .new_output(format!(
                    "{}/metadata/manifest_{}.avro",
                    self.table_location,
                    Uuid::new_v4()
                ))
                .unwrap()
        }

        pub async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();

            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };

            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                writer.close().unwrap();
            }
        }

        pub async fn setup_unpartitioned_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let parent_snapshot = current_snapshot
                .parent_snapshot(self.table.metadata())
                .unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());

            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();

            let empty_partition = Struct::empty();

            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .key_metadata(None)
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_delete_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Deleted)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            writer
                .add_existing_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Existing)
                        .snapshot_id(parent_snapshot.snapshot_id())
                        .sequence_number(parent_snapshot.sequence_number())
                        .file_sequence_number(parent_snapshot.sequence_number())
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(empty_partition.clone())
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();

            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();

            let schema = {
                let fields = vec![
                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "1".to_string(),
                        )])),
                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "2".to_string(),
                        )])),
                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "3".to_string(),
                        )])),
                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "4".to_string(),
                        )])),
                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "5".to_string(),
                        )])),
                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "6".to_string(),
                        )])),
                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "7".to_string(),
                        )])),
                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
                        .with_metadata(HashMap::from([(
                            PARQUET_FIELD_ID_META_KEY.to_string(),
                            "8".to_string(),
                        )])),
                ];
                Arc::new(arrow_schema::Schema::new(fields))
            };

            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;

            let mut values = vec![2; 512];
            values.append(vec![3; 200].as_mut());
            values.append(vec![4; 300].as_mut());
            values.append(vec![5; 12].as_mut());
            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![3; 512];
            values.append(vec![4; 512].as_mut());
            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec!["Apache"; 512];
            values.append(vec!["Iceberg"; 512].as_mut());
            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100.0f64; 512];
            values.append(vec![150.0f64; 12].as_mut());
            values.append(vec![200.0f64; 500].as_mut());
            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i32; 512];
            values.append(vec![150i32; 12].as_mut());
            values.append(vec![200i32; 500].as_mut());
            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![100i64; 512];
            values.append(vec![150i64; 12].as_mut());
            values.append(vec![200i64; 500].as_mut());
            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;

            let mut values = vec![false; 512];
            values.append(vec![true; 512].as_mut());
            let values: BooleanArray = values.into();
            let col8 = Arc::new(values) as ArrayRef;

            let to_write = RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3, col4, col5, col6, col7, col8,
            ])
            .unwrap();

            let props = WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .build();

            for n in 1..=3 {
                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
                let mut writer =
                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

                writer.write(&to_write).expect("Writing batch");

                writer.close().unwrap();
            }
        }
    }

    #[test]
    fn test_table_scan_columns() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
        assert_eq!(
            Some(vec!["x".to_string(), "y".to_string()]),
            table_scan.column_names
        );

        let table_scan = table
            .scan()
            .select(["x", "y"])
            .select(["z"])
            .build()
            .unwrap();
        assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
    }

    #[test]
    fn test_select_all() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select_all().build().unwrap();
        assert!(table_scan.column_names.is_none());
    }

    #[test]
    fn test_select_no_exist_column() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_default_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().build().unwrap();
        assert_eq!(
            table.metadata().current_snapshot().unwrap().snapshot_id(),
            table_scan.snapshot().unwrap().snapshot_id()
        );
    }

    #[test]
    fn test_table_scan_non_exist_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table.scan().snapshot_id(1024).build();
        assert!(table_scan.is_err());
    }

    #[test]
    fn test_table_scan_with_snapshot_id() {
        let table = TableTestFixture::new().table;

        let table_scan = table
            .scan()
            .snapshot_id(3051729675574597004)
            .with_row_selection_enabled(true)
            .build()
            .unwrap();
        assert_eq!(
            table_scan.snapshot().unwrap().snapshot_id(),
            3051729675574597004
        );
    }

    #[tokio::test]
    async fn test_plan_files_on_table_without_any_snapshots() {
        let table = TableTestFixture::new_empty().table;
        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert!(batches.is_empty());
    }

    #[tokio::test]
    async fn test_plan_files_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut tasks = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_fold(vec![], |mut acc, task| async move {
                acc.push(task);
                Ok(acc)
            })
            .await
            .unwrap();

        assert_eq!(tasks.len(), 2);

        tasks.sort_by_key(|t| t.data_file_path.to_string());

        assert_eq!(
            tasks[0].data_file_path,
            format!("{}/1.parquet", &fixture.table_location)
        );

        assert_eq!(
            tasks[1].data_file_path,
            format!("{}/3.parquet", &fixture.table_location)
        );
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        let col = batches[0].column_by_name("x").unwrap();

        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);
    }

    #[tokio::test]
    async fn test_open_parquet_no_deletions_by_separate_reader() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let mut plan_task: Vec<_> = table_scan
            .plan_files()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        assert_eq!(plan_task.len(), 2);

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .clone()
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();

        let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
        let batch_stream = reader
            .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
            .unwrap();
        let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batch_1, batch_2);
    }

    #[tokio::test]
    async fn test_open_parquet_with_projection() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let table_scan = fixture
            .table
            .scan()
            .select(["x", "z"])
            .with_row_selection_enabled(true)
            .build()
            .unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 2);

        let col1 = batches[0].column_by_name("x").unwrap();
        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col2 = batches[0].column_by_name("z").unwrap();
        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 3);

        let table_scan = fixture.table.scan().select_empty().build().unwrap();
        let batch_stream = table_scan.to_arrow().await.unwrap();
        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_columns(), 0);
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").less_than(Datum::long(3));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 2);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_gt_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("x").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 1);

        let col = batches[0].column_by_name("y").unwrap();
        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(int64_arr.value(0), 5);
    }

    #[tokio::test]
    async fn test_filter_double_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("dbl").unwrap();
        let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
        assert_eq!(f64_arr.value(1), 150.0f64);
    }

    #[tokio::test]
    async fn test_filter_int_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i32").unwrap();
        let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(i32_arr.value(1), 150i32);
    }

    #[tokio::test]
    async fn test_filter_long_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 12);

        let col = batches[0].column_by_name("i64").unwrap();
        let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(i64_arr.value(1), 150i64);
    }

    #[tokio::test]
    async fn test_filter_bool_eq() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("bool").equal_to(Datum::bool(true));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("bool").unwrap();
        let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
        assert!(bool_arr.value(1));
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_is_not_null() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y").is_not_null();
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_and_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 500);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_lt_or_gt() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("y")
            .less_than(Datum::long(5))
            .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
        assert_eq!(batches[0].num_rows(), 1024);

        let col = batches[0].column_by_name("x").unwrap();
        let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        assert_eq!(col, &expected_x);

        let col = batches[0].column_by_name("y").unwrap();
        let mut values = vec![2; 512];
        values.append(vec![3; 200].as_mut());
        values.append(vec![4; 300].as_mut());
        values.append(vec![5; 12].as_mut());
        let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_y);

        let col = batches[0].column_by_name("z").unwrap();
        let mut values = vec![3; 512];
        values.append(vec![4; 512].as_mut());
        let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
        assert_eq!(col, &expected_z);
    }

    #[tokio::test]
    async fn test_filter_on_arrow_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_startswith() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Iceberg");
    }

    #[tokio::test]
    async fn test_filter_on_arrow_not_in() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let mut builder = fixture.table.scan();
        let predicate =
            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
        builder = builder
            .with_filter(predicate)
            .with_row_selection_enabled(true);
        let table_scan = builder.build().unwrap();

        let batch_stream = table_scan.to_arrow().await.unwrap();

        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

        assert_eq!(batches[0].num_rows(), 512);

        let col = batches[0].column_by_name("a").unwrap();
        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(string_arr.value(0), "Apache");
    }

    #[test]
    fn test_file_scan_task_serialize_deserialize() {
        let test_fn = |task: FileScanTask| {
            let serialized = serde_json::to_string(&task).unwrap();
            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();

            assert_eq!(task.data_file_path, deserialized.data_file_path);
            assert_eq!(task.start, deserialized.start);
            assert_eq!(task.length, deserialized.length);
            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
            assert_eq!(task.predicate, deserialized.predicate);
            assert_eq!(task.schema, deserialized.schema);
        };

        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Binary),
                ))])
                .build()
                .unwrap(),
        );
        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: None,
            schema: schema.clone(),
            record_count: Some(100),
            data_file_format: DataFileFormat::Parquet,
            deletes: vec![],
        };
        test_fn(task);

        let task = FileScanTask {
            data_file_path: "data_file_path".to_string(),
            start: 0,
            length: 100,
            project_field_ids: vec![1, 2, 3],
            predicate: Some(BoundPredicate::AlwaysTrue),
            schema,
            record_count: None,
            data_file_format: DataFileFormat::Avro,
            deletes: vec![],
        };
        test_fn(task);
    }
}