iceberg/spec/
table_metadata_builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use uuid::Uuid;
22
23use super::{
24    DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
25    ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, PartitionStatisticsFile, Schema, SchemaRef,
26    Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef,
27    StatisticsFile, StructType, TableMetadata, TableProperties, UNPARTITIONED_LAST_ASSIGNED_ID,
28    UnboundPartitionSpec,
29};
30use crate::error::{Error, ErrorKind, Result};
31use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
32use crate::{TableCreation, TableUpdate};
33
// Lowest field id handed out when ids are re-assigned for a brand-new table
// (presumably consumed by `reassign_ids`; usage is outside this view — TODO confirm).
const FIRST_FIELD_ID: u32 = 1;
35
/// Manipulating table metadata.
///
/// For this builder the order of called functions matters. Functions are applied in-order.
/// All operations applied to the `TableMetadata` are tracked in `changes` as a chronologically
/// ordered vec of `TableUpdate`.
/// If an operation does not lead to a change of the `TableMetadata`, the corresponding update
/// is omitted from `changes`.
///
/// Unlike a typical builder pattern, the order of function calls matters.
/// Some basic rules:
/// - `add_schema` must be called before `set_current_schema`.
/// - If a new partition spec and schema are added, the schema should be added first.
#[derive(Debug, Clone)]
pub struct TableMetadataBuilder {
    // The metadata being built; mutated in place by the builder methods.
    metadata: TableMetadata,
    // Chronologically ordered updates applied so far; no-op calls add nothing here.
    changes: Vec<TableUpdate>,
    // Ids of the most recently added schema / spec / sort order in this builder
    // session, used to resolve the `LAST_ADDED` (-1) proxy id.
    last_added_schema_id: Option<i32>,
    last_added_spec_id: Option<i32>,
    last_added_order_id: Option<i64>,
    // None if this is a new table (from_metadata) method not used
    previous_history_entry: Option<MetadataLog>,
    // Pending last-updated timestamp (ms); set by e.g. `add_snapshot`/`set_ref`.
    last_updated_ms: Option<i64>,
}
59
#[derive(Debug, Clone, PartialEq)]
/// Result of modifying or creating a `TableMetadata`.
pub struct TableMetadataBuildResult {
    /// The new `TableMetadata`.
    pub metadata: TableMetadata,
    /// The changes that were applied to the metadata, in the order they occurred.
    pub changes: Vec<TableUpdate>,
    /// Metadata log entries that were expired and removed from the metadata log.
    pub expired_metadata_logs: Vec<MetadataLog>,
}
70
71impl TableMetadataBuilder {
    /// Proxy id for "last added" items, including schema, partition spec, sort order.
    ///
    /// Pass this (-1) to e.g. `set_current_schema` to refer to the item most
    /// recently added in the current chain of builder calls.
    pub const LAST_ADDED: i32 = -1;
74
    /// Create a `TableMetadata` object from scratch.
    ///
    /// This method re-assigns ids of fields in the schema, schema.id, sort_order.id and
    /// spec.id. It should only be used to create new table metadata from scratch.
    ///
    /// # Errors
    /// - Id re-assignment fails, or any of the delegated builder steps
    ///   (`add_current_schema`, `add_default_partition_spec`,
    ///   `add_default_sort_order`, `set_properties`) fails.
    pub fn new(
        schema: Schema,
        spec: impl Into<UnboundPartitionSpec>,
        sort_order: SortOrder,
        location: String,
        format_version: FormatVersion,
        properties: HashMap<String, String>,
    ) -> Result<Self> {
        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table.
        let (fresh_schema, fresh_spec, fresh_sort_order) =
            Self::reassign_ids(schema, spec.into(), sort_order)?;
        let schema_id = fresh_schema.schema_id();

        // Start from placeholder metadata; every placeholder below is replaced by
        // the chained builder calls at the end of this method.
        let builder = Self {
            metadata: TableMetadata {
                format_version,
                table_uuid: Uuid::now_v7(),
                location: "".to_string(), // Overwritten immediately by set_location
                last_sequence_number: 0,
                last_updated_ms: 0,    // Overwritten by build() if not set before
                last_column_id: -1,    // Overwritten immediately by add_current_schema
                current_schema_id: -1, // Overwritten immediately by add_current_schema
                schemas: HashMap::new(),
                partition_specs: HashMap::new(),
                default_spec: Arc::new(
                    // The spec id (-1) is just a proxy value and can be any negative number.
                    // 0 would lead to wrong changes in the builder if the provided spec by the user is
                    // also unpartitioned.
                    // The `default_spec` value is always replaced at the end of this method by the `add_default_partition_spec`
                    // method.
                    PartitionSpec::unpartition_spec().with_spec_id(-1),
                ), // Overwritten immediately by add_default_partition_spec
                default_partition_type: StructType::new(vec![]),
                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
                properties: HashMap::new(),
                current_snapshot_id: None,
                snapshots: HashMap::new(),
                snapshot_log: vec![],
                sort_orders: HashMap::new(),
                metadata_log: vec![],
                default_sort_order_id: -1, // Overwritten immediately by add_default_sort_order
                refs: HashMap::default(),
                statistics: HashMap::new(),
                partition_statistics: HashMap::new(),
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            },
            last_updated_ms: None,
            changes: vec![],
            last_added_schema_id: Some(schema_id),
            last_added_spec_id: None,
            last_added_order_id: None,
            previous_history_entry: None,
        };

        // Route everything through the regular builder methods so each step is
        // validated and recorded in `changes` like any other update.
        builder
            .set_location(location)
            .add_current_schema(fresh_schema)?
            .add_default_partition_spec(fresh_spec.into_unbound())?
            .add_default_sort_order(fresh_sort_order)?
            .set_properties(properties)
    }
141
142    /// Creates a new table metadata builder from the given metadata to modify it.
143    /// `current_file_location` is the location where the current version
144    /// of the metadata file is stored. This is used to update the metadata log.
145    /// If `current_file_location` is `None`, the metadata log will not be updated.
146    /// This should only be used to stage-create tables.
147    #[must_use]
148    pub fn new_from_metadata(
149        previous: TableMetadata,
150        current_file_location: Option<String>,
151    ) -> Self {
152        Self {
153            previous_history_entry: current_file_location.map(|l| MetadataLog {
154                metadata_file: l,
155                timestamp_ms: previous.last_updated_ms,
156            }),
157            metadata: previous,
158            changes: Vec::default(),
159            last_added_schema_id: None,
160            last_added_spec_id: None,
161            last_added_order_id: None,
162            last_updated_ms: None,
163        }
164    }
165
166    /// Creates a new table metadata builder from the given table creation.
167    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
168        let TableCreation {
169            name: _,
170            location,
171            schema,
172            partition_spec,
173            sort_order,
174            properties,
175            format_version,
176        } = table_creation;
177
178        let location = location.ok_or_else(|| {
179            Error::new(
180                ErrorKind::DataInvalid,
181                "Can't create table without location",
182            )
183        })?;
184        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
185            spec_id: None,
186            fields: vec![],
187        });
188
189        Self::new(
190            schema,
191            partition_spec,
192            sort_order.unwrap_or(SortOrder::unsorted_order()),
193            location,
194            format_version,
195            properties,
196        )
197    }
198
199    /// Changes uuid of table metadata.
200    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
201        if self.metadata.table_uuid != uuid {
202            self.metadata.table_uuid = uuid;
203            self.changes.push(TableUpdate::AssignUuid { uuid });
204        }
205
206        self
207    }
208
209    /// Upgrade `FormatVersion`. Downgrades are not allowed.
210    ///
211    /// # Errors
212    /// - Cannot downgrade to older format versions.
213    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> {
214        if format_version < self.metadata.format_version {
215            return Err(Error::new(
216                ErrorKind::DataInvalid,
217                format!(
218                    "Cannot downgrade FormatVersion from {} to {}",
219                    self.metadata.format_version, format_version
220                ),
221            ));
222        }
223
224        if format_version != self.metadata.format_version {
225            match format_version {
226                FormatVersion::V1 => {
227                    // No changes needed for V1
228                }
229                FormatVersion::V2 => {
230                    self.metadata.format_version = format_version;
231                    self.changes
232                        .push(TableUpdate::UpgradeFormatVersion { format_version });
233                }
234                FormatVersion::V3 => {
235                    self.metadata.format_version = format_version;
236                    self.changes
237                        .push(TableUpdate::UpgradeFormatVersion { format_version });
238                }
239            }
240        }
241
242        Ok(self)
243    }
244
245    /// Set properties. If a property already exists, it will be overwritten.
246    ///
247    /// If a reserved property is set, the corresponding action is performed and the property is not persisted.
248    /// Currently the following reserved properties are supported:
249    /// * format-version: Set the format version of the table.
250    ///
251    /// # Errors
252    /// - If properties contains a reserved property
253    pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> {
254        // List of specified properties that are RESERVED and should not be persisted.
255        let reserved_properties = properties
256            .keys()
257            .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
258            .map(ToString::to_string)
259            .collect::<Vec<_>>();
260
261        if !reserved_properties.is_empty() {
262            return Err(Error::new(
263                ErrorKind::DataInvalid,
264                format!(
265                    "Table properties should not contain reserved properties, but got: [{}]",
266                    reserved_properties.join(", ")
267                ),
268            ));
269        }
270
271        if properties.is_empty() {
272            return Ok(self);
273        }
274
275        self.metadata.properties.extend(properties.clone());
276        self.changes.push(TableUpdate::SetProperties {
277            updates: properties,
278        });
279
280        Ok(self)
281    }
282
283    /// Remove properties from the table metadata.
284    /// Does nothing if the key is not present.
285    ///
286    /// # Errors
287    /// - If properties to remove contains a reserved property
288    pub fn remove_properties(mut self, properties: &[String]) -> Result<Self> {
289        // remove duplicates
290        let properties = properties.iter().cloned().collect::<HashSet<_>>();
291
292        // disallow removal of reserved properties
293        let reserved_properties = properties
294            .iter()
295            .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
296            .map(ToString::to_string)
297            .collect::<Vec<_>>();
298
299        if !reserved_properties.is_empty() {
300            return Err(Error::new(
301                ErrorKind::DataInvalid,
302                format!(
303                    "Table properties to remove contain reserved properties: [{}]",
304                    reserved_properties.join(", ")
305                ),
306            ));
307        }
308
309        for property in &properties {
310            self.metadata.properties.remove(property);
311        }
312
313        if !properties.is_empty() {
314            self.changes.push(TableUpdate::RemoveProperties {
315                removals: properties.into_iter().collect(),
316            });
317        }
318
319        Ok(self)
320    }
321
322    /// Set the location of the table, stripping any trailing slashes.
323    pub fn set_location(mut self, location: String) -> Self {
324        let location = location.trim_end_matches('/').to_string();
325        if self.metadata.location != location {
326            self.changes.push(TableUpdate::SetLocation {
327                location: location.clone(),
328            });
329            self.metadata.location = location;
330        }
331
332        self
333    }
334
    /// Add a snapshot to the table metadata.
    ///
    /// # Errors
    /// - Snapshot id already exists.
    /// - For format version > 1: the sequence number of the snapshot is lower than the highest sequence number specified so far.
    /// - For format version >= 3: the first-row-id of the snapshot is lower than the next-row-id of the table.
    /// - For format version >= 3: added-rows is null or first-row-id is null.
    /// - For format version >= 3: next-row-id would overflow when adding added-rows.
    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
        if self
            .metadata
            .snapshots
            .contains_key(&snapshot.snapshot_id())
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
            ));
        }

        // Sequence numbers must be strictly increasing for V2+. The check is
        // skipped for a snapshot without a parent, which may reuse the initial
        // sequence number.
        if self.metadata.format_version != FormatVersion::V1
            && snapshot.sequence_number() <= self.metadata.last_sequence_number
            && snapshot.parent_snapshot_id().is_some()
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot add snapshot with sequence number {} older than last sequence number {}",
                    snapshot.sequence_number(),
                    self.metadata.last_sequence_number
                ),
            ));
        }

        if let Some(last) = self.metadata.snapshot_log.last() {
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
                        snapshot.timestamp_ms(),
                        last.timestamp_ms
                    ),
                ));
            }
        }

        // The snapshot must also not be (more than one minute) older than the
        // metadata's last-updated timestamp — whichever of the pending builder
        // timestamp and the persisted one is larger.
        let max_last_updated = self
            .last_updated_ms
            .unwrap_or_default()
            .max(self.metadata.last_updated_ms);
        if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot timestamp {}: before last updated timestamp {}",
                    snapshot.timestamp_ms(),
                    max_last_updated
                ),
            ));
        }

        // Row lineage (V3+): the snapshot must declare a row range, and its
        // first-row-id must not fall behind the table's next-row-id.
        let mut added_rows = None;
        if self.metadata.format_version >= MIN_FORMAT_VERSION_ROW_LINEAGE {
            if let Some((first_row_id, added_rows_count)) = snapshot.row_range() {
                if first_row_id < self.metadata.next_row_id {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot add a snapshot, first-row-id is behind table next-row-id: {first_row_id} < {}",
                            self.metadata.next_row_id
                        ),
                    ));
                }

                added_rows = Some(added_rows_count);
            } else {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot add a snapshot: first-row-id is null. first-row-id must be set for format version >= {MIN_FORMAT_VERSION_ROW_LINEAGE}",
                    ),
                ));
            }
        }

        // Advance next-row-id with overflow detection (V3+ only; `added_rows`
        // stays None for older format versions).
        if let Some(added_rows) = added_rows {
            self.metadata.next_row_id = self
                .metadata
                .next_row_id
                .checked_add(added_rows)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        "Cannot add snapshot: next-row-id overflowed when adding added-rows",
                    )
                })?;
        }

        // Mutation happens in next line - must be infallible from here
        self.changes.push(TableUpdate::AddSnapshot {
            snapshot: snapshot.clone(),
        });

        self.last_updated_ms = Some(snapshot.timestamp_ms());
        self.metadata.last_sequence_number = snapshot.sequence_number();
        self.metadata
            .snapshots
            .insert(snapshot.snapshot_id(), snapshot.into());

        Ok(self)
    }
449
450    /// Append a snapshot to the specified branch.
451    /// Retention settings from the `branch` are re-used.
452    ///
453    /// # Errors
454    /// - Any of the preconditions of `self.add_snapshot` are not met.
455    pub fn set_branch_snapshot(self, snapshot: Snapshot, branch: &str) -> Result<Self> {
456        let reference = self.metadata.refs.get(branch).cloned();
457
458        let reference = if let Some(mut reference) = reference {
459            if !reference.is_branch() {
460                return Err(Error::new(
461                    ErrorKind::DataInvalid,
462                    format!("Cannot append snapshot to non-branch reference '{branch}'",),
463                ));
464            }
465
466            reference.snapshot_id = snapshot.snapshot_id();
467            reference
468        } else {
469            SnapshotReference {
470                snapshot_id: snapshot.snapshot_id(),
471                retention: SnapshotRetention::Branch {
472                    min_snapshots_to_keep: None,
473                    max_snapshot_age_ms: None,
474                    max_ref_age_ms: None,
475                },
476            }
477        };
478
479        self.add_snapshot(snapshot)?.set_ref(branch, reference)
480    }
481
482    /// Remove snapshots by its ids from the table metadata.
483    /// Does nothing if a snapshot id is not present.
484    /// Keeps as changes only the snapshots that were actually removed.
485    pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
486        let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
487
488        self.metadata.snapshots.retain(|k, _| {
489            if snapshot_ids.contains(k) {
490                removed_snapshots.push(*k);
491                false
492            } else {
493                true
494            }
495        });
496
497        if !removed_snapshots.is_empty() {
498            self.changes.push(TableUpdate::RemoveSnapshots {
499                snapshot_ids: removed_snapshots,
500            });
501        }
502
503        // Remove refs that are no longer valid
504        self.metadata
505            .refs
506            .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
507
508        self
509    }
510
    /// Set a reference to a snapshot.
    ///
    /// # Errors
    /// - The snapshot id is unknown.
    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
        // No-op when the ref already points at the same snapshot with identical
        // retention settings; nothing is recorded in `changes`.
        if self
            .metadata
            .refs
            .get(ref_name)
            .is_some_and(|snap_ref| snap_ref.eq(&reference))
        {
            return Ok(self);
        }

        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
                    reference.snapshot_id
                ),
            ));
        };

        // Update last_updated_ms to the exact timestamp of the snapshot if it was added in this commit
        let is_added_snapshot = self.changes.iter().any(|update| {
            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
        });
        if is_added_snapshot {
            self.last_updated_ms = Some(snapshot.timestamp_ms());
        }

        // Current snapshot id is set only for the main branch
        if ref_name == MAIN_BRANCH {
            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
            // Stamp the snapshot-log entry with the builder's pending timestamp,
            // falling back to "now" (and remembering it) when none is set yet.
            let timestamp_ms = if let Some(last_updated_ms) = self.last_updated_ms {
                last_updated_ms
            } else {
                let last_updated_ms = chrono::Utc::now().timestamp_millis();
                self.last_updated_ms = Some(last_updated_ms);
                last_updated_ms
            };

            self.metadata.snapshot_log.push(SnapshotLog {
                snapshot_id: snapshot.snapshot_id(),
                timestamp_ms,
            });
        }

        self.changes.push(TableUpdate::SetSnapshotRef {
            ref_name: ref_name.to_string(),
            reference: reference.clone(),
        });
        self.metadata.refs.insert(ref_name.to_string(), reference);

        Ok(self)
    }
568
    /// Remove a reference
    ///
    /// If `ref_name='main'` the current snapshot id is cleared (set to `None`),
    /// and the removal is recorded even if no such ref existed.
    pub fn remove_ref(mut self, ref_name: &str) -> Self {
        if ref_name == MAIN_BRANCH {
            self.metadata.current_snapshot_id = None;
        }

        // Record the change when a ref was actually removed — or always for
        // `main`, because removing `main` also resets the current snapshot id.
        if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
            self.changes.push(TableUpdate::RemoveSnapshotRef {
                ref_name: ref_name.to_string(),
            });
        }

        self
    }
585
586    /// Set statistics for a snapshot
587    pub fn set_statistics(mut self, statistics: StatisticsFile) -> Self {
588        self.metadata
589            .statistics
590            .insert(statistics.snapshot_id, statistics.clone());
591        self.changes.push(TableUpdate::SetStatistics {
592            statistics: statistics.clone(),
593        });
594        self
595    }
596
597    /// Remove statistics for a snapshot
598    pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
599        let previous = self.metadata.statistics.remove(&snapshot_id);
600        if previous.is_some() {
601            self.changes
602                .push(TableUpdate::RemoveStatistics { snapshot_id });
603        }
604        self
605    }
606
607    /// Set partition statistics
608    pub fn set_partition_statistics(
609        mut self,
610        partition_statistics_file: PartitionStatisticsFile,
611    ) -> Self {
612        self.metadata.partition_statistics.insert(
613            partition_statistics_file.snapshot_id,
614            partition_statistics_file.clone(),
615        );
616        self.changes.push(TableUpdate::SetPartitionStatistics {
617            partition_statistics: partition_statistics_file,
618        });
619        self
620    }
621
622    /// Remove partition statistics
623    pub fn remove_partition_statistics(mut self, snapshot_id: i64) -> Self {
624        let previous = self.metadata.partition_statistics.remove(&snapshot_id);
625        if previous.is_some() {
626            self.changes
627                .push(TableUpdate::RemovePartitionStatistics { snapshot_id });
628        }
629        self
630    }
631
    /// Add a schema to the table metadata.
    ///
    /// The provided `schema.schema_id` may not be used.
    ///
    /// Important: Use this method with caution. The builder does not check
    /// if the added schema is compatible with the current schema.
    ///
    /// # Errors
    /// - A new schema field name conflicts with an existing partition field name.
    pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
        // Validate that new schema fields don't conflict with existing partition field names
        self.validate_schema_field_names(&schema)?;

        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);

        if schema_found {
            // The schema already exists under `new_schema_id`: don't re-insert or
            // touch last_column_id, but still record AddSchema (at most once) so
            // that "last added" tracking resolves correctly.
            if self.last_added_schema_id != Some(new_schema_id) {
                self.changes.push(TableUpdate::AddSchema {
                    schema: schema.clone(),
                });
                self.last_added_schema_id = Some(new_schema_id);
            }

            return Ok(self);
        }

        // New schemas might contain only old columns. In this case last_column_id should not be
        // reduced.
        self.metadata.last_column_id =
            std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());

        // Set schema-id
        let schema = match new_schema_id == schema.schema_id() {
            true => schema,
            false => schema.with_schema_id(new_schema_id),
        };

        self.metadata
            .schemas
            .insert(new_schema_id, schema.clone().into());

        self.changes.push(TableUpdate::AddSchema { schema });

        self.last_added_schema_id = Some(new_schema_id);

        Ok(self)
    }
677
678    /// Set the current schema id.
679    ///
680    /// If `schema_id` is -1, the last added schema is set as the current schema.
681    ///
682    /// Errors:
683    /// - provided `schema_id` is -1 but no schema has been added via `add_schema`.
684    /// - No schema with the provided `schema_id` exists.
685    pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
686        if schema_id == Self::LAST_ADDED {
687            schema_id = self.last_added_schema_id.ok_or_else(|| {
688                Error::new(
689                    ErrorKind::DataInvalid,
690                    "Cannot set current schema to last added schema: no schema has been added.",
691                )
692            })?;
693        };
694        let schema_id = schema_id; // Make immutable
695
696        if schema_id == self.metadata.current_schema_id {
697            return Ok(self);
698        }
699
700        let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
701            Error::new(
702                ErrorKind::DataInvalid,
703                format!("Cannot set current schema to unknown schema with id: '{schema_id}'"),
704            )
705        })?;
706
707        // Old partition specs and sort-orders should be preserved even if they are not compatible with the new schema,
708        // so that older metadata can still be interpreted.
709        // Default partition spec and sort order are checked in the build() method
710        // which allows other default partition specs and sort orders to be set before the build.
711
712        self.metadata.current_schema_id = schema_id;
713
714        if self.last_added_schema_id == Some(schema_id) {
715            self.changes.push(TableUpdate::SetCurrentSchema {
716                schema_id: Self::LAST_ADDED,
717            });
718        } else {
719            self.changes
720                .push(TableUpdate::SetCurrentSchema { schema_id });
721        }
722
723        Ok(self)
724    }
725
726    /// Add a schema and set it as the current schema.
727    pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
728        self.add_schema(schema)?
729            .set_current_schema(Self::LAST_ADDED)
730    }
731
732    /// Validate schema field names against partition field names across all historical schemas.
733    ///
734    /// Due to Iceberg's multi-version property, this check ignores existing schema fields
735    /// that match partition names (schema evolution allows re-adding previously removed fields).
736    /// Only NEW field names that conflict with partition names are rejected.
737    ///
738    /// # Errors
739    /// - Schema field name conflicts with partition field name but doesn't exist in any historical schema.
740    fn validate_schema_field_names(&self, schema: &Schema) -> Result<()> {
741        if self.metadata.schemas.is_empty() {
742            return Ok(());
743        }
744
745        for field_name in schema.field_id_to_name_map().values() {
746            let has_partition_conflict = self.metadata.partition_name_exists(field_name);
747            let is_new_field = !self.metadata.name_exists_in_any_schema(field_name);
748
749            if has_partition_conflict && is_new_field {
750                return Err(Error::new(
751                    ErrorKind::DataInvalid,
752                    format!(
753                        "Cannot add schema field '{field_name}' because it conflicts with existing partition field name. \
754                         Schema evolution cannot introduce field names that match existing partition field names."
755                    ),
756                ));
757            }
758        }
759
760        Ok(())
761    }
762
763    /// Validate partition field names against schema field names across all historical schemas.
764    ///
765    /// Due to Iceberg's multi-version property, partition fields can share names with schema fields
766    /// if they meet specific requirements (identity transform + matching source field ID).
767    /// This validation enforces those rules across all historical schema versions.
768    ///
769    /// # Errors
770    /// - Partition field name conflicts with schema field name but doesn't use identity transform.
771    /// - Partition field uses identity transform but references wrong source field ID.
772    fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> {
773        if self.metadata.schemas.is_empty() {
774            return Ok(());
775        }
776
777        let current_schema = self.get_current_schema()?;
778        for partition_field in unbound_spec.fields() {
779            let exists_in_any_schema = self
780                .metadata
781                .name_exists_in_any_schema(&partition_field.name);
782
783            // Skip if partition field name doesn't conflict with any schema field
784            if !exists_in_any_schema {
785                continue;
786            }
787
788            // If name exists in schemas, validate against current schema rules
789            if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) {
790                let is_identity_transform =
791                    partition_field.transform == crate::spec::Transform::Identity;
792                let has_matching_source_id = schema_field.id == partition_field.source_id;
793
794                if !is_identity_transform {
795                    return Err(Error::new(
796                        ErrorKind::DataInvalid,
797                        format!(
798                            "Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.",
799                            partition_field.name
800                        ),
801                    ));
802                }
803
804                if !has_matching_source_id {
805                    return Err(Error::new(
806                        ErrorKind::DataInvalid,
807                        format!(
808                            "Cannot create identity partition sourced from different field in schema. \
809                             Field name '{}' has id `{}` in schema but partition source id is `{}`",
810                            partition_field.name, schema_field.id, partition_field.source_id
811                        ),
812                    ));
813                }
814            }
815        }
816
817        Ok(())
818    }
819
    /// Add a partition spec to the table metadata.
    ///
    /// The spec is bound eagerly to the current schema.
    /// If a schema is added in the same set of changes, the schema should be added first.
    ///
    /// Even if `unbound_spec.spec_id` is provided as `Some`, it may not be used.
    ///
    /// # Errors
    /// - The partition spec cannot be bound to the current schema.
    /// - The partition spec has non-sequential field ids and the table format version is 1.
    pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        let schema = self.get_current_schema()?.clone();

        // Check if partition field names conflict with schema field names across all schemas
        self.validate_partition_field_names(&unbound_spec)?;

        // Bind to the current schema; new field ids continue from the highest
        // partition field id this table has ever assigned.
        let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
            .with_last_assigned_field_id(self.metadata.last_partition_id)
            .build()?;

        // Re-use the id of a compatible existing spec when one exists.
        let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
        let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
        let spec = spec.with_spec_id(new_spec_id);
        let unbound_spec = unbound_spec.with_spec_id(new_spec_id);

        if spec_found {
            // Spec already present: only record the change (at most once) so a
            // subsequent `set_default_partition_spec(LAST_ADDED)` can refer to it.
            if self.last_added_spec_id != Some(new_spec_id) {
                self.changes
                    .push(TableUpdate::AddSpec { spec: unbound_spec });
                self.last_added_spec_id = Some(new_spec_id);
            }

            return Ok(self);
        }

        // Format version 1 requires sequential partition field ids.
        if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Cannot add partition spec with non-sequential field ids to format version 1 table",
            ));
        }

        let highest_field_id = spec
            .highest_field_id()
            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
        self.metadata
            .partition_specs
            .insert(new_spec_id, Arc::new(spec));
        self.changes
            .push(TableUpdate::AddSpec { spec: unbound_spec });

        self.last_added_spec_id = Some(new_spec_id);
        // Track the highest partition field id ever assigned; it never decreases.
        self.metadata.last_partition_id =
            std::cmp::max(self.metadata.last_partition_id, highest_field_id);

        Ok(self)
    }
877
878    /// Set the default partition spec.
879    ///
880    /// # Errors
881    /// - spec_id is -1 but no spec has been added via `add_partition_spec`.
882    /// - No partition spec with the provided `spec_id` exists.
883    pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result<Self> {
884        if spec_id == Self::LAST_ADDED {
885            spec_id = self.last_added_spec_id.ok_or_else(|| {
886                Error::new(
887                    ErrorKind::DataInvalid,
888                    "Cannot set default partition spec to last added spec: no spec has been added.",
889                )
890            })?;
891        }
892
893        if self.metadata.default_spec.spec_id() == spec_id {
894            return Ok(self);
895        }
896
897        if !self.metadata.partition_specs.contains_key(&spec_id) {
898            return Err(Error::new(
899                ErrorKind::DataInvalid,
900                format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",),
901            ));
902        }
903
904        let schemaless_spec = self
905            .metadata
906            .partition_specs
907            .get(&spec_id)
908            .ok_or_else(|| {
909                Error::new(
910                    ErrorKind::DataInvalid,
911                    format!(
912                        "Cannot set default partition spec to unknown spec with id: '{spec_id}'",
913                    ),
914                )
915            })?
916            .clone();
917        let spec = Arc::unwrap_or_clone(schemaless_spec);
918        let spec_type = spec.partition_type(self.get_current_schema()?)?;
919        self.metadata.default_spec = Arc::new(spec);
920        self.metadata.default_partition_type = spec_type;
921
922        if self.last_added_spec_id == Some(spec_id) {
923            self.changes.push(TableUpdate::SetDefaultSpec {
924                spec_id: Self::LAST_ADDED,
925            });
926        } else {
927            self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
928        }
929
930        Ok(self)
931    }
932
    /// Add a partition spec and set it as the default
    ///
    /// Convenience for [`Self::add_partition_spec`] followed by
    /// [`Self::set_default_partition_spec`] with the last-added (-1) sentinel.
    ///
    /// # Errors
    /// - Any error from `add_partition_spec` or `set_default_partition_spec`.
    pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        self.add_partition_spec(unbound_spec)?
            .set_default_partition_spec(Self::LAST_ADDED)
    }
938
939    /// Remove partition specs by their ids from the table metadata.
940    /// Does nothing if a spec id is not present. Active partition specs
941    /// should not be removed.
942    ///
943    /// # Errors
944    /// - Cannot remove the default partition spec.
945    pub fn remove_partition_specs(mut self, spec_ids: &[i32]) -> Result<Self> {
946        if spec_ids.contains(&self.metadata.default_spec.spec_id()) {
947            return Err(Error::new(
948                ErrorKind::DataInvalid,
949                "Cannot remove default partition spec",
950            ));
951        }
952
953        let mut removed_specs = Vec::with_capacity(spec_ids.len());
954        spec_ids.iter().for_each(|id| {
955            if self.metadata.partition_specs.remove(id).is_some() {
956                removed_specs.push(*id);
957            }
958        });
959
960        if !removed_specs.is_empty() {
961            self.changes.push(TableUpdate::RemovePartitionSpecs {
962                spec_ids: removed_specs,
963            });
964        }
965
966        Ok(self)
967    }
968
    /// Add a sort order to the table metadata.
    ///
    /// The spec is bound eagerly to the current schema and must be valid for it.
    /// If a schema is added in the same set of changes, the schema should be added first.
    ///
    /// Even if `sort_order.order_id` is provided, it may not be used.
    ///
    /// # Errors
    /// - Sort order id to add already exists.
    /// - Sort order is incompatible with the current schema.
    pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
        // Re-use the id of an existing order with identical fields when possible.
        let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
        let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id);

        if sort_order_found {
            // Order already present: only record the change (at most once) so a
            // subsequent `set_default_sort_order(LAST_ADDED)` can refer to it.
            if self.last_added_order_id != Some(new_order_id) {
                self.changes.push(TableUpdate::AddSortOrder {
                    sort_order: sort_order.clone().with_order_id(new_order_id),
                });
                self.last_added_order_id = Some(new_order_id);
            }

            return Ok(self);
        }

        // Re-build the order against the current schema to validate its fields.
        let schema = self.get_current_schema()?.clone().as_ref().clone();
        let sort_order = SortOrder::builder()
            .with_order_id(new_order_id)
            .with_fields(sort_order.fields)
            .build(&schema)
            .map_err(|e| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Sort order to add is incompatible with current schema: {e}"),
                )
                .with_source(e)
            })?;

        self.last_added_order_id = Some(new_order_id);
        self.metadata
            .sort_orders
            .insert(new_order_id, sort_order.clone().into());
        self.changes.push(TableUpdate::AddSortOrder { sort_order });

        Ok(self)
    }
1015
1016    /// Set the default sort order. If `sort_order_id` is -1, the last added sort order is set as default.
1017    ///
1018    /// # Errors
1019    /// - sort_order_id is -1 but no sort order has been added via `add_sort_order`.
1020    /// - No sort order with the provided `sort_order_id` exists.
1021    pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result<Self> {
1022        if sort_order_id == Self::LAST_ADDED as i64 {
1023            sort_order_id = self.last_added_order_id.ok_or_else(|| {
1024                Error::new(
1025                    ErrorKind::DataInvalid,
1026                    "Cannot set default sort order to last added order: no order has been added.",
1027                )
1028            })?;
1029        }
1030
1031        if self.metadata.default_sort_order_id == sort_order_id {
1032            return Ok(self);
1033        }
1034
1035        if !self.metadata.sort_orders.contains_key(&sort_order_id) {
1036            return Err(Error::new(
1037                ErrorKind::DataInvalid,
1038                format!(
1039                    "Cannot set default sort order to unknown order with id: '{sort_order_id}'"
1040                ),
1041            ));
1042        }
1043
1044        self.metadata.default_sort_order_id = sort_order_id;
1045
1046        if self.last_added_order_id == Some(sort_order_id) {
1047            self.changes.push(TableUpdate::SetDefaultSortOrder {
1048                sort_order_id: Self::LAST_ADDED as i64,
1049            });
1050        } else {
1051            self.changes
1052                .push(TableUpdate::SetDefaultSortOrder { sort_order_id });
1053        }
1054
1055        Ok(self)
1056    }
1057
    /// Add a sort order and set it as the default
    ///
    /// Convenience for [`Self::add_sort_order`] followed by
    /// [`Self::set_default_sort_order`] with the last-added (-1) sentinel.
    ///
    /// # Errors
    /// - Any error from `add_sort_order` or `set_default_sort_order`.
    fn add_default_sort_order(self, sort_order: SortOrder) -> Result<Self> {
        self.add_sort_order(sort_order)?
            .set_default_sort_order(Self::LAST_ADDED as i64)
    }
1063
1064    /// Add an encryption key to the table metadata.
1065    pub fn add_encryption_key(mut self, key: EncryptedKey) -> Self {
1066        let key_id = key.key_id().to_string();
1067        if self.metadata.encryption_keys.contains_key(&key_id) {
1068            // already exists
1069            return self;
1070        }
1071
1072        self.metadata.encryption_keys.insert(key_id, key.clone());
1073        self.changes.push(TableUpdate::AddEncryptionKey {
1074            encryption_key: key,
1075        });
1076        self
1077    }
1078
1079    /// Remove an encryption key from the table metadata.
1080    pub fn remove_encryption_key(mut self, key_id: &str) -> Self {
1081        if self.metadata.encryption_keys.remove(key_id).is_some() {
1082            self.changes.push(TableUpdate::RemoveEncryptionKey {
1083                key_id: key_id.to_string(),
1084            });
1085        }
1086        self
1087    }
1088
    /// Build the table metadata.
    ///
    /// Finalizes pending state: stamps `last_updated_ms`, re-binds the default
    /// partition spec against the current schema, validates the default sort order,
    /// rewrites the snapshot log, normalizes the metadata, and expires old
    /// metadata-log entries. Returns the metadata together with the accumulated
    /// changes and the expired metadata-log entries.
    pub fn build(mut self) -> Result<TableMetadataBuildResult> {
        // Use an explicitly-set timestamp if one was provided, otherwise "now".
        self.metadata.last_updated_ms = self
            .last_updated_ms
            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());

        // Check compatibility of the current schema to the default partition spec and sort order.
        // We use the `get_xxx` methods from the builder to avoid using the panicking
        // `TableMetadata.default_partition_spec` etc. methods.
        let schema = self.get_current_schema()?.clone();
        let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?);

        // Re-bind the default spec to the (possibly newly added) current schema.
        self.metadata.default_spec = Arc::new(
            Arc::unwrap_or_clone(self.metadata.default_spec)
                .into_unbound()
                .bind(schema.clone())?,
        );
        self.metadata.default_partition_type =
            self.metadata.default_spec.partition_type(&schema)?;
        // Building the sort order against the schema validates compatibility;
        // the built value itself is discarded.
        SortOrder::builder()
            .with_fields(sort_order.fields)
            .build(&schema)?;

        self.update_snapshot_log()?;
        self.metadata.try_normalize()?;

        // Append the pending previous-metadata history entry, if any, before expiry.
        if let Some(hist_entry) = self.previous_history_entry.take() {
            self.metadata.metadata_log.push(hist_entry);
        }
        let expired_metadata_logs = self.expire_metadata_log();

        Ok(TableMetadataBuildResult {
            metadata: self.metadata,
            changes: self.changes,
            expired_metadata_logs,
        })
    }
1126
1127    fn expire_metadata_log(&mut self) -> Vec<MetadataLog> {
1128        let max_size = self
1129            .metadata
1130            .properties
1131            .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
1132            .and_then(|v| v.parse::<usize>().ok())
1133            .unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
1134            .max(1);
1135
1136        if self.metadata.metadata_log.len() > max_size {
1137            self.metadata
1138                .metadata_log
1139                .drain(0..self.metadata.metadata_log.len() - max_size)
1140                .collect()
1141        } else {
1142            Vec::new()
1143        }
1144    }
1145
    /// Rewrite the snapshot log: drop entries for intermediate snapshots (added
    /// in this change set but never made current) and truncate history before
    /// any entry whose snapshot has been removed.
    ///
    /// # Errors
    /// - The rewritten log's latest entry is not the current snapshot.
    fn update_snapshot_log(&mut self) -> Result<()> {
        let intermediate_snapshots = self.get_intermediate_snapshots();
        let has_removed_snapshots = self
            .changes
            .iter()
            .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. }));

        // Fast path: nothing to rewrite.
        if intermediate_snapshots.is_empty() && !has_removed_snapshots {
            return Ok(());
        }

        let mut new_snapshot_log = Vec::new();
        for log_entry in &self.metadata.snapshot_log {
            let snapshot_id = log_entry.snapshot_id;
            if self.metadata.snapshots.contains_key(&snapshot_id) {
                // Keep entries for live snapshots unless they are intermediate.
                if !intermediate_snapshots.contains(&snapshot_id) {
                    new_snapshot_log.push(log_entry.clone());
                }
            } else if has_removed_snapshots {
                // any invalid entry causes the history before it to be removed. otherwise, there could be
                // history gaps that cause time-travel queries to produce incorrect results. for example,
                // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
                // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
                // and t3 when in fact s2 was the current snapshot.
                new_snapshot_log.clear();
            }
        }

        // If a current snapshot is set, it must be the last entry of the log.
        if let Some(current_snapshot_id) = self.metadata.current_snapshot_id {
            let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id);
            if last_id != Some(current_snapshot_id) {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set invalid snapshot log: latest entry is not the current snapshot",
                ));
            }
        };

        self.metadata.snapshot_log = new_snapshot_log;
        Ok(())
    }
1187
    /// Finds intermediate snapshots that have not been committed as the current snapshot.
    ///
    /// Transactions can create snapshots that are never the current snapshot because several
    /// changes are combined by the transaction into one table metadata update. When each
    /// intermediate snapshot is added to table metadata, it is added to the snapshot log, assuming
    /// that it will be the current snapshot. When there are multiple snapshot updates, the log must
    /// be corrected by suppressing the intermediate snapshot entries.
    ///
    /// A snapshot is an intermediate snapshot if it was added but is not the current snapshot.
    fn get_intermediate_snapshots(&self) -> HashSet<i64> {
        // Snapshot ids added within this round of changes.
        let added_snapshot_ids = self
            .changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
                _ => None,
            })
            .collect::<HashSet<_>>();

        // Of those, keep the ones that were set as the main-branch head but did
        // not end up as the current snapshot.
        self.changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::SetSnapshotRef {
                    ref_name,
                    reference,
                } => {
                    if added_snapshot_ids.contains(&reference.snapshot_id)
                        && ref_name == MAIN_BRANCH
                        && reference.snapshot_id
                            != self
                                .metadata
                                .current_snapshot_id
                                // -1 sentinel: compares unequal to any real snapshot id.
                                .unwrap_or(i64::from(Self::LAST_ADDED))
                    {
                        Some(reference.snapshot_id)
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .collect()
    }
1231
    /// Re-assign schema id and field ids for a brand-new table, and rebuild the
    /// partition spec and sort order so their source-column references point at
    /// the freshly assigned ids. Columns are matched across the old and new
    /// schema by name.
    ///
    /// # Errors
    /// - A partition or sort field references a source column id not present in the schema.
    fn reassign_ids(
        schema: Schema,
        spec: UnboundPartitionSpec,
        sort_order: SortOrder,
    ) -> Result<(Schema, PartitionSpec, SortOrder)> {
        // Re-assign field ids and schema ids for a new table.
        let previous_id_to_name = schema.field_id_to_name_map().clone();
        let fresh_schema = schema
            .into_builder()
            .with_schema_id(DEFAULT_SCHEMA_ID)
            .with_reassigned_field_ids(FIRST_FIELD_ID)
            .build()?;

        // Re-build partition spec with new ids: resolve each source column by its
        // old-schema name, then add it against the fresh schema.
        let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone());
        for field in spec.fields() {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for partition column {} in schema.",
                        field.source_id, field.name
                    ),
                )
            })?;
            fresh_spec =
                fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
        }
        let fresh_spec = fresh_spec.build()?;

        // Re-build sort order with new ids
        let mut fresh_order = SortOrder::builder();
        for mut field in sort_order.fields {
            // Old id -> name (old schema) -> new id (fresh schema).
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for sort column in schema.",
                        field.source_id
                    ),
                )
            })?;
            let new_field_id = fresh_schema
                       .field_by_name(source_field_name)
                       .ok_or_else(|| {
                           Error::new(
                               ErrorKind::Unexpected,
                               format!(
                                   "Cannot find source column with name {source_field_name} for sort column in re-assigned schema."
                               ),
                           )
                       })?.id;
            field.source_id = new_field_id;
            fresh_order.with_sort_field(field);
        }
        let fresh_sort_order = fresh_order.build(&fresh_schema)?;

        Ok((fresh_schema, fresh_spec, fresh_sort_order))
    }
1291
1292    fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
1293        self.metadata
1294            .schemas
1295            .iter()
1296            .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
1297            .unwrap_or_else(|| self.get_highest_schema_id() + 1)
1298    }
1299
1300    fn get_highest_schema_id(&self) -> i32 {
1301        *self
1302            .metadata
1303            .schemas
1304            .keys()
1305            .max()
1306            .unwrap_or(&self.metadata.current_schema_id)
1307    }
1308
1309    fn get_current_schema(&self) -> Result<&SchemaRef> {
1310        self.metadata
1311            .schemas
1312            .get(&self.metadata.current_schema_id)
1313            .ok_or_else(|| {
1314                Error::new(
1315                    ErrorKind::DataInvalid,
1316                    format!(
1317                        "Current schema with id '{}' not found in table metadata.",
1318                        self.metadata.current_schema_id
1319                    ),
1320                )
1321            })
1322    }
1323
1324    fn get_default_sort_order(&self) -> Result<SortOrderRef> {
1325        self.metadata
1326            .sort_orders
1327            .get(&self.metadata.default_sort_order_id)
1328            .cloned()
1329            .ok_or_else(|| {
1330                Error::new(
1331                    ErrorKind::DataInvalid,
1332                    format!(
1333                        "Default sort order with id '{}' not found in table metadata.",
1334                        self.metadata.default_sort_order_id
1335                    ),
1336                )
1337            })
1338    }
1339
1340    /// If a compatible spec already exists, use the same ID. Otherwise, use 1 more than the highest ID.
1341    fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 {
1342        self.metadata
1343            .partition_specs
1344            .iter()
1345            .find_map(|(id, old_spec)| new_spec.is_compatible_with(old_spec).then_some(*id))
1346            .unwrap_or_else(|| {
1347                self.get_highest_spec_id()
1348                    .map(|id| id + 1)
1349                    .unwrap_or(DEFAULT_PARTITION_SPEC_ID)
1350            })
1351    }
1352
1353    fn get_highest_spec_id(&self) -> Option<i32> {
1354        self.metadata.partition_specs.keys().max().copied()
1355    }
1356
1357    /// If a compatible sort-order already exists, use the same ID. Otherwise, use 1 more than the highest ID.
1358    fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 {
1359        if new_sort_order.is_unsorted() {
1360            return SortOrder::unsorted_order().order_id;
1361        }
1362
1363        self.metadata
1364            .sort_orders
1365            .iter()
1366            .find_map(|(id, sort_order)| {
1367                sort_order.fields.eq(&new_sort_order.fields).then_some(*id)
1368            })
1369            .unwrap_or_else(|| {
1370                self.highest_sort_order_id()
1371                    .unwrap_or(SortOrder::unsorted_order().order_id)
1372                    + 1
1373            })
1374    }
1375
1376    fn highest_sort_order_id(&self) -> Option<i64> {
1377        self.metadata.sort_orders.keys().max().copied()
1378    }
1379
1380    /// Remove schemas by their ids from the table metadata.
1381    /// Does nothing if a schema id is not present. Active schemas should not be removed.
1382    pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
1383        if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
1384            return Err(Error::new(
1385                ErrorKind::DataInvalid,
1386                "Cannot remove current schema",
1387            ));
1388        }
1389
1390        if schema_id_to_remove.is_empty() {
1391            return Ok(self);
1392        }
1393
1394        let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
1395        self.metadata.schemas.retain(|id, _schema| {
1396            if schema_id_to_remove.contains(id) {
1397                removed_schemas.push(*id);
1398                false
1399            } else {
1400                true
1401            }
1402        });
1403
1404        self.changes.push(TableUpdate::RemoveSchemas {
1405            schema_ids: removed_schemas,
1406        });
1407
1408        Ok(self)
1409    }
1410}
1411
/// Convenience conversion: unwrap the built metadata, discarding the recorded
/// changes and any expired metadata-log entries.
impl From<TableMetadataBuildResult> for TableMetadata {
    fn from(result: TableMetadataBuildResult) -> Self {
        result.metadata
    }
}
1417
1418#[cfg(test)]
1419mod tests {
1420    use std::fs::File;
1421    use std::io::BufReader;
1422    use std::thread::sleep;
1423
1424    use super::*;
1425    use crate::TableIdent;
1426    use crate::io::FileIOBuilder;
1427    use crate::spec::{
1428        BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema,
1429        SnapshotRetention, SortDirection, SortField, StructType, Summary, TableProperties,
1430        Transform, Type, UnboundPartitionField,
1431    };
1432    use crate::table::Table;
1433
    // Base table location used by all fixtures in this module.
    const TEST_LOCATION: &str = "s3://bucket/test/location";
    // Highest column id assigned by the `schema()` fixture (fields 1..=3).
    const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
1436
    /// Fixture: three required `Long` columns `x`, `y`, `z` with ids 1-3.
    fn schema() -> Schema {
        Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap()
    }
1447
    /// Fixture: order id 1, sorting by `bucket[4](z)` descending, nulls first.
    fn sort_order() -> SortOrder {
        let schema = schema();
        SortOrder::builder()
            .with_order_id(1)
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::First,
            })
            .build(&schema)
            .unwrap()
    }
1461
    /// Fixture: spec id 0 with a single identity partition on column `y` (id 2).
    fn partition_spec() -> UnboundPartitionSpec {
        UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(2, "y", Transform::Identity)
            .unwrap()
            .build()
    }
1469
    /// Build fresh table metadata from the fixtures, then re-open it as a
    /// builder so tests start from metadata with an empty change list.
    fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder {
        TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            format_version,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata
        .into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ))
    }
1487
    /// Minimal build from the fixtures as V1, then upgrade to V2; both versions
    /// must serialize with serde.
    #[test]
    fn test_minimal_build() {
        let metadata = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V1);
        assert_eq!(metadata.location, TEST_LOCATION);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.default_spec.spec_id(), 0);
        assert_eq!(metadata.default_sort_order_id, 1);
        assert_eq!(metadata.last_partition_id, 1000);
        assert_eq!(metadata.last_column_id, 3);
        assert_eq!(metadata.snapshots.len(), 0);
        assert_eq!(metadata.current_snapshot_id, None);
        assert_eq!(metadata.refs.len(), 0);
        assert_eq!(metadata.properties.len(), 0);
        assert_eq!(metadata.metadata_log.len(), 0);
        assert_eq!(metadata.last_sequence_number, 0);
        // NOTE(review): duplicates the literal-`3` assertion above.
        assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID);

        // Test can serialize v1
        let _ = serde_json::to_string(&metadata).unwrap();

        // Test can serialize v2
        let metadata = metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .upgrade_format_version(FormatVersion::V2)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V2);
        let _ = serde_json::to_string(&metadata).unwrap();
    }
1535
    /// Empty schema + unpartitioned spec + unsorted order builds cleanly as V2
    /// with all-zero/default ids and counters.
    #[test]
    fn test_build_unpartitioned_unsorted() {
        let schema = Schema::builder().build().unwrap();
        let metadata = TableMetadataBuilder::new(
            schema.clone(),
            PartitionSpec::unpartition_spec(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V2);
        assert_eq!(metadata.location, TEST_LOCATION);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.default_spec.spec_id(), 0);
        assert_eq!(metadata.default_sort_order_id, 0);
        assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
        assert_eq!(metadata.last_column_id, 0);
        assert_eq!(metadata.snapshots.len(), 0);
        assert_eq!(metadata.current_snapshot_id, None);
        assert_eq!(metadata.refs.len(), 0);
        assert_eq!(metadata.properties.len(), 0);
        assert_eq!(metadata.metadata_log.len(), 0);
        assert_eq!(metadata.last_sequence_number, 0);
    }
1566
    #[test]
    fn test_reassigns_ids() {
        // Input deliberately starts schema id at 10 and field ids at 11;
        // `reassign_ids` must renumber everything from scratch.
        let schema = Schema::builder()
            .with_schema_id(10)
            .with_fields(vec![
                NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    13,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(14, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();
        // Spec and sort order reference the old (pre-reassignment) source ids;
        // they must be re-bound against the renumbered schema.
        let spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(20)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 11,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(10)
            .build(&schema)
            .unwrap();

        let (fresh_schema, fresh_spec, fresh_sort_order) =
            TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap();

        // Expected renumbering: top-level fields get 1..=4 first, nested fields
        // follow (hence 5 for "struct.nested"); spec id resets to 0 and the
        // sort order id to 1.
        let expected_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    3,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(5, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(4, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let expected_spec = PartitionSpec::builder(expected_schema.clone())
            .with_spec_id(0)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let expected_sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(1)
            .build(&expected_schema)
            .unwrap();

        assert_eq!(fresh_schema, expected_schema);
        assert_eq!(fresh_spec, expected_spec);
        assert_eq!(fresh_sort_order, expected_sort_order);
    }
1651
1652    #[test]
1653    fn test_ids_are_reassigned_for_new_metadata() {
1654        let schema = schema().into_builder().with_schema_id(10).build().unwrap();
1655
1656        let metadata = TableMetadataBuilder::new(
1657            schema,
1658            partition_spec(),
1659            sort_order(),
1660            TEST_LOCATION.to_string(),
1661            FormatVersion::V1,
1662            HashMap::new(),
1663        )
1664        .unwrap()
1665        .build()
1666        .unwrap()
1667        .metadata;
1668
1669        assert_eq!(metadata.current_schema_id, 0);
1670        assert_eq!(metadata.current_schema().schema_id(), 0);
1671    }
1672
    #[test]
    fn test_new_metadata_changes() {
        let changes = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::from_iter(vec![("property 1".to_string(), "value 1".to_string())]),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        // Creating a table records every constituent as an explicit update, in
        // order. `-1` is the sentinel for "the entity added within this same
        // change set".
        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema { schema: schema() },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                // Because this is a new table, field-ids are assigned;
                // partition_spec() has None set for field-id
                spec: PartitionSpec::builder(schema())
                    .with_spec_id(0)
                    .add_unbound_field(UnboundPartitionField {
                        name: "y".to_string(),
                        transform: Transform::Identity,
                        source_id: 2,
                        field_id: Some(1000)
                    })
                    .unwrap()
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: sort_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
            TableUpdate::SetProperties {
                updates: HashMap::from_iter(vec![(
                    "property 1".to_string(),
                    "value 1".to_string()
                )]),
            }
        ]);
    }
1723
    #[test]
    fn test_new_metadata_changes_unpartitioned_unsorted() {
        let schema = Schema::builder().build().unwrap();
        let changes = TableMetadataBuilder::new(
            schema.clone(),
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        // Even an unpartitioned/unsorted table records a full set of creation
        // updates — just without any partition fields, sort fields or
        // properties. No `SetProperties` entry is emitted for an empty map.
        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema {
                schema: Schema::builder().build().unwrap(),
            },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                // Spec id is normalized to 0 for a new table.
                spec: PartitionSpec::builder(schema)
                    .with_spec_id(0)
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: SortOrder::unsorted_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
        ]);
    }
1764
    #[test]
    fn test_add_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_fields(vec![
                UnboundPartitionField {
                    // The previous field - has field_id set
                    name: "y".to_string(),
                    transform: Transform::Identity,
                    source_id: 2,
                    field_id: Some(1000),
                },
                UnboundPartitionField {
                    // A new field without field id - should still be without field id in changes
                    name: "z".to_string(),
                    transform: Transform::Identity,
                    source_id: 3,
                    field_id: None,
                },
            ])
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .build()
            .unwrap();

        // Spec id should be re-assigned
        let expected_change = added_spec.with_spec_id(1);
        // The bound spec fills in the missing field id: the next free id after
        // the existing 1000 is 1001.
        let expected_spec = PartitionSpec::builder(schema())
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y".to_string(),
                transform: Transform::Identity,
                source_id: 2,
                field_id: Some(1000),
            })
            .unwrap()
            .add_unbound_field(UnboundPartitionField {
                name: "z".to_string(),
                transform: Transform::Identity,
                source_id: 3,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(
            build_result.metadata.partition_spec_by_id(1),
            Some(&Arc::new(expected_spec))
        );
        // Adding a spec does not change the default spec.
        assert_eq!(build_result.metadata.default_spec.spec_id(), 0);
        // Highest partition field id ever assigned across all specs.
        assert_eq!(build_result.metadata.last_partition_id, 1001);
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: expected_change
        });

        // Remove the spec
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .remove_partition_specs(&[1])
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.partition_specs.len(), 1);
        assert!(build_result.metadata.partition_spec_by_id(1).is_none());
    }
1843
    #[test]
    fn test_set_default_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);
        let schema = builder.get_current_schema().unwrap().clone();
        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2))
            .unwrap()
            .build();

        // `-1` selects the spec added within this same change set.
        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        // Spec id is re-assigned to 1; the field id continues after the
        // existing spec's last id (1000), hence 1001.
        let expected_spec = PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y_bucket[2]".to_string(),
                transform: Transform::Bucket(2),
                source_id: 1,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec));
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSpec {
                // Should contain the actual ID that was used
                spec: added_spec.with_spec_id(1)
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 }
        ]);
    }
1884
    #[test]
    fn test_set_existing_default_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);
        // Add and set an unbound spec as current
        let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build();
        let build_result = builder
            .add_partition_spec(unbound_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: unbound_spec.clone()
        });
        assert_eq!(build_result.changes[1], TableUpdate::SetDefaultSpec {
            spec_id: -1
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                unbound_spec
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );

        // Set old spec again
        // Selecting a spec that already exists (id 0) records the concrete id
        // in the change, not the -1 sentinel.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .set_default_partition_spec(0)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.changes[0], TableUpdate::SetDefaultSpec {
            spec_id: 0
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                partition_spec()
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );
    }
1938
1939    #[test]
1940    fn test_add_sort_order() {
1941        let builder = builder_without_changes(FormatVersion::V2);
1942
1943        let added_sort_order = SortOrder::builder()
1944            .with_order_id(10)
1945            .with_fields(vec![SortField {
1946                source_id: 1,
1947                transform: Transform::Identity,
1948                direction: SortDirection::Ascending,
1949                null_order: NullOrder::First,
1950            }])
1951            .build(&schema())
1952            .unwrap();
1953
1954        let build_result = builder
1955            .add_sort_order(added_sort_order.clone())
1956            .unwrap()
1957            .build()
1958            .unwrap();
1959
1960        let expected_sort_order = added_sort_order.with_order_id(2);
1961
1962        assert_eq!(build_result.changes.len(), 1);
1963        assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2));
1964        pretty_assertions::assert_eq!(
1965            build_result.metadata.sort_order_by_id(2),
1966            Some(&Arc::new(expected_sort_order.clone()))
1967        );
1968        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder {
1969            sort_order: expected_sort_order
1970        });
1971    }
1972
1973    #[test]
1974    fn test_add_compatible_schema() {
1975        let builder = builder_without_changes(FormatVersion::V2);
1976
1977        let added_schema = Schema::builder()
1978            .with_schema_id(1)
1979            .with_fields(vec![
1980                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1981                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1982                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1983                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
1984            ])
1985            .build()
1986            .unwrap();
1987
1988        let build_result = builder
1989            .add_current_schema(added_schema.clone())
1990            .unwrap()
1991            .build()
1992            .unwrap();
1993
1994        assert_eq!(build_result.changes.len(), 2);
1995        assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1));
1996        pretty_assertions::assert_eq!(
1997            build_result.metadata.schema_by_id(1),
1998            Some(&Arc::new(added_schema.clone()))
1999        );
2000        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema {
2001            schema: added_schema
2002        });
2003        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
2004            schema_id: -1
2005        });
2006    }
2007
2008    #[test]
2009    fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
2010        let builder = builder_without_changes(FormatVersion::V2);
2011
2012        let added_schema = Schema::builder()
2013            .with_schema_id(1)
2014            .with_fields(vec![
2015                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
2016                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
2017                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
2018                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
2019            ])
2020            .build()
2021            .unwrap();
2022
2023        let build_result = builder
2024            .add_schema(added_schema.clone())
2025            .unwrap()
2026            .set_current_schema(1)
2027            .unwrap()
2028            .build()
2029            .unwrap();
2030
2031        assert_eq!(build_result.changes.len(), 2);
2032        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
2033            schema_id: -1
2034        });
2035    }
2036
2037    #[test]
2038    fn test_no_metadata_log_for_create_table() {
2039        let build_result = TableMetadataBuilder::new(
2040            schema(),
2041            partition_spec(),
2042            sort_order(),
2043            TEST_LOCATION.to_string(),
2044            FormatVersion::V2,
2045            HashMap::new(),
2046        )
2047        .unwrap()
2048        .build()
2049        .unwrap();
2050
2051        assert_eq!(build_result.metadata.metadata_log.len(), 0);
2052    }
2053
2054    #[test]
2055    fn test_no_metadata_log_entry_for_no_previous_location() {
2056        // Used for first commit after stage-creation of tables
2057        let metadata = builder_without_changes(FormatVersion::V2)
2058            .build()
2059            .unwrap()
2060            .metadata;
2061        assert_eq!(metadata.metadata_log.len(), 1);
2062
2063        let build_result = metadata
2064            .into_builder(None)
2065            .set_properties(HashMap::from_iter(vec![(
2066                "foo".to_string(),
2067                "bar".to_string(),
2068            )]))
2069            .unwrap()
2070            .build()
2071            .unwrap();
2072
2073        assert_eq!(build_result.metadata.metadata_log.len(), 1);
2074    }
2075
2076    #[test]
2077    fn test_from_metadata_generates_metadata_log() {
2078        let metadata_path = "s3://bucket/test/location/metadata/metadata1.json";
2079        let builder = TableMetadataBuilder::new(
2080            schema(),
2081            partition_spec(),
2082            sort_order(),
2083            TEST_LOCATION.to_string(),
2084            FormatVersion::V2,
2085            HashMap::new(),
2086        )
2087        .unwrap()
2088        .build()
2089        .unwrap()
2090        .metadata
2091        .into_builder(Some(metadata_path.to_string()));
2092
2093        let builder = builder
2094            .add_default_sort_order(SortOrder::unsorted_order())
2095            .unwrap();
2096
2097        let build_result = builder.build().unwrap();
2098
2099        assert_eq!(build_result.metadata.metadata_log.len(), 1);
2100        assert_eq!(
2101            build_result.metadata.metadata_log[0].metadata_file,
2102            metadata_path
2103        );
2104    }
2105
    #[test]
    fn test_set_ref() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let builder = builder.add_snapshot(snapshot.clone()).unwrap();

        // Pointing a ref at a snapshot id that was never added must fail.
        assert!(
            builder
                .clone()
                .set_ref(MAIN_BRANCH, SnapshotReference {
                    snapshot_id: 10,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: Some(10),
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                })
                .unwrap_err()
                .to_string()
                .contains("Cannot set 'main' to unknown snapshot: '10'")
        );

        // Pointing main at the snapshot added above succeeds and records the
        // snapshot in the snapshot log.
        let build_result = builder
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.snapshots.len(), 1);
        assert_eq!(
            build_result.metadata.snapshot_by_id(1),
            Some(&Arc::new(snapshot.clone()))
        );
        assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 1,
            timestamp_ms: snapshot.timestamp_ms()
        }])
    }
2170
    #[test]
    fn test_snapshot_log_skips_intermediates() {
        // When a single commit advances main through several snapshots, only
        // the final one should land in the snapshot log.
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot_1 = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let snapshot_2 = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        // Main is moved to snapshot 1 and then to snapshot 2 before building.
        let result = builder
            .add_snapshot(snapshot_1)
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .set_branch_snapshot(snapshot_2.clone(), MAIN_BRANCH)
            .unwrap()
            .build()
            .unwrap();

        // Snapshot 1 was an intermediate — only snapshot 2 is logged.
        assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 2,
            timestamp_ms: snapshot_2.timestamp_ms()
        }]);
        assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
    }
2238
    #[test]
    fn test_remove_main_ref_keeps_snapshot_log() {
        // Removing the main branch ref must clear current_snapshot_id but
        // preserve the historical snapshot log.
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let result = builder
            .add_snapshot(snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap();

        // Verify snapshot log was created
        assert_eq!(result.metadata.snapshot_log.len(), 1);
        assert_eq!(result.metadata.snapshot_log[0].snapshot_id, 1);
        assert_eq!(result.metadata.current_snapshot_id, Some(1));

        // Remove the main ref
        let result_after_remove = result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata2.json".to_string(),
            ))
            .remove_ref(MAIN_BRANCH)
            .build()
            .unwrap();

        // Verify snapshot log is kept even after removing main ref
        assert_eq!(result_after_remove.metadata.snapshot_log.len(), 1);
        assert_eq!(result_after_remove.metadata.snapshot_log[0].snapshot_id, 1);
        assert_eq!(result_after_remove.metadata.current_snapshot_id, None);
        assert_eq!(result_after_remove.changes.len(), 1);
        assert_eq!(
            result_after_remove.changes[0],
            TableUpdate::RemoveSnapshotRef {
                ref_name: MAIN_BRANCH.to_string()
            }
        );
    }
2305
    #[test]
    fn test_set_branch_snapshot_creates_branch_if_not_exists() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        // "new_branch" does not exist yet; set_branch_snapshot must create it.
        let build_result = builder
            .set_branch_snapshot(snapshot.clone(), "new_branch")
            .unwrap()
            .build()
            .unwrap();

        // The auto-created branch ref uses default (unset) retention settings.
        let reference = SnapshotReference {
            snapshot_id: 2,
            retention: SnapshotRetention::Branch {
                min_snapshots_to_keep: None,
                max_snapshot_age_ms: None,
                max_ref_age_ms: None,
            },
        };

        assert_eq!(build_result.metadata.refs.len(), 1);
        assert_eq!(
            build_result.metadata.refs.get("new_branch"),
            Some(&reference)
        );
        // Both the snapshot addition and the ref creation are recorded.
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSnapshot { snapshot },
            TableUpdate::SetSnapshotRef {
                ref_name: "new_branch".to_string(),
                reference
            }
        ]);
    }
2350
    #[test]
    fn test_cannot_add_duplicate_snapshot_id() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        // Adding the same snapshot id twice must be rejected.
        // NOTE(review): only the Err variant is asserted; the error kind and
        // message content are not checked here.
        let builder = builder.add_snapshot(snapshot.clone()).unwrap();
        builder.add_snapshot(snapshot).unwrap_err();
    }
2378
2379    #[test]
2380    fn test_add_incompatible_current_schema_fails() {
2381        let builder = builder_without_changes(FormatVersion::V2);
2382
2383        let added_schema = Schema::builder()
2384            .with_schema_id(1)
2385            .with_fields(vec![])
2386            .build()
2387            .unwrap();
2388
2389        let err = builder
2390            .add_current_schema(added_schema)
2391            .unwrap()
2392            .build()
2393            .unwrap_err();
2394
2395        assert!(
2396            err.to_string()
2397                .contains("Cannot find partition source field")
2398        );
2399    }
2400
2401    #[test]
2402    fn test_add_partition_spec_for_v1_requires_sequential_ids() {
2403        let builder = builder_without_changes(FormatVersion::V1);
2404
2405        let added_spec = UnboundPartitionSpec::builder()
2406            .with_spec_id(10)
2407            .add_partition_fields(vec![
2408                UnboundPartitionField {
2409                    name: "y".to_string(),
2410                    transform: Transform::Identity,
2411                    source_id: 2,
2412                    field_id: Some(1000),
2413                },
2414                UnboundPartitionField {
2415                    name: "z".to_string(),
2416                    transform: Transform::Identity,
2417                    source_id: 3,
2418                    field_id: Some(1002),
2419                },
2420            ])
2421            .unwrap()
2422            .build();
2423
2424        let err = builder.add_partition_spec(added_spec).unwrap_err();
2425        assert!(err.to_string().contains(
2426            "Cannot add partition spec with non-sequential field ids to format version 1 table"
2427        ));
2428    }
2429
    #[test]
    fn test_expire_metadata_log() {
        // With `metadata.previous-versions-max` set to 2, the metadata log is
        // capped at two entries; builds beyond that report the oldest entry in
        // `expired_metadata_logs`.
        let builder = builder_without_changes(FormatVersion::V2);
        let metadata = builder
            .set_properties(HashMap::from_iter(vec![(
                TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
                "2".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();
        // First build: one log entry, nothing expired yet.
        assert_eq!(metadata.metadata.metadata_log.len(), 1);
        assert_eq!(metadata.expired_metadata_logs.len(), 0);

        // Second build appends another log entry; still within the cap of 2.
        let metadata = metadata
            .metadata
            .into_builder(Some("path2".to_string()))
            .set_properties(HashMap::from_iter(vec![(
                "change_nr".to_string(),
                "1".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(metadata.metadata.metadata_log.len(), 2);
        assert_eq!(metadata.expired_metadata_logs.len(), 0);

        // Third build exceeds the cap: the log stays at 2 entries and the
        // oldest one is surfaced as expired.
        let metadata = metadata
            .metadata
            .into_builder(Some("path2".to_string()))
            .set_properties(HashMap::from_iter(vec![(
                "change_nr".to_string(),
                "2".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(metadata.metadata.metadata_log.len(), 2);
        assert_eq!(metadata.expired_metadata_logs.len(), 1);
    }
2471
    #[test]
    fn test_v2_sequence_number_cannot_decrease() {
        // V2 enforces monotonically increasing sequence numbers: a snapshot
        // appended to a branch may not carry a lower sequence number than the
        // branch's current head.
        let builder = builder_without_changes(FormatVersion::V2);

        // Head snapshot with sequence number 1, installed as the main branch ref.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("/snap-1")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let builder = builder
            .add_snapshot(snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap();

        // Child snapshot whose sequence number (0) is lower than its parent's (1).
        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-0")
            .with_parent_snapshot_id(Some(1))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let err = builder
            .set_branch_snapshot(snapshot, MAIN_BRANCH)
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("Cannot add snapshot with sequence number")
        );
    }
2522
2523    #[test]
2524    fn test_default_spec_cannot_be_removed() {
2525        let builder = builder_without_changes(FormatVersion::V2);
2526
2527        builder.remove_partition_specs(&[0]).unwrap_err();
2528    }
2529
    #[test]
    fn test_statistics() {
        // Setting a statistics file stores it keyed by snapshot id and records
        // a SetStatistics change; removal records RemoveStatistics, and a
        // second removal of the same id is a silent no-op (no change emitted).
        let builder = builder_without_changes(FormatVersion::V2);

        let statistics = StatisticsFile {
            snapshot_id: 3055729675574597004,
            statistics_path: "s3://a/b/stats.puffin".to_string(),
            file_size_in_bytes: 413,
            file_footer_size_in_bytes: 42,
            key_metadata: None,
            blob_metadata: vec![BlobMetadata {
                snapshot_id: 3055729675574597004,
                sequence_number: 1,
                fields: vec![1],
                r#type: "ndv".to_string(),
                properties: HashMap::new(),
            }],
        };
        let build_result = builder.set_statistics(statistics.clone()).build().unwrap();

        assert_eq!(
            build_result.metadata.statistics,
            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
        );
        assert_eq!(build_result.changes, vec![TableUpdate::SetStatistics {
            statistics: statistics.clone()
        }]);

        // Remove
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_statistics(statistics.snapshot_id)
            .build()
            .unwrap();

        assert_eq!(build_result.metadata.statistics.len(), 0);
        assert_eq!(build_result.changes, vec![TableUpdate::RemoveStatistics {
            snapshot_id: statistics.snapshot_id
        }]);

        // Remove again yields no changes
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.statistics.len(), 0);
        assert_eq!(build_result.changes.len(), 0);
    }
2579
    #[test]
    fn test_add_partition_statistics() {
        // Mirrors `test_statistics` for partition statistics: set stores the
        // file keyed by snapshot id, remove emits a change, and a repeated
        // removal emits nothing.
        let builder = builder_without_changes(FormatVersion::V2);

        let statistics = PartitionStatisticsFile {
            snapshot_id: 3055729675574597004,
            statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
            file_size_in_bytes: 43,
        };

        let build_result = builder
            .set_partition_statistics(statistics.clone())
            .build()
            .unwrap();
        assert_eq!(
            build_result.metadata.partition_statistics,
            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
        );
        assert_eq!(build_result.changes, vec![
            TableUpdate::SetPartitionStatistics {
                partition_statistics: statistics.clone()
            }
        ]);

        // Remove
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_partition_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
        assert_eq!(build_result.changes, vec![
            TableUpdate::RemovePartitionStatistics {
                snapshot_id: statistics.snapshot_id
            }
        ]);

        // Remove again yields no changes
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_partition_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
        assert_eq!(build_result.changes.len(), 0);
    }
2626
2627    #[test]
2628    fn last_update_increased_for_property_only_update() {
2629        let builder = builder_without_changes(FormatVersion::V2);
2630
2631        let metadata = builder.build().unwrap().metadata;
2632        let last_updated_ms = metadata.last_updated_ms;
2633        sleep(std::time::Duration::from_millis(2));
2634
2635        let build_result = metadata
2636            .into_builder(Some(
2637                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2638            ))
2639            .set_properties(HashMap::from_iter(vec![(
2640                "foo".to_string(),
2641                "bar".to_string(),
2642            )]))
2643            .unwrap()
2644            .build()
2645            .unwrap();
2646
2647        assert!(
2648            build_result.metadata.last_updated_ms > last_updated_ms,
2649            "{} > {}",
2650            build_result.metadata.last_updated_ms,
2651            last_updated_ms
2652        );
2653    }
2654
2655    #[test]
2656    fn test_construct_default_main_branch() {
2657        // Load the table without ref
2658        let file = File::open(format!(
2659            "{}/testdata/table_metadata/{}",
2660            env!("CARGO_MANIFEST_DIR"),
2661            "TableMetadataV2Valid.json"
2662        ))
2663        .unwrap();
2664        let reader = BufReader::new(file);
2665        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2666
2667        let table = Table::builder()
2668            .metadata(resp)
2669            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2670            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2671            .file_io(FileIOBuilder::new("memory").build().unwrap())
2672            .build()
2673            .unwrap();
2674
2675        assert_eq!(
2676            table.metadata().refs.get(MAIN_BRANCH).unwrap().snapshot_id,
2677            table.metadata().current_snapshot_id().unwrap()
2678        );
2679    }
2680
2681    #[test]
2682    fn test_active_schema_cannot_be_removed() {
2683        let builder = builder_without_changes(FormatVersion::V2);
2684        builder.remove_schemas(&[0]).unwrap_err();
2685    }
2686
2687    #[test]
2688    fn test_remove_schemas() {
2689        let file = File::open(format!(
2690            "{}/testdata/table_metadata/{}",
2691            env!("CARGO_MANIFEST_DIR"),
2692            "TableMetadataV2Valid.json"
2693        ))
2694        .unwrap();
2695        let reader = BufReader::new(file);
2696        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2697
2698        let table = Table::builder()
2699            .metadata(resp)
2700            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2701            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2702            .file_io(FileIOBuilder::new("memory").build().unwrap())
2703            .build()
2704            .unwrap();
2705
2706        assert_eq!(2, table.metadata().schemas.len());
2707
2708        {
2709            // can not remove active schema
2710            let meta_data_builder = table.metadata().clone().into_builder(None);
2711            meta_data_builder.remove_schemas(&[1]).unwrap_err();
2712        }
2713
2714        let mut meta_data_builder = table.metadata().clone().into_builder(None);
2715        meta_data_builder = meta_data_builder.remove_schemas(&[0]).unwrap();
2716        let build_result = meta_data_builder.build().unwrap();
2717        assert_eq!(1, build_result.metadata.schemas.len());
2718        assert_eq!(1, build_result.metadata.current_schema_id);
2719        assert_eq!(1, build_result.metadata.current_schema().schema_id());
2720        assert_eq!(1, build_result.changes.len());
2721
2722        let remove_schema_ids =
2723            if let TableUpdate::RemoveSchemas { schema_ids } = &build_result.changes[0] {
2724                schema_ids
2725            } else {
2726                unreachable!("Expected RemoveSchema change")
2727            };
2728        assert_eq!(remove_schema_ids, &[0]);
2729    }
2730
    #[test]
    fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts() {
        // A new schema field may not reuse the name of an existing partition
        // field ("bucket_data", a bucket transform over field 1); the conflict
        // must be reported immediately by `add_current_schema`.
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let partition_spec_with_bucket = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec_with_bucket,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Sanity check: the bound default spec retained the partition field name.
        let partition_field_names: Vec<String> = metadata
            .default_partition_spec()
            .fields()
            .iter()
            .map(|f| f.name.clone())
            .collect();
        assert!(partition_field_names.contains(&"bucket_data".to_string()));

        let evolved_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                // Adding a schema field with the same name as an existing partition field
                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()
            .unwrap();

        let builder = metadata.into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ));

        // Try to add the evolved schema - this should now fail immediately with a clear error
        let result = builder.add_current_schema(evolved_schema);

        assert!(result.is_err());
        let error = result.unwrap_err();
        let error_message = error.message();
        assert!(error_message.contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
        assert!(error_message.contains("Schema evolution cannot introduce field names that match existing partition field names"));
    }
2789
2790    #[test]
2791    fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build() {
2792        let initial_schema = Schema::builder()
2793            .with_fields(vec![
2794                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2795            ])
2796            .build()
2797            .unwrap();
2798
2799        let partition_spec = UnboundPartitionSpec::builder()
2800            .with_spec_id(0)
2801            .add_partition_field(1, "partition_col", Transform::Bucket(16))
2802            .unwrap()
2803            .build();
2804
2805        let metadata = TableMetadataBuilder::new(
2806            initial_schema,
2807            partition_spec,
2808            SortOrder::unsorted_order(),
2809            TEST_LOCATION.to_string(),
2810            FormatVersion::V2,
2811            HashMap::new(),
2812        )
2813        .unwrap()
2814        .build()
2815        .unwrap()
2816        .metadata;
2817
2818        let non_conflicting_schema = Schema::builder()
2819            .with_fields(vec![
2820                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2821                NestedField::required(2, "new_field", Type::Primitive(PrimitiveType::Int)).into(),
2822            ])
2823            .build()
2824            .unwrap();
2825
2826        // This should succeed since there's no name conflict
2827        let result = metadata
2828            .clone()
2829            .into_builder(Some("test_location".to_string()))
2830            .add_current_schema(non_conflicting_schema)
2831            .unwrap()
2832            .build();
2833
2834        assert!(result.is_ok());
2835    }
2836
    #[test]
    fn test_partition_spec_evolution_validates_schema_field_name_conflicts() {
        // The reverse direction of schema/partition name validation: a new
        // partition field may not take the name of an existing schema field
        // unless it is an identity transform over that field.
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "data_bucket", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        let builder = metadata.into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ));

        // "existing_field" is a schema field, and Bucket(8) is not identity,
        // so adding this spec must be rejected.
        let conflicting_partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(1)
            .add_partition_field(1, "existing_field", Transform::Bucket(8))
            .unwrap()
            .build();

        let result = builder.add_partition_spec(conflicting_partition_spec);

        assert!(result.is_err());
        let error = result.unwrap_err();
        let error_message = error.message();
        // The error comes from our multi-version validation
        assert!(error_message.contains(
            "Cannot create partition with name 'existing_field' that conflicts with schema field"
        ));
        assert!(error_message.contains("and is not an identity transform"));
    }
2888
    #[test]
    fn test_schema_evolution_validates_against_all_historical_schemas() {
        // Partition-name conflict validation must consider ALL historical
        // schemas: reintroducing a field that once existed is fine, while an
        // entirely new field that shadows a partition field name is not.
        // Create a table with an initial schema that has a field "existing_field"
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Add a second schema that removes the existing_field but keeps the data field
        let second_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
            ])
            .build()
            .unwrap();

        let metadata = metadata
            .into_builder(Some("test_location".to_string()))
            .add_current_schema(second_schema)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Now try to add a third schema that reintroduces "existing_field"
        // This should succeed because "existing_field" exists in a historical schema,
        // even though there's a partition field named "bucket_data"
        let third_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        // Clone: `metadata` is needed again below for the conflicting case.
        let builder = metadata
            .clone()
            .into_builder(Some("test_location".to_string()));

        // This should succeed because "existing_field" exists in a historical schema
        let result = builder.add_current_schema(third_schema);
        assert!(result.is_ok());

        // However, trying to add a schema field that conflicts with the partition field
        // and doesn't exist in any historical schema should fail
        let conflicting_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
                NestedField::required(5, "bucket_data", Type::Primitive(PrimitiveType::String))
                    .into(), // conflicts with partition field
            ])
            .build()
            .unwrap();

        let builder2 = metadata.into_builder(Some("test_location".to_string()));
        let result2 = builder2.add_current_schema(conflicting_schema);

        // This should fail because "bucket_data" conflicts with partition field name
        // and doesn't exist in any historical schema
        assert!(result2.is_err());
        let error = result2.unwrap_err();
        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
    }
2984
    #[test]
    fn test_schema_evolution_allows_existing_partition_field_if_exists_in_historical_schema() {
        // A schema field sharing a name with an identity-transform partition
        // field is legitimate when that field already exists in the table's
        // schema history.
        // Create initial schema with a field
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        // Identity partition over field 2 — same name as the schema field.
        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(2, "partition_data", Transform::Identity)
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Add a new schema that still contains the partition_data field
        let evolved_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
                    .into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
            ])
            .build()
            .unwrap();

        // This should succeed because partition_data exists in historical schemas
        let result = metadata
            .into_builder(Some("test_location".to_string()))
            .add_current_schema(evolved_schema);

        assert!(result.is_ok());
    }
3035
    #[test]
    fn test_schema_evolution_prevents_new_field_conflicting_with_partition_field() {
        // A brand-new schema field (never present in any historical schema)
        // must not shadow an existing partition field name.
        // Create initial schema WITHOUT the conflicting field
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Try to add a schema with a field that conflicts with partition field name
        let conflicting_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                // This field name conflicts with the partition field "bucket_data"
                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()
            .unwrap();

        let builder = metadata.into_builder(Some("test_location".to_string()));
        let result = builder.add_current_schema(conflicting_schema);

        // This should fail because "bucket_data" conflicts with partition field name
        // and doesn't exist in any historical schema
        assert!(result.is_err());
        let error = result.unwrap_err();
        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
    }
3084
    #[test]
    fn test_partition_spec_evolution_allows_non_conflicting_names() {
        // Positive counterpart to the spec-evolution conflict test: a new
        // partition field whose name matches no schema field is accepted.
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "data_bucket", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        let builder = metadata.into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ));

        // Try to add a partition spec with a field name that does NOT conflict with existing schema fields
        let non_conflicting_partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(1)
            .add_partition_field(2, "new_partition_field", Transform::Bucket(8))
            .unwrap()
            .build();

        let result = builder.add_partition_spec(non_conflicting_partition_spec);

        assert!(result.is_ok());
    }
3130
    #[test]
    fn test_row_lineage_addition() {
        // Row lineage (v3): each appended snapshot claims a row-id range
        // starting at the table's `next_row_id`, which must then advance by
        // the number of rows added.
        let new_rows = 30;
        let base = builder_without_changes(FormatVersion::V3)
            .build()
            .unwrap()
            .metadata;
        // First append claims rows [next_row_id, next_row_id + new_rows).
        let add_rows = Snapshot::builder()
            .with_snapshot_id(0)
            .with_timestamp_ms(base.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(base.next_row_id(), new_rows)
            .build();

        let first_addition = base
            .into_builder(None)
            .add_snapshot(add_rows.clone())
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        assert_eq!(first_addition.next_row_id(), new_rows);

        // Second append starts where the first left off.
        let add_more_rows = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(first_addition.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(Some(0))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(first_addition.next_row_id(), new_rows)
            .build();

        let second_addition = first_addition
            .into_builder(None)
            .add_snapshot(add_more_rows)
            .unwrap()
            .build()
            .unwrap()
            .metadata;
        // Two appends of `new_rows` each => next_row_id doubled.
        assert_eq!(second_addition.next_row_id(), new_rows * 2);
    }
3185
3186    #[test]
3187    fn test_row_lineage_invalid_snapshot() {
3188        let new_rows = 30;
3189        let base = builder_without_changes(FormatVersion::V3)
3190            .build()
3191            .unwrap()
3192            .metadata;
3193
3194        // add rows to check TableMetadata validation; Snapshot rejects negative next-row-id
3195        let add_rows = Snapshot::builder()
3196            .with_snapshot_id(0)
3197            .with_timestamp_ms(base.last_updated_ms + 1)
3198            .with_sequence_number(0)
3199            .with_schema_id(0)
3200            .with_manifest_list("foo")
3201            .with_parent_snapshot_id(None)
3202            .with_summary(Summary {
3203                operation: Operation::Append,
3204                additional_properties: HashMap::new(),
3205            })
3206            .with_row_range(base.next_row_id(), new_rows)
3207            .build();
3208
3209        let added = base
3210            .into_builder(None)
3211            .add_snapshot(add_rows)
3212            .unwrap()
3213            .build()
3214            .unwrap()
3215            .metadata;
3216
3217        let invalid_new_rows = Snapshot::builder()
3218            .with_snapshot_id(1)
3219            .with_timestamp_ms(added.last_updated_ms + 1)
3220            .with_sequence_number(1)
3221            .with_schema_id(0)
3222            .with_manifest_list("foo")
3223            .with_parent_snapshot_id(Some(0))
3224            .with_summary(Summary {
3225                operation: Operation::Append,
3226                additional_properties: HashMap::new(),
3227            })
3228            // first_row_id is behind table next_row_id
3229            .with_row_range(added.next_row_id() - 1, 10)
3230            .build();
3231
3232        let err = added
3233            .into_builder(None)
3234            .add_snapshot(invalid_new_rows)
3235            .unwrap_err();
3236        assert!(
3237            err.to_string().contains(
3238                "Cannot add a snapshot, first-row-id is behind table next-row-id: 29 < 30"
3239            )
3240        );
3241    }
3242
3243    #[test]
3244    fn test_row_lineage_append_branch() {
3245        // Appends to a branch should still change last-row-id even if not on main, these changes
3246        // should also affect commits to main
3247
3248        let branch = "some_branch";
3249
3250        // Start with V3 metadata to support row lineage
3251        let base = builder_without_changes(FormatVersion::V3)
3252            .build()
3253            .unwrap()
3254            .metadata;
3255
3256        // Initial next_row_id should be 0
3257        assert_eq!(base.next_row_id(), 0);
3258
3259        // Write to Branch - append 30 rows
3260        let branch_snapshot_1 = Snapshot::builder()
3261            .with_snapshot_id(1)
3262            .with_timestamp_ms(base.last_updated_ms + 1)
3263            .with_sequence_number(0)
3264            .with_schema_id(0)
3265            .with_manifest_list("foo")
3266            .with_parent_snapshot_id(None)
3267            .with_summary(Summary {
3268                operation: Operation::Append,
3269                additional_properties: HashMap::new(),
3270            })
3271            .with_row_range(base.next_row_id(), 30)
3272            .build();
3273
3274        let table_after_branch_1 = base
3275            .into_builder(None)
3276            .set_branch_snapshot(branch_snapshot_1.clone(), branch)
3277            .unwrap()
3278            .build()
3279            .unwrap()
3280            .metadata;
3281
3282        // Current snapshot should be null (no main branch snapshot yet)
3283        assert!(table_after_branch_1.current_snapshot().is_none());
3284
3285        // Branch snapshot should have first_row_id = 0
3286        let branch_ref = table_after_branch_1.refs.get(branch).unwrap();
3287        let branch_snap_1 = table_after_branch_1
3288            .snapshots
3289            .get(&branch_ref.snapshot_id)
3290            .unwrap();
3291        assert_eq!(branch_snap_1.first_row_id(), Some(0));
3292
3293        // Next row id should be 30
3294        assert_eq!(table_after_branch_1.next_row_id(), 30);
3295
3296        // Write to Main - append 28 rows
3297        let main_snapshot = Snapshot::builder()
3298            .with_snapshot_id(2)
3299            .with_timestamp_ms(table_after_branch_1.last_updated_ms + 1)
3300            .with_sequence_number(1)
3301            .with_schema_id(0)
3302            .with_manifest_list("bar")
3303            .with_parent_snapshot_id(None)
3304            .with_summary(Summary {
3305                operation: Operation::Append,
3306                additional_properties: HashMap::new(),
3307            })
3308            .with_row_range(table_after_branch_1.next_row_id(), 28)
3309            .build();
3310
3311        let table_after_main = table_after_branch_1
3312            .into_builder(None)
3313            .add_snapshot(main_snapshot.clone())
3314            .unwrap()
3315            .set_ref(MAIN_BRANCH, SnapshotReference {
3316                snapshot_id: main_snapshot.snapshot_id(),
3317                retention: SnapshotRetention::Branch {
3318                    min_snapshots_to_keep: None,
3319                    max_snapshot_age_ms: None,
3320                    max_ref_age_ms: None,
3321                },
3322            })
3323            .unwrap()
3324            .build()
3325            .unwrap()
3326            .metadata;
3327
3328        // Main snapshot should have first_row_id = 30
3329        let current_snapshot = table_after_main.current_snapshot().unwrap();
3330        assert_eq!(current_snapshot.first_row_id(), Some(30));
3331
3332        // Next row id should be 58 (30 + 28)
3333        assert_eq!(table_after_main.next_row_id(), 58);
3334
3335        // Write again to branch - append 21 rows
3336        let branch_snapshot_2 = Snapshot::builder()
3337            .with_snapshot_id(3)
3338            .with_timestamp_ms(table_after_main.last_updated_ms + 1)
3339            .with_sequence_number(2)
3340            .with_schema_id(0)
3341            .with_manifest_list("baz")
3342            .with_parent_snapshot_id(Some(branch_snapshot_1.snapshot_id()))
3343            .with_summary(Summary {
3344                operation: Operation::Append,
3345                additional_properties: HashMap::new(),
3346            })
3347            .with_row_range(table_after_main.next_row_id(), 21)
3348            .build();
3349
3350        let table_after_branch_2 = table_after_main
3351            .into_builder(None)
3352            .set_branch_snapshot(branch_snapshot_2.clone(), branch)
3353            .unwrap()
3354            .build()
3355            .unwrap()
3356            .metadata;
3357
3358        // Branch snapshot should have first_row_id = 58 (30 + 28)
3359        let branch_ref_2 = table_after_branch_2.refs.get(branch).unwrap();
3360        let branch_snap_2 = table_after_branch_2
3361            .snapshots
3362            .get(&branch_ref_2.snapshot_id)
3363            .unwrap();
3364        assert_eq!(branch_snap_2.first_row_id(), Some(58));
3365
3366        // Next row id should be 79 (30 + 28 + 21)
3367        assert_eq!(table_after_branch_2.next_row_id(), 79);
3368    }
3369
3370    #[test]
3371    fn test_encryption_keys() {
3372        let builder = builder_without_changes(FormatVersion::V2);
3373
3374        // Create test encryption keys
3375        let encryption_key_1 = EncryptedKey::builder()
3376            .key_id("key-1")
3377            .encrypted_key_metadata(vec![1, 2, 3, 4])
3378            .encrypted_by_id("encryption-service-1")
3379            .properties(HashMap::from_iter(vec![(
3380                "algorithm".to_string(),
3381                "AES-256".to_string(),
3382            )]))
3383            .build();
3384
3385        let encryption_key_2 = EncryptedKey::builder()
3386            .key_id("key-2")
3387            .encrypted_key_metadata(vec![5, 6, 7, 8])
3388            .encrypted_by_id("encryption-service-2")
3389            .properties(HashMap::new())
3390            .build();
3391
3392        // Add first encryption key
3393        let build_result = builder
3394            .add_encryption_key(encryption_key_1.clone())
3395            .build()
3396            .unwrap();
3397
3398        assert_eq!(build_result.changes.len(), 1);
3399        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3400        assert_eq!(
3401            build_result.metadata.encryption_key("key-1"),
3402            Some(&encryption_key_1)
3403        );
3404        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
3405            encryption_key: encryption_key_1.clone()
3406        });
3407
3408        // Add second encryption key
3409        let build_result = build_result
3410            .metadata
3411            .into_builder(Some(
3412                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3413            ))
3414            .add_encryption_key(encryption_key_2.clone())
3415            .build()
3416            .unwrap();
3417
3418        assert_eq!(build_result.changes.len(), 1);
3419        assert_eq!(build_result.metadata.encryption_keys.len(), 2);
3420        assert_eq!(
3421            build_result.metadata.encryption_key("key-1"),
3422            Some(&encryption_key_1)
3423        );
3424        assert_eq!(
3425            build_result.metadata.encryption_key("key-2"),
3426            Some(&encryption_key_2)
3427        );
3428        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
3429            encryption_key: encryption_key_2.clone()
3430        });
3431
3432        // Try to add duplicate key - should not create a change
3433        let build_result = build_result
3434            .metadata
3435            .into_builder(Some(
3436                "s3://bucket/test/location/metadata/metadata2.json".to_string(),
3437            ))
3438            .add_encryption_key(encryption_key_1.clone())
3439            .build()
3440            .unwrap();
3441
3442        assert_eq!(build_result.changes.len(), 0);
3443        assert_eq!(build_result.metadata.encryption_keys.len(), 2);
3444
3445        // Remove first encryption key
3446        let build_result = build_result
3447            .metadata
3448            .into_builder(Some(
3449                "s3://bucket/test/location/metadata/metadata3.json".to_string(),
3450            ))
3451            .remove_encryption_key("key-1")
3452            .build()
3453            .unwrap();
3454
3455        assert_eq!(build_result.changes.len(), 1);
3456        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3457        assert_eq!(build_result.metadata.encryption_key("key-1"), None);
3458        assert_eq!(
3459            build_result.metadata.encryption_key("key-2"),
3460            Some(&encryption_key_2)
3461        );
3462        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
3463            key_id: "key-1".to_string()
3464        });
3465
3466        // Try to remove non-existent key - should not create a change
3467        let build_result = build_result
3468            .metadata
3469            .into_builder(Some(
3470                "s3://bucket/test/location/metadata/metadata4.json".to_string(),
3471            ))
3472            .remove_encryption_key("non-existent-key")
3473            .build()
3474            .unwrap();
3475
3476        assert_eq!(build_result.changes.len(), 0);
3477        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3478
3479        // Test encryption_keys_iter()
3480        let keys = build_result
3481            .metadata
3482            .encryption_keys_iter()
3483            .collect::<Vec<_>>();
3484        assert_eq!(keys.len(), 1);
3485        assert_eq!(keys[0], &encryption_key_2);
3486
3487        // Remove last encryption key
3488        let build_result = build_result
3489            .metadata
3490            .into_builder(Some(
3491                "s3://bucket/test/location/metadata/metadata5.json".to_string(),
3492            ))
3493            .remove_encryption_key("key-2")
3494            .build()
3495            .unwrap();
3496
3497        assert_eq!(build_result.changes.len(), 1);
3498        assert_eq!(build_result.metadata.encryption_keys.len(), 0);
3499        assert_eq!(build_result.metadata.encryption_key("key-2"), None);
3500        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
3501            key_id: "key-2".to_string()
3502        });
3503
3504        // Verify empty encryption_keys_iter()
3505        let keys = build_result.metadata.encryption_keys_iter();
3506        assert_eq!(keys.len(), 0);
3507    }
3508}