iceberg/spec/table_metadata.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
//! The main struct here is [TableMetadata], which defines the data for a table.
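//!
//! A minimal sketch of loading table metadata from a JSON document; `json` is a
//! placeholder for the contents of a metadata file:
//!
//! ```ignore
//! // Works for v1, v2 and v3 metadata; the version is detected during parsing.
//! let metadata: TableMetadata = serde_json::from_str(&json)?;
//! println!("table {} at {}", metadata.uuid(), metadata.location());
//! ```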

use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::io::Read as _;
use std::sync::Arc;

use _serde::TableMetadataEnum;
use chrono::{DateTime, Utc};
use flate2::read::GzDecoder;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;

use super::snapshot::SnapshotReference;
pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
use super::{
    DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
    SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
};
use crate::error::{Result, timestamp_ms_to_utc};
use crate::io::FileIO;
use crate::spec::EncryptedKey;
use crate::{Error, ErrorKind};

static MAIN_BRANCH: &str = "main";
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;

pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;

/// Initial row id for row lineage, used for new v3 tables and for older tables upgrading to v3.
pub const INITIAL_ROW_ID: u64 = 0;
/// Minimum format version that supports row lineage (v3).
pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;

#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
/// Fields of the table metadata, covering all supported format versions.
///
/// We assume that this data structure is always valid, so we panic when an invalid state is encountered.
/// We check the validity of this data structure when constructing.
pub struct TableMetadata {
    /// Integer Version for the format.
    pub(crate) format_version: FormatVersion,
    /// A UUID that identifies the table
    pub(crate) table_uuid: Uuid,
    /// The table's base location.
    pub(crate) location: String,
    /// The table's highest assigned sequence number.
    pub(crate) last_sequence_number: i64,
    /// Timestamp in milliseconds from the unix epoch when the table was last updated.
    pub(crate) last_updated_ms: i64,
    /// An integer; the highest assigned column ID for the table.
    pub(crate) last_column_id: i32,
    /// A list of schemas, stored as objects with schema-id.
    pub(crate) schemas: HashMap<i32, SchemaRef>,
    /// ID of the table’s current schema.
    pub(crate) current_schema_id: i32,
    /// A list of partition specs, stored as full partition spec objects.
    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
    /// The “current” partition spec that writers should use by default.
    pub(crate) default_spec: PartitionSpecRef,
    /// Partition type of the default partition spec.
    pub(crate) default_partition_type: StructType,
    /// An integer; the highest assigned partition field ID across all partition specs for the table.
    pub(crate) last_partition_id: i32,
    /// A string to string map of table properties. This is used to control settings that
    /// affect reading and writing and is not intended to be used for arbitrary metadata.
    /// For example, commit.retry.num-retries is used to control the number of commit retries.
    pub(crate) properties: HashMap<String, String>,
    /// long ID of the current table snapshot; must be the same as the current
    /// ID of the main branch in refs.
    pub(crate) current_snapshot_id: Option<i64>,
    /// A list of valid snapshots. Valid snapshots are snapshots for which all
    /// data files exist in the file system. A data file must not be deleted
    /// from the file system until the last snapshot in which it was listed is
    /// garbage collected.
    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
    /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
    /// to the current snapshot for the table. Each time the current-snapshot-id
    /// is changed, a new entry should be added with the last-updated-ms
    /// and the new current-snapshot-id. When snapshots are expired from
    /// the list of valid snapshots, all entries before a snapshot that has
    /// expired should be removed.
    pub(crate) snapshot_log: Vec<SnapshotLog>,

    /// A list (optional) of timestamp and metadata file location pairs
    /// that encodes changes to the previous metadata files for the table.
    /// Each time a new metadata file is created, a new entry of the
    /// previous metadata file location should be added to the list.
    /// Tables can be configured to remove the oldest metadata log entries and
    /// keep a fixed-size log of the most recent entries after a commit.
    pub(crate) metadata_log: Vec<MetadataLog>,

    /// A list of sort orders, stored as full sort order objects.
    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
    /// Default sort order id of the table. Note that this could be used by
    /// writers, but is not used when reading because reads use the specs
    /// stored in manifest files.
    pub(crate) default_sort_order_id: i64,
    /// A map of snapshot references. The map keys are the unique snapshot reference
    /// names in the table, and the map values are snapshot reference objects.
    /// There is always a main branch reference pointing to the current-snapshot-id
    /// even if the refs map is null.
    pub(crate) refs: HashMap<String, SnapshotReference>,
    /// Mapping of snapshot ids to statistics files.
    pub(crate) statistics: HashMap<i64, StatisticsFile>,
    /// Mapping of snapshot ids to partition statistics files.
    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
    /// Encryption keys - a map of key id to the actual key.
    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
    /// Next row id to be assigned for Row Lineage (v3)
    pub(crate) next_row_id: u64,
}

impl TableMetadata {
    /// Convert this Table Metadata into a builder for modification.
    ///
    /// `current_file_location` is the location where the current version
    /// of the metadata file is stored. This is used to update the metadata log.
    /// If `current_file_location` is `None`, the metadata log will not be updated.
    /// This should only be used to stage-create tables.
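    ///
    /// A minimal sketch of the intended flow; the metadata location is a placeholder
    /// and the builder's `build` step is assumed from [`TableMetadataBuilder`]:
    ///
    /// ```ignore
    /// // Rebuild the metadata, recording the current file location in the metadata log.
    /// let builder = metadata.into_builder(Some(
    ///     "s3://bucket/table/metadata/v2.metadata.json".to_string(),
    /// ));
    /// let result = builder.build()?;
    /// ```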
    #[must_use]
    pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
        TableMetadataBuilder::new_from_metadata(self, current_file_location)
    }

    /// Check if a partition field name exists in any partition spec.
    #[inline]
    pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
        self.partition_specs
            .values()
            .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
    }

    /// Check if a field name exists in any schema.
    #[inline]
    pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
        self.schemas
            .values()
            .any(|schema| schema.field_by_name(name).is_some())
    }

    /// Returns format version of this metadata.
    #[inline]
    pub fn format_version(&self) -> FormatVersion {
        self.format_version
    }

    /// Returns uuid of current table.
    #[inline]
    pub fn uuid(&self) -> Uuid {
        self.table_uuid
    }

    /// Returns table location.
    #[inline]
    pub fn location(&self) -> &str {
        self.location.as_str()
    }

    /// Returns last sequence number.
    #[inline]
    pub fn last_sequence_number(&self) -> i64 {
        self.last_sequence_number
    }

    /// Returns the next sequence number for the table.
    ///
    /// For format version 1, it always returns the initial sequence number.
    /// For other versions, it returns the last sequence number incremented by 1.
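    ///
    /// A sketch of the contract, assuming `metadata` was loaded elsewhere:
    ///
    /// ```ignore
    /// let next = metadata.next_sequence_number();
    /// match metadata.format_version() {
    ///     FormatVersion::V1 => assert_eq!(next, 0),
    ///     _ => assert_eq!(next, metadata.last_sequence_number() + 1),
    /// }
    /// ```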
    #[inline]
    pub fn next_sequence_number(&self) -> i64 {
        match self.format_version {
            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
            _ => self.last_sequence_number + 1,
        }
    }

    /// Returns the last column id.
    #[inline]
    pub fn last_column_id(&self) -> i32 {
        self.last_column_id
    }

    /// Returns the last partition_id
    #[inline]
    pub fn last_partition_id(&self) -> i32 {
        self.last_partition_id
    }

    /// Returns last updated time.
    #[inline]
    pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.last_updated_ms)
    }

    /// Returns last updated time in milliseconds.
    #[inline]
    pub fn last_updated_ms(&self) -> i64 {
        self.last_updated_ms
    }

    /// Returns schemas
    #[inline]
    pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
        self.schemas.values()
    }

    /// Lookup schema by id.
    #[inline]
    pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
        self.schemas.get(&schema_id)
    }

    /// Get current schema
    #[inline]
    pub fn current_schema(&self) -> &SchemaRef {
        self.schema_by_id(self.current_schema_id)
            .expect("Current schema id set, but not found in table metadata")
    }

    /// Get the id of the current schema
    #[inline]
    pub fn current_schema_id(&self) -> SchemaId {
        self.current_schema_id
    }

    /// Returns all partition specs.
    #[inline]
    pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
        self.partition_specs.values()
    }

    /// Lookup partition spec by id.
    #[inline]
    pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
        self.partition_specs.get(&spec_id)
    }

    /// Get default partition spec
    #[inline]
    pub fn default_partition_spec(&self) -> &PartitionSpecRef {
        &self.default_spec
    }

    /// Return the partition type of the default partition spec.
    #[inline]
    pub fn default_partition_type(&self) -> &StructType {
        &self.default_partition_type
    }

    #[inline]
    /// Returns spec id of the "current" partition spec.
    pub fn default_partition_spec_id(&self) -> i32 {
        self.default_spec.spec_id()
    }

    /// Returns all snapshots
    #[inline]
    pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
        self.snapshots.values()
    }

    /// Lookup snapshot by id.
    #[inline]
    pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
        self.snapshots.get(&snapshot_id)
    }

    /// Returns snapshot history.
    #[inline]
    pub fn history(&self) -> &[SnapshotLog] {
        &self.snapshot_log
    }

    /// Returns the metadata log.
    #[inline]
    pub fn metadata_log(&self) -> &[MetadataLog] {
        &self.metadata_log
    }

    /// Get current snapshot
    #[inline]
    pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
        self.current_snapshot_id.map(|s| {
            self.snapshot_by_id(s)
                .expect("Current snapshot id has been set, but doesn't exist in metadata")
        })
    }

    /// Get the current snapshot id
    #[inline]
    pub fn current_snapshot_id(&self) -> Option<i64> {
        self.current_snapshot_id
    }

    /// Get the snapshot for a reference.
    /// Returns `None` if the `ref_name` is not found.
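    ///
    /// A small sketch, assuming `metadata` was loaded elsewhere; "main" is the branch
    /// maintained for the current snapshot:
    ///
    /// ```ignore
    /// if let Some(snapshot) = metadata.snapshot_for_ref("main") {
    ///     println!("main points at snapshot {}", snapshot.snapshot_id());
    /// }
    /// ```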
    #[inline]
    pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
        self.refs.get(ref_name).map(|r| {
            self.snapshot_by_id(r.snapshot_id)
                .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
        })
    }

    /// Return all sort orders.
    #[inline]
    pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
        self.sort_orders.values()
    }

    /// Lookup sort order by id.
    #[inline]
    pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
        self.sort_orders.get(&sort_order_id)
    }

    /// Returns the default sort order.
    #[inline]
    pub fn default_sort_order(&self) -> &SortOrderRef {
        self.sort_orders
            .get(&self.default_sort_order_id)
            .expect("Default order id has been set, but not found in table metadata!")
    }

    /// Returns default sort order id.
    #[inline]
    pub fn default_sort_order_id(&self) -> i64 {
        self.default_sort_order_id
    }

    /// Returns properties of table.
    #[inline]
    pub fn properties(&self) -> &HashMap<String, String> {
        &self.properties
    }

    /// Returns all statistics files.
    #[inline]
    pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
        self.statistics.values()
    }

    /// Returns all partition statistics files.
    #[inline]
    pub fn partition_statistics_iter(
        &self,
    ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
        self.partition_statistics.values()
    }

    /// Get a statistics file for a snapshot id.
    #[inline]
    pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
        self.statistics.get(&snapshot_id)
    }

    /// Get a partition statistics file for a snapshot id.
    #[inline]
    pub fn partition_statistics_for_snapshot(
        &self,
        snapshot_id: i64,
    ) -> Option<&PartitionStatisticsFile> {
        self.partition_statistics.get(&snapshot_id)
    }

    fn construct_refs(&mut self) {
        if let Some(current_snapshot_id) = self.current_snapshot_id
            && !self.refs.contains_key(MAIN_BRANCH)
        {
            self.refs
                .insert(MAIN_BRANCH.to_string(), SnapshotReference {
                    snapshot_id: current_snapshot_id,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: None,
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                });
        }
    }

    /// Iterate over all encryption keys
    #[inline]
    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
        self.encryption_keys.values()
    }

    /// Get the encryption key for a given key id
    #[inline]
    pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
        self.encryption_keys.get(key_id)
    }

    /// Get the next row id to be assigned
    #[inline]
    pub fn next_row_id(&self) -> u64 {
        self.next_row_id
    }

    /// Read table metadata from the given location.
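    ///
    /// Both plain JSON and gzip-compressed metadata files are handled; compression is
    /// detected from the gzip magic bytes. A minimal sketch, where `file_io` and the
    /// path are placeholders:
    ///
    /// ```ignore
    /// let metadata = TableMetadata::read_from(
    ///     &file_io,
    ///     "s3://bucket/table/metadata/v1.metadata.json",
    /// )
    /// .await?;
    /// ```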
    pub async fn read_from(
        file_io: &FileIO,
        metadata_location: impl AsRef<str>,
    ) -> Result<TableMetadata> {
        let metadata_location = metadata_location.as_ref();
        let input_file = file_io.new_input(metadata_location)?;
        let metadata_content = input_file.read().await?;

        // Check if the file is compressed by looking for the gzip "magic number".
        let metadata = if metadata_content.len() > 2
            && metadata_content[0] == 0x1F
            && metadata_content[1] == 0x8B
        {
            let mut decoder = GzDecoder::new(metadata_content.as_ref());
            let mut decompressed_data = Vec::new();
            decoder.read_to_end(&mut decompressed_data).map_err(|e| {
                Error::new(
                    ErrorKind::DataInvalid,
                    "Trying to read compressed metadata file",
                )
                .with_context("file_path", metadata_location)
                .with_source(e)
            })?;
            serde_json::from_slice(&decompressed_data)?
        } else {
            serde_json::from_slice(&metadata_content)?
        };

        Ok(metadata)
    }

    /// Write table metadata to the given location.
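    ///
    /// The metadata is written as uncompressed JSON. A minimal sketch with the same
    /// placeholder `file_io` and path style as above:
    ///
    /// ```ignore
    /// metadata
    ///     .write_to(&file_io, "s3://bucket/table/metadata/v2.metadata.json")
    ///     .await?;
    /// ```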
    pub async fn write_to(
        &self,
        file_io: &FileIO,
        metadata_location: impl AsRef<str>,
    ) -> Result<()> {
        file_io
            .new_output(metadata_location)?
            .write(serde_json::to_vec(self)?.into())
            .await
    }

    /// Normalize this table metadata.
    ///
    /// This is an internal method meant to be called after constructing table
    /// metadata from untrusted sources. We run this method after json deserialization.
    /// All constructors for `TableMetadata` which are part of `iceberg-rust`
    /// should return normalized `TableMetadata`.
    pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
        self.validate_current_schema()?;
        self.normalize_current_snapshot()?;
        self.construct_refs();
        self.validate_refs()?;
        self.validate_chronological_snapshot_logs()?;
        self.validate_chronological_metadata_logs()?;
        // Normalize location (remove trailing slash)
        self.location = self.location.trim_end_matches('/').to_string();
        self.validate_snapshot_sequence_number()?;
        self.try_normalize_partition_spec()?;
        self.try_normalize_sort_order()?;
        Ok(self)
    }

    /// If the default partition spec is not present in specs, add it
    fn try_normalize_partition_spec(&mut self) -> Result<()> {
        if self
            .partition_spec_by_id(self.default_spec.spec_id())
            .is_none()
        {
            self.partition_specs.insert(
                self.default_spec.spec_id(),
                Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
            );
        }

        Ok(())
    }

    /// If the default sort order id refers to the unsorted order but that order is not present, add it.
    fn try_normalize_sort_order(&mut self) -> Result<()> {
        if self.sort_order_by_id(self.default_sort_order_id).is_some() {
            return Ok(());
        }

        if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "No sort order exists with the default sort order id {}.",
                    self.default_sort_order_id
                ),
            ));
        }

        let sort_order = SortOrder::unsorted_order();
        self.sort_orders
            .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
        Ok(())
    }

    /// Validate the current schema is set and exists.
    fn validate_current_schema(&self) -> Result<()> {
        if self.schema_by_id(self.current_schema_id).is_none() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "No schema exists with the current schema id {}.",
                    self.current_schema_id
                ),
            ));
        }
        Ok(())
    }

    /// If current snapshot is Some(-1) then set it to None.
    fn normalize_current_snapshot(&mut self) -> Result<()> {
        if let Some(current_snapshot_id) = self.current_snapshot_id {
            if current_snapshot_id == EMPTY_SNAPSHOT_ID {
                self.current_snapshot_id = None;
            } else if self.snapshot_by_id(current_snapshot_id).is_none() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
                    ),
                ));
            }
        }
        Ok(())
    }

    /// Validate that all refs are valid (snapshot exists)
    fn validate_refs(&self) -> Result<()> {
        for (name, snapshot_ref) in self.refs.iter() {
            if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Snapshot for reference {name} does not exist in the existing snapshots list"
                    ),
                ));
            }
        }

        let main_ref = self.refs.get(MAIN_BRANCH);
        if self.current_snapshot_id.is_some() {
            if let Some(main_ref) = main_ref
                && main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default()
            {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Current snapshot id does not match main branch ({:?} != {:?})",
                        self.current_snapshot_id.unwrap_or_default(),
                        main_ref.snapshot_id
                    ),
                ));
            }
        } else if main_ref.is_some() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Current snapshot is not set, but main branch exists",
            ));
        }

        Ok(())
    }

    /// Validate that for V1 Metadata the last_sequence_number is 0
    fn validate_snapshot_sequence_number(&self) -> Result<()> {
        if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Last sequence number must be 0 in v1. Found {}",
                    self.last_sequence_number
                ),
            ));
        }

        if self.format_version >= FormatVersion::V2
            && let Some(snapshot) = self
                .snapshots
                .values()
                .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
                    snapshot.snapshot_id(),
                    snapshot.sequence_number(),
                    self.last_sequence_number
                ),
            ));
        }

        Ok(())
    }

    /// Validate snapshot logs are chronological and last updated is after the last snapshot log entry.
    fn validate_chronological_snapshot_logs(&self) -> Result<()> {
        for window in self.snapshot_log.windows(2) {
            let (prev, curr) = (&window[0], &window[1]);
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Expected sorted snapshot log entries",
                ));
            }
        }

        if let Some(last) = self.snapshot_log.last() {
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid update timestamp {}: before last snapshot log entry at {}",
                        self.last_updated_ms, last.timestamp_ms
                    ),
                ));
            }
        }
        Ok(())
    }

    fn validate_chronological_metadata_logs(&self) -> Result<()> {
        for window in self.metadata_log.windows(2) {
            let (prev, curr) = (&window[0], &window[1]);
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Expected sorted metadata log entries",
                ));
            }
        }

        if let Some(last) = self.metadata_log.last() {
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid update timestamp {}: before last metadata log entry at {}",
                        self.last_updated_ms, last.timestamp_ms
                    ),
                ));
            }
        }

        Ok(())
    }
}

pub(super) mod _serde {
    /// This is a helper module that defines types to help with serialization/deserialization.
    /// For deserialization the input first gets read into the [TableMetadataV1], [TableMetadataV2] or
    /// [TableMetadataV3] struct and then converted into the [TableMetadata] struct.
    /// Serialization works the other way around.
    /// [TableMetadataV1], [TableMetadataV2] and [TableMetadataV3] are internal structs that are only
    /// used for serialization and deserialization.
    use std::borrow::BorrowMut;
    use std::collections::HashMap;
    use std::sync::Arc;

    use serde::{Deserialize, Serialize};
    use uuid::Uuid;

    use super::{
        DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
        TableMetadata,
    };
    use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
    use crate::spec::{
        EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
        PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
        SortOrder, StatisticsFile,
    };
    use crate::{Error, ErrorKind};

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    pub(super) enum TableMetadataEnum {
        V3(TableMetadataV3),
        V2(TableMetadataV2),
        V1(TableMetadataV1),
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v3 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV3 {
        pub format_version: VersionNumber<3>,
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        pub next_row_id: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub encryption_keys: Option<Vec<EncryptedKey>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV3>>,
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the fields shared between v2 and v3 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV2V3Shared {
        pub table_uuid: Uuid,
        pub location: String,
        pub last_sequence_number: i64,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        pub schemas: Vec<SchemaV2>,
        pub current_schema_id: i32,
        pub partition_specs: Vec<PartitionSpec>,
        pub default_spec_id: i32,
        pub last_partition_id: i32,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Vec<SortOrder>,
        pub default_sort_order_id: i64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub refs: Option<HashMap<String, SnapshotReference>>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v2 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV2 {
        pub format_version: VersionNumber<2>,
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV2>>,
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v1 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV1 {
        pub format_version: VersionNumber<1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub table_uuid: Option<Uuid>,
        pub location: String,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        /// `schema` is optional to prioritize `schemas` and `current-schema-id`, allowing liberal reading of V1 metadata.
        pub schema: Option<SchemaV1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub schemas: Option<Vec<SchemaV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_schema_id: Option<i32>,
        /// `partition_spec` is optional to prioritize `partition_specs`, aligning with liberal reading of potentially invalid V1 metadata.
        pub partition_spec: Option<Vec<PartitionField>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub partition_specs: Option<Vec<PartitionSpec>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub default_spec_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub last_partition_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Option<Vec<SortOrder>>,
        pub default_sort_order_id: Option<i64>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }

    /// Helper to serialize and deserialize the format version.
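    ///
    /// For example, `VersionNumber::<2>` serializes to the JSON number 2 and only
    /// deserializes from that exact value (see the impls below), which is what lets
    /// the untagged [TableMetadataEnum] pick the right variant for a given
    /// "format-version" field.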
    #[derive(Debug, PartialEq, Eq)]
    pub(crate) struct VersionNumber<const V: u8>;

    impl Serialize for TableMetadata {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where S: serde::Serializer {
            // We must clone here because converting into `TableMetadataEnum` consumes the value.
            let table_metadata_enum: TableMetadataEnum =
                self.clone().try_into().map_err(serde::ser::Error::custom)?;

            table_metadata_enum.serialize(serializer)
        }
    }

    impl<const V: u8> Serialize for VersionNumber<V> {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where S: serde::Serializer {
            serializer.serialize_u8(V)
        }
    }

    impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where D: serde::Deserializer<'de> {
            let value = u8::deserialize(deserializer)?;
            if value == V {
                Ok(VersionNumber::<V>)
            } else {
                Err(serde::de::Error::custom("Invalid Version"))
            }
        }
    }

    impl TryFrom<TableMetadataEnum> for TableMetadata {
        type Error = Error;
        fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
            match value {
                TableMetadataEnum::V3(value) => value.try_into(),
                TableMetadataEnum::V2(value) => value.try_into(),
                TableMetadataEnum::V1(value) => value.try_into(),
            }
        }
    }

    impl TryFrom<TableMetadata> for TableMetadataEnum {
        type Error = Error;
        fn try_from(value: TableMetadata) -> Result<Self, Error> {
            Ok(match value.format_version {
                FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
                FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
                FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
            })
        }
    }

    impl TryFrom<TableMetadataV3> for TableMetadata {
        type Error = Error;
        fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
            let TableMetadataV3 {
                format_version: _,
                shared: value,
                next_row_id,
                encryption_keys,
                snapshots,
            } = value;
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };
            let schemas = HashMap::from_iter(
                value
                    .schemas
                    .into_iter()
                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
                    .collect::<Result<Vec<_>, Error>>()?,
            );

            let current_schema: &SchemaRef =
                schemas.get(&value.current_schema_id).ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "No schema exists with the current schema id {}.",
                            value.current_schema_id
                        ),
                    )
                })?;
            let partition_specs = HashMap::from_iter(
                value
                    .partition_specs
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x))),
            );
            let default_spec_id = value.default_spec_id;
            let default_spec: PartitionSpecRef = partition_specs
                .get(&value.default_spec_id)
                .map(|spec| (**spec).clone())
                .or_else(|| {
                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
                        .then(PartitionSpec::unpartition_spec)
                })
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V3,
                table_uuid: value.table_uuid,
                location: value.location,
                last_sequence_number: value.last_sequence_number,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id: value.current_schema_id,
                schemas,
                partition_specs,
                default_partition_type,
                default_spec,
                last_partition_id: value.last_partition_id,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                snapshots: snapshots
                    .map(|snapshots| {
                        HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
                        )
                    })
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: HashMap::from_iter(
                    value
                        .sort_orders
                        .into_iter()
                        .map(|x| (x.order_id, Arc::new(x))),
                ),
                default_sort_order_id: value.default_sort_order_id,
                refs: value.refs.unwrap_or_else(|| {
                    if let Some(snapshot_id) = current_snapshot_id {
                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                            snapshot_id,
                            retention: SnapshotRetention::Branch {
                                min_snapshots_to_keep: None,
                                max_snapshot_age_ms: None,
                                max_ref_age_ms: None,
                            },
                        })])
                    } else {
                        HashMap::new()
                    }
                }),
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                encryption_keys: encryption_keys
                    .map(|keys| {
                        HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
                    })
                    .unwrap_or_default(),
                next_row_id,
            };

            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }

    impl TryFrom<TableMetadataV2> for TableMetadata {
        type Error = Error;
        fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
            let snapshots = value.snapshots;
            let value = value.shared;
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };
            let schemas = HashMap::from_iter(
                value
                    .schemas
                    .into_iter()
                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
                    .collect::<Result<Vec<_>, Error>>()?,
            );

            let current_schema: &SchemaRef =
                schemas.get(&value.current_schema_id).ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "No schema exists with the current schema id {}.",
                            value.current_schema_id
                        ),
                    )
                })?;
            let partition_specs = HashMap::from_iter(
                value
                    .partition_specs
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x))),
            );
            let default_spec_id = value.default_spec_id;
            let default_spec: PartitionSpecRef = partition_specs
                .get(&value.default_spec_id)
                .map(|spec| (**spec).clone())
                .or_else(|| {
                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
                        .then(PartitionSpec::unpartition_spec)
                })
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V2,
                table_uuid: value.table_uuid,
                location: value.location,
                last_sequence_number: value.last_sequence_number,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id: value.current_schema_id,
                schemas,
                partition_specs,
                default_partition_type,
                default_spec,
                last_partition_id: value.last_partition_id,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                snapshots: snapshots
                    .map(|snapshots| {
                        HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
                        )
                    })
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: HashMap::from_iter(
                    value
                        .sort_orders
                        .into_iter()
                        .map(|x| (x.order_id, Arc::new(x))),
                ),
                default_sort_order_id: value.default_sort_order_id,
                refs: value.refs.unwrap_or_else(|| {
                    if let Some(snapshot_id) = current_snapshot_id {
                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                            snapshot_id,
                            retention: SnapshotRetention::Branch {
                                min_snapshots_to_keep: None,
                                max_snapshot_age_ms: None,
                                max_ref_age_ms: None,
                            },
                        })])
                    } else {
                        HashMap::new()
                    }
                }),
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            };

            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }

    impl TryFrom<TableMetadataV1> for TableMetadata {
        type Error = Error;
        fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };

            let (schemas, current_schema_id, current_schema) =
                if let (Some(schemas_vec), Some(schema_id)) =
                    (&value.schemas, value.current_schema_id)
                {
                    // Option 1: Use 'schemas' + 'current_schema_id'
                    let schema_map = HashMap::from_iter(
                        schemas_vec
                            .clone()
                            .into_iter()
                            .map(|schema| {
                                let schema: Schema = schema.try_into()?;
                                Ok((schema.schema_id(), Arc::new(schema)))
                            })
                            .collect::<Result<Vec<_>, Error>>()?,
                    );

                    let schema = schema_map
                        .get(&schema_id)
                        .ok_or_else(|| {
                            Error::new(
                                ErrorKind::DataInvalid,
                                format!("No schema exists with the current schema id {schema_id}."),
                            )
                        })?
                        .clone();
                    (schema_map, schema_id, schema)
                } else if let Some(schema) = value.schema {
                    // Option 2: Fall back to `schema`
                    let schema: Schema = schema.try_into()?;
                    let schema_id = schema.schema_id();
                    let schema_arc = Arc::new(schema);
                    let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
                    (schema_map, schema_id, schema_arc)
                } else {
                    // Option 3: No valid schema configuration found
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        "No valid schema configuration found in table metadata",
                    ));
                };

            // Prioritize 'partition_specs' over 'partition_spec'
            let partition_specs = if let Some(specs_vec) = value.partition_specs {
                // Option 1: Use 'partition_specs'
                specs_vec
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x)))
                    .collect::<HashMap<_, _>>()
            } else if let Some(partition_spec) = value.partition_spec {
                // Option 2: Fall back to 'partition_spec'
                let spec = PartitionSpec::builder(current_schema.clone())
                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
                    .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
                    .build()?;

                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
            } else {
                // Option 3: Create empty partition spec
                let spec = PartitionSpec::builder(current_schema.clone())
                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
                    .build()?;

                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
            };

            // Get the default_spec_id, prioritizing the explicit value if provided
            let default_spec_id = value
                .default_spec_id
                .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());

            // Get the default spec
            let default_spec: PartitionSpecRef = partition_specs
                .get(&default_spec_id)
                .map(|x| Arc::unwrap_or_clone(x.clone()))
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(&current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V1,
                table_uuid: value.table_uuid.unwrap_or_default(),
                location: value.location,
                last_sequence_number: 0,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id,
                default_spec,
                default_partition_type,
                last_partition_id: value
                    .last_partition_id
                    .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
                partition_specs,
                schemas,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                snapshots: value
                    .snapshots
                    .map(|snapshots| {
                        Ok::<_, Error>(HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
                                .collect::<Result<Vec<_>, Error>>()?,
                        ))
                    })
                    .transpose()?
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: match value.sort_orders {
                    Some(sort_orders) => HashMap::from_iter(
                        sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
                    ),
                    None => HashMap::new(),
                },
                default_sort_order_id: value
                    .default_sort_order_id
                    .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
                refs: if let Some(snapshot_id) = current_snapshot_id {
                    HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                        snapshot_id,
                        retention: SnapshotRetention::Branch {
                            min_snapshots_to_keep: None,
                            max_snapshot_age_ms: None,
                            max_ref_age_ms: None,
                        },
                    })])
                } else {
                    HashMap::new()
                },
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID, // v1 has no row lineage
            };

            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }
1263
1264    impl TryFrom<TableMetadata> for TableMetadataV3 {
1265        type Error = Error;
1266
1267        fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1268            let next_row_id = v.next_row_id;
1269            let encryption_keys = std::mem::take(&mut v.encryption_keys);
1270            let snapshots = std::mem::take(&mut v.snapshots);
1271            let shared = v.into();
1272
1273            Ok(TableMetadataV3 {
1274                format_version: VersionNumber::<3>,
1275                shared,
1276                next_row_id,
1277                encryption_keys: if encryption_keys.is_empty() {
1278                    None
1279                } else {
1280                    Some(encryption_keys.into_values().collect())
1281                },
1282                snapshots: if snapshots.is_empty() {
1283                    None
1284                } else {
1285                    Some(
1286                        snapshots
1287                            .into_values()
1288                            .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1289                            .collect::<Result<_, _>>()?,
1290                    )
1291                },
1292            })
1293        }
1294    }
1295
1296    impl From<TableMetadata> for TableMetadataV2 {
1297        fn from(mut v: TableMetadata) -> Self {
1298            let snapshots = std::mem::take(&mut v.snapshots);
1299            let shared = v.into();
1300
1301            TableMetadataV2 {
1302                format_version: VersionNumber::<2>,
1303                shared,
1304                snapshots: if snapshots.is_empty() {
1305                    None
1306                } else {
1307                    Some(
1308                        snapshots
1309                            .into_values()
1310                            .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1311                            .collect(),
1312                    )
1313                },
1314            }
1315        }
1316    }
1317
1318    impl From<TableMetadata> for TableMetadataV2V3Shared {
1319        fn from(v: TableMetadata) -> Self {
1320            TableMetadataV2V3Shared {
1321                table_uuid: v.table_uuid,
1322                location: v.location,
1323                last_sequence_number: v.last_sequence_number,
1324                last_updated_ms: v.last_updated_ms,
1325                last_column_id: v.last_column_id,
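                // For schemas, partition specs and sort orders, `Arc::try_unwrap`
                // avoids cloning when the Arc is uniquely owned; otherwise the
                // inner value is cloned.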
1326                schemas: v
1327                    .schemas
1328                    .into_values()
1329                    .map(|x| {
1330                        Arc::try_unwrap(x)
1331                            .unwrap_or_else(|schema| schema.as_ref().clone())
1332                            .into()
1333                    })
1334                    .collect(),
1335                current_schema_id: v.current_schema_id,
1336                partition_specs: v
1337                    .partition_specs
1338                    .into_values()
1339                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1340                    .collect(),
1341                default_spec_id: v.default_spec.spec_id(),
1342                last_partition_id: v.last_partition_id,
1343                properties: if v.properties.is_empty() {
1344                    None
1345                } else {
1346                    Some(v.properties)
1347                },
1348                current_snapshot_id: v.current_snapshot_id,
1349                snapshot_log: if v.snapshot_log.is_empty() {
1350                    None
1351                } else {
1352                    Some(v.snapshot_log)
1353                },
1354                metadata_log: if v.metadata_log.is_empty() {
1355                    None
1356                } else {
1357                    Some(v.metadata_log)
1358                },
1359                sort_orders: v
1360                    .sort_orders
1361                    .into_values()
1362                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1363                    .collect(),
1364                default_sort_order_id: v.default_sort_order_id,
1365                refs: Some(v.refs),
1366                statistics: v.statistics.into_values().collect(),
1367                partition_statistics: v.partition_statistics.into_values().collect(),
1368            }
1369        }
1370    }
1371
1372    impl TryFrom<TableMetadata> for TableMetadataV1 {
1373        type Error = Error;
1374        fn try_from(v: TableMetadata) -> Result<Self, Error> {
1375            Ok(TableMetadataV1 {
1376                format_version: VersionNumber::<1>,
1377                table_uuid: Some(v.table_uuid),
1378                location: v.location,
1379                last_updated_ms: v.last_updated_ms,
1380                last_column_id: v.last_column_id,
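                // v1 also carries a single top-level `schema`; populate it with
                // the current schema and fail if `current_schema_id` is unknown.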
1381                schema: Some(
1382                    v.schemas
1383                        .get(&v.current_schema_id)
1384                        .ok_or(Error::new(
1385                            ErrorKind::Unexpected,
1386                            "current_schema_id not found in schemas",
1387                        ))?
1388                        .as_ref()
1389                        .clone()
1390                        .into(),
1391                ),
1392                schemas: Some(
1393                    v.schemas
1394                        .into_values()
1395                        .map(|x| {
1396                            Arc::try_unwrap(x)
1397                                .unwrap_or_else(|schema| schema.as_ref().clone())
1398                                .into()
1399                        })
1400                        .collect(),
1401                ),
1402                current_schema_id: Some(v.current_schema_id),
1403                partition_spec: Some(v.default_spec.fields().to_vec()),
1404                partition_specs: Some(
1405                    v.partition_specs
1406                        .into_values()
1407                        .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1408                        .collect(),
1409                ),
1410                default_spec_id: Some(v.default_spec.spec_id()),
1411                last_partition_id: Some(v.last_partition_id),
1412                properties: if v.properties.is_empty() {
1413                    None
1414                } else {
1415                    Some(v.properties)
1416                },
1417                current_snapshot_id: v.current_snapshot_id,
1418                snapshots: if v.snapshots.is_empty() {
1419                    None
1420                } else {
1421                    Some(
1422                        v.snapshots
1423                            .into_values()
1424                            .map(|x| Arc::unwrap_or_clone(x).into())
1425                            .collect(),
1426                    )
1427                },
1428                snapshot_log: if v.snapshot_log.is_empty() {
1429                    None
1430                } else {
1431                    Some(v.snapshot_log)
1432                },
1433                metadata_log: if v.metadata_log.is_empty() {
1434                    None
1435                } else {
1436                    Some(v.metadata_log)
1437                },
1438                sort_orders: Some(
1439                    v.sort_orders
1440                        .into_values()
1441                        .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1442                        .collect(),
1443                ),
1444                default_sort_order_id: Some(v.default_sort_order_id),
1445                statistics: v.statistics.into_values().collect(),
1446                partition_statistics: v.partition_statistics.into_values().collect(),
1447            })
1448        }
1449    }
1450
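    /// Indexes statistics files by snapshot id. The list is reversed before
    /// collecting, so when several files reference the same snapshot the first
    /// entry in the original list wins.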
1451    fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1452        statistics
1453            .into_iter()
1454            .rev()
1455            .map(|s| (s.snapshot_id, s))
1456            .collect()
1457    }
1458
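    /// Indexes partition statistics files by snapshot id, with the same
    /// first-entry-wins behavior as `index_statistics`.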
1459    fn index_partition_statistics(
1460        statistics: Vec<PartitionStatisticsFile>,
1461    ) -> HashMap<i64, PartitionStatisticsFile> {
1462        statistics
1463            .into_iter()
1464            .rev()
1465            .map(|s| (s.snapshot_id, s))
1466            .collect()
1467    }
1468}
1469
1470#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
1471#[repr(u8)]
1472/// Iceberg format version
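///
/// Versions are ordered by their numeric value, so feature gates can be written
/// as plain comparisons. A minimal sketch (ignored doctest; assumes the type is
/// re-exported as `iceberg::spec::FormatVersion`):
///
/// ```ignore
/// use iceberg::spec::FormatVersion;
///
/// assert!(FormatVersion::V1 < FormatVersion::V3);
/// assert_eq!(FormatVersion::V2.to_string(), "v2");
/// ```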
1473pub enum FormatVersion {
1474    /// Iceberg spec version 1
1475    V1 = 1u8,
1476    /// Iceberg spec version 2
1477    V2 = 2u8,
1478    /// Iceberg spec version 3
1479    V3 = 3u8,
1480}
1481
1482impl PartialOrd for FormatVersion {
1483    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1484        Some(self.cmp(other))
1485    }
1486}
1487
1488impl Ord for FormatVersion {
1489    fn cmp(&self, other: &Self) -> Ordering {
1490        (*self as u8).cmp(&(*other as u8))
1491    }
1492}
1493
1494impl Display for FormatVersion {
1495    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1496        match self {
1497            FormatVersion::V1 => write!(f, "v1"),
1498            FormatVersion::V2 => write!(f, "v2"),
1499            FormatVersion::V3 => write!(f, "v3"),
1500        }
1501    }
1502}
1503
1504#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1505#[serde(rename_all = "kebab-case")]
1506/// An entry in the metadata log, pointing to a previous metadata file for the table.
1507pub struct MetadataLog {
1508    /// Location of the previous metadata file.
1509    pub metadata_file: String,
1510    /// Timestamp in milliseconds from the unix epoch when the metadata was created.
1511    pub timestamp_ms: i64,
1512}
1513
1514#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1515#[serde(rename_all = "kebab-case")]
1516/// An entry in the snapshot log, recording when a snapshot became the table's current snapshot.
1517pub struct SnapshotLog {
1518    /// Id of the snapshot.
1519    pub snapshot_id: i64,
1520    /// Timestamp in milliseconds from the unix epoch when the snapshot became current.
1521    pub timestamp_ms: i64,
1522}
1523
1524impl SnapshotLog {
1525    /// Returns the last updated timestamp as a `DateTime<Utc>` with millisecond precision.
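    ///
    /// A minimal usage sketch (ignored doctest; the values are illustrative and
    /// the re-export path `iceberg::spec::SnapshotLog` is assumed):
    ///
    /// ```ignore
    /// use iceberg::spec::SnapshotLog;
    ///
    /// let entry = SnapshotLog { snapshot_id: 1, timestamp_ms: 1_662_532_818_843 };
    /// assert_eq!(entry.timestamp().unwrap().timestamp_millis(), 1_662_532_818_843);
    /// ```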
1526    pub fn timestamp(self) -> Result<DateTime<Utc>> {
1527        timestamp_ms_to_utc(self.timestamp_ms)
1528    }
1529
1530    /// Returns the timestamp in milliseconds from the unix epoch.
1531    #[inline]
1532    pub fn timestamp_ms(&self) -> i64 {
1533        self.timestamp_ms
1534    }
1535}
1536
1537#[cfg(test)]
1538mod tests {
1539    use std::collections::HashMap;
1540    use std::fs;
1541    use std::io::Write as _;
1542    use std::sync::Arc;
1543
1544    use anyhow::Result;
1545    use base64::Engine as _;
1546    use pretty_assertions::assert_eq;
1547    use tempfile::TempDir;
1548    use uuid::Uuid;
1549
1550    use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1551    use crate::TableCreation;
1552    use crate::io::FileIOBuilder;
1553    use crate::spec::table_metadata::TableMetadata;
1554    use crate::spec::{
1555        BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1556        PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1557        SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1558        Summary, Transform, Type, UnboundPartitionField,
1559    };
1560
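    /// Asserts that `json` deserializes to `expected_type` and that the
    /// expected value survives a serialize/deserialize round trip.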
1561    fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1562        let deserialized: TableMetadata = serde_json::from_str(json).unwrap();
1563        assert_eq!(deserialized, expected_type);
1564
1565        let serialized = serde_json::to_string(&expected_type).unwrap();
1566        let reparsed = serde_json::from_str::<TableMetadata>(&serialized).unwrap();
1567
1568        assert_eq!(reparsed, deserialized);
1569    }
1570
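    /// Loads a `TableMetadata` fixture from `testdata/table_metadata`.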
1571    fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1572        let path = format!("testdata/table_metadata/{file_name}");
1573        let metadata: String = fs::read_to_string(path).unwrap();
1574
1575        serde_json::from_str(&metadata).unwrap()
1576    }
1577
1578    #[test]
1579    fn test_table_data_v2() {
1580        let data = r#"
1581            {
1582                "format-version" : 2,
1583                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1584                "location": "s3://b/wh/data.db/table",
1585                "last-sequence-number" : 1,
1586                "last-updated-ms": 1515100955770,
1587                "last-column-id": 1,
1588                "schemas": [
1589                    {
1590                        "schema-id" : 1,
1591                        "type" : "struct",
1592                        "fields" :[
1593                            {
1594                                "id": 1,
1595                                "name": "struct_name",
1596                                "required": true,
1597                                "type": "fixed[1]"
1598                            },
1599                            {
1600                                "id": 4,
1601                                "name": "ts",
1602                                "required": true,
1603                                "type": "timestamp"
1604                            }
1605                        ]
1606                    }
1607                ],
1608                "current-schema-id" : 1,
1609                "partition-specs": [
1610                    {
1611                        "spec-id": 0,
1612                        "fields": [
1613                            {
1614                                "source-id": 4,
1615                                "field-id": 1000,
1616                                "name": "ts_day",
1617                                "transform": "day"
1618                            }
1619                        ]
1620                    }
1621                ],
1622                "default-spec-id": 0,
1623                "last-partition-id": 1000,
1624                "properties": {
1625                    "commit.retry.num-retries": "1"
1626                },
1627                "metadata-log": [
1628                    {
1629                        "metadata-file": "s3://bucket/.../v1.json",
1630                        "timestamp-ms": 1515100
1631                    }
1632                ],
1633                "refs": {},
1634                "sort-orders": [
1635                    {
1636                    "order-id": 0,
1637                    "fields": []
1638                    }
1639                ],
1640                "default-sort-order-id": 0
1641            }
1642        "#;
1643
1644        let schema = Schema::builder()
1645            .with_schema_id(1)
1646            .with_fields(vec![
1647                Arc::new(NestedField::required(
1648                    1,
1649                    "struct_name",
1650                    Type::Primitive(PrimitiveType::Fixed(1)),
1651                )),
1652                Arc::new(NestedField::required(
1653                    4,
1654                    "ts",
1655                    Type::Primitive(PrimitiveType::Timestamp),
1656                )),
1657            ])
1658            .build()
1659            .unwrap();
1660
1661        let partition_spec = PartitionSpec::builder(schema.clone())
1662            .with_spec_id(0)
1663            .add_unbound_field(UnboundPartitionField {
1664                name: "ts_day".to_string(),
1665                transform: Transform::Day,
1666                source_id: 4,
1667                field_id: Some(1000),
1668            })
1669            .unwrap()
1670            .build()
1671            .unwrap();
1672
1673        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1674        let expected = TableMetadata {
1675            format_version: FormatVersion::V2,
1676            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1677            location: "s3://b/wh/data.db/table".to_string(),
1678            last_updated_ms: 1515100955770,
1679            last_column_id: 1,
1680            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1681            current_schema_id: 1,
1682            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1683            default_partition_type,
1684            default_spec: partition_spec.into(),
1685            last_partition_id: 1000,
1686            default_sort_order_id: 0,
1687            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1688            snapshots: HashMap::default(),
1689            current_snapshot_id: None,
1690            last_sequence_number: 1,
1691            properties: HashMap::from_iter(vec![(
1692                "commit.retry.num-retries".to_string(),
1693                "1".to_string(),
1694            )]),
1695            snapshot_log: Vec::new(),
1696            metadata_log: vec![MetadataLog {
1697                metadata_file: "s3://bucket/.../v1.json".to_string(),
1698                timestamp_ms: 1515100,
1699            }],
1700            refs: HashMap::new(),
1701            statistics: HashMap::new(),
1702            partition_statistics: HashMap::new(),
1703            encryption_keys: HashMap::new(),
1704            next_row_id: INITIAL_ROW_ID,
1705        };
1706
1707        let expected_json_value = serde_json::to_value(&expected).unwrap();
1708        check_table_metadata_serde(data, expected);
1709
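        // The serialized metadata should be value-equal to the original JSON
        // document (compared as `serde_json::Value`, so key order is ignored).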
1710        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1711        assert_eq!(json_value, expected_json_value);
1712    }
1713
1714    #[test]
1715    fn test_table_data_v3() {
1716        let data = r#"
1717            {
1718                "format-version" : 3,
1719                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1720                "location": "s3://b/wh/data.db/table",
1721                "last-sequence-number" : 1,
1722                "last-updated-ms": 1515100955770,
1723                "last-column-id": 1,
1724                "next-row-id": 5,
1725                "schemas": [
1726                    {
1727                        "schema-id" : 1,
1728                        "type" : "struct",
1729                        "fields" :[
1730                            {
1731                                "id": 4,
1732                                "name": "ts",
1733                                "required": true,
1734                                "type": "timestamp"
1735                            }
1736                        ]
1737                    }
1738                ],
1739                "current-schema-id" : 1,
1740                "partition-specs": [
1741                    {
1742                        "spec-id": 0,
1743                        "fields": [
1744                            {
1745                                "source-id": 4,
1746                                "field-id": 1000,
1747                                "name": "ts_day",
1748                                "transform": "day"
1749                            }
1750                        ]
1751                    }
1752                ],
1753                "default-spec-id": 0,
1754                "last-partition-id": 1000,
1755                "properties": {
1756                    "commit.retry.num-retries": "1"
1757                },
1758                "metadata-log": [
1759                    {
1760                        "metadata-file": "s3://bucket/.../v1.json",
1761                        "timestamp-ms": 1515100
1762                    }
1763                ],
1764                "refs": {},
1765                "snapshots" : [ {
1766                    "snapshot-id" : 1,
1767                    "timestamp-ms" : 1662532818843,
1768                    "sequence-number" : 0,
1769                    "first-row-id" : 0,
1770                    "added-rows" : 4,
1771                    "key-id" : "key1",
1772                    "summary" : {
1773                        "operation" : "append"
1774                    },
1775                    "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1776                    "schema-id" : 0
1777                    }
1778                ],
1779                "encryption-keys": [
1780                    {
1781                        "key-id": "key1",
1782                        "encrypted-by-id": "KMS",
1783                        "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
1784                        "properties": {
1785                            "p1": "v1"
1786                        }
1787                    }
1788                ],
1789                "sort-orders": [
1790                    {
1791                    "order-id": 0,
1792                    "fields": []
1793                    }
1794                ],
1795                "default-sort-order-id": 0
1796            }
1797        "#;
1798
1799        let schema = Schema::builder()
1800            .with_schema_id(1)
1801            .with_fields(vec![Arc::new(NestedField::required(
1802                4,
1803                "ts",
1804                Type::Primitive(PrimitiveType::Timestamp),
1805            ))])
1806            .build()
1807            .unwrap();
1808
1809        let partition_spec = PartitionSpec::builder(schema.clone())
1810            .with_spec_id(0)
1811            .add_unbound_field(UnboundPartitionField {
1812                name: "ts_day".to_string(),
1813                transform: Transform::Day,
1814                source_id: 4,
1815                field_id: Some(1000),
1816            })
1817            .unwrap()
1818            .build()
1819            .unwrap();
1820
1821        let snapshot = Snapshot::builder()
1822            .with_snapshot_id(1)
1823            .with_timestamp_ms(1662532818843)
1824            .with_sequence_number(0)
1825            .with_row_range(0, 4)
1826            .with_encryption_key_id(Some("key1".to_string()))
1827            .with_summary(Summary {
1828                operation: Operation::Append,
1829                additional_properties: HashMap::new(),
1830            })
1831            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
1832            .with_schema_id(0)
1833            .build();
1834
1835        let encryption_key = EncryptedKey::builder()
1836            .key_id("key1".to_string())
1837            .encrypted_by_id("KMS".to_string())
1838            .encrypted_key_metadata(
1839                base64::prelude::BASE64_STANDARD
1840                    .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
1841                    .unwrap(),
1842            )
1843            .properties(HashMap::from_iter(vec![(
1844                "p1".to_string(),
1845                "v1".to_string(),
1846            )]))
1847            .build();
1848
1849        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1850        let expected = TableMetadata {
1851            format_version: FormatVersion::V3,
1852            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1853            location: "s3://b/wh/data.db/table".to_string(),
1854            last_updated_ms: 1515100955770,
1855            last_column_id: 1,
1856            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1857            current_schema_id: 1,
1858            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1859            default_partition_type,
1860            default_spec: partition_spec.into(),
1861            last_partition_id: 1000,
1862            default_sort_order_id: 0,
1863            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1864            snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
1865            current_snapshot_id: None,
1866            last_sequence_number: 1,
1867            properties: HashMap::from_iter(vec![(
1868                "commit.retry.num-retries".to_string(),
1869                "1".to_string(),
1870            )]),
1871            snapshot_log: Vec::new(),
1872            metadata_log: vec![MetadataLog {
1873                metadata_file: "s3://bucket/.../v1.json".to_string(),
1874                timestamp_ms: 1515100,
1875            }],
1876            refs: HashMap::new(),
1877            statistics: HashMap::new(),
1878            partition_statistics: HashMap::new(),
1879            encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
1880            next_row_id: 5,
1881        };
1882
1883        let expected_json_value = serde_json::to_value(&expected).unwrap();
1884        check_table_metadata_serde(data, expected);
1885
1886        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1887        assert_eq!(json_value, expected_json_value);
1888    }
1889
1890    #[test]
1891    fn test_table_data_v1() {
1892        let data = r#"
1893        {
1894            "format-version" : 1,
1895            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
1896            "location" : "/home/iceberg/warehouse/nyc/taxis",
1897            "last-updated-ms" : 1662532818843,
1898            "last-column-id" : 5,
1899            "schema" : {
1900              "type" : "struct",
1901              "schema-id" : 0,
1902              "fields" : [ {
1903                "id" : 1,
1904                "name" : "vendor_id",
1905                "required" : false,
1906                "type" : "long"
1907              }, {
1908                "id" : 2,
1909                "name" : "trip_id",
1910                "required" : false,
1911                "type" : "long"
1912              }, {
1913                "id" : 3,
1914                "name" : "trip_distance",
1915                "required" : false,
1916                "type" : "float"
1917              }, {
1918                "id" : 4,
1919                "name" : "fare_amount",
1920                "required" : false,
1921                "type" : "double"
1922              }, {
1923                "id" : 5,
1924                "name" : "store_and_fwd_flag",
1925                "required" : false,
1926                "type" : "string"
1927              } ]
1928            },
1929            "partition-spec" : [ {
1930              "name" : "vendor_id",
1931              "transform" : "identity",
1932              "source-id" : 1,
1933              "field-id" : 1000
1934            } ],
1935            "last-partition-id" : 1000,
1936            "default-sort-order-id" : 0,
1937            "sort-orders" : [ {
1938              "order-id" : 0,
1939              "fields" : [ ]
1940            } ],
1941            "properties" : {
1942              "owner" : "root"
1943            },
1944            "current-snapshot-id" : 638933773299822130,
1945            "refs" : {
1946              "main" : {
1947                "snapshot-id" : 638933773299822130,
1948                "type" : "branch"
1949              }
1950            },
1951            "snapshots" : [ {
1952              "snapshot-id" : 638933773299822130,
1953              "timestamp-ms" : 1662532818843,
1954              "sequence-number" : 0,
1955              "summary" : {
1956                "operation" : "append",
1957                "spark.app.id" : "local-1662532784305",
1958                "added-data-files" : "4",
1959                "added-records" : "4",
1960                "added-files-size" : "6001"
1961              },
1962              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1963              "schema-id" : 0
1964            } ],
1965            "snapshot-log" : [ {
1966              "timestamp-ms" : 1662532818843,
1967              "snapshot-id" : 638933773299822130
1968            } ],
1969            "metadata-log" : [ {
1970              "timestamp-ms" : 1662532805245,
1971              "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
1972            } ]
1973          }
1974        "#;
1975
1976        let schema = Schema::builder()
1977            .with_fields(vec![
1978                Arc::new(NestedField::optional(
1979                    1,
1980                    "vendor_id",
1981                    Type::Primitive(PrimitiveType::Long),
1982                )),
1983                Arc::new(NestedField::optional(
1984                    2,
1985                    "trip_id",
1986                    Type::Primitive(PrimitiveType::Long),
1987                )),
1988                Arc::new(NestedField::optional(
1989                    3,
1990                    "trip_distance",
1991                    Type::Primitive(PrimitiveType::Float),
1992                )),
1993                Arc::new(NestedField::optional(
1994                    4,
1995                    "fare_amount",
1996                    Type::Primitive(PrimitiveType::Double),
1997                )),
1998                Arc::new(NestedField::optional(
1999                    5,
2000                    "store_and_fwd_flag",
2001                    Type::Primitive(PrimitiveType::String),
2002                )),
2003            ])
2004            .build()
2005            .unwrap();
2006
2007        let schema = Arc::new(schema);
2008        let partition_spec = PartitionSpec::builder(schema.clone())
2009            .with_spec_id(0)
2010            .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
2011            .unwrap()
2012            .build()
2013            .unwrap();
2014
2015        let sort_order = SortOrder::builder()
2016            .with_order_id(0)
2017            .build_unbound()
2018            .unwrap();
2019
2020        let snapshot = Snapshot::builder()
2021            .with_snapshot_id(638933773299822130)
2022            .with_timestamp_ms(1662532818843)
2023            .with_sequence_number(0)
2024            .with_schema_id(0)
2025            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
2026            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
2027            .build();
2028
2029        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2030        let expected = TableMetadata {
2031            format_version: FormatVersion::V1,
2032            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
2033            location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
2034            last_updated_ms: 1662532818843,
2035            last_column_id: 5,
2036            schemas: HashMap::from_iter(vec![(0, schema)]),
2037            current_schema_id: 0,
2038            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2039            default_partition_type,
2040            default_spec: Arc::new(partition_spec),
2041            last_partition_id: 1000,
2042            default_sort_order_id: 0,
2043            sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
2044            snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
2045            current_snapshot_id: Some(638933773299822130),
2046            last_sequence_number: 0,
2047            properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
2048            snapshot_log: vec![SnapshotLog {
2049                snapshot_id: 638933773299822130,
2050                timestamp_ms: 1662532818843,
2051            }],
2052            metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
2053            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
2054            statistics: HashMap::new(),
2055            partition_statistics: HashMap::new(),
2056            encryption_keys: HashMap::new(),
2057            next_row_id: INITIAL_ROW_ID,
2058        };
2059
2060        check_table_metadata_serde(data, expected);
2061    }
2062
2063    #[test]
2064    fn test_table_data_v2_no_snapshots() {
2065        let data = r#"
2066        {
2067            "format-version" : 2,
2068            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2069            "location": "s3://b/wh/data.db/table",
2070            "last-sequence-number" : 1,
2071            "last-updated-ms": 1515100955770,
2072            "last-column-id": 1,
2073            "schemas": [
2074                {
2075                    "schema-id" : 1,
2076                    "type" : "struct",
2077                    "fields" :[
2078                        {
2079                            "id": 1,
2080                            "name": "struct_name",
2081                            "required": true,
2082                            "type": "fixed[1]"
2083                        }
2084                    ]
2085                }
2086            ],
2087            "current-schema-id" : 1,
2088            "partition-specs": [
2089                {
2090                    "spec-id": 0,
2091                    "fields": []
2092                }
2093            ],
2094            "refs": {},
2095            "default-spec-id": 0,
2096            "last-partition-id": 1000,
2097            "metadata-log": [
2098                {
2099                    "metadata-file": "s3://bucket/.../v1.json",
2100                    "timestamp-ms": 1515100
2101                }
2102            ],
2103            "sort-orders": [
2104                {
2105                "order-id": 0,
2106                "fields": []
2107                }
2108            ],
2109            "default-sort-order-id": 0
2110        }
2111        "#;
2112
2113        let schema = Schema::builder()
2114            .with_schema_id(1)
2115            .with_fields(vec![Arc::new(NestedField::required(
2116                1,
2117                "struct_name",
2118                Type::Primitive(PrimitiveType::Fixed(1)),
2119            ))])
2120            .build()
2121            .unwrap();
2122
2123        let partition_spec = PartitionSpec::builder(schema.clone())
2124            .with_spec_id(0)
2125            .build()
2126            .unwrap();
2127
2128        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2129        let expected = TableMetadata {
2130            format_version: FormatVersion::V2,
2131            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
2132            location: "s3://b/wh/data.db/table".to_string(),
2133            last_updated_ms: 1515100955770,
2134            last_column_id: 1,
2135            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
2136            current_schema_id: 1,
2137            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2138            default_partition_type,
2139            default_spec: partition_spec.into(),
2140            last_partition_id: 1000,
2141            default_sort_order_id: 0,
2142            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2143            snapshots: HashMap::default(),
2144            current_snapshot_id: None,
2145            last_sequence_number: 1,
2146            properties: HashMap::new(),
2147            snapshot_log: Vec::new(),
2148            metadata_log: vec![MetadataLog {
2149                metadata_file: "s3://bucket/.../v1.json".to_string(),
2150                timestamp_ms: 1515100,
2151            }],
2152            refs: HashMap::new(),
2153            statistics: HashMap::new(),
2154            partition_statistics: HashMap::new(),
2155            encryption_keys: HashMap::new(),
2156            next_row_id: INITIAL_ROW_ID,
2157        };
2158
2159        let expected_json_value = serde_json::to_value(&expected).unwrap();
2160        check_table_metadata_serde(data, expected);
2161
2162        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
2163        assert_eq!(json_value, expected_json_value);
2164    }
2165
2166    #[test]
2167    fn test_current_snapshot_id_must_match_main_branch() {
2168        let data = r#"
2169        {
2170            "format-version" : 2,
2171            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2172            "location": "s3://b/wh/data.db/table",
2173            "last-sequence-number" : 1,
2174            "last-updated-ms": 1515100955770,
2175            "last-column-id": 1,
2176            "schemas": [
2177                {
2178                    "schema-id" : 1,
2179                    "type" : "struct",
2180                    "fields" :[
2181                        {
2182                            "id": 1,
2183                            "name": "struct_name",
2184                            "required": true,
2185                            "type": "fixed[1]"
2186                        },
2187                        {
2188                            "id": 4,
2189                            "name": "ts",
2190                            "required": true,
2191                            "type": "timestamp"
2192                        }
2193                    ]
2194                }
2195            ],
2196            "current-schema-id" : 1,
2197            "partition-specs": [
2198                {
2199                    "spec-id": 0,
2200                    "fields": [
2201                        {
2202                            "source-id": 4,
2203                            "field-id": 1000,
2204                            "name": "ts_day",
2205                            "transform": "day"
2206                        }
2207                    ]
2208                }
2209            ],
2210            "default-spec-id": 0,
2211            "last-partition-id": 1000,
2212            "properties": {
2213                "commit.retry.num-retries": "1"
2214            },
2215            "metadata-log": [
2216                {
2217                    "metadata-file": "s3://bucket/.../v1.json",
2218                    "timestamp-ms": 1515100
2219                }
2220            ],
2221            "sort-orders": [
2222                {
2223                "order-id": 0,
2224                "fields": []
2225                }
2226            ],
2227            "default-sort-order-id": 0,
2228            "current-snapshot-id" : 1,
2229            "refs" : {
2230              "main" : {
2231                "snapshot-id" : 2,
2232                "type" : "branch"
2233              }
2234            },
2235            "snapshots" : [ {
2236              "snapshot-id" : 1,
2237              "timestamp-ms" : 1662532818843,
2238              "sequence-number" : 0,
2239              "summary" : {
2240                "operation" : "append",
2241                "spark.app.id" : "local-1662532784305",
2242                "added-data-files" : "4",
2243                "added-records" : "4",
2244                "added-files-size" : "6001"
2245              },
2246              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2247              "schema-id" : 0
2248            },
2249            {
2250              "snapshot-id" : 2,
2251              "timestamp-ms" : 1662532818844,
2252              "sequence-number" : 0,
2253              "summary" : {
2254                "operation" : "append",
2255                "spark.app.id" : "local-1662532784305",
2256                "added-data-files" : "4",
2257                "added-records" : "4",
2258                "added-files-size" : "6001"
2259              },
2260              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2261              "schema-id" : 0
2262            } ]
2263        }
2264    "#;
2265
2266        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2267        assert!(
2268            err.to_string()
2269                .contains("Current snapshot id does not match main branch")
2270        );
2271    }
2272
2273    #[test]
2274    fn test_main_without_current() {
2275        let data = r#"
2276        {
2277            "format-version" : 2,
2278            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2279            "location": "s3://b/wh/data.db/table",
2280            "last-sequence-number" : 1,
2281            "last-updated-ms": 1515100955770,
2282            "last-column-id": 1,
2283            "schemas": [
2284                {
2285                    "schema-id" : 1,
2286                    "type" : "struct",
2287                    "fields" :[
2288                        {
2289                            "id": 1,
2290                            "name": "struct_name",
2291                            "required": true,
2292                            "type": "fixed[1]"
2293                        },
2294                        {
2295                            "id": 4,
2296                            "name": "ts",
2297                            "required": true,
2298                            "type": "timestamp"
2299                        }
2300                    ]
2301                }
2302            ],
2303            "current-schema-id" : 1,
2304            "partition-specs": [
2305                {
2306                    "spec-id": 0,
2307                    "fields": [
2308                        {
2309                            "source-id": 4,
2310                            "field-id": 1000,
2311                            "name": "ts_day",
2312                            "transform": "day"
2313                        }
2314                    ]
2315                }
2316            ],
2317            "default-spec-id": 0,
2318            "last-partition-id": 1000,
2319            "properties": {
2320                "commit.retry.num-retries": "1"
2321            },
2322            "metadata-log": [
2323                {
2324                    "metadata-file": "s3://bucket/.../v1.json",
2325                    "timestamp-ms": 1515100
2326                }
2327            ],
2328            "sort-orders": [
2329                {
2330                "order-id": 0,
2331                "fields": []
2332                }
2333            ],
2334            "default-sort-order-id": 0,
2335            "refs" : {
2336              "main" : {
2337                "snapshot-id" : 1,
2338                "type" : "branch"
2339              }
2340            },
2341            "snapshots" : [ {
2342              "snapshot-id" : 1,
2343              "timestamp-ms" : 1662532818843,
2344              "sequence-number" : 0,
2345              "summary" : {
2346                "operation" : "append",
2347                "spark.app.id" : "local-1662532784305",
2348                "added-data-files" : "4",
2349                "added-records" : "4",
2350                "added-files-size" : "6001"
2351              },
2352              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2353              "schema-id" : 0
2354            } ]
2355        }
2356    "#;
2357
2358        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2359        assert!(
2360            err.to_string()
2361                .contains("Current snapshot is not set, but main branch exists")
2362        );
2363    }
2364
2365    #[test]
2366    fn test_branch_snapshot_missing() {
2367        let data = r#"
2368        {
2369            "format-version" : 2,
2370            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2371            "location": "s3://b/wh/data.db/table",
2372            "last-sequence-number" : 1,
2373            "last-updated-ms": 1515100955770,
2374            "last-column-id": 1,
2375            "schemas": [
2376                {
2377                    "schema-id" : 1,
2378                    "type" : "struct",
2379                    "fields" :[
2380                        {
2381                            "id": 1,
2382                            "name": "struct_name",
2383                            "required": true,
2384                            "type": "fixed[1]"
2385                        },
2386                        {
2387                            "id": 4,
2388                            "name": "ts",
2389                            "required": true,
2390                            "type": "timestamp"
2391                        }
2392                    ]
2393                }
2394            ],
2395            "current-schema-id" : 1,
2396            "partition-specs": [
2397                {
2398                    "spec-id": 0,
2399                    "fields": [
2400                        {
2401                            "source-id": 4,
2402                            "field-id": 1000,
2403                            "name": "ts_day",
2404                            "transform": "day"
2405                        }
2406                    ]
2407                }
2408            ],
2409            "default-spec-id": 0,
2410            "last-partition-id": 1000,
2411            "properties": {
2412                "commit.retry.num-retries": "1"
2413            },
2414            "metadata-log": [
2415                {
2416                    "metadata-file": "s3://bucket/.../v1.json",
2417                    "timestamp-ms": 1515100
2418                }
2419            ],
2420            "sort-orders": [
2421                {
2422                "order-id": 0,
2423                "fields": []
2424                }
2425            ],
2426            "default-sort-order-id": 0,
2427            "refs" : {
2428              "main" : {
2429                "snapshot-id" : 1,
2430                "type" : "branch"
2431              },
2432              "foo" : {
2433                "snapshot-id" : 2,
2434                "type" : "branch"
2435              }
2436            },
2437            "snapshots" : [ {
2438              "snapshot-id" : 1,
2439              "timestamp-ms" : 1662532818843,
2440              "sequence-number" : 0,
2441              "summary" : {
2442                "operation" : "append",
2443                "spark.app.id" : "local-1662532784305",
2444                "added-data-files" : "4",
2445                "added-records" : "4",
2446                "added-files-size" : "6001"
2447              },
2448              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2449              "schema-id" : 0
2450            } ]
2451        }
2452    "#;
2453
2454        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2455        assert!(
2456            err.to_string().contains(
2457                "Snapshot for reference foo does not exist in the existing snapshots list"
2458            )
2459        );
2460    }
2461
2462    #[test]
2463    fn test_v2_wrong_max_snapshot_sequence_number() {
2464        let data = r#"
2465        {
2466            "format-version": 2,
2467            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2468            "location": "s3://bucket/test/location",
2469            "last-sequence-number": 1,
2470            "last-updated-ms": 1602638573590,
2471            "last-column-id": 3,
2472            "current-schema-id": 0,
2473            "schemas": [
2474                {
2475                    "type": "struct",
2476                    "schema-id": 0,
2477                    "fields": [
2478                        {
2479                            "id": 1,
2480                            "name": "x",
2481                            "required": true,
2482                            "type": "long"
2483                        }
2484                    ]
2485                }
2486            ],
2487            "default-spec-id": 0,
2488            "partition-specs": [
2489                {
2490                    "spec-id": 0,
2491                    "fields": []
2492                }
2493            ],
2494            "last-partition-id": 1000,
2495            "default-sort-order-id": 0,
2496            "sort-orders": [
2497                {
2498                    "order-id": 0,
2499                    "fields": []
2500                }
2501            ],
2502            "properties": {},
2503            "current-snapshot-id": 3055729675574597004,
2504            "snapshots": [
2505                {
2506                    "snapshot-id": 3055729675574597004,
2507                    "timestamp-ms": 1555100955770,
2508                    "sequence-number": 4,
2509                    "summary": {
2510                        "operation": "append"
2511                    },
2512                    "manifest-list": "s3://a/b/2.avro",
2513                    "schema-id": 0
2514                }
2515            ],
2516            "statistics": [],
2517            "snapshot-log": [],
2518            "metadata-log": []
2519        }
2520    "#;
2521
2522        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2523        assert!(err.to_string().contains(
2524            "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
2525        ));
2526
2527        // Change last sequence number to 4 (equal to the snapshot's sequence number) - should work
2528        let data = data.replace(
2529            r#""last-sequence-number": 1,"#,
2530            r#""last-sequence-number": 4,"#,
2531        );
2532        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2533        assert_eq!(metadata.last_sequence_number, 4);
2534
2535        // Change last sequence number to 5 (greater than the snapshot's sequence number) - should work
2536        let data = data.replace(
2537            r#""last-sequence-number": 4,"#,
2538            r#""last-sequence-number": 5,"#,
2539        );
2540        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2541        assert_eq!(metadata.last_sequence_number, 5);
2542    }
2543
2544    #[test]
2545    fn test_statistic_files() {
2546        let data = r#"
2547        {
2548            "format-version": 2,
2549            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2550            "location": "s3://bucket/test/location",
2551            "last-sequence-number": 34,
2552            "last-updated-ms": 1602638573590,
2553            "last-column-id": 3,
2554            "current-schema-id": 0,
2555            "schemas": [
2556                {
2557                    "type": "struct",
2558                    "schema-id": 0,
2559                    "fields": [
2560                        {
2561                            "id": 1,
2562                            "name": "x",
2563                            "required": true,
2564                            "type": "long"
2565                        }
2566                    ]
2567                }
2568            ],
2569            "default-spec-id": 0,
2570            "partition-specs": [
2571                {
2572                    "spec-id": 0,
2573                    "fields": []
2574                }
2575            ],
2576            "last-partition-id": 1000,
2577            "default-sort-order-id": 0,
2578            "sort-orders": [
2579                {
2580                    "order-id": 0,
2581                    "fields": []
2582                }
2583            ],
2584            "properties": {},
2585            "current-snapshot-id": 3055729675574597004,
2586            "snapshots": [
2587                {
2588                    "snapshot-id": 3055729675574597004,
2589                    "timestamp-ms": 1555100955770,
2590                    "sequence-number": 1,
2591                    "summary": {
2592                        "operation": "append"
2593                    },
2594                    "manifest-list": "s3://a/b/2.avro",
2595                    "schema-id": 0
2596                }
2597            ],
2598            "statistics": [
2599                {
2600                    "snapshot-id": 3055729675574597004,
2601                    "statistics-path": "s3://a/b/stats.puffin",
2602                    "file-size-in-bytes": 413,
2603                    "file-footer-size-in-bytes": 42,
2604                    "blob-metadata": [
2605                        {
2606                            "type": "ndv",
2607                            "snapshot-id": 3055729675574597004,
2608                            "sequence-number": 1,
2609                            "fields": [
2610                                1
2611                            ]
2612                        }
2613                    ]
2614                }
2615            ],
2616            "snapshot-log": [],
2617            "metadata-log": []
2618        }
2619    "#;
2620
2621        let schema = Schema::builder()
2622            .with_schema_id(0)
2623            .with_fields(vec![Arc::new(NestedField::required(
2624                1,
2625                "x",
2626                Type::Primitive(PrimitiveType::Long),
2627            ))])
2628            .build()
2629            .unwrap();
2630        let partition_spec = PartitionSpec::builder(schema.clone())
2631            .with_spec_id(0)
2632            .build()
2633            .unwrap();
2634        let snapshot = Snapshot::builder()
2635            .with_snapshot_id(3055729675574597004)
2636            .with_timestamp_ms(1555100955770)
2637            .with_sequence_number(1)
2638            .with_manifest_list("s3://a/b/2.avro")
2639            .with_schema_id(0)
2640            .with_summary(Summary {
2641                operation: Operation::Append,
2642                additional_properties: HashMap::new(),
2643            })
2644            .build();
2645
2646        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2647        let expected = TableMetadata {
2648            format_version: FormatVersion::V2,
2649            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2650            location: "s3://bucket/test/location".to_string(),
2651            last_updated_ms: 1602638573590,
2652            last_column_id: 3,
2653            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2654            current_schema_id: 0,
2655            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2656            default_partition_type,
2657            default_spec: Arc::new(partition_spec),
2658            last_partition_id: 1000,
2659            default_sort_order_id: 0,
2660            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2661            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
2662            current_snapshot_id: Some(3055729675574597004),
2663            last_sequence_number: 34,
2664            properties: HashMap::new(),
2665            snapshot_log: Vec::new(),
2666            metadata_log: Vec::new(),
2667            statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
2668                snapshot_id: 3055729675574597004,
2669                statistics_path: "s3://a/b/stats.puffin".to_string(),
2670                file_size_in_bytes: 413,
2671                file_footer_size_in_bytes: 42,
2672                key_metadata: None,
2673                blob_metadata: vec![BlobMetadata {
2674                    snapshot_id: 3055729675574597004,
2675                    sequence_number: 1,
2676                    fields: vec![1],
2677                    r#type: "ndv".to_string(),
2678                    properties: HashMap::new(),
2679                }],
2680            })]),
2681            partition_statistics: HashMap::new(),
2682            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
2683                snapshot_id: 3055729675574597004,
2684                retention: SnapshotRetention::Branch {
2685                    min_snapshots_to_keep: None,
2686                    max_snapshot_age_ms: None,
2687                    max_ref_age_ms: None,
2688                },
2689            })]),
2690            encryption_keys: HashMap::new(),
2691            next_row_id: INITIAL_ROW_ID,
2692        };
2693
2694        check_table_metadata_serde(data, expected);
2695    }
2696
2697    #[test]
2698    fn test_partition_statistics_file() {
2699        let data = r#"
2700        {
2701            "format-version": 2,
2702            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2703            "location": "s3://bucket/test/location",
2704            "last-sequence-number": 34,
2705            "last-updated-ms": 1602638573590,
2706            "last-column-id": 3,
2707            "current-schema-id": 0,
2708            "schemas": [
2709                {
2710                    "type": "struct",
2711                    "schema-id": 0,
2712                    "fields": [
2713                        {
2714                            "id": 1,
2715                            "name": "x",
2716                            "required": true,
2717                            "type": "long"
2718                        }
2719                    ]
2720                }
2721            ],
2722            "default-spec-id": 0,
2723            "partition-specs": [
2724                {
2725                    "spec-id": 0,
2726                    "fields": []
2727                }
2728            ],
2729            "last-partition-id": 1000,
2730            "default-sort-order-id": 0,
2731            "sort-orders": [
2732                {
2733                    "order-id": 0,
2734                    "fields": []
2735                }
2736            ],
2737            "properties": {},
2738            "current-snapshot-id": 3055729675574597004,
2739            "snapshots": [
2740                {
2741                    "snapshot-id": 3055729675574597004,
2742                    "timestamp-ms": 1555100955770,
2743                    "sequence-number": 1,
2744                    "summary": {
2745                        "operation": "append"
2746                    },
2747                    "manifest-list": "s3://a/b/2.avro",
2748                    "schema-id": 0
2749                }
2750            ],
2751            "partition-statistics": [
2752                {
2753                    "snapshot-id": 3055729675574597004,
2754                    "statistics-path": "s3://a/b/partition-stats.parquet",
2755                    "file-size-in-bytes": 43
2756                }
2757            ],
2758            "snapshot-log": [],
2759            "metadata-log": []
2760        }
2761        "#;
2762
2763        let schema = Schema::builder()
2764            .with_schema_id(0)
2765            .with_fields(vec![Arc::new(NestedField::required(
2766                1,
2767                "x",
2768                Type::Primitive(PrimitiveType::Long),
2769            ))])
2770            .build()
2771            .unwrap();
2772        let partition_spec = PartitionSpec::builder(schema.clone())
2773            .with_spec_id(0)
2774            .build()
2775            .unwrap();
2776        let snapshot = Snapshot::builder()
2777            .with_snapshot_id(3055729675574597004)
2778            .with_timestamp_ms(1555100955770)
2779            .with_sequence_number(1)
2780            .with_manifest_list("s3://a/b/2.avro")
2781            .with_schema_id(0)
2782            .with_summary(Summary {
2783                operation: Operation::Append,
2784                additional_properties: HashMap::new(),
2785            })
2786            .build();
2787
2788        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2789        let expected = TableMetadata {
2790            format_version: FormatVersion::V2,
2791            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2792            location: "s3://bucket/test/location".to_string(),
2793            last_updated_ms: 1602638573590,
2794            last_column_id: 3,
2795            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2796            current_schema_id: 0,
2797            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2798            default_spec: Arc::new(partition_spec),
2799            default_partition_type,
2800            last_partition_id: 1000,
2801            default_sort_order_id: 0,
2802            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2803            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
2804            current_snapshot_id: Some(3055729675574597004),
2805            last_sequence_number: 34,
2806            properties: HashMap::new(),
2807            snapshot_log: Vec::new(),
2808            metadata_log: Vec::new(),
2809            statistics: HashMap::new(),
2810            partition_statistics: HashMap::from_iter(vec![(
2811                3055729675574597004,
2812                PartitionStatisticsFile {
2813                    snapshot_id: 3055729675574597004,
2814                    statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
2815                    file_size_in_bytes: 43,
2816                },
2817            )]),
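            // The JSON above carries no "refs" field; a `main` branch ref pointing at the
            // current snapshot is expected to be synthesized during deserialization.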
2818            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
2819                snapshot_id: 3055729675574597004,
2820                retention: SnapshotRetention::Branch {
2821                    min_snapshots_to_keep: None,
2822                    max_snapshot_age_ms: None,
2823                    max_ref_age_ms: None,
2824                },
2825            })]),
2826            encryption_keys: HashMap::new(),
2827            next_row_id: INITIAL_ROW_ID,
2828        };
2829
2830        check_table_metadata_serde(data, expected);
2831    }
2832
2833    #[test]
2834    fn test_invalid_table_uuid() -> Result<()> {
2835        let data = r#"
2836            {
2837                "format-version" : 2,
2838                "table-uuid": "xxxx"
2839            }
2840        "#;
2841        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2842        Ok(())
2843    }
2844
2845    #[test]
2846    fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
2847        let data = r#"
2848            {
2849                "format-version" : 1
2850            }
2851        "#;
2852        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2853        Ok(())
2854    }
2855
2856    #[test]
2857    fn test_table_metadata_v3_valid_minimal() {
2858        let metadata_str =
2859            fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();
2860
2861        let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
2862        assert_eq!(table_metadata.format_version, FormatVersion::V3);
2863
2864        let schema = Schema::builder()
2865            .with_schema_id(0)
2866            .with_fields(vec![
2867                Arc::new(
2868                    NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long))
2869                        .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
2870                        .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
2871                ),
2872                Arc::new(
2873                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
2874                        .with_doc("comment"),
2875                ),
2876                Arc::new(NestedField::required(
2877                    3,
2878                    "z",
2879                    Type::Primitive(PrimitiveType::Long),
2880                )),
2881            ])
2882            .build()
2883            .unwrap();
2884
2885        let partition_spec = PartitionSpec::builder(schema.clone())
2886            .with_spec_id(0)
2887            .add_unbound_field(UnboundPartitionField {
2888                name: "x".to_string(),
2889                transform: Transform::Identity,
2890                source_id: 1,
2891                field_id: Some(1000),
2892            })
2893            .unwrap()
2894            .build()
2895            .unwrap();
2896
2897        let sort_order = SortOrder::builder()
2898            .with_order_id(3)
2899            .with_sort_field(SortField {
2900                source_id: 2,
2901                transform: Transform::Identity,
2902                direction: SortDirection::Ascending,
2903                null_order: NullOrder::First,
2904            })
2905            .with_sort_field(SortField {
2906                source_id: 3,
2907                transform: Transform::Bucket(4),
2908                direction: SortDirection::Descending,
2909                null_order: NullOrder::Last,
2910            })
2911            .build_unbound()
2912            .unwrap();
2913
2914        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2915        let expected = TableMetadata {
2916            format_version: FormatVersion::V3,
2917            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2918            location: "s3://bucket/test/location".to_string(),
2919            last_updated_ms: 1602638573590,
2920            last_column_id: 3,
2921            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2922            current_schema_id: 0,
2923            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2924            default_spec: Arc::new(partition_spec),
2925            default_partition_type,
2926            last_partition_id: 1000,
2927            default_sort_order_id: 3,
2928            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
2929            snapshots: HashMap::default(),
2930            current_snapshot_id: None,
2931            last_sequence_number: 34,
2932            properties: HashMap::new(),
2933            snapshot_log: Vec::new(),
2934            metadata_log: Vec::new(),
2935            refs: HashMap::new(),
2936            statistics: HashMap::new(),
2937            partition_statistics: HashMap::new(),
2938            encryption_keys: HashMap::new(),
2939            next_row_id: 0, // `next-row-id` is a v3-specific field taken from the JSON
2940        };
2941
2942        check_table_metadata_serde(&metadata_str, expected);
2943    }
2944
2945    #[test]
2946    fn test_table_metadata_v2_file_valid() {
2947        let metadata =
2948            fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();
2949
2950        let schema1 = Schema::builder()
2951            .with_schema_id(0)
2952            .with_fields(vec![Arc::new(NestedField::required(
2953                1,
2954                "x",
2955                Type::Primitive(PrimitiveType::Long),
2956            ))])
2957            .build()
2958            .unwrap();
2959
2960        let schema2 = Schema::builder()
2961            .with_schema_id(1)
2962            .with_fields(vec![
2963                Arc::new(NestedField::required(
2964                    1,
2965                    "x",
2966                    Type::Primitive(PrimitiveType::Long),
2967                )),
2968                Arc::new(
2969                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
2970                        .with_doc("comment"),
2971                ),
2972                Arc::new(NestedField::required(
2973                    3,
2974                    "z",
2975                    Type::Primitive(PrimitiveType::Long),
2976                )),
2977            ])
2978            .with_identifier_field_ids(vec![1, 2])
2979            .build()
2980            .unwrap();
2981
2982        let partition_spec = PartitionSpec::builder(schema2.clone())
2983            .with_spec_id(0)
2984            .add_unbound_field(UnboundPartitionField {
2985                name: "x".to_string(),
2986                transform: Transform::Identity,
2987                source_id: 1,
2988                field_id: Some(1000),
2989            })
2990            .unwrap()
2991            .build()
2992            .unwrap();
2993
2994        let sort_order = SortOrder::builder()
2995            .with_order_id(3)
2996            .with_sort_field(SortField {
2997                source_id: 2,
2998                transform: Transform::Identity,
2999                direction: SortDirection::Ascending,
3000                null_order: NullOrder::First,
3001            })
3002            .with_sort_field(SortField {
3003                source_id: 3,
3004                transform: Transform::Bucket(4),
3005                direction: SortDirection::Descending,
3006                null_order: NullOrder::Last,
3007            })
3008            .build_unbound()
3009            .unwrap();
3010
3011        let snapshot1 = Snapshot::builder()
3012            .with_snapshot_id(3051729675574597004)
3013            .with_timestamp_ms(1515100955770)
3014            .with_sequence_number(0)
3015            .with_manifest_list("s3://a/b/1.avro")
3016            .with_summary(Summary {
3017                operation: Operation::Append,
3018                additional_properties: HashMap::new(),
3019            })
3020            .build();
3021
3022        let snapshot2 = Snapshot::builder()
3023            .with_snapshot_id(3055729675574597004)
3024            .with_parent_snapshot_id(Some(3051729675574597004))
3025            .with_timestamp_ms(1555100955770)
3026            .with_sequence_number(1)
3027            .with_schema_id(1)
3028            .with_manifest_list("s3://a/b/2.avro")
3029            .with_summary(Summary {
3030                operation: Operation::Append,
3031                additional_properties: HashMap::new(),
3032            })
3033            .build();
3034
3035        let default_partition_type = partition_spec.partition_type(&schema2).unwrap();
3036        let expected = TableMetadata {
3037            format_version: FormatVersion::V2,
3038            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
3039            location: "s3://bucket/test/location".to_string(),
3040            last_updated_ms: 1602638573590,
3041            last_column_id: 3,
3042            schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]),
3043            current_schema_id: 1,
3044            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3045            default_spec: Arc::new(partition_spec),
3046            default_partition_type,
3047            last_partition_id: 1000,
3048            default_sort_order_id: 3,
3049            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
3050            snapshots: HashMap::from_iter(vec![
3051                (3051729675574597004, Arc::new(snapshot1)),
3052                (3055729675574597004, Arc::new(snapshot2)),
3053            ]),
3054            current_snapshot_id: Some(3055729675574597004),
3055            last_sequence_number: 34,
3056            properties: HashMap::new(),
3057            snapshot_log: vec![
3058                SnapshotLog {
3059                    snapshot_id: 3051729675574597004,
3060                    timestamp_ms: 1515100955770,
3061                },
3062                SnapshotLog {
3063                    snapshot_id: 3055729675574597004,
3064                    timestamp_ms: 1555100955770,
3065                },
3066            ],
3067            metadata_log: Vec::new(),
3068            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
3069                snapshot_id: 3055729675574597004,
3070                retention: SnapshotRetention::Branch {
3071                    min_snapshots_to_keep: None,
3072                    max_snapshot_age_ms: None,
3073                    max_ref_age_ms: None,
3074                },
3075            })]),
3076            statistics: HashMap::new(),
3077            partition_statistics: HashMap::new(),
3078            encryption_keys: HashMap::new(),
3079            next_row_id: INITIAL_ROW_ID,
3080        };
3081
3082        check_table_metadata_serde(&metadata, expected);
3083    }
3084
3085    #[test]
3086    fn test_table_metadata_v2_file_valid_minimal() {
3087        let metadata =
3088            fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();
3089
3090        let schema = Schema::builder()
3091            .with_schema_id(0)
3092            .with_fields(vec![
3093                Arc::new(NestedField::required(
3094                    1,
3095                    "x",
3096                    Type::Primitive(PrimitiveType::Long),
3097                )),
3098                Arc::new(
3099                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
3100                        .with_doc("comment"),
3101                ),
3102                Arc::new(NestedField::required(
3103                    3,
3104                    "z",
3105                    Type::Primitive(PrimitiveType::Long),
3106                )),
3107            ])
3108            .build()
3109            .unwrap();
3110
3111        let partition_spec = PartitionSpec::builder(schema.clone())
3112            .with_spec_id(0)
3113            .add_unbound_field(UnboundPartitionField {
3114                name: "x".to_string(),
3115                transform: Transform::Identity,
3116                source_id: 1,
3117                field_id: Some(1000),
3118            })
3119            .unwrap()
3120            .build()
3121            .unwrap();
3122
3123        let sort_order = SortOrder::builder()
3124            .with_order_id(3)
3125            .with_sort_field(SortField {
3126                source_id: 2,
3127                transform: Transform::Identity,
3128                direction: SortDirection::Ascending,
3129                null_order: NullOrder::First,
3130            })
3131            .with_sort_field(SortField {
3132                source_id: 3,
3133                transform: Transform::Bucket(4),
3134                direction: SortDirection::Descending,
3135                null_order: NullOrder::Last,
3136            })
3137            .build_unbound()
3138            .unwrap();
3139
3140        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
3141        let expected = TableMetadata {
3142            format_version: FormatVersion::V2,
3143            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
3144            location: "s3://bucket/test/location".to_string(),
3145            last_updated_ms: 1602638573590,
3146            last_column_id: 3,
3147            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
3148            current_schema_id: 0,
3149            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3150            default_spec: Arc::new(partition_spec),
3151            default_partition_type,
3152            last_partition_id: 1000,
3153            default_sort_order_id: 3,
3154            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
3155            snapshots: HashMap::default(),
3156            current_snapshot_id: None,
3157            last_sequence_number: 34,
3158            properties: HashMap::new(),
3159            snapshot_log: vec![],
3160            metadata_log: Vec::new(),
3161            refs: HashMap::new(),
3162            statistics: HashMap::new(),
3163            partition_statistics: HashMap::new(),
3164            encryption_keys: HashMap::new(),
3165            next_row_id: INITIAL_ROW_ID,
3166        };
3167
3168        check_table_metadata_serde(&metadata, expected);
3169    }
3170
3171    #[test]
3172    fn test_table_metadata_v1_file_valid() {
3173        let metadata =
3174            fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();
3175
3176        let schema = Schema::builder()
3177            .with_schema_id(0)
3178            .with_fields(vec![
3179                Arc::new(NestedField::required(
3180                    1,
3181                    "x",
3182                    Type::Primitive(PrimitiveType::Long),
3183                )),
3184                Arc::new(
3185                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
3186                        .with_doc("comment"),
3187                ),
3188                Arc::new(NestedField::required(
3189                    3,
3190                    "z",
3191                    Type::Primitive(PrimitiveType::Long),
3192                )),
3193            ])
3194            .build()
3195            .unwrap();
3196
3197        let partition_spec = PartitionSpec::builder(schema.clone())
3198            .with_spec_id(0)
3199            .add_unbound_field(UnboundPartitionField {
3200                name: "x".to_string(),
3201                transform: Transform::Identity,
3202                source_id: 1,
3203                field_id: Some(1000),
3204            })
3205            .unwrap()
3206            .build()
3207            .unwrap();
3208
3209        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
3210        let expected = TableMetadata {
3211            format_version: FormatVersion::V1,
3212            table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
3213            location: "s3://bucket/test/location".to_string(),
3214            last_updated_ms: 1602638573874,
3215            last_column_id: 3,
3216            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
3217            current_schema_id: 0,
3218            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3219            default_spec: Arc::new(partition_spec),
3220            default_partition_type,
3221            last_partition_id: 0,
3222            default_sort_order_id: 0,
3223            // V1 metadata has no sort orders; an unsorted order is added during deserialization for V2 compatibility
3224            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
3225            snapshots: HashMap::new(),
3226            current_snapshot_id: None,
3227            last_sequence_number: 0,
3228            properties: HashMap::new(),
3229            snapshot_log: vec![],
3230            metadata_log: Vec::new(),
3231            refs: HashMap::new(),
3232            statistics: HashMap::new(),
3233            partition_statistics: HashMap::new(),
3234            encryption_keys: HashMap::new(),
3235            next_row_id: INITIAL_ROW_ID,
3236        };
3237
3238        check_table_metadata_serde(&metadata, expected);
3239    }
3240
3241    #[test]
3242    fn test_table_metadata_v1_compat() {
3243        let metadata =
3244            fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
3245
3246        // Deserialize the JSON to verify the compatibility fixture parses without error
3247        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3248            .expect("Failed to deserialize TableMetadataV1Compat.json");
3249
3250        // Verify some key fields match
3251        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3252        assert_eq!(
3253            desered_type.uuid(),
3254            Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
3255        );
3256        assert_eq!(
3257            desered_type.location(),
3258            "s3://bucket/warehouse/iceberg/glue.db/table_name"
3259        );
3260        assert_eq!(desered_type.last_updated_ms(), 1727773114005);
3261        assert_eq!(desered_type.current_schema_id(), 0);
3262    }
3263
3264    #[test]
3265    fn test_table_metadata_v1_schemas_without_current_id() {
3266        let metadata = fs::read_to_string(
3267            "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
3268        )
3269        .unwrap();
3270
3271        // Deserialize the JSON - this should succeed by using the 'schema' field instead of 'schemas'
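        // (As a rough sketch - the exact fixture lives in the testdata file named above -
        // such legacy documents carry a single top-level "schema" object, e.g.
        //   { "format-version": 1, ..., "schema": { "type": "struct", "fields": [...] } },
        // instead of a "schemas" list with a "current-schema-id".)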
3272        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3273            .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");
3274
3275        // Verify it used the 'schema' field
3276        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3277        assert_eq!(
3278            desered_type.uuid(),
3279            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3280        );
3281
3282        // Get the schema and verify it has the expected fields
3283        let schema = desered_type.current_schema();
3284        assert_eq!(schema.as_struct().fields().len(), 3);
3285        assert_eq!(schema.as_struct().fields()[0].name, "x");
3286        assert_eq!(schema.as_struct().fields()[1].name, "y");
3287        assert_eq!(schema.as_struct().fields()[2].name, "z");
3288    }
3289
3290    #[test]
3291    fn test_table_metadata_v1_no_valid_schema() {
3292        let metadata =
3293            fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
3294                .unwrap();
3295
3296        // Deserialize the JSON - this should fail because neither `schemas` + `current-schema-id` nor the legacy `schema` field yields a valid schema
3297        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3298
3299        assert!(desered.is_err());
3300        let error_message = desered.unwrap_err().to_string();
3301        assert!(
3302            error_message.contains("No valid schema configuration found"),
3303            "Expected error about no valid schema configuration, got: {error_message}"
3304        );
3305    }
3306
3307    #[test]
3308    fn test_table_metadata_v1_partition_specs_without_default_id() {
3309        let metadata = fs::read_to_string(
3310            "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
3311        )
3312        .unwrap();
3313
3314        // Deserialize the JSON - this should succeed by inferring default_spec_id as the max spec ID
3315        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3316            .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");
3317
3318        // Verify basic metadata
3319        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3320        assert_eq!(
3321            desered_type.uuid(),
3322            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3323        );
3324
3325        // Verify partition specs
3326        assert_eq!(desered_type.default_partition_spec_id(), 2); // Should pick the largest spec ID (2)
3327        assert_eq!(desered_type.partition_specs.len(), 2);
3328
3329        // Verify the default spec has the expected fields
3330        let default_spec = &desered_type.default_spec;
3331        assert_eq!(default_spec.spec_id(), 2);
3332        assert_eq!(default_spec.fields().len(), 1);
3333        assert_eq!(default_spec.fields()[0].name, "y");
3334        assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
3335        assert_eq!(default_spec.fields()[0].source_id, 2);
3336    }
3337
3338    #[test]
3339    fn test_table_metadata_v2_schema_not_found() {
3340        let metadata =
3341            fs::read_to_string("testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json")
3342                .unwrap();
3343
3344        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3345
3346        assert_eq!(
3347            desered.unwrap_err().to_string(),
3348            "DataInvalid => No schema exists with the current schema id 2."
3349        )
3350    }
3351
3352    #[test]
3353    fn test_table_metadata_v2_missing_sort_order() {
3354        let metadata =
3355            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
3356                .unwrap();
3357
3358        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3359
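        // serde's untagged-enum fallback hides the specific cause, so a missing required
        // v2 field surfaces as this generic variant-mismatch message.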
3360        assert_eq!(
3361            desered.unwrap_err().to_string(),
3362            "data did not match any variant of untagged enum TableMetadataEnum"
3363        )
3364    }
3365
3366    #[test]
3367    fn test_table_metadata_v2_missing_partition_specs() {
3368        let metadata =
3369            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json")
3370                .unwrap();
3371
3372        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3373
3374        assert_eq!(
3375            desered.unwrap_err().to_string(),
3376            "data did not match any variant of untagged enum TableMetadataEnum"
3377        )
3378    }
3379
3380    #[test]
3381    fn test_table_metadata_v2_missing_last_partition_id() {
3382        let metadata = fs::read_to_string(
3383            "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json",
3384        )
3385        .unwrap();
3386
3387        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3388
3389        assert_eq!(
3390            desered.unwrap_err().to_string(),
3391            "data did not match any variant of untagged enum TableMetadataEnum"
3392        )
3393    }
3394
3395    #[test]
3396    fn test_table_metadata_v2_missing_schemas() {
3397        let metadata =
3398            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSchemas.json")
3399                .unwrap();
3400
3401        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3402
3403        assert_eq!(
3404            desered.unwrap_err().to_string(),
3405            "data did not match any variant of untagged enum TableMetadataEnum"
3406        )
3407    }
3408
3409    #[test]
3410    fn test_table_metadata_v2_unsupported_version() {
3411        let metadata =
3412            fs::read_to_string("testdata/table_metadata/TableMetadataUnsupportedVersion.json")
3413                .unwrap();
3414
3415        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3416
3417        assert_eq!(
3418            desered.unwrap_err().to_string(),
3419            "data did not match any variant of untagged enum TableMetadataEnum"
3420        )
3421    }
3422
3423    #[test]
3424    fn test_order_of_format_version() {
3425        assert!(FormatVersion::V1 < FormatVersion::V2);
3426        assert_eq!(FormatVersion::V1, FormatVersion::V1);
3427        assert_eq!(FormatVersion::V2, FormatVersion::V2);
3428    }
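
    // A small supplementary check, sketched under the assumption that `FormatVersion::V3`
    // (used by the v3 tests above) sorts after V2 exactly as V1 sorts before V2.
    #[test]
    fn test_order_of_format_version_v3() {
        assert!(FormatVersion::V1 < FormatVersion::V3);
        assert!(FormatVersion::V2 < FormatVersion::V3);
        assert_eq!(FormatVersion::V3, FormatVersion::V3);
    }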
3429
3430    #[test]
3431    fn test_default_partition_spec() {
3432        let default_spec_id = 1234;
3433        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3434        let partition_spec = PartitionSpec::unpartition_spec();
3435        table_meta_data.default_spec = partition_spec.clone().into();
3436        table_meta_data
3437            .partition_specs
3438            .insert(default_spec_id, Arc::new(partition_spec));
3439
3440        assert_eq!(
3441            table_meta_data.default_partition_spec().as_ref(),
3442            table_meta_data
3443                .partition_spec_by_id(default_spec_id)
3444                .unwrap()
3445                .as_ref()
3446        );
3448    }

3449    #[test]
3450    fn test_default_sort_order() {
3451        let default_sort_order_id = 1234;
3452        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3453        table_meta_data.default_sort_order_id = default_sort_order_id;
3454        table_meta_data
3455            .sort_orders
3456            .insert(default_sort_order_id, Arc::new(SortOrder::default()));
3457
3458        assert_eq!(
3459            table_meta_data.default_sort_order(),
3460            table_meta_data
3461                .sort_orders
3462                .get(&default_sort_order_id)
3463                .unwrap()
3464        )
3465    }
3466
3467    #[test]
3468    fn test_table_metadata_builder_from_table_creation() {
3469        let table_creation = TableCreation::builder()
3470            .location("s3://db/table".to_string())
3471            .name("table".to_string())
3472            .properties(HashMap::new())
3473            .schema(Schema::builder().build().unwrap())
3474            .build();
3475        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
3476            .unwrap()
3477            .build()
3478            .unwrap()
3479            .metadata;
3480        assert_eq!(table_metadata.location, "s3://db/table");
3481        assert_eq!(table_metadata.schemas.len(), 1);
3482        assert_eq!(
3483            table_metadata
3484                .schemas
3485                .get(&0)
3486                .unwrap()
3487                .as_struct()
3488                .fields()
3489                .len(),
3490            0
3491        );
3492        assert_eq!(table_metadata.properties.len(), 0);
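        // Building from a bare `TableCreation` is expected to install an unpartitioned
        // spec (spec id 0) and an unsorted sort order (order id 0), as asserted below.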
3493        assert_eq!(
3494            table_metadata.partition_specs,
3495            HashMap::from([(
3496                0,
3497                Arc::new(
3498                    PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
3499                        .with_spec_id(0)
3500                        .build()
3501                        .unwrap()
3502                )
3503            )])
3504        );
3505        assert_eq!(
3506            table_metadata.sort_orders,
3507            HashMap::from([(
3508                0,
3509                Arc::new(SortOrder {
3510                    order_id: 0,
3511                    fields: vec![]
3512                })
3513            )])
3514        );
3515    }
3516
3517    #[tokio::test]
3518    async fn test_table_metadata_read_write() {
3519        // Create a temporary directory for our test
3520        let temp_dir = TempDir::new().unwrap();
3521        let temp_path = temp_dir.path().to_str().unwrap();
3522
3523        // Create a FileIO instance
3524        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3525
3526        // Use an existing test metadata from the test files
3527        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3528
3529        // Define the metadata location
3530        let metadata_location = format!("{temp_path}/metadata.json");
3531
3532        // Write the metadata
3533        original_metadata
3534            .write_to(&file_io, &metadata_location)
3535            .await
3536            .unwrap();
3537
3538        // Verify the file exists
3539        assert!(fs::metadata(&metadata_location).is_ok());
3540
3541        // Read the metadata back
3542        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location)
3543            .await
3544            .unwrap();
3545
3546        // Verify the metadata matches
3547        assert_eq!(read_metadata, original_metadata);
3548    }
3549
3550    #[tokio::test]
3551    async fn test_table_metadata_read_compressed() {
3552        let temp_dir = TempDir::new().unwrap();
3553        let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3554
3555        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3556        let json = serde_json::to_string(&original_metadata).unwrap();
3557
3558        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
3559        encoder.write_all(json.as_bytes()).unwrap();
3560        std::fs::write(&metadata_location, encoder.finish().unwrap())
3561            .expect("failed to write metadata");
3562
3563        // Read the metadata back
3564        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3565        let metadata_location = metadata_location.to_str().unwrap();
3566        let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3567            .await
3568            .unwrap();
3569
3570        // Verify the metadata matches
3571        assert_eq!(read_metadata, original_metadata);
3572    }
3573
3574    #[tokio::test]
3575    async fn test_table_metadata_read_nonexistent_file() {
3576        // Create a FileIO instance
3577        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3578
3579        // Try to read a non-existent file
3580        let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3581
3582        // Verify it returns an error
3583        assert!(result.is_err());
3584    }
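
    // A companion sketch to the tests above (same `FileIOBuilder` fs setup, hypothetical
    // file name): content that is neither valid JSON nor gzip should surface an error
    // from `read_from` rather than a panic.
    #[tokio::test]
    async fn test_table_metadata_read_invalid_content() {
        let temp_dir = TempDir::new().unwrap();
        let metadata_location = temp_dir.path().join("garbage.metadata.json");
        std::fs::write(&metadata_location, b"not table metadata").unwrap();

        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let result =
            TableMetadata::read_from(&file_io, metadata_location.to_str().unwrap()).await;
        assert!(result.is_err());
    }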
3585
3586    #[test]
3587    fn test_partition_name_exists() {
3588        let schema = Schema::builder()
3589            .with_fields(vec![
3590                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3591                NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
3592                    .into(),
3593            ])
3594            .build()
3595            .unwrap();
3596
3597        let spec1 = PartitionSpec::builder(schema.clone())
3598            .with_spec_id(1)
3599            .add_partition_field("data", "data_partition", Transform::Identity)
3600            .unwrap()
3601            .build()
3602            .unwrap();
3603
3604        let spec2 = PartitionSpec::builder(schema.clone())
3605            .with_spec_id(2)
3606            .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
3607            .unwrap()
3608            .build()
3609            .unwrap();
3610
3611        // Build metadata with these specs
3612        let metadata = TableMetadataBuilder::new(
3613            schema,
3614            spec1.clone().into_unbound(),
3615            SortOrder::unsorted_order(),
3616            "s3://test/location".to_string(),
3617            FormatVersion::V2,
3618            HashMap::new(),
3619        )
3620        .unwrap()
3621        .add_partition_spec(spec2.into_unbound())
3622        .unwrap()
3623        .build()
3624        .unwrap()
3625        .metadata;
3626
3627        assert!(metadata.partition_name_exists("data_partition"));
3628        assert!(metadata.partition_name_exists("partition_bucket"));
3629
3630        assert!(!metadata.partition_name_exists("nonexistent_field"));
3631        assert!(!metadata.partition_name_exists("data")); // schema field name, not partition field name
3632        assert!(!metadata.partition_name_exists(""));
3633    }
3634
3635    #[test]
3636    fn test_partition_name_exists_empty_specs() {
3637        // Create metadata for an unpartitioned table: its only spec has no partition fields
3638        let schema = Schema::builder()
3639            .with_fields(vec![
3640                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3641            ])
3642            .build()
3643            .unwrap();
3644
3645        let metadata = TableMetadataBuilder::new(
3646            schema,
3647            PartitionSpec::unpartition_spec().into_unbound(),
3648            SortOrder::unsorted_order(),
3649            "s3://test/location".to_string(),
3650            FormatVersion::V2,
3651            HashMap::new(),
3652        )
3653        .unwrap()
3654        .build()
3655        .unwrap()
3656        .metadata;
3657
3658        assert!(!metadata.partition_name_exists("any_field"));
3659        assert!(!metadata.partition_name_exists("data"));
3660    }
3661
3662    #[test]
3663    fn test_name_exists_in_any_schema() {
3664        // Create multiple schemas with different fields
3665        let schema1 = Schema::builder()
3666            .with_schema_id(1)
3667            .with_fields(vec![
3668                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3669                NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
3670            ])
3671            .build()
3672            .unwrap();
3673
3674        let schema2 = Schema::builder()
3675            .with_schema_id(2)
3676            .with_fields(vec![
3677                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3678                NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
3679            ])
3680            .build()
3681            .unwrap();
3682
3683        let metadata = TableMetadataBuilder::new(
3684            schema1,
3685            PartitionSpec::unpartition_spec().into_unbound(),
3686            SortOrder::unsorted_order(),
3687            "s3://test/location".to_string(),
3688            FormatVersion::V2,
3689            HashMap::new(),
3690        )
3691        .unwrap()
3692        .add_current_schema(schema2)
3693        .unwrap()
3694        .build()
3695        .unwrap()
3696        .metadata;
3697
3698        assert!(metadata.name_exists_in_any_schema("field1")); // exists in both schemas
3699        assert!(metadata.name_exists_in_any_schema("field2")); // exists only in schema1 (historical)
3700        assert!(metadata.name_exists_in_any_schema("field3")); // exists only in schema2 (current)
3701
3702        assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
3703        assert!(!metadata.name_exists_in_any_schema("field4"));
3704        assert!(!metadata.name_exists_in_any_schema(""));
3705    }
3706
3707    #[test]
3708    fn test_name_exists_in_any_schema_empty_schemas() {
3709        let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3710
3711        let metadata = TableMetadataBuilder::new(
3712            schema,
3713            PartitionSpec::unpartition_spec().into_unbound(),
3714            SortOrder::unsorted_order(),
3715            "s3://test/location".to_string(),
3716            FormatVersion::V2,
3717            HashMap::new(),
3718        )
3719        .unwrap()
3720        .build()
3721        .unwrap()
3722        .metadata;
3723
3724        assert!(!metadata.name_exists_in_any_schema("any_field"));
3725    }
3726
3727    #[test]
3728    fn test_helper_methods_multi_version_scenario() {
3729        // Test a realistic multi-version scenario
3730        let initial_schema = Schema::builder()
3731            .with_fields(vec![
3732                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3733                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3734                NestedField::required(
3735                    3,
3736                    "deprecated_field",
3737                    Type::Primitive(PrimitiveType::String),
3738                )
3739                .into(),
3740            ])
3741            .build()
3742            .unwrap();
3743
3744        let metadata = TableMetadataBuilder::new(
3745            initial_schema,
3746            PartitionSpec::unpartition_spec().into_unbound(),
3747            SortOrder::unsorted_order(),
3748            "s3://test/location".to_string(),
3749            FormatVersion::V2,
3750            HashMap::new(),
3751        )
3752        .unwrap();
3753
3754        let evolved_schema = Schema::builder()
3755            .with_fields(vec![
3756                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3757                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3758                NestedField::required(
3759                    3,
3760                    "deprecated_field",
3761                    Type::Primitive(PrimitiveType::String),
3762                )
3763                .into(),
3764                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3765                    .into(),
3766            ])
3767            .build()
3768            .unwrap();
3769
3770        // A third schema (dropping the deprecated field) is built here but intentionally never added, so the lookups below only see the first two schemas
3771        let _final_schema = Schema::builder()
3772            .with_fields(vec![
3773                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3774                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3775                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3776                    .into(),
3777                NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
3778                    .into(),
3779            ])
3780            .build()
3781            .unwrap();
3782
3783        let final_metadata = metadata
3784            .add_current_schema(evolved_schema)
3785            .unwrap()
3786            .build()
3787            .unwrap()
3788            .metadata;
3789
3790        assert!(!final_metadata.partition_name_exists("nonexistent_partition")); // unpartitioned table
3791
3792        assert!(final_metadata.name_exists_in_any_schema("id")); // exists in both schemas
3793        assert!(final_metadata.name_exists_in_any_schema("name")); // exists in both schemas
3794        assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); // exists in both schemas
3795        assert!(final_metadata.name_exists_in_any_schema("new_field")); // only in current schema
3796        assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
3797    }
3798}