1use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::sync::Arc;
26
27use _serde::TableMetadataEnum;
28use chrono::{DateTime, Utc};
29use serde::{Deserialize, Serialize};
30use serde_repr::{Deserialize_repr, Serialize_repr};
31use uuid::Uuid;
32
33use super::snapshot::SnapshotReference;
34pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
35use super::{
36 DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
37 SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
38};
39use crate::error::{Result, timestamp_ms_to_utc};
40use crate::io::FileIO;
41use crate::{Error, ErrorKind};
42
/// Name of the main branch; key of the branch entry that mirrors the current snapshot in `refs`.
static MAIN_BRANCH: &str = "main";
/// One minute expressed in milliseconds; used as the tolerance in the log ordering checks.
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;

/// Sentinel snapshot id; a current snapshot id equal to this value is normalized to `None`.
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
/// Sequence number reported as the next sequence number for format version 1 metadata.
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;

/// Property key `format-version`.
pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
/// Property key `uuid`.
pub const PROPERTY_UUID: &str = "uuid";
/// Property key `snapshot-count`.
pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
/// Property key `current-snapshot-summary`.
pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
/// Property key `current-snapshot-id`.
pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
/// Property key `current-snapshot-timestamp-ms`.
pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms";
/// Property key `current-schema`.
pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
/// Property key `default-partition-spec`.
pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
/// Property key `default-sort-order`.
pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";

/// Property key `write.metadata.previous-versions-max`.
pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previous-versions-max";
/// Default value for [`PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX`].
pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;

/// Property key `write.summary.partition-limit`.
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
/// Default value for [`PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT`].
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;

/// The property keys grouped as "reserved".
///
/// NOTE(review): presumably these keys may not be stored as ordinary user properties;
/// the enforcement is not visible here — confirm against the callers.
pub const RESERVED_PROPERTIES: [&str; 9] = [
    PROPERTY_FORMAT_VERSION,
    PROPERTY_UUID,
    PROPERTY_SNAPSHOT_COUNT,
    PROPERTY_CURRENT_SNAPSHOT_ID,
    PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
    PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
    PROPERTY_CURRENT_SCHEMA,
    PROPERTY_DEFAULT_PARTITION_SPEC,
    PROPERTY_DEFAULT_SORT_ORDER,
];

/// Property key `commit.retry.num-retries`.
pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
/// Default value for [`PROPERTY_COMMIT_NUM_RETRIES`].
pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;

/// Property key `commit.retry.min-wait-ms`.
pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
/// Default value for [`PROPERTY_COMMIT_MIN_RETRY_WAIT_MS`], in milliseconds.
pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;

/// Property key `commit.retry.max-wait-ms`.
pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
/// Default value for [`PROPERTY_COMMIT_MAX_RETRY_WAIT_MS`]: one minute, in milliseconds.
pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000;

/// Property key `commit.retry.total-timeout-ms`.
pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
/// Default value for [`PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS`]: thirty minutes, in milliseconds.
pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000;

/// Property key `write.format.default`.
pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
/// Property key `write.delete.format.default`.
pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
/// Default file format name: `parquet`.
pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
128
129pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
131pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; pub type TableMetadataRef = Arc<TableMetadata>;
136
137#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
138#[serde(try_from = "TableMetadataEnum")]
139pub struct TableMetadata {
144 pub(crate) format_version: FormatVersion,
146 pub(crate) table_uuid: Uuid,
148 pub(crate) location: String,
150 pub(crate) last_sequence_number: i64,
152 pub(crate) last_updated_ms: i64,
154 pub(crate) last_column_id: i32,
156 pub(crate) schemas: HashMap<i32, SchemaRef>,
158 pub(crate) current_schema_id: i32,
160 pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
162 pub(crate) default_spec: PartitionSpecRef,
164 pub(crate) default_partition_type: StructType,
166 pub(crate) last_partition_id: i32,
168 pub(crate) properties: HashMap<String, String>,
172 pub(crate) current_snapshot_id: Option<i64>,
175 pub(crate) snapshots: HashMap<i64, SnapshotRef>,
180 pub(crate) snapshot_log: Vec<SnapshotLog>,
187
188 pub(crate) metadata_log: Vec<MetadataLog>,
195
196 pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
198 pub(crate) default_sort_order_id: i64,
202 pub(crate) refs: HashMap<String, SnapshotReference>,
207 pub(crate) statistics: HashMap<i64, StatisticsFile>,
209 pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
211 pub(crate) encryption_keys: HashMap<String, String>,
213}
214
215impl TableMetadata {
216 #[must_use]
223 pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
224 TableMetadataBuilder::new_from_metadata(self, current_file_location)
225 }
226
227 #[inline]
229 pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
230 self.partition_specs
231 .values()
232 .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
233 }
234
235 #[inline]
237 pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
238 self.schemas
239 .values()
240 .any(|schema| schema.field_by_name(name).is_some())
241 }
242
243 #[inline]
245 pub fn format_version(&self) -> FormatVersion {
246 self.format_version
247 }
248
249 #[inline]
251 pub fn uuid(&self) -> Uuid {
252 self.table_uuid
253 }
254
255 #[inline]
257 pub fn location(&self) -> &str {
258 self.location.as_str()
259 }
260
261 #[inline]
263 pub fn last_sequence_number(&self) -> i64 {
264 self.last_sequence_number
265 }
266
267 #[inline]
272 pub fn next_sequence_number(&self) -> i64 {
273 match self.format_version {
274 FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
275 _ => self.last_sequence_number + 1,
276 }
277 }
278
279 #[inline]
281 pub fn last_column_id(&self) -> i32 {
282 self.last_column_id
283 }
284
285 #[inline]
287 pub fn last_partition_id(&self) -> i32 {
288 self.last_partition_id
289 }
290
291 #[inline]
293 pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
294 timestamp_ms_to_utc(self.last_updated_ms)
295 }
296
297 #[inline]
299 pub fn last_updated_ms(&self) -> i64 {
300 self.last_updated_ms
301 }
302
303 #[inline]
305 pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
306 self.schemas.values()
307 }
308
309 #[inline]
311 pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
312 self.schemas.get(&schema_id)
313 }
314
315 #[inline]
317 pub fn current_schema(&self) -> &SchemaRef {
318 self.schema_by_id(self.current_schema_id)
319 .expect("Current schema id set, but not found in table metadata")
320 }
321
322 #[inline]
324 pub fn current_schema_id(&self) -> SchemaId {
325 self.current_schema_id
326 }
327
328 #[inline]
330 pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
331 self.partition_specs.values()
332 }
333
334 #[inline]
336 pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
337 self.partition_specs.get(&spec_id)
338 }
339
340 #[inline]
342 pub fn default_partition_spec(&self) -> &PartitionSpecRef {
343 &self.default_spec
344 }
345
346 #[inline]
348 pub fn default_partition_type(&self) -> &StructType {
349 &self.default_partition_type
350 }
351
352 #[inline]
353 pub fn default_partition_spec_id(&self) -> i32 {
355 self.default_spec.spec_id()
356 }
357
358 #[inline]
360 pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
361 self.snapshots.values()
362 }
363
364 #[inline]
366 pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
367 self.snapshots.get(&snapshot_id)
368 }
369
370 #[inline]
372 pub fn history(&self) -> &[SnapshotLog] {
373 &self.snapshot_log
374 }
375
376 #[inline]
378 pub fn metadata_log(&self) -> &[MetadataLog] {
379 &self.metadata_log
380 }
381
382 #[inline]
384 pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
385 self.current_snapshot_id.map(|s| {
386 self.snapshot_by_id(s)
387 .expect("Current snapshot id has been set, but doesn't exist in metadata")
388 })
389 }
390
391 #[inline]
393 pub fn current_snapshot_id(&self) -> Option<i64> {
394 self.current_snapshot_id
395 }
396
397 #[inline]
400 pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
401 self.refs.get(ref_name).map(|r| {
402 self.snapshot_by_id(r.snapshot_id)
403 .unwrap_or_else(|| panic!("Snapshot id of ref {} doesn't exist", ref_name))
404 })
405 }
406
407 #[inline]
409 pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
410 self.sort_orders.values()
411 }
412
413 #[inline]
415 pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
416 self.sort_orders.get(&sort_order_id)
417 }
418
419 #[inline]
421 pub fn default_sort_order(&self) -> &SortOrderRef {
422 self.sort_orders
423 .get(&self.default_sort_order_id)
424 .expect("Default order id has been set, but not found in table metadata!")
425 }
426
427 #[inline]
429 pub fn default_sort_order_id(&self) -> i64 {
430 self.default_sort_order_id
431 }
432
433 #[inline]
435 pub fn properties(&self) -> &HashMap<String, String> {
436 &self.properties
437 }
438
439 #[inline]
441 pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
442 self.statistics.values()
443 }
444
445 #[inline]
447 pub fn partition_statistics_iter(
448 &self,
449 ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
450 self.partition_statistics.values()
451 }
452
453 #[inline]
455 pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
456 self.statistics.get(&snapshot_id)
457 }
458
459 #[inline]
461 pub fn partition_statistics_for_snapshot(
462 &self,
463 snapshot_id: i64,
464 ) -> Option<&PartitionStatisticsFile> {
465 self.partition_statistics.get(&snapshot_id)
466 }
467
468 fn construct_refs(&mut self) {
469 if let Some(current_snapshot_id) = self.current_snapshot_id {
470 if !self.refs.contains_key(MAIN_BRANCH) {
471 self.refs
472 .insert(MAIN_BRANCH.to_string(), SnapshotReference {
473 snapshot_id: current_snapshot_id,
474 retention: SnapshotRetention::Branch {
475 min_snapshots_to_keep: None,
476 max_snapshot_age_ms: None,
477 max_ref_age_ms: None,
478 },
479 });
480 }
481 }
482 }
483
484 #[inline]
486 pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = (&String, &String)> {
487 self.encryption_keys.iter()
488 }
489
490 #[inline]
492 pub fn encryption_key(&self, key_id: &str) -> Option<&String> {
493 self.encryption_keys.get(key_id)
494 }
495
496 pub async fn read_from(
498 file_io: &FileIO,
499 metadata_location: impl AsRef<str>,
500 ) -> Result<TableMetadata> {
501 let input_file = file_io.new_input(metadata_location)?;
502 let metadata_content = input_file.read().await?;
503 let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
504 Ok(metadata)
505 }
506
507 pub async fn write_to(
509 &self,
510 file_io: &FileIO,
511 metadata_location: impl AsRef<str>,
512 ) -> Result<()> {
513 file_io
514 .new_output(metadata_location)?
515 .write(serde_json::to_vec(self)?.into())
516 .await
517 }
518
519 pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
527 self.validate_current_schema()?;
528 self.normalize_current_snapshot()?;
529 self.construct_refs();
530 self.validate_refs()?;
531 self.validate_chronological_snapshot_logs()?;
532 self.validate_chronological_metadata_logs()?;
533 self.location = self.location.trim_end_matches('/').to_string();
535 self.validate_snapshot_sequence_number()?;
536 self.try_normalize_partition_spec()?;
537 self.try_normalize_sort_order()?;
538 Ok(self)
539 }
540
541 fn try_normalize_partition_spec(&mut self) -> Result<()> {
543 if self
544 .partition_spec_by_id(self.default_spec.spec_id())
545 .is_none()
546 {
547 self.partition_specs.insert(
548 self.default_spec.spec_id(),
549 Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
550 );
551 }
552
553 Ok(())
554 }
555
556 fn try_normalize_sort_order(&mut self) -> Result<()> {
558 if self.sort_order_by_id(self.default_sort_order_id).is_some() {
559 return Ok(());
560 }
561
562 if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
563 return Err(Error::new(
564 ErrorKind::DataInvalid,
565 format!(
566 "No sort order exists with the default sort order id {}.",
567 self.default_sort_order_id
568 ),
569 ));
570 }
571
572 let sort_order = SortOrder::unsorted_order();
573 self.sort_orders
574 .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
575 Ok(())
576 }
577
578 fn validate_current_schema(&self) -> Result<()> {
580 if self.schema_by_id(self.current_schema_id).is_none() {
581 return Err(Error::new(
582 ErrorKind::DataInvalid,
583 format!(
584 "No schema exists with the current schema id {}.",
585 self.current_schema_id
586 ),
587 ));
588 }
589 Ok(())
590 }
591
592 fn normalize_current_snapshot(&mut self) -> Result<()> {
594 if let Some(current_snapshot_id) = self.current_snapshot_id {
595 if current_snapshot_id == EMPTY_SNAPSHOT_ID {
596 self.current_snapshot_id = None;
597 } else if self.snapshot_by_id(current_snapshot_id).is_none() {
598 return Err(Error::new(
599 ErrorKind::DataInvalid,
600 format!(
601 "Snapshot for current snapshot id {} does not exist in the existing snapshots list",
602 current_snapshot_id
603 ),
604 ));
605 }
606 }
607 Ok(())
608 }
609
610 fn validate_refs(&self) -> Result<()> {
612 for (name, snapshot_ref) in self.refs.iter() {
613 if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
614 return Err(Error::new(
615 ErrorKind::DataInvalid,
616 format!(
617 "Snapshot for reference {name} does not exist in the existing snapshots list"
618 ),
619 ));
620 }
621 }
622
623 let main_ref = self.refs.get(MAIN_BRANCH);
624 if self.current_snapshot_id.is_some() {
625 if let Some(main_ref) = main_ref {
626 if main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default() {
627 return Err(Error::new(
628 ErrorKind::DataInvalid,
629 format!(
630 "Current snapshot id does not match main branch ({:?} != {:?})",
631 self.current_snapshot_id.unwrap_or_default(),
632 main_ref.snapshot_id
633 ),
634 ));
635 }
636 }
637 } else if main_ref.is_some() {
638 return Err(Error::new(
639 ErrorKind::DataInvalid,
640 "Current snapshot is not set, but main branch exists",
641 ));
642 }
643
644 Ok(())
645 }
646
647 fn validate_snapshot_sequence_number(&self) -> Result<()> {
649 if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
650 return Err(Error::new(
651 ErrorKind::DataInvalid,
652 format!(
653 "Last sequence number must be 0 in v1. Found {}",
654 self.last_sequence_number
655 ),
656 ));
657 }
658
659 if self.format_version >= FormatVersion::V2 {
660 if let Some(snapshot) = self
661 .snapshots
662 .values()
663 .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
664 {
665 return Err(Error::new(
666 ErrorKind::DataInvalid,
667 format!(
668 "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
669 snapshot.snapshot_id(),
670 snapshot.sequence_number(),
671 self.last_sequence_number
672 ),
673 ));
674 }
675 }
676
677 Ok(())
678 }
679
680 fn validate_chronological_snapshot_logs(&self) -> Result<()> {
682 for window in self.snapshot_log.windows(2) {
683 let (prev, curr) = (&window[0], &window[1]);
684 if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
687 return Err(Error::new(
688 ErrorKind::DataInvalid,
689 "Expected sorted snapshot log entries",
690 ));
691 }
692 }
693
694 if let Some(last) = self.snapshot_log.last() {
695 if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
698 return Err(Error::new(
699 ErrorKind::DataInvalid,
700 format!(
701 "Invalid update timestamp {}: before last snapshot log entry at {}",
702 self.last_updated_ms, last.timestamp_ms
703 ),
704 ));
705 }
706 }
707 Ok(())
708 }
709
710 fn validate_chronological_metadata_logs(&self) -> Result<()> {
711 for window in self.metadata_log.windows(2) {
712 let (prev, curr) = (&window[0], &window[1]);
713 if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
716 return Err(Error::new(
717 ErrorKind::DataInvalid,
718 "Expected sorted metadata log entries",
719 ));
720 }
721 }
722
723 if let Some(last) = self.metadata_log.last() {
724 if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
727 return Err(Error::new(
728 ErrorKind::DataInvalid,
729 format!(
730 "Invalid update timestamp {}: before last metadata log entry at {}",
731 self.last_updated_ms, last.timestamp_ms
732 ),
733 ));
734 }
735 }
736
737 Ok(())
738 }
739}
740
741pub(super) mod _serde {
742 use std::borrow::BorrowMut;
743 use std::collections::HashMap;
748 use std::sync::Arc;
753
754 use serde::{Deserialize, Serialize};
755 use uuid::Uuid;
756
757 use super::{
758 DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
759 TableMetadata,
760 };
761 use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
762 use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2};
763 use crate::spec::{
764 PartitionField, PartitionSpec, PartitionSpecRef, PartitionStatisticsFile, Schema,
765 SchemaRef, Snapshot, SnapshotReference, SnapshotRetention, SortOrder, StatisticsFile,
766 };
767 use crate::{Error, ErrorKind};
768
769 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
770 #[serde(untagged)]
771 pub(super) enum TableMetadataEnum {
772 V2(TableMetadataV2),
773 V1(TableMetadataV1),
774 }
775
776 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
777 #[serde(rename_all = "kebab-case")]
778 pub(super) struct TableMetadataV2 {
780 pub format_version: VersionNumber<2>,
781 pub table_uuid: Uuid,
782 pub location: String,
783 pub last_sequence_number: i64,
784 pub last_updated_ms: i64,
785 pub last_column_id: i32,
786 pub schemas: Vec<SchemaV2>,
787 pub current_schema_id: i32,
788 pub partition_specs: Vec<PartitionSpec>,
789 pub default_spec_id: i32,
790 pub last_partition_id: i32,
791 #[serde(skip_serializing_if = "Option::is_none")]
792 pub properties: Option<HashMap<String, String>>,
793 #[serde(skip_serializing_if = "Option::is_none")]
794 pub current_snapshot_id: Option<i64>,
795 #[serde(skip_serializing_if = "Option::is_none")]
796 pub snapshots: Option<Vec<SnapshotV2>>,
797 #[serde(skip_serializing_if = "Option::is_none")]
798 pub snapshot_log: Option<Vec<SnapshotLog>>,
799 #[serde(skip_serializing_if = "Option::is_none")]
800 pub metadata_log: Option<Vec<MetadataLog>>,
801 pub sort_orders: Vec<SortOrder>,
802 pub default_sort_order_id: i64,
803 #[serde(skip_serializing_if = "Option::is_none")]
804 pub refs: Option<HashMap<String, SnapshotReference>>,
805 #[serde(default, skip_serializing_if = "Vec::is_empty")]
806 pub statistics: Vec<StatisticsFile>,
807 #[serde(default, skip_serializing_if = "Vec::is_empty")]
808 pub partition_statistics: Vec<PartitionStatisticsFile>,
809 }
810
811 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
812 #[serde(rename_all = "kebab-case")]
813 pub(super) struct TableMetadataV1 {
815 pub format_version: VersionNumber<1>,
816 #[serde(skip_serializing_if = "Option::is_none")]
817 pub table_uuid: Option<Uuid>,
818 pub location: String,
819 pub last_updated_ms: i64,
820 pub last_column_id: i32,
821 pub schema: Option<SchemaV1>,
823 #[serde(skip_serializing_if = "Option::is_none")]
824 pub schemas: Option<Vec<SchemaV1>>,
825 #[serde(skip_serializing_if = "Option::is_none")]
826 pub current_schema_id: Option<i32>,
827 pub partition_spec: Option<Vec<PartitionField>>,
829 #[serde(skip_serializing_if = "Option::is_none")]
830 pub partition_specs: Option<Vec<PartitionSpec>>,
831 #[serde(skip_serializing_if = "Option::is_none")]
832 pub default_spec_id: Option<i32>,
833 #[serde(skip_serializing_if = "Option::is_none")]
834 pub last_partition_id: Option<i32>,
835 #[serde(skip_serializing_if = "Option::is_none")]
836 pub properties: Option<HashMap<String, String>>,
837 #[serde(skip_serializing_if = "Option::is_none")]
838 pub current_snapshot_id: Option<i64>,
839 #[serde(skip_serializing_if = "Option::is_none")]
840 pub snapshots: Option<Vec<SnapshotV1>>,
841 #[serde(skip_serializing_if = "Option::is_none")]
842 pub snapshot_log: Option<Vec<SnapshotLog>>,
843 #[serde(skip_serializing_if = "Option::is_none")]
844 pub metadata_log: Option<Vec<MetadataLog>>,
845 pub sort_orders: Option<Vec<SortOrder>>,
846 pub default_sort_order_id: Option<i64>,
847 #[serde(default, skip_serializing_if = "Vec::is_empty")]
848 pub statistics: Vec<StatisticsFile>,
849 #[serde(default, skip_serializing_if = "Vec::is_empty")]
850 pub partition_statistics: Vec<PartitionStatisticsFile>,
851 }
852
853 #[derive(Debug, PartialEq, Eq)]
855 pub(crate) struct VersionNumber<const V: u8>;
856
857 impl Serialize for TableMetadata {
858 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
859 where S: serde::Serializer {
860 let table_metadata_enum: TableMetadataEnum =
862 self.clone().try_into().map_err(serde::ser::Error::custom)?;
863
864 table_metadata_enum.serialize(serializer)
865 }
866 }
867
868 impl<const V: u8> Serialize for VersionNumber<V> {
869 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
870 where S: serde::Serializer {
871 serializer.serialize_u8(V)
872 }
873 }
874
875 impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
876 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
877 where D: serde::Deserializer<'de> {
878 let value = u8::deserialize(deserializer)?;
879 if value == V {
880 Ok(VersionNumber::<V>)
881 } else {
882 Err(serde::de::Error::custom("Invalid Version"))
883 }
884 }
885 }
886
887 impl TryFrom<TableMetadataEnum> for TableMetadata {
888 type Error = Error;
889 fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
890 match value {
891 TableMetadataEnum::V2(value) => value.try_into(),
892 TableMetadataEnum::V1(value) => value.try_into(),
893 }
894 }
895 }
896
897 impl TryFrom<TableMetadata> for TableMetadataEnum {
898 type Error = Error;
899 fn try_from(value: TableMetadata) -> Result<Self, Error> {
900 Ok(match value.format_version {
901 FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
902 FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
903 })
904 }
905 }
906
907 impl TryFrom<TableMetadataV2> for TableMetadata {
908 type Error = Error;
909 fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
910 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
911 None
912 } else {
913 value.current_snapshot_id
914 };
915 let schemas = HashMap::from_iter(
916 value
917 .schemas
918 .into_iter()
919 .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
920 .collect::<Result<Vec<_>, Error>>()?,
921 );
922
923 let current_schema: &SchemaRef =
924 schemas.get(&value.current_schema_id).ok_or_else(|| {
925 Error::new(
926 ErrorKind::DataInvalid,
927 format!(
928 "No schema exists with the current schema id {}.",
929 value.current_schema_id
930 ),
931 )
932 })?;
933 let partition_specs = HashMap::from_iter(
934 value
935 .partition_specs
936 .into_iter()
937 .map(|x| (x.spec_id(), Arc::new(x))),
938 );
939 let default_spec_id = value.default_spec_id;
940 let default_spec: PartitionSpecRef = partition_specs
941 .get(&value.default_spec_id)
942 .map(|spec| (**spec).clone())
943 .or_else(|| {
944 (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
945 .then(PartitionSpec::unpartition_spec)
946 })
947 .ok_or_else(|| {
948 Error::new(
949 ErrorKind::DataInvalid,
950 format!("Default partition spec {default_spec_id} not found"),
951 )
952 })?
953 .into();
954 let default_partition_type = default_spec.partition_type(current_schema)?;
955
956 let mut metadata = TableMetadata {
957 format_version: FormatVersion::V2,
958 table_uuid: value.table_uuid,
959 location: value.location,
960 last_sequence_number: value.last_sequence_number,
961 last_updated_ms: value.last_updated_ms,
962 last_column_id: value.last_column_id,
963 current_schema_id: value.current_schema_id,
964 schemas,
965 partition_specs,
966 default_partition_type,
967 default_spec,
968 last_partition_id: value.last_partition_id,
969 properties: value.properties.unwrap_or_default(),
970 current_snapshot_id,
971 snapshots: value
972 .snapshots
973 .map(|snapshots| {
974 HashMap::from_iter(
975 snapshots
976 .into_iter()
977 .map(|x| (x.snapshot_id, Arc::new(x.into()))),
978 )
979 })
980 .unwrap_or_default(),
981 snapshot_log: value.snapshot_log.unwrap_or_default(),
982 metadata_log: value.metadata_log.unwrap_or_default(),
983 sort_orders: HashMap::from_iter(
984 value
985 .sort_orders
986 .into_iter()
987 .map(|x| (x.order_id, Arc::new(x))),
988 ),
989 default_sort_order_id: value.default_sort_order_id,
990 refs: value.refs.unwrap_or_else(|| {
991 if let Some(snapshot_id) = current_snapshot_id {
992 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
993 snapshot_id,
994 retention: SnapshotRetention::Branch {
995 min_snapshots_to_keep: None,
996 max_snapshot_age_ms: None,
997 max_ref_age_ms: None,
998 },
999 })])
1000 } else {
1001 HashMap::new()
1002 }
1003 }),
1004 statistics: index_statistics(value.statistics),
1005 partition_statistics: index_partition_statistics(value.partition_statistics),
1006 encryption_keys: HashMap::new(),
1007 };
1008
1009 metadata.borrow_mut().try_normalize()?;
1010 Ok(metadata)
1011 }
1012 }
1013
1014 impl TryFrom<TableMetadataV1> for TableMetadata {
1015 type Error = Error;
1016 fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
1017 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1018 None
1019 } else {
1020 value.current_snapshot_id
1021 };
1022
1023 let (schemas, current_schema_id, current_schema) =
1024 if let (Some(schemas_vec), Some(schema_id)) =
1025 (&value.schemas, value.current_schema_id)
1026 {
1027 let schema_map = HashMap::from_iter(
1029 schemas_vec
1030 .clone()
1031 .into_iter()
1032 .map(|schema| {
1033 let schema: Schema = schema.try_into()?;
1034 Ok((schema.schema_id(), Arc::new(schema)))
1035 })
1036 .collect::<Result<Vec<_>, Error>>()?,
1037 );
1038
1039 let schema = schema_map
1040 .get(&schema_id)
1041 .ok_or_else(|| {
1042 Error::new(
1043 ErrorKind::DataInvalid,
1044 format!(
1045 "No schema exists with the current schema id {}.",
1046 schema_id
1047 ),
1048 )
1049 })?
1050 .clone();
1051 (schema_map, schema_id, schema)
1052 } else if let Some(schema) = value.schema {
1053 let schema: Schema = schema.try_into()?;
1055 let schema_id = schema.schema_id();
1056 let schema_arc = Arc::new(schema);
1057 let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
1058 (schema_map, schema_id, schema_arc)
1059 } else {
1060 return Err(Error::new(
1062 ErrorKind::DataInvalid,
1063 "No valid schema configuration found in table metadata",
1064 ));
1065 };
1066
1067 let partition_specs = if let Some(specs_vec) = value.partition_specs {
1069 specs_vec
1071 .into_iter()
1072 .map(|x| (x.spec_id(), Arc::new(x)))
1073 .collect::<HashMap<_, _>>()
1074 } else if let Some(partition_spec) = value.partition_spec {
1075 let spec = PartitionSpec::builder(current_schema.clone())
1077 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1078 .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
1079 .build()?;
1080
1081 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1082 } else {
1083 let spec = PartitionSpec::builder(current_schema.clone())
1085 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1086 .build()?;
1087
1088 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1089 };
1090
1091 let default_spec_id = value
1093 .default_spec_id
1094 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
1095
1096 let default_spec: PartitionSpecRef = partition_specs
1098 .get(&default_spec_id)
1099 .map(|x| Arc::unwrap_or_clone(x.clone()))
1100 .ok_or_else(|| {
1101 Error::new(
1102 ErrorKind::DataInvalid,
1103 format!("Default partition spec {default_spec_id} not found"),
1104 )
1105 })?
1106 .into();
1107 let default_partition_type = default_spec.partition_type(¤t_schema)?;
1108
1109 let mut metadata = TableMetadata {
1110 format_version: FormatVersion::V1,
1111 table_uuid: value.table_uuid.unwrap_or_default(),
1112 location: value.location,
1113 last_sequence_number: 0,
1114 last_updated_ms: value.last_updated_ms,
1115 last_column_id: value.last_column_id,
1116 current_schema_id,
1117 default_spec,
1118 default_partition_type,
1119 last_partition_id: value
1120 .last_partition_id
1121 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
1122 partition_specs,
1123 schemas,
1124 properties: value.properties.unwrap_or_default(),
1125 current_snapshot_id,
1126 snapshots: value
1127 .snapshots
1128 .map(|snapshots| {
1129 Ok::<_, Error>(HashMap::from_iter(
1130 snapshots
1131 .into_iter()
1132 .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
1133 .collect::<Result<Vec<_>, Error>>()?,
1134 ))
1135 })
1136 .transpose()?
1137 .unwrap_or_default(),
1138 snapshot_log: value.snapshot_log.unwrap_or_default(),
1139 metadata_log: value.metadata_log.unwrap_or_default(),
1140 sort_orders: match value.sort_orders {
1141 Some(sort_orders) => HashMap::from_iter(
1142 sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
1143 ),
1144 None => HashMap::new(),
1145 },
1146 default_sort_order_id: value
1147 .default_sort_order_id
1148 .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
1149 refs: if let Some(snapshot_id) = current_snapshot_id {
1150 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1151 snapshot_id,
1152 retention: SnapshotRetention::Branch {
1153 min_snapshots_to_keep: None,
1154 max_snapshot_age_ms: None,
1155 max_ref_age_ms: None,
1156 },
1157 })])
1158 } else {
1159 HashMap::new()
1160 },
1161 statistics: index_statistics(value.statistics),
1162 partition_statistics: index_partition_statistics(value.partition_statistics),
1163 encryption_keys: HashMap::new(),
1164 };
1165
1166 metadata.borrow_mut().try_normalize()?;
1167 Ok(metadata)
1168 }
1169 }
1170
1171 impl From<TableMetadata> for TableMetadataV2 {
1172 fn from(v: TableMetadata) -> Self {
1173 TableMetadataV2 {
1174 format_version: VersionNumber::<2>,
1175 table_uuid: v.table_uuid,
1176 location: v.location,
1177 last_sequence_number: v.last_sequence_number,
1178 last_updated_ms: v.last_updated_ms,
1179 last_column_id: v.last_column_id,
1180 schemas: v
1181 .schemas
1182 .into_values()
1183 .map(|x| {
1184 Arc::try_unwrap(x)
1185 .unwrap_or_else(|schema| schema.as_ref().clone())
1186 .into()
1187 })
1188 .collect(),
1189 current_schema_id: v.current_schema_id,
1190 partition_specs: v
1191 .partition_specs
1192 .into_values()
1193 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1194 .collect(),
1195 default_spec_id: v.default_spec.spec_id(),
1196 last_partition_id: v.last_partition_id,
1197 properties: if v.properties.is_empty() {
1198 None
1199 } else {
1200 Some(v.properties)
1201 },
1202 current_snapshot_id: v.current_snapshot_id,
1203 snapshots: if v.snapshots.is_empty() {
1204 None
1205 } else {
1206 Some(
1207 v.snapshots
1208 .into_values()
1209 .map(|x| {
1210 Arc::try_unwrap(x)
1211 .unwrap_or_else(|snapshot| snapshot.as_ref().clone())
1212 .into()
1213 })
1214 .collect(),
1215 )
1216 },
1217 snapshot_log: if v.snapshot_log.is_empty() {
1218 None
1219 } else {
1220 Some(v.snapshot_log)
1221 },
1222 metadata_log: if v.metadata_log.is_empty() {
1223 None
1224 } else {
1225 Some(v.metadata_log)
1226 },
1227 sort_orders: v
1228 .sort_orders
1229 .into_values()
1230 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1231 .collect(),
1232 default_sort_order_id: v.default_sort_order_id,
1233 refs: Some(v.refs),
1234 statistics: v.statistics.into_values().collect(),
1235 partition_statistics: v.partition_statistics.into_values().collect(),
1236 }
1237 }
1238 }
1239
1240 impl TryFrom<TableMetadata> for TableMetadataV1 {
1241 type Error = Error;
1242 fn try_from(v: TableMetadata) -> Result<Self, Error> {
1243 Ok(TableMetadataV1 {
1244 format_version: VersionNumber::<1>,
1245 table_uuid: Some(v.table_uuid),
1246 location: v.location,
1247 last_updated_ms: v.last_updated_ms,
1248 last_column_id: v.last_column_id,
1249 schema: Some(
1250 v.schemas
1251 .get(&v.current_schema_id)
1252 .ok_or(Error::new(
1253 ErrorKind::Unexpected,
1254 "current_schema_id not found in schemas",
1255 ))?
1256 .as_ref()
1257 .clone()
1258 .into(),
1259 ),
1260 schemas: Some(
1261 v.schemas
1262 .into_values()
1263 .map(|x| {
1264 Arc::try_unwrap(x)
1265 .unwrap_or_else(|schema| schema.as_ref().clone())
1266 .into()
1267 })
1268 .collect(),
1269 ),
1270 current_schema_id: Some(v.current_schema_id),
1271 partition_spec: Some(v.default_spec.fields().to_vec()),
1272 partition_specs: Some(
1273 v.partition_specs
1274 .into_values()
1275 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1276 .collect(),
1277 ),
1278 default_spec_id: Some(v.default_spec.spec_id()),
1279 last_partition_id: Some(v.last_partition_id),
1280 properties: if v.properties.is_empty() {
1281 None
1282 } else {
1283 Some(v.properties)
1284 },
1285 current_snapshot_id: v.current_snapshot_id,
1286 snapshots: if v.snapshots.is_empty() {
1287 None
1288 } else {
1289 Some(
1290 v.snapshots
1291 .into_values()
1292 .map(|x| Snapshot::clone(&x).into())
1293 .collect(),
1294 )
1295 },
1296 snapshot_log: if v.snapshot_log.is_empty() {
1297 None
1298 } else {
1299 Some(v.snapshot_log)
1300 },
1301 metadata_log: if v.metadata_log.is_empty() {
1302 None
1303 } else {
1304 Some(v.metadata_log)
1305 },
1306 sort_orders: Some(
1307 v.sort_orders
1308 .into_values()
1309 .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1310 .collect(),
1311 ),
1312 default_sort_order_id: Some(v.default_sort_order_id),
1313 statistics: v.statistics.into_values().collect(),
1314 partition_statistics: v.partition_statistics.into_values().collect(),
1315 })
1316 }
1317 }
1318
1319 fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1320 statistics
1321 .into_iter()
1322 .rev()
1323 .map(|s| (s.snapshot_id, s))
1324 .collect()
1325 }
1326
1327 fn index_partition_statistics(
1328 statistics: Vec<PartitionStatisticsFile>,
1329 ) -> HashMap<i64, PartitionStatisticsFile> {
1330 statistics
1331 .into_iter()
1332 .rev()
1333 .map(|s| (s.snapshot_id, s))
1334 .collect()
1335 }
1336}
1337
/// Table metadata format version.
///
/// (De)serialized through `serde_repr` as the variant's `u8` discriminant, i.e. as the
/// plain number `1` or `2`.
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
#[repr(u8)]
pub enum FormatVersion {
    /// Format version 1.
    V1 = 1u8,
    /// Format version 2.
    V2 = 2u8,
}
1347
1348impl PartialOrd for FormatVersion {
1349 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1350 Some(self.cmp(other))
1351 }
1352}
1353
1354impl Ord for FormatVersion {
1355 fn cmp(&self, other: &Self) -> Ordering {
1356 (*self as u8).cmp(&(*other as u8))
1357 }
1358}
1359
1360impl Display for FormatVersion {
1361 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1362 match self {
1363 FormatVersion::V1 => write!(f, "v1"),
1364 FormatVersion::V2 => write!(f, "v2"),
1365 }
1366 }
1367}
1368
/// One entry of the table's metadata log.
///
/// Field names are serialized in kebab-case (`metadata-file`, `timestamp-ms`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct MetadataLog {
    /// Location of the metadata file this entry refers to.
    pub metadata_file: String,
    /// Timestamp of the entry in milliseconds.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm.
    pub timestamp_ms: i64,
}
1378
/// One entry of the table's snapshot log.
///
/// Field names are serialized in kebab-case (`snapshot-id`, `timestamp-ms`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotLog {
    /// Id of the snapshot this entry refers to.
    pub snapshot_id: i64,
    /// Timestamp of the entry in milliseconds; convertible to UTC via
    /// `SnapshotLog::timestamp`.
    pub timestamp_ms: i64,
}
1388
1389impl SnapshotLog {
1390 pub fn timestamp(self) -> Result<DateTime<Utc>> {
1392 timestamp_ms_to_utc(self.timestamp_ms)
1393 }
1394
1395 #[inline]
1397 pub fn timestamp_ms(&self) -> i64 {
1398 self.timestamp_ms
1399 }
1400}
1401
1402#[cfg(test)]
1403mod tests {
1404 use std::collections::HashMap;
1405 use std::fs;
1406 use std::sync::Arc;
1407
1408 use anyhow::Result;
1409 use pretty_assertions::assert_eq;
1410 use tempfile::TempDir;
1411 use uuid::Uuid;
1412
1413 use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1414 use crate::TableCreation;
1415 use crate::io::FileIOBuilder;
1416 use crate::spec::table_metadata::TableMetadata;
1417 use crate::spec::{
1418 BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PartitionStatisticsFile,
1419 PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection,
1420 SortField, SortOrder, StatisticsFile, Summary, Transform, Type, UnboundPartitionField,
1421 };
1422
1423 fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1424 let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1425 assert_eq!(desered_type, expected_type);
1426
1427 let sered_json = serde_json::to_string(&expected_type).unwrap();
1428 let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1429
1430 assert_eq!(parsed_json_value, desered_type);
1431 }
1432
1433 fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1434 let path = format!("testdata/table_metadata/{}", file_name);
1435 let metadata: String = fs::read_to_string(path).unwrap();
1436
1437 serde_json::from_str(&metadata).unwrap()
1438 }
1439
    /// Checks a format-version 2 document without snapshots: it must deserialize to the
    /// hand-built `TableMetadata`, round-trip through serde, and the expected metadata must
    /// serialize to a JSON value equal to the fixture.
    #[test]
    fn test_table_data_v2() {
        let data = r#"
            {
                "format-version" : 2,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 1,
                                "name": "struct_name",
                                "required": true,
                                "type": "fixed[1]"
                            },
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "refs": {},
                "sort-orders": [
                    {
                        "order-id": 0,
                        "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        // Schema mirroring the single entry of "schemas" in the fixture.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "struct_name",
                    Type::Primitive(PrimitiveType::Fixed(1)),
                )),
                Arc::new(NestedField::required(
                    4,
                    "ts",
                    Type::Primitive(PrimitiveType::Timestamp),
                )),
            ])
            .build()
            .unwrap();

        // Partition spec 0: day transform on source field 4 (`ts`), partition field id 1000.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
        };

        // Captured before `expected` is moved into the round-trip helper.
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        // The serialized form must match the fixture as a JSON value (formatting-insensitive).
        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1574
    /// Checks a format-version 1 document that uses the singular `schema` and
    /// `partition-spec` members and carries one snapshot: it must deserialize to the
    /// hand-built `TableMetadata` and round-trip through serde.
    #[test]
    fn test_table_data_v1() {
        let data = r#"
        {
            "format-version" : 1,
            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
            "location" : "/home/iceberg/warehouse/nyc/taxis",
            "last-updated-ms" : 1662532818843,
            "last-column-id" : 5,
            "schema" : {
              "type" : "struct",
              "schema-id" : 0,
              "fields" : [ {
                "id" : 1,
                "name" : "vendor_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 2,
                "name" : "trip_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 3,
                "name" : "trip_distance",
                "required" : false,
                "type" : "float"
              }, {
                "id" : 4,
                "name" : "fare_amount",
                "required" : false,
                "type" : "double"
              }, {
                "id" : 5,
                "name" : "store_and_fwd_flag",
                "required" : false,
                "type" : "string"
              } ]
            },
            "partition-spec" : [ {
              "name" : "vendor_id",
              "transform" : "identity",
              "source-id" : 1,
              "field-id" : 1000
            } ],
            "last-partition-id" : 1000,
            "default-sort-order-id" : 0,
            "sort-orders" : [ {
              "order-id" : 0,
              "fields" : [ ]
            } ],
            "properties" : {
              "owner" : "root"
            },
            "current-snapshot-id" : 638933773299822130,
            "refs" : {
              "main" : {
                "snapshot-id" : 638933773299822130,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 638933773299822130,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ],
            "snapshot-log" : [ {
              "timestamp-ms" : 1662532818843,
              "snapshot-id" : 638933773299822130
            } ],
            "metadata-log" : [ {
              "timestamp-ms" : 1662532805245,
              "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
            } ]
          }
        "#;

        // No explicit schema id is set on the builder; the expected map below keys it as 0.
        let schema = Schema::builder()
            .with_fields(vec![
                Arc::new(NestedField::optional(
                    1,
                    "vendor_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    2,
                    "trip_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    3,
                    "trip_distance",
                    Type::Primitive(PrimitiveType::Float),
                )),
                Arc::new(NestedField::optional(
                    4,
                    "fare_amount",
                    Type::Primitive(PrimitiveType::Double),
                )),
                Arc::new(NestedField::optional(
                    5,
                    "store_and_fwd_flag",
                    Type::Primitive(PrimitiveType::String),
                )),
            ])
            .build()
            .unwrap();

        let schema = Arc::new(schema);
        // Identity partitioning on `vendor_id`, matching "partition-spec" in the fixture.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let sort_order = SortOrder::builder()
            .with_order_id(0)
            .build_unbound()
            .unwrap();

        let snapshot = Snapshot::builder()
            .with_snapshot_id(638933773299822130)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
            location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
            last_updated_ms: 1662532818843,
            last_column_id: 5,
            schemas: HashMap::from_iter(vec![(0, schema)]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
            snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
            current_snapshot_id: Some(638933773299822130),
            // The fixture has no "last-sequence-number" key; the expected value is 0.
            last_sequence_number: 0,
            properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
            snapshot_log: vec![SnapshotLog {
                snapshot_id: 638933773299822130,
                timestamp_ms: 1662532818843,
            }],
            metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
        };

        check_table_metadata_serde(data, expected);
    }
1746
    /// Checks a format-version 2 document with no snapshots, no properties and an
    /// unpartitioned spec: it must deserialize to the hand-built `TableMetadata`,
    /// round-trip through serde, and the expected metadata must serialize to a JSON value
    /// equal to the fixture.
    #[test]
    fn test_table_data_v2_no_snapshots() {
        let data = r#"
            {
                "format-version" : 2,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 1,
                                "name": "struct_name",
                                "required": true,
                                "type": "fixed[1]"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": []
                    }
                ],
                "refs": {},
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "sort-orders": [
                    {
                        "order-id": 0,
                        "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "struct_name",
                Type::Primitive(PrimitiveType::Fixed(1)),
            ))])
            .build()
            .unwrap();

        // Spec 0 has no partition fields.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
        };

        // Captured before `expected` is moved into the round-trip helper.
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        // The serialized form must match the fixture as a JSON value (formatting-insensitive).
        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1848
    /// Deserialization must fail when `current-snapshot-id` (1) differs from the snapshot
    /// the `main` branch ref points at (2).
    #[test]
    fn test_current_snapshot_id_must_match_main_branch() {
        let data = r#"
            {
                "format-version" : 2,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 1,
                                "name": "struct_name",
                                "required": true,
                                "type": "fixed[1]"
                            },
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "sort-orders": [
                    {
                        "order-id": 0,
                        "fields": []
                    }
                ],
                "default-sort-order-id": 0,
                "current-snapshot-id" : 1,
                "refs" : {
                    "main" : {
                        "snapshot-id" : 2,
                        "type" : "branch"
                    }
                },
                "snapshots" : [ {
                    "snapshot-id" : 1,
                    "timestamp-ms" : 1662532818843,
                    "sequence-number" : 0,
                    "summary" : {
                        "operation" : "append",
                        "spark.app.id" : "local-1662532784305",
                        "added-data-files" : "4",
                        "added-records" : "4",
                        "added-files-size" : "6001"
                    },
                    "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                    "schema-id" : 0
                },
                {
                    "snapshot-id" : 2,
                    "timestamp-ms" : 1662532818844,
                    "sequence-number" : 0,
                    "summary" : {
                        "operation" : "append",
                        "spark.app.id" : "local-1662532784305",
                        "added-data-files" : "4",
                        "added-records" : "4",
                        "added-files-size" : "6001"
                    },
                    "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                    "schema-id" : 0
                } ]
            }
        "#;

        // Only the presence of the message fragment is checked, not the full error text.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot id does not match main branch")
        );
    }
1955
    /// Deserialization must fail when a `main` branch ref exists but the document has no
    /// `current-snapshot-id`.
    #[test]
    fn test_main_without_current() {
        let data = r#"
            {
                "format-version" : 2,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 1,
                                "name": "struct_name",
                                "required": true,
                                "type": "fixed[1]"
                            },
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "sort-orders": [
                    {
                        "order-id": 0,
                        "fields": []
                    }
                ],
                "default-sort-order-id": 0,
                "refs" : {
                    "main" : {
                        "snapshot-id" : 1,
                        "type" : "branch"
                    }
                },
                "snapshots" : [ {
                    "snapshot-id" : 1,
                    "timestamp-ms" : 1662532818843,
                    "sequence-number" : 0,
                    "summary" : {
                        "operation" : "append",
                        "spark.app.id" : "local-1662532784305",
                        "added-data-files" : "4",
                        "added-records" : "4",
                        "added-files-size" : "6001"
                    },
                    "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                    "schema-id" : 0
                } ]
            }
        "#;

        // Only the presence of the message fragment is checked, not the full error text.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot is not set, but main branch exists")
        );
    }
2047
    /// Deserialization must fail when a ref (`foo`) points at a snapshot id (2) that is not
    /// present in the `snapshots` list.
    #[test]
    fn test_branch_snapshot_missing() {
        let data = r#"
            {
                "format-version" : 2,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 1,
                                "name": "struct_name",
                                "required": true,
                                "type": "fixed[1]"
                            },
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "sort-orders": [
                    {
                        "order-id": 0,
                        "fields": []
                    }
                ],
                "default-sort-order-id": 0,
                "refs" : {
                    "main" : {
                        "snapshot-id" : 1,
                        "type" : "branch"
                    },
                    "foo" : {
                        "snapshot-id" : 2,
                        "type" : "branch"
                    }
                },
                "snapshots" : [ {
                    "snapshot-id" : 1,
                    "timestamp-ms" : 1662532818843,
                    "sequence-number" : 0,
                    "summary" : {
                        "operation" : "append",
                        "spark.app.id" : "local-1662532784305",
                        "added-data-files" : "4",
                        "added-records" : "4",
                        "added-files-size" : "6001"
                    },
                    "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                    "schema-id" : 0
                } ]
            }
        "#;

        // Only the presence of the message fragment is checked, not the full error text.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string().contains(
                "Snapshot for reference foo does not exist in the existing snapshots list"
            )
        );
    }
2144
    /// A snapshot whose `sequence-number` (4) exceeds the table's `last-sequence-number`
    /// (1) must be rejected; the same document is accepted once `last-sequence-number` is
    /// raised to 4 or 5.
    #[test]
    fn test_v2_wrong_max_snapshot_sequence_number() {
        let data = r#"
            {
                "format-version": 2,
                "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
                "location": "s3://bucket/test/location",
                "last-sequence-number": 1,
                "last-updated-ms": 1602638573590,
                "last-column-id": 3,
                "current-schema-id": 0,
                "schemas": [
                    {
                        "type": "struct",
                        "schema-id": 0,
                        "fields": [
                            {
                                "id": 1,
                                "name": "x",
                                "required": true,
                                "type": "long"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": []
                    }
                ],
                "last-partition-id": 1000,
                "default-sort-order-id": 0,
                "sort-orders": [
                    {
                        "order-id": 0,
                        "fields": []
                    }
                ],
                "properties": {},
                "current-snapshot-id": 3055729675574597004,
                "snapshots": [
                    {
                        "snapshot-id": 3055729675574597004,
                        "timestamp-ms": 1555100955770,
                        "sequence-number": 4,
                        "summary": {
                            "operation": "append"
                        },
                        "manifest-list": "s3://a/b/2.avro",
                        "schema-id": 0
                    }
                ],
                "statistics": [],
                "snapshot-log": [],
                "metadata-log": []
            }
        "#;

        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        println!("{}", err);
        assert!(err.to_string().contains(
            "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
        ));

        // Raise last-sequence-number to the snapshot's sequence number (4): now accepted.
        let data = data.replace(
            r#""last-sequence-number": 1,"#,
            r#""last-sequence-number": 4,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 4);

        // A last-sequence-number above the snapshot's sequence number (5) is accepted too.
        let data = data.replace(
            r#""last-sequence-number": 4,"#,
            r#""last-sequence-number": 5,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 5);
    }
2227
    /// Checks that a `statistics` entry in a format-version 2 document ends up in
    /// `TableMetadata::statistics`, keyed by its snapshot id, and round-trips through serde.
    #[test]
    fn test_statistic_files() {
        let data = r#"
            {
                "format-version": 2,
                "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
                "location": "s3://bucket/test/location",
                "last-sequence-number": 34,
                "last-updated-ms": 1602638573590,
                "last-column-id": 3,
                "current-schema-id": 0,
                "schemas": [
                    {
                        "type": "struct",
                        "schema-id": 0,
                        "fields": [
                            {
                                "id": 1,
                                "name": "x",
                                "required": true,
                                "type": "long"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": []
                    }
                ],
                "last-partition-id": 1000,
                "default-sort-order-id": 0,
                "sort-orders": [
                    {
                        "order-id": 0,
                        "fields": []
                    }
                ],
                "properties": {},
                "current-snapshot-id": 3055729675574597004,
                "snapshots": [
                    {
                        "snapshot-id": 3055729675574597004,
                        "timestamp-ms": 1555100955770,
                        "sequence-number": 1,
                        "summary": {
                            "operation": "append"
                        },
                        "manifest-list": "s3://a/b/2.avro",
                        "schema-id": 0
                    }
                ],
                "statistics": [
                    {
                        "snapshot-id": 3055729675574597004,
                        "statistics-path": "s3://a/b/stats.puffin",
                        "file-size-in-bytes": 413,
                        "file-footer-size-in-bytes": 42,
                        "blob-metadata": [
                            {
                                "type": "ndv",
                                "snapshot-id": 3055729675574597004,
                                "sequence-number": 1,
                                "fields": [
                                    1
                                ]
                            }
                        ]
                    }
                ],
                "snapshot-log": [],
                "metadata-log": []
            }
        "#;

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
                snapshot_id: 3055729675574597004,
                statistics_path: "s3://a/b/stats.puffin".to_string(),
                file_size_in_bytes: 413,
                file_footer_size_in_bytes: 42,
                key_metadata: None,
                blob_metadata: vec![BlobMetadata {
                    snapshot_id: 3055729675574597004,
                    sequence_number: 1,
                    fields: vec![1],
                    r#type: "ndv".to_string(),
                    properties: HashMap::new(),
                }],
            })]),
            partition_statistics: HashMap::new(),
            // The fixture has no "refs" key; the expected metadata still holds a `main`
            // branch pointing at the current snapshot.
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
        };

        check_table_metadata_serde(data, expected);
    }
2379
    /// Checks that a `partition-statistics` entry in a format-version 2 document ends up in
    /// `TableMetadata::partition_statistics`, keyed by its snapshot id, and round-trips
    /// through serde.
    #[test]
    fn test_partition_statistics_file() {
        let data = r#"
            {
                "format-version": 2,
                "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
                "location": "s3://bucket/test/location",
                "last-sequence-number": 34,
                "last-updated-ms": 1602638573590,
                "last-column-id": 3,
                "current-schema-id": 0,
                "schemas": [
                    {
                        "type": "struct",
                        "schema-id": 0,
                        "fields": [
                            {
                                "id": 1,
                                "name": "x",
                                "required": true,
                                "type": "long"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": []
                    }
                ],
                "last-partition-id": 1000,
                "default-sort-order-id": 0,
                "sort-orders": [
                    {
                        "order-id": 0,
                        "fields": []
                    }
                ],
                "properties": {},
                "current-snapshot-id": 3055729675574597004,
                "snapshots": [
                    {
                        "snapshot-id": 3055729675574597004,
                        "timestamp-ms": 1555100955770,
                        "sequence-number": 1,
                        "summary": {
                            "operation": "append"
                        },
                        "manifest-list": "s3://a/b/2.avro",
                        "schema-id": 0
                    }
                ],
                "partition-statistics": [
                    {
                        "snapshot-id": 3055729675574597004,
                        "statistics-path": "s3://a/b/partition-stats.parquet",
                        "file-size-in-bytes": 43
                    }
                ],
                "snapshot-log": [],
                "metadata-log": []
            }
        "#;

        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::from_iter(vec![(
                3055729675574597004,
                PartitionStatisticsFile {
                    snapshot_id: 3055729675574597004,
                    statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
                    file_size_in_bytes: 43,
                },
            )]),
            // The fixture has no "refs" key; the expected metadata still holds a `main`
            // branch pointing at the current snapshot.
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
        };

        check_table_metadata_serde(data, expected);
    }
2514
/// A metadata document whose `table-uuid` is the string `"xxxx"` must not
/// deserialize into a [`TableMetadata`].
#[test]
fn test_invalid_table_uuid() -> Result<()> {
    let malformed = r#"
        {
            "format-version" : 2,
            "table-uuid": "xxxx"
        }
    "#;

    let parsed = serde_json::from_str::<TableMetadata>(malformed);
    assert!(parsed.is_err());

    Ok(())
}
2526
/// A document that contains nothing but `"format-version" : 1` must be
/// rejected by the [`TableMetadata`] deserializer.
#[test]
fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
    let version_only = r#"
        {
            "format-version" : 1
        }
    "#;

    let parsed = serde_json::from_str::<TableMetadata>(version_only);
    assert!(parsed.is_err());

    Ok(())
}
2537
/// Reads `testdata/table_metadata/TableMetadataV2Valid.json` and hands it to
/// `check_table_metadata_serde` together with a hand-assembled expectation.
///
/// The expected [`TableMetadata`] carries two schemas (ids 0 and 1), one
/// partition spec on `x`, a two-field sort order with id 3, and two snapshots,
/// the second of which is current and referenced by the `main` branch.
#[test]
fn test_table_metadata_v2_file_valid() {
    let json = fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();

    // Schema 0 only knows the `x` column.
    let initial_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![Arc::new(NestedField::required(
            1,
            "x",
            Type::Primitive(PrimitiveType::Long),
        ))])
        .build()
        .unwrap();

    // Schema 1 has `x`, a documented `y`, and `z`; ids 1 and 2 are identifier fields.
    let field_x = NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long));
    let field_y =
        NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).with_doc("comment");
    let field_z = NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long));
    let current_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![
            Arc::new(field_x),
            Arc::new(field_y),
            Arc::new(field_z),
        ])
        .with_identifier_field_ids(vec![1, 2])
        .build()
        .unwrap();

    let identity_on_x = UnboundPartitionField {
        name: "x".to_string(),
        transform: Transform::Identity,
        source_id: 1,
        field_id: Some(1000),
    };
    let spec = PartitionSpec::builder(current_schema.clone())
        .with_spec_id(0)
        .add_unbound_field(identity_on_x)
        .unwrap()
        .build()
        .unwrap();
    let default_partition_type = spec.partition_type(&current_schema).unwrap();
    // The same spec is both the registered spec 0 and the default spec.
    let spec = Arc::new(spec);

    let y_ascending = SortField {
        source_id: 2,
        transform: Transform::Identity,
        direction: SortDirection::Ascending,
        null_order: NullOrder::First,
    };
    let z_bucket_descending = SortField {
        source_id: 3,
        transform: Transform::Bucket(4),
        direction: SortDirection::Descending,
        null_order: NullOrder::Last,
    };
    let sort_order = SortOrder::builder()
        .with_order_id(3)
        .with_sort_field(y_ascending)
        .with_sort_field(z_bucket_descending)
        .build_unbound()
        .unwrap();

    let parent_id = 3051729675574597004;
    let parent_ts = 1515100955770;
    let current_id = 3055729675574597004;
    let current_ts = 1555100955770;
    let append_summary = || Summary {
        operation: Operation::Append,
        additional_properties: HashMap::new(),
    };

    let parent_snapshot = Snapshot::builder()
        .with_snapshot_id(parent_id)
        .with_timestamp_ms(parent_ts)
        .with_sequence_number(0)
        .with_manifest_list("s3://a/b/1.avro")
        .with_summary(append_summary())
        .build();

    let current_snapshot = Snapshot::builder()
        .with_snapshot_id(current_id)
        .with_parent_snapshot_id(Some(parent_id))
        .with_timestamp_ms(current_ts)
        .with_sequence_number(1)
        .with_schema_id(1)
        .with_manifest_list("s3://a/b/2.avro")
        .with_summary(append_summary())
        .build();

    let main_branch = SnapshotReference {
        snapshot_id: current_id,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: None,
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        last_partition_id: 1000,
        last_sequence_number: 34,
        current_schema_id: 1,
        schemas: HashMap::from([
            (0, Arc::new(initial_schema)),
            (1, Arc::new(current_schema)),
        ]),
        partition_specs: HashMap::from([(0, spec.clone())]),
        default_spec: spec,
        default_partition_type,
        default_sort_order_id: 3,
        sort_orders: HashMap::from([(3, Arc::new(sort_order))]),
        current_snapshot_id: Some(current_id),
        snapshots: HashMap::from([
            (parent_id, Arc::new(parent_snapshot)),
            (current_id, Arc::new(current_snapshot)),
        ]),
        snapshot_log: vec![
            SnapshotLog {
                snapshot_id: parent_id,
                timestamp_ms: parent_ts,
            },
            SnapshotLog {
                snapshot_id: current_id,
                timestamp_ms: current_ts,
            },
        ],
        metadata_log: Vec::new(),
        refs: HashMap::from([("main".to_string(), main_branch)]),
        properties: HashMap::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
    };

    check_table_metadata_serde(&json, expected);
}
2676
/// Reads `testdata/table_metadata/TableMetadataV2ValidMinimal.json` and checks
/// it, via `check_table_metadata_serde`, against an expectation that has a
/// single schema, one partition spec, one sort order, and no snapshots, refs
/// or logs.
#[test]
fn test_table_metadata_v2_file_valid_minimal() {
    let json =
        fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

    let field_x = NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long));
    let field_y =
        NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).with_doc("comment");
    let field_z = NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long));
    let only_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            Arc::new(field_x),
            Arc::new(field_y),
            Arc::new(field_z),
        ])
        .build()
        .unwrap();

    let identity_on_x = UnboundPartitionField {
        name: "x".to_string(),
        transform: Transform::Identity,
        source_id: 1,
        field_id: Some(1000),
    };
    let spec = PartitionSpec::builder(only_schema.clone())
        .with_spec_id(0)
        .add_unbound_field(identity_on_x)
        .unwrap()
        .build()
        .unwrap();
    let default_partition_type = spec.partition_type(&only_schema).unwrap();
    // Registered spec 0 and the default spec are the same value.
    let spec = Arc::new(spec);

    let y_ascending = SortField {
        source_id: 2,
        transform: Transform::Identity,
        direction: SortDirection::Ascending,
        null_order: NullOrder::First,
    };
    let z_bucket_descending = SortField {
        source_id: 3,
        transform: Transform::Bucket(4),
        direction: SortDirection::Descending,
        null_order: NullOrder::Last,
    };
    let sort_order = SortOrder::builder()
        .with_order_id(3)
        .with_sort_field(y_ascending)
        .with_sort_field(z_bucket_descending)
        .build_unbound()
        .unwrap();

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        last_partition_id: 1000,
        last_sequence_number: 34,
        current_schema_id: 0,
        schemas: HashMap::from([(0, Arc::new(only_schema))]),
        partition_specs: HashMap::from([(0, spec.clone())]),
        default_spec: spec,
        default_partition_type,
        default_sort_order_id: 3,
        sort_orders: HashMap::from([(3, Arc::new(sort_order))]),
        current_snapshot_id: None,
        snapshots: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        refs: HashMap::new(),
        properties: HashMap::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
    };

    check_table_metadata_serde(&json, expected);
}
2761
/// Reads `testdata/table_metadata/TableMetadataV1Valid.json` and checks it, via
/// `check_table_metadata_serde`, against a V1 expectation with one schema, one
/// partition spec on `x`, the unsorted sort order, and no snapshots.
#[test]
fn test_table_metadata_v1_file_valid() {
    let json = fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

    let field_x = NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long));
    let field_y =
        NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).with_doc("comment");
    let field_z = NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long));
    let only_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            Arc::new(field_x),
            Arc::new(field_y),
            Arc::new(field_z),
        ])
        .build()
        .unwrap();

    let identity_on_x = UnboundPartitionField {
        name: "x".to_string(),
        transform: Transform::Identity,
        source_id: 1,
        field_id: Some(1000),
    };
    let spec = PartitionSpec::builder(only_schema.clone())
        .with_spec_id(0)
        .add_unbound_field(identity_on_x)
        .unwrap()
        .build()
        .unwrap();
    let default_partition_type = spec.partition_type(&only_schema).unwrap();
    // Registered spec 0 and the default spec are the same value.
    let spec = Arc::new(spec);

    let expected = TableMetadata {
        format_version: FormatVersion::V1,
        table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573874,
        last_column_id: 3,
        last_partition_id: 0,
        last_sequence_number: 0,
        current_schema_id: 0,
        schemas: HashMap::from([(0, Arc::new(only_schema))]),
        partition_specs: HashMap::from([(0, spec.clone())]),
        default_spec: spec,
        default_partition_type,
        default_sort_order_id: 0,
        sort_orders: HashMap::from([(0, Arc::new(SortOrder::unsorted_order()))]),
        current_snapshot_id: None,
        snapshots: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        refs: HashMap::new(),
        properties: HashMap::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
    };

    check_table_metadata_serde(&json, expected);
}
2830
/// `testdata/table_metadata/TableMetadataV1Compat.json` must deserialize, and
/// the result must report the expected format version, UUID, location,
/// last-updated timestamp and current schema id.
#[test]
fn test_table_metadata_v1_compat() {
    let json = fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();

    let table_metadata = serde_json::from_str::<TableMetadata>(&json)
        .expect("Failed to deserialize TableMetadataV1Compat.json");

    let expected_uuid = Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap();
    let expected_location = "s3://bucket/warehouse/iceberg/glue.db/table_name";

    assert_eq!(table_metadata.format_version(), FormatVersion::V1);
    assert_eq!(table_metadata.uuid(), expected_uuid);
    assert_eq!(table_metadata.location(), expected_location);
    assert_eq!(table_metadata.last_updated_ms(), 1727773114005);
    assert_eq!(table_metadata.current_schema_id(), 0);
}
2853
/// `TableMetadataV1SchemasWithoutCurrentId.json` must deserialize as V1, and
/// the resulting current schema must expose exactly the fields `x`, `y`, `z`
/// in that order.
#[test]
fn test_table_metadata_v1_schemas_without_current_id() {
    let path = "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json";
    let json = fs::read_to_string(path).unwrap();

    let table_metadata = serde_json::from_str::<TableMetadata>(&json)
        .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");

    assert_eq!(table_metadata.format_version(), FormatVersion::V1);

    let expected_uuid = Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap();
    assert_eq!(table_metadata.uuid(), expected_uuid);

    let fields = table_metadata.current_schema().as_struct().fields();
    assert_eq!(fields.len(), 3);
    let names: Vec<&str> = fields.iter().map(|field| field.name.as_str()).collect();
    assert_eq!(names, ["x", "y", "z"]);
}
2879
/// `TableMetadataV1NoValidSchema.json` must fail to deserialize, and the error
/// text must mention "No valid schema configuration found".
#[test]
fn test_table_metadata_v1_no_valid_schema() {
    let json =
        fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json").unwrap();

    let outcome: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&json);
    assert!(outcome.is_err());

    let rendered = outcome.unwrap_err().to_string();
    let mentions_missing_schema = rendered.contains("No valid schema configuration found");
    assert!(
        mentions_missing_schema,
        "Expected error about no valid schema configuration, got: {}",
        rendered
    );
}
2897
/// `TableMetadataV1PartitionSpecsWithoutDefaultId.json` must deserialize as V1
/// with two partition specs, and the default spec must be spec 2: a single
/// identity partition field named `y` on source column 2.
#[test]
fn test_table_metadata_v1_partition_specs_without_default_id() {
    let path = "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json";
    let json = fs::read_to_string(path).unwrap();

    let table_metadata = serde_json::from_str::<TableMetadata>(&json)
        .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");

    assert_eq!(table_metadata.format_version(), FormatVersion::V1);

    let expected_uuid = Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap();
    assert_eq!(table_metadata.uuid(), expected_uuid);

    assert_eq!(table_metadata.default_partition_spec_id(), 2);
    assert_eq!(table_metadata.partition_specs.len(), 2);

    let spec = &table_metadata.default_spec;
    assert_eq!(spec.spec_id(), 2);

    let spec_fields = spec.fields();
    assert_eq!(spec_fields.len(), 1);

    let only_field = &spec_fields[0];
    assert_eq!(only_field.name, "y");
    assert_eq!(only_field.transform, Transform::Identity);
    assert_eq!(only_field.source_id, 2);
}
2928
/// Deserializing `TableMetadataV2CurrentSchemaNotFound.json` must fail with the
/// exact "No schema exists with the current schema id 2." error.
#[test]
fn test_table_metadata_v2_schema_not_found() {
    let path = "testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json";
    let json = fs::read_to_string(path).unwrap();

    let outcome: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&json);
    let rendered = outcome.unwrap_err().to_string();

    assert_eq!(
        rendered,
        "DataInvalid => No schema exists with the current schema id 2."
    );
}
2942
/// Deserializing `TableMetadataV2MissingSortOrder.json` must fail with the
/// untagged-enum mismatch message for `TableMetadataEnum`.
#[test]
fn test_table_metadata_v2_missing_sort_order() {
    let path = "testdata/table_metadata/TableMetadataV2MissingSortOrder.json";
    let json = fs::read_to_string(path).unwrap();

    let outcome: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&json);
    let rendered = outcome.unwrap_err().to_string();

    assert_eq!(
        rendered,
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
2956
/// Deserializing `TableMetadataV2MissingPartitionSpecs.json` must fail with the
/// untagged-enum mismatch message for `TableMetadataEnum`.
#[test]
fn test_table_metadata_v2_missing_partition_specs() {
    let path = "testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json";
    let json = fs::read_to_string(path).unwrap();

    let outcome: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&json);
    let rendered = outcome.unwrap_err().to_string();

    assert_eq!(
        rendered,
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
2970
/// Deserializing `TableMetadataV2MissingLastPartitionId.json` must fail with
/// the untagged-enum mismatch message for `TableMetadataEnum`.
#[test]
fn test_table_metadata_v2_missing_last_partition_id() {
    let path = "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json";
    let json = fs::read_to_string(path).unwrap();

    let outcome: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&json);
    let rendered = outcome.unwrap_err().to_string();

    assert_eq!(
        rendered,
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
2985
/// Deserializing `TableMetadataV2MissingSchemas.json` must fail with the
/// untagged-enum mismatch message for `TableMetadataEnum`.
#[test]
fn test_table_metadata_v2_missing_schemas() {
    let path = "testdata/table_metadata/TableMetadataV2MissingSchemas.json";
    let json = fs::read_to_string(path).unwrap();

    let outcome: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&json);
    let rendered = outcome.unwrap_err().to_string();

    assert_eq!(
        rendered,
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
2999
/// Deserializing `TableMetadataUnsupportedVersion.json` must fail with the
/// untagged-enum mismatch message for `TableMetadataEnum`.
#[test]
fn test_table_metadata_v2_unsupported_version() {
    let path = "testdata/table_metadata/TableMetadataUnsupportedVersion.json";
    let json = fs::read_to_string(path).unwrap();

    let outcome: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&json);
    let rendered = outcome.unwrap_err().to_string();

    assert_eq!(
        rendered,
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
3013
/// [`FormatVersion`] variants compare equal to themselves and `V1` orders
/// strictly before `V2`.
#[test]
fn test_order_of_format_version() {
    // Reflexive equality for both variants.
    assert_eq!(FormatVersion::V1, FormatVersion::V1);
    assert_eq!(FormatVersion::V2, FormatVersion::V2);

    // Ordering between the variants.
    assert!(FormatVersion::V1 < FormatVersion::V2);
}
3020
/// After installing an unpartitioned spec both as `default_spec` and under id
/// 1234 in `partition_specs`, `default_partition_spec()` and
/// `partition_spec_by_id(1234)` must yield equal specs.
#[test]
fn test_default_partition_spec() {
    let default_spec_id = 1234;
    let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");

    let unpartitioned = PartitionSpec::unpartition_spec();
    table_meta_data.default_spec = Arc::new(unpartitioned.clone());
    table_meta_data
        .partition_specs
        .insert(default_spec_id, Arc::new(unpartitioned));

    let default_spec = table_meta_data.default_partition_spec().clone();
    let registered_spec = table_meta_data
        .partition_spec_by_id(default_spec_id)
        .unwrap()
        .clone();

    // Compare the specs themselves, not the pointers that hold them.
    assert_eq!(*default_spec, *registered_spec);
}
/// After pointing `default_sort_order_id` at 1234 and registering a default
/// [`SortOrder`] under that id, `default_sort_order()` must equal the
/// registered entry.
#[test]
fn test_default_sort_order() {
    let default_sort_order_id = 1234;
    let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");

    table_meta_data.default_sort_order_id = default_sort_order_id;
    let fresh_order = Arc::new(SortOrder::default());
    table_meta_data
        .sort_orders
        .insert(default_sort_order_id, fresh_order);

    // Indexing panics on a missing key, just like `get(..).unwrap()` would.
    let registered = &table_meta_data.sort_orders[&default_sort_order_id];
    assert_eq!(table_meta_data.default_sort_order(), registered);
}
3057
/// Building metadata from a [`TableCreation`] with an empty schema must yield
/// the requested location, one field-less schema under id 0, no properties,
/// a single partition spec with id 0, and a single empty sort order with id 0.
#[test]
fn test_table_metadata_builder_from_table_creation() {
    let empty_schema = Schema::builder().build().unwrap();
    let table_creation = TableCreation::builder()
        .location("s3://db/table".to_string())
        .name("table".to_string())
        .properties(HashMap::new())
        .schema(empty_schema)
        .build();

    let build_result = TableMetadataBuilder::from_table_creation(table_creation)
        .unwrap()
        .build()
        .unwrap();
    let table_metadata = build_result.metadata;

    assert_eq!(table_metadata.location, "s3://db/table");
    assert_eq!(table_metadata.schemas.len(), 1);

    let schema_zero = table_metadata.schemas.get(&0).unwrap();
    assert_eq!(schema_zero.as_struct().fields().len(), 0);

    assert_eq!(table_metadata.properties.len(), 0);

    let expected_spec = PartitionSpec::builder(schema_zero.clone())
        .with_spec_id(0)
        .build()
        .unwrap();
    assert_eq!(
        table_metadata.partition_specs,
        HashMap::from([(0, Arc::new(expected_spec))])
    );

    let expected_order = SortOrder {
        order_id: 0,
        fields: vec![],
    };
    assert_eq!(
        table_metadata.sort_orders,
        HashMap::from([(0, Arc::new(expected_order))])
    );
}
3107
3108 #[tokio::test]
3109 async fn test_table_metadata_read_write() {
3110 let temp_dir = TempDir::new().unwrap();
3112 let temp_path = temp_dir.path().to_str().unwrap();
3113
3114 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3116
3117 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3119
3120 let metadata_location = format!("{}/metadata.json", temp_path);
3122
3123 original_metadata
3125 .write_to(&file_io, &metadata_location)
3126 .await
3127 .unwrap();
3128
3129 assert!(fs::metadata(&metadata_location).is_ok());
3131
3132 let read_metadata = TableMetadata::read_from(&file_io, &metadata_location)
3134 .await
3135 .unwrap();
3136
3137 assert_eq!(read_metadata, original_metadata);
3139 }
3140
3141 #[tokio::test]
3142 async fn test_table_metadata_read_nonexistent_file() {
3143 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3145
3146 let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3148
3149 assert!(result.is_err());
3151 }
3152
/// `partition_name_exists` must report partition field names from every spec
/// added to the table (`data_partition`, `partition_bucket`) and reject
/// unknown names, plain column names and the empty string.
#[test]
fn test_partition_name_exists() {
    let data_col = NestedField::required(1, "data", Type::Primitive(PrimitiveType::String));
    let partition_col =
        NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int));
    let schema = Schema::builder()
        .with_fields(vec![Arc::new(data_col), Arc::new(partition_col)])
        .build()
        .unwrap();

    let identity_spec = PartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_partition_field("data", "data_partition", Transform::Identity)
        .unwrap()
        .build()
        .unwrap();

    let bucket_spec = PartitionSpec::builder(schema.clone())
        .with_spec_id(2)
        .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
        .unwrap()
        .build()
        .unwrap();

    let builder = TableMetadataBuilder::new(
        schema,
        identity_spec.into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .unwrap();
    let metadata = builder
        .add_partition_spec(bucket_spec.into_unbound())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Partition field names from both specs are known.
    for known in ["data_partition", "partition_bucket"] {
        assert!(metadata.partition_name_exists(known));
    }

    // Unknown names, the source column name and the empty string are not.
    for unknown in ["nonexistent_field", "data", ""] {
        assert!(!metadata.partition_name_exists(unknown));
    }
}
3201
/// For a table built with only the unpartitioned spec, `partition_name_exists`
/// must return `false` for any name, including an existing column name.
#[test]
fn test_partition_name_exists_empty_specs() {
    let data_col = NestedField::required(1, "data", Type::Primitive(PrimitiveType::String));
    let schema = Schema::builder()
        .with_fields(vec![Arc::new(data_col)])
        .build()
        .unwrap();

    let builder = TableMetadataBuilder::new(
        schema,
        PartitionSpec::unpartition_spec().into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .unwrap();
    let metadata = builder.build().unwrap().metadata;

    for name in ["any_field", "data"] {
        assert!(!metadata.partition_name_exists(name));
    }
}
3228
/// With two schemas registered (`field1`+`field2`, then `field1`+`field3`),
/// `name_exists_in_any_schema` must find names from either schema and reject
/// names present in neither, including the empty string.
#[test]
fn test_name_exists_in_any_schema() {
    let first_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![
            Arc::new(NestedField::required(
                1,
                "field1",
                Type::Primitive(PrimitiveType::String),
            )),
            Arc::new(NestedField::required(
                2,
                "field2",
                Type::Primitive(PrimitiveType::Int),
            )),
        ])
        .build()
        .unwrap();

    let second_schema = Schema::builder()
        .with_schema_id(2)
        .with_fields(vec![
            Arc::new(NestedField::required(
                1,
                "field1",
                Type::Primitive(PrimitiveType::String),
            )),
            Arc::new(NestedField::required(
                3,
                "field3",
                Type::Primitive(PrimitiveType::Long),
            )),
        ])
        .build()
        .unwrap();

    let builder = TableMetadataBuilder::new(
        first_schema,
        PartitionSpec::unpartition_spec().into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .unwrap();
    let metadata = builder
        .add_current_schema(second_schema)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Present in both schemas, only the first, only the second.
    for present in ["field1", "field2", "field3"] {
        assert!(metadata.name_exists_in_any_schema(present));
    }

    for absent in ["nonexistent_field", "field4", ""] {
        assert!(!metadata.name_exists_in_any_schema(absent));
    }
}
3273
/// For a table whose only schema has no fields, `name_exists_in_any_schema`
/// must return `false`.
#[test]
fn test_name_exists_in_any_schema_empty_schemas() {
    let fieldless_schema = Schema::builder().with_fields(vec![]).build().unwrap();

    let builder = TableMetadataBuilder::new(
        fieldless_schema,
        PartitionSpec::unpartition_spec().into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .unwrap();
    let metadata = builder.build().unwrap().metadata;

    assert!(!metadata.name_exists_in_any_schema("any_field"));
}
3293
/// Exercises `partition_name_exists` and `name_exists_in_any_schema` on a
/// table that went through one schema evolution.
///
/// The table starts with `id`, `name` and `deprecated_field`; the evolved
/// schema (installed via `add_current_schema`) additionally has `new_field`.
/// Only these two schema versions are registered, so the assertions cover
/// exactly the names they contain plus names that never existed.
#[test]
fn test_helper_methods_multi_version_scenario() {
    let initial_schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(
                3,
                "deprecated_field",
                Type::Primitive(PrimitiveType::String),
            )
            .into(),
        ])
        .build()
        .unwrap();

    let builder = TableMetadataBuilder::new(
        initial_schema,
        PartitionSpec::unpartition_spec().into_unbound(),
        SortOrder::unsorted_order(),
        "s3://test/location".to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .unwrap();

    // Second version: same columns plus `new_field`.
    let evolved_schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(
                3,
                "deprecated_field",
                Type::Primitive(PrimitiveType::String),
            )
            .into(),
            NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
                .into(),
        ])
        .build()
        .unwrap();

    let final_metadata = builder
        .add_current_schema(evolved_schema)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // The table is unpartitioned, so no partition name is known.
    assert!(!final_metadata.partition_name_exists("nonexistent_partition"));

    // Every column of either registered schema is found.
    assert!(final_metadata.name_exists_in_any_schema("id"));
    assert!(final_metadata.name_exists_in_any_schema("name"));
    assert!(final_metadata.name_exists_in_any_schema("deprecated_field"));
    assert!(final_metadata.name_exists_in_any_schema("new_field"));

    // A name that is in neither schema is not.
    assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
}
3365}