iceberg/spec/
table_metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
//! The main struct here is [`TableMetadata`], which defines the data for a table.
20
21use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::sync::Arc;
26
27use _serde::TableMetadataEnum;
28use chrono::{DateTime, Utc};
29use serde::{Deserialize, Serialize};
30use serde_repr::{Deserialize_repr, Serialize_repr};
31use uuid::Uuid;
32
33use super::snapshot::SnapshotReference;
34pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
35use super::{
36    DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
37    SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
38};
39use crate::error::{Result, timestamp_ms_to_utc};
40use crate::io::FileIO;
41use crate::{Error, ErrorKind};
42
/// Name of the main branch reference. When a current snapshot is set and `refs` has no entry
/// under this name, `construct_refs` inserts one pointing at the current snapshot.
static MAIN_BRANCH: &str = "main";
/// One minute in milliseconds. Used as the clock-skew tolerance when validating that the
/// snapshot log and metadata log are chronological.
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;

/// Sentinel snapshot id meaning "no current snapshot"; `normalize_current_snapshot` turns a
/// current snapshot id equal to this value into `None`.
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
/// Sequence number returned by `next_sequence_number` for format version 1 tables.
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
48
/// Reserved table property for table format version.
///
/// Iceberg will default a new table's format version to the latest stable and recommended
/// version. This reserved property keyword allows users to override the Iceberg format version of
/// the table metadata.
///
/// If this table property exists when creating a table, the table will use the specified format
/// version. If a table updates this property, it will try to upgrade to the specified format
/// version.
pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
/// Reserved table property for the table UUID.
pub const PROPERTY_UUID: &str = "uuid";
/// Reserved table property for the total number of snapshots.
pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
/// Reserved table property for the current snapshot summary.
pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
/// Reserved table property for the current snapshot id.
pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
/// Reserved table property for the current snapshot timestamp (in milliseconds, per the key).
pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms";
/// Reserved table property for the JSON representation of the current schema.
pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
/// Reserved table property for the JSON representation of the current (default) partition spec.
pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
/// Reserved table property for the JSON representation of the current (default) sort order.
pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";
75
/// Table property key for the maximum number of previous metadata versions to keep.
pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previous-versions-max";
/// Default value for [`PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX`].
pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;

/// Table property key for the maximum number of partitions to keep summary stats for.
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
/// Default value for [`PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT`].
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
85
/// List of reserved Iceberg table properties.
///
/// Reserved table properties are only used to control behavior when creating or updating a
/// table. The values of these properties are not persisted as part of the table metadata.
pub const RESERVED_PROPERTIES: [&str; 9] = [
    PROPERTY_FORMAT_VERSION,
    PROPERTY_UUID,
    PROPERTY_SNAPSHOT_COUNT,
    PROPERTY_CURRENT_SNAPSHOT_ID,
    PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
    PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
    PROPERTY_CURRENT_SCHEMA,
    PROPERTY_DEFAULT_PARTITION_SPEC,
    PROPERTY_DEFAULT_SORT_ORDER,
];
101
/// Table property key for the number of commit retries.
pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
/// Default value for [`PROPERTY_COMMIT_NUM_RETRIES`].
pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;

/// Table property key for the minimum wait time (ms) between commit retries.
pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
/// Default value (ms) for [`PROPERTY_COMMIT_MIN_RETRY_WAIT_MS`].
pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;

/// Table property key for the maximum wait time (ms) between commit retries.
pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
/// Default value (ms) for [`PROPERTY_COMMIT_MAX_RETRY_WAIT_MS`].
pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute

/// Table property key for the total maximum retry time (ms).
pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
/// Default value (ms) for [`PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS`].
pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes
121
/// Table property key for the default file format of data files.
pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
/// Table property key for the default file format of delete files.
pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
/// Default value for [`PROPERTY_DEFAULT_FILE_FORMAT`].
pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";

/// Table property key for the target size (in bytes) of newly written files.
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
/// Default value (in bytes) for [`PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES`].
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB
133
/// Shared, reference-counted ([`Arc`]) handle to a [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;
136
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
/// In-memory representation of Iceberg table metadata.
///
/// Deserialization goes through `TableMetadataEnum` (see the `try_from` attribute), which has
/// one variant for the V1 and one for the V2 serialized layout.
///
/// We assume that this data structure is always valid, so accessors panic when an internal
/// invariant is violated. We check the validity of this data structure when constructing.
pub struct TableMetadata {
    /// Integer version of the format.
    pub(crate) format_version: FormatVersion,
    /// A UUID that identifies the table.
    pub(crate) table_uuid: Uuid,
    /// The table's base location.
    pub(crate) location: String,
    /// The table's highest sequence number.
    pub(crate) last_sequence_number: i64,
    /// Timestamp in milliseconds from the unix epoch when the table was last updated.
    pub(crate) last_updated_ms: i64,
    /// An integer; the highest assigned column ID for the table.
    pub(crate) last_column_id: i32,
    /// The table's schemas, keyed by schema id.
    pub(crate) schemas: HashMap<i32, SchemaRef>,
    /// ID of the table's current schema.
    pub(crate) current_schema_id: i32,
    /// The table's partition specs (full partition spec objects), keyed by spec id.
    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
    /// The "current" partition spec that writers should use by default.
    pub(crate) default_spec: PartitionSpecRef,
    /// Partition type of the default partition spec.
    pub(crate) default_partition_type: StructType,
    /// An integer; the highest assigned partition field ID across all partition specs for the table.
    pub(crate) last_partition_id: i32,
    /// A string to string map of table properties. This is used to control settings that
    /// affect reading and writing and is not intended to be used for arbitrary metadata.
    /// For example, commit.retry.num-retries is used to control the number of commit retries.
    pub(crate) properties: HashMap<String, String>,
    /// long ID of the current table snapshot; must be the same as the current
    /// ID of the main branch in refs.
    pub(crate) current_snapshot_id: Option<i64>,
    /// The valid snapshots, keyed by snapshot id. Valid snapshots are snapshots for which all
    /// data files exist in the file system. A data file must not be deleted
    /// from the file system until the last snapshot in which it was listed is
    /// garbage collected.
    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
    /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
    /// to the current snapshot for the table. Each time the current-snapshot-id
    /// is changed, a new entry should be added with the last-updated-ms
    /// and the new current-snapshot-id. When snapshots are expired from
    /// the list of valid snapshots, all entries before a snapshot that has
    /// expired should be removed.
    pub(crate) snapshot_log: Vec<SnapshotLog>,

    /// A list (optional) of timestamp and metadata file location pairs
    /// that encodes changes to the previous metadata files for the table.
    /// Each time a new metadata file is created, a new entry of the
    /// previous metadata file location should be added to the list.
    /// Tables can be configured to remove the oldest metadata log entries and
    /// keep a fixed-size log of the most recent entries after a commit.
    pub(crate) metadata_log: Vec<MetadataLog>,

    /// The table's sort orders (full sort order objects), keyed by sort order id.
    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
    /// Default sort order id of the table. Note that this could be used by
    /// writers, but is not used when reading because reads use the specs
    /// stored in manifest files.
    pub(crate) default_sort_order_id: i64,
    /// A map of snapshot references. The map keys are the unique snapshot reference
    /// names in the table, and the map values are snapshot reference objects.
    /// There is always a main branch reference pointing to the current-snapshot-id
    /// even if the refs map is null.
    pub(crate) refs: HashMap<String, SnapshotReference>,
    /// Mapping of snapshot ids to statistics files.
    pub(crate) statistics: HashMap<i64, StatisticsFile>,
    /// Mapping of snapshot ids to partition statistics files.
    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
    /// Encryption keys, keyed by key id.
    pub(crate) encryption_keys: HashMap<String, String>,
}
214
215impl TableMetadata {
    /// Convert this table metadata into a [`TableMetadataBuilder`] for modification.
    ///
    /// `current_file_location` is the location where the current version
    /// of the metadata file is stored. This is used to update the metadata log.
    /// If `current_file_location` is `None`, the metadata log will not be updated;
    /// passing `None` should only be used to stage-create tables.
    #[must_use]
    pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
        TableMetadataBuilder::new_from_metadata(self, current_file_location)
    }
226
227    /// Check if a partition field name exists in any partition spec.
228    #[inline]
229    pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
230        self.partition_specs
231            .values()
232            .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
233    }
234
235    /// Check if a field name exists in any schema.
236    #[inline]
237    pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
238        self.schemas
239            .values()
240            .any(|schema| schema.field_by_name(name).is_some())
241    }
242
    /// Returns the format version of this table metadata.
    #[inline]
    pub fn format_version(&self) -> FormatVersion {
        self.format_version
    }
248
    /// Returns the UUID that identifies this table.
    #[inline]
    pub fn uuid(&self) -> Uuid {
        self.table_uuid
    }
254
255    /// Returns table location.
256    #[inline]
257    pub fn location(&self) -> &str {
258        self.location.as_str()
259    }
260
    /// Returns the last (highest) sequence number recorded in this metadata.
    #[inline]
    pub fn last_sequence_number(&self) -> i64 {
        self.last_sequence_number
    }
266
267    /// Returns the next sequence number for the table.
268    ///
269    /// For format version 1, it always returns the initial sequence number.
270    /// For other versions, it returns the last sequence number incremented by 1.
271    #[inline]
272    pub fn next_sequence_number(&self) -> i64 {
273        match self.format_version {
274            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
275            _ => self.last_sequence_number + 1,
276        }
277    }
278
    /// Returns the highest assigned column id for the table.
    #[inline]
    pub fn last_column_id(&self) -> i32 {
        self.last_column_id
    }
284
    /// Returns the highest assigned partition field id across all partition specs.
    #[inline]
    pub fn last_partition_id(&self) -> i32 {
        self.last_partition_id
    }
290
    /// Returns the last-updated time as a UTC [`DateTime`].
    ///
    /// # Errors
    ///
    /// Returns whatever error `timestamp_ms_to_utc` produces when it cannot convert the stored
    /// millisecond value.
    #[inline]
    pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.last_updated_ms)
    }
296
    /// Returns the last-updated time as the raw stored value, in milliseconds from the unix epoch.
    #[inline]
    pub fn last_updated_ms(&self) -> i64 {
        self.last_updated_ms
    }
302
    /// Returns an iterator over all schemas of the table.
    ///
    /// The schemas are stored in a `HashMap`, so the iteration order is unspecified.
    #[inline]
    pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
        self.schemas.values()
    }
308
    /// Looks up a schema by id; returns `None` if no schema with that id exists.
    #[inline]
    pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
        self.schemas.get(&schema_id)
    }
314
315    /// Get current schema
316    #[inline]
317    pub fn current_schema(&self) -> &SchemaRef {
318        self.schema_by_id(self.current_schema_id)
319            .expect("Current schema id set, but not found in table metadata")
320    }
321
    /// Returns the id of the table's current schema.
    #[inline]
    pub fn current_schema_id(&self) -> SchemaId {
        self.current_schema_id
    }
327
    /// Returns an iterator over all partition specs of the table.
    ///
    /// The specs are stored in a `HashMap`, so the iteration order is unspecified.
    #[inline]
    pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
        self.partition_specs.values()
    }
333
    /// Looks up a partition spec by id; returns `None` if no spec with that id exists.
    #[inline]
    pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
        self.partition_specs.get(&spec_id)
    }
339
    /// Returns the default partition spec of the table.
    #[inline]
    pub fn default_partition_spec(&self) -> &PartitionSpecRef {
        &self.default_spec
    }
345
    /// Returns the partition type of the default partition spec.
    #[inline]
    pub fn default_partition_type(&self) -> &StructType {
        &self.default_partition_type
    }
351
    /// Returns the spec id of the default ("current") partition spec.
    #[inline]
    pub fn default_partition_spec_id(&self) -> i32 {
        self.default_spec.spec_id()
    }
357
    /// Returns an iterator over all snapshots of the table.
    ///
    /// The snapshots are stored in a `HashMap`, so the iteration order is unspecified.
    #[inline]
    pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
        self.snapshots.values()
    }
363
    /// Looks up a snapshot by id; returns `None` if no snapshot with that id exists.
    #[inline]
    pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
        self.snapshots.get(&snapshot_id)
    }
369
    /// Returns the snapshot history, i.e. the entries of the snapshot log.
    #[inline]
    pub fn history(&self) -> &[SnapshotLog] {
        &self.snapshot_log
    }
375
    /// Returns the entries of the metadata log.
    #[inline]
    pub fn metadata_log(&self) -> &[MetadataLog] {
        &self.metadata_log
    }
381
382    /// Get current snapshot
383    #[inline]
384    pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
385        self.current_snapshot_id.map(|s| {
386            self.snapshot_by_id(s)
387                .expect("Current snapshot id has been set, but doesn't exist in metadata")
388        })
389    }
390
    /// Returns the id of the current snapshot, or `None` when the table has no current snapshot.
    #[inline]
    pub fn current_snapshot_id(&self) -> Option<i64> {
        self.current_snapshot_id
    }
396
397    /// Get the snapshot for a reference
398    /// Returns an option if the `ref_name` is not found
399    #[inline]
400    pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
401        self.refs.get(ref_name).map(|r| {
402            self.snapshot_by_id(r.snapshot_id)
403                .unwrap_or_else(|| panic!("Snapshot id of ref {} doesn't exist", ref_name))
404        })
405    }
406
    /// Returns an iterator over all sort orders of the table.
    ///
    /// The sort orders are stored in a `HashMap`, so the iteration order is unspecified.
    #[inline]
    pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
        self.sort_orders.values()
    }
412
    /// Looks up a sort order by id; returns `None` if no sort order with that id exists.
    #[inline]
    pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
        self.sort_orders.get(&sort_order_id)
    }
418
419    /// Returns default sort order id.
420    #[inline]
421    pub fn default_sort_order(&self) -> &SortOrderRef {
422        self.sort_orders
423            .get(&self.default_sort_order_id)
424            .expect("Default order id has been set, but not found in table metadata!")
425    }
426
    /// Returns the id of the default sort order.
    #[inline]
    pub fn default_sort_order_id(&self) -> i64 {
        self.default_sort_order_id
    }
432
    /// Returns the table properties as a string-to-string map.
    #[inline]
    pub fn properties(&self) -> &HashMap<String, String> {
        &self.properties
    }
438
    /// Returns an iterator over all statistics files of the table.
    ///
    /// The files are stored in a `HashMap`, so the iteration order is unspecified.
    #[inline]
    pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
        self.statistics.values()
    }
444
    /// Returns an iterator over all partition statistics files of the table.
    ///
    /// The files are stored in a `HashMap`, so the iteration order is unspecified.
    #[inline]
    pub fn partition_statistics_iter(
        &self,
    ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
        self.partition_statistics.values()
    }
452
    /// Returns the statistics file stored for `snapshot_id`, or `None` if there is none.
    #[inline]
    pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
        self.statistics.get(&snapshot_id)
    }
458
    /// Returns the partition statistics file stored for `snapshot_id`, or `None` if there is none.
    #[inline]
    pub fn partition_statistics_for_snapshot(
        &self,
        snapshot_id: i64,
    ) -> Option<&PartitionStatisticsFile> {
        self.partition_statistics.get(&snapshot_id)
    }
467
468    fn construct_refs(&mut self) {
469        if let Some(current_snapshot_id) = self.current_snapshot_id {
470            if !self.refs.contains_key(MAIN_BRANCH) {
471                self.refs
472                    .insert(MAIN_BRANCH.to_string(), SnapshotReference {
473                        snapshot_id: current_snapshot_id,
474                        retention: SnapshotRetention::Branch {
475                            min_snapshots_to_keep: None,
476                            max_snapshot_age_ms: None,
477                            max_ref_age_ms: None,
478                        },
479                    });
480            }
481        }
482    }
483
    /// Returns an iterator over all encryption keys as `(key id, key)` pairs.
    ///
    /// The keys are stored in a `HashMap`, so the iteration order is unspecified.
    #[inline]
    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = (&String, &String)> {
        self.encryption_keys.iter()
    }
489
    /// Returns the encryption key stored under `key_id`, or `None` if there is none.
    #[inline]
    pub fn encryption_key(&self, key_id: &str) -> Option<&String> {
        self.encryption_keys.get(key_id)
    }
495
496    /// Read table metadata from the given location.
497    pub async fn read_from(
498        file_io: &FileIO,
499        metadata_location: impl AsRef<str>,
500    ) -> Result<TableMetadata> {
501        let input_file = file_io.new_input(metadata_location)?;
502        let metadata_content = input_file.read().await?;
503        let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
504        Ok(metadata)
505    }
506
507    /// Write table metadata to the given location.
508    pub async fn write_to(
509        &self,
510        file_io: &FileIO,
511        metadata_location: impl AsRef<str>,
512    ) -> Result<()> {
513        file_io
514            .new_output(metadata_location)?
515            .write(serde_json::to_vec(self)?.into())
516            .await
517    }
518
    /// Normalize and validate this table metadata.
    ///
    /// This is an internal method
    /// meant to be called after constructing table metadata from untrusted sources.
    /// We run this method after json deserialization.
    /// All constructors for `TableMetadata` which are part of `iceberg-rust`
    /// should return normalized `TableMetadata`.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by any of the validation or normalization steps.
    pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
        self.validate_current_schema()?;
        // Maps the empty-snapshot sentinel to `None` before the `main` ref is constructed.
        self.normalize_current_snapshot()?;
        // The `main` ref is added (if missing) before the refs are validated.
        self.construct_refs();
        self.validate_refs()?;
        self.validate_chronological_snapshot_logs()?;
        self.validate_chronological_metadata_logs()?;
        // Normalize location (remove trailing slash)
        self.location = self.location.trim_end_matches('/').to_string();
        self.validate_snapshot_sequence_number()?;
        self.try_normalize_partition_spec()?;
        self.try_normalize_sort_order()?;
        Ok(self)
    }
540
541    /// If the default partition spec is not present in specs, add it
542    fn try_normalize_partition_spec(&mut self) -> Result<()> {
543        if self
544            .partition_spec_by_id(self.default_spec.spec_id())
545            .is_none()
546        {
547            self.partition_specs.insert(
548                self.default_spec.spec_id(),
549                Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
550            );
551        }
552
553        Ok(())
554    }
555
556    /// If the default sort order is unsorted but the sort order is not present, add it
557    fn try_normalize_sort_order(&mut self) -> Result<()> {
558        if self.sort_order_by_id(self.default_sort_order_id).is_some() {
559            return Ok(());
560        }
561
562        if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
563            return Err(Error::new(
564                ErrorKind::DataInvalid,
565                format!(
566                    "No sort order exists with the default sort order id {}.",
567                    self.default_sort_order_id
568                ),
569            ));
570        }
571
572        let sort_order = SortOrder::unsorted_order();
573        self.sort_orders
574            .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
575        Ok(())
576    }
577
578    /// Validate the current schema is set and exists.
579    fn validate_current_schema(&self) -> Result<()> {
580        if self.schema_by_id(self.current_schema_id).is_none() {
581            return Err(Error::new(
582                ErrorKind::DataInvalid,
583                format!(
584                    "No schema exists with the current schema id {}.",
585                    self.current_schema_id
586                ),
587            ));
588        }
589        Ok(())
590    }
591
592    /// If current snapshot is Some(-1) then set it to None.
593    fn normalize_current_snapshot(&mut self) -> Result<()> {
594        if let Some(current_snapshot_id) = self.current_snapshot_id {
595            if current_snapshot_id == EMPTY_SNAPSHOT_ID {
596                self.current_snapshot_id = None;
597            } else if self.snapshot_by_id(current_snapshot_id).is_none() {
598                return Err(Error::new(
599                    ErrorKind::DataInvalid,
600                    format!(
601                        "Snapshot for current snapshot id {} does not exist in the existing snapshots list",
602                        current_snapshot_id
603                    ),
604                ));
605            }
606        }
607        Ok(())
608    }
609
610    /// Validate that all refs are valid (snapshot exists)
611    fn validate_refs(&self) -> Result<()> {
612        for (name, snapshot_ref) in self.refs.iter() {
613            if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
614                return Err(Error::new(
615                    ErrorKind::DataInvalid,
616                    format!(
617                        "Snapshot for reference {name} does not exist in the existing snapshots list"
618                    ),
619                ));
620            }
621        }
622
623        let main_ref = self.refs.get(MAIN_BRANCH);
624        if self.current_snapshot_id.is_some() {
625            if let Some(main_ref) = main_ref {
626                if main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default() {
627                    return Err(Error::new(
628                        ErrorKind::DataInvalid,
629                        format!(
630                            "Current snapshot id does not match main branch ({:?} != {:?})",
631                            self.current_snapshot_id.unwrap_or_default(),
632                            main_ref.snapshot_id
633                        ),
634                    ));
635                }
636            }
637        } else if main_ref.is_some() {
638            return Err(Error::new(
639                ErrorKind::DataInvalid,
640                "Current snapshot is not set, but main branch exists",
641            ));
642        }
643
644        Ok(())
645    }
646
647    /// Validate that for V1 Metadata the last_sequence_number is 0
648    fn validate_snapshot_sequence_number(&self) -> Result<()> {
649        if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
650            return Err(Error::new(
651                ErrorKind::DataInvalid,
652                format!(
653                    "Last sequence number must be 0 in v1. Found {}",
654                    self.last_sequence_number
655                ),
656            ));
657        }
658
659        if self.format_version >= FormatVersion::V2 {
660            if let Some(snapshot) = self
661                .snapshots
662                .values()
663                .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
664            {
665                return Err(Error::new(
666                    ErrorKind::DataInvalid,
667                    format!(
668                        "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
669                        snapshot.snapshot_id(),
670                        snapshot.sequence_number(),
671                        self.last_sequence_number
672                    ),
673                ));
674            }
675        }
676
677        Ok(())
678    }
679
680    /// Validate snapshots logs are chronological and last updated is after the last snapshot log.
681    fn validate_chronological_snapshot_logs(&self) -> Result<()> {
682        for window in self.snapshot_log.windows(2) {
683            let (prev, curr) = (&window[0], &window[1]);
684            // commits can happen concurrently from different machines.
685            // A tolerance helps us avoid failure for small clock skew
686            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
687                return Err(Error::new(
688                    ErrorKind::DataInvalid,
689                    "Expected sorted snapshot log entries",
690                ));
691            }
692        }
693
694        if let Some(last) = self.snapshot_log.last() {
695            // commits can happen concurrently from different machines.
696            // A tolerance helps us avoid failure for small clock skew
697            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
698                return Err(Error::new(
699                    ErrorKind::DataInvalid,
700                    format!(
701                        "Invalid update timestamp {}: before last snapshot log entry at {}",
702                        self.last_updated_ms, last.timestamp_ms
703                    ),
704                ));
705            }
706        }
707        Ok(())
708    }
709
710    fn validate_chronological_metadata_logs(&self) -> Result<()> {
711        for window in self.metadata_log.windows(2) {
712            let (prev, curr) = (&window[0], &window[1]);
713            // commits can happen concurrently from different machines.
714            // A tolerance helps us avoid failure for small clock skew
715            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
716                return Err(Error::new(
717                    ErrorKind::DataInvalid,
718                    "Expected sorted metadata log entries",
719                ));
720            }
721        }
722
723        if let Some(last) = self.metadata_log.last() {
724            // commits can happen concurrently from different machines.
725            // A tolerance helps us avoid failure for small clock skew
726            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
727                return Err(Error::new(
728                    ErrorKind::DataInvalid,
729                    format!(
730                        "Invalid update timestamp {}: before last metadata log entry at {}",
731                        self.last_updated_ms, last.timestamp_ms
732                    ),
733                ));
734            }
735        }
736
737        Ok(())
738    }
739}
740
741pub(super) mod _serde {
742    use std::borrow::BorrowMut;
743    /// This is a helper module that defines types to help with serialization/deserialization.
744    /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct
745    /// and then converted into the [TableMetadata] struct. Serialization works the other way around.
746    /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization.
747    use std::collections::HashMap;
748    /// This is a helper module that defines types to help with serialization/deserialization.
749    /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct
750    /// and then converted into the [TableMetadata] struct. Serialization works the other way around.
751    /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization.
752    use std::sync::Arc;
753
754    use serde::{Deserialize, Serialize};
755    use uuid::Uuid;
756
757    use super::{
758        DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
759        TableMetadata,
760    };
761    use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
762    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2};
763    use crate::spec::{
764        PartitionField, PartitionSpec, PartitionSpecRef, PartitionStatisticsFile, Schema,
765        SchemaRef, Snapshot, SnapshotReference, SnapshotRetention, SortOrder, StatisticsFile,
766    };
767    use crate::{Error, ErrorKind};
768
769    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
770    #[serde(untagged)]
771    pub(super) enum TableMetadataEnum {
772        V2(TableMetadataV2),
773        V1(TableMetadataV1),
774    }
775
776    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
777    #[serde(rename_all = "kebab-case")]
778    /// Defines the structure of a v2 table metadata for serialization/deserialization
779    pub(super) struct TableMetadataV2 {
780        pub format_version: VersionNumber<2>,
781        pub table_uuid: Uuid,
782        pub location: String,
783        pub last_sequence_number: i64,
784        pub last_updated_ms: i64,
785        pub last_column_id: i32,
786        pub schemas: Vec<SchemaV2>,
787        pub current_schema_id: i32,
788        pub partition_specs: Vec<PartitionSpec>,
789        pub default_spec_id: i32,
790        pub last_partition_id: i32,
791        #[serde(skip_serializing_if = "Option::is_none")]
792        pub properties: Option<HashMap<String, String>>,
793        #[serde(skip_serializing_if = "Option::is_none")]
794        pub current_snapshot_id: Option<i64>,
795        #[serde(skip_serializing_if = "Option::is_none")]
796        pub snapshots: Option<Vec<SnapshotV2>>,
797        #[serde(skip_serializing_if = "Option::is_none")]
798        pub snapshot_log: Option<Vec<SnapshotLog>>,
799        #[serde(skip_serializing_if = "Option::is_none")]
800        pub metadata_log: Option<Vec<MetadataLog>>,
801        pub sort_orders: Vec<SortOrder>,
802        pub default_sort_order_id: i64,
803        #[serde(skip_serializing_if = "Option::is_none")]
804        pub refs: Option<HashMap<String, SnapshotReference>>,
805        #[serde(default, skip_serializing_if = "Vec::is_empty")]
806        pub statistics: Vec<StatisticsFile>,
807        #[serde(default, skip_serializing_if = "Vec::is_empty")]
808        pub partition_statistics: Vec<PartitionStatisticsFile>,
809    }
810
811    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
812    #[serde(rename_all = "kebab-case")]
813    /// Defines the structure of a v1 table metadata for serialization/deserialization
814    pub(super) struct TableMetadataV1 {
815        pub format_version: VersionNumber<1>,
816        #[serde(skip_serializing_if = "Option::is_none")]
817        pub table_uuid: Option<Uuid>,
818        pub location: String,
819        pub last_updated_ms: i64,
820        pub last_column_id: i32,
821        /// `schema` is optional to prioritize `schemas` and `current-schema-id`, allowing liberal reading of V1 metadata.
822        pub schema: Option<SchemaV1>,
823        #[serde(skip_serializing_if = "Option::is_none")]
824        pub schemas: Option<Vec<SchemaV1>>,
825        #[serde(skip_serializing_if = "Option::is_none")]
826        pub current_schema_id: Option<i32>,
827        /// `partition_spec` is optional to prioritize `partition_specs`, aligning with liberal reading of potentially invalid V1 metadata.
828        pub partition_spec: Option<Vec<PartitionField>>,
829        #[serde(skip_serializing_if = "Option::is_none")]
830        pub partition_specs: Option<Vec<PartitionSpec>>,
831        #[serde(skip_serializing_if = "Option::is_none")]
832        pub default_spec_id: Option<i32>,
833        #[serde(skip_serializing_if = "Option::is_none")]
834        pub last_partition_id: Option<i32>,
835        #[serde(skip_serializing_if = "Option::is_none")]
836        pub properties: Option<HashMap<String, String>>,
837        #[serde(skip_serializing_if = "Option::is_none")]
838        pub current_snapshot_id: Option<i64>,
839        #[serde(skip_serializing_if = "Option::is_none")]
840        pub snapshots: Option<Vec<SnapshotV1>>,
841        #[serde(skip_serializing_if = "Option::is_none")]
842        pub snapshot_log: Option<Vec<SnapshotLog>>,
843        #[serde(skip_serializing_if = "Option::is_none")]
844        pub metadata_log: Option<Vec<MetadataLog>>,
845        pub sort_orders: Option<Vec<SortOrder>>,
846        pub default_sort_order_id: Option<i64>,
847        #[serde(default, skip_serializing_if = "Vec::is_empty")]
848        pub statistics: Vec<StatisticsFile>,
849        #[serde(default, skip_serializing_if = "Vec::is_empty")]
850        pub partition_statistics: Vec<PartitionStatisticsFile>,
851    }
852
853    /// Helper to serialize and deserialize the format version.
854    #[derive(Debug, PartialEq, Eq)]
855    pub(crate) struct VersionNumber<const V: u8>;
856
857    impl Serialize for TableMetadata {
858        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
859        where S: serde::Serializer {
860            // we must do a clone here
861            let table_metadata_enum: TableMetadataEnum =
862                self.clone().try_into().map_err(serde::ser::Error::custom)?;
863
864            table_metadata_enum.serialize(serializer)
865        }
866    }
867
868    impl<const V: u8> Serialize for VersionNumber<V> {
869        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
870        where S: serde::Serializer {
871            serializer.serialize_u8(V)
872        }
873    }
874
875    impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
876        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
877        where D: serde::Deserializer<'de> {
878            let value = u8::deserialize(deserializer)?;
879            if value == V {
880                Ok(VersionNumber::<V>)
881            } else {
882                Err(serde::de::Error::custom("Invalid Version"))
883            }
884        }
885    }
886
887    impl TryFrom<TableMetadataEnum> for TableMetadata {
888        type Error = Error;
889        fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
890            match value {
891                TableMetadataEnum::V2(value) => value.try_into(),
892                TableMetadataEnum::V1(value) => value.try_into(),
893            }
894        }
895    }
896
897    impl TryFrom<TableMetadata> for TableMetadataEnum {
898        type Error = Error;
899        fn try_from(value: TableMetadata) -> Result<Self, Error> {
900            Ok(match value.format_version {
901                FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
902                FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
903            })
904        }
905    }
906
907    impl TryFrom<TableMetadataV2> for TableMetadata {
908        type Error = Error;
909        fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
910            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
911                None
912            } else {
913                value.current_snapshot_id
914            };
915            let schemas = HashMap::from_iter(
916                value
917                    .schemas
918                    .into_iter()
919                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
920                    .collect::<Result<Vec<_>, Error>>()?,
921            );
922
923            let current_schema: &SchemaRef =
924                schemas.get(&value.current_schema_id).ok_or_else(|| {
925                    Error::new(
926                        ErrorKind::DataInvalid,
927                        format!(
928                            "No schema exists with the current schema id {}.",
929                            value.current_schema_id
930                        ),
931                    )
932                })?;
933            let partition_specs = HashMap::from_iter(
934                value
935                    .partition_specs
936                    .into_iter()
937                    .map(|x| (x.spec_id(), Arc::new(x))),
938            );
939            let default_spec_id = value.default_spec_id;
940            let default_spec: PartitionSpecRef = partition_specs
941                .get(&value.default_spec_id)
942                .map(|spec| (**spec).clone())
943                .or_else(|| {
944                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
945                        .then(PartitionSpec::unpartition_spec)
946                })
947                .ok_or_else(|| {
948                    Error::new(
949                        ErrorKind::DataInvalid,
950                        format!("Default partition spec {default_spec_id} not found"),
951                    )
952                })?
953                .into();
954            let default_partition_type = default_spec.partition_type(current_schema)?;
955
956            let mut metadata = TableMetadata {
957                format_version: FormatVersion::V2,
958                table_uuid: value.table_uuid,
959                location: value.location,
960                last_sequence_number: value.last_sequence_number,
961                last_updated_ms: value.last_updated_ms,
962                last_column_id: value.last_column_id,
963                current_schema_id: value.current_schema_id,
964                schemas,
965                partition_specs,
966                default_partition_type,
967                default_spec,
968                last_partition_id: value.last_partition_id,
969                properties: value.properties.unwrap_or_default(),
970                current_snapshot_id,
971                snapshots: value
972                    .snapshots
973                    .map(|snapshots| {
974                        HashMap::from_iter(
975                            snapshots
976                                .into_iter()
977                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
978                        )
979                    })
980                    .unwrap_or_default(),
981                snapshot_log: value.snapshot_log.unwrap_or_default(),
982                metadata_log: value.metadata_log.unwrap_or_default(),
983                sort_orders: HashMap::from_iter(
984                    value
985                        .sort_orders
986                        .into_iter()
987                        .map(|x| (x.order_id, Arc::new(x))),
988                ),
989                default_sort_order_id: value.default_sort_order_id,
990                refs: value.refs.unwrap_or_else(|| {
991                    if let Some(snapshot_id) = current_snapshot_id {
992                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
993                            snapshot_id,
994                            retention: SnapshotRetention::Branch {
995                                min_snapshots_to_keep: None,
996                                max_snapshot_age_ms: None,
997                                max_ref_age_ms: None,
998                            },
999                        })])
1000                    } else {
1001                        HashMap::new()
1002                    }
1003                }),
1004                statistics: index_statistics(value.statistics),
1005                partition_statistics: index_partition_statistics(value.partition_statistics),
1006                encryption_keys: HashMap::new(),
1007            };
1008
1009            metadata.borrow_mut().try_normalize()?;
1010            Ok(metadata)
1011        }
1012    }
1013
1014    impl TryFrom<TableMetadataV1> for TableMetadata {
1015        type Error = Error;
1016        fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
1017            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1018                None
1019            } else {
1020                value.current_snapshot_id
1021            };
1022
1023            let (schemas, current_schema_id, current_schema) =
1024                if let (Some(schemas_vec), Some(schema_id)) =
1025                    (&value.schemas, value.current_schema_id)
1026                {
1027                    // Option 1: Use 'schemas' + 'current_schema_id'
1028                    let schema_map = HashMap::from_iter(
1029                        schemas_vec
1030                            .clone()
1031                            .into_iter()
1032                            .map(|schema| {
1033                                let schema: Schema = schema.try_into()?;
1034                                Ok((schema.schema_id(), Arc::new(schema)))
1035                            })
1036                            .collect::<Result<Vec<_>, Error>>()?,
1037                    );
1038
1039                    let schema = schema_map
1040                        .get(&schema_id)
1041                        .ok_or_else(|| {
1042                            Error::new(
1043                                ErrorKind::DataInvalid,
1044                                format!(
1045                                    "No schema exists with the current schema id {}.",
1046                                    schema_id
1047                                ),
1048                            )
1049                        })?
1050                        .clone();
1051                    (schema_map, schema_id, schema)
1052                } else if let Some(schema) = value.schema {
1053                    // Option 2: Fall back to `schema`
1054                    let schema: Schema = schema.try_into()?;
1055                    let schema_id = schema.schema_id();
1056                    let schema_arc = Arc::new(schema);
1057                    let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
1058                    (schema_map, schema_id, schema_arc)
1059                } else {
1060                    // Option 3: No valid schema configuration found
1061                    return Err(Error::new(
1062                        ErrorKind::DataInvalid,
1063                        "No valid schema configuration found in table metadata",
1064                    ));
1065                };
1066
1067            // Prioritize 'partition_specs' over 'partition_spec'
1068            let partition_specs = if let Some(specs_vec) = value.partition_specs {
1069                // Option 1: Use 'partition_specs'
1070                specs_vec
1071                    .into_iter()
1072                    .map(|x| (x.spec_id(), Arc::new(x)))
1073                    .collect::<HashMap<_, _>>()
1074            } else if let Some(partition_spec) = value.partition_spec {
1075                // Option 2: Fall back to 'partition_spec'
1076                let spec = PartitionSpec::builder(current_schema.clone())
1077                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1078                    .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
1079                    .build()?;
1080
1081                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1082            } else {
1083                // Option 3: Create empty partition spec
1084                let spec = PartitionSpec::builder(current_schema.clone())
1085                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1086                    .build()?;
1087
1088                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1089            };
1090
1091            // Get the default_spec_id, prioritizing the explicit value if provided
1092            let default_spec_id = value
1093                .default_spec_id
1094                .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
1095
1096            // Get the default spec
1097            let default_spec: PartitionSpecRef = partition_specs
1098                .get(&default_spec_id)
1099                .map(|x| Arc::unwrap_or_clone(x.clone()))
1100                .ok_or_else(|| {
1101                    Error::new(
1102                        ErrorKind::DataInvalid,
1103                        format!("Default partition spec {default_spec_id} not found"),
1104                    )
1105                })?
1106                .into();
1107            let default_partition_type = default_spec.partition_type(&current_schema)?;
1108
1109            let mut metadata = TableMetadata {
1110                format_version: FormatVersion::V1,
1111                table_uuid: value.table_uuid.unwrap_or_default(),
1112                location: value.location,
1113                last_sequence_number: 0,
1114                last_updated_ms: value.last_updated_ms,
1115                last_column_id: value.last_column_id,
1116                current_schema_id,
1117                default_spec,
1118                default_partition_type,
1119                last_partition_id: value
1120                    .last_partition_id
1121                    .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
1122                partition_specs,
1123                schemas,
1124                properties: value.properties.unwrap_or_default(),
1125                current_snapshot_id,
1126                snapshots: value
1127                    .snapshots
1128                    .map(|snapshots| {
1129                        Ok::<_, Error>(HashMap::from_iter(
1130                            snapshots
1131                                .into_iter()
1132                                .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
1133                                .collect::<Result<Vec<_>, Error>>()?,
1134                        ))
1135                    })
1136                    .transpose()?
1137                    .unwrap_or_default(),
1138                snapshot_log: value.snapshot_log.unwrap_or_default(),
1139                metadata_log: value.metadata_log.unwrap_or_default(),
1140                sort_orders: match value.sort_orders {
1141                    Some(sort_orders) => HashMap::from_iter(
1142                        sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
1143                    ),
1144                    None => HashMap::new(),
1145                },
1146                default_sort_order_id: value
1147                    .default_sort_order_id
1148                    .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
1149                refs: if let Some(snapshot_id) = current_snapshot_id {
1150                    HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1151                        snapshot_id,
1152                        retention: SnapshotRetention::Branch {
1153                            min_snapshots_to_keep: None,
1154                            max_snapshot_age_ms: None,
1155                            max_ref_age_ms: None,
1156                        },
1157                    })])
1158                } else {
1159                    HashMap::new()
1160                },
1161                statistics: index_statistics(value.statistics),
1162                partition_statistics: index_partition_statistics(value.partition_statistics),
1163                encryption_keys: HashMap::new(),
1164            };
1165
1166            metadata.borrow_mut().try_normalize()?;
1167            Ok(metadata)
1168        }
1169    }
1170
1171    impl From<TableMetadata> for TableMetadataV2 {
1172        fn from(v: TableMetadata) -> Self {
1173            TableMetadataV2 {
1174                format_version: VersionNumber::<2>,
1175                table_uuid: v.table_uuid,
1176                location: v.location,
1177                last_sequence_number: v.last_sequence_number,
1178                last_updated_ms: v.last_updated_ms,
1179                last_column_id: v.last_column_id,
1180                schemas: v
1181                    .schemas
1182                    .into_values()
1183                    .map(|x| {
1184                        Arc::try_unwrap(x)
1185                            .unwrap_or_else(|schema| schema.as_ref().clone())
1186                            .into()
1187                    })
1188                    .collect(),
1189                current_schema_id: v.current_schema_id,
1190                partition_specs: v
1191                    .partition_specs
1192                    .into_values()
1193                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1194                    .collect(),
1195                default_spec_id: v.default_spec.spec_id(),
1196                last_partition_id: v.last_partition_id,
1197                properties: if v.properties.is_empty() {
1198                    None
1199                } else {
1200                    Some(v.properties)
1201                },
1202                current_snapshot_id: v.current_snapshot_id,
1203                snapshots: if v.snapshots.is_empty() {
1204                    None
1205                } else {
1206                    Some(
1207                        v.snapshots
1208                            .into_values()
1209                            .map(|x| {
1210                                Arc::try_unwrap(x)
1211                                    .unwrap_or_else(|snapshot| snapshot.as_ref().clone())
1212                                    .into()
1213                            })
1214                            .collect(),
1215                    )
1216                },
1217                snapshot_log: if v.snapshot_log.is_empty() {
1218                    None
1219                } else {
1220                    Some(v.snapshot_log)
1221                },
1222                metadata_log: if v.metadata_log.is_empty() {
1223                    None
1224                } else {
1225                    Some(v.metadata_log)
1226                },
1227                sort_orders: v
1228                    .sort_orders
1229                    .into_values()
1230                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1231                    .collect(),
1232                default_sort_order_id: v.default_sort_order_id,
1233                refs: Some(v.refs),
1234                statistics: v.statistics.into_values().collect(),
1235                partition_statistics: v.partition_statistics.into_values().collect(),
1236            }
1237        }
1238    }
1239
1240    impl TryFrom<TableMetadata> for TableMetadataV1 {
1241        type Error = Error;
1242        fn try_from(v: TableMetadata) -> Result<Self, Error> {
1243            Ok(TableMetadataV1 {
1244                format_version: VersionNumber::<1>,
1245                table_uuid: Some(v.table_uuid),
1246                location: v.location,
1247                last_updated_ms: v.last_updated_ms,
1248                last_column_id: v.last_column_id,
1249                schema: Some(
1250                    v.schemas
1251                        .get(&v.current_schema_id)
1252                        .ok_or(Error::new(
1253                            ErrorKind::Unexpected,
1254                            "current_schema_id not found in schemas",
1255                        ))?
1256                        .as_ref()
1257                        .clone()
1258                        .into(),
1259                ),
1260                schemas: Some(
1261                    v.schemas
1262                        .into_values()
1263                        .map(|x| {
1264                            Arc::try_unwrap(x)
1265                                .unwrap_or_else(|schema| schema.as_ref().clone())
1266                                .into()
1267                        })
1268                        .collect(),
1269                ),
1270                current_schema_id: Some(v.current_schema_id),
1271                partition_spec: Some(v.default_spec.fields().to_vec()),
1272                partition_specs: Some(
1273                    v.partition_specs
1274                        .into_values()
1275                        .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1276                        .collect(),
1277                ),
1278                default_spec_id: Some(v.default_spec.spec_id()),
1279                last_partition_id: Some(v.last_partition_id),
1280                properties: if v.properties.is_empty() {
1281                    None
1282                } else {
1283                    Some(v.properties)
1284                },
1285                current_snapshot_id: v.current_snapshot_id,
1286                snapshots: if v.snapshots.is_empty() {
1287                    None
1288                } else {
1289                    Some(
1290                        v.snapshots
1291                            .into_values()
1292                            .map(|x| Snapshot::clone(&x).into())
1293                            .collect(),
1294                    )
1295                },
1296                snapshot_log: if v.snapshot_log.is_empty() {
1297                    None
1298                } else {
1299                    Some(v.snapshot_log)
1300                },
1301                metadata_log: if v.metadata_log.is_empty() {
1302                    None
1303                } else {
1304                    Some(v.metadata_log)
1305                },
1306                sort_orders: Some(
1307                    v.sort_orders
1308                        .into_values()
1309                        .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1310                        .collect(),
1311                ),
1312                default_sort_order_id: Some(v.default_sort_order_id),
1313                statistics: v.statistics.into_values().collect(),
1314                partition_statistics: v.partition_statistics.into_values().collect(),
1315            })
1316        }
1317    }
1318
1319    fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1320        statistics
1321            .into_iter()
1322            .rev()
1323            .map(|s| (s.snapshot_id, s))
1324            .collect()
1325    }
1326
1327    fn index_partition_statistics(
1328        statistics: Vec<PartitionStatisticsFile>,
1329    ) -> HashMap<i64, PartitionStatisticsFile> {
1330        statistics
1331            .into_iter()
1332            .rev()
1333            .map(|s| (s.snapshot_id, s))
1334            .collect()
1335    }
1336}
1337
1338#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
1339#[repr(u8)]
1340/// Iceberg format version
1341pub enum FormatVersion {
1342    /// Iceberg spec version 1
1343    V1 = 1u8,
1344    /// Iceberg spec version 2
1345    V2 = 2u8,
1346}
1347
1348impl PartialOrd for FormatVersion {
1349    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1350        Some(self.cmp(other))
1351    }
1352}
1353
1354impl Ord for FormatVersion {
1355    fn cmp(&self, other: &Self) -> Ordering {
1356        (*self as u8).cmp(&(*other as u8))
1357    }
1358}
1359
1360impl Display for FormatVersion {
1361    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1362        match self {
1363            FormatVersion::V1 => write!(f, "v1"),
1364            FormatVersion::V2 => write!(f, "v2"),
1365        }
1366    }
1367}
1368
1369#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1370#[serde(rename_all = "kebab-case")]
1371/// Encodes changes to the previous metadata files for the table
1372pub struct MetadataLog {
1373    /// The file for the log.
1374    pub metadata_file: String,
1375    /// Time new metadata was created
1376    pub timestamp_ms: i64,
1377}
1378
1379#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1380#[serde(rename_all = "kebab-case")]
1381/// A log of when each snapshot was made.
1382pub struct SnapshotLog {
1383    /// Id of the snapshot.
1384    pub snapshot_id: i64,
1385    /// Last updated timestamp
1386    pub timestamp_ms: i64,
1387}
1388
1389impl SnapshotLog {
1390    /// Returns the last updated timestamp as a DateTime<Utc> with millisecond precision
1391    pub fn timestamp(self) -> Result<DateTime<Utc>> {
1392        timestamp_ms_to_utc(self.timestamp_ms)
1393    }
1394
1395    /// Returns the timestamp in milliseconds
1396    #[inline]
1397    pub fn timestamp_ms(&self) -> i64 {
1398        self.timestamp_ms
1399    }
1400}
1401
1402#[cfg(test)]
1403mod tests {
1404    use std::collections::HashMap;
1405    use std::fs;
1406    use std::sync::Arc;
1407
1408    use anyhow::Result;
1409    use pretty_assertions::assert_eq;
1410    use tempfile::TempDir;
1411    use uuid::Uuid;
1412
1413    use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1414    use crate::TableCreation;
1415    use crate::io::FileIOBuilder;
1416    use crate::spec::table_metadata::TableMetadata;
1417    use crate::spec::{
1418        BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PartitionStatisticsFile,
1419        PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection,
1420        SortField, SortOrder, StatisticsFile, Summary, Transform, Type, UnboundPartitionField,
1421    };
1422
1423    fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1424        let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1425        assert_eq!(desered_type, expected_type);
1426
1427        let sered_json = serde_json::to_string(&expected_type).unwrap();
1428        let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1429
1430        assert_eq!(parsed_json_value, desered_type);
1431    }
1432
1433    fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1434        let path = format!("testdata/table_metadata/{}", file_name);
1435        let metadata: String = fs::read_to_string(path).unwrap();
1436
1437        serde_json::from_str(&metadata).unwrap()
1438    }
1439
1440    #[test]
1441    fn test_table_data_v2() {
1442        let data = r#"
1443            {
1444                "format-version" : 2,
1445                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1446                "location": "s3://b/wh/data.db/table",
1447                "last-sequence-number" : 1,
1448                "last-updated-ms": 1515100955770,
1449                "last-column-id": 1,
1450                "schemas": [
1451                    {
1452                        "schema-id" : 1,
1453                        "type" : "struct",
1454                        "fields" :[
1455                            {
1456                                "id": 1,
1457                                "name": "struct_name",
1458                                "required": true,
1459                                "type": "fixed[1]"
1460                            },
1461                            {
1462                                "id": 4,
1463                                "name": "ts",
1464                                "required": true,
1465                                "type": "timestamp"
1466                            }
1467                        ]
1468                    }
1469                ],
1470                "current-schema-id" : 1,
1471                "partition-specs": [
1472                    {
1473                        "spec-id": 0,
1474                        "fields": [
1475                            {
1476                                "source-id": 4,
1477                                "field-id": 1000,
1478                                "name": "ts_day",
1479                                "transform": "day"
1480                            }
1481                        ]
1482                    }
1483                ],
1484                "default-spec-id": 0,
1485                "last-partition-id": 1000,
1486                "properties": {
1487                    "commit.retry.num-retries": "1"
1488                },
1489                "metadata-log": [
1490                    {
1491                        "metadata-file": "s3://bucket/.../v1.json",
1492                        "timestamp-ms": 1515100
1493                    }
1494                ],
1495                "refs": {},
1496                "sort-orders": [
1497                    {
1498                    "order-id": 0,
1499                    "fields": []
1500                    }
1501                ],
1502                "default-sort-order-id": 0
1503            }
1504        "#;
1505
1506        let schema = Schema::builder()
1507            .with_schema_id(1)
1508            .with_fields(vec![
1509                Arc::new(NestedField::required(
1510                    1,
1511                    "struct_name",
1512                    Type::Primitive(PrimitiveType::Fixed(1)),
1513                )),
1514                Arc::new(NestedField::required(
1515                    4,
1516                    "ts",
1517                    Type::Primitive(PrimitiveType::Timestamp),
1518                )),
1519            ])
1520            .build()
1521            .unwrap();
1522
1523        let partition_spec = PartitionSpec::builder(schema.clone())
1524            .with_spec_id(0)
1525            .add_unbound_field(UnboundPartitionField {
1526                name: "ts_day".to_string(),
1527                transform: Transform::Day,
1528                source_id: 4,
1529                field_id: Some(1000),
1530            })
1531            .unwrap()
1532            .build()
1533            .unwrap();
1534
1535        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1536        let expected = TableMetadata {
1537            format_version: FormatVersion::V2,
1538            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1539            location: "s3://b/wh/data.db/table".to_string(),
1540            last_updated_ms: 1515100955770,
1541            last_column_id: 1,
1542            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1543            current_schema_id: 1,
1544            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1545            default_partition_type,
1546            default_spec: partition_spec.into(),
1547            last_partition_id: 1000,
1548            default_sort_order_id: 0,
1549            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1550            snapshots: HashMap::default(),
1551            current_snapshot_id: None,
1552            last_sequence_number: 1,
1553            properties: HashMap::from_iter(vec![(
1554                "commit.retry.num-retries".to_string(),
1555                "1".to_string(),
1556            )]),
1557            snapshot_log: Vec::new(),
1558            metadata_log: vec![MetadataLog {
1559                metadata_file: "s3://bucket/.../v1.json".to_string(),
1560                timestamp_ms: 1515100,
1561            }],
1562            refs: HashMap::new(),
1563            statistics: HashMap::new(),
1564            partition_statistics: HashMap::new(),
1565            encryption_keys: HashMap::new(),
1566        };
1567
1568        let expected_json_value = serde_json::to_value(&expected).unwrap();
1569        check_table_metadata_serde(data, expected);
1570
1571        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1572        assert_eq!(json_value, expected_json_value);
1573    }
1574
1575    #[test]
1576    fn test_table_data_v1() {
1577        let data = r#"
1578        {
1579            "format-version" : 1,
1580            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
1581            "location" : "/home/iceberg/warehouse/nyc/taxis",
1582            "last-updated-ms" : 1662532818843,
1583            "last-column-id" : 5,
1584            "schema" : {
1585              "type" : "struct",
1586              "schema-id" : 0,
1587              "fields" : [ {
1588                "id" : 1,
1589                "name" : "vendor_id",
1590                "required" : false,
1591                "type" : "long"
1592              }, {
1593                "id" : 2,
1594                "name" : "trip_id",
1595                "required" : false,
1596                "type" : "long"
1597              }, {
1598                "id" : 3,
1599                "name" : "trip_distance",
1600                "required" : false,
1601                "type" : "float"
1602              }, {
1603                "id" : 4,
1604                "name" : "fare_amount",
1605                "required" : false,
1606                "type" : "double"
1607              }, {
1608                "id" : 5,
1609                "name" : "store_and_fwd_flag",
1610                "required" : false,
1611                "type" : "string"
1612              } ]
1613            },
1614            "partition-spec" : [ {
1615              "name" : "vendor_id",
1616              "transform" : "identity",
1617              "source-id" : 1,
1618              "field-id" : 1000
1619            } ],
1620            "last-partition-id" : 1000,
1621            "default-sort-order-id" : 0,
1622            "sort-orders" : [ {
1623              "order-id" : 0,
1624              "fields" : [ ]
1625            } ],
1626            "properties" : {
1627              "owner" : "root"
1628            },
1629            "current-snapshot-id" : 638933773299822130,
1630            "refs" : {
1631              "main" : {
1632                "snapshot-id" : 638933773299822130,
1633                "type" : "branch"
1634              }
1635            },
1636            "snapshots" : [ {
1637              "snapshot-id" : 638933773299822130,
1638              "timestamp-ms" : 1662532818843,
1639              "sequence-number" : 0,
1640              "summary" : {
1641                "operation" : "append",
1642                "spark.app.id" : "local-1662532784305",
1643                "added-data-files" : "4",
1644                "added-records" : "4",
1645                "added-files-size" : "6001"
1646              },
1647              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1648              "schema-id" : 0
1649            } ],
1650            "snapshot-log" : [ {
1651              "timestamp-ms" : 1662532818843,
1652              "snapshot-id" : 638933773299822130
1653            } ],
1654            "metadata-log" : [ {
1655              "timestamp-ms" : 1662532805245,
1656              "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
1657            } ]
1658          }
1659        "#;
1660
1661        let schema = Schema::builder()
1662            .with_fields(vec![
1663                Arc::new(NestedField::optional(
1664                    1,
1665                    "vendor_id",
1666                    Type::Primitive(PrimitiveType::Long),
1667                )),
1668                Arc::new(NestedField::optional(
1669                    2,
1670                    "trip_id",
1671                    Type::Primitive(PrimitiveType::Long),
1672                )),
1673                Arc::new(NestedField::optional(
1674                    3,
1675                    "trip_distance",
1676                    Type::Primitive(PrimitiveType::Float),
1677                )),
1678                Arc::new(NestedField::optional(
1679                    4,
1680                    "fare_amount",
1681                    Type::Primitive(PrimitiveType::Double),
1682                )),
1683                Arc::new(NestedField::optional(
1684                    5,
1685                    "store_and_fwd_flag",
1686                    Type::Primitive(PrimitiveType::String),
1687                )),
1688            ])
1689            .build()
1690            .unwrap();
1691
1692        let schema = Arc::new(schema);
1693        let partition_spec = PartitionSpec::builder(schema.clone())
1694            .with_spec_id(0)
1695            .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
1696            .unwrap()
1697            .build()
1698            .unwrap();
1699
1700        let sort_order = SortOrder::builder()
1701            .with_order_id(0)
1702            .build_unbound()
1703            .unwrap();
1704
1705        let snapshot = Snapshot::builder()
1706            .with_snapshot_id(638933773299822130)
1707            .with_timestamp_ms(1662532818843)
1708            .with_sequence_number(0)
1709            .with_schema_id(0)
1710            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
1711            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
1712            .build();
1713
1714        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1715        let expected = TableMetadata {
1716            format_version: FormatVersion::V1,
1717            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
1718            location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
1719            last_updated_ms: 1662532818843,
1720            last_column_id: 5,
1721            schemas: HashMap::from_iter(vec![(0, schema)]),
1722            current_schema_id: 0,
1723            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1724            default_partition_type,
1725            default_spec: Arc::new(partition_spec),
1726            last_partition_id: 1000,
1727            default_sort_order_id: 0,
1728            sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
1729            snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
1730            current_snapshot_id: Some(638933773299822130),
1731            last_sequence_number: 0,
1732            properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
1733            snapshot_log: vec![SnapshotLog {
1734                snapshot_id: 638933773299822130,
1735                timestamp_ms: 1662532818843,
1736            }],
1737            metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
1738            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
1739            statistics: HashMap::new(),
1740            partition_statistics: HashMap::new(),
1741            encryption_keys: HashMap::new(),
1742        };
1743
1744        check_table_metadata_serde(data, expected);
1745    }
1746
1747    #[test]
1748    fn test_table_data_v2_no_snapshots() {
1749        let data = r#"
1750        {
1751            "format-version" : 2,
1752            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1753            "location": "s3://b/wh/data.db/table",
1754            "last-sequence-number" : 1,
1755            "last-updated-ms": 1515100955770,
1756            "last-column-id": 1,
1757            "schemas": [
1758                {
1759                    "schema-id" : 1,
1760                    "type" : "struct",
1761                    "fields" :[
1762                        {
1763                            "id": 1,
1764                            "name": "struct_name",
1765                            "required": true,
1766                            "type": "fixed[1]"
1767                        }
1768                    ]
1769                }
1770            ],
1771            "current-schema-id" : 1,
1772            "partition-specs": [
1773                {
1774                    "spec-id": 0,
1775                    "fields": []
1776                }
1777            ],
1778            "refs": {},
1779            "default-spec-id": 0,
1780            "last-partition-id": 1000,
1781            "metadata-log": [
1782                {
1783                    "metadata-file": "s3://bucket/.../v1.json",
1784                    "timestamp-ms": 1515100
1785                }
1786            ],
1787            "sort-orders": [
1788                {
1789                "order-id": 0,
1790                "fields": []
1791                }
1792            ],
1793            "default-sort-order-id": 0
1794        }
1795        "#;
1796
1797        let schema = Schema::builder()
1798            .with_schema_id(1)
1799            .with_fields(vec![Arc::new(NestedField::required(
1800                1,
1801                "struct_name",
1802                Type::Primitive(PrimitiveType::Fixed(1)),
1803            ))])
1804            .build()
1805            .unwrap();
1806
1807        let partition_spec = PartitionSpec::builder(schema.clone())
1808            .with_spec_id(0)
1809            .build()
1810            .unwrap();
1811
1812        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1813        let expected = TableMetadata {
1814            format_version: FormatVersion::V2,
1815            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1816            location: "s3://b/wh/data.db/table".to_string(),
1817            last_updated_ms: 1515100955770,
1818            last_column_id: 1,
1819            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1820            current_schema_id: 1,
1821            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1822            default_partition_type,
1823            default_spec: partition_spec.into(),
1824            last_partition_id: 1000,
1825            default_sort_order_id: 0,
1826            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1827            snapshots: HashMap::default(),
1828            current_snapshot_id: None,
1829            last_sequence_number: 1,
1830            properties: HashMap::new(),
1831            snapshot_log: Vec::new(),
1832            metadata_log: vec![MetadataLog {
1833                metadata_file: "s3://bucket/.../v1.json".to_string(),
1834                timestamp_ms: 1515100,
1835            }],
1836            refs: HashMap::new(),
1837            statistics: HashMap::new(),
1838            partition_statistics: HashMap::new(),
1839            encryption_keys: HashMap::new(),
1840        };
1841
1842        let expected_json_value = serde_json::to_value(&expected).unwrap();
1843        check_table_metadata_serde(data, expected);
1844
1845        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1846        assert_eq!(json_value, expected_json_value);
1847    }
1848
1849    #[test]
1850    fn test_current_snapshot_id_must_match_main_branch() {
1851        let data = r#"
1852        {
1853            "format-version" : 2,
1854            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1855            "location": "s3://b/wh/data.db/table",
1856            "last-sequence-number" : 1,
1857            "last-updated-ms": 1515100955770,
1858            "last-column-id": 1,
1859            "schemas": [
1860                {
1861                    "schema-id" : 1,
1862                    "type" : "struct",
1863                    "fields" :[
1864                        {
1865                            "id": 1,
1866                            "name": "struct_name",
1867                            "required": true,
1868                            "type": "fixed[1]"
1869                        },
1870                        {
1871                            "id": 4,
1872                            "name": "ts",
1873                            "required": true,
1874                            "type": "timestamp"
1875                        }
1876                    ]
1877                }
1878            ],
1879            "current-schema-id" : 1,
1880            "partition-specs": [
1881                {
1882                    "spec-id": 0,
1883                    "fields": [
1884                        {
1885                            "source-id": 4,
1886                            "field-id": 1000,
1887                            "name": "ts_day",
1888                            "transform": "day"
1889                        }
1890                    ]
1891                }
1892            ],
1893            "default-spec-id": 0,
1894            "last-partition-id": 1000,
1895            "properties": {
1896                "commit.retry.num-retries": "1"
1897            },
1898            "metadata-log": [
1899                {
1900                    "metadata-file": "s3://bucket/.../v1.json",
1901                    "timestamp-ms": 1515100
1902                }
1903            ],
1904            "sort-orders": [
1905                {
1906                "order-id": 0,
1907                "fields": []
1908                }
1909            ],
1910            "default-sort-order-id": 0,
1911            "current-snapshot-id" : 1,
1912            "refs" : {
1913              "main" : {
1914                "snapshot-id" : 2,
1915                "type" : "branch"
1916              }
1917            },
1918            "snapshots" : [ {
1919              "snapshot-id" : 1,
1920              "timestamp-ms" : 1662532818843,
1921              "sequence-number" : 0,
1922              "summary" : {
1923                "operation" : "append",
1924                "spark.app.id" : "local-1662532784305",
1925                "added-data-files" : "4",
1926                "added-records" : "4",
1927                "added-files-size" : "6001"
1928              },
1929              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1930              "schema-id" : 0
1931            },
1932            {
1933              "snapshot-id" : 2,
1934              "timestamp-ms" : 1662532818844,
1935              "sequence-number" : 0,
1936              "summary" : {
1937                "operation" : "append",
1938                "spark.app.id" : "local-1662532784305",
1939                "added-data-files" : "4",
1940                "added-records" : "4",
1941                "added-files-size" : "6001"
1942              },
1943              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1944              "schema-id" : 0
1945            } ]
1946        }
1947    "#;
1948
1949        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
1950        assert!(
1951            err.to_string()
1952                .contains("Current snapshot id does not match main branch")
1953        );
1954    }
1955
1956    #[test]
1957    fn test_main_without_current() {
1958        let data = r#"
1959        {
1960            "format-version" : 2,
1961            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1962            "location": "s3://b/wh/data.db/table",
1963            "last-sequence-number" : 1,
1964            "last-updated-ms": 1515100955770,
1965            "last-column-id": 1,
1966            "schemas": [
1967                {
1968                    "schema-id" : 1,
1969                    "type" : "struct",
1970                    "fields" :[
1971                        {
1972                            "id": 1,
1973                            "name": "struct_name",
1974                            "required": true,
1975                            "type": "fixed[1]"
1976                        },
1977                        {
1978                            "id": 4,
1979                            "name": "ts",
1980                            "required": true,
1981                            "type": "timestamp"
1982                        }
1983                    ]
1984                }
1985            ],
1986            "current-schema-id" : 1,
1987            "partition-specs": [
1988                {
1989                    "spec-id": 0,
1990                    "fields": [
1991                        {
1992                            "source-id": 4,
1993                            "field-id": 1000,
1994                            "name": "ts_day",
1995                            "transform": "day"
1996                        }
1997                    ]
1998                }
1999            ],
2000            "default-spec-id": 0,
2001            "last-partition-id": 1000,
2002            "properties": {
2003                "commit.retry.num-retries": "1"
2004            },
2005            "metadata-log": [
2006                {
2007                    "metadata-file": "s3://bucket/.../v1.json",
2008                    "timestamp-ms": 1515100
2009                }
2010            ],
2011            "sort-orders": [
2012                {
2013                "order-id": 0,
2014                "fields": []
2015                }
2016            ],
2017            "default-sort-order-id": 0,
2018            "refs" : {
2019              "main" : {
2020                "snapshot-id" : 1,
2021                "type" : "branch"
2022              }
2023            },
2024            "snapshots" : [ {
2025              "snapshot-id" : 1,
2026              "timestamp-ms" : 1662532818843,
2027              "sequence-number" : 0,
2028              "summary" : {
2029                "operation" : "append",
2030                "spark.app.id" : "local-1662532784305",
2031                "added-data-files" : "4",
2032                "added-records" : "4",
2033                "added-files-size" : "6001"
2034              },
2035              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2036              "schema-id" : 0
2037            } ]
2038        }
2039    "#;
2040
2041        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2042        assert!(
2043            err.to_string()
2044                .contains("Current snapshot is not set, but main branch exists")
2045        );
2046    }
2047
2048    #[test]
2049    fn test_branch_snapshot_missing() {
2050        let data = r#"
2051        {
2052            "format-version" : 2,
2053            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2054            "location": "s3://b/wh/data.db/table",
2055            "last-sequence-number" : 1,
2056            "last-updated-ms": 1515100955770,
2057            "last-column-id": 1,
2058            "schemas": [
2059                {
2060                    "schema-id" : 1,
2061                    "type" : "struct",
2062                    "fields" :[
2063                        {
2064                            "id": 1,
2065                            "name": "struct_name",
2066                            "required": true,
2067                            "type": "fixed[1]"
2068                        },
2069                        {
2070                            "id": 4,
2071                            "name": "ts",
2072                            "required": true,
2073                            "type": "timestamp"
2074                        }
2075                    ]
2076                }
2077            ],
2078            "current-schema-id" : 1,
2079            "partition-specs": [
2080                {
2081                    "spec-id": 0,
2082                    "fields": [
2083                        {
2084                            "source-id": 4,
2085                            "field-id": 1000,
2086                            "name": "ts_day",
2087                            "transform": "day"
2088                        }
2089                    ]
2090                }
2091            ],
2092            "default-spec-id": 0,
2093            "last-partition-id": 1000,
2094            "properties": {
2095                "commit.retry.num-retries": "1"
2096            },
2097            "metadata-log": [
2098                {
2099                    "metadata-file": "s3://bucket/.../v1.json",
2100                    "timestamp-ms": 1515100
2101                }
2102            ],
2103            "sort-orders": [
2104                {
2105                "order-id": 0,
2106                "fields": []
2107                }
2108            ],
2109            "default-sort-order-id": 0,
2110            "refs" : {
2111              "main" : {
2112                "snapshot-id" : 1,
2113                "type" : "branch"
2114              },
2115              "foo" : {
2116                "snapshot-id" : 2,
2117                "type" : "branch"
2118              }
2119            },
2120            "snapshots" : [ {
2121              "snapshot-id" : 1,
2122              "timestamp-ms" : 1662532818843,
2123              "sequence-number" : 0,
2124              "summary" : {
2125                "operation" : "append",
2126                "spark.app.id" : "local-1662532784305",
2127                "added-data-files" : "4",
2128                "added-records" : "4",
2129                "added-files-size" : "6001"
2130              },
2131              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2132              "schema-id" : 0
2133            } ]
2134        }
2135    "#;
2136
2137        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2138        assert!(
2139            err.to_string().contains(
2140                "Snapshot for reference foo does not exist in the existing snapshots list"
2141            )
2142        );
2143    }
2144
2145    #[test]
2146    fn test_v2_wrong_max_snapshot_sequence_number() {
2147        let data = r#"
2148        {
2149            "format-version": 2,
2150            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2151            "location": "s3://bucket/test/location",
2152            "last-sequence-number": 1,
2153            "last-updated-ms": 1602638573590,
2154            "last-column-id": 3,
2155            "current-schema-id": 0,
2156            "schemas": [
2157                {
2158                    "type": "struct",
2159                    "schema-id": 0,
2160                    "fields": [
2161                        {
2162                            "id": 1,
2163                            "name": "x",
2164                            "required": true,
2165                            "type": "long"
2166                        }
2167                    ]
2168                }
2169            ],
2170            "default-spec-id": 0,
2171            "partition-specs": [
2172                {
2173                    "spec-id": 0,
2174                    "fields": []
2175                }
2176            ],
2177            "last-partition-id": 1000,
2178            "default-sort-order-id": 0,
2179            "sort-orders": [
2180                {
2181                    "order-id": 0,
2182                    "fields": []
2183                }
2184            ],
2185            "properties": {},
2186            "current-snapshot-id": 3055729675574597004,
2187            "snapshots": [
2188                {
2189                    "snapshot-id": 3055729675574597004,
2190                    "timestamp-ms": 1555100955770,
2191                    "sequence-number": 4,
2192                    "summary": {
2193                        "operation": "append"
2194                    },
2195                    "manifest-list": "s3://a/b/2.avro",
2196                    "schema-id": 0
2197                }
2198            ],
2199            "statistics": [],
2200            "snapshot-log": [],
2201            "metadata-log": []
2202        }
2203    "#;
2204
2205        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2206        println!("{}", err);
2207        assert!(err.to_string().contains(
2208            "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
2209        ));
2210
2211        // Change max sequence number to 4 - should work
2212        let data = data.replace(
2213            r#""last-sequence-number": 1,"#,
2214            r#""last-sequence-number": 4,"#,
2215        );
2216        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2217        assert_eq!(metadata.last_sequence_number, 4);
2218
2219        // Change max sequence number to 5 - should work
2220        let data = data.replace(
2221            r#""last-sequence-number": 4,"#,
2222            r#""last-sequence-number": 5,"#,
2223        );
2224        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2225        assert_eq!(metadata.last_sequence_number, 5);
2226    }
2227
2228    #[test]
2229    fn test_statistic_files() {
2230        let data = r#"
2231        {
2232            "format-version": 2,
2233            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2234            "location": "s3://bucket/test/location",
2235            "last-sequence-number": 34,
2236            "last-updated-ms": 1602638573590,
2237            "last-column-id": 3,
2238            "current-schema-id": 0,
2239            "schemas": [
2240                {
2241                    "type": "struct",
2242                    "schema-id": 0,
2243                    "fields": [
2244                        {
2245                            "id": 1,
2246                            "name": "x",
2247                            "required": true,
2248                            "type": "long"
2249                        }
2250                    ]
2251                }
2252            ],
2253            "default-spec-id": 0,
2254            "partition-specs": [
2255                {
2256                    "spec-id": 0,
2257                    "fields": []
2258                }
2259            ],
2260            "last-partition-id": 1000,
2261            "default-sort-order-id": 0,
2262            "sort-orders": [
2263                {
2264                    "order-id": 0,
2265                    "fields": []
2266                }
2267            ],
2268            "properties": {},
2269            "current-snapshot-id": 3055729675574597004,
2270            "snapshots": [
2271                {
2272                    "snapshot-id": 3055729675574597004,
2273                    "timestamp-ms": 1555100955770,
2274                    "sequence-number": 1,
2275                    "summary": {
2276                        "operation": "append"
2277                    },
2278                    "manifest-list": "s3://a/b/2.avro",
2279                    "schema-id": 0
2280                }
2281            ],
2282            "statistics": [
2283                {
2284                    "snapshot-id": 3055729675574597004,
2285                    "statistics-path": "s3://a/b/stats.puffin",
2286                    "file-size-in-bytes": 413,
2287                    "file-footer-size-in-bytes": 42,
2288                    "blob-metadata": [
2289                        {
2290                            "type": "ndv",
2291                            "snapshot-id": 3055729675574597004,
2292                            "sequence-number": 1,
2293                            "fields": [
2294                                1
2295                            ]
2296                        }
2297                    ]
2298                }
2299            ],
2300            "snapshot-log": [],
2301            "metadata-log": []
2302        }
2303    "#;
2304
2305        let schema = Schema::builder()
2306            .with_schema_id(0)
2307            .with_fields(vec![Arc::new(NestedField::required(
2308                1,
2309                "x",
2310                Type::Primitive(PrimitiveType::Long),
2311            ))])
2312            .build()
2313            .unwrap();
2314        let partition_spec = PartitionSpec::builder(schema.clone())
2315            .with_spec_id(0)
2316            .build()
2317            .unwrap();
2318        let snapshot = Snapshot::builder()
2319            .with_snapshot_id(3055729675574597004)
2320            .with_timestamp_ms(1555100955770)
2321            .with_sequence_number(1)
2322            .with_manifest_list("s3://a/b/2.avro")
2323            .with_schema_id(0)
2324            .with_summary(Summary {
2325                operation: Operation::Append,
2326                additional_properties: HashMap::new(),
2327            })
2328            .build();
2329
2330        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2331        let expected = TableMetadata {
2332            format_version: FormatVersion::V2,
2333            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2334            location: "s3://bucket/test/location".to_string(),
2335            last_updated_ms: 1602638573590,
2336            last_column_id: 3,
2337            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2338            current_schema_id: 0,
2339            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2340            default_partition_type,
2341            default_spec: Arc::new(partition_spec),
2342            last_partition_id: 1000,
2343            default_sort_order_id: 0,
2344            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2345            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
2346            current_snapshot_id: Some(3055729675574597004),
2347            last_sequence_number: 34,
2348            properties: HashMap::new(),
2349            snapshot_log: Vec::new(),
2350            metadata_log: Vec::new(),
2351            statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
2352                snapshot_id: 3055729675574597004,
2353                statistics_path: "s3://a/b/stats.puffin".to_string(),
2354                file_size_in_bytes: 413,
2355                file_footer_size_in_bytes: 42,
2356                key_metadata: None,
2357                blob_metadata: vec![BlobMetadata {
2358                    snapshot_id: 3055729675574597004,
2359                    sequence_number: 1,
2360                    fields: vec![1],
2361                    r#type: "ndv".to_string(),
2362                    properties: HashMap::new(),
2363                }],
2364            })]),
2365            partition_statistics: HashMap::new(),
2366            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
2367                snapshot_id: 3055729675574597004,
2368                retention: SnapshotRetention::Branch {
2369                    min_snapshots_to_keep: None,
2370                    max_snapshot_age_ms: None,
2371                    max_ref_age_ms: None,
2372                },
2373            })]),
2374            encryption_keys: HashMap::new(),
2375        };
2376
2377        check_table_metadata_serde(data, expected);
2378    }
2379
#[test]
fn test_partition_statistics_file() {
    // V2 metadata document with one snapshot and one "partition-statistics" entry.
    let metadata_json = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "partition-statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/partition-stats.parquet",
                    "file-size-in-bytes": 43
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
        "#;

    // The same snapshot id is referenced by the snapshot, the statistics file and the main ref.
    let snapshot_id: i64 = 3055729675574597004;

    // Schema 0: a single required long column named "x".
    let column_x = NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long));
    let table_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![Arc::new(column_x)])
        .build()
        .unwrap();

    // Spec 0 has no partition fields.
    let unpartitioned_spec = PartitionSpec::builder(table_schema.clone())
        .with_spec_id(0)
        .build()
        .unwrap();
    let default_partition_type = unpartitioned_spec.partition_type(&table_schema).unwrap();
    let default_spec = Arc::new(unpartitioned_spec);

    let append_summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::new(),
    };
    let current_snapshot = Snapshot::builder()
        .with_snapshot_id(snapshot_id)
        .with_timestamp_ms(1555100955770)
        .with_sequence_number(1)
        .with_manifest_list("s3://a/b/2.avro")
        .with_schema_id(0)
        .with_summary(append_summary)
        .build();

    let partition_stats_file = PartitionStatisticsFile {
        snapshot_id,
        statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
        file_size_in_bytes: 43,
    };

    // Expected "main" branch reference pointing at the current snapshot.
    let main_branch = SnapshotReference {
        snapshot_id,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: None,
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(table_schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::clone(&default_spec))]),
        default_spec,
        default_partition_type,
        last_partition_id: 1000,
        default_sort_order_id: 0,
        sort_orders: HashMap::from([(0, SortOrder::unsorted_order().into())]),
        snapshots: HashMap::from([(snapshot_id, Arc::new(current_snapshot))]),
        current_snapshot_id: Some(snapshot_id),
        last_sequence_number: 34,
        properties: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::from([(snapshot_id, partition_stats_file)]),
        refs: HashMap::from([("main".to_string(), main_branch)]),
        encryption_keys: HashMap::new(),
    };

    check_table_metadata_serde(metadata_json, expected);
}
2514
#[test]
fn test_invalid_table_uuid() -> Result<()> {
    // The document carries "xxxx" as its table-uuid; deserialization is expected to fail.
    let malformed_json = r#"
            {
                "format-version" : 2,
                "table-uuid": "xxxx"
            }
        "#;
    let parsed = serde_json::from_str::<TableMetadata>(malformed_json);
    assert!(parsed.is_err());
    Ok(())
}
2526
#[test]
fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
    // A document consisting only of a format version is expected to be rejected.
    let version_only_json = r#"
            {
                "format-version" : 1
            }
        "#;
    let parsed = serde_json::from_str::<TableMetadata>(version_only_json);
    assert!(parsed.is_err());
    Ok(())
}
2537
#[test]
fn test_table_metadata_v2_file_valid() {
    let metadata_json =
        fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();

    let parent_snapshot_id: i64 = 3051729675574597004;
    let head_snapshot_id: i64 = 3055729675574597004;

    // Every column in the expected schemas is a required `long`.
    let long_field = |id: i32, name: &str| {
        NestedField::required(id, name, Type::Primitive(PrimitiveType::Long))
    };
    // Both expected snapshots carry a plain "append" summary.
    let append_summary = || Summary {
        operation: Operation::Append,
        additional_properties: HashMap::new(),
    };

    // Schema 0: only column "x".
    let initial_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![Arc::new(long_field(1, "x"))])
        .build()
        .unwrap();

    // Schema 1: columns "x", "y" (documented) and "z", with x and y as identifier fields.
    let evolved_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![
            Arc::new(long_field(1, "x")),
            Arc::new(long_field(2, "y").with_doc("comment")),
            Arc::new(long_field(3, "z")),
        ])
        .with_identifier_field_ids(vec![1, 2])
        .build()
        .unwrap();

    // Identity partitioning on "x", bound against schema 1.
    let identity_on_x = UnboundPartitionField {
        name: "x".to_string(),
        transform: Transform::Identity,
        source_id: 1,
        field_id: Some(1000),
    };
    let bound_spec = PartitionSpec::builder(evolved_schema.clone())
        .with_spec_id(0)
        .add_unbound_field(identity_on_x)
        .unwrap()
        .build()
        .unwrap();
    let default_partition_type = bound_spec.partition_type(&evolved_schema).unwrap();
    let default_spec = Arc::new(bound_spec);

    let y_ascending = SortField {
        source_id: 2,
        transform: Transform::Identity,
        direction: SortDirection::Ascending,
        null_order: NullOrder::First,
    };
    let z_bucketed_descending = SortField {
        source_id: 3,
        transform: Transform::Bucket(4),
        direction: SortDirection::Descending,
        null_order: NullOrder::Last,
    };
    let expected_sort_order = SortOrder::builder()
        .with_order_id(3)
        .with_sort_field(y_ascending)
        .with_sort_field(z_bucketed_descending)
        .build_unbound()
        .unwrap();

    let parent_snapshot = Snapshot::builder()
        .with_snapshot_id(parent_snapshot_id)
        .with_timestamp_ms(1515100955770)
        .with_sequence_number(0)
        .with_manifest_list("s3://a/b/1.avro")
        .with_summary(append_summary())
        .build();

    let head_snapshot = Snapshot::builder()
        .with_snapshot_id(head_snapshot_id)
        .with_parent_snapshot_id(Some(parent_snapshot_id))
        .with_timestamp_ms(1555100955770)
        .with_sequence_number(1)
        .with_schema_id(1)
        .with_manifest_list("s3://a/b/2.avro")
        .with_summary(append_summary())
        .build();

    // Expected "main" branch reference pointing at the head snapshot.
    let main_branch = SnapshotReference {
        snapshot_id: head_snapshot_id,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: None,
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([
            (0, Arc::new(initial_schema)),
            (1, Arc::new(evolved_schema)),
        ]),
        current_schema_id: 1,
        partition_specs: HashMap::from([(0, Arc::clone(&default_spec))]),
        default_spec,
        default_partition_type,
        last_partition_id: 1000,
        default_sort_order_id: 3,
        sort_orders: HashMap::from([(3, Arc::new(expected_sort_order))]),
        snapshots: HashMap::from([
            (parent_snapshot_id, Arc::new(parent_snapshot)),
            (head_snapshot_id, Arc::new(head_snapshot)),
        ]),
        current_snapshot_id: Some(head_snapshot_id),
        last_sequence_number: 34,
        properties: HashMap::new(),
        snapshot_log: vec![
            SnapshotLog {
                snapshot_id: parent_snapshot_id,
                timestamp_ms: 1515100955770,
            },
            SnapshotLog {
                snapshot_id: head_snapshot_id,
                timestamp_ms: 1555100955770,
            },
        ],
        metadata_log: Vec::new(),
        refs: HashMap::from([("main".to_string(), main_branch)]),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
    };

    check_table_metadata_serde(&metadata_json, expected);
}
2676
#[test]
fn test_table_metadata_v2_file_valid_minimal() {
    let metadata_json =
        fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

    // Every column in the expected schema is a required `long`.
    let long_field = |id: i32, name: &str| {
        NestedField::required(id, name, Type::Primitive(PrimitiveType::Long))
    };

    // Schema 0: columns "x", "y" (documented) and "z".
    let table_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            Arc::new(long_field(1, "x")),
            Arc::new(long_field(2, "y").with_doc("comment")),
            Arc::new(long_field(3, "z")),
        ])
        .build()
        .unwrap();

    // Identity partitioning on "x".
    let identity_on_x = UnboundPartitionField {
        name: "x".to_string(),
        transform: Transform::Identity,
        source_id: 1,
        field_id: Some(1000),
    };
    let bound_spec = PartitionSpec::builder(table_schema.clone())
        .with_spec_id(0)
        .add_unbound_field(identity_on_x)
        .unwrap()
        .build()
        .unwrap();
    let default_partition_type = bound_spec.partition_type(&table_schema).unwrap();
    let default_spec = Arc::new(bound_spec);

    let y_ascending = SortField {
        source_id: 2,
        transform: Transform::Identity,
        direction: SortDirection::Ascending,
        null_order: NullOrder::First,
    };
    let z_bucketed_descending = SortField {
        source_id: 3,
        transform: Transform::Bucket(4),
        direction: SortDirection::Descending,
        null_order: NullOrder::Last,
    };
    let expected_sort_order = SortOrder::builder()
        .with_order_id(3)
        .with_sort_field(y_ascending)
        .with_sort_field(z_bucketed_descending)
        .build_unbound()
        .unwrap();

    // No snapshots, snapshot log entries or refs are expected for this fixture.
    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(table_schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::clone(&default_spec))]),
        default_partition_type,
        default_spec,
        last_partition_id: 1000,
        default_sort_order_id: 3,
        sort_orders: HashMap::from([(3, Arc::new(expected_sort_order))]),
        snapshots: HashMap::new(),
        current_snapshot_id: None,
        last_sequence_number: 34,
        properties: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        refs: HashMap::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
    };

    check_table_metadata_serde(&metadata_json, expected);
}
2761
#[test]
fn test_table_metadata_v1_file_valid() {
    let metadata_json =
        fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

    // Every column in the expected schema is a required `long`.
    let long_field = |id: i32, name: &str| {
        NestedField::required(id, name, Type::Primitive(PrimitiveType::Long))
    };

    // Schema 0: columns "x", "y" (documented) and "z".
    let table_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            Arc::new(long_field(1, "x")),
            Arc::new(long_field(2, "y").with_doc("comment")),
            Arc::new(long_field(3, "z")),
        ])
        .build()
        .unwrap();

    // Identity partitioning on "x".
    let identity_on_x = UnboundPartitionField {
        name: "x".to_string(),
        transform: Transform::Identity,
        source_id: 1,
        field_id: Some(1000),
    };
    let bound_spec = PartitionSpec::builder(table_schema.clone())
        .with_spec_id(0)
        .add_unbound_field(identity_on_x)
        .unwrap()
        .build()
        .unwrap();
    let default_partition_type = bound_spec.partition_type(&table_schema).unwrap();
    let default_spec = Arc::new(bound_spec);

    let expected = TableMetadata {
        format_version: FormatVersion::V1,
        table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573874,
        last_column_id: 3,
        schemas: HashMap::from([(0, Arc::new(table_schema))]),
        current_schema_id: 0,
        partition_specs: HashMap::from([(0, Arc::clone(&default_spec))]),
        default_spec,
        default_partition_type,
        last_partition_id: 0,
        default_sort_order_id: 0,
        // Sort order is added during deserialization for V2 compatibility
        sort_orders: HashMap::from([(0, SortOrder::unsorted_order().into())]),
        snapshots: HashMap::new(),
        current_snapshot_id: None,
        last_sequence_number: 0,
        properties: HashMap::new(),
        snapshot_log: Vec::new(),
        metadata_log: Vec::new(),
        refs: HashMap::new(),
        statistics: HashMap::new(),
        partition_statistics: HashMap::new(),
        encryption_keys: HashMap::new(),
    };

    check_table_metadata_serde(&metadata_json, expected);
}
2830
#[test]
fn test_table_metadata_v1_compat() {
    let raw_json =
        fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();

    // The fixture has to deserialize before any individual field can be inspected.
    let table_metadata: TableMetadata = serde_json::from_str(&raw_json)
        .expect("Failed to deserialize TableMetadataV1Compat.json");

    let expected_uuid = Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap();
    let expected_location = "s3://bucket/warehouse/iceberg/glue.db/table_name";

    // Spot-check a handful of top-level fields.
    assert_eq!(table_metadata.format_version(), FormatVersion::V1);
    assert_eq!(table_metadata.uuid(), expected_uuid);
    assert_eq!(table_metadata.location(), expected_location);
    assert_eq!(table_metadata.last_updated_ms(), 1727773114005);
    assert_eq!(table_metadata.current_schema_id(), 0);
}
2853
#[test]
fn test_table_metadata_v1_schemas_without_current_id() {
    let fixture = "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json";
    let raw_json = fs::read_to_string(fixture).unwrap();

    // Deserialization should succeed by using the 'schema' field instead of 'schemas'.
    let table_metadata: TableMetadata = serde_json::from_str(&raw_json)
        .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");

    let expected_uuid = Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap();
    assert_eq!(table_metadata.format_version(), FormatVersion::V1);
    assert_eq!(table_metadata.uuid(), expected_uuid);

    // The current schema must expose exactly the columns x, y and z, in that order.
    let fields = table_metadata.current_schema().as_struct().fields();
    assert_eq!(fields.len(), 3);
    for (position, expected_name) in ["x", "y", "z"].iter().enumerate() {
        assert_eq!(fields[position].name, *expected_name);
    }
}
2879
#[test]
fn test_table_metadata_v1_no_valid_schema() {
    let fixture = "testdata/table_metadata/TableMetadataV1NoValidSchema.json";
    let raw_json = fs::read_to_string(fixture).unwrap();

    // Expected to fail: neither schemas + current_schema_id nor schema is valid in this fixture.
    let parsed = serde_json::from_str::<TableMetadata>(&raw_json);
    assert!(parsed.is_err());

    // The failure has to mention the missing schema configuration.
    let error_message = parsed.unwrap_err().to_string();
    let mentions_schema_config = error_message.contains("No valid schema configuration found");
    assert!(
        mentions_schema_config,
        "Expected error about no valid schema configuration, got: {}",
        error_message
    );
}
2897
#[test]
fn test_table_metadata_v1_partition_specs_without_default_id() {
    let fixture = "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json";
    let raw_json = fs::read_to_string(fixture).unwrap();

    // Deserialization should succeed by inferring default_spec_id as the max spec ID.
    let table_metadata: TableMetadata = serde_json::from_str(&raw_json)
        .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");

    // Basic identity of the table.
    let expected_uuid = Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap();
    assert_eq!(table_metadata.format_version(), FormatVersion::V1);
    assert_eq!(table_metadata.uuid(), expected_uuid);

    // Two specs are registered and the largest spec ID (2) is the default.
    assert_eq!(table_metadata.default_partition_spec_id(), 2);
    assert_eq!(table_metadata.partition_specs.len(), 2);

    // The default spec partitions by identity on source column 2 under the name "y".
    let default_spec = &table_metadata.default_spec;
    assert_eq!(default_spec.spec_id(), 2);
    assert_eq!(default_spec.fields().len(), 1);
    let only_field = &default_spec.fields()[0];
    assert_eq!(only_field.name, "y");
    assert_eq!(only_field.transform, Transform::Identity);
    assert_eq!(only_field.source_id, 2);
}
2928
#[test]
fn test_table_metadata_v2_schema_not_found() {
    let fixture = "testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json";
    let raw_json = fs::read_to_string(fixture).unwrap();

    // Deserialization must fail with exactly this message.
    let parse_error = serde_json::from_str::<TableMetadata>(&raw_json).unwrap_err();

    assert_eq!(
        parse_error.to_string(),
        "DataInvalid => No schema exists with the current schema id 2."
    );
}
2942
#[test]
fn test_table_metadata_v2_missing_sort_order() {
    let fixture = "testdata/table_metadata/TableMetadataV2MissingSortOrder.json";
    let raw_json = fs::read_to_string(fixture).unwrap();

    // Deserialization must fail with the untagged-enum mismatch message.
    let parse_error = serde_json::from_str::<TableMetadata>(&raw_json).unwrap_err();

    assert_eq!(
        parse_error.to_string(),
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
2956
#[test]
fn test_table_metadata_v2_missing_partition_specs() {
    let fixture = "testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json";
    let raw_json = fs::read_to_string(fixture).unwrap();

    // Deserialization must fail with the untagged-enum mismatch message.
    let parse_error = serde_json::from_str::<TableMetadata>(&raw_json).unwrap_err();

    assert_eq!(
        parse_error.to_string(),
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
2970
#[test]
fn test_table_metadata_v2_missing_last_partition_id() {
    let fixture = "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json";
    let raw_json = fs::read_to_string(fixture).unwrap();

    // Deserialization must fail with the untagged-enum mismatch message.
    let parse_error = serde_json::from_str::<TableMetadata>(&raw_json).unwrap_err();

    assert_eq!(
        parse_error.to_string(),
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
2985
#[test]
fn test_table_metadata_v2_missing_schemas() {
    let fixture = "testdata/table_metadata/TableMetadataV2MissingSchemas.json";
    let raw_json = fs::read_to_string(fixture).unwrap();

    // Deserialization must fail with the untagged-enum mismatch message.
    let parse_error = serde_json::from_str::<TableMetadata>(&raw_json).unwrap_err();

    assert_eq!(
        parse_error.to_string(),
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
2999
#[test]
fn test_table_metadata_v2_unsupported_version() {
    let fixture = "testdata/table_metadata/TableMetadataUnsupportedVersion.json";
    let raw_json = fs::read_to_string(fixture).unwrap();

    // Deserialization must fail with the untagged-enum mismatch message.
    let parse_error = serde_json::from_str::<TableMetadata>(&raw_json).unwrap_err();

    assert_eq!(
        parse_error.to_string(),
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
3013
#[test]
fn test_order_of_format_version() {
    let (v1, v2) = (FormatVersion::V1, FormatVersion::V2);

    // V1 sorts strictly before V2, and each version equals itself.
    assert!(v1 < v2);
    assert_eq!(v1, FormatVersion::V1);
    assert_eq!(v2, FormatVersion::V2);
}
3020
#[test]
fn test_default_partition_spec() {
    let registered_spec_id = 1234;
    let mut metadata = get_test_table_metadata("TableMetadataV2Valid.json");

    // Install the same unpartitioned spec both as the default and under `registered_spec_id`.
    let unpartitioned = PartitionSpec::unpartition_spec();
    metadata.default_spec = Arc::new(unpartitioned.clone());
    metadata
        .partition_specs
        .insert(registered_spec_id, Arc::new(unpartitioned));

    // Both accessors must yield equal partition specs.
    let via_default = metadata.default_partition_spec().clone();
    let via_lookup = metadata
        .partition_spec_by_id(registered_spec_id)
        .unwrap()
        .clone();
    assert_eq!(*via_default, *via_lookup);
}
#[test]
fn test_default_sort_order() {
    let registered_order_id = 1234;
    let mut metadata = get_test_table_metadata("TableMetadataV2Valid.json");

    // Register a default-constructed sort order and point the default id at it.
    metadata
        .sort_orders
        .insert(registered_order_id, Arc::new(SortOrder::default()));
    metadata.default_sort_order_id = registered_order_id;

    // The accessor must return the entry stored under that id.
    let registered_order = metadata.sort_orders.get(&registered_order_id).unwrap();
    assert_eq!(metadata.default_sort_order(), registered_order);
}
3057
#[test]
fn test_table_metadata_builder_from_table_creation() {
    // Table creation request with an empty schema and no properties.
    let empty_schema = Schema::builder().build().unwrap();
    let creation_request = TableCreation::builder()
        .location("s3://db/table".to_string())
        .name("table".to_string())
        .properties(HashMap::new())
        .schema(empty_schema)
        .build();

    let build_result = TableMetadataBuilder::from_table_creation(creation_request)
        .unwrap()
        .build()
        .unwrap();
    let table_metadata = build_result.metadata;

    assert_eq!(table_metadata.location, "s3://db/table");

    // Exactly one schema, registered under id 0 and without any fields.
    assert_eq!(table_metadata.schemas.len(), 1);
    let stored_schema = table_metadata.schemas.get(&0).unwrap();
    assert_eq!(stored_schema.as_struct().fields().len(), 0);

    assert_eq!(table_metadata.properties.len(), 0);

    // A single partition spec with id 0 and no fields, built against the stored schema.
    let expected_spec = PartitionSpec::builder(stored_schema.clone())
        .with_spec_id(0)
        .build()
        .unwrap();
    assert_eq!(
        table_metadata.partition_specs,
        HashMap::from([(0, Arc::new(expected_spec))])
    );

    // A single sort order with id 0 and no fields.
    let expected_sort_order = SortOrder {
        order_id: 0,
        fields: vec![],
    };
    assert_eq!(
        table_metadata.sort_orders,
        HashMap::from([(0, Arc::new(expected_sort_order))])
    );
}
3107
3108    #[tokio::test]
3109    async fn test_table_metadata_read_write() {
3110        // Create a temporary directory for our test
3111        let temp_dir = TempDir::new().unwrap();
3112        let temp_path = temp_dir.path().to_str().unwrap();
3113
3114        // Create a FileIO instance
3115        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3116
3117        // Use an existing test metadata from the test files
3118        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3119
3120        // Define the metadata location
3121        let metadata_location = format!("{}/metadata.json", temp_path);
3122
3123        // Write the metadata
3124        original_metadata
3125            .write_to(&file_io, &metadata_location)
3126            .await
3127            .unwrap();
3128
3129        // Verify the file exists
3130        assert!(fs::metadata(&metadata_location).is_ok());
3131
3132        // Read the metadata back
3133        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location)
3134            .await
3135            .unwrap();
3136
3137        // Verify the metadata matches
3138        assert_eq!(read_metadata, original_metadata);
3139    }
3140
3141    #[tokio::test]
3142    async fn test_table_metadata_read_nonexistent_file() {
3143        // Create a FileIO instance
3144        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3145
3146        // Try to read a non-existent file
3147        let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3148
3149        // Verify it returns an error
3150        assert!(result.is_err());
3151    }
3152
3153    #[test]
3154    fn test_partition_name_exists() {
3155        let schema = Schema::builder()
3156            .with_fields(vec![
3157                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3158                NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
3159                    .into(),
3160            ])
3161            .build()
3162            .unwrap();
3163
3164        let spec1 = PartitionSpec::builder(schema.clone())
3165            .with_spec_id(1)
3166            .add_partition_field("data", "data_partition", Transform::Identity)
3167            .unwrap()
3168            .build()
3169            .unwrap();
3170
3171        let spec2 = PartitionSpec::builder(schema.clone())
3172            .with_spec_id(2)
3173            .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
3174            .unwrap()
3175            .build()
3176            .unwrap();
3177
3178        // Build metadata with these specs
3179        let metadata = TableMetadataBuilder::new(
3180            schema,
3181            spec1.clone().into_unbound(),
3182            SortOrder::unsorted_order(),
3183            "s3://test/location".to_string(),
3184            FormatVersion::V2,
3185            HashMap::new(),
3186        )
3187        .unwrap()
3188        .add_partition_spec(spec2.into_unbound())
3189        .unwrap()
3190        .build()
3191        .unwrap()
3192        .metadata;
3193
3194        assert!(metadata.partition_name_exists("data_partition"));
3195        assert!(metadata.partition_name_exists("partition_bucket"));
3196
3197        assert!(!metadata.partition_name_exists("nonexistent_field"));
3198        assert!(!metadata.partition_name_exists("data")); // schema field name, not partition field name
3199        assert!(!metadata.partition_name_exists(""));
3200    }
3201
3202    #[test]
3203    fn test_partition_name_exists_empty_specs() {
3204        // Create metadata with no partition specs (unpartitioned table)
3205        let schema = Schema::builder()
3206            .with_fields(vec![
3207                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3208            ])
3209            .build()
3210            .unwrap();
3211
3212        let metadata = TableMetadataBuilder::new(
3213            schema,
3214            PartitionSpec::unpartition_spec().into_unbound(),
3215            SortOrder::unsorted_order(),
3216            "s3://test/location".to_string(),
3217            FormatVersion::V2,
3218            HashMap::new(),
3219        )
3220        .unwrap()
3221        .build()
3222        .unwrap()
3223        .metadata;
3224
3225        assert!(!metadata.partition_name_exists("any_field"));
3226        assert!(!metadata.partition_name_exists("data"));
3227    }
3228
3229    #[test]
3230    fn test_name_exists_in_any_schema() {
3231        // Create multiple schemas with different fields
3232        let schema1 = Schema::builder()
3233            .with_schema_id(1)
3234            .with_fields(vec![
3235                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3236                NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
3237            ])
3238            .build()
3239            .unwrap();
3240
3241        let schema2 = Schema::builder()
3242            .with_schema_id(2)
3243            .with_fields(vec![
3244                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3245                NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
3246            ])
3247            .build()
3248            .unwrap();
3249
3250        let metadata = TableMetadataBuilder::new(
3251            schema1,
3252            PartitionSpec::unpartition_spec().into_unbound(),
3253            SortOrder::unsorted_order(),
3254            "s3://test/location".to_string(),
3255            FormatVersion::V2,
3256            HashMap::new(),
3257        )
3258        .unwrap()
3259        .add_current_schema(schema2)
3260        .unwrap()
3261        .build()
3262        .unwrap()
3263        .metadata;
3264
3265        assert!(metadata.name_exists_in_any_schema("field1")); // exists in both schemas
3266        assert!(metadata.name_exists_in_any_schema("field2")); // exists only in schema1 (historical)
3267        assert!(metadata.name_exists_in_any_schema("field3")); // exists only in schema2 (current)
3268
3269        assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
3270        assert!(!metadata.name_exists_in_any_schema("field4"));
3271        assert!(!metadata.name_exists_in_any_schema(""));
3272    }
3273
3274    #[test]
3275    fn test_name_exists_in_any_schema_empty_schemas() {
3276        let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3277
3278        let metadata = TableMetadataBuilder::new(
3279            schema,
3280            PartitionSpec::unpartition_spec().into_unbound(),
3281            SortOrder::unsorted_order(),
3282            "s3://test/location".to_string(),
3283            FormatVersion::V2,
3284            HashMap::new(),
3285        )
3286        .unwrap()
3287        .build()
3288        .unwrap()
3289        .metadata;
3290
3291        assert!(!metadata.name_exists_in_any_schema("any_field"));
3292    }
3293
3294    #[test]
3295    fn test_helper_methods_multi_version_scenario() {
3296        // Test a realistic multi-version scenario
3297        let initial_schema = Schema::builder()
3298            .with_fields(vec![
3299                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3300                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3301                NestedField::required(
3302                    3,
3303                    "deprecated_field",
3304                    Type::Primitive(PrimitiveType::String),
3305                )
3306                .into(),
3307            ])
3308            .build()
3309            .unwrap();
3310
3311        let metadata = TableMetadataBuilder::new(
3312            initial_schema,
3313            PartitionSpec::unpartition_spec().into_unbound(),
3314            SortOrder::unsorted_order(),
3315            "s3://test/location".to_string(),
3316            FormatVersion::V2,
3317            HashMap::new(),
3318        )
3319        .unwrap();
3320
3321        let evolved_schema = Schema::builder()
3322            .with_fields(vec![
3323                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3324                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3325                NestedField::required(
3326                    3,
3327                    "deprecated_field",
3328                    Type::Primitive(PrimitiveType::String),
3329                )
3330                .into(),
3331                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3332                    .into(),
3333            ])
3334            .build()
3335            .unwrap();
3336
3337        // Then add a third schema that removes the deprecated field
3338        let _final_schema = Schema::builder()
3339            .with_fields(vec![
3340                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3341                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3342                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3343                    .into(),
3344                NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
3345                    .into(),
3346            ])
3347            .build()
3348            .unwrap();
3349
3350        let final_metadata = metadata
3351            .add_current_schema(evolved_schema)
3352            .unwrap()
3353            .build()
3354            .unwrap()
3355            .metadata;
3356
3357        assert!(!final_metadata.partition_name_exists("nonexistent_partition")); // unpartitioned table
3358
3359        assert!(final_metadata.name_exists_in_any_schema("id")); // exists in both schemas
3360        assert!(final_metadata.name_exists_in_any_schema("name")); // exists in both schemas
3361        assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); // exists in both schemas
3362        assert!(final_metadata.name_exists_in_any_schema("new_field")); // only in current schema
3363        assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
3364    }
3365}