iceberg/spec/
table_metadata_builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use uuid::Uuid;
22
23use super::{
24    DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
25    ONE_MINUTE_MS, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX,
26    PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT, PartitionSpec, PartitionSpecBuilder,
27    PartitionStatisticsFile, RESERVED_PROPERTIES, Schema, SchemaRef, Snapshot, SnapshotLog,
28    SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
29    TableMetadata, UNPARTITIONED_LAST_ASSIGNED_ID, UnboundPartitionSpec,
30};
31use crate::error::{Error, ErrorKind, Result};
32use crate::{TableCreation, TableUpdate};
33
/// Presumably the first field id handed out when field ids are re-assigned for a new table
/// (e.g. by `reassign_ids`) — NOTE(review): not referenced in the visible part of this file; confirm.
const FIRST_FIELD_ID: u32 = 1;
35
/// Builder for creating new [`TableMetadata`] or modifying existing metadata.
///
/// Unlike a typical builder pattern, the order of calls matters: operations are applied
/// in the order in which they are called. Applied operations are recorded in `changes` as a
/// chronologically ordered vec of `TableUpdate`.
/// Operations that do not change the `TableMetadata` are generally not recorded. Some still
/// record an update, for example:
/// - `set_properties` with a non-empty map;
/// - `set_statistics`;
/// - `add_schema` for a schema that already exists but was not the last one added.
///
/// Some basic rules:
/// - A schema must be added with `add_schema` before `set_current_schema` can select it via
///   `LAST_ADDED`. In that case, if no schema was added, `set_current_schema` fails.
/// - If a new partition spec and schema are added, add the schema first. Partition specs
///   are bound to the current schema at the time they are added.
#[derive(Debug, Clone)]
pub struct TableMetadataBuilder {
    /// The metadata being built.
    metadata: TableMetadata,
    /// Updates applied so far, in call order.
    changes: Vec<TableUpdate>,
    /// Id of the schema most recently added via `add_schema`; used to resolve `LAST_ADDED`
    /// in `set_current_schema`.
    last_added_schema_id: Option<i32>,
    /// Id of the partition spec most recently added through this builder.
    last_added_spec_id: Option<i32>,
    /// Presumably the sort-order counterpart of `last_added_spec_id` — NOTE(review): confirm.
    last_added_order_id: Option<i64>,
    // Metadata-log entry for the metadata file this builder was created from. `None` for a
    // brand new table (`new`) or when `new_from_metadata` received no current file location.
    previous_history_entry: Option<MetadataLog>,
    /// Pending value for the metadata's `last_updated_ms`; `None` until an operation sets it.
    // NOTE(review): per the comment in `new`, `build()` fills this in when still unset — confirm.
    last_updated_ms: Option<i64>,
}
59
#[derive(Debug, Clone, PartialEq)]
/// Result of modifying or creating a `TableMetadata`.
pub struct TableMetadataBuildResult {
    /// The new `TableMetadata`.
    pub metadata: TableMetadata,
    /// The changes that were applied to the metadata, in the order they were applied.
    pub changes: Vec<TableUpdate>,
    /// Metadata-log entries that were expired (dropped from the metadata log) by this build.
    // NOTE(review): presumably the entries trimmed once the log exceeds
    // `PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX`; confirm against `build()`.
    pub expired_metadata_logs: Vec<MetadataLog>,
}
70
71impl TableMetadataBuilder {
    /// Proxy id for the "last added" item: schema, partition spec, or sort order.
    ///
    /// For example, `set_current_schema(LAST_ADDED)` makes current the schema that was most
    /// recently added through this builder, and fails if no schema has been added.
    pub const LAST_ADDED: i32 = -1;
74
75    /// Create a `TableMetadata` object from scratch.
76    ///
77    /// This method re-assign ids of fields in the schema, schema.id, sort_order.id and
78    /// spec.id. It should only be used to create new table metadata from scratch.
79    pub fn new(
80        schema: Schema,
81        spec: impl Into<UnboundPartitionSpec>,
82        sort_order: SortOrder,
83        location: String,
84        format_version: FormatVersion,
85        properties: HashMap<String, String>,
86    ) -> Result<Self> {
87        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table.
88        let (fresh_schema, fresh_spec, fresh_sort_order) =
89            Self::reassign_ids(schema, spec.into(), sort_order)?;
90        let schema_id = fresh_schema.schema_id();
91
92        let builder = Self {
93            metadata: TableMetadata {
94                format_version,
95                table_uuid: Uuid::now_v7(),
96                location: "".to_string(), // Overwritten immediately by set_location
97                last_sequence_number: 0,
98                last_updated_ms: 0,    // Overwritten by build() if not set before
99                last_column_id: -1,    // Overwritten immediately by add_current_schema
100                current_schema_id: -1, // Overwritten immediately by add_current_schema
101                schemas: HashMap::new(),
102                partition_specs: HashMap::new(),
103                default_spec: Arc::new(
104                    // The spec id (-1) is just a proxy value and can be any negative number.
105                    // 0 would lead to wrong changes in the builder if the provided spec by the user is
106                    // also unpartitioned.
107                    // The `default_spec` value is always replaced at the end of this method by he `add_default_partition_spec`
108                    // method.
109                    PartitionSpec::unpartition_spec().with_spec_id(-1),
110                ), // Overwritten immediately by add_default_partition_spec
111                default_partition_type: StructType::new(vec![]),
112                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
113                properties: HashMap::new(),
114                current_snapshot_id: None,
115                snapshots: HashMap::new(),
116                snapshot_log: vec![],
117                sort_orders: HashMap::new(),
118                metadata_log: vec![],
119                default_sort_order_id: -1, // Overwritten immediately by add_default_sort_order
120                refs: HashMap::default(),
121                statistics: HashMap::new(),
122                partition_statistics: HashMap::new(),
123                encryption_keys: HashMap::new(),
124            },
125            last_updated_ms: None,
126            changes: vec![],
127            last_added_schema_id: Some(schema_id),
128            last_added_spec_id: None,
129            last_added_order_id: None,
130            previous_history_entry: None,
131        };
132
133        builder
134            .set_location(location)
135            .add_current_schema(fresh_schema)?
136            .add_default_partition_spec(fresh_spec.into_unbound())?
137            .add_default_sort_order(fresh_sort_order)?
138            .set_properties(properties)
139    }
140
141    /// Creates a new table metadata builder from the given metadata to modify it.
142    /// `current_file_location` is the location where the current version
143    /// of the metadata file is stored. This is used to update the metadata log.
144    /// If `current_file_location` is `None`, the metadata log will not be updated.
145    /// This should only be used to stage-create tables.
146    #[must_use]
147    pub fn new_from_metadata(
148        previous: TableMetadata,
149        current_file_location: Option<String>,
150    ) -> Self {
151        Self {
152            previous_history_entry: current_file_location.map(|l| MetadataLog {
153                metadata_file: l,
154                timestamp_ms: previous.last_updated_ms,
155            }),
156            metadata: previous,
157            changes: Vec::default(),
158            last_added_schema_id: None,
159            last_added_spec_id: None,
160            last_added_order_id: None,
161            last_updated_ms: None,
162        }
163    }
164
165    /// Creates a new table metadata builder from the given table creation.
166    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
167        let TableCreation {
168            name: _,
169            location,
170            schema,
171            partition_spec,
172            sort_order,
173            properties,
174        } = table_creation;
175
176        let location = location.ok_or_else(|| {
177            Error::new(
178                ErrorKind::DataInvalid,
179                "Can't create table without location",
180            )
181        })?;
182        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
183            spec_id: None,
184            fields: vec![],
185        });
186
187        Self::new(
188            schema,
189            partition_spec,
190            sort_order.unwrap_or(SortOrder::unsorted_order()),
191            location,
192            FormatVersion::V2,
193            properties,
194        )
195    }
196
197    /// Changes uuid of table metadata.
198    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
199        if self.metadata.table_uuid != uuid {
200            self.metadata.table_uuid = uuid;
201            self.changes.push(TableUpdate::AssignUuid { uuid });
202        }
203
204        self
205    }
206
207    /// Upgrade `FormatVersion`. Downgrades are not allowed.
208    ///
209    /// # Errors
210    /// - Cannot downgrade to older format versions.
211    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> {
212        if format_version < self.metadata.format_version {
213            return Err(Error::new(
214                ErrorKind::DataInvalid,
215                format!(
216                    "Cannot downgrade FormatVersion from {} to {}",
217                    self.metadata.format_version, format_version
218                ),
219            ));
220        }
221
222        if format_version != self.metadata.format_version {
223            match format_version {
224                FormatVersion::V1 => {
225                    // No changes needed for V1
226                }
227                FormatVersion::V2 => {
228                    self.metadata.format_version = format_version;
229                    self.changes
230                        .push(TableUpdate::UpgradeFormatVersion { format_version });
231                }
232            }
233        }
234
235        Ok(self)
236    }
237
238    /// Set properties. If a property already exists, it will be overwritten.
239    ///
240    /// If a reserved property is set, the corresponding action is performed and the property is not persisted.
241    /// Currently the following reserved properties are supported:
242    /// * format-version: Set the format version of the table.
243    ///
244    /// # Errors
245    /// - If properties contains a reserved property
246    pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> {
247        // List of specified properties that are RESERVED and should not be persisted.
248        let reserved_properties = properties
249            .keys()
250            .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
251            .map(ToString::to_string)
252            .collect::<Vec<_>>();
253
254        if !reserved_properties.is_empty() {
255            return Err(Error::new(
256                ErrorKind::DataInvalid,
257                format!(
258                    "Table properties should not contain reserved properties, but got: [{}]",
259                    reserved_properties.join(", ")
260                ),
261            ));
262        }
263
264        if properties.is_empty() {
265            return Ok(self);
266        }
267
268        self.metadata.properties.extend(properties.clone());
269        self.changes.push(TableUpdate::SetProperties {
270            updates: properties,
271        });
272
273        Ok(self)
274    }
275
276    /// Remove properties from the table metadata.
277    /// Does nothing if the key is not present.
278    ///
279    /// # Errors
280    /// - If properties to remove contains a reserved property
281    pub fn remove_properties(mut self, properties: &[String]) -> Result<Self> {
282        // remove duplicates
283        let properties = properties.iter().cloned().collect::<HashSet<_>>();
284
285        // disallow removal of reserved properties
286        let reserved_properties = properties
287            .iter()
288            .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
289            .map(ToString::to_string)
290            .collect::<Vec<_>>();
291
292        if !reserved_properties.is_empty() {
293            return Err(Error::new(
294                ErrorKind::DataInvalid,
295                format!(
296                    "Table properties to remove contain reserved properties: [{}]",
297                    reserved_properties.join(", ")
298                ),
299            ));
300        }
301
302        for property in &properties {
303            self.metadata.properties.remove(property);
304        }
305
306        if !properties.is_empty() {
307            self.changes.push(TableUpdate::RemoveProperties {
308                removals: properties.into_iter().collect(),
309            });
310        }
311
312        Ok(self)
313    }
314
315    /// Set the location of the table, stripping any trailing slashes.
316    pub fn set_location(mut self, location: String) -> Self {
317        let location = location.trim_end_matches('/').to_string();
318        if self.metadata.location != location {
319            self.changes.push(TableUpdate::SetLocation {
320                location: location.clone(),
321            });
322            self.metadata.location = location;
323        }
324
325        self
326    }
327
328    /// Add a snapshot to the table metadata.
329    ///
330    /// # Errors
331    /// - Snapshot id already exists.
332    /// - For format version > 1: the sequence number of the snapshot is lower than the highest sequence number specified so far.
333    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
334        if self
335            .metadata
336            .snapshots
337            .contains_key(&snapshot.snapshot_id())
338        {
339            return Err(Error::new(
340                ErrorKind::DataInvalid,
341                format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
342            ));
343        }
344
345        if self.metadata.format_version != FormatVersion::V1
346            && snapshot.sequence_number() <= self.metadata.last_sequence_number
347            && snapshot.parent_snapshot_id().is_some()
348        {
349            return Err(Error::new(
350                ErrorKind::DataInvalid,
351                format!(
352                    "Cannot add snapshot with sequence number {} older than last sequence number {}",
353                    snapshot.sequence_number(),
354                    self.metadata.last_sequence_number
355                ),
356            ));
357        }
358
359        if let Some(last) = self.metadata.snapshot_log.last() {
360            // commits can happen concurrently from different machines.
361            // A tolerance helps us avoid failure for small clock skew
362            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
363                return Err(Error::new(
364                    ErrorKind::DataInvalid,
365                    format!(
366                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
367                        snapshot.timestamp_ms(),
368                        last.timestamp_ms
369                    ),
370                ));
371            }
372        }
373
374        let max_last_updated = self
375            .last_updated_ms
376            .unwrap_or_default()
377            .max(self.metadata.last_updated_ms);
378        if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
379            return Err(Error::new(
380                ErrorKind::DataInvalid,
381                format!(
382                    "Invalid snapshot timestamp {}: before last updated timestamp {}",
383                    snapshot.timestamp_ms(),
384                    max_last_updated
385                ),
386            ));
387        }
388
389        // Mutation happens in next line - must be infallible from here
390        self.changes.push(TableUpdate::AddSnapshot {
391            snapshot: snapshot.clone(),
392        });
393
394        self.last_updated_ms = Some(snapshot.timestamp_ms());
395        self.metadata.last_sequence_number = snapshot.sequence_number();
396        self.metadata
397            .snapshots
398            .insert(snapshot.snapshot_id(), snapshot.into());
399
400        Ok(self)
401    }
402
403    /// Append a snapshot to the specified branch.
404    /// Retention settings from the `branch` are re-used.
405    ///
406    /// # Errors
407    /// - Any of the preconditions of `self.add_snapshot` are not met.
408    pub fn set_branch_snapshot(self, snapshot: Snapshot, branch: &str) -> Result<Self> {
409        let reference = self.metadata.refs.get(branch).cloned();
410
411        let reference = if let Some(mut reference) = reference {
412            if !reference.is_branch() {
413                return Err(Error::new(
414                    ErrorKind::DataInvalid,
415                    format!("Cannot append snapshot to non-branch reference '{branch}'",),
416                ));
417            }
418
419            reference.snapshot_id = snapshot.snapshot_id();
420            reference
421        } else {
422            SnapshotReference {
423                snapshot_id: snapshot.snapshot_id(),
424                retention: SnapshotRetention::Branch {
425                    min_snapshots_to_keep: None,
426                    max_snapshot_age_ms: None,
427                    max_ref_age_ms: None,
428                },
429            }
430        };
431
432        self.add_snapshot(snapshot)?.set_ref(branch, reference)
433    }
434
435    /// Remove snapshots by its ids from the table metadata.
436    /// Does nothing if a snapshot id is not present.
437    /// Keeps as changes only the snapshots that were actually removed.
438    pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
439        let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
440
441        self.metadata.snapshots.retain(|k, _| {
442            if snapshot_ids.contains(k) {
443                removed_snapshots.push(*k);
444                false
445            } else {
446                true
447            }
448        });
449
450        if !removed_snapshots.is_empty() {
451            self.changes.push(TableUpdate::RemoveSnapshots {
452                snapshot_ids: removed_snapshots,
453            });
454        }
455
456        // Remove refs that are no longer valid
457        self.metadata
458            .refs
459            .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
460
461        self
462    }
463
464    /// Set a reference to a snapshot.
465    ///
466    /// # Errors
467    /// - The snapshot id is unknown.
468    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
469        if self
470            .metadata
471            .refs
472            .get(ref_name)
473            .is_some_and(|snap_ref| snap_ref.eq(&reference))
474        {
475            return Ok(self);
476        }
477
478        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
479            return Err(Error::new(
480                ErrorKind::DataInvalid,
481                format!(
482                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
483                    reference.snapshot_id
484                ),
485            ));
486        };
487
488        // Update last_updated_ms to the exact timestamp of the snapshot if it was added in this commit
489        let is_added_snapshot = self.changes.iter().any(|update| {
490            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
491        });
492        if is_added_snapshot {
493            self.last_updated_ms = Some(snapshot.timestamp_ms());
494        }
495
496        // Current snapshot id is set only for the main branch
497        if ref_name == MAIN_BRANCH {
498            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
499            let timestamp_ms = if let Some(last_updated_ms) = self.last_updated_ms {
500                last_updated_ms
501            } else {
502                let last_updated_ms = chrono::Utc::now().timestamp_millis();
503                self.last_updated_ms = Some(last_updated_ms);
504                last_updated_ms
505            };
506
507            self.metadata.snapshot_log.push(SnapshotLog {
508                snapshot_id: snapshot.snapshot_id(),
509                timestamp_ms,
510            });
511        }
512
513        self.changes.push(TableUpdate::SetSnapshotRef {
514            ref_name: ref_name.to_string(),
515            reference: reference.clone(),
516        });
517        self.metadata.refs.insert(ref_name.to_string(), reference);
518
519        Ok(self)
520    }
521
522    /// Remove a reference
523    ///
524    /// If `ref_name='main'` the current snapshot id is set to -1.
525    pub fn remove_ref(mut self, ref_name: &str) -> Self {
526        if ref_name == MAIN_BRANCH {
527            self.metadata.current_snapshot_id = None;
528            self.metadata.snapshot_log.clear();
529        }
530
531        if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
532            self.changes.push(TableUpdate::RemoveSnapshotRef {
533                ref_name: ref_name.to_string(),
534            });
535        }
536
537        self
538    }
539
540    /// Set statistics for a snapshot
541    pub fn set_statistics(mut self, statistics: StatisticsFile) -> Self {
542        self.metadata
543            .statistics
544            .insert(statistics.snapshot_id, statistics.clone());
545        self.changes.push(TableUpdate::SetStatistics {
546            statistics: statistics.clone(),
547        });
548        self
549    }
550
551    /// Remove statistics for a snapshot
552    pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
553        let previous = self.metadata.statistics.remove(&snapshot_id);
554        if previous.is_some() {
555            self.changes
556                .push(TableUpdate::RemoveStatistics { snapshot_id });
557        }
558        self
559    }
560
561    /// Set partition statistics
562    pub fn set_partition_statistics(
563        mut self,
564        partition_statistics_file: PartitionStatisticsFile,
565    ) -> Self {
566        self.metadata.partition_statistics.insert(
567            partition_statistics_file.snapshot_id,
568            partition_statistics_file.clone(),
569        );
570        self.changes.push(TableUpdate::SetPartitionStatistics {
571            partition_statistics: partition_statistics_file,
572        });
573        self
574    }
575
576    /// Remove partition statistics
577    pub fn remove_partition_statistics(mut self, snapshot_id: i64) -> Self {
578        let previous = self.metadata.partition_statistics.remove(&snapshot_id);
579        if previous.is_some() {
580            self.changes
581                .push(TableUpdate::RemovePartitionStatistics { snapshot_id });
582        }
583        self
584    }
585
586    /// Add a schema to the table metadata.
587    ///
588    /// The provided `schema.schema_id` may not be used.
589    ///
590    /// Important: Use this method with caution. The builder does not check
591    /// if the added schema is compatible with the current schema.
592    pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
593        // Validate that new schema fields don't conflict with existing partition field names
594        self.validate_schema_field_names(&schema)?;
595
596        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
597        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);
598
599        if schema_found {
600            if self.last_added_schema_id != Some(new_schema_id) {
601                self.changes.push(TableUpdate::AddSchema {
602                    schema: schema.clone(),
603                });
604                self.last_added_schema_id = Some(new_schema_id);
605            }
606
607            return Ok(self);
608        }
609
610        // New schemas might contain only old columns. In this case last_column_id should not be
611        // reduced.
612        self.metadata.last_column_id =
613            std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());
614
615        // Set schema-id
616        let schema = match new_schema_id == schema.schema_id() {
617            true => schema,
618            false => schema.with_schema_id(new_schema_id),
619        };
620
621        self.metadata
622            .schemas
623            .insert(new_schema_id, schema.clone().into());
624
625        self.changes.push(TableUpdate::AddSchema { schema });
626
627        self.last_added_schema_id = Some(new_schema_id);
628
629        Ok(self)
630    }
631
632    /// Set the current schema id.
633    ///
634    /// If `schema_id` is -1, the last added schema is set as the current schema.
635    ///
636    /// Errors:
637    /// - provided `schema_id` is -1 but no schema has been added via `add_schema`.
638    /// - No schema with the provided `schema_id` exists.
639    pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
640        if schema_id == Self::LAST_ADDED {
641            schema_id = self.last_added_schema_id.ok_or_else(|| {
642                Error::new(
643                    ErrorKind::DataInvalid,
644                    "Cannot set current schema to last added schema: no schema has been added.",
645                )
646            })?;
647        };
648        let schema_id = schema_id; // Make immutable
649
650        if schema_id == self.metadata.current_schema_id {
651            return Ok(self);
652        }
653
654        let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
655            Error::new(
656                ErrorKind::DataInvalid,
657                format!(
658                    "Cannot set current schema to unknown schema with id: '{}'",
659                    schema_id
660                ),
661            )
662        })?;
663
664        // Old partition specs and sort-orders should be preserved even if they are not compatible with the new schema,
665        // so that older metadata can still be interpreted.
666        // Default partition spec and sort order are checked in the build() method
667        // which allows other default partition specs and sort orders to be set before the build.
668
669        self.metadata.current_schema_id = schema_id;
670
671        if self.last_added_schema_id == Some(schema_id) {
672            self.changes.push(TableUpdate::SetCurrentSchema {
673                schema_id: Self::LAST_ADDED,
674            });
675        } else {
676            self.changes
677                .push(TableUpdate::SetCurrentSchema { schema_id });
678        }
679
680        Ok(self)
681    }
682
683    /// Add a schema and set it as the current schema.
684    pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
685        self.add_schema(schema)?
686            .set_current_schema(Self::LAST_ADDED)
687    }
688
689    /// Validate schema field names against partition field names across all historical schemas.
690    ///
691    /// Due to Iceberg's multi-version property, this check ignores existing schema fields
692    /// that match partition names (schema evolution allows re-adding previously removed fields).
693    /// Only NEW field names that conflict with partition names are rejected.
694    ///
695    /// # Errors
696    /// - Schema field name conflicts with partition field name but doesn't exist in any historical schema.
697    fn validate_schema_field_names(&self, schema: &Schema) -> Result<()> {
698        if self.metadata.schemas.is_empty() {
699            return Ok(());
700        }
701
702        for field_name in schema.field_id_to_name_map().values() {
703            let has_partition_conflict = self.metadata.partition_name_exists(field_name);
704            let is_new_field = !self.metadata.name_exists_in_any_schema(field_name);
705
706            if has_partition_conflict && is_new_field {
707                return Err(Error::new(
708                    ErrorKind::DataInvalid,
709                    format!(
710                        "Cannot add schema field '{}' because it conflicts with existing partition field name. \
711                         Schema evolution cannot introduce field names that match existing partition field names.",
712                        field_name
713                    ),
714                ));
715            }
716        }
717
718        Ok(())
719    }
720
721    /// Validate partition field names against schema field names across all historical schemas.
722    ///
723    /// Due to Iceberg's multi-version property, partition fields can share names with schema fields
724    /// if they meet specific requirements (identity transform + matching source field ID).
725    /// This validation enforces those rules across all historical schema versions.
726    ///
727    /// # Errors
728    /// - Partition field name conflicts with schema field name but doesn't use identity transform.
729    /// - Partition field uses identity transform but references wrong source field ID.
730    fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> {
731        if self.metadata.schemas.is_empty() {
732            return Ok(());
733        }
734
735        let current_schema = self.get_current_schema()?;
736        for partition_field in unbound_spec.fields() {
737            let exists_in_any_schema = self
738                .metadata
739                .name_exists_in_any_schema(&partition_field.name);
740
741            // Skip if partition field name doesn't conflict with any schema field
742            if !exists_in_any_schema {
743                continue;
744            }
745
746            // If name exists in schemas, validate against current schema rules
747            if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) {
748                let is_identity_transform =
749                    partition_field.transform == crate::spec::Transform::Identity;
750                let has_matching_source_id = schema_field.id == partition_field.source_id;
751
752                if !is_identity_transform {
753                    return Err(Error::new(
754                        ErrorKind::DataInvalid,
755                        format!(
756                            "Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.",
757                            partition_field.name
758                        ),
759                    ));
760                }
761
762                if !has_matching_source_id {
763                    return Err(Error::new(
764                        ErrorKind::DataInvalid,
765                        format!(
766                            "Cannot create identity partition sourced from different field in schema. \
767                             Field name '{}' has id `{}` in schema but partition source id is `{}`",
768                            partition_field.name, schema_field.id, partition_field.source_id
769                        ),
770                    ));
771                }
772            }
773        }
774
775        Ok(())
776    }
777
778    /// Add a partition spec to the table metadata.
779    ///
780    /// The spec is bound eagerly to the current schema.
781    /// If a schema is added in the same set of changes, the schema should be added first.
782    ///
783    /// Even if `unbound_spec.spec_id` is provided as `Some`, it may not be used.
784    ///
785    /// # Errors
786    /// - The partition spec cannot be bound to the current schema.
787    /// - The partition spec has non-sequential field ids and the table format version is 1.
788    pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
789        let schema = self.get_current_schema()?.clone();
790
791        // Check if partition field names conflict with schema field names across all schemas
792        self.validate_partition_field_names(&unbound_spec)?;
793
794        let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
795            .with_last_assigned_field_id(self.metadata.last_partition_id)
796            .build()?;
797
798        let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
799        let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
800        let spec = spec.with_spec_id(new_spec_id);
801        let unbound_spec = unbound_spec.with_spec_id(new_spec_id);
802
803        if spec_found {
804            if self.last_added_spec_id != Some(new_spec_id) {
805                self.changes
806                    .push(TableUpdate::AddSpec { spec: unbound_spec });
807                self.last_added_spec_id = Some(new_spec_id);
808            }
809
810            return Ok(self);
811        }
812
813        if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
814            return Err(Error::new(
815                ErrorKind::DataInvalid,
816                "Cannot add partition spec with non-sequential field ids to format version 1 table",
817            ));
818        }
819
820        let highest_field_id = spec
821            .highest_field_id()
822            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
823        self.metadata
824            .partition_specs
825            .insert(new_spec_id, Arc::new(spec));
826        self.changes
827            .push(TableUpdate::AddSpec { spec: unbound_spec });
828
829        self.last_added_spec_id = Some(new_spec_id);
830        self.metadata.last_partition_id =
831            std::cmp::max(self.metadata.last_partition_id, highest_field_id);
832
833        Ok(self)
834    }
835
836    /// Set the default partition spec.
837    ///
838    /// # Errors
839    /// - spec_id is -1 but no spec has been added via `add_partition_spec`.
840    /// - No partition spec with the provided `spec_id` exists.
841    pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result<Self> {
842        if spec_id == Self::LAST_ADDED {
843            spec_id = self.last_added_spec_id.ok_or_else(|| {
844                Error::new(
845                    ErrorKind::DataInvalid,
846                    "Cannot set default partition spec to last added spec: no spec has been added.",
847                )
848            })?;
849        }
850
851        if self.metadata.default_spec.spec_id() == spec_id {
852            return Ok(self);
853        }
854
855        if !self.metadata.partition_specs.contains_key(&spec_id) {
856            return Err(Error::new(
857                ErrorKind::DataInvalid,
858                format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",),
859            ));
860        }
861
862        let schemaless_spec = self
863            .metadata
864            .partition_specs
865            .get(&spec_id)
866            .ok_or_else(|| {
867                Error::new(
868                    ErrorKind::DataInvalid,
869                    format!(
870                        "Cannot set default partition spec to unknown spec with id: '{spec_id}'",
871                    ),
872                )
873            })?
874            .clone();
875        let spec = Arc::unwrap_or_clone(schemaless_spec);
876        let spec_type = spec.partition_type(self.get_current_schema()?)?;
877        self.metadata.default_spec = Arc::new(spec);
878        self.metadata.default_partition_type = spec_type;
879
880        if self.last_added_spec_id == Some(spec_id) {
881            self.changes.push(TableUpdate::SetDefaultSpec {
882                spec_id: Self::LAST_ADDED,
883            });
884        } else {
885            self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
886        }
887
888        Ok(self)
889    }
890
891    /// Add a partition spec and set it as the default
892    pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
893        self.add_partition_spec(unbound_spec)?
894            .set_default_partition_spec(Self::LAST_ADDED)
895    }
896
897    /// Remove partition specs by their ids from the table metadata.
898    /// Does nothing if a spec id is not present. Active partition specs
899    /// should not be removed.
900    ///
901    /// # Errors
902    /// - Cannot remove the default partition spec.
903    pub fn remove_partition_specs(mut self, spec_ids: &[i32]) -> Result<Self> {
904        if spec_ids.contains(&self.metadata.default_spec.spec_id()) {
905            return Err(Error::new(
906                ErrorKind::DataInvalid,
907                "Cannot remove default partition spec",
908            ));
909        }
910
911        let mut removed_specs = Vec::with_capacity(spec_ids.len());
912        spec_ids.iter().for_each(|id| {
913            if self.metadata.partition_specs.remove(id).is_some() {
914                removed_specs.push(*id);
915            }
916        });
917
918        if !removed_specs.is_empty() {
919            self.changes.push(TableUpdate::RemovePartitionSpecs {
920                spec_ids: removed_specs,
921            });
922        }
923
924        Ok(self)
925    }
926
927    /// Add a sort order to the table metadata.
928    ///
929    /// The spec is bound eagerly to the current schema and must be valid for it.
930    /// If a schema is added in the same set of changes, the schema should be added first.
931    ///
932    /// Even if `sort_order.order_id` is provided, it may not be used.
933    ///
934    /// # Errors
935    /// - Sort order id to add already exists.
936    /// - Sort order is incompatible with the current schema.
937    pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
938        let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
939        let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id);
940
941        if sort_order_found {
942            if self.last_added_order_id != Some(new_order_id) {
943                self.changes.push(TableUpdate::AddSortOrder {
944                    sort_order: sort_order.clone().with_order_id(new_order_id),
945                });
946                self.last_added_order_id = Some(new_order_id);
947            }
948
949            return Ok(self);
950        }
951
952        let schema = self.get_current_schema()?.clone().as_ref().clone();
953        let sort_order = SortOrder::builder()
954            .with_order_id(new_order_id)
955            .with_fields(sort_order.fields)
956            .build(&schema)
957            .map_err(|e| {
958                Error::new(
959                    ErrorKind::DataInvalid,
960                    format!("Sort order to add is incompatible with current schema: {e}"),
961                )
962                .with_source(e)
963            })?;
964
965        self.last_added_order_id = Some(new_order_id);
966        self.metadata
967            .sort_orders
968            .insert(new_order_id, sort_order.clone().into());
969        self.changes.push(TableUpdate::AddSortOrder { sort_order });
970
971        Ok(self)
972    }
973
974    /// Set the default sort order. If `sort_order_id` is -1, the last added sort order is set as default.
975    ///
976    /// # Errors
977    /// - sort_order_id is -1 but no sort order has been added via `add_sort_order`.
978    /// - No sort order with the provided `sort_order_id` exists.
979    pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result<Self> {
980        if sort_order_id == Self::LAST_ADDED as i64 {
981            sort_order_id = self.last_added_order_id.ok_or_else(|| {
982                Error::new(
983                    ErrorKind::DataInvalid,
984                    "Cannot set default sort order to last added order: no order has been added.",
985                )
986            })?;
987        }
988
989        if self.metadata.default_sort_order_id == sort_order_id {
990            return Ok(self);
991        }
992
993        if !self.metadata.sort_orders.contains_key(&sort_order_id) {
994            return Err(Error::new(
995                ErrorKind::DataInvalid,
996                format!(
997                    "Cannot set default sort order to unknown order with id: '{sort_order_id}'"
998                ),
999            ));
1000        }
1001
1002        self.metadata.default_sort_order_id = sort_order_id;
1003
1004        if self.last_added_order_id == Some(sort_order_id) {
1005            self.changes.push(TableUpdate::SetDefaultSortOrder {
1006                sort_order_id: Self::LAST_ADDED as i64,
1007            });
1008        } else {
1009            self.changes
1010                .push(TableUpdate::SetDefaultSortOrder { sort_order_id });
1011        }
1012
1013        Ok(self)
1014    }
1015
1016    /// Add a sort order and set it as the default
1017    fn add_default_sort_order(self, sort_order: SortOrder) -> Result<Self> {
1018        self.add_sort_order(sort_order)?
1019            .set_default_sort_order(Self::LAST_ADDED as i64)
1020    }
1021
1022    /// Build the table metadata.
1023    pub fn build(mut self) -> Result<TableMetadataBuildResult> {
1024        self.metadata.last_updated_ms = self
1025            .last_updated_ms
1026            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
1027
1028        // Check compatibility of the current schema to the default partition spec and sort order.
1029        // We use the `get_xxx` methods from the builder to avoid using the panicking
1030        // `TableMetadata.default_partition_spec` etc. methods.
1031        let schema = self.get_current_schema()?.clone();
1032        let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?);
1033
1034        self.metadata.default_spec = Arc::new(
1035            Arc::unwrap_or_clone(self.metadata.default_spec)
1036                .into_unbound()
1037                .bind(schema.clone())?,
1038        );
1039        self.metadata.default_partition_type =
1040            self.metadata.default_spec.partition_type(&schema)?;
1041        SortOrder::builder()
1042            .with_fields(sort_order.fields)
1043            .build(&schema)?;
1044
1045        self.update_snapshot_log()?;
1046        self.metadata.try_normalize()?;
1047
1048        if let Some(hist_entry) = self.previous_history_entry.take() {
1049            self.metadata.metadata_log.push(hist_entry);
1050        }
1051        let expired_metadata_logs = self.expire_metadata_log();
1052
1053        Ok(TableMetadataBuildResult {
1054            metadata: self.metadata,
1055            changes: self.changes,
1056            expired_metadata_logs,
1057        })
1058    }
1059
1060    fn expire_metadata_log(&mut self) -> Vec<MetadataLog> {
1061        let max_size = self
1062            .metadata
1063            .properties
1064            .get(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
1065            .and_then(|v| v.parse::<usize>().ok())
1066            .unwrap_or(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
1067            .max(1);
1068
1069        if self.metadata.metadata_log.len() > max_size {
1070            self.metadata
1071                .metadata_log
1072                .drain(0..self.metadata.metadata_log.len() - max_size)
1073                .collect()
1074        } else {
1075            Vec::new()
1076        }
1077    }
1078
1079    fn update_snapshot_log(&mut self) -> Result<()> {
1080        let intermediate_snapshots = self.get_intermediate_snapshots();
1081        let has_removed_snapshots = self
1082            .changes
1083            .iter()
1084            .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. }));
1085
1086        if intermediate_snapshots.is_empty() && !has_removed_snapshots {
1087            return Ok(());
1088        }
1089
1090        let mut new_snapshot_log = Vec::new();
1091        for log_entry in &self.metadata.snapshot_log {
1092            let snapshot_id = log_entry.snapshot_id;
1093            if self.metadata.snapshots.contains_key(&snapshot_id) {
1094                if !intermediate_snapshots.contains(&snapshot_id) {
1095                    new_snapshot_log.push(log_entry.clone());
1096                }
1097            } else if has_removed_snapshots {
1098                // any invalid entry causes the history before it to be removed. otherwise, there could be
1099                // history gaps that cause time-travel queries to produce incorrect results. for example,
1100                // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
1101                // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
1102                // and t3 when in fact s2 was the current snapshot.
1103                new_snapshot_log.clear();
1104            }
1105        }
1106
1107        if let Some(current_snapshot_id) = self.metadata.current_snapshot_id {
1108            let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id);
1109            if last_id != Some(current_snapshot_id) {
1110                return Err(Error::new(
1111                    ErrorKind::DataInvalid,
1112                    "Cannot set invalid snapshot log: latest entry is not the current snapshot",
1113                ));
1114            }
1115        };
1116
1117        self.metadata.snapshot_log = new_snapshot_log;
1118        Ok(())
1119    }
1120
1121    /// Finds intermediate snapshots that have not been committed as the current snapshot.
1122    ///
1123    /// Transactions can create snapshots that are never the current snapshot because several
1124    /// changes are combined by the transaction into one table metadata update. when each
1125    /// intermediate snapshot is added to table metadata, it is added to the snapshot log, assuming
1126    /// that it will be the current snapshot. when there are multiple snapshot updates, the log must
1127    /// be corrected by suppressing the intermediate snapshot entries.
1128    ///     
1129    /// A snapshot is an intermediate snapshot if it was added but is not the current snapshot.
1130    fn get_intermediate_snapshots(&self) -> HashSet<i64> {
1131        let added_snapshot_ids = self
1132            .changes
1133            .iter()
1134            .filter_map(|update| match update {
1135                TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
1136                _ => None,
1137            })
1138            .collect::<HashSet<_>>();
1139
1140        self.changes
1141            .iter()
1142            .filter_map(|update| match update {
1143                TableUpdate::SetSnapshotRef {
1144                    ref_name,
1145                    reference,
1146                } => {
1147                    if added_snapshot_ids.contains(&reference.snapshot_id)
1148                        && ref_name == MAIN_BRANCH
1149                        && reference.snapshot_id
1150                            != self
1151                                .metadata
1152                                .current_snapshot_id
1153                                .unwrap_or(i64::from(Self::LAST_ADDED))
1154                    {
1155                        Some(reference.snapshot_id)
1156                    } else {
1157                        None
1158                    }
1159                }
1160                _ => None,
1161            })
1162            .collect()
1163    }
1164
1165    fn reassign_ids(
1166        schema: Schema,
1167        spec: UnboundPartitionSpec,
1168        sort_order: SortOrder,
1169    ) -> Result<(Schema, PartitionSpec, SortOrder)> {
1170        // Re-assign field ids and schema ids for a new table.
1171        let previous_id_to_name = schema.field_id_to_name_map().clone();
1172        let fresh_schema = schema
1173            .into_builder()
1174            .with_schema_id(DEFAULT_SCHEMA_ID)
1175            .with_reassigned_field_ids(FIRST_FIELD_ID)
1176            .build()?;
1177
1178        // Re-build partition spec with new ids
1179        let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone());
1180        for field in spec.fields() {
1181            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
1182                Error::new(
1183                    ErrorKind::DataInvalid,
1184                    format!(
1185                        "Cannot find source column with id {} for partition column {} in schema.",
1186                        field.source_id, field.name
1187                    ),
1188                )
1189            })?;
1190            fresh_spec =
1191                fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
1192        }
1193        let fresh_spec = fresh_spec.build()?;
1194
1195        // Re-build sort order with new ids
1196        let mut fresh_order = SortOrder::builder();
1197        for mut field in sort_order.fields {
1198            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
1199                Error::new(
1200                    ErrorKind::DataInvalid,
1201                    format!(
1202                        "Cannot find source column with id {} for sort column in schema.",
1203                        field.source_id
1204                    ),
1205                )
1206            })?;
1207            let new_field_id = fresh_schema
1208                       .field_by_name(source_field_name)
1209                       .ok_or_else(|| {
1210                           Error::new(
1211                               ErrorKind::Unexpected,
1212                               format!(
1213                                   "Cannot find source column with name {} for sort column in re-assigned schema.",
1214                                   source_field_name
1215                               ),
1216                           )
1217                       })?.id;
1218            field.source_id = new_field_id;
1219            fresh_order.with_sort_field(field);
1220        }
1221        let fresh_sort_order = fresh_order.build(&fresh_schema)?;
1222
1223        Ok((fresh_schema, fresh_spec, fresh_sort_order))
1224    }
1225
1226    fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
1227        self.metadata
1228            .schemas
1229            .iter()
1230            .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
1231            .unwrap_or_else(|| self.get_highest_schema_id() + 1)
1232    }
1233
1234    fn get_highest_schema_id(&self) -> i32 {
1235        *self
1236            .metadata
1237            .schemas
1238            .keys()
1239            .max()
1240            .unwrap_or(&self.metadata.current_schema_id)
1241    }
1242
1243    fn get_current_schema(&self) -> Result<&SchemaRef> {
1244        self.metadata
1245            .schemas
1246            .get(&self.metadata.current_schema_id)
1247            .ok_or_else(|| {
1248                Error::new(
1249                    ErrorKind::DataInvalid,
1250                    format!(
1251                        "Current schema with id '{}' not found in table metadata.",
1252                        self.metadata.current_schema_id
1253                    ),
1254                )
1255            })
1256    }
1257
1258    fn get_default_sort_order(&self) -> Result<SortOrderRef> {
1259        self.metadata
1260            .sort_orders
1261            .get(&self.metadata.default_sort_order_id)
1262            .cloned()
1263            .ok_or_else(|| {
1264                Error::new(
1265                    ErrorKind::DataInvalid,
1266                    format!(
1267                        "Default sort order with id '{}' not found in table metadata.",
1268                        self.metadata.default_sort_order_id
1269                    ),
1270                )
1271            })
1272    }
1273
1274    /// If a compatible spec already exists, use the same ID. Otherwise, use 1 more than the highest ID.
1275    fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 {
1276        self.metadata
1277            .partition_specs
1278            .iter()
1279            .find_map(|(id, old_spec)| new_spec.is_compatible_with(old_spec).then_some(*id))
1280            .unwrap_or_else(|| {
1281                self.get_highest_spec_id()
1282                    .map(|id| id + 1)
1283                    .unwrap_or(DEFAULT_PARTITION_SPEC_ID)
1284            })
1285    }
1286
1287    fn get_highest_spec_id(&self) -> Option<i32> {
1288        self.metadata.partition_specs.keys().max().copied()
1289    }
1290
1291    /// If a compatible sort-order already exists, use the same ID. Otherwise, use 1 more than the highest ID.
1292    fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 {
1293        if new_sort_order.is_unsorted() {
1294            return SortOrder::unsorted_order().order_id;
1295        }
1296
1297        self.metadata
1298            .sort_orders
1299            .iter()
1300            .find_map(|(id, sort_order)| {
1301                sort_order.fields.eq(&new_sort_order.fields).then_some(*id)
1302            })
1303            .unwrap_or_else(|| {
1304                self.highest_sort_order_id()
1305                    .unwrap_or(SortOrder::unsorted_order().order_id)
1306                    + 1
1307            })
1308    }
1309
1310    fn highest_sort_order_id(&self) -> Option<i64> {
1311        self.metadata.sort_orders.keys().max().copied()
1312    }
1313
1314    /// Remove schemas by their ids from the table metadata.
1315    /// Does nothing if a schema id is not present. Active schemas should not be removed.
1316    pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
1317        if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
1318            return Err(Error::new(
1319                ErrorKind::DataInvalid,
1320                "Cannot remove current schema",
1321            ));
1322        }
1323
1324        if schema_id_to_remove.is_empty() {
1325            return Ok(self);
1326        }
1327
1328        let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
1329        self.metadata.schemas.retain(|id, _schema| {
1330            if schema_id_to_remove.contains(id) {
1331                removed_schemas.push(*id);
1332                false
1333            } else {
1334                true
1335            }
1336        });
1337
1338        self.changes.push(TableUpdate::RemoveSchemas {
1339            schema_ids: removed_schemas,
1340        });
1341
1342        Ok(self)
1343    }
1344}
1345
1346impl From<TableMetadataBuildResult> for TableMetadata {
1347    fn from(result: TableMetadataBuildResult) -> Self {
1348        result.metadata
1349    }
1350}
1351
1352#[cfg(test)]
1353mod tests {
1354    use std::fs::File;
1355    use std::io::BufReader;
1356    use std::thread::sleep;
1357
1358    use super::*;
1359    use crate::TableIdent;
1360    use crate::io::FileIOBuilder;
1361    use crate::spec::{
1362        BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema,
1363        SnapshotRetention, SortDirection, SortField, StructType, Summary, Transform, Type,
1364        UnboundPartitionField,
1365    };
1366    use crate::table::Table;
1367
1368    const TEST_LOCATION: &str = "s3://bucket/test/location";
1369    const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
1370
1371    fn schema() -> Schema {
1372        Schema::builder()
1373            .with_fields(vec![
1374                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1375                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1376                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1377            ])
1378            .build()
1379            .unwrap()
1380    }
1381
1382    fn sort_order() -> SortOrder {
1383        let schema = schema();
1384        SortOrder::builder()
1385            .with_order_id(1)
1386            .with_sort_field(SortField {
1387                source_id: 3,
1388                transform: Transform::Bucket(4),
1389                direction: SortDirection::Descending,
1390                null_order: NullOrder::First,
1391            })
1392            .build(&schema)
1393            .unwrap()
1394    }
1395
1396    fn partition_spec() -> UnboundPartitionSpec {
1397        UnboundPartitionSpec::builder()
1398            .with_spec_id(0)
1399            .add_partition_field(2, "y", Transform::Identity)
1400            .unwrap()
1401            .build()
1402    }
1403
1404    fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder {
1405        TableMetadataBuilder::new(
1406            schema(),
1407            partition_spec(),
1408            sort_order(),
1409            TEST_LOCATION.to_string(),
1410            format_version,
1411            HashMap::new(),
1412        )
1413        .unwrap()
1414        .build()
1415        .unwrap()
1416        .metadata
1417        .into_builder(Some(
1418            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1419        ))
1420    }
1421
1422    #[test]
1423    fn test_minimal_build() {
1424        let metadata = TableMetadataBuilder::new(
1425            schema(),
1426            partition_spec(),
1427            sort_order(),
1428            TEST_LOCATION.to_string(),
1429            FormatVersion::V1,
1430            HashMap::new(),
1431        )
1432        .unwrap()
1433        .build()
1434        .unwrap()
1435        .metadata;
1436
1437        assert_eq!(metadata.format_version, FormatVersion::V1);
1438        assert_eq!(metadata.location, TEST_LOCATION);
1439        assert_eq!(metadata.current_schema_id, 0);
1440        assert_eq!(metadata.default_spec.spec_id(), 0);
1441        assert_eq!(metadata.default_sort_order_id, 1);
1442        assert_eq!(metadata.last_partition_id, 1000);
1443        assert_eq!(metadata.last_column_id, 3);
1444        assert_eq!(metadata.snapshots.len(), 0);
1445        assert_eq!(metadata.current_snapshot_id, None);
1446        assert_eq!(metadata.refs.len(), 0);
1447        assert_eq!(metadata.properties.len(), 0);
1448        assert_eq!(metadata.metadata_log.len(), 0);
1449        assert_eq!(metadata.last_sequence_number, 0);
1450        assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID);
1451
1452        // Test can serialize v1
1453        let _ = serde_json::to_string(&metadata).unwrap();
1454
1455        // Test can serialize v2
1456        let metadata = metadata
1457            .into_builder(Some(
1458                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1459            ))
1460            .upgrade_format_version(FormatVersion::V2)
1461            .unwrap()
1462            .build()
1463            .unwrap()
1464            .metadata;
1465
1466        assert_eq!(metadata.format_version, FormatVersion::V2);
1467        let _ = serde_json::to_string(&metadata).unwrap();
1468    }
1469
1470    #[test]
1471    fn test_build_unpartitioned_unsorted() {
1472        let schema = Schema::builder().build().unwrap();
1473        let metadata = TableMetadataBuilder::new(
1474            schema.clone(),
1475            PartitionSpec::unpartition_spec(),
1476            SortOrder::unsorted_order(),
1477            TEST_LOCATION.to_string(),
1478            FormatVersion::V2,
1479            HashMap::new(),
1480        )
1481        .unwrap()
1482        .build()
1483        .unwrap()
1484        .metadata;
1485
1486        assert_eq!(metadata.format_version, FormatVersion::V2);
1487        assert_eq!(metadata.location, TEST_LOCATION);
1488        assert_eq!(metadata.current_schema_id, 0);
1489        assert_eq!(metadata.default_spec.spec_id(), 0);
1490        assert_eq!(metadata.default_sort_order_id, 0);
1491        assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
1492        assert_eq!(metadata.last_column_id, 0);
1493        assert_eq!(metadata.snapshots.len(), 0);
1494        assert_eq!(metadata.current_snapshot_id, None);
1495        assert_eq!(metadata.refs.len(), 0);
1496        assert_eq!(metadata.properties.len(), 0);
1497        assert_eq!(metadata.metadata_log.len(), 0);
1498        assert_eq!(metadata.last_sequence_number, 0);
1499    }
1500
1501    #[test]
1502    fn test_reassigns_ids() {
1503        let schema = Schema::builder()
1504            .with_schema_id(10)
1505            .with_fields(vec![
1506                NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(),
1507                NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(),
1508                NestedField::required(
1509                    13,
1510                    "struct",
1511                    Type::Struct(StructType::new(vec![
1512                        NestedField::required(14, "nested", Type::Primitive(PrimitiveType::Long))
1513                            .into(),
1514                    ])),
1515                )
1516                .into(),
1517                NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(),
1518            ])
1519            .build()
1520            .unwrap();
1521        let spec = PartitionSpec::builder(schema.clone())
1522            .with_spec_id(20)
1523            .add_partition_field("a", "a", Transform::Identity)
1524            .unwrap()
1525            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
1526            .unwrap()
1527            .build()
1528            .unwrap();
1529        let sort_order = SortOrder::builder()
1530            .with_fields(vec![SortField {
1531                source_id: 11,
1532                transform: Transform::Identity,
1533                direction: SortDirection::Ascending,
1534                null_order: NullOrder::First,
1535            }])
1536            .with_order_id(10)
1537            .build(&schema)
1538            .unwrap();
1539
1540        let (fresh_schema, fresh_spec, fresh_sort_order) =
1541            TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap();
1542
1543        let expected_schema = Schema::builder()
1544            .with_fields(vec![
1545                NestedField::required(1, "a", Type::Primitive(PrimitiveType::Long)).into(),
1546                NestedField::required(2, "b", Type::Primitive(PrimitiveType::Long)).into(),
1547                NestedField::required(
1548                    3,
1549                    "struct",
1550                    Type::Struct(StructType::new(vec![
1551                        NestedField::required(5, "nested", Type::Primitive(PrimitiveType::Long))
1552                            .into(),
1553                    ])),
1554                )
1555                .into(),
1556                NestedField::required(4, "c", Type::Primitive(PrimitiveType::Long)).into(),
1557            ])
1558            .build()
1559            .unwrap();
1560
1561        let expected_spec = PartitionSpec::builder(expected_schema.clone())
1562            .with_spec_id(0)
1563            .add_partition_field("a", "a", Transform::Identity)
1564            .unwrap()
1565            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
1566            .unwrap()
1567            .build()
1568            .unwrap();
1569
1570        let expected_sort_order = SortOrder::builder()
1571            .with_fields(vec![SortField {
1572                source_id: 1,
1573                transform: Transform::Identity,
1574                direction: SortDirection::Ascending,
1575                null_order: NullOrder::First,
1576            }])
1577            .with_order_id(1)
1578            .build(&expected_schema)
1579            .unwrap();
1580
1581        assert_eq!(fresh_schema, expected_schema);
1582        assert_eq!(fresh_spec, expected_spec);
1583        assert_eq!(fresh_sort_order, expected_sort_order);
1584    }
1585
1586    #[test]
1587    fn test_ids_are_reassigned_for_new_metadata() {
1588        let schema = schema().into_builder().with_schema_id(10).build().unwrap();
1589
1590        let metadata = TableMetadataBuilder::new(
1591            schema,
1592            partition_spec(),
1593            sort_order(),
1594            TEST_LOCATION.to_string(),
1595            FormatVersion::V1,
1596            HashMap::new(),
1597        )
1598        .unwrap()
1599        .build()
1600        .unwrap()
1601        .metadata;
1602
1603        assert_eq!(metadata.current_schema_id, 0);
1604        assert_eq!(metadata.current_schema().schema_id(), 0);
1605    }
1606
1607    #[test]
1608    fn test_new_metadata_changes() {
1609        let changes = TableMetadataBuilder::new(
1610            schema(),
1611            partition_spec(),
1612            sort_order(),
1613            TEST_LOCATION.to_string(),
1614            FormatVersion::V1,
1615            HashMap::from_iter(vec![("property 1".to_string(), "value 1".to_string())]),
1616        )
1617        .unwrap()
1618        .build()
1619        .unwrap()
1620        .changes;
1621
1622        pretty_assertions::assert_eq!(changes, vec![
1623            TableUpdate::SetLocation {
1624                location: TEST_LOCATION.to_string()
1625            },
1626            TableUpdate::AddSchema { schema: schema() },
1627            TableUpdate::SetCurrentSchema { schema_id: -1 },
1628            TableUpdate::AddSpec {
1629                // Because this is a new tables, field-ids are assigned
1630                // partition_spec() has None set for field-id
1631                spec: PartitionSpec::builder(schema())
1632                    .with_spec_id(0)
1633                    .add_unbound_field(UnboundPartitionField {
1634                        name: "y".to_string(),
1635                        transform: Transform::Identity,
1636                        source_id: 2,
1637                        field_id: Some(1000)
1638                    })
1639                    .unwrap()
1640                    .build()
1641                    .unwrap()
1642                    .into_unbound(),
1643            },
1644            TableUpdate::SetDefaultSpec { spec_id: -1 },
1645            TableUpdate::AddSortOrder {
1646                sort_order: sort_order(),
1647            },
1648            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
1649            TableUpdate::SetProperties {
1650                updates: HashMap::from_iter(vec![(
1651                    "property 1".to_string(),
1652                    "value 1".to_string()
1653                )]),
1654            }
1655        ]);
1656    }
1657
1658    #[test]
1659    fn test_new_metadata_changes_unpartitioned_unsorted() {
1660        let schema = Schema::builder().build().unwrap();
1661        let changes = TableMetadataBuilder::new(
1662            schema.clone(),
1663            PartitionSpec::unpartition_spec().into_unbound(),
1664            SortOrder::unsorted_order(),
1665            TEST_LOCATION.to_string(),
1666            FormatVersion::V1,
1667            HashMap::new(),
1668        )
1669        .unwrap()
1670        .build()
1671        .unwrap()
1672        .changes;
1673
1674        pretty_assertions::assert_eq!(changes, vec![
1675            TableUpdate::SetLocation {
1676                location: TEST_LOCATION.to_string()
1677            },
1678            TableUpdate::AddSchema {
1679                schema: Schema::builder().build().unwrap(),
1680            },
1681            TableUpdate::SetCurrentSchema { schema_id: -1 },
1682            TableUpdate::AddSpec {
1683                // Because this is a new tables, field-ids are assigned
1684                // partition_spec() has None set for field-id
1685                spec: PartitionSpec::builder(schema)
1686                    .with_spec_id(0)
1687                    .build()
1688                    .unwrap()
1689                    .into_unbound(),
1690            },
1691            TableUpdate::SetDefaultSpec { spec_id: -1 },
1692            TableUpdate::AddSortOrder {
1693                sort_order: SortOrder::unsorted_order(),
1694            },
1695            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
1696        ]);
1697    }
1698
1699    #[test]
1700    fn test_add_partition_spec() {
1701        let builder = builder_without_changes(FormatVersion::V2);
1702
1703        let added_spec = UnboundPartitionSpec::builder()
1704            .with_spec_id(10)
1705            .add_partition_fields(vec![
1706                UnboundPartitionField {
1707                    // The previous field - has field_id set
1708                    name: "y".to_string(),
1709                    transform: Transform::Identity,
1710                    source_id: 2,
1711                    field_id: Some(1000),
1712                },
1713                UnboundPartitionField {
1714                    // A new field without field id - should still be without field id in changes
1715                    name: "z".to_string(),
1716                    transform: Transform::Identity,
1717                    source_id: 3,
1718                    field_id: None,
1719                },
1720            ])
1721            .unwrap()
1722            .build();
1723
1724        let build_result = builder
1725            .add_partition_spec(added_spec.clone())
1726            .unwrap()
1727            .build()
1728            .unwrap();
1729
1730        // Spec id should be re-assigned
1731        let expected_change = added_spec.with_spec_id(1);
1732        let expected_spec = PartitionSpec::builder(schema())
1733            .with_spec_id(1)
1734            .add_unbound_field(UnboundPartitionField {
1735                name: "y".to_string(),
1736                transform: Transform::Identity,
1737                source_id: 2,
1738                field_id: Some(1000),
1739            })
1740            .unwrap()
1741            .add_unbound_field(UnboundPartitionField {
1742                name: "z".to_string(),
1743                transform: Transform::Identity,
1744                source_id: 3,
1745                field_id: Some(1001),
1746            })
1747            .unwrap()
1748            .build()
1749            .unwrap();
1750
1751        assert_eq!(build_result.changes.len(), 1);
1752        assert_eq!(
1753            build_result.metadata.partition_spec_by_id(1),
1754            Some(&Arc::new(expected_spec))
1755        );
1756        assert_eq!(build_result.metadata.default_spec.spec_id(), 0);
1757        assert_eq!(build_result.metadata.last_partition_id, 1001);
1758        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
1759            spec: expected_change
1760        });
1761
1762        // Remove the spec
1763        let build_result = build_result
1764            .metadata
1765            .into_builder(Some(
1766                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1767            ))
1768            .remove_partition_specs(&[1])
1769            .unwrap()
1770            .build()
1771            .unwrap();
1772
1773        assert_eq!(build_result.changes.len(), 1);
1774        assert_eq!(build_result.metadata.partition_specs.len(), 1);
1775        assert!(build_result.metadata.partition_spec_by_id(1).is_none());
1776    }
1777
1778    #[test]
1779    fn test_set_default_partition_spec() {
1780        let builder = builder_without_changes(FormatVersion::V2);
1781        let schema = builder.get_current_schema().unwrap().clone();
1782        let added_spec = UnboundPartitionSpec::builder()
1783            .with_spec_id(10)
1784            .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2))
1785            .unwrap()
1786            .build();
1787
1788        let build_result = builder
1789            .add_partition_spec(added_spec.clone())
1790            .unwrap()
1791            .set_default_partition_spec(-1)
1792            .unwrap()
1793            .build()
1794            .unwrap();
1795
1796        let expected_spec = PartitionSpec::builder(schema)
1797            .with_spec_id(1)
1798            .add_unbound_field(UnboundPartitionField {
1799                name: "y_bucket[2]".to_string(),
1800                transform: Transform::Bucket(2),
1801                source_id: 1,
1802                field_id: Some(1001),
1803            })
1804            .unwrap()
1805            .build()
1806            .unwrap();
1807
1808        assert_eq!(build_result.changes.len(), 2);
1809        assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec));
1810        assert_eq!(build_result.changes, vec![
1811            TableUpdate::AddSpec {
1812                // Should contain the actual ID that was used
1813                spec: added_spec.with_spec_id(1)
1814            },
1815            TableUpdate::SetDefaultSpec { spec_id: -1 }
1816        ]);
1817    }
1818
1819    #[test]
1820    fn test_set_existing_default_partition_spec() {
1821        let builder = builder_without_changes(FormatVersion::V2);
1822        // Add and set an unbound spec as current
1823        let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build();
1824        let build_result = builder
1825            .add_partition_spec(unbound_spec.clone())
1826            .unwrap()
1827            .set_default_partition_spec(-1)
1828            .unwrap()
1829            .build()
1830            .unwrap();
1831
1832        assert_eq!(build_result.changes.len(), 2);
1833        assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
1834            spec: unbound_spec.clone()
1835        });
1836        assert_eq!(build_result.changes[1], TableUpdate::SetDefaultSpec {
1837            spec_id: -1
1838        });
1839        assert_eq!(
1840            build_result.metadata.default_spec,
1841            Arc::new(
1842                unbound_spec
1843                    .bind(build_result.metadata.current_schema().clone())
1844                    .unwrap()
1845            )
1846        );
1847
1848        // Set old spec again
1849        let build_result = build_result
1850            .metadata
1851            .into_builder(Some(
1852                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1853            ))
1854            .set_default_partition_spec(0)
1855            .unwrap()
1856            .build()
1857            .unwrap();
1858
1859        assert_eq!(build_result.changes.len(), 1);
1860        assert_eq!(build_result.changes[0], TableUpdate::SetDefaultSpec {
1861            spec_id: 0
1862        });
1863        assert_eq!(
1864            build_result.metadata.default_spec,
1865            Arc::new(
1866                partition_spec()
1867                    .bind(build_result.metadata.current_schema().clone())
1868                    .unwrap()
1869            )
1870        );
1871    }
1872
1873    #[test]
1874    fn test_add_sort_order() {
1875        let builder = builder_without_changes(FormatVersion::V2);
1876
1877        let added_sort_order = SortOrder::builder()
1878            .with_order_id(10)
1879            .with_fields(vec![SortField {
1880                source_id: 1,
1881                transform: Transform::Identity,
1882                direction: SortDirection::Ascending,
1883                null_order: NullOrder::First,
1884            }])
1885            .build(&schema())
1886            .unwrap();
1887
1888        let build_result = builder
1889            .add_sort_order(added_sort_order.clone())
1890            .unwrap()
1891            .build()
1892            .unwrap();
1893
1894        let expected_sort_order = added_sort_order.with_order_id(2);
1895
1896        assert_eq!(build_result.changes.len(), 1);
1897        assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2));
1898        pretty_assertions::assert_eq!(
1899            build_result.metadata.sort_order_by_id(2),
1900            Some(&Arc::new(expected_sort_order.clone()))
1901        );
1902        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder {
1903            sort_order: expected_sort_order
1904        });
1905    }
1906
1907    #[test]
1908    fn test_add_compatible_schema() {
1909        let builder = builder_without_changes(FormatVersion::V2);
1910
1911        let added_schema = Schema::builder()
1912            .with_schema_id(1)
1913            .with_fields(vec![
1914                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1915                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1916                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1917                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
1918            ])
1919            .build()
1920            .unwrap();
1921
1922        let build_result = builder
1923            .add_current_schema(added_schema.clone())
1924            .unwrap()
1925            .build()
1926            .unwrap();
1927
1928        assert_eq!(build_result.changes.len(), 2);
1929        assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1));
1930        pretty_assertions::assert_eq!(
1931            build_result.metadata.schema_by_id(1),
1932            Some(&Arc::new(added_schema.clone()))
1933        );
1934        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema {
1935            schema: added_schema
1936        });
1937        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
1938            schema_id: -1
1939        });
1940    }
1941
1942    #[test]
1943    fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
1944        let builder = builder_without_changes(FormatVersion::V2);
1945
1946        let added_schema = Schema::builder()
1947            .with_schema_id(1)
1948            .with_fields(vec![
1949                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1950                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1951                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1952                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
1953            ])
1954            .build()
1955            .unwrap();
1956
1957        let build_result = builder
1958            .add_schema(added_schema.clone())
1959            .unwrap()
1960            .set_current_schema(1)
1961            .unwrap()
1962            .build()
1963            .unwrap();
1964
1965        assert_eq!(build_result.changes.len(), 2);
1966        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
1967            schema_id: -1
1968        });
1969    }
1970
1971    #[test]
1972    fn test_no_metadata_log_for_create_table() {
1973        let build_result = TableMetadataBuilder::new(
1974            schema(),
1975            partition_spec(),
1976            sort_order(),
1977            TEST_LOCATION.to_string(),
1978            FormatVersion::V2,
1979            HashMap::new(),
1980        )
1981        .unwrap()
1982        .build()
1983        .unwrap();
1984
1985        assert_eq!(build_result.metadata.metadata_log.len(), 0);
1986    }
1987
1988    #[test]
1989    fn test_no_metadata_log_entry_for_no_previous_location() {
1990        // Used for first commit after stage-creation of tables
1991        let metadata = builder_without_changes(FormatVersion::V2)
1992            .build()
1993            .unwrap()
1994            .metadata;
1995        assert_eq!(metadata.metadata_log.len(), 1);
1996
1997        let build_result = metadata
1998            .into_builder(None)
1999            .set_properties(HashMap::from_iter(vec![(
2000                "foo".to_string(),
2001                "bar".to_string(),
2002            )]))
2003            .unwrap()
2004            .build()
2005            .unwrap();
2006
2007        assert_eq!(build_result.metadata.metadata_log.len(), 1);
2008    }
2009
2010    #[test]
2011    fn test_from_metadata_generates_metadata_log() {
2012        let metadata_path = "s3://bucket/test/location/metadata/metadata1.json";
2013        let builder = TableMetadataBuilder::new(
2014            schema(),
2015            partition_spec(),
2016            sort_order(),
2017            TEST_LOCATION.to_string(),
2018            FormatVersion::V2,
2019            HashMap::new(),
2020        )
2021        .unwrap()
2022        .build()
2023        .unwrap()
2024        .metadata
2025        .into_builder(Some(metadata_path.to_string()));
2026
2027        let builder = builder
2028            .add_default_sort_order(SortOrder::unsorted_order())
2029            .unwrap();
2030
2031        let build_result = builder.build().unwrap();
2032
2033        assert_eq!(build_result.metadata.metadata_log.len(), 1);
2034        assert_eq!(
2035            build_result.metadata.metadata_log[0].metadata_file,
2036            metadata_path
2037        );
2038    }
2039
2040    #[test]
2041    fn test_set_ref() {
2042        let builder = builder_without_changes(FormatVersion::V2);
2043
2044        let snapshot = Snapshot::builder()
2045            .with_snapshot_id(1)
2046            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2047            .with_sequence_number(0)
2048            .with_schema_id(0)
2049            .with_manifest_list("/snap-1.avro")
2050            .with_summary(Summary {
2051                operation: Operation::Append,
2052                additional_properties: HashMap::from_iter(vec![
2053                    (
2054                        "spark.app.id".to_string(),
2055                        "local-1662532784305".to_string(),
2056                    ),
2057                    ("added-data-files".to_string(), "4".to_string()),
2058                    ("added-records".to_string(), "4".to_string()),
2059                    ("added-files-size".to_string(), "6001".to_string()),
2060                ]),
2061            })
2062            .build();
2063
2064        let builder = builder.add_snapshot(snapshot.clone()).unwrap();
2065
2066        assert!(
2067            builder
2068                .clone()
2069                .set_ref(MAIN_BRANCH, SnapshotReference {
2070                    snapshot_id: 10,
2071                    retention: SnapshotRetention::Branch {
2072                        min_snapshots_to_keep: Some(10),
2073                        max_snapshot_age_ms: None,
2074                        max_ref_age_ms: None,
2075                    },
2076                })
2077                .unwrap_err()
2078                .to_string()
2079                .contains("Cannot set 'main' to unknown snapshot: '10'")
2080        );
2081
2082        let build_result = builder
2083            .set_ref(MAIN_BRANCH, SnapshotReference {
2084                snapshot_id: 1,
2085                retention: SnapshotRetention::Branch {
2086                    min_snapshots_to_keep: Some(10),
2087                    max_snapshot_age_ms: None,
2088                    max_ref_age_ms: None,
2089                },
2090            })
2091            .unwrap()
2092            .build()
2093            .unwrap();
2094        assert_eq!(build_result.metadata.snapshots.len(), 1);
2095        assert_eq!(
2096            build_result.metadata.snapshot_by_id(1),
2097            Some(&Arc::new(snapshot.clone()))
2098        );
2099        assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog {
2100            snapshot_id: 1,
2101            timestamp_ms: snapshot.timestamp_ms()
2102        }])
2103    }
2104
2105    #[test]
2106    fn test_snapshot_log_skips_intermediates() {
2107        let builder = builder_without_changes(FormatVersion::V2);
2108
2109        let snapshot_1 = Snapshot::builder()
2110            .with_snapshot_id(1)
2111            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2112            .with_sequence_number(0)
2113            .with_schema_id(0)
2114            .with_manifest_list("/snap-1.avro")
2115            .with_summary(Summary {
2116                operation: Operation::Append,
2117                additional_properties: HashMap::from_iter(vec![
2118                    (
2119                        "spark.app.id".to_string(),
2120                        "local-1662532784305".to_string(),
2121                    ),
2122                    ("added-data-files".to_string(), "4".to_string()),
2123                    ("added-records".to_string(), "4".to_string()),
2124                    ("added-files-size".to_string(), "6001".to_string()),
2125                ]),
2126            })
2127            .build();
2128
2129        let snapshot_2 = Snapshot::builder()
2130            .with_snapshot_id(2)
2131            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2132            .with_sequence_number(0)
2133            .with_schema_id(0)
2134            .with_manifest_list("/snap-1.avro")
2135            .with_summary(Summary {
2136                operation: Operation::Append,
2137                additional_properties: HashMap::from_iter(vec![
2138                    (
2139                        "spark.app.id".to_string(),
2140                        "local-1662532784305".to_string(),
2141                    ),
2142                    ("added-data-files".to_string(), "4".to_string()),
2143                    ("added-records".to_string(), "4".to_string()),
2144                    ("added-files-size".to_string(), "6001".to_string()),
2145                ]),
2146            })
2147            .build();
2148
2149        let result = builder
2150            .add_snapshot(snapshot_1)
2151            .unwrap()
2152            .set_ref(MAIN_BRANCH, SnapshotReference {
2153                snapshot_id: 1,
2154                retention: SnapshotRetention::Branch {
2155                    min_snapshots_to_keep: Some(10),
2156                    max_snapshot_age_ms: None,
2157                    max_ref_age_ms: None,
2158                },
2159            })
2160            .unwrap()
2161            .set_branch_snapshot(snapshot_2.clone(), MAIN_BRANCH)
2162            .unwrap()
2163            .build()
2164            .unwrap();
2165
2166        assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
2167            snapshot_id: 2,
2168            timestamp_ms: snapshot_2.timestamp_ms()
2169        }]);
2170        assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
2171    }
2172
2173    #[test]
2174    fn test_set_branch_snapshot_creates_branch_if_not_exists() {
2175        let builder = builder_without_changes(FormatVersion::V2);
2176
2177        let snapshot = Snapshot::builder()
2178            .with_snapshot_id(2)
2179            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2180            .with_sequence_number(0)
2181            .with_schema_id(0)
2182            .with_manifest_list("/snap-1.avro")
2183            .with_summary(Summary {
2184                operation: Operation::Append,
2185                additional_properties: HashMap::new(),
2186            })
2187            .build();
2188
2189        let build_result = builder
2190            .set_branch_snapshot(snapshot.clone(), "new_branch")
2191            .unwrap()
2192            .build()
2193            .unwrap();
2194
2195        let reference = SnapshotReference {
2196            snapshot_id: 2,
2197            retention: SnapshotRetention::Branch {
2198                min_snapshots_to_keep: None,
2199                max_snapshot_age_ms: None,
2200                max_ref_age_ms: None,
2201            },
2202        };
2203
2204        assert_eq!(build_result.metadata.refs.len(), 1);
2205        assert_eq!(
2206            build_result.metadata.refs.get("new_branch"),
2207            Some(&reference)
2208        );
2209        assert_eq!(build_result.changes, vec![
2210            TableUpdate::AddSnapshot { snapshot },
2211            TableUpdate::SetSnapshotRef {
2212                ref_name: "new_branch".to_string(),
2213                reference
2214            }
2215        ]);
2216    }
2217
2218    #[test]
2219    fn test_cannot_add_duplicate_snapshot_id() {
2220        let builder = builder_without_changes(FormatVersion::V2);
2221
2222        let snapshot = Snapshot::builder()
2223            .with_snapshot_id(2)
2224            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2225            .with_sequence_number(0)
2226            .with_schema_id(0)
2227            .with_manifest_list("/snap-1.avro")
2228            .with_summary(Summary {
2229                operation: Operation::Append,
2230                additional_properties: HashMap::from_iter(vec![
2231                    (
2232                        "spark.app.id".to_string(),
2233                        "local-1662532784305".to_string(),
2234                    ),
2235                    ("added-data-files".to_string(), "4".to_string()),
2236                    ("added-records".to_string(), "4".to_string()),
2237                    ("added-files-size".to_string(), "6001".to_string()),
2238                ]),
2239            })
2240            .build();
2241
2242        let builder = builder.add_snapshot(snapshot.clone()).unwrap();
2243        builder.add_snapshot(snapshot).unwrap_err();
2244    }
2245
2246    #[test]
2247    fn test_add_incompatible_current_schema_fails() {
2248        let builder = builder_without_changes(FormatVersion::V2);
2249
2250        let added_schema = Schema::builder()
2251            .with_schema_id(1)
2252            .with_fields(vec![])
2253            .build()
2254            .unwrap();
2255
2256        let err = builder
2257            .add_current_schema(added_schema)
2258            .unwrap()
2259            .build()
2260            .unwrap_err();
2261
2262        assert!(
2263            err.to_string()
2264                .contains("Cannot find partition source field")
2265        );
2266    }
2267
2268    #[test]
2269    fn test_add_partition_spec_for_v1_requires_sequential_ids() {
2270        let builder = builder_without_changes(FormatVersion::V1);
2271
2272        let added_spec = UnboundPartitionSpec::builder()
2273            .with_spec_id(10)
2274            .add_partition_fields(vec![
2275                UnboundPartitionField {
2276                    name: "y".to_string(),
2277                    transform: Transform::Identity,
2278                    source_id: 2,
2279                    field_id: Some(1000),
2280                },
2281                UnboundPartitionField {
2282                    name: "z".to_string(),
2283                    transform: Transform::Identity,
2284                    source_id: 3,
2285                    field_id: Some(1002),
2286                },
2287            ])
2288            .unwrap()
2289            .build();
2290
2291        let err = builder.add_partition_spec(added_spec).unwrap_err();
2292        assert!(err.to_string().contains(
2293            "Cannot add partition spec with non-sequential field ids to format version 1 table"
2294        ));
2295    }
2296
2297    #[test]
2298    fn test_expire_metadata_log() {
2299        let builder = builder_without_changes(FormatVersion::V2);
2300        let metadata = builder
2301            .set_properties(HashMap::from_iter(vec![(
2302                PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
2303                "2".to_string(),
2304            )]))
2305            .unwrap()
2306            .build()
2307            .unwrap();
2308        assert_eq!(metadata.metadata.metadata_log.len(), 1);
2309        assert_eq!(metadata.expired_metadata_logs.len(), 0);
2310
2311        let metadata = metadata
2312            .metadata
2313            .into_builder(Some("path2".to_string()))
2314            .set_properties(HashMap::from_iter(vec![(
2315                "change_nr".to_string(),
2316                "1".to_string(),
2317            )]))
2318            .unwrap()
2319            .build()
2320            .unwrap();
2321
2322        assert_eq!(metadata.metadata.metadata_log.len(), 2);
2323        assert_eq!(metadata.expired_metadata_logs.len(), 0);
2324
2325        let metadata = metadata
2326            .metadata
2327            .into_builder(Some("path2".to_string()))
2328            .set_properties(HashMap::from_iter(vec![(
2329                "change_nr".to_string(),
2330                "2".to_string(),
2331            )]))
2332            .unwrap()
2333            .build()
2334            .unwrap();
2335        assert_eq!(metadata.metadata.metadata_log.len(), 2);
2336        assert_eq!(metadata.expired_metadata_logs.len(), 1);
2337    }
2338
2339    #[test]
2340    fn test_v2_sequence_number_cannot_decrease() {
2341        let builder = builder_without_changes(FormatVersion::V2);
2342
2343        let snapshot = Snapshot::builder()
2344            .with_snapshot_id(1)
2345            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2346            .with_sequence_number(1)
2347            .with_schema_id(0)
2348            .with_manifest_list("/snap-1")
2349            .with_summary(Summary {
2350                operation: Operation::Append,
2351                additional_properties: HashMap::new(),
2352            })
2353            .build();
2354
2355        let builder = builder
2356            .add_snapshot(snapshot.clone())
2357            .unwrap()
2358            .set_ref(MAIN_BRANCH, SnapshotReference {
2359                snapshot_id: 1,
2360                retention: SnapshotRetention::Branch {
2361                    min_snapshots_to_keep: Some(10),
2362                    max_snapshot_age_ms: None,
2363                    max_ref_age_ms: None,
2364                },
2365            })
2366            .unwrap();
2367
2368        let snapshot = Snapshot::builder()
2369            .with_snapshot_id(2)
2370            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2371            .with_sequence_number(0)
2372            .with_schema_id(0)
2373            .with_manifest_list("/snap-0")
2374            .with_parent_snapshot_id(Some(1))
2375            .with_summary(Summary {
2376                operation: Operation::Append,
2377                additional_properties: HashMap::new(),
2378            })
2379            .build();
2380
2381        let err = builder
2382            .set_branch_snapshot(snapshot, MAIN_BRANCH)
2383            .unwrap_err();
2384        assert!(
2385            err.to_string()
2386                .contains("Cannot add snapshot with sequence number")
2387        );
2388    }
2389
2390    #[test]
2391    fn test_default_spec_cannot_be_removed() {
2392        let builder = builder_without_changes(FormatVersion::V2);
2393
2394        builder.remove_partition_specs(&[0]).unwrap_err();
2395    }
2396
2397    #[test]
2398    fn test_statistics() {
2399        let builder = builder_without_changes(FormatVersion::V2);
2400
2401        let statistics = StatisticsFile {
2402            snapshot_id: 3055729675574597004,
2403            statistics_path: "s3://a/b/stats.puffin".to_string(),
2404            file_size_in_bytes: 413,
2405            file_footer_size_in_bytes: 42,
2406            key_metadata: None,
2407            blob_metadata: vec![BlobMetadata {
2408                snapshot_id: 3055729675574597004,
2409                sequence_number: 1,
2410                fields: vec![1],
2411                r#type: "ndv".to_string(),
2412                properties: HashMap::new(),
2413            }],
2414        };
2415        let build_result = builder.set_statistics(statistics.clone()).build().unwrap();
2416
2417        assert_eq!(
2418            build_result.metadata.statistics,
2419            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
2420        );
2421        assert_eq!(build_result.changes, vec![TableUpdate::SetStatistics {
2422            statistics: statistics.clone()
2423        }]);
2424
2425        // Remove
2426        let builder = build_result.metadata.into_builder(None);
2427        let build_result = builder
2428            .remove_statistics(statistics.snapshot_id)
2429            .build()
2430            .unwrap();
2431
2432        assert_eq!(build_result.metadata.statistics.len(), 0);
2433        assert_eq!(build_result.changes, vec![TableUpdate::RemoveStatistics {
2434            snapshot_id: statistics.snapshot_id
2435        }]);
2436
2437        // Remove again yields no changes
2438        let builder = build_result.metadata.into_builder(None);
2439        let build_result = builder
2440            .remove_statistics(statistics.snapshot_id)
2441            .build()
2442            .unwrap();
2443        assert_eq!(build_result.metadata.statistics.len(), 0);
2444        assert_eq!(build_result.changes.len(), 0);
2445    }
2446
2447    #[test]
2448    fn test_add_partition_statistics() {
2449        let builder = builder_without_changes(FormatVersion::V2);
2450
2451        let statistics = PartitionStatisticsFile {
2452            snapshot_id: 3055729675574597004,
2453            statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
2454            file_size_in_bytes: 43,
2455        };
2456
2457        let build_result = builder
2458            .set_partition_statistics(statistics.clone())
2459            .build()
2460            .unwrap();
2461        assert_eq!(
2462            build_result.metadata.partition_statistics,
2463            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
2464        );
2465        assert_eq!(build_result.changes, vec![
2466            TableUpdate::SetPartitionStatistics {
2467                partition_statistics: statistics.clone()
2468            }
2469        ]);
2470
2471        // Remove
2472        let builder = build_result.metadata.into_builder(None);
2473        let build_result = builder
2474            .remove_partition_statistics(statistics.snapshot_id)
2475            .build()
2476            .unwrap();
2477        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2478        assert_eq!(build_result.changes, vec![
2479            TableUpdate::RemovePartitionStatistics {
2480                snapshot_id: statistics.snapshot_id
2481            }
2482        ]);
2483
2484        // Remove again yields no changes
2485        let builder = build_result.metadata.into_builder(None);
2486        let build_result = builder
2487            .remove_partition_statistics(statistics.snapshot_id)
2488            .build()
2489            .unwrap();
2490        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2491        assert_eq!(build_result.changes.len(), 0);
2492    }
2493
2494    #[test]
2495    fn last_update_increased_for_property_only_update() {
2496        let builder = builder_without_changes(FormatVersion::V2);
2497
2498        let metadata = builder.build().unwrap().metadata;
2499        let last_updated_ms = metadata.last_updated_ms;
2500        sleep(std::time::Duration::from_millis(2));
2501
2502        let build_result = metadata
2503            .into_builder(Some(
2504                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2505            ))
2506            .set_properties(HashMap::from_iter(vec![(
2507                "foo".to_string(),
2508                "bar".to_string(),
2509            )]))
2510            .unwrap()
2511            .build()
2512            .unwrap();
2513
2514        assert!(
2515            build_result.metadata.last_updated_ms > last_updated_ms,
2516            "{} > {}",
2517            build_result.metadata.last_updated_ms,
2518            last_updated_ms
2519        );
2520    }
2521
2522    #[test]
2523    fn test_construct_default_main_branch() {
2524        // Load the table without ref
2525        let file = File::open(format!(
2526            "{}/testdata/table_metadata/{}",
2527            env!("CARGO_MANIFEST_DIR"),
2528            "TableMetadataV2Valid.json"
2529        ))
2530        .unwrap();
2531        let reader = BufReader::new(file);
2532        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2533
2534        let table = Table::builder()
2535            .metadata(resp)
2536            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2537            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2538            .file_io(FileIOBuilder::new("memory").build().unwrap())
2539            .build()
2540            .unwrap();
2541
2542        assert_eq!(
2543            table.metadata().refs.get(MAIN_BRANCH).unwrap().snapshot_id,
2544            table.metadata().current_snapshot_id().unwrap()
2545        );
2546    }
2547
2548    #[test]
2549    fn test_active_schema_cannot_be_removed() {
2550        let builder = builder_without_changes(FormatVersion::V2);
2551        builder.remove_schemas(&[0]).unwrap_err();
2552    }
2553
2554    #[test]
2555    fn test_remove_schemas() {
2556        let file = File::open(format!(
2557            "{}/testdata/table_metadata/{}",
2558            env!("CARGO_MANIFEST_DIR"),
2559            "TableMetadataV2Valid.json"
2560        ))
2561        .unwrap();
2562        let reader = BufReader::new(file);
2563        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2564
2565        let table = Table::builder()
2566            .metadata(resp)
2567            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2568            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2569            .file_io(FileIOBuilder::new("memory").build().unwrap())
2570            .build()
2571            .unwrap();
2572
2573        assert_eq!(2, table.metadata().schemas.len());
2574
2575        {
2576            // can not remove active schema
2577            let meta_data_builder = table.metadata().clone().into_builder(None);
2578            meta_data_builder.remove_schemas(&[1]).unwrap_err();
2579        }
2580
2581        let mut meta_data_builder = table.metadata().clone().into_builder(None);
2582        meta_data_builder = meta_data_builder.remove_schemas(&[0]).unwrap();
2583        let build_result = meta_data_builder.build().unwrap();
2584        assert_eq!(1, build_result.metadata.schemas.len());
2585        assert_eq!(1, build_result.metadata.current_schema_id);
2586        assert_eq!(1, build_result.metadata.current_schema().schema_id());
2587        assert_eq!(1, build_result.changes.len());
2588
2589        let remove_schema_ids =
2590            if let TableUpdate::RemoveSchemas { schema_ids } = &build_result.changes[0] {
2591                schema_ids
2592            } else {
2593                unreachable!("Expected RemoveSchema change")
2594            };
2595        assert_eq!(remove_schema_ids, &[0]);
2596    }
2597
2598    #[test]
2599    fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts() {
2600        let initial_schema = Schema::builder()
2601            .with_fields(vec![
2602                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2603            ])
2604            .build()
2605            .unwrap();
2606
2607        let partition_spec_with_bucket = UnboundPartitionSpec::builder()
2608            .with_spec_id(0)
2609            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2610            .unwrap()
2611            .build();
2612
2613        let metadata = TableMetadataBuilder::new(
2614            initial_schema,
2615            partition_spec_with_bucket,
2616            SortOrder::unsorted_order(),
2617            TEST_LOCATION.to_string(),
2618            FormatVersion::V2,
2619            HashMap::new(),
2620        )
2621        .unwrap()
2622        .build()
2623        .unwrap()
2624        .metadata;
2625
2626        let partition_field_names: Vec<String> = metadata
2627            .default_partition_spec()
2628            .fields()
2629            .iter()
2630            .map(|f| f.name.clone())
2631            .collect();
2632        assert!(partition_field_names.contains(&"bucket_data".to_string()));
2633
2634        let evolved_schema = Schema::builder()
2635            .with_fields(vec![
2636                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2637                // Adding a schema field with the same name as an existing partition field
2638                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
2639            ])
2640            .build()
2641            .unwrap();
2642
2643        let builder = metadata.into_builder(Some(
2644            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2645        ));
2646
2647        // Try to add the evolved schema - this should now fail immediately with a clear error
2648        let result = builder.add_current_schema(evolved_schema);
2649
2650        assert!(result.is_err());
2651        let error = result.unwrap_err();
2652        let error_message = error.message();
2653        assert!(error_message.contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2654        assert!(error_message.contains("Schema evolution cannot introduce field names that match existing partition field names"));
2655    }
2656
2657    #[test]
2658    fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build() {
2659        let initial_schema = Schema::builder()
2660            .with_fields(vec![
2661                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2662            ])
2663            .build()
2664            .unwrap();
2665
2666        let partition_spec = UnboundPartitionSpec::builder()
2667            .with_spec_id(0)
2668            .add_partition_field(1, "partition_col", Transform::Bucket(16))
2669            .unwrap()
2670            .build();
2671
2672        let metadata = TableMetadataBuilder::new(
2673            initial_schema,
2674            partition_spec,
2675            SortOrder::unsorted_order(),
2676            TEST_LOCATION.to_string(),
2677            FormatVersion::V2,
2678            HashMap::new(),
2679        )
2680        .unwrap()
2681        .build()
2682        .unwrap()
2683        .metadata;
2684
2685        let non_conflicting_schema = Schema::builder()
2686            .with_fields(vec![
2687                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2688                NestedField::required(2, "new_field", Type::Primitive(PrimitiveType::Int)).into(),
2689            ])
2690            .build()
2691            .unwrap();
2692
2693        // This should succeed since there's no name conflict
2694        let result = metadata
2695            .clone()
2696            .into_builder(Some("test_location".to_string()))
2697            .add_current_schema(non_conflicting_schema)
2698            .unwrap()
2699            .build();
2700
2701        assert!(result.is_ok());
2702    }
2703
2704    #[test]
2705    fn test_partition_spec_evolution_validates_schema_field_name_conflicts() {
2706        let initial_schema = Schema::builder()
2707            .with_fields(vec![
2708                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2709                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2710                    .into(),
2711            ])
2712            .build()
2713            .unwrap();
2714
2715        let partition_spec = UnboundPartitionSpec::builder()
2716            .with_spec_id(0)
2717            .add_partition_field(1, "data_bucket", Transform::Bucket(16))
2718            .unwrap()
2719            .build();
2720
2721        let metadata = TableMetadataBuilder::new(
2722            initial_schema,
2723            partition_spec,
2724            SortOrder::unsorted_order(),
2725            TEST_LOCATION.to_string(),
2726            FormatVersion::V2,
2727            HashMap::new(),
2728        )
2729        .unwrap()
2730        .build()
2731        .unwrap()
2732        .metadata;
2733
2734        let builder = metadata.into_builder(Some(
2735            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2736        ));
2737
2738        let conflicting_partition_spec = UnboundPartitionSpec::builder()
2739            .with_spec_id(1)
2740            .add_partition_field(1, "existing_field", Transform::Bucket(8))
2741            .unwrap()
2742            .build();
2743
2744        let result = builder.add_partition_spec(conflicting_partition_spec);
2745
2746        assert!(result.is_err());
2747        let error = result.unwrap_err();
2748        let error_message = error.message();
2749        // The error comes from our multi-version validation
2750        assert!(error_message.contains(
2751            "Cannot create partition with name 'existing_field' that conflicts with schema field"
2752        ));
2753        assert!(error_message.contains("and is not an identity transform"));
2754    }
2755
2756    #[test]
2757    fn test_schema_evolution_validates_against_all_historical_schemas() {
2758        // Create a table with an initial schema that has a field "existing_field"
2759        let initial_schema = Schema::builder()
2760            .with_fields(vec![
2761                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2762                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2763                    .into(),
2764            ])
2765            .build()
2766            .unwrap();
2767
2768        let partition_spec = UnboundPartitionSpec::builder()
2769            .with_spec_id(0)
2770            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2771            .unwrap()
2772            .build();
2773
2774        let metadata = TableMetadataBuilder::new(
2775            initial_schema,
2776            partition_spec,
2777            SortOrder::unsorted_order(),
2778            TEST_LOCATION.to_string(),
2779            FormatVersion::V2,
2780            HashMap::new(),
2781        )
2782        .unwrap()
2783        .build()
2784        .unwrap()
2785        .metadata;
2786
2787        // Add a second schema that removes the existing_field but keeps the data field
2788        let second_schema = Schema::builder()
2789            .with_fields(vec![
2790                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2791                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2792                    .into(),
2793            ])
2794            .build()
2795            .unwrap();
2796
2797        let metadata = metadata
2798            .into_builder(Some("test_location".to_string()))
2799            .add_current_schema(second_schema)
2800            .unwrap()
2801            .build()
2802            .unwrap()
2803            .metadata;
2804
2805        // Now try to add a third schema that reintroduces "existing_field"
2806        // This should succeed because "existing_field" exists in a historical schema,
2807        // even though there's a partition field named "bucket_data"
2808        let third_schema = Schema::builder()
2809            .with_fields(vec![
2810                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2811                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2812                    .into(),
2813                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
2814                    .into(),
2815            ])
2816            .build()
2817            .unwrap();
2818
2819        let builder = metadata
2820            .clone()
2821            .into_builder(Some("test_location".to_string()));
2822
2823        // This should succeed because "existing_field" exists in a historical schema
2824        let result = builder.add_current_schema(third_schema);
2825        assert!(result.is_ok());
2826
2827        // However, trying to add a schema field that conflicts with the partition field
2828        // and doesn't exist in any historical schema should fail
2829        let conflicting_schema = Schema::builder()
2830            .with_fields(vec![
2831                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2832                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2833                    .into(),
2834                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
2835                    .into(),
2836                NestedField::required(5, "bucket_data", Type::Primitive(PrimitiveType::String))
2837                    .into(), // conflicts with partition field
2838            ])
2839            .build()
2840            .unwrap();
2841
2842        let builder2 = metadata.into_builder(Some("test_location".to_string()));
2843        let result2 = builder2.add_current_schema(conflicting_schema);
2844
2845        // This should fail because "bucket_data" conflicts with partition field name
2846        // and doesn't exist in any historical schema
2847        assert!(result2.is_err());
2848        let error = result2.unwrap_err();
2849        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2850    }
2851
2852    #[test]
2853    fn test_schema_evolution_allows_existing_partition_field_if_exists_in_historical_schema() {
2854        // Create initial schema with a field
2855        let initial_schema = Schema::builder()
2856            .with_fields(vec![
2857                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2858                NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
2859                    .into(),
2860            ])
2861            .build()
2862            .unwrap();
2863
2864        let partition_spec = UnboundPartitionSpec::builder()
2865            .with_spec_id(0)
2866            .add_partition_field(2, "partition_data", Transform::Identity)
2867            .unwrap()
2868            .build();
2869
2870        let metadata = TableMetadataBuilder::new(
2871            initial_schema,
2872            partition_spec,
2873            SortOrder::unsorted_order(),
2874            TEST_LOCATION.to_string(),
2875            FormatVersion::V2,
2876            HashMap::new(),
2877        )
2878        .unwrap()
2879        .build()
2880        .unwrap()
2881        .metadata;
2882
2883        // Add a new schema that still contains the partition_data field
2884        let evolved_schema = Schema::builder()
2885            .with_fields(vec![
2886                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2887                NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
2888                    .into(),
2889                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2890                    .into(),
2891            ])
2892            .build()
2893            .unwrap();
2894
2895        // This should succeed because partition_data exists in historical schemas
2896        let result = metadata
2897            .into_builder(Some("test_location".to_string()))
2898            .add_current_schema(evolved_schema);
2899
2900        assert!(result.is_ok());
2901    }
2902
2903    #[test]
2904    fn test_schema_evolution_prevents_new_field_conflicting_with_partition_field() {
2905        // Create initial schema WITHOUT the conflicting field
2906        let initial_schema = Schema::builder()
2907            .with_fields(vec![
2908                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2909            ])
2910            .build()
2911            .unwrap();
2912
2913        let partition_spec = UnboundPartitionSpec::builder()
2914            .with_spec_id(0)
2915            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2916            .unwrap()
2917            .build();
2918
2919        let metadata = TableMetadataBuilder::new(
2920            initial_schema,
2921            partition_spec,
2922            SortOrder::unsorted_order(),
2923            TEST_LOCATION.to_string(),
2924            FormatVersion::V2,
2925            HashMap::new(),
2926        )
2927        .unwrap()
2928        .build()
2929        .unwrap()
2930        .metadata;
2931
2932        // Try to add a schema with a field that conflicts with partition field name
2933        let conflicting_schema = Schema::builder()
2934            .with_fields(vec![
2935                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2936                // This field name conflicts with the partition field "bucket_data"
2937                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
2938            ])
2939            .build()
2940            .unwrap();
2941
2942        let builder = metadata.into_builder(Some("test_location".to_string()));
2943        let result = builder.add_current_schema(conflicting_schema);
2944
2945        // This should fail because "bucket_data" conflicts with partition field name
2946        // and doesn't exist in any historical schema
2947        assert!(result.is_err());
2948        let error = result.unwrap_err();
2949        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2950    }
2951
2952    #[test]
2953    fn test_partition_spec_evolution_allows_non_conflicting_names() {
2954        let initial_schema = Schema::builder()
2955            .with_fields(vec![
2956                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2957                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2958                    .into(),
2959            ])
2960            .build()
2961            .unwrap();
2962
2963        let partition_spec = UnboundPartitionSpec::builder()
2964            .with_spec_id(0)
2965            .add_partition_field(1, "data_bucket", Transform::Bucket(16))
2966            .unwrap()
2967            .build();
2968
2969        let metadata = TableMetadataBuilder::new(
2970            initial_schema,
2971            partition_spec,
2972            SortOrder::unsorted_order(),
2973            TEST_LOCATION.to_string(),
2974            FormatVersion::V2,
2975            HashMap::new(),
2976        )
2977        .unwrap()
2978        .build()
2979        .unwrap()
2980        .metadata;
2981
2982        let builder = metadata.into_builder(Some(
2983            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2984        ));
2985
2986        // Try to add a partition spec with a field name that does NOT conflict with existing schema fields
2987        let non_conflicting_partition_spec = UnboundPartitionSpec::builder()
2988            .with_spec_id(1)
2989            .add_partition_field(2, "new_partition_field", Transform::Bucket(8))
2990            .unwrap()
2991            .build();
2992
2993        let result = builder.add_partition_spec(non_conflicting_partition_spec);
2994
2995        assert!(result.is_ok());
2996    }
2997}