1use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::io::Read as _;
26use std::sync::Arc;
27
28use _serde::TableMetadataEnum;
29use chrono::{DateTime, Utc};
30use flate2::read::GzDecoder;
31use serde::{Deserialize, Serialize};
32use serde_repr::{Deserialize_repr, Serialize_repr};
33use uuid::Uuid;
34
35use super::snapshot::SnapshotReference;
36pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
37use super::{
38 DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
39 SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
40};
41use crate::error::{Result, timestamp_ms_to_utc};
42use crate::io::FileIO;
43use crate::spec::EncryptedKey;
44use crate::{Error, ErrorKind};
45
46static MAIN_BRANCH: &str = "main";
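/// Tolerance, in milliseconds, applied when validating that snapshot and metadata
/// log entries are in chronological order.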
47pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
48
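/// Sentinel snapshot id used in serialized metadata to mean "no current snapshot".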
49pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
50pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
51
52pub const INITIAL_ROW_ID: u64 = 0;
54pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
56pub type TableMetadataRef = Arc<TableMetadata>;
58
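/// Metadata of an Iceberg table, covering schemas, partition specs, snapshots,
/// sort orders, refs, statistics and table properties across format versions 1-3.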
59#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
60#[serde(try_from = "TableMetadataEnum")]
61pub struct TableMetadata {
66 pub(crate) format_version: FormatVersion,
68 pub(crate) table_uuid: Uuid,
70 pub(crate) location: String,
72 pub(crate) last_sequence_number: i64,
74 pub(crate) last_updated_ms: i64,
76 pub(crate) last_column_id: i32,
78 pub(crate) schemas: HashMap<i32, SchemaRef>,
80 pub(crate) current_schema_id: i32,
82 pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
84 pub(crate) default_spec: PartitionSpecRef,
86 pub(crate) default_partition_type: StructType,
88 pub(crate) last_partition_id: i32,
90 pub(crate) properties: HashMap<String, String>,
94 pub(crate) current_snapshot_id: Option<i64>,
97 pub(crate) snapshots: HashMap<i64, SnapshotRef>,
102 pub(crate) snapshot_log: Vec<SnapshotLog>,
109
110 pub(crate) metadata_log: Vec<MetadataLog>,
117
118 pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
120 pub(crate) default_sort_order_id: i64,
124 pub(crate) refs: HashMap<String, SnapshotReference>,
129 pub(crate) statistics: HashMap<i64, StatisticsFile>,
131 pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
133 pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
135 pub(crate) next_row_id: u64,
137}
138
139impl TableMetadata {
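    /// Converts this metadata into a [`TableMetadataBuilder`] seeded with the current
    /// state, so further updates can be staged on top of it. `current_file_location`
    /// is the metadata file this state was loaded from, if any.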
140 #[must_use]
147 pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
148 TableMetadataBuilder::new_from_metadata(self, current_file_location)
149 }
150
151 #[inline]
153 pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
154 self.partition_specs
155 .values()
156 .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
157 }
158
159 #[inline]
161 pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
162 self.schemas
163 .values()
164 .any(|schema| schema.field_by_name(name).is_some())
165 }
166
167 #[inline]
169 pub fn format_version(&self) -> FormatVersion {
170 self.format_version
171 }
172
173 #[inline]
175 pub fn uuid(&self) -> Uuid {
176 self.table_uuid
177 }
178
179 #[inline]
181 pub fn location(&self) -> &str {
182 self.location.as_str()
183 }
184
185 #[inline]
187 pub fn last_sequence_number(&self) -> i64 {
188 self.last_sequence_number
189 }
190
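    /// Returns the sequence number for the next commit: V1 tables always use the
    /// initial sequence number, while V2+ tables increment the last sequence number.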
191 #[inline]
196 pub fn next_sequence_number(&self) -> i64 {
197 match self.format_version {
198 FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
199 _ => self.last_sequence_number + 1,
200 }
201 }
202
203 #[inline]
205 pub fn last_column_id(&self) -> i32 {
206 self.last_column_id
207 }
208
209 #[inline]
211 pub fn last_partition_id(&self) -> i32 {
212 self.last_partition_id
213 }
214
215 #[inline]
217 pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
218 timestamp_ms_to_utc(self.last_updated_ms)
219 }
220
221 #[inline]
223 pub fn last_updated_ms(&self) -> i64 {
224 self.last_updated_ms
225 }
226
227 #[inline]
229 pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
230 self.schemas.values()
231 }
232
233 #[inline]
235 pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
236 self.schemas.get(&schema_id)
237 }
238
239 #[inline]
241 pub fn current_schema(&self) -> &SchemaRef {
242 self.schema_by_id(self.current_schema_id)
243 .expect("Current schema id set, but not found in table metadata")
244 }
245
246 #[inline]
248 pub fn current_schema_id(&self) -> SchemaId {
249 self.current_schema_id
250 }
251
252 #[inline]
254 pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
255 self.partition_specs.values()
256 }
257
258 #[inline]
260 pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
261 self.partition_specs.get(&spec_id)
262 }
263
264 #[inline]
266 pub fn default_partition_spec(&self) -> &PartitionSpecRef {
267 &self.default_spec
268 }
269
270 #[inline]
272 pub fn default_partition_type(&self) -> &StructType {
273 &self.default_partition_type
274 }
275
276 #[inline]
277 pub fn default_partition_spec_id(&self) -> i32 {
279 self.default_spec.spec_id()
280 }
281
282 #[inline]
284 pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
285 self.snapshots.values()
286 }
287
288 #[inline]
290 pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
291 self.snapshots.get(&snapshot_id)
292 }
293
294 #[inline]
296 pub fn history(&self) -> &[SnapshotLog] {
297 &self.snapshot_log
298 }
299
300 #[inline]
302 pub fn metadata_log(&self) -> &[MetadataLog] {
303 &self.metadata_log
304 }
305
306 #[inline]
308 pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
309 self.current_snapshot_id.map(|s| {
310 self.snapshot_by_id(s)
311 .expect("Current snapshot id has been set, but doesn't exist in metadata")
312 })
313 }
314
315 #[inline]
317 pub fn current_snapshot_id(&self) -> Option<i64> {
318 self.current_snapshot_id
319 }
320
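    /// Returns the snapshot that the named reference (branch or tag) points to, or
    /// `None` if the reference does not exist. Panics if the reference points at a
    /// snapshot that is missing from the metadata.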
321 #[inline]
324 pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
325 self.refs.get(ref_name).map(|r| {
326 self.snapshot_by_id(r.snapshot_id)
327 .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
328 })
329 }
330
331 #[inline]
333 pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
334 self.sort_orders.values()
335 }
336
337 #[inline]
339 pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
340 self.sort_orders.get(&sort_order_id)
341 }
342
343 #[inline]
345 pub fn default_sort_order(&self) -> &SortOrderRef {
346 self.sort_orders
347 .get(&self.default_sort_order_id)
348 .expect("Default order id has been set, but not found in table metadata!")
349 }
350
351 #[inline]
353 pub fn default_sort_order_id(&self) -> i64 {
354 self.default_sort_order_id
355 }
356
357 #[inline]
359 pub fn properties(&self) -> &HashMap<String, String> {
360 &self.properties
361 }
362
363 #[inline]
365 pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
366 self.statistics.values()
367 }
368
369 #[inline]
371 pub fn partition_statistics_iter(
372 &self,
373 ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
374 self.partition_statistics.values()
375 }
376
377 #[inline]
379 pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
380 self.statistics.get(&snapshot_id)
381 }
382
383 #[inline]
385 pub fn partition_statistics_for_snapshot(
386 &self,
387 snapshot_id: i64,
388 ) -> Option<&PartitionStatisticsFile> {
389 self.partition_statistics.get(&snapshot_id)
390 }
391
392 fn construct_refs(&mut self) {
393 if let Some(current_snapshot_id) = self.current_snapshot_id
394 && !self.refs.contains_key(MAIN_BRANCH)
395 {
396 self.refs
397 .insert(MAIN_BRANCH.to_string(), SnapshotReference {
398 snapshot_id: current_snapshot_id,
399 retention: SnapshotRetention::Branch {
400 min_snapshots_to_keep: None,
401 max_snapshot_age_ms: None,
402 max_ref_age_ms: None,
403 },
404 });
405 }
406 }
407
408 #[inline]
410 pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
411 self.encryption_keys.values()
412 }
413
414 #[inline]
416 pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
417 self.encryption_keys.get(key_id)
418 }
419
420 #[inline]
422 pub fn next_row_id(&self) -> u64 {
423 self.next_row_id
424 }
425
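    /// Reads table metadata from a JSON file (optionally GZIP-compressed) at
    /// `metadata_location` using the given `FileIO`.
    ///
    /// A minimal usage sketch; how the `FileIO` is constructed here (a local
    /// filesystem `FileIOBuilder`) and the metadata path are illustrative assumptions:
    ///
    /// ```no_run
    /// # use iceberg::io::FileIOBuilder;
    /// # use iceberg::spec::TableMetadata;
    /// # async fn example() -> iceberg::Result<()> {
    /// // Assumed setup: a filesystem-backed FileIO and a local metadata path.
    /// let file_io = FileIOBuilder::new_fs_io().build()?;
    /// let metadata =
    ///     TableMetadata::read_from(&file_io, "/tmp/table/metadata/v1.metadata.json").await?;
    /// println!("table {} is on format {}", metadata.uuid(), metadata.format_version());
    /// # Ok(())
    /// # }
    /// ```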
426 pub async fn read_from(
428 file_io: &FileIO,
429 metadata_location: impl AsRef<str>,
430 ) -> Result<TableMetadata> {
431 let metadata_location = metadata_location.as_ref();
432 let input_file = file_io.new_input(metadata_location)?;
433 let metadata_content = input_file.read().await?;
434
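        // GZIP-compressed metadata starts with the magic bytes 0x1F 0x8B (RFC 1952);
        // anything else is parsed as plain JSON.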
435 let metadata = if metadata_content.len() > 2
437 && metadata_content[0] == 0x1F
438 && metadata_content[1] == 0x8B
439 {
440 let mut decoder = GzDecoder::new(metadata_content.as_ref());
441 let mut decompressed_data = Vec::new();
442 decoder.read_to_end(&mut decompressed_data).map_err(|e| {
443 Error::new(
444 ErrorKind::DataInvalid,
445 "Trying to read compressed metadata file",
446 )
447 .with_context("file_path", metadata_location)
448 .with_source(e)
449 })?;
450 serde_json::from_slice(&decompressed_data)?
451 } else {
452 serde_json::from_slice(&metadata_content)?
453 };
454
455 Ok(metadata)
456 }
457
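    /// Serializes this table metadata to JSON and writes it to `metadata_location`
    /// using the given `FileIO`.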
458 pub async fn write_to(
460 &self,
461 file_io: &FileIO,
462 metadata_location: impl AsRef<str>,
463 ) -> Result<()> {
464 file_io
465 .new_output(metadata_location)?
466 .write(serde_json::to_vec(self)?.into())
467 .await
468 }
469
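    /// Normalizes and validates freshly deserialized metadata: checks the current
    /// schema and snapshot, materializes the `main` ref, validates refs, snapshot
    /// sequence numbers and log ordering, trims the table location, and makes sure
    /// the default partition spec and sort order are registered.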
470 pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
478 self.validate_current_schema()?;
479 self.normalize_current_snapshot()?;
480 self.construct_refs();
481 self.validate_refs()?;
482 self.validate_chronological_snapshot_logs()?;
483 self.validate_chronological_metadata_logs()?;
484 self.location = self.location.trim_end_matches('/').to_string();
486 self.validate_snapshot_sequence_number()?;
487 self.try_normalize_partition_spec()?;
488 self.try_normalize_sort_order()?;
489 Ok(self)
490 }
491
492 fn try_normalize_partition_spec(&mut self) -> Result<()> {
494 if self
495 .partition_spec_by_id(self.default_spec.spec_id())
496 .is_none()
497 {
498 self.partition_specs.insert(
499 self.default_spec.spec_id(),
500 Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
501 );
502 }
503
504 Ok(())
505 }
506
507 fn try_normalize_sort_order(&mut self) -> Result<()> {
509 if self.sort_order_by_id(self.default_sort_order_id).is_some() {
510 return Ok(());
511 }
512
513 if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
514 return Err(Error::new(
515 ErrorKind::DataInvalid,
516 format!(
517 "No sort order exists with the default sort order id {}.",
518 self.default_sort_order_id
519 ),
520 ));
521 }
522
523 let sort_order = SortOrder::unsorted_order();
524 self.sort_orders
525 .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
526 Ok(())
527 }
528
529 fn validate_current_schema(&self) -> Result<()> {
531 if self.schema_by_id(self.current_schema_id).is_none() {
532 return Err(Error::new(
533 ErrorKind::DataInvalid,
534 format!(
535 "No schema exists with the current schema id {}.",
536 self.current_schema_id
537 ),
538 ));
539 }
540 Ok(())
541 }
542
543 fn normalize_current_snapshot(&mut self) -> Result<()> {
545 if let Some(current_snapshot_id) = self.current_snapshot_id {
546 if current_snapshot_id == EMPTY_SNAPSHOT_ID {
547 self.current_snapshot_id = None;
548 } else if self.snapshot_by_id(current_snapshot_id).is_none() {
549 return Err(Error::new(
550 ErrorKind::DataInvalid,
551 format!(
552 "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
553 ),
554 ));
555 }
556 }
557 Ok(())
558 }
559
560 fn validate_refs(&self) -> Result<()> {
562 for (name, snapshot_ref) in self.refs.iter() {
563 if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
564 return Err(Error::new(
565 ErrorKind::DataInvalid,
566 format!(
567 "Snapshot for reference {name} does not exist in the existing snapshots list"
568 ),
569 ));
570 }
571 }
572
573 let main_ref = self.refs.get(MAIN_BRANCH);
574 if self.current_snapshot_id.is_some() {
575 if let Some(main_ref) = main_ref
576 && main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default()
577 {
578 return Err(Error::new(
579 ErrorKind::DataInvalid,
580 format!(
581 "Current snapshot id does not match main branch ({:?} != {:?})",
582 self.current_snapshot_id.unwrap_or_default(),
583 main_ref.snapshot_id
584 ),
585 ));
586 }
587 } else if main_ref.is_some() {
588 return Err(Error::new(
589 ErrorKind::DataInvalid,
590 "Current snapshot is not set, but main branch exists",
591 ));
592 }
593
594 Ok(())
595 }
596
597 fn validate_snapshot_sequence_number(&self) -> Result<()> {
599 if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
600 return Err(Error::new(
601 ErrorKind::DataInvalid,
602 format!(
603 "Last sequence number must be 0 in v1. Found {}",
604 self.last_sequence_number
605 ),
606 ));
607 }
608
609 if self.format_version >= FormatVersion::V2
610 && let Some(snapshot) = self
611 .snapshots
612 .values()
613 .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
614 {
615 return Err(Error::new(
616 ErrorKind::DataInvalid,
617 format!(
618 "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
619 snapshot.snapshot_id(),
620 snapshot.sequence_number(),
621 self.last_sequence_number
622 ),
623 ));
624 }
625
626 Ok(())
627 }
628
629 fn validate_chronological_snapshot_logs(&self) -> Result<()> {
631 for window in self.snapshot_log.windows(2) {
632 let (prev, curr) = (&window[0], &window[1]);
633 if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
636 return Err(Error::new(
637 ErrorKind::DataInvalid,
638 "Expected sorted snapshot log entries",
639 ));
640 }
641 }
642
643 if let Some(last) = self.snapshot_log.last() {
644 if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
647 return Err(Error::new(
648 ErrorKind::DataInvalid,
649 format!(
650 "Invalid update timestamp {}: before last snapshot log entry at {}",
651 self.last_updated_ms, last.timestamp_ms
652 ),
653 ));
654 }
655 }
656 Ok(())
657 }
658
659 fn validate_chronological_metadata_logs(&self) -> Result<()> {
660 for window in self.metadata_log.windows(2) {
661 let (prev, curr) = (&window[0], &window[1]);
662 if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
665 return Err(Error::new(
666 ErrorKind::DataInvalid,
667 "Expected sorted metadata log entries",
668 ));
669 }
670 }
671
672 if let Some(last) = self.metadata_log.last() {
673 if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
676 return Err(Error::new(
677 ErrorKind::DataInvalid,
678 format!(
679 "Invalid update timestamp {}: before last metadata log entry at {}",
680 self.last_updated_ms, last.timestamp_ms
681 ),
682 ));
683 }
684 }
685
686 Ok(())
687 }
688}
689
690pub(super) mod _serde {
691 use std::borrow::BorrowMut;
692 use std::collections::HashMap;
697 use std::sync::Arc;
702
703 use serde::{Deserialize, Serialize};
704 use uuid::Uuid;
705
706 use super::{
707 DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
708 TableMetadata,
709 };
710 use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
711 use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
712 use crate::spec::{
713 EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
714 PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
715 SortOrder, StatisticsFile,
716 };
717 use crate::{Error, ErrorKind};
718
719 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
720 #[serde(untagged)]
721 pub(super) enum TableMetadataEnum {
722 V3(TableMetadataV3),
723 V2(TableMetadataV2),
724 V1(TableMetadataV1),
725 }
726
727 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
728 #[serde(rename_all = "kebab-case")]
729 pub(super) struct TableMetadataV3 {
731 pub format_version: VersionNumber<3>,
732 #[serde(flatten)]
733 pub shared: TableMetadataV2V3Shared,
734 pub next_row_id: u64,
735 #[serde(skip_serializing_if = "Option::is_none")]
736 pub encryption_keys: Option<Vec<EncryptedKey>>,
737 #[serde(skip_serializing_if = "Option::is_none")]
738 pub snapshots: Option<Vec<SnapshotV3>>,
739 }
740
741 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
742 #[serde(rename_all = "kebab-case")]
743 pub(super) struct TableMetadataV2V3Shared {
745 pub table_uuid: Uuid,
746 pub location: String,
747 pub last_sequence_number: i64,
748 pub last_updated_ms: i64,
749 pub last_column_id: i32,
750 pub schemas: Vec<SchemaV2>,
751 pub current_schema_id: i32,
752 pub partition_specs: Vec<PartitionSpec>,
753 pub default_spec_id: i32,
754 pub last_partition_id: i32,
755 #[serde(skip_serializing_if = "Option::is_none")]
756 pub properties: Option<HashMap<String, String>>,
757 #[serde(skip_serializing_if = "Option::is_none")]
758 pub current_snapshot_id: Option<i64>,
759 #[serde(skip_serializing_if = "Option::is_none")]
760 pub snapshot_log: Option<Vec<SnapshotLog>>,
761 #[serde(skip_serializing_if = "Option::is_none")]
762 pub metadata_log: Option<Vec<MetadataLog>>,
763 pub sort_orders: Vec<SortOrder>,
764 pub default_sort_order_id: i64,
765 #[serde(skip_serializing_if = "Option::is_none")]
766 pub refs: Option<HashMap<String, SnapshotReference>>,
767 #[serde(default, skip_serializing_if = "Vec::is_empty")]
768 pub statistics: Vec<StatisticsFile>,
769 #[serde(default, skip_serializing_if = "Vec::is_empty")]
770 pub partition_statistics: Vec<PartitionStatisticsFile>,
771 }
772
773 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
774 #[serde(rename_all = "kebab-case")]
775 pub(super) struct TableMetadataV2 {
777 pub format_version: VersionNumber<2>,
778 #[serde(flatten)]
779 pub shared: TableMetadataV2V3Shared,
780 #[serde(skip_serializing_if = "Option::is_none")]
781 pub snapshots: Option<Vec<SnapshotV2>>,
782 }
783
784 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
785 #[serde(rename_all = "kebab-case")]
786 pub(super) struct TableMetadataV1 {
788 pub format_version: VersionNumber<1>,
789 #[serde(skip_serializing_if = "Option::is_none")]
790 pub table_uuid: Option<Uuid>,
791 pub location: String,
792 pub last_updated_ms: i64,
793 pub last_column_id: i32,
794 pub schema: Option<SchemaV1>,
796 #[serde(skip_serializing_if = "Option::is_none")]
797 pub schemas: Option<Vec<SchemaV1>>,
798 #[serde(skip_serializing_if = "Option::is_none")]
799 pub current_schema_id: Option<i32>,
800 pub partition_spec: Option<Vec<PartitionField>>,
802 #[serde(skip_serializing_if = "Option::is_none")]
803 pub partition_specs: Option<Vec<PartitionSpec>>,
804 #[serde(skip_serializing_if = "Option::is_none")]
805 pub default_spec_id: Option<i32>,
806 #[serde(skip_serializing_if = "Option::is_none")]
807 pub last_partition_id: Option<i32>,
808 #[serde(skip_serializing_if = "Option::is_none")]
809 pub properties: Option<HashMap<String, String>>,
810 #[serde(skip_serializing_if = "Option::is_none")]
811 pub current_snapshot_id: Option<i64>,
812 #[serde(skip_serializing_if = "Option::is_none")]
813 pub snapshots: Option<Vec<SnapshotV1>>,
814 #[serde(skip_serializing_if = "Option::is_none")]
815 pub snapshot_log: Option<Vec<SnapshotLog>>,
816 #[serde(skip_serializing_if = "Option::is_none")]
817 pub metadata_log: Option<Vec<MetadataLog>>,
818 pub sort_orders: Option<Vec<SortOrder>>,
819 pub default_sort_order_id: Option<i64>,
820 #[serde(default, skip_serializing_if = "Vec::is_empty")]
821 pub statistics: Vec<StatisticsFile>,
822 #[serde(default, skip_serializing_if = "Vec::is_empty")]
823 pub partition_statistics: Vec<PartitionStatisticsFile>,
824 }
825
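    /// Zero-sized marker that serializes to the constant `V` and only deserializes
    /// from that exact value. This is what lets the untagged `TableMetadataEnum`
    /// select the right variant based on the `format-version` field.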
826 #[derive(Debug, PartialEq, Eq)]
828 pub(crate) struct VersionNumber<const V: u8>;
829
830 impl Serialize for TableMetadata {
831 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
832 where S: serde::Serializer {
833 let table_metadata_enum: TableMetadataEnum =
835 self.clone().try_into().map_err(serde::ser::Error::custom)?;
836
837 table_metadata_enum.serialize(serializer)
838 }
839 }
840
841 impl<const V: u8> Serialize for VersionNumber<V> {
842 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
843 where S: serde::Serializer {
844 serializer.serialize_u8(V)
845 }
846 }
847
848 impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
849 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
850 where D: serde::Deserializer<'de> {
851 let value = u8::deserialize(deserializer)?;
852 if value == V {
853 Ok(VersionNumber::<V>)
854 } else {
855 Err(serde::de::Error::custom("Invalid Version"))
856 }
857 }
858 }
859
860 impl TryFrom<TableMetadataEnum> for TableMetadata {
861 type Error = Error;
862 fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
863 match value {
864 TableMetadataEnum::V3(value) => value.try_into(),
865 TableMetadataEnum::V2(value) => value.try_into(),
866 TableMetadataEnum::V1(value) => value.try_into(),
867 }
868 }
869 }
870
871 impl TryFrom<TableMetadata> for TableMetadataEnum {
872 type Error = Error;
873 fn try_from(value: TableMetadata) -> Result<Self, Error> {
874 Ok(match value.format_version {
875 FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
876 FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
877 FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
878 })
879 }
880 }
881
882 impl TryFrom<TableMetadataV3> for TableMetadata {
883 type Error = Error;
884 fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
885 let TableMetadataV3 {
886 format_version: _,
887 shared: value,
888 next_row_id,
889 encryption_keys,
890 snapshots,
891 } = value;
892 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
893 None
894 } else {
895 value.current_snapshot_id
896 };
897 let schemas = HashMap::from_iter(
898 value
899 .schemas
900 .into_iter()
901 .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
902 .collect::<Result<Vec<_>, Error>>()?,
903 );
904
905 let current_schema: &SchemaRef =
906 schemas.get(&value.current_schema_id).ok_or_else(|| {
907 Error::new(
908 ErrorKind::DataInvalid,
909 format!(
910 "No schema exists with the current schema id {}.",
911 value.current_schema_id
912 ),
913 )
914 })?;
915 let partition_specs = HashMap::from_iter(
916 value
917 .partition_specs
918 .into_iter()
919 .map(|x| (x.spec_id(), Arc::new(x))),
920 );
921 let default_spec_id = value.default_spec_id;
922 let default_spec: PartitionSpecRef = partition_specs
923 .get(&value.default_spec_id)
924 .map(|spec| (**spec).clone())
925 .or_else(|| {
926 (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
927 .then(PartitionSpec::unpartition_spec)
928 })
929 .ok_or_else(|| {
930 Error::new(
931 ErrorKind::DataInvalid,
932 format!("Default partition spec {default_spec_id} not found"),
933 )
934 })?
935 .into();
936 let default_partition_type = default_spec.partition_type(current_schema)?;
937
938 let mut metadata = TableMetadata {
939 format_version: FormatVersion::V3,
940 table_uuid: value.table_uuid,
941 location: value.location,
942 last_sequence_number: value.last_sequence_number,
943 last_updated_ms: value.last_updated_ms,
944 last_column_id: value.last_column_id,
945 current_schema_id: value.current_schema_id,
946 schemas,
947 partition_specs,
948 default_partition_type,
949 default_spec,
950 last_partition_id: value.last_partition_id,
951 properties: value.properties.unwrap_or_default(),
952 current_snapshot_id,
953 snapshots: snapshots
954 .map(|snapshots| {
955 HashMap::from_iter(
956 snapshots
957 .into_iter()
958 .map(|x| (x.snapshot_id, Arc::new(x.into()))),
959 )
960 })
961 .unwrap_or_default(),
962 snapshot_log: value.snapshot_log.unwrap_or_default(),
963 metadata_log: value.metadata_log.unwrap_or_default(),
964 sort_orders: HashMap::from_iter(
965 value
966 .sort_orders
967 .into_iter()
968 .map(|x| (x.order_id, Arc::new(x))),
969 ),
970 default_sort_order_id: value.default_sort_order_id,
971 refs: value.refs.unwrap_or_else(|| {
972 if let Some(snapshot_id) = current_snapshot_id {
973 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
974 snapshot_id,
975 retention: SnapshotRetention::Branch {
976 min_snapshots_to_keep: None,
977 max_snapshot_age_ms: None,
978 max_ref_age_ms: None,
979 },
980 })])
981 } else {
982 HashMap::new()
983 }
984 }),
985 statistics: index_statistics(value.statistics),
986 partition_statistics: index_partition_statistics(value.partition_statistics),
987 encryption_keys: encryption_keys
988 .map(|keys| {
989 HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
990 })
991 .unwrap_or_default(),
992 next_row_id,
993 };
994
995 metadata.borrow_mut().try_normalize()?;
996 Ok(metadata)
997 }
998 }
999
1000 impl TryFrom<TableMetadataV2> for TableMetadata {
1001 type Error = Error;
1002 fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
1003 let snapshots = value.snapshots;
1004 let value = value.shared;
1005 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1006 None
1007 } else {
1008 value.current_snapshot_id
1009 };
1010 let schemas = HashMap::from_iter(
1011 value
1012 .schemas
1013 .into_iter()
1014 .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
1015 .collect::<Result<Vec<_>, Error>>()?,
1016 );
1017
1018 let current_schema: &SchemaRef =
1019 schemas.get(&value.current_schema_id).ok_or_else(|| {
1020 Error::new(
1021 ErrorKind::DataInvalid,
1022 format!(
1023 "No schema exists with the current schema id {}.",
1024 value.current_schema_id
1025 ),
1026 )
1027 })?;
1028 let partition_specs = HashMap::from_iter(
1029 value
1030 .partition_specs
1031 .into_iter()
1032 .map(|x| (x.spec_id(), Arc::new(x))),
1033 );
1034 let default_spec_id = value.default_spec_id;
1035 let default_spec: PartitionSpecRef = partition_specs
1036 .get(&value.default_spec_id)
1037 .map(|spec| (**spec).clone())
1038 .or_else(|| {
1039 (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
1040 .then(PartitionSpec::unpartition_spec)
1041 })
1042 .ok_or_else(|| {
1043 Error::new(
1044 ErrorKind::DataInvalid,
1045 format!("Default partition spec {default_spec_id} not found"),
1046 )
1047 })?
1048 .into();
1049 let default_partition_type = default_spec.partition_type(current_schema)?;
1050
1051 let mut metadata = TableMetadata {
1052 format_version: FormatVersion::V2,
1053 table_uuid: value.table_uuid,
1054 location: value.location,
1055 last_sequence_number: value.last_sequence_number,
1056 last_updated_ms: value.last_updated_ms,
1057 last_column_id: value.last_column_id,
1058 current_schema_id: value.current_schema_id,
1059 schemas,
1060 partition_specs,
1061 default_partition_type,
1062 default_spec,
1063 last_partition_id: value.last_partition_id,
1064 properties: value.properties.unwrap_or_default(),
1065 current_snapshot_id,
1066 snapshots: snapshots
1067 .map(|snapshots| {
1068 HashMap::from_iter(
1069 snapshots
1070 .into_iter()
1071 .map(|x| (x.snapshot_id, Arc::new(x.into()))),
1072 )
1073 })
1074 .unwrap_or_default(),
1075 snapshot_log: value.snapshot_log.unwrap_or_default(),
1076 metadata_log: value.metadata_log.unwrap_or_default(),
1077 sort_orders: HashMap::from_iter(
1078 value
1079 .sort_orders
1080 .into_iter()
1081 .map(|x| (x.order_id, Arc::new(x))),
1082 ),
1083 default_sort_order_id: value.default_sort_order_id,
1084 refs: value.refs.unwrap_or_else(|| {
1085 if let Some(snapshot_id) = current_snapshot_id {
1086 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1087 snapshot_id,
1088 retention: SnapshotRetention::Branch {
1089 min_snapshots_to_keep: None,
1090 max_snapshot_age_ms: None,
1091 max_ref_age_ms: None,
1092 },
1093 })])
1094 } else {
1095 HashMap::new()
1096 }
1097 }),
1098 statistics: index_statistics(value.statistics),
1099 partition_statistics: index_partition_statistics(value.partition_statistics),
1100 encryption_keys: HashMap::new(),
1101 next_row_id: INITIAL_ROW_ID,
1102 };
1103
1104 metadata.borrow_mut().try_normalize()?;
1105 Ok(metadata)
1106 }
1107 }
1108
1109 impl TryFrom<TableMetadataV1> for TableMetadata {
1110 type Error = Error;
1111 fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
1112 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1113 None
1114 } else {
1115 value.current_snapshot_id
1116 };
1117
1118 let (schemas, current_schema_id, current_schema) =
1119 if let (Some(schemas_vec), Some(schema_id)) =
1120 (&value.schemas, value.current_schema_id)
1121 {
1122 let schema_map = HashMap::from_iter(
1124 schemas_vec
1125 .clone()
1126 .into_iter()
1127 .map(|schema| {
1128 let schema: Schema = schema.try_into()?;
1129 Ok((schema.schema_id(), Arc::new(schema)))
1130 })
1131 .collect::<Result<Vec<_>, Error>>()?,
1132 );
1133
1134 let schema = schema_map
1135 .get(&schema_id)
1136 .ok_or_else(|| {
1137 Error::new(
1138 ErrorKind::DataInvalid,
1139 format!("No schema exists with the current schema id {schema_id}."),
1140 )
1141 })?
1142 .clone();
1143 (schema_map, schema_id, schema)
1144 } else if let Some(schema) = value.schema {
1145 let schema: Schema = schema.try_into()?;
1147 let schema_id = schema.schema_id();
1148 let schema_arc = Arc::new(schema);
1149 let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
1150 (schema_map, schema_id, schema_arc)
1151 } else {
1152 return Err(Error::new(
1154 ErrorKind::DataInvalid,
1155 "No valid schema configuration found in table metadata",
1156 ));
1157 };
1158
1159 let partition_specs = if let Some(specs_vec) = value.partition_specs {
1161 specs_vec
1163 .into_iter()
1164 .map(|x| (x.spec_id(), Arc::new(x)))
1165 .collect::<HashMap<_, _>>()
1166 } else if let Some(partition_spec) = value.partition_spec {
1167 let spec = PartitionSpec::builder(current_schema.clone())
1169 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1170 .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
1171 .build()?;
1172
1173 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1174 } else {
1175 let spec = PartitionSpec::builder(current_schema.clone())
1177 .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1178 .build()?;
1179
1180 HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1181 };
1182
1183 let default_spec_id = value
1185 .default_spec_id
1186 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
1187
1188 let default_spec: PartitionSpecRef = partition_specs
1190 .get(&default_spec_id)
1191 .map(|x| Arc::unwrap_or_clone(x.clone()))
1192 .ok_or_else(|| {
1193 Error::new(
1194 ErrorKind::DataInvalid,
1195 format!("Default partition spec {default_spec_id} not found"),
1196 )
1197 })?
1198 .into();
            let default_partition_type = default_spec.partition_type(&current_schema)?;
1200
1201 let mut metadata = TableMetadata {
1202 format_version: FormatVersion::V1,
1203 table_uuid: value.table_uuid.unwrap_or_default(),
1204 location: value.location,
1205 last_sequence_number: 0,
1206 last_updated_ms: value.last_updated_ms,
1207 last_column_id: value.last_column_id,
1208 current_schema_id,
1209 default_spec,
1210 default_partition_type,
1211 last_partition_id: value
1212 .last_partition_id
1213 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
1214 partition_specs,
1215 schemas,
1216 properties: value.properties.unwrap_or_default(),
1217 current_snapshot_id,
1218 snapshots: value
1219 .snapshots
1220 .map(|snapshots| {
1221 Ok::<_, Error>(HashMap::from_iter(
1222 snapshots
1223 .into_iter()
1224 .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
1225 .collect::<Result<Vec<_>, Error>>()?,
1226 ))
1227 })
1228 .transpose()?
1229 .unwrap_or_default(),
1230 snapshot_log: value.snapshot_log.unwrap_or_default(),
1231 metadata_log: value.metadata_log.unwrap_or_default(),
1232 sort_orders: match value.sort_orders {
1233 Some(sort_orders) => HashMap::from_iter(
1234 sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
1235 ),
1236 None => HashMap::new(),
1237 },
1238 default_sort_order_id: value
1239 .default_sort_order_id
1240 .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
1241 refs: if let Some(snapshot_id) = current_snapshot_id {
1242 HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1243 snapshot_id,
1244 retention: SnapshotRetention::Branch {
1245 min_snapshots_to_keep: None,
1246 max_snapshot_age_ms: None,
1247 max_ref_age_ms: None,
1248 },
1249 })])
1250 } else {
1251 HashMap::new()
1252 },
1253 statistics: index_statistics(value.statistics),
1254 partition_statistics: index_partition_statistics(value.partition_statistics),
1255 encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            };
1258
1259 metadata.borrow_mut().try_normalize()?;
1260 Ok(metadata)
1261 }
1262 }
1263
1264 impl TryFrom<TableMetadata> for TableMetadataV3 {
1265 type Error = Error;
1266
1267 fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1268 let next_row_id = v.next_row_id;
1269 let encryption_keys = std::mem::take(&mut v.encryption_keys);
1270 let snapshots = std::mem::take(&mut v.snapshots);
1271 let shared = v.into();
1272
1273 Ok(TableMetadataV3 {
1274 format_version: VersionNumber::<3>,
1275 shared,
1276 next_row_id,
1277 encryption_keys: if encryption_keys.is_empty() {
1278 None
1279 } else {
1280 Some(encryption_keys.into_values().collect())
1281 },
1282 snapshots: if snapshots.is_empty() {
1283 None
1284 } else {
1285 Some(
1286 snapshots
1287 .into_values()
1288 .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1289 .collect::<Result<_, _>>()?,
1290 )
1291 },
1292 })
1293 }
1294 }
1295
1296 impl From<TableMetadata> for TableMetadataV2 {
1297 fn from(mut v: TableMetadata) -> Self {
1298 let snapshots = std::mem::take(&mut v.snapshots);
1299 let shared = v.into();
1300
1301 TableMetadataV2 {
1302 format_version: VersionNumber::<2>,
1303 shared,
1304 snapshots: if snapshots.is_empty() {
1305 None
1306 } else {
1307 Some(
1308 snapshots
1309 .into_values()
1310 .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1311 .collect(),
1312 )
1313 },
1314 }
1315 }
1316 }
1317
1318 impl From<TableMetadata> for TableMetadataV2V3Shared {
1319 fn from(v: TableMetadata) -> Self {
1320 TableMetadataV2V3Shared {
1321 table_uuid: v.table_uuid,
1322 location: v.location,
1323 last_sequence_number: v.last_sequence_number,
1324 last_updated_ms: v.last_updated_ms,
1325 last_column_id: v.last_column_id,
1326 schemas: v
1327 .schemas
1328 .into_values()
1329 .map(|x| {
1330 Arc::try_unwrap(x)
1331 .unwrap_or_else(|schema| schema.as_ref().clone())
1332 .into()
1333 })
1334 .collect(),
1335 current_schema_id: v.current_schema_id,
1336 partition_specs: v
1337 .partition_specs
1338 .into_values()
1339 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1340 .collect(),
1341 default_spec_id: v.default_spec.spec_id(),
1342 last_partition_id: v.last_partition_id,
1343 properties: if v.properties.is_empty() {
1344 None
1345 } else {
1346 Some(v.properties)
1347 },
1348 current_snapshot_id: v.current_snapshot_id,
1349 snapshot_log: if v.snapshot_log.is_empty() {
1350 None
1351 } else {
1352 Some(v.snapshot_log)
1353 },
1354 metadata_log: if v.metadata_log.is_empty() {
1355 None
1356 } else {
1357 Some(v.metadata_log)
1358 },
1359 sort_orders: v
1360 .sort_orders
1361 .into_values()
1362 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1363 .collect(),
1364 default_sort_order_id: v.default_sort_order_id,
1365 refs: Some(v.refs),
1366 statistics: v.statistics.into_values().collect(),
1367 partition_statistics: v.partition_statistics.into_values().collect(),
1368 }
1369 }
1370 }
1371
1372 impl TryFrom<TableMetadata> for TableMetadataV1 {
1373 type Error = Error;
1374 fn try_from(v: TableMetadata) -> Result<Self, Error> {
1375 Ok(TableMetadataV1 {
1376 format_version: VersionNumber::<1>,
1377 table_uuid: Some(v.table_uuid),
1378 location: v.location,
1379 last_updated_ms: v.last_updated_ms,
1380 last_column_id: v.last_column_id,
1381 schema: Some(
1382 v.schemas
1383 .get(&v.current_schema_id)
1384 .ok_or(Error::new(
1385 ErrorKind::Unexpected,
1386 "current_schema_id not found in schemas",
1387 ))?
1388 .as_ref()
1389 .clone()
1390 .into(),
1391 ),
1392 schemas: Some(
1393 v.schemas
1394 .into_values()
1395 .map(|x| {
1396 Arc::try_unwrap(x)
1397 .unwrap_or_else(|schema| schema.as_ref().clone())
1398 .into()
1399 })
1400 .collect(),
1401 ),
1402 current_schema_id: Some(v.current_schema_id),
1403 partition_spec: Some(v.default_spec.fields().to_vec()),
1404 partition_specs: Some(
1405 v.partition_specs
1406 .into_values()
1407 .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1408 .collect(),
1409 ),
1410 default_spec_id: Some(v.default_spec.spec_id()),
1411 last_partition_id: Some(v.last_partition_id),
1412 properties: if v.properties.is_empty() {
1413 None
1414 } else {
1415 Some(v.properties)
1416 },
1417 current_snapshot_id: v.current_snapshot_id,
1418 snapshots: if v.snapshots.is_empty() {
1419 None
1420 } else {
1421 Some(
1422 v.snapshots
1423 .into_values()
1424 .map(|x| Snapshot::clone(&x).into())
1425 .collect(),
1426 )
1427 },
1428 snapshot_log: if v.snapshot_log.is_empty() {
1429 None
1430 } else {
1431 Some(v.snapshot_log)
1432 },
1433 metadata_log: if v.metadata_log.is_empty() {
1434 None
1435 } else {
1436 Some(v.metadata_log)
1437 },
1438 sort_orders: Some(
1439 v.sort_orders
1440 .into_values()
1441 .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1442 .collect(),
1443 ),
1444 default_sort_order_id: Some(v.default_sort_order_id),
1445 statistics: v.statistics.into_values().collect(),
1446 partition_statistics: v.partition_statistics.into_values().collect(),
1447 })
1448 }
1449 }
1450
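    /// Indexes statistics files by snapshot id. The list is reversed before collecting,
    /// so when several files reference the same snapshot the first one listed wins.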
1451 fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1452 statistics
1453 .into_iter()
1454 .rev()
1455 .map(|s| (s.snapshot_id, s))
1456 .collect()
1457 }
1458
1459 fn index_partition_statistics(
1460 statistics: Vec<PartitionStatisticsFile>,
1461 ) -> HashMap<i64, PartitionStatisticsFile> {
1462 statistics
1463 .into_iter()
1464 .rev()
1465 .map(|s| (s.snapshot_id, s))
1466 .collect()
1467 }
1468}
1469
1470#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
1471#[repr(u8)]
1472pub enum FormatVersion {
1474 V1 = 1u8,
1476 V2 = 2u8,
1478 V3 = 3u8,
1480}
1481
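// Ordering follows the numeric spec version, e.g. FormatVersion::V1 < FormatVersion::V3.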
1482impl PartialOrd for FormatVersion {
1483 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1484 Some(self.cmp(other))
1485 }
1486}
1487
1488impl Ord for FormatVersion {
1489 fn cmp(&self, other: &Self) -> Ordering {
1490 (*self as u8).cmp(&(*other as u8))
1491 }
1492}
1493
1494impl Display for FormatVersion {
1495 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1496 match self {
1497 FormatVersion::V1 => write!(f, "v1"),
1498 FormatVersion::V2 => write!(f, "v2"),
1499 FormatVersion::V3 => write!(f, "v3"),
1500 }
1501 }
1502}
1503
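/// A log entry pointing at a previous metadata file and the time it was written.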
1504#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1505#[serde(rename_all = "kebab-case")]
1506pub struct MetadataLog {
1508 pub metadata_file: String,
1510 pub timestamp_ms: i64,
1512}
1513
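/// A log entry recording when a snapshot became the current snapshot.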
1514#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1515#[serde(rename_all = "kebab-case")]
1516pub struct SnapshotLog {
1518 pub snapshot_id: i64,
1520 pub timestamp_ms: i64,
1522}
1523
1524impl SnapshotLog {
1525 pub fn timestamp(self) -> Result<DateTime<Utc>> {
1527 timestamp_ms_to_utc(self.timestamp_ms)
1528 }
1529
1530 #[inline]
1532 pub fn timestamp_ms(&self) -> i64 {
1533 self.timestamp_ms
1534 }
1535}
1536
1537#[cfg(test)]
1538mod tests {
1539 use std::collections::HashMap;
1540 use std::fs;
1541 use std::io::Write as _;
1542 use std::sync::Arc;
1543
1544 use anyhow::Result;
1545 use base64::Engine as _;
1546 use pretty_assertions::assert_eq;
1547 use tempfile::TempDir;
1548 use uuid::Uuid;
1549
1550 use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1551 use crate::TableCreation;
1552 use crate::io::FileIOBuilder;
1553 use crate::spec::table_metadata::TableMetadata;
1554 use crate::spec::{
1555 BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1556 PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1557 SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1558 Summary, Transform, Type, UnboundPartitionField,
1559 };
1560
1561 fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1562 let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1563 assert_eq!(desered_type, expected_type);
1564
1565 let sered_json = serde_json::to_string(&expected_type).unwrap();
1566 let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1567
1568 assert_eq!(parsed_json_value, desered_type);
1569 }
1570
1571 fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1572 let path = format!("testdata/table_metadata/{file_name}");
1573 let metadata: String = fs::read_to_string(path).unwrap();
1574
1575 serde_json::from_str(&metadata).unwrap()
1576 }
1577
1578 #[test]
1579 fn test_table_data_v2() {
1580 let data = r#"
1581 {
1582 "format-version" : 2,
1583 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1584 "location": "s3://b/wh/data.db/table",
1585 "last-sequence-number" : 1,
1586 "last-updated-ms": 1515100955770,
1587 "last-column-id": 1,
1588 "schemas": [
1589 {
1590 "schema-id" : 1,
1591 "type" : "struct",
1592 "fields" :[
1593 {
1594 "id": 1,
1595 "name": "struct_name",
1596 "required": true,
1597 "type": "fixed[1]"
1598 },
1599 {
1600 "id": 4,
1601 "name": "ts",
1602 "required": true,
1603 "type": "timestamp"
1604 }
1605 ]
1606 }
1607 ],
1608 "current-schema-id" : 1,
1609 "partition-specs": [
1610 {
1611 "spec-id": 0,
1612 "fields": [
1613 {
1614 "source-id": 4,
1615 "field-id": 1000,
1616 "name": "ts_day",
1617 "transform": "day"
1618 }
1619 ]
1620 }
1621 ],
1622 "default-spec-id": 0,
1623 "last-partition-id": 1000,
1624 "properties": {
1625 "commit.retry.num-retries": "1"
1626 },
1627 "metadata-log": [
1628 {
1629 "metadata-file": "s3://bucket/.../v1.json",
1630 "timestamp-ms": 1515100
1631 }
1632 ],
1633 "refs": {},
1634 "sort-orders": [
1635 {
1636 "order-id": 0,
1637 "fields": []
1638 }
1639 ],
1640 "default-sort-order-id": 0
1641 }
1642 "#;
1643
1644 let schema = Schema::builder()
1645 .with_schema_id(1)
1646 .with_fields(vec![
1647 Arc::new(NestedField::required(
1648 1,
1649 "struct_name",
1650 Type::Primitive(PrimitiveType::Fixed(1)),
1651 )),
1652 Arc::new(NestedField::required(
1653 4,
1654 "ts",
1655 Type::Primitive(PrimitiveType::Timestamp),
1656 )),
1657 ])
1658 .build()
1659 .unwrap();
1660
1661 let partition_spec = PartitionSpec::builder(schema.clone())
1662 .with_spec_id(0)
1663 .add_unbound_field(UnboundPartitionField {
1664 name: "ts_day".to_string(),
1665 transform: Transform::Day,
1666 source_id: 4,
1667 field_id: Some(1000),
1668 })
1669 .unwrap()
1670 .build()
1671 .unwrap();
1672
1673 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1674 let expected = TableMetadata {
1675 format_version: FormatVersion::V2,
1676 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1677 location: "s3://b/wh/data.db/table".to_string(),
1678 last_updated_ms: 1515100955770,
1679 last_column_id: 1,
1680 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1681 current_schema_id: 1,
1682 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1683 default_partition_type,
1684 default_spec: partition_spec.into(),
1685 last_partition_id: 1000,
1686 default_sort_order_id: 0,
1687 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1688 snapshots: HashMap::default(),
1689 current_snapshot_id: None,
1690 last_sequence_number: 1,
1691 properties: HashMap::from_iter(vec![(
1692 "commit.retry.num-retries".to_string(),
1693 "1".to_string(),
1694 )]),
1695 snapshot_log: Vec::new(),
1696 metadata_log: vec![MetadataLog {
1697 metadata_file: "s3://bucket/.../v1.json".to_string(),
1698 timestamp_ms: 1515100,
1699 }],
1700 refs: HashMap::new(),
1701 statistics: HashMap::new(),
1702 partition_statistics: HashMap::new(),
1703 encryption_keys: HashMap::new(),
1704 next_row_id: INITIAL_ROW_ID,
1705 };
1706
1707 let expected_json_value = serde_json::to_value(&expected).unwrap();
1708 check_table_metadata_serde(data, expected);
1709
1710 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1711 assert_eq!(json_value, expected_json_value);
1712 }
1713
1714 #[test]
1715 fn test_table_data_v3() {
1716 let data = r#"
1717 {
1718 "format-version" : 3,
1719 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1720 "location": "s3://b/wh/data.db/table",
1721 "last-sequence-number" : 1,
1722 "last-updated-ms": 1515100955770,
1723 "last-column-id": 1,
1724 "next-row-id": 5,
1725 "schemas": [
1726 {
1727 "schema-id" : 1,
1728 "type" : "struct",
1729 "fields" :[
1730 {
1731 "id": 4,
1732 "name": "ts",
1733 "required": true,
1734 "type": "timestamp"
1735 }
1736 ]
1737 }
1738 ],
1739 "current-schema-id" : 1,
1740 "partition-specs": [
1741 {
1742 "spec-id": 0,
1743 "fields": [
1744 {
1745 "source-id": 4,
1746 "field-id": 1000,
1747 "name": "ts_day",
1748 "transform": "day"
1749 }
1750 ]
1751 }
1752 ],
1753 "default-spec-id": 0,
1754 "last-partition-id": 1000,
1755 "properties": {
1756 "commit.retry.num-retries": "1"
1757 },
1758 "metadata-log": [
1759 {
1760 "metadata-file": "s3://bucket/.../v1.json",
1761 "timestamp-ms": 1515100
1762 }
1763 ],
1764 "refs": {},
1765 "snapshots" : [ {
1766 "snapshot-id" : 1,
1767 "timestamp-ms" : 1662532818843,
1768 "sequence-number" : 0,
1769 "first-row-id" : 0,
1770 "added-rows" : 4,
1771 "key-id" : "key1",
1772 "summary" : {
1773 "operation" : "append"
1774 },
1775 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1776 "schema-id" : 0
1777 }
1778 ],
1779 "encryption-keys": [
1780 {
1781 "key-id": "key1",
1782 "encrypted-by-id": "KMS",
1783 "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
1784 "properties": {
1785 "p1": "v1"
1786 }
1787 }
1788 ],
1789 "sort-orders": [
1790 {
1791 "order-id": 0,
1792 "fields": []
1793 }
1794 ],
1795 "default-sort-order-id": 0
1796 }
1797 "#;
1798
1799 let schema = Schema::builder()
1800 .with_schema_id(1)
1801 .with_fields(vec![Arc::new(NestedField::required(
1802 4,
1803 "ts",
1804 Type::Primitive(PrimitiveType::Timestamp),
1805 ))])
1806 .build()
1807 .unwrap();
1808
1809 let partition_spec = PartitionSpec::builder(schema.clone())
1810 .with_spec_id(0)
1811 .add_unbound_field(UnboundPartitionField {
1812 name: "ts_day".to_string(),
1813 transform: Transform::Day,
1814 source_id: 4,
1815 field_id: Some(1000),
1816 })
1817 .unwrap()
1818 .build()
1819 .unwrap();
1820
1821 let snapshot = Snapshot::builder()
1822 .with_snapshot_id(1)
1823 .with_timestamp_ms(1662532818843)
1824 .with_sequence_number(0)
1825 .with_row_range(0, 4)
1826 .with_encryption_key_id(Some("key1".to_string()))
1827 .with_summary(Summary {
1828 operation: Operation::Append,
1829 additional_properties: HashMap::new(),
1830 })
1831 .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
1832 .with_schema_id(0)
1833 .build();
1834
1835 let encryption_key = EncryptedKey::builder()
1836 .key_id("key1".to_string())
1837 .encrypted_by_id("KMS".to_string())
1838 .encrypted_key_metadata(
1839 base64::prelude::BASE64_STANDARD
1840 .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
1841 .unwrap(),
1842 )
1843 .properties(HashMap::from_iter(vec![(
1844 "p1".to_string(),
1845 "v1".to_string(),
1846 )]))
1847 .build();
1848
1849 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1850 let expected = TableMetadata {
1851 format_version: FormatVersion::V3,
1852 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1853 location: "s3://b/wh/data.db/table".to_string(),
1854 last_updated_ms: 1515100955770,
1855 last_column_id: 1,
1856 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1857 current_schema_id: 1,
1858 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1859 default_partition_type,
1860 default_spec: partition_spec.into(),
1861 last_partition_id: 1000,
1862 default_sort_order_id: 0,
1863 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1864 snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
1865 current_snapshot_id: None,
1866 last_sequence_number: 1,
1867 properties: HashMap::from_iter(vec![(
1868 "commit.retry.num-retries".to_string(),
1869 "1".to_string(),
1870 )]),
1871 snapshot_log: Vec::new(),
1872 metadata_log: vec![MetadataLog {
1873 metadata_file: "s3://bucket/.../v1.json".to_string(),
1874 timestamp_ms: 1515100,
1875 }],
1876 refs: HashMap::new(),
1877 statistics: HashMap::new(),
1878 partition_statistics: HashMap::new(),
1879 encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
1880 next_row_id: 5,
1881 };
1882
1883 let expected_json_value = serde_json::to_value(&expected).unwrap();
1884 check_table_metadata_serde(data, expected);
1885
1886 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1887 assert_eq!(json_value, expected_json_value);
1888 }
1889
1890 #[test]
1891 fn test_table_data_v1() {
1892 let data = r#"
1893 {
1894 "format-version" : 1,
1895 "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
1896 "location" : "/home/iceberg/warehouse/nyc/taxis",
1897 "last-updated-ms" : 1662532818843,
1898 "last-column-id" : 5,
1899 "schema" : {
1900 "type" : "struct",
1901 "schema-id" : 0,
1902 "fields" : [ {
1903 "id" : 1,
1904 "name" : "vendor_id",
1905 "required" : false,
1906 "type" : "long"
1907 }, {
1908 "id" : 2,
1909 "name" : "trip_id",
1910 "required" : false,
1911 "type" : "long"
1912 }, {
1913 "id" : 3,
1914 "name" : "trip_distance",
1915 "required" : false,
1916 "type" : "float"
1917 }, {
1918 "id" : 4,
1919 "name" : "fare_amount",
1920 "required" : false,
1921 "type" : "double"
1922 }, {
1923 "id" : 5,
1924 "name" : "store_and_fwd_flag",
1925 "required" : false,
1926 "type" : "string"
1927 } ]
1928 },
1929 "partition-spec" : [ {
1930 "name" : "vendor_id",
1931 "transform" : "identity",
1932 "source-id" : 1,
1933 "field-id" : 1000
1934 } ],
1935 "last-partition-id" : 1000,
1936 "default-sort-order-id" : 0,
1937 "sort-orders" : [ {
1938 "order-id" : 0,
1939 "fields" : [ ]
1940 } ],
1941 "properties" : {
1942 "owner" : "root"
1943 },
1944 "current-snapshot-id" : 638933773299822130,
1945 "refs" : {
1946 "main" : {
1947 "snapshot-id" : 638933773299822130,
1948 "type" : "branch"
1949 }
1950 },
1951 "snapshots" : [ {
1952 "snapshot-id" : 638933773299822130,
1953 "timestamp-ms" : 1662532818843,
1954 "sequence-number" : 0,
1955 "summary" : {
1956 "operation" : "append",
1957 "spark.app.id" : "local-1662532784305",
1958 "added-data-files" : "4",
1959 "added-records" : "4",
1960 "added-files-size" : "6001"
1961 },
1962 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1963 "schema-id" : 0
1964 } ],
1965 "snapshot-log" : [ {
1966 "timestamp-ms" : 1662532818843,
1967 "snapshot-id" : 638933773299822130
1968 } ],
1969 "metadata-log" : [ {
1970 "timestamp-ms" : 1662532805245,
1971 "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
1972 } ]
1973 }
1974 "#;
1975
1976 let schema = Schema::builder()
1977 .with_fields(vec![
1978 Arc::new(NestedField::optional(
1979 1,
1980 "vendor_id",
1981 Type::Primitive(PrimitiveType::Long),
1982 )),
1983 Arc::new(NestedField::optional(
1984 2,
1985 "trip_id",
1986 Type::Primitive(PrimitiveType::Long),
1987 )),
1988 Arc::new(NestedField::optional(
1989 3,
1990 "trip_distance",
1991 Type::Primitive(PrimitiveType::Float),
1992 )),
1993 Arc::new(NestedField::optional(
1994 4,
1995 "fare_amount",
1996 Type::Primitive(PrimitiveType::Double),
1997 )),
1998 Arc::new(NestedField::optional(
1999 5,
2000 "store_and_fwd_flag",
2001 Type::Primitive(PrimitiveType::String),
2002 )),
2003 ])
2004 .build()
2005 .unwrap();
2006
2007 let schema = Arc::new(schema);
2008 let partition_spec = PartitionSpec::builder(schema.clone())
2009 .with_spec_id(0)
2010 .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
2011 .unwrap()
2012 .build()
2013 .unwrap();
2014
2015 let sort_order = SortOrder::builder()
2016 .with_order_id(0)
2017 .build_unbound()
2018 .unwrap();
2019
2020 let snapshot = Snapshot::builder()
2021 .with_snapshot_id(638933773299822130)
2022 .with_timestamp_ms(1662532818843)
2023 .with_sequence_number(0)
2024 .with_schema_id(0)
2025 .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
2026 .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
2027 .build();
2028
2029 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2030 let expected = TableMetadata {
2031 format_version: FormatVersion::V1,
2032 table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
2033 location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
2034 last_updated_ms: 1662532818843,
2035 last_column_id: 5,
2036 schemas: HashMap::from_iter(vec![(0, schema)]),
2037 current_schema_id: 0,
2038 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2039 default_partition_type,
2040 default_spec: Arc::new(partition_spec),
2041 last_partition_id: 1000,
2042 default_sort_order_id: 0,
2043 sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
2044 snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
2045 current_snapshot_id: Some(638933773299822130),
2046 last_sequence_number: 0,
2047 properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
2048 snapshot_log: vec![SnapshotLog {
2049 snapshot_id: 638933773299822130,
2050 timestamp_ms: 1662532818843,
2051 }],
2052 metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
2053 refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
2054 statistics: HashMap::new(),
2055 partition_statistics: HashMap::new(),
2056 encryption_keys: HashMap::new(),
2057 next_row_id: INITIAL_ROW_ID,
2058 };
2059
2060 check_table_metadata_serde(data, expected);
2061 }
2062
2063 #[test]
2064 fn test_table_data_v2_no_snapshots() {
2065 let data = r#"
2066 {
2067 "format-version" : 2,
2068 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2069 "location": "s3://b/wh/data.db/table",
2070 "last-sequence-number" : 1,
2071 "last-updated-ms": 1515100955770,
2072 "last-column-id": 1,
2073 "schemas": [
2074 {
2075 "schema-id" : 1,
2076 "type" : "struct",
2077 "fields" :[
2078 {
2079 "id": 1,
2080 "name": "struct_name",
2081 "required": true,
2082 "type": "fixed[1]"
2083 }
2084 ]
2085 }
2086 ],
2087 "current-schema-id" : 1,
2088 "partition-specs": [
2089 {
2090 "spec-id": 0,
2091 "fields": []
2092 }
2093 ],
2094 "refs": {},
2095 "default-spec-id": 0,
2096 "last-partition-id": 1000,
2097 "metadata-log": [
2098 {
2099 "metadata-file": "s3://bucket/.../v1.json",
2100 "timestamp-ms": 1515100
2101 }
2102 ],
2103 "sort-orders": [
2104 {
2105 "order-id": 0,
2106 "fields": []
2107 }
2108 ],
2109 "default-sort-order-id": 0
2110 }
2111 "#;
2112
2113 let schema = Schema::builder()
2114 .with_schema_id(1)
2115 .with_fields(vec![Arc::new(NestedField::required(
2116 1,
2117 "struct_name",
2118 Type::Primitive(PrimitiveType::Fixed(1)),
2119 ))])
2120 .build()
2121 .unwrap();
2122
2123 let partition_spec = PartitionSpec::builder(schema.clone())
2124 .with_spec_id(0)
2125 .build()
2126 .unwrap();
2127
2128 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2129 let expected = TableMetadata {
2130 format_version: FormatVersion::V2,
2131 table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
2132 location: "s3://b/wh/data.db/table".to_string(),
2133 last_updated_ms: 1515100955770,
2134 last_column_id: 1,
2135 schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
2136 current_schema_id: 1,
2137 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2138 default_partition_type,
2139 default_spec: partition_spec.into(),
2140 last_partition_id: 1000,
2141 default_sort_order_id: 0,
2142 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2143 snapshots: HashMap::default(),
2144 current_snapshot_id: None,
2145 last_sequence_number: 1,
2146 properties: HashMap::new(),
2147 snapshot_log: Vec::new(),
2148 metadata_log: vec![MetadataLog {
2149 metadata_file: "s3://bucket/.../v1.json".to_string(),
2150 timestamp_ms: 1515100,
2151 }],
2152 refs: HashMap::new(),
2153 statistics: HashMap::new(),
2154 partition_statistics: HashMap::new(),
2155 encryption_keys: HashMap::new(),
2156 next_row_id: INITIAL_ROW_ID,
2157 };
2158
2159 let expected_json_value = serde_json::to_value(&expected).unwrap();
2160 check_table_metadata_serde(data, expected);
2161
2162 let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
2163 assert_eq!(json_value, expected_json_value);
2164 }
2165
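    // A current-snapshot-id that disagrees with the snapshot referenced by the main
    // branch must be rejected during deserialization.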
2166 #[test]
2167 fn test_current_snapshot_id_must_match_main_branch() {
2168 let data = r#"
2169 {
2170 "format-version" : 2,
2171 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2172 "location": "s3://b/wh/data.db/table",
2173 "last-sequence-number" : 1,
2174 "last-updated-ms": 1515100955770,
2175 "last-column-id": 1,
2176 "schemas": [
2177 {
2178 "schema-id" : 1,
2179 "type" : "struct",
2180 "fields" :[
2181 {
2182 "id": 1,
2183 "name": "struct_name",
2184 "required": true,
2185 "type": "fixed[1]"
2186 },
2187 {
2188 "id": 4,
2189 "name": "ts",
2190 "required": true,
2191 "type": "timestamp"
2192 }
2193 ]
2194 }
2195 ],
2196 "current-schema-id" : 1,
2197 "partition-specs": [
2198 {
2199 "spec-id": 0,
2200 "fields": [
2201 {
2202 "source-id": 4,
2203 "field-id": 1000,
2204 "name": "ts_day",
2205 "transform": "day"
2206 }
2207 ]
2208 }
2209 ],
2210 "default-spec-id": 0,
2211 "last-partition-id": 1000,
2212 "properties": {
2213 "commit.retry.num-retries": "1"
2214 },
2215 "metadata-log": [
2216 {
2217 "metadata-file": "s3://bucket/.../v1.json",
2218 "timestamp-ms": 1515100
2219 }
2220 ],
2221 "sort-orders": [
2222 {
2223 "order-id": 0,
2224 "fields": []
2225 }
2226 ],
2227 "default-sort-order-id": 0,
2228 "current-snapshot-id" : 1,
2229 "refs" : {
2230 "main" : {
2231 "snapshot-id" : 2,
2232 "type" : "branch"
2233 }
2234 },
2235 "snapshots" : [ {
2236 "snapshot-id" : 1,
2237 "timestamp-ms" : 1662532818843,
2238 "sequence-number" : 0,
2239 "summary" : {
2240 "operation" : "append",
2241 "spark.app.id" : "local-1662532784305",
2242 "added-data-files" : "4",
2243 "added-records" : "4",
2244 "added-files-size" : "6001"
2245 },
2246 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2247 "schema-id" : 0
2248 },
2249 {
2250 "snapshot-id" : 2,
2251 "timestamp-ms" : 1662532818844,
2252 "sequence-number" : 0,
2253 "summary" : {
2254 "operation" : "append",
2255 "spark.app.id" : "local-1662532784305",
2256 "added-data-files" : "4",
2257 "added-records" : "4",
2258 "added-files-size" : "6001"
2259 },
2260 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2261 "schema-id" : 0
2262 } ]
2263 }
2264 "#;
2265
2266 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2267 assert!(
2268 err.to_string()
2269 .contains("Current snapshot id does not match main branch")
2270 );
2271 }
2272
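    // A main branch ref without a current snapshot id set must fail deserialization.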
2273 #[test]
2274 fn test_main_without_current() {
2275 let data = r#"
2276 {
2277 "format-version" : 2,
2278 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2279 "location": "s3://b/wh/data.db/table",
2280 "last-sequence-number" : 1,
2281 "last-updated-ms": 1515100955770,
2282 "last-column-id": 1,
2283 "schemas": [
2284 {
2285 "schema-id" : 1,
2286 "type" : "struct",
2287 "fields" :[
2288 {
2289 "id": 1,
2290 "name": "struct_name",
2291 "required": true,
2292 "type": "fixed[1]"
2293 },
2294 {
2295 "id": 4,
2296 "name": "ts",
2297 "required": true,
2298 "type": "timestamp"
2299 }
2300 ]
2301 }
2302 ],
2303 "current-schema-id" : 1,
2304 "partition-specs": [
2305 {
2306 "spec-id": 0,
2307 "fields": [
2308 {
2309 "source-id": 4,
2310 "field-id": 1000,
2311 "name": "ts_day",
2312 "transform": "day"
2313 }
2314 ]
2315 }
2316 ],
2317 "default-spec-id": 0,
2318 "last-partition-id": 1000,
2319 "properties": {
2320 "commit.retry.num-retries": "1"
2321 },
2322 "metadata-log": [
2323 {
2324 "metadata-file": "s3://bucket/.../v1.json",
2325 "timestamp-ms": 1515100
2326 }
2327 ],
2328 "sort-orders": [
2329 {
2330 "order-id": 0,
2331 "fields": []
2332 }
2333 ],
2334 "default-sort-order-id": 0,
2335 "refs" : {
2336 "main" : {
2337 "snapshot-id" : 1,
2338 "type" : "branch"
2339 }
2340 },
2341 "snapshots" : [ {
2342 "snapshot-id" : 1,
2343 "timestamp-ms" : 1662532818843,
2344 "sequence-number" : 0,
2345 "summary" : {
2346 "operation" : "append",
2347 "spark.app.id" : "local-1662532784305",
2348 "added-data-files" : "4",
2349 "added-records" : "4",
2350 "added-files-size" : "6001"
2351 },
2352 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2353 "schema-id" : 0
2354 } ]
2355 }
2356 "#;
2357
2358 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2359 assert!(
2360 err.to_string()
2361 .contains("Current snapshot is not set, but main branch exists")
2362 );
2363 }
2364
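    // A ref pointing at a snapshot id that is not present in the snapshot list must be rejected.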
2365 #[test]
2366 fn test_branch_snapshot_missing() {
2367 let data = r#"
2368 {
2369 "format-version" : 2,
2370 "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2371 "location": "s3://b/wh/data.db/table",
2372 "last-sequence-number" : 1,
2373 "last-updated-ms": 1515100955770,
2374 "last-column-id": 1,
2375 "schemas": [
2376 {
2377 "schema-id" : 1,
2378 "type" : "struct",
2379 "fields" :[
2380 {
2381 "id": 1,
2382 "name": "struct_name",
2383 "required": true,
2384 "type": "fixed[1]"
2385 },
2386 {
2387 "id": 4,
2388 "name": "ts",
2389 "required": true,
2390 "type": "timestamp"
2391 }
2392 ]
2393 }
2394 ],
2395 "current-schema-id" : 1,
2396 "partition-specs": [
2397 {
2398 "spec-id": 0,
2399 "fields": [
2400 {
2401 "source-id": 4,
2402 "field-id": 1000,
2403 "name": "ts_day",
2404 "transform": "day"
2405 }
2406 ]
2407 }
2408 ],
2409 "default-spec-id": 0,
2410 "last-partition-id": 1000,
2411 "properties": {
2412 "commit.retry.num-retries": "1"
2413 },
2414 "metadata-log": [
2415 {
2416 "metadata-file": "s3://bucket/.../v1.json",
2417 "timestamp-ms": 1515100
2418 }
2419 ],
2420 "sort-orders": [
2421 {
2422 "order-id": 0,
2423 "fields": []
2424 }
2425 ],
2426 "default-sort-order-id": 0,
2427 "refs" : {
2428 "main" : {
2429 "snapshot-id" : 1,
2430 "type" : "branch"
2431 },
2432 "foo" : {
2433 "snapshot-id" : 2,
2434 "type" : "branch"
2435 }
2436 },
2437 "snapshots" : [ {
2438 "snapshot-id" : 1,
2439 "timestamp-ms" : 1662532818843,
2440 "sequence-number" : 0,
2441 "summary" : {
2442 "operation" : "append",
2443 "spark.app.id" : "local-1662532784305",
2444 "added-data-files" : "4",
2445 "added-records" : "4",
2446 "added-files-size" : "6001"
2447 },
2448 "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2449 "schema-id" : 0
2450 } ]
2451 }
2452 "#;
2453
2454 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2455 assert!(
2456 err.to_string().contains(
2457 "Snapshot for reference foo does not exist in the existing snapshots list"
2458 )
2459 );
2460 }
2461
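    // A snapshot sequence number greater than last-sequence-number is invalid;
    // raising last-sequence-number to or above it makes the same document parse.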
2462 #[test]
2463 fn test_v2_wrong_max_snapshot_sequence_number() {
2464 let data = r#"
2465 {
2466 "format-version": 2,
2467 "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2468 "location": "s3://bucket/test/location",
2469 "last-sequence-number": 1,
2470 "last-updated-ms": 1602638573590,
2471 "last-column-id": 3,
2472 "current-schema-id": 0,
2473 "schemas": [
2474 {
2475 "type": "struct",
2476 "schema-id": 0,
2477 "fields": [
2478 {
2479 "id": 1,
2480 "name": "x",
2481 "required": true,
2482 "type": "long"
2483 }
2484 ]
2485 }
2486 ],
2487 "default-spec-id": 0,
2488 "partition-specs": [
2489 {
2490 "spec-id": 0,
2491 "fields": []
2492 }
2493 ],
2494 "last-partition-id": 1000,
2495 "default-sort-order-id": 0,
2496 "sort-orders": [
2497 {
2498 "order-id": 0,
2499 "fields": []
2500 }
2501 ],
2502 "properties": {},
2503 "current-snapshot-id": 3055729675574597004,
2504 "snapshots": [
2505 {
2506 "snapshot-id": 3055729675574597004,
2507 "timestamp-ms": 1555100955770,
2508 "sequence-number": 4,
2509 "summary": {
2510 "operation": "append"
2511 },
2512 "manifest-list": "s3://a/b/2.avro",
2513 "schema-id": 0
2514 }
2515 ],
2516 "statistics": [],
2517 "snapshot-log": [],
2518 "metadata-log": []
2519 }
2520 "#;
2521
2522 let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2523 assert!(err.to_string().contains(
2524 "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
2525 ));
2526
2527 let data = data.replace(
2529 r#""last-sequence-number": 1,"#,
2530 r#""last-sequence-number": 4,"#,
2531 );
2532 let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2533 assert_eq!(metadata.last_sequence_number, 4);
2534
2535 let data = data.replace(
2537 r#""last-sequence-number": 4,"#,
2538 r#""last-sequence-number": 5,"#,
2539 );
2540 let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2541 assert_eq!(metadata.last_sequence_number, 5);
2542 }
2543
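    // Statistics files, including their blob metadata, are keyed by snapshot id after deserialization.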
2544 #[test]
2545 fn test_statistic_files() {
2546 let data = r#"
2547 {
2548 "format-version": 2,
2549 "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2550 "location": "s3://bucket/test/location",
2551 "last-sequence-number": 34,
2552 "last-updated-ms": 1602638573590,
2553 "last-column-id": 3,
2554 "current-schema-id": 0,
2555 "schemas": [
2556 {
2557 "type": "struct",
2558 "schema-id": 0,
2559 "fields": [
2560 {
2561 "id": 1,
2562 "name": "x",
2563 "required": true,
2564 "type": "long"
2565 }
2566 ]
2567 }
2568 ],
2569 "default-spec-id": 0,
2570 "partition-specs": [
2571 {
2572 "spec-id": 0,
2573 "fields": []
2574 }
2575 ],
2576 "last-partition-id": 1000,
2577 "default-sort-order-id": 0,
2578 "sort-orders": [
2579 {
2580 "order-id": 0,
2581 "fields": []
2582 }
2583 ],
2584 "properties": {},
2585 "current-snapshot-id": 3055729675574597004,
2586 "snapshots": [
2587 {
2588 "snapshot-id": 3055729675574597004,
2589 "timestamp-ms": 1555100955770,
2590 "sequence-number": 1,
2591 "summary": {
2592 "operation": "append"
2593 },
2594 "manifest-list": "s3://a/b/2.avro",
2595 "schema-id": 0
2596 }
2597 ],
2598 "statistics": [
2599 {
2600 "snapshot-id": 3055729675574597004,
2601 "statistics-path": "s3://a/b/stats.puffin",
2602 "file-size-in-bytes": 413,
2603 "file-footer-size-in-bytes": 42,
2604 "blob-metadata": [
2605 {
2606 "type": "ndv",
2607 "snapshot-id": 3055729675574597004,
2608 "sequence-number": 1,
2609 "fields": [
2610 1
2611 ]
2612 }
2613 ]
2614 }
2615 ],
2616 "snapshot-log": [],
2617 "metadata-log": []
2618 }
2619 "#;
2620
2621 let schema = Schema::builder()
2622 .with_schema_id(0)
2623 .with_fields(vec![Arc::new(NestedField::required(
2624 1,
2625 "x",
2626 Type::Primitive(PrimitiveType::Long),
2627 ))])
2628 .build()
2629 .unwrap();
2630 let partition_spec = PartitionSpec::builder(schema.clone())
2631 .with_spec_id(0)
2632 .build()
2633 .unwrap();
2634 let snapshot = Snapshot::builder()
2635 .with_snapshot_id(3055729675574597004)
2636 .with_timestamp_ms(1555100955770)
2637 .with_sequence_number(1)
2638 .with_manifest_list("s3://a/b/2.avro")
2639 .with_schema_id(0)
2640 .with_summary(Summary {
2641 operation: Operation::Append,
2642 additional_properties: HashMap::new(),
2643 })
2644 .build();
2645
2646 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2647 let expected = TableMetadata {
2648 format_version: FormatVersion::V2,
2649 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2650 location: "s3://bucket/test/location".to_string(),
2651 last_updated_ms: 1602638573590,
2652 last_column_id: 3,
2653 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2654 current_schema_id: 0,
2655 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2656 default_partition_type,
2657 default_spec: Arc::new(partition_spec),
2658 last_partition_id: 1000,
2659 default_sort_order_id: 0,
2660 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2661 snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
2662 current_snapshot_id: Some(3055729675574597004),
2663 last_sequence_number: 34,
2664 properties: HashMap::new(),
2665 snapshot_log: Vec::new(),
2666 metadata_log: Vec::new(),
2667 statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
2668 snapshot_id: 3055729675574597004,
2669 statistics_path: "s3://a/b/stats.puffin".to_string(),
2670 file_size_in_bytes: 413,
2671 file_footer_size_in_bytes: 42,
2672 key_metadata: None,
2673 blob_metadata: vec![BlobMetadata {
2674 snapshot_id: 3055729675574597004,
2675 sequence_number: 1,
2676 fields: vec![1],
2677 r#type: "ndv".to_string(),
2678 properties: HashMap::new(),
2679 }],
2680 })]),
2681 partition_statistics: HashMap::new(),
2682 refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
2683 snapshot_id: 3055729675574597004,
2684 retention: SnapshotRetention::Branch {
2685 min_snapshots_to_keep: None,
2686 max_snapshot_age_ms: None,
2687 max_ref_age_ms: None,
2688 },
2689 })]),
2690 encryption_keys: HashMap::new(),
2691 next_row_id: INITIAL_ROW_ID,
2692 };
2693
2694 check_table_metadata_serde(data, expected);
2695 }
2696
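    // Partition statistics files are keyed by snapshot id after deserialization.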
2697 #[test]
2698 fn test_partition_statistics_file() {
2699 let data = r#"
2700 {
2701 "format-version": 2,
2702 "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2703 "location": "s3://bucket/test/location",
2704 "last-sequence-number": 34,
2705 "last-updated-ms": 1602638573590,
2706 "last-column-id": 3,
2707 "current-schema-id": 0,
2708 "schemas": [
2709 {
2710 "type": "struct",
2711 "schema-id": 0,
2712 "fields": [
2713 {
2714 "id": 1,
2715 "name": "x",
2716 "required": true,
2717 "type": "long"
2718 }
2719 ]
2720 }
2721 ],
2722 "default-spec-id": 0,
2723 "partition-specs": [
2724 {
2725 "spec-id": 0,
2726 "fields": []
2727 }
2728 ],
2729 "last-partition-id": 1000,
2730 "default-sort-order-id": 0,
2731 "sort-orders": [
2732 {
2733 "order-id": 0,
2734 "fields": []
2735 }
2736 ],
2737 "properties": {},
2738 "current-snapshot-id": 3055729675574597004,
2739 "snapshots": [
2740 {
2741 "snapshot-id": 3055729675574597004,
2742 "timestamp-ms": 1555100955770,
2743 "sequence-number": 1,
2744 "summary": {
2745 "operation": "append"
2746 },
2747 "manifest-list": "s3://a/b/2.avro",
2748 "schema-id": 0
2749 }
2750 ],
2751 "partition-statistics": [
2752 {
2753 "snapshot-id": 3055729675574597004,
2754 "statistics-path": "s3://a/b/partition-stats.parquet",
2755 "file-size-in-bytes": 43
2756 }
2757 ],
2758 "snapshot-log": [],
2759 "metadata-log": []
2760 }
2761 "#;
2762
2763 let schema = Schema::builder()
2764 .with_schema_id(0)
2765 .with_fields(vec![Arc::new(NestedField::required(
2766 1,
2767 "x",
2768 Type::Primitive(PrimitiveType::Long),
2769 ))])
2770 .build()
2771 .unwrap();
2772 let partition_spec = PartitionSpec::builder(schema.clone())
2773 .with_spec_id(0)
2774 .build()
2775 .unwrap();
2776 let snapshot = Snapshot::builder()
2777 .with_snapshot_id(3055729675574597004)
2778 .with_timestamp_ms(1555100955770)
2779 .with_sequence_number(1)
2780 .with_manifest_list("s3://a/b/2.avro")
2781 .with_schema_id(0)
2782 .with_summary(Summary {
2783 operation: Operation::Append,
2784 additional_properties: HashMap::new(),
2785 })
2786 .build();
2787
2788 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2789 let expected = TableMetadata {
2790 format_version: FormatVersion::V2,
2791 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2792 location: "s3://bucket/test/location".to_string(),
2793 last_updated_ms: 1602638573590,
2794 last_column_id: 3,
2795 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2796 current_schema_id: 0,
2797 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2798 default_spec: Arc::new(partition_spec),
2799 default_partition_type,
2800 last_partition_id: 1000,
2801 default_sort_order_id: 0,
2802 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2803 snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
2804 current_snapshot_id: Some(3055729675574597004),
2805 last_sequence_number: 34,
2806 properties: HashMap::new(),
2807 snapshot_log: Vec::new(),
2808 metadata_log: Vec::new(),
2809 statistics: HashMap::new(),
2810 partition_statistics: HashMap::from_iter(vec![(
2811 3055729675574597004,
2812 PartitionStatisticsFile {
2813 snapshot_id: 3055729675574597004,
2814 statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
2815 file_size_in_bytes: 43,
2816 },
2817 )]),
2818 refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
2819 snapshot_id: 3055729675574597004,
2820 retention: SnapshotRetention::Branch {
2821 min_snapshots_to_keep: None,
2822 max_snapshot_age_ms: None,
2823 max_ref_age_ms: None,
2824 },
2825 })]),
2826 encryption_keys: HashMap::new(),
2827 next_row_id: INITIAL_ROW_ID,
2828 };
2829
2830 check_table_metadata_serde(data, expected);
2831 }
2832
2833 #[test]
2834 fn test_invalid_table_uuid() -> Result<()> {
2835 let data = r#"
2836 {
2837 "format-version" : 2,
2838 "table-uuid": "xxxx"
2839 }
2840 "#;
2841 assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2842 Ok(())
2843 }
2844
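    // A document carrying only a format-version field is missing required metadata and must fail.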
2845 #[test]
2846 fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
2847 let data = r#"
2848 {
2849 "format-version" : 1
2850 }
2851 "#;
2852 assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2853 Ok(())
2854 }
2855
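    // Minimal valid V3 metadata: schema defaults, partition spec, and sort order are reconstructed.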
2856 #[test]
2857 fn test_table_metadata_v3_valid_minimal() {
2858 let metadata_str =
2859 fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();
2860
2861 let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
2862 assert_eq!(table_metadata.format_version, FormatVersion::V3);
2863
2864 let schema = Schema::builder()
2865 .with_schema_id(0)
2866 .with_fields(vec![
2867 Arc::new(
2868 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long))
2869 .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
2870 .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
2871 ),
2872 Arc::new(
2873 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
2874 .with_doc("comment"),
2875 ),
2876 Arc::new(NestedField::required(
2877 3,
2878 "z",
2879 Type::Primitive(PrimitiveType::Long),
2880 )),
2881 ])
2882 .build()
2883 .unwrap();
2884
2885 let partition_spec = PartitionSpec::builder(schema.clone())
2886 .with_spec_id(0)
2887 .add_unbound_field(UnboundPartitionField {
2888 name: "x".to_string(),
2889 transform: Transform::Identity,
2890 source_id: 1,
2891 field_id: Some(1000),
2892 })
2893 .unwrap()
2894 .build()
2895 .unwrap();
2896
2897 let sort_order = SortOrder::builder()
2898 .with_order_id(3)
2899 .with_sort_field(SortField {
2900 source_id: 2,
2901 transform: Transform::Identity,
2902 direction: SortDirection::Ascending,
2903 null_order: NullOrder::First,
2904 })
2905 .with_sort_field(SortField {
2906 source_id: 3,
2907 transform: Transform::Bucket(4),
2908 direction: SortDirection::Descending,
2909 null_order: NullOrder::Last,
2910 })
2911 .build_unbound()
2912 .unwrap();
2913
2914 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2915 let expected = TableMetadata {
2916 format_version: FormatVersion::V3,
2917 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2918 location: "s3://bucket/test/location".to_string(),
2919 last_updated_ms: 1602638573590,
2920 last_column_id: 3,
2921 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2922 current_schema_id: 0,
2923 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2924 default_spec: Arc::new(partition_spec),
2925 default_partition_type,
2926 last_partition_id: 1000,
2927 default_sort_order_id: 3,
2928 sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
2929 snapshots: HashMap::default(),
2930 current_snapshot_id: None,
2931 last_sequence_number: 34,
2932 properties: HashMap::new(),
2933 snapshot_log: Vec::new(),
2934 metadata_log: Vec::new(),
2935 refs: HashMap::new(),
2936 statistics: HashMap::new(),
2937 partition_statistics: HashMap::new(),
2938 encryption_keys: HashMap::new(),
            next_row_id: 0,
        };

2942 check_table_metadata_serde(&metadata_str, expected);
2943 }
2944
2945 #[test]
2946 fn test_table_metadata_v2_file_valid() {
2947 let metadata =
2948 fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();
2949
2950 let schema1 = Schema::builder()
2951 .with_schema_id(0)
2952 .with_fields(vec![Arc::new(NestedField::required(
2953 1,
2954 "x",
2955 Type::Primitive(PrimitiveType::Long),
2956 ))])
2957 .build()
2958 .unwrap();
2959
2960 let schema2 = Schema::builder()
2961 .with_schema_id(1)
2962 .with_fields(vec![
2963 Arc::new(NestedField::required(
2964 1,
2965 "x",
2966 Type::Primitive(PrimitiveType::Long),
2967 )),
2968 Arc::new(
2969 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
2970 .with_doc("comment"),
2971 ),
2972 Arc::new(NestedField::required(
2973 3,
2974 "z",
2975 Type::Primitive(PrimitiveType::Long),
2976 )),
2977 ])
2978 .with_identifier_field_ids(vec![1, 2])
2979 .build()
2980 .unwrap();
2981
2982 let partition_spec = PartitionSpec::builder(schema2.clone())
2983 .with_spec_id(0)
2984 .add_unbound_field(UnboundPartitionField {
2985 name: "x".to_string(),
2986 transform: Transform::Identity,
2987 source_id: 1,
2988 field_id: Some(1000),
2989 })
2990 .unwrap()
2991 .build()
2992 .unwrap();
2993
2994 let sort_order = SortOrder::builder()
2995 .with_order_id(3)
2996 .with_sort_field(SortField {
2997 source_id: 2,
2998 transform: Transform::Identity,
2999 direction: SortDirection::Ascending,
3000 null_order: NullOrder::First,
3001 })
3002 .with_sort_field(SortField {
3003 source_id: 3,
3004 transform: Transform::Bucket(4),
3005 direction: SortDirection::Descending,
3006 null_order: NullOrder::Last,
3007 })
3008 .build_unbound()
3009 .unwrap();
3010
3011 let snapshot1 = Snapshot::builder()
3012 .with_snapshot_id(3051729675574597004)
3013 .with_timestamp_ms(1515100955770)
3014 .with_sequence_number(0)
3015 .with_manifest_list("s3://a/b/1.avro")
3016 .with_summary(Summary {
3017 operation: Operation::Append,
3018 additional_properties: HashMap::new(),
3019 })
3020 .build();
3021
3022 let snapshot2 = Snapshot::builder()
3023 .with_snapshot_id(3055729675574597004)
3024 .with_parent_snapshot_id(Some(3051729675574597004))
3025 .with_timestamp_ms(1555100955770)
3026 .with_sequence_number(1)
3027 .with_schema_id(1)
3028 .with_manifest_list("s3://a/b/2.avro")
3029 .with_summary(Summary {
3030 operation: Operation::Append,
3031 additional_properties: HashMap::new(),
3032 })
3033 .build();
3034
3035 let default_partition_type = partition_spec.partition_type(&schema2).unwrap();
3036 let expected = TableMetadata {
3037 format_version: FormatVersion::V2,
3038 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
3039 location: "s3://bucket/test/location".to_string(),
3040 last_updated_ms: 1602638573590,
3041 last_column_id: 3,
3042 schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]),
3043 current_schema_id: 1,
3044 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3045 default_spec: Arc::new(partition_spec),
3046 default_partition_type,
3047 last_partition_id: 1000,
3048 default_sort_order_id: 3,
3049 sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
3050 snapshots: HashMap::from_iter(vec![
3051 (3051729675574597004, Arc::new(snapshot1)),
3052 (3055729675574597004, Arc::new(snapshot2)),
3053 ]),
3054 current_snapshot_id: Some(3055729675574597004),
3055 last_sequence_number: 34,
3056 properties: HashMap::new(),
3057 snapshot_log: vec![
3058 SnapshotLog {
3059 snapshot_id: 3051729675574597004,
3060 timestamp_ms: 1515100955770,
3061 },
3062 SnapshotLog {
3063 snapshot_id: 3055729675574597004,
3064 timestamp_ms: 1555100955770,
3065 },
3066 ],
3067 metadata_log: Vec::new(),
3068 refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
3069 snapshot_id: 3055729675574597004,
3070 retention: SnapshotRetention::Branch {
3071 min_snapshots_to_keep: None,
3072 max_snapshot_age_ms: None,
3073 max_ref_age_ms: None,
3074 },
3075 })]),
3076 statistics: HashMap::new(),
3077 partition_statistics: HashMap::new(),
3078 encryption_keys: HashMap::new(),
3079 next_row_id: INITIAL_ROW_ID,
3080 };
3081
3082 check_table_metadata_serde(&metadata, expected);
3083 }
3084
3085 #[test]
3086 fn test_table_metadata_v2_file_valid_minimal() {
3087 let metadata =
3088 fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();
3089
3090 let schema = Schema::builder()
3091 .with_schema_id(0)
3092 .with_fields(vec![
3093 Arc::new(NestedField::required(
3094 1,
3095 "x",
3096 Type::Primitive(PrimitiveType::Long),
3097 )),
3098 Arc::new(
3099 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
3100 .with_doc("comment"),
3101 ),
3102 Arc::new(NestedField::required(
3103 3,
3104 "z",
3105 Type::Primitive(PrimitiveType::Long),
3106 )),
3107 ])
3108 .build()
3109 .unwrap();
3110
3111 let partition_spec = PartitionSpec::builder(schema.clone())
3112 .with_spec_id(0)
3113 .add_unbound_field(UnboundPartitionField {
3114 name: "x".to_string(),
3115 transform: Transform::Identity,
3116 source_id: 1,
3117 field_id: Some(1000),
3118 })
3119 .unwrap()
3120 .build()
3121 .unwrap();
3122
3123 let sort_order = SortOrder::builder()
3124 .with_order_id(3)
3125 .with_sort_field(SortField {
3126 source_id: 2,
3127 transform: Transform::Identity,
3128 direction: SortDirection::Ascending,
3129 null_order: NullOrder::First,
3130 })
3131 .with_sort_field(SortField {
3132 source_id: 3,
3133 transform: Transform::Bucket(4),
3134 direction: SortDirection::Descending,
3135 null_order: NullOrder::Last,
3136 })
3137 .build_unbound()
3138 .unwrap();
3139
3140 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
3141 let expected = TableMetadata {
3142 format_version: FormatVersion::V2,
3143 table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
3144 location: "s3://bucket/test/location".to_string(),
3145 last_updated_ms: 1602638573590,
3146 last_column_id: 3,
3147 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
3148 current_schema_id: 0,
3149 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3150 default_partition_type,
3151 default_spec: Arc::new(partition_spec),
3152 last_partition_id: 1000,
3153 default_sort_order_id: 3,
3154 sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
3155 snapshots: HashMap::default(),
3156 current_snapshot_id: None,
3157 last_sequence_number: 34,
3158 properties: HashMap::new(),
3159 snapshot_log: vec![],
3160 metadata_log: Vec::new(),
3161 refs: HashMap::new(),
3162 statistics: HashMap::new(),
3163 partition_statistics: HashMap::new(),
3164 encryption_keys: HashMap::new(),
3165 next_row_id: INITIAL_ROW_ID,
3166 };
3167
3168 check_table_metadata_serde(&metadata, expected);
3169 }
3170
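    // A valid V1 metadata file deserializes into the expected in-memory representation.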
3171 #[test]
3172 fn test_table_metadata_v1_file_valid() {
3173 let metadata =
3174 fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();
3175
3176 let schema = Schema::builder()
3177 .with_schema_id(0)
3178 .with_fields(vec![
3179 Arc::new(NestedField::required(
3180 1,
3181 "x",
3182 Type::Primitive(PrimitiveType::Long),
3183 )),
3184 Arc::new(
3185 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
3186 .with_doc("comment"),
3187 ),
3188 Arc::new(NestedField::required(
3189 3,
3190 "z",
3191 Type::Primitive(PrimitiveType::Long),
3192 )),
3193 ])
3194 .build()
3195 .unwrap();
3196
3197 let partition_spec = PartitionSpec::builder(schema.clone())
3198 .with_spec_id(0)
3199 .add_unbound_field(UnboundPartitionField {
3200 name: "x".to_string(),
3201 transform: Transform::Identity,
3202 source_id: 1,
3203 field_id: Some(1000),
3204 })
3205 .unwrap()
3206 .build()
3207 .unwrap();
3208
3209 let default_partition_type = partition_spec.partition_type(&schema).unwrap();
3210 let expected = TableMetadata {
3211 format_version: FormatVersion::V1,
3212 table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
3213 location: "s3://bucket/test/location".to_string(),
3214 last_updated_ms: 1602638573874,
3215 last_column_id: 3,
3216 schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
3217 current_schema_id: 0,
3218 partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3219 default_spec: Arc::new(partition_spec),
3220 default_partition_type,
3221 last_partition_id: 0,
3222 default_sort_order_id: 0,
3223 sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
3225 snapshots: HashMap::new(),
3226 current_snapshot_id: None,
3227 last_sequence_number: 0,
3228 properties: HashMap::new(),
3229 snapshot_log: vec![],
3230 metadata_log: Vec::new(),
3231 refs: HashMap::new(),
3232 statistics: HashMap::new(),
3233 partition_statistics: HashMap::new(),
3234 encryption_keys: HashMap::new(),
3235 next_row_id: INITIAL_ROW_ID,
3236 };
3237
3238 check_table_metadata_serde(&metadata, expected);
3239 }
3240
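    // Older V1 files remain readable; spot-check uuid, location, timestamp, and schema id.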
3241 #[test]
3242 fn test_table_metadata_v1_compat() {
3243 let metadata =
3244 fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
3245
3246 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3248 .expect("Failed to deserialize TableMetadataV1Compat.json");
3249
3250 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3252 assert_eq!(
3253 desered_type.uuid(),
3254 Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
3255 );
3256 assert_eq!(
3257 desered_type.location(),
3258 "s3://bucket/warehouse/iceberg/glue.db/table_name"
3259 );
3260 assert_eq!(desered_type.last_updated_ms(), 1727773114005);
3261 assert_eq!(desered_type.current_schema_id(), 0);
3262 }
3263
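    // V1 files may omit current-schema-id; a current schema is still selected from the schema list.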
3264 #[test]
3265 fn test_table_metadata_v1_schemas_without_current_id() {
3266 let metadata = fs::read_to_string(
3267 "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
3268 )
3269 .unwrap();
3270
3271 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3273 .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");
3274
3275 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3277 assert_eq!(
3278 desered_type.uuid(),
3279 Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3280 );
3281
3282 let schema = desered_type.current_schema();
3284 assert_eq!(schema.as_struct().fields().len(), 3);
3285 assert_eq!(schema.as_struct().fields()[0].name, "x");
3286 assert_eq!(schema.as_struct().fields()[1].name, "y");
3287 assert_eq!(schema.as_struct().fields()[2].name, "z");
3288 }
3289
3290 #[test]
3291 fn test_table_metadata_v1_no_valid_schema() {
3292 let metadata =
3293 fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
3294 .unwrap();
3295
3296 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3298
3299 assert!(desered.is_err());
3300 let error_message = desered.unwrap_err().to_string();
3301 assert!(
3302 error_message.contains("No valid schema configuration found"),
3303 "Expected error about no valid schema configuration, got: {error_message}"
3304 );
3305 }
3306
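    // V1 files may omit default-spec-id; the default spec is still resolved from the listed partition specs.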
3307 #[test]
3308 fn test_table_metadata_v1_partition_specs_without_default_id() {
3309 let metadata = fs::read_to_string(
3310 "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
3311 )
3312 .unwrap();
3313
3314 let desered_type: TableMetadata = serde_json::from_str(&metadata)
3316 .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");
3317
3318 assert_eq!(desered_type.format_version(), FormatVersion::V1);
3320 assert_eq!(
3321 desered_type.uuid(),
3322 Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3323 );
3324
        assert_eq!(desered_type.default_partition_spec_id(), 2);
        assert_eq!(desered_type.partition_specs.len(), 2);
3328
3329 let default_spec = &desered_type.default_spec;
3331 assert_eq!(default_spec.spec_id(), 2);
3332 assert_eq!(default_spec.fields().len(), 1);
3333 assert_eq!(default_spec.fields()[0].name, "y");
3334 assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
3335 assert_eq!(default_spec.fields()[0].source_id, 2);
3336 }
3337
3338 #[test]
3339 fn test_table_metadata_v2_schema_not_found() {
3340 let metadata =
3341 fs::read_to_string("testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json")
3342 .unwrap();
3343
3344 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3345
3346 assert_eq!(
3347 desered.unwrap_err().to_string(),
3348 "DataInvalid => No schema exists with the current schema id 2."
3349 )
3350 }
3351
3352 #[test]
3353 fn test_table_metadata_v2_missing_sort_order() {
3354 let metadata =
3355 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
3356 .unwrap();
3357
3358 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3359
3360 assert_eq!(
3361 desered.unwrap_err().to_string(),
3362 "data did not match any variant of untagged enum TableMetadataEnum"
3363 )
3364 }
3365
3366 #[test]
3367 fn test_table_metadata_v2_missing_partition_specs() {
3368 let metadata =
3369 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json")
3370 .unwrap();
3371
3372 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3373
3374 assert_eq!(
3375 desered.unwrap_err().to_string(),
3376 "data did not match any variant of untagged enum TableMetadataEnum"
3377 )
3378 }
3379
3380 #[test]
3381 fn test_table_metadata_v2_missing_last_partition_id() {
3382 let metadata = fs::read_to_string(
3383 "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json",
3384 )
3385 .unwrap();
3386
3387 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3388
3389 assert_eq!(
3390 desered.unwrap_err().to_string(),
3391 "data did not match any variant of untagged enum TableMetadataEnum"
3392 )
3393 }
3394
3395 #[test]
3396 fn test_table_metadata_v2_missing_schemas() {
3397 let metadata =
3398 fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSchemas.json")
3399 .unwrap();
3400
3401 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3402
3403 assert_eq!(
3404 desered.unwrap_err().to_string(),
3405 "data did not match any variant of untagged enum TableMetadataEnum"
3406 )
3407 }
3408
3409 #[test]
3410 fn test_table_metadata_v2_unsupported_version() {
3411 let metadata =
3412 fs::read_to_string("testdata/table_metadata/TableMetadataUnsupportedVersion.json")
3413 .unwrap();
3414
3415 let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3416
3417 assert_eq!(
3418 desered.unwrap_err().to_string(),
3419 "data did not match any variant of untagged enum TableMetadataEnum"
3420 )
3421 }
3422
3423 #[test]
3424 fn test_order_of_format_version() {
3425 assert!(FormatVersion::V1 < FormatVersion::V2);
3426 assert_eq!(FormatVersion::V1, FormatVersion::V1);
3427 assert_eq!(FormatVersion::V2, FormatVersion::V2);
3428 }
3429
3430 #[test]
3431 fn test_default_partition_spec() {
3432 let default_spec_id = 1234;
3433 let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3434 let partition_spec = PartitionSpec::unpartition_spec();
3435 table_meta_data.default_spec = partition_spec.clone().into();
3436 table_meta_data
3437 .partition_specs
3438 .insert(default_spec_id, Arc::new(partition_spec));
3439
3440 assert_eq!(
3441 (*table_meta_data.default_partition_spec().clone()).clone(),
3442 (*table_meta_data
3443 .partition_spec_by_id(default_spec_id)
3444 .unwrap()
3445 .clone())
3446 .clone()
3447 );
3448 }

    #[test]
3450 fn test_default_sort_order() {
3451 let default_sort_order_id = 1234;
3452 let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3453 table_meta_data.default_sort_order_id = default_sort_order_id;
3454 table_meta_data
3455 .sort_orders
3456 .insert(default_sort_order_id, Arc::new(SortOrder::default()));
3457
3458 assert_eq!(
3459 table_meta_data.default_sort_order(),
3460 table_meta_data
3461 .sort_orders
3462 .get(&default_sort_order_id)
3463 .unwrap()
3464 )
3465 }
3466
3467 #[test]
3468 fn test_table_metadata_builder_from_table_creation() {
3469 let table_creation = TableCreation::builder()
3470 .location("s3://db/table".to_string())
3471 .name("table".to_string())
3472 .properties(HashMap::new())
3473 .schema(Schema::builder().build().unwrap())
3474 .build();
3475 let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
3476 .unwrap()
3477 .build()
3478 .unwrap()
3479 .metadata;
3480 assert_eq!(table_metadata.location, "s3://db/table");
3481 assert_eq!(table_metadata.schemas.len(), 1);
3482 assert_eq!(
3483 table_metadata
3484 .schemas
3485 .get(&0)
3486 .unwrap()
3487 .as_struct()
3488 .fields()
3489 .len(),
3490 0
3491 );
3492 assert_eq!(table_metadata.properties.len(), 0);
3493 assert_eq!(
3494 table_metadata.partition_specs,
3495 HashMap::from([(
3496 0,
3497 Arc::new(
3498 PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
3499 .with_spec_id(0)
3500 .build()
3501 .unwrap()
3502 )
3503 )])
3504 );
3505 assert_eq!(
3506 table_metadata.sort_orders,
3507 HashMap::from([(
3508 0,
3509 Arc::new(SortOrder {
3510 order_id: 0,
3511 fields: vec![]
3512 })
3513 )])
3514 );
3515 }
3516
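    // Round-trip: write_to persists metadata through FileIO and read_from restores an equal value.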
3517 #[tokio::test]
3518 async fn test_table_metadata_read_write() {
3519 let temp_dir = TempDir::new().unwrap();
3521 let temp_path = temp_dir.path().to_str().unwrap();
3522
3523 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3525
3526 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3528
3529 let metadata_location = format!("{temp_path}/metadata.json");
3531
3532 original_metadata
3534 .write_to(&file_io, &metadata_location)
3535 .await
3536 .unwrap();
3537
3538 assert!(fs::metadata(&metadata_location).is_ok());
3540
3541 let read_metadata = TableMetadata::read_from(&file_io, &metadata_location)
3543 .await
3544 .unwrap();
3545
3546 assert_eq!(read_metadata, original_metadata);
3548 }
3549
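    // read_from also handles gzip-compressed metadata files.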
3550 #[tokio::test]
3551 async fn test_table_metadata_read_compressed() {
3552 let temp_dir = TempDir::new().unwrap();
3553 let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3554
3555 let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3556 let json = serde_json::to_string(&original_metadata).unwrap();
3557
3558 let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
3559 encoder.write_all(json.as_bytes()).unwrap();
3560 std::fs::write(&metadata_location, encoder.finish().unwrap())
3561 .expect("failed to write metadata");
3562
3563 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3565 let metadata_location = metadata_location.to_str().unwrap();
3566 let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3567 .await
3568 .unwrap();
3569
3570 assert_eq!(read_metadata, original_metadata);
3572 }
3573
3574 #[tokio::test]
3575 async fn test_table_metadata_read_nonexistent_file() {
3576 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3578
3579 let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3581
3582 assert!(result.is_err());
3584 }
3585
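    // partition_name_exists matches partition field names across all specs, not source column names.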
3586 #[test]
3587 fn test_partition_name_exists() {
3588 let schema = Schema::builder()
3589 .with_fields(vec![
3590 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3591 NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
3592 .into(),
3593 ])
3594 .build()
3595 .unwrap();
3596
3597 let spec1 = PartitionSpec::builder(schema.clone())
3598 .with_spec_id(1)
3599 .add_partition_field("data", "data_partition", Transform::Identity)
3600 .unwrap()
3601 .build()
3602 .unwrap();
3603
3604 let spec2 = PartitionSpec::builder(schema.clone())
3605 .with_spec_id(2)
3606 .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
3607 .unwrap()
3608 .build()
3609 .unwrap();
3610
3611 let metadata = TableMetadataBuilder::new(
3613 schema,
3614 spec1.clone().into_unbound(),
3615 SortOrder::unsorted_order(),
3616 "s3://test/location".to_string(),
3617 FormatVersion::V2,
3618 HashMap::new(),
3619 )
3620 .unwrap()
3621 .add_partition_spec(spec2.into_unbound())
3622 .unwrap()
3623 .build()
3624 .unwrap()
3625 .metadata;
3626
3627 assert!(metadata.partition_name_exists("data_partition"));
3628 assert!(metadata.partition_name_exists("partition_bucket"));
3629
3630 assert!(!metadata.partition_name_exists("nonexistent_field"));
3631 assert!(!metadata.partition_name_exists("data")); assert!(!metadata.partition_name_exists(""));
3633 }
3634
3635 #[test]
3636 fn test_partition_name_exists_empty_specs() {
3637 let schema = Schema::builder()
3639 .with_fields(vec![
3640 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3641 ])
3642 .build()
3643 .unwrap();
3644
3645 let metadata = TableMetadataBuilder::new(
3646 schema,
3647 PartitionSpec::unpartition_spec().into_unbound(),
3648 SortOrder::unsorted_order(),
3649 "s3://test/location".to_string(),
3650 FormatVersion::V2,
3651 HashMap::new(),
3652 )
3653 .unwrap()
3654 .build()
3655 .unwrap()
3656 .metadata;
3657
3658 assert!(!metadata.partition_name_exists("any_field"));
3659 assert!(!metadata.partition_name_exists("data"));
3660 }
3661
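    // name_exists_in_any_schema searches every schema version, not just the current one.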
3662 #[test]
3663 fn test_name_exists_in_any_schema() {
3664 let schema1 = Schema::builder()
3666 .with_schema_id(1)
3667 .with_fields(vec![
3668 NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3669 NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
3670 ])
3671 .build()
3672 .unwrap();
3673
3674 let schema2 = Schema::builder()
3675 .with_schema_id(2)
3676 .with_fields(vec![
3677 NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3678 NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
3679 ])
3680 .build()
3681 .unwrap();
3682
3683 let metadata = TableMetadataBuilder::new(
3684 schema1,
3685 PartitionSpec::unpartition_spec().into_unbound(),
3686 SortOrder::unsorted_order(),
3687 "s3://test/location".to_string(),
3688 FormatVersion::V2,
3689 HashMap::new(),
3690 )
3691 .unwrap()
3692 .add_current_schema(schema2)
3693 .unwrap()
3694 .build()
3695 .unwrap()
3696 .metadata;
3697
3698 assert!(metadata.name_exists_in_any_schema("field1")); assert!(metadata.name_exists_in_any_schema("field2")); assert!(metadata.name_exists_in_any_schema("field3")); assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
3703 assert!(!metadata.name_exists_in_any_schema("field4"));
3704 assert!(!metadata.name_exists_in_any_schema(""));
3705 }
3706
3707 #[test]
3708 fn test_name_exists_in_any_schema_empty_schemas() {
3709 let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3710
3711 let metadata = TableMetadataBuilder::new(
3712 schema,
3713 PartitionSpec::unpartition_spec().into_unbound(),
3714 SortOrder::unsorted_order(),
3715 "s3://test/location".to_string(),
3716 FormatVersion::V2,
3717 HashMap::new(),
3718 )
3719 .unwrap()
3720 .build()
3721 .unwrap()
3722 .metadata;
3723
3724 assert!(!metadata.name_exists_in_any_schema("any_field"));
3725 }
3726
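    // Fields that only exist in superseded schema versions remain visible to name_exists_in_any_schema.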
3727 #[test]
3728 fn test_helper_methods_multi_version_scenario() {
3729 let initial_schema = Schema::builder()
3731 .with_fields(vec![
3732 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3733 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3734 NestedField::required(
3735 3,
3736 "deprecated_field",
3737 Type::Primitive(PrimitiveType::String),
3738 )
3739 .into(),
3740 ])
3741 .build()
3742 .unwrap();
3743
3744 let metadata = TableMetadataBuilder::new(
3745 initial_schema,
3746 PartitionSpec::unpartition_spec().into_unbound(),
3747 SortOrder::unsorted_order(),
3748 "s3://test/location".to_string(),
3749 FormatVersion::V2,
3750 HashMap::new(),
3751 )
3752 .unwrap();
3753
3754 let evolved_schema = Schema::builder()
3755 .with_fields(vec![
3756 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3757 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3758 NestedField::required(
3759 3,
3760 "deprecated_field",
3761 Type::Primitive(PrimitiveType::String),
3762 )
3763 .into(),
3764 NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3765 .into(),
3766 ])
3767 .build()
3768 .unwrap();
3769
3770 let _final_schema = Schema::builder()
3772 .with_fields(vec![
3773 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3774 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3775 NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3776 .into(),
3777 NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
3778 .into(),
3779 ])
3780 .build()
3781 .unwrap();
3782
3783 let final_metadata = metadata
3784 .add_current_schema(evolved_schema)
3785 .unwrap()
3786 .build()
3787 .unwrap()
3788 .metadata;
3789
        assert!(!final_metadata.partition_name_exists("nonexistent_partition"));
        assert!(final_metadata.name_exists_in_any_schema("id"));
        assert!(final_metadata.name_exists_in_any_schema("name"));
        assert!(final_metadata.name_exists_in_any_schema("deprecated_field"));
        assert!(final_metadata.name_exists_in_any_schema("new_field"));
        assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
3797 }
3798}