Skip to main content

iceberg/catalog/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Catalog API for Apache Iceberg
19
20pub mod memory;
21mod metadata_location;
22
23use std::collections::HashMap;
24use std::fmt::{Debug, Display};
25use std::future::Future;
26use std::mem::take;
27use std::ops::Deref;
28use std::str::FromStr;
29use std::sync::Arc;
30
31use _serde::{deserialize_snapshot, serialize_snapshot};
32use async_trait::async_trait;
33pub use memory::MemoryCatalog;
34pub use metadata_location::*;
35#[cfg(test)]
36use mockall::automock;
37use serde_derive::{Deserialize, Serialize};
38use typed_builder::TypedBuilder;
39use uuid::Uuid;
40
41use crate::io::StorageFactory;
42use crate::spec::{
43    EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
44    SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
45    UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
46};
47use crate::table::Table;
48use crate::{Error, ErrorKind, Result};
49
/// The catalog API for Iceberg Rust.
///
/// A catalog exposes asynchronous operations on namespaces and on the tables
/// they contain. Implementations must be `Debug + Sync + Send`; in test builds
/// a mock implementation is generated through `automock`.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// List namespaces inside the catalog.
    ///
    /// `parent` is optional; how it scopes the listing is defined by the
    /// implementation.
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Create a new namespace inside the catalog with the given properties.
    ///
    /// Returns the created [`Namespace`] on success.
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Get the information of a namespace from the catalog.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Check whether a namespace exists in the catalog.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Update a namespace inside the catalog.
    ///
    /// # Behavior
    ///
    /// `properties` must be the full set of properties of the namespace.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drop a namespace from the catalog, or return an error if it doesn't exist.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// List the identifiers of the tables in a namespace.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Create a new table inside the namespace.
    ///
    /// The table is described by `creation`; the created [`Table`] is returned
    /// on success.
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Load a table from the catalog.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drop a table from the catalog, or return an error if it doesn't exist.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Check whether a table exists in the catalog.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Rename a table in the catalog from `src` to `dest`.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Register an existing table to the catalog under the identifier `table`,
    /// given its `metadata_location`.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Update a table in the catalog by submitting `commit`.
    ///
    /// Returns a [`Table`] on success.
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
113
/// Common interface for all catalog builders.
///
/// A builder is default-constructible, optionally configured with a
/// [`StorageFactory`], and finally consumed by [`CatalogBuilder::load`] to
/// produce the catalog.
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// The catalog type that this builder creates.
    type C: Catalog;

    /// Set a custom StorageFactory to use for storage operations.
    ///
    /// When a StorageFactory is provided, the catalog will use it to build FileIO
    /// instances for all storage operations instead of using the default factory.
    ///
    /// # Arguments
    ///
    /// * `storage_factory` - The StorageFactory to use for creating storage instances
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use iceberg::CatalogBuilder;
    /// use iceberg::io::StorageFactory;
    /// use iceberg_storage_opendal::OpenDalStorageFactory;
    /// use std::sync::Arc;
    ///
    /// let catalog = MyCatalogBuilder::default()
    ///     .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 {
    ///         configured_scheme: "s3a".to_string(),
    ///         customized_credential_load: None,
    ///     }))
    ///     .load("my_catalog", props)
    ///     .await?;
    /// ```
    fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> Self;

    /// Create a new catalog instance.
    ///
    /// Consumes the builder and returns a `Send` future that resolves to the
    /// built catalog (`Self::C`) or an error.
    ///
    /// # Arguments
    ///
    /// * `name` - Name for the catalog; anything convertible into a `String`
    /// * `props` - String key/value properties handed to the builder
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
153
154/// NamespaceIdent represents the identifier of a namespace in the catalog.
155///
156/// The namespace identifier is a list of strings, where each string is a
157/// component of the namespace. It's the catalog implementer's responsibility to
158/// handle the namespace identifier correctly.
159#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
160pub struct NamespaceIdent(Vec<String>);
161
162impl NamespaceIdent {
163    /// Create a new namespace identifier with only one level.
164    pub fn new(name: String) -> Self {
165        Self(vec![name])
166    }
167
168    /// Create a multi-level namespace identifier from vector.
169    pub fn from_vec(names: Vec<String>) -> Result<Self> {
170        if names.is_empty() {
171            return Err(Error::new(
172                ErrorKind::DataInvalid,
173                "Namespace identifier can't be empty!",
174            ));
175        }
176        Ok(Self(names))
177    }
178
179    /// Try to create namespace identifier from an iterator of string.
180    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
181        Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
182    }
183
184    /// Returns a string for used in url.
185    pub fn to_url_string(&self) -> String {
186        self.as_ref().join("\u{001f}")
187    }
188
189    /// Returns inner strings.
190    pub fn inner(self) -> Vec<String> {
191        self.0
192    }
193
194    /// Get the parent of this namespace.
195    /// Returns None if this namespace only has a single element and thus has no parent.
196    pub fn parent(&self) -> Option<Self> {
197        self.0.split_last().and_then(|(_, parent)| {
198            if parent.is_empty() {
199                None
200            } else {
201                Some(Self(parent.to_vec()))
202            }
203        })
204    }
205}
206
207impl AsRef<Vec<String>> for NamespaceIdent {
208    fn as_ref(&self) -> &Vec<String> {
209        &self.0
210    }
211}
212
213impl Deref for NamespaceIdent {
214    type Target = [String];
215
216    fn deref(&self) -> &Self::Target {
217        &self.0
218    }
219}
220
221/// Namespace represents a namespace in the catalog.
222#[derive(Debug, Clone, PartialEq, Eq)]
223pub struct Namespace {
224    name: NamespaceIdent,
225    properties: HashMap<String, String>,
226}
227
228impl Namespace {
229    /// Create a new namespace.
230    pub fn new(name: NamespaceIdent) -> Self {
231        Self::with_properties(name, HashMap::default())
232    }
233
234    /// Create a new namespace with properties.
235    pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
236        Self { name, properties }
237    }
238
239    /// Get the name of the namespace.
240    pub fn name(&self) -> &NamespaceIdent {
241        &self.name
242    }
243
244    /// Get the properties of the namespace.
245    pub fn properties(&self) -> &HashMap<String, String> {
246        &self.properties
247    }
248}
249
250impl Display for NamespaceIdent {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        write!(f, "{}", self.0.join("."))
253    }
254}
255
256/// TableIdent represents the identifier of a table in the catalog.
257#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
258pub struct TableIdent {
259    /// Namespace of the table.
260    pub namespace: NamespaceIdent,
261    /// Table name.
262    pub name: String,
263}
264
265impl TableIdent {
266    /// Create a new table identifier.
267    pub fn new(namespace: NamespaceIdent, name: String) -> Self {
268        Self { namespace, name }
269    }
270
271    /// Get the namespace of the table.
272    pub fn namespace(&self) -> &NamespaceIdent {
273        &self.namespace
274    }
275
276    /// Get the name of the table.
277    pub fn name(&self) -> &str {
278        &self.name
279    }
280
281    /// Try to create table identifier from an iterator of string.
282    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
283        let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
284        let table_name = vec.pop().ok_or_else(|| {
285            Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
286        })?;
287        let namespace_ident = NamespaceIdent::from_vec(vec)?;
288
289        Ok(Self {
290            namespace: namespace_ident,
291            name: table_name,
292        })
293    }
294}
295
296impl Display for TableIdent {
297    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298        write!(f, "{}.{}", self.namespace, self.name)
299    }
300}
301
/// TableCreation represents the creation of a table in the catalog.
///
/// Instances are assembled through the builder derived by `TypedBuilder`.
/// `name` and `schema` have no builder default; every other field does.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// The name of the table.
    pub name: String,
    /// The location of the table.
    ///
    /// Defaults to `None`. The setter takes the bare value; the
    /// `location_opt` fallback setter takes the `Option` itself.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// The schema of the table.
    pub schema: Schema,
    /// The partition spec of the table, could be None.
    ///
    /// Defaults to `None`. The setter accepts anything convertible into the
    /// spec; the `partition_spec_opt` fallback setter takes the `Option` itself.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// The sort order of the table.
    ///
    /// Defaults to `None`. The `sort_order_opt` fallback setter takes the
    /// `Option` itself.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// The properties of the table.
    ///
    /// Defaults to an empty map. The setter accepts any iterable of
    /// `(String, String)` pairs and collects it into the map.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
    /// Format version of the table. Defaults to V2.
    #[builder(default = FormatVersion::V2)]
    pub format_version: FormatVersion,
}
327
/// TableCommit represents the commit of a table in the catalog.
///
/// A commit bundles the identifier of the table, the requirements that must
/// hold for the commit to be accepted, and the updates to apply.
///
/// The builder's `build` method is restricted to `pub(crate)` since it's dangerous
/// and error-prone to construct [`TableCommit`] directly.
/// Users are supposed to use [`crate::transaction::Transaction`] to update table.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    /// The table ident.
    ident: TableIdent,
    /// The requirements of the table.
    ///
    /// Commit will fail if the requirements are not met.
    requirements: Vec<TableRequirement>,
    /// The updates of the table.
    updates: Vec<TableUpdate>,
}
345
346impl TableCommit {
347    /// Return the table identifier.
348    pub fn identifier(&self) -> &TableIdent {
349        &self.ident
350    }
351
352    /// Take all requirements.
353    pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
354        take(&mut self.requirements)
355    }
356
357    /// Take all updates.
358    pub fn take_updates(&mut self) -> Vec<TableUpdate> {
359        take(&mut self.updates)
360    }
361
362    /// Applies this [`TableCommit`] to the given [`Table`] as part of a catalog update.
363    /// Typically used by [`Catalog::update_table`] to validate requirements and apply metadata updates.
364    ///
365    /// Returns a new [`Table`] with updated metadata,
366    /// or an error if validation or application fails.
367    pub fn apply(self, table: Table) -> Result<Table> {
368        // check requirements
369        for requirement in self.requirements {
370            requirement.check(Some(table.metadata()))?;
371        }
372
373        // get current metadata location
374        let current_metadata_location = table.metadata_location_result()?;
375
376        // apply updates to metadata builder
377        let mut metadata_builder = table
378            .metadata()
379            .clone()
380            .into_builder(Some(current_metadata_location.to_string()));
381        for update in self.updates {
382            metadata_builder = update.apply(metadata_builder)?;
383        }
384
385        // Bump the version of metadata
386        let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
387            .with_next_version()
388            .to_string();
389
390        Ok(table
391            .with_metadata(Arc::new(metadata_builder.build()?.metadata))
392            .with_metadata_location(new_metadata_location))
393    }
394}
395
/// TableRequirement represents a requirement for a table in the catalog.
///
/// Each variant is an assertion about the table's state. In serialized form
/// the variant is selected by a `type` field whose value is the variant's
/// `assert-*` name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not already exist; used for create transactions.
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table UUID must match the requirement.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Uuid of original table.
        uuid: Uuid,
    },
    /// The table branch or tag identified by the requirement's `reference` must
    /// reference the requirement's `snapshot-id`.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// The reference of the table to assert.
        r#ref: String,
        /// The snapshot id of the table to assert.
        /// If the id is `None`, the ref must not already exist.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The table's last assigned column id must match the requirement.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// The last assigned field id of the table to assert.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must match the requirement.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Current schema id of the table to assert.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The table's last assigned partition id must match the
    /// requirement.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Last assigned partition id of the table to assert.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The table's default spec id must match the requirement.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Default spec id of the table to assert.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The table's default sort order id must match the requirement.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Default sort order id of the table to assert.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
457
/// TableUpdate represents an update to a table in the catalog.
///
/// Each variant is one change to the table metadata. In serialized form the
/// variant is selected by an `action` field holding the kebab-case variant
/// name; variants marked `rename_all = "kebab-case"` also serialize their
/// field names in kebab-case.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
// The size-difference lint is silenced; no variant payload is boxed.
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrade table's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: FormatVersion,
    },
    /// Assign a new UUID to the table
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Add a new schema to the table
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
    },
    /// Set table's current schema
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Schema ID to set as current, or -1 to set last added schema
        schema_id: i32,
    },
    /// Add a new partition spec to the table
    AddSpec {
        /// The partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Set table's default spec
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Partition spec ID to set as the default, or -1 to set last added spec
        spec_id: i32,
    },
    /// Add sort order to table.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Set table's default sort order
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Sort order ID to set as the default, or -1 to set last added sort order
        sort_order_id: i64,
    },
    /// Add snapshot to table.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add.
        ///
        /// (De)serialized through the dedicated snapshot helper functions
        /// rather than the derived `Snapshot` implementation.
        #[serde(
            deserialize_with = "deserialize_snapshot",
            serialize_with = "serialize_snapshot"
        )]
        snapshot: Snapshot,
    },
    /// Set table's snapshot ref.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of snapshot reference to set.
        ref_name: String,
        /// Snapshot reference to set.
        ///
        /// Flattened: its fields are serialized alongside `ref-name` instead
        /// of inside a nested object.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Remove table's snapshots
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Snapshot ids to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Remove snapshot reference
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of snapshot reference to remove.
        ref_name: String,
    },
    /// Update table's location
    SetLocation {
        /// New location for table.
        location: String,
    },
    /// Update table's properties
    SetProperties {
        /// Properties to update for table.
        updates: HashMap<String, String>,
    },
    /// Remove table's properties
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Remove partition specs
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Partition spec ids to remove.
        spec_ids: Vec<i32>,
    },
    /// Set statistics for a snapshot
    ///
    /// (De)serialized through the `_serde_set_statistics` module.
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// File containing the statistics
        statistics: StatisticsFile,
    },
    /// Remove statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Snapshot id to remove statistics for.
        snapshot_id: i64,
    },
    /// Set partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// File containing the partition statistics
        partition_statistics: PartitionStatisticsFile,
    },
    /// Remove partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Snapshot id to remove partition statistics for.
        snapshot_id: i64,
    },
    /// Remove schemas
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Schema IDs to remove.
        schema_ids: Vec<i32>,
    },
    /// Add an encryption key
    #[serde(rename_all = "kebab-case")]
    AddEncryptionKey {
        /// The encryption key to add.
        encryption_key: EncryptedKey,
    },
    /// Remove an encryption key
    #[serde(rename_all = "kebab-case")]
    RemoveEncryptionKey {
        /// The id of the encryption key to remove.
        key_id: String,
    },
}
605
606impl TableUpdate {
607    /// Applies the update to the table metadata builder.
608    pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
609        match self {
610            TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
611            TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
612            TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
613            TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
614            TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
615            TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
616            TableUpdate::SetDefaultSortOrder { sort_order_id } => {
617                builder.set_default_sort_order(sort_order_id)
618            }
619            TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
620            TableUpdate::SetSnapshotRef {
621                ref_name,
622                reference,
623            } => builder.set_ref(&ref_name, reference),
624            TableUpdate::RemoveSnapshots { snapshot_ids } => {
625                Ok(builder.remove_snapshots(&snapshot_ids))
626            }
627            TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
628            TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
629            TableUpdate::SetProperties { updates } => builder.set_properties(updates),
630            TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
631            TableUpdate::UpgradeFormatVersion { format_version } => {
632                builder.upgrade_format_version(format_version)
633            }
634            TableUpdate::RemovePartitionSpecs { spec_ids } => {
635                builder.remove_partition_specs(&spec_ids)
636            }
637            TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
638            TableUpdate::RemoveStatistics { snapshot_id } => {
639                Ok(builder.remove_statistics(snapshot_id))
640            }
641            TableUpdate::SetPartitionStatistics {
642                partition_statistics,
643            } => Ok(builder.set_partition_statistics(partition_statistics)),
644            TableUpdate::RemovePartitionStatistics { snapshot_id } => {
645                Ok(builder.remove_partition_statistics(snapshot_id))
646            }
647            TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
648            TableUpdate::AddEncryptionKey { encryption_key } => {
649                Ok(builder.add_encryption_key(encryption_key))
650            }
651            TableUpdate::RemoveEncryptionKey { key_id } => {
652                Ok(builder.remove_encryption_key(&key_id))
653            }
654        }
655    }
656}
657
658impl TableRequirement {
659    /// Check that the requirement is met by the table metadata.
660    /// If the requirement is not met, an appropriate error is returned.
661    ///
662    /// Provide metadata as `None` if the table does not exist.
663    pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
664        if let Some(metadata) = metadata {
665            match self {
666                TableRequirement::NotExist => {
667                    return Err(Error::new(
668                        ErrorKind::CatalogCommitConflicts,
669                        format!(
670                            "Requirement failed: Table with id {} already exists",
671                            metadata.uuid()
672                        ),
673                    )
674                    .with_retryable(true));
675                }
676                TableRequirement::UuidMatch { uuid } => {
677                    if &metadata.uuid() != uuid {
678                        return Err(Error::new(
679                            ErrorKind::CatalogCommitConflicts,
680                            "Requirement failed: Table UUID does not match",
681                        )
682                        .with_context("expected", *uuid)
683                        .with_context("found", metadata.uuid())
684                        .with_retryable(true));
685                    }
686                }
687                TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
688                    // ToDo: Harmonize the types of current_schema_id
689                    if metadata.current_schema_id != *current_schema_id {
690                        return Err(Error::new(
691                            ErrorKind::CatalogCommitConflicts,
692                            "Requirement failed: Current schema id does not match",
693                        )
694                        .with_context("expected", current_schema_id.to_string())
695                        .with_context("found", metadata.current_schema_id.to_string())
696                        .with_retryable(true));
697                    }
698                }
699                TableRequirement::DefaultSortOrderIdMatch {
700                    default_sort_order_id,
701                } => {
702                    if metadata.default_sort_order().order_id != *default_sort_order_id {
703                        return Err(Error::new(
704                            ErrorKind::CatalogCommitConflicts,
705                            "Requirement failed: Default sort order id does not match",
706                        )
707                        .with_context("expected", default_sort_order_id.to_string())
708                        .with_context("found", metadata.default_sort_order().order_id.to_string())
709                        .with_retryable(true));
710                    }
711                }
712                TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
713                    let snapshot_ref = metadata.snapshot_for_ref(r#ref);
714                    if let Some(snapshot_id) = snapshot_id {
715                        let snapshot_ref = snapshot_ref.ok_or(
716                            Error::new(
717                                ErrorKind::CatalogCommitConflicts,
718                                format!("Requirement failed: Branch or tag `{ref}` not found"),
719                            )
720                            .with_retryable(true),
721                        )?;
722                        if snapshot_ref.snapshot_id() != *snapshot_id {
723                            return Err(Error::new(
724                                ErrorKind::CatalogCommitConflicts,
725                                format!(
726                                    "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
727                                ),
728                            )
729                            .with_context("expected", snapshot_id.to_string())
730                            .with_context("found", snapshot_ref.snapshot_id().to_string())
731                            .with_retryable(true));
732                        }
733                    } else if snapshot_ref.is_some() {
734                        // a null snapshot ID means the ref should not exist already
735                        return Err(Error::new(
736                            ErrorKind::CatalogCommitConflicts,
737                            format!("Requirement failed: Branch or tag `{ref}` already exists"),
738                        )
739                        .with_retryable(true));
740                    }
741                }
742                TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
743                    // ToDo: Harmonize the types of default_spec_id
744                    if metadata.default_partition_spec_id() != *default_spec_id {
745                        return Err(Error::new(
746                            ErrorKind::CatalogCommitConflicts,
747                            "Requirement failed: Default partition spec id does not match",
748                        )
749                        .with_context("expected", default_spec_id.to_string())
750                        .with_context("found", metadata.default_partition_spec_id().to_string())
751                        .with_retryable(true));
752                    }
753                }
754                TableRequirement::LastAssignedPartitionIdMatch {
755                    last_assigned_partition_id,
756                } => {
757                    if metadata.last_partition_id != *last_assigned_partition_id {
758                        return Err(Error::new(
759                            ErrorKind::CatalogCommitConflicts,
760                            "Requirement failed: Last assigned partition id does not match",
761                        )
762                        .with_context("expected", last_assigned_partition_id.to_string())
763                        .with_context("found", metadata.last_partition_id.to_string())
764                        .with_retryable(true));
765                    }
766                }
767                TableRequirement::LastAssignedFieldIdMatch {
768                    last_assigned_field_id,
769                } => {
770                    if &metadata.last_column_id != last_assigned_field_id {
771                        return Err(Error::new(
772                            ErrorKind::CatalogCommitConflicts,
773                            "Requirement failed: Last assigned field id does not match",
774                        )
775                        .with_context("expected", last_assigned_field_id.to_string())
776                        .with_context("found", metadata.last_column_id.to_string())
777                        .with_retryable(true));
778                    }
779                }
780            };
781        } else {
782            match self {
783                TableRequirement::NotExist => {}
784                _ => {
785                    return Err(Error::new(
786                        ErrorKind::TableNotFound,
787                        "Requirement failed: Table does not exist",
788                    ));
789                }
790            }
791        }
792
793        Ok(())
794    }
795}
796
797pub(super) mod _serde {
798    use serde::{Deserialize as _, Deserializer, Serialize as _};
799
800    use super::*;
801    use crate::spec::{SchemaId, Summary};
802
803    pub(super) fn deserialize_snapshot<'de, D>(
804        deserializer: D,
805    ) -> std::result::Result<Snapshot, D::Error>
806    where D: Deserializer<'de> {
807        let buf = CatalogSnapshot::deserialize(deserializer)?;
808        Ok(buf.into())
809    }
810
811    pub(super) fn serialize_snapshot<S>(
812        snapshot: &Snapshot,
813        serializer: S,
814    ) -> std::result::Result<S::Ok, S::Error>
815    where
816        S: serde::Serializer,
817    {
818        let buf: CatalogSnapshot = snapshot.clone().into();
819        buf.serialize(serializer)
820    }
821
822    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
823    #[serde(rename_all = "kebab-case")]
824    /// Defines the structure of a v2 snapshot for the catalog.
825    /// Main difference to SnapshotV2 is that sequence-number is optional
826    /// in the rest catalog spec to allow for backwards compatibility with v1.
827    struct CatalogSnapshot {
828        snapshot_id: i64,
829        #[serde(skip_serializing_if = "Option::is_none")]
830        parent_snapshot_id: Option<i64>,
831        #[serde(default)]
832        sequence_number: i64,
833        timestamp_ms: i64,
834        manifest_list: String,
835        summary: Summary,
836        #[serde(skip_serializing_if = "Option::is_none")]
837        schema_id: Option<SchemaId>,
838        #[serde(skip_serializing_if = "Option::is_none")]
839        first_row_id: Option<u64>,
840        #[serde(skip_serializing_if = "Option::is_none")]
841        added_rows: Option<u64>,
842        #[serde(skip_serializing_if = "Option::is_none")]
843        key_id: Option<String>,
844    }
845
846    impl From<CatalogSnapshot> for Snapshot {
847        fn from(snapshot: CatalogSnapshot) -> Self {
848            let CatalogSnapshot {
849                snapshot_id,
850                parent_snapshot_id,
851                sequence_number,
852                timestamp_ms,
853                manifest_list,
854                schema_id,
855                summary,
856                first_row_id,
857                added_rows,
858                key_id,
859            } = snapshot;
860            let builder = Snapshot::builder()
861                .with_snapshot_id(snapshot_id)
862                .with_parent_snapshot_id(parent_snapshot_id)
863                .with_sequence_number(sequence_number)
864                .with_timestamp_ms(timestamp_ms)
865                .with_manifest_list(manifest_list)
866                .with_summary(summary)
867                .with_encryption_key_id(key_id);
868            let row_range = first_row_id.zip(added_rows);
869            match (schema_id, row_range) {
870                (None, None) => builder.build(),
871                (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
872                (None, Some((first_row_id, last_row_id))) => {
873                    builder.with_row_range(first_row_id, last_row_id).build()
874                }
875                (Some(schema_id), Some((first_row_id, last_row_id))) => builder
876                    .with_schema_id(schema_id)
877                    .with_row_range(first_row_id, last_row_id)
878                    .build(),
879            }
880        }
881    }
882
883    impl From<Snapshot> for CatalogSnapshot {
884        fn from(snapshot: Snapshot) -> Self {
885            let first_row_id = snapshot.first_row_id();
886            let added_rows = snapshot.added_rows_count();
887            let Snapshot {
888                snapshot_id,
889                parent_snapshot_id,
890                sequence_number,
891                timestamp_ms,
892                manifest_list,
893                summary,
894                schema_id,
895                row_range: _,
896                encryption_key_id: key_id,
897            } = snapshot;
898            CatalogSnapshot {
899                snapshot_id,
900                parent_snapshot_id,
901                sequence_number,
902                timestamp_ms,
903                manifest_list,
904                summary,
905                schema_id,
906                first_row_id,
907                added_rows,
908                key_id,
909            }
910        }
911    }
912}
913
/// ViewCreation represents the creation of a view in the catalog.
///
/// Instances are assembled through the builder derived via `TypedBuilder`;
/// fields marked `#[builder(default)]` may be left unset on that builder.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// The name of the view.
    pub name: String,
    /// The view's base location; used to create metadata file locations
    pub location: String,
    /// Representations for the view.
    pub representations: ViewRepresentations,
    /// The schema of the view.
    pub schema: Schema,
    /// The properties of the view. Optional on the builder.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// The default namespace to use when a reference in the SELECT is a single identifier
    pub default_namespace: NamespaceIdent,
    /// Default catalog to use when a reference in the SELECT does not contain a catalog.
    /// Optional on the builder.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// A string to string map of summary metadata about the version
    /// Typical keys are "engine-name" and "engine-version".
    /// Optional on the builder.
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
938
/// ViewUpdate represents an update to a view in the catalog.
///
/// Serialized as an internally tagged enum: the `action` key carries the
/// kebab-case variant name and the variant's fields are kebab-case keys
/// next to it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Assign a new UUID to the view
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Upgrade view's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: ViewFormatVersion,
    },
    /// Add a new schema to the view
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
        /// The last column id of the view.
        last_column_id: Option<i32>,
    },
    /// Set view's location
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// New location for view.
        location: String,
    },
    /// Set view's properties
    ///
    /// Matching keys are updated, and non-matching keys are left unchanged.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to update for view.
        updates: HashMap<String, String>,
    },
    /// Remove view's properties
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Add a new version to the view
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// The view version to add.
        view_version: ViewVersion,
    },
    /// Set view's current version
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// View version id to set as current, or -1 to set last added version
        view_version_id: i32,
    },
}
997
998mod _serde_set_statistics {
999    // The rest spec requires an additional field `snapshot-id`
1000    // that is redundant with the `snapshot_id` field in the statistics file.
1001    use serde::{Deserialize, Deserializer, Serialize, Serializer};
1002
1003    use super::*;
1004
1005    #[derive(Debug, Serialize, Deserialize)]
1006    #[serde(rename_all = "kebab-case")]
1007    struct SetStatistics {
1008        snapshot_id: Option<i64>,
1009        statistics: StatisticsFile,
1010    }
1011
1012    pub fn serialize<S>(
1013        value: &StatisticsFile,
1014        serializer: S,
1015    ) -> std::result::Result<S::Ok, S::Error>
1016    where
1017        S: Serializer,
1018    {
1019        SetStatistics {
1020            snapshot_id: Some(value.snapshot_id),
1021            statistics: value.clone(),
1022        }
1023        .serialize(serializer)
1024    }
1025
1026    pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
1027    where D: Deserializer<'de> {
1028        let SetStatistics {
1029            snapshot_id,
1030            statistics,
1031        } = SetStatistics::deserialize(deserializer)?;
1032        if let Some(snapshot_id) = snapshot_id
1033            && snapshot_id != statistics.snapshot_id
1034        {
1035            return Err(serde::de::Error::custom(format!(
1036                "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
1037                statistics.snapshot_id
1038            )));
1039        }
1040
1041        Ok(statistics)
1042    }
1043}
1044
1045#[cfg(test)]
1046mod tests {
1047    use std::collections::HashMap;
1048    use std::fmt::Debug;
1049    use std::fs::File;
1050    use std::io::BufReader;
1051
1052    use base64::Engine as _;
1053    use serde::Serialize;
1054    use serde::de::DeserializeOwned;
1055    use uuid::uuid;
1056
1057    use super::ViewUpdate;
1058    use crate::io::FileIO;
1059    use crate::spec::{
1060        BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1061        PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1062        SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1063        StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1064        UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1065        ViewVersion,
1066    };
1067    use crate::table::Table;
1068    use crate::{
1069        NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1070    };
1071
1072    #[test]
1073    fn test_parent_namespace() {
1074        let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
1075        let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
1076        let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
1077
1078        assert_eq!(ns1.parent(), None);
1079        assert_eq!(ns2.parent(), Some(ns1.clone()));
1080        assert_eq!(ns3.parent(), Some(ns2.clone()));
1081    }
1082
1083    #[test]
1084    fn test_create_table_id() {
1085        let table_id = TableIdent {
1086            namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
1087            name: "t1".to_string(),
1088        };
1089
1090        assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
1091    }
1092
1093    #[test]
1094    fn test_table_creation_iterator_properties() {
1095        let builder = TableCreation::builder()
1096            .name("table".to_string())
1097            .schema(Schema::builder().build().unwrap());
1098
1099        fn s(k: &str, v: &str) -> (String, String) {
1100            (k.to_string(), v.to_string())
1101        }
1102
1103        let table_creation = builder
1104            .properties([s("key", "value"), s("foo", "bar")])
1105            .build();
1106
1107        assert_eq!(
1108            HashMap::from([s("key", "value"), s("foo", "bar")]),
1109            table_creation.properties
1110        );
1111    }
1112
1113    fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1114        json: impl ToString,
1115        expected: T,
1116    ) {
1117        let json_str = json.to_string();
1118        let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1119        assert_eq!(actual, expected, "Parsed value is not equal to expected");
1120
1121        let restored: T = serde_json::from_str(
1122            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1123        )
1124        .expect("Failed to parse from serialized json");
1125
1126        assert_eq!(
1127            restored, expected,
1128            "Parsed restored value is not equal to expected"
1129        );
1130    }
1131
1132    fn metadata() -> TableMetadata {
1133        let tbl_creation = TableCreation::builder()
1134            .name("table".to_string())
1135            .location("/path/to/table".to_string())
1136            .schema(Schema::builder().build().unwrap())
1137            .build();
1138
1139        TableMetadataBuilder::from_table_creation(tbl_creation)
1140            .unwrap()
1141            .assign_uuid(uuid::Uuid::nil())
1142            .build()
1143            .unwrap()
1144            .metadata
1145    }
1146
1147    #[test]
1148    fn test_check_requirement_not_exist() {
1149        let metadata = metadata();
1150        let requirement = TableRequirement::NotExist;
1151
1152        assert!(requirement.check(Some(&metadata)).is_err());
1153        assert!(requirement.check(None).is_ok());
1154    }
1155
1156    #[test]
1157    fn test_check_table_uuid() {
1158        let metadata = metadata();
1159
1160        let requirement = TableRequirement::UuidMatch {
1161            uuid: uuid::Uuid::now_v7(),
1162        };
1163        assert!(requirement.check(Some(&metadata)).is_err());
1164
1165        let requirement = TableRequirement::UuidMatch {
1166            uuid: uuid::Uuid::nil(),
1167        };
1168        assert!(requirement.check(Some(&metadata)).is_ok());
1169    }
1170
1171    #[test]
1172    fn test_check_ref_snapshot_id() {
1173        let metadata = metadata();
1174
1175        // Ref does not exist but should
1176        let requirement = TableRequirement::RefSnapshotIdMatch {
1177            r#ref: "my_branch".to_string(),
1178            snapshot_id: Some(1),
1179        };
1180        assert!(requirement.check(Some(&metadata)).is_err());
1181
1182        // Ref does not exist and should not
1183        let requirement = TableRequirement::RefSnapshotIdMatch {
1184            r#ref: "my_branch".to_string(),
1185            snapshot_id: None,
1186        };
1187        assert!(requirement.check(Some(&metadata)).is_ok());
1188
1189        // Add snapshot
1190        let snapshot = Snapshot::builder()
1191            .with_snapshot_id(3051729675574597004)
1192            .with_sequence_number(10)
1193            .with_timestamp_ms(9992191116217)
1194            .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
1195            .with_schema_id(0)
1196            .with_summary(Summary {
1197                operation: Operation::Append,
1198                additional_properties: HashMap::new(),
1199            })
1200            .build();
1201
1202        let builder = metadata.into_builder(None);
1203        let builder = TableUpdate::AddSnapshot {
1204            snapshot: snapshot.clone(),
1205        }
1206        .apply(builder)
1207        .unwrap();
1208        let metadata = TableUpdate::SetSnapshotRef {
1209            ref_name: MAIN_BRANCH.to_string(),
1210            reference: SnapshotReference {
1211                snapshot_id: snapshot.snapshot_id(),
1212                retention: SnapshotRetention::Branch {
1213                    min_snapshots_to_keep: Some(10),
1214                    max_snapshot_age_ms: None,
1215                    max_ref_age_ms: None,
1216                },
1217            },
1218        }
1219        .apply(builder)
1220        .unwrap()
1221        .build()
1222        .unwrap()
1223        .metadata;
1224
1225        // Ref exists and should match
1226        let requirement = TableRequirement::RefSnapshotIdMatch {
1227            r#ref: "main".to_string(),
1228            snapshot_id: Some(3051729675574597004),
1229        };
1230        assert!(requirement.check(Some(&metadata)).is_ok());
1231
1232        // Ref exists but does not match
1233        let requirement = TableRequirement::RefSnapshotIdMatch {
1234            r#ref: "main".to_string(),
1235            snapshot_id: Some(1),
1236        };
1237        assert!(requirement.check(Some(&metadata)).is_err());
1238    }
1239
1240    #[test]
1241    fn test_check_last_assigned_field_id() {
1242        let metadata = metadata();
1243
1244        let requirement = TableRequirement::LastAssignedFieldIdMatch {
1245            last_assigned_field_id: 1,
1246        };
1247        assert!(requirement.check(Some(&metadata)).is_err());
1248
1249        let requirement = TableRequirement::LastAssignedFieldIdMatch {
1250            last_assigned_field_id: 0,
1251        };
1252        assert!(requirement.check(Some(&metadata)).is_ok());
1253    }
1254
1255    #[test]
1256    fn test_check_current_schema_id() {
1257        let metadata = metadata();
1258
1259        let requirement = TableRequirement::CurrentSchemaIdMatch {
1260            current_schema_id: 1,
1261        };
1262        assert!(requirement.check(Some(&metadata)).is_err());
1263
1264        let requirement = TableRequirement::CurrentSchemaIdMatch {
1265            current_schema_id: 0,
1266        };
1267        assert!(requirement.check(Some(&metadata)).is_ok());
1268    }
1269
1270    #[test]
1271    fn test_check_last_assigned_partition_id() {
1272        let metadata = metadata();
1273        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1274            last_assigned_partition_id: 0,
1275        };
1276        assert!(requirement.check(Some(&metadata)).is_err());
1277
1278        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1279            last_assigned_partition_id: 999,
1280        };
1281        assert!(requirement.check(Some(&metadata)).is_ok());
1282    }
1283
1284    #[test]
1285    fn test_check_default_spec_id() {
1286        let metadata = metadata();
1287
1288        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1289        assert!(requirement.check(Some(&metadata)).is_err());
1290
1291        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1292        assert!(requirement.check(Some(&metadata)).is_ok());
1293    }
1294
1295    #[test]
1296    fn test_check_default_sort_order_id() {
1297        let metadata = metadata();
1298
1299        let requirement = TableRequirement::DefaultSortOrderIdMatch {
1300            default_sort_order_id: 1,
1301        };
1302        assert!(requirement.check(Some(&metadata)).is_err());
1303
1304        let requirement = TableRequirement::DefaultSortOrderIdMatch {
1305            default_sort_order_id: 0,
1306        };
1307        assert!(requirement.check(Some(&metadata)).is_ok());
1308    }
1309
1310    #[test]
1311    fn test_table_uuid() {
1312        test_serde_json(
1313            r#"
1314{
1315    "type": "assert-table-uuid",
1316    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1317}
1318        "#,
1319            TableRequirement::UuidMatch {
1320                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1321            },
1322        );
1323    }
1324
1325    #[test]
1326    fn test_assert_table_not_exists() {
1327        test_serde_json(
1328            r#"
1329{
1330    "type": "assert-create"
1331}
1332        "#,
1333            TableRequirement::NotExist,
1334        );
1335    }
1336
1337    #[test]
1338    fn test_assert_ref_snapshot_id() {
1339        test_serde_json(
1340            r#"
1341{
1342    "type": "assert-ref-snapshot-id",
1343    "ref": "snapshot-name",
1344    "snapshot-id": null
1345}
1346        "#,
1347            TableRequirement::RefSnapshotIdMatch {
1348                r#ref: "snapshot-name".to_string(),
1349                snapshot_id: None,
1350            },
1351        );
1352
1353        test_serde_json(
1354            r#"
1355{
1356    "type": "assert-ref-snapshot-id",
1357    "ref": "snapshot-name",
1358    "snapshot-id": 1
1359}
1360        "#,
1361            TableRequirement::RefSnapshotIdMatch {
1362                r#ref: "snapshot-name".to_string(),
1363                snapshot_id: Some(1),
1364            },
1365        );
1366    }
1367
1368    #[test]
1369    fn test_assert_last_assigned_field_id() {
1370        test_serde_json(
1371            r#"
1372{
1373    "type": "assert-last-assigned-field-id",
1374    "last-assigned-field-id": 12
1375}
1376        "#,
1377            TableRequirement::LastAssignedFieldIdMatch {
1378                last_assigned_field_id: 12,
1379            },
1380        );
1381    }
1382
1383    #[test]
1384    fn test_assert_current_schema_id() {
1385        test_serde_json(
1386            r#"
1387{
1388    "type": "assert-current-schema-id",
1389    "current-schema-id": 4
1390}
1391        "#,
1392            TableRequirement::CurrentSchemaIdMatch {
1393                current_schema_id: 4,
1394            },
1395        );
1396    }
1397
1398    #[test]
1399    fn test_assert_last_assigned_partition_id() {
1400        test_serde_json(
1401            r#"
1402{
1403    "type": "assert-last-assigned-partition-id",
1404    "last-assigned-partition-id": 1004
1405}
1406        "#,
1407            TableRequirement::LastAssignedPartitionIdMatch {
1408                last_assigned_partition_id: 1004,
1409            },
1410        );
1411    }
1412
1413    #[test]
1414    fn test_assert_default_spec_id() {
1415        test_serde_json(
1416            r#"
1417{
1418    "type": "assert-default-spec-id",
1419    "default-spec-id": 5
1420}
1421        "#,
1422            TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
1423        );
1424    }
1425
1426    #[test]
1427    fn test_assert_default_sort_order() {
1428        let json = r#"
1429{
1430    "type": "assert-default-sort-order-id",
1431    "default-sort-order-id": 10
1432}
1433        "#;
1434
1435        let update = TableRequirement::DefaultSortOrderIdMatch {
1436            default_sort_order_id: 10,
1437        };
1438
1439        test_serde_json(json, update);
1440    }
1441
1442    #[test]
1443    fn test_parse_assert_invalid() {
1444        assert!(
1445            serde_json::from_str::<TableRequirement>(
1446                r#"
1447{
1448    "default-sort-order-id": 10
1449}
1450"#
1451            )
1452            .is_err(),
1453            "Table requirements should not be parsed without type."
1454        );
1455    }
1456
1457    #[test]
1458    fn test_assign_uuid() {
1459        test_serde_json(
1460            r#"
1461{
1462    "action": "assign-uuid",
1463    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1464}
1465        "#,
1466            TableUpdate::AssignUuid {
1467                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1468            },
1469        );
1470    }
1471
1472    #[test]
1473    fn test_upgrade_format_version() {
1474        test_serde_json(
1475            r#"
1476{
1477    "action": "upgrade-format-version",
1478    "format-version": 2
1479}
1480        "#,
1481            TableUpdate::UpgradeFormatVersion {
1482                format_version: FormatVersion::V2,
1483            },
1484        );
1485    }
1486
1487    #[test]
1488    fn test_add_schema() {
1489        let test_schema = Schema::builder()
1490            .with_schema_id(1)
1491            .with_identifier_field_ids(vec![2])
1492            .with_fields(vec![
1493                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1494                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1495                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1496            ])
1497            .build()
1498            .unwrap();
1499        test_serde_json(
1500            r#"
1501{
1502    "action": "add-schema",
1503    "schema": {
1504        "type": "struct",
1505        "schema-id": 1,
1506        "fields": [
1507            {
1508                "id": 1,
1509                "name": "foo",
1510                "required": false,
1511                "type": "string"
1512            },
1513            {
1514                "id": 2,
1515                "name": "bar",
1516                "required": true,
1517                "type": "int"
1518            },
1519            {
1520                "id": 3,
1521                "name": "baz",
1522                "required": false,
1523                "type": "boolean"
1524            }
1525        ],
1526        "identifier-field-ids": [
1527            2
1528        ]
1529    },
1530    "last-column-id": 3
1531}
1532        "#,
1533            TableUpdate::AddSchema {
1534                schema: test_schema.clone(),
1535            },
1536        );
1537
1538        test_serde_json(
1539            r#"
1540{
1541    "action": "add-schema",
1542    "schema": {
1543        "type": "struct",
1544        "schema-id": 1,
1545        "fields": [
1546            {
1547                "id": 1,
1548                "name": "foo",
1549                "required": false,
1550                "type": "string"
1551            },
1552            {
1553                "id": 2,
1554                "name": "bar",
1555                "required": true,
1556                "type": "int"
1557            },
1558            {
1559                "id": 3,
1560                "name": "baz",
1561                "required": false,
1562                "type": "boolean"
1563            }
1564        ],
1565        "identifier-field-ids": [
1566            2
1567        ]
1568    }
1569}
1570        "#,
1571            TableUpdate::AddSchema {
1572                schema: test_schema.clone(),
1573            },
1574        );
1575    }
1576
1577    #[test]
1578    fn test_set_current_schema() {
1579        test_serde_json(
1580            r#"
1581{
1582   "action": "set-current-schema",
1583   "schema-id": 23
1584}
1585        "#,
1586            TableUpdate::SetCurrentSchema { schema_id: 23 },
1587        );
1588    }
1589
1590    #[test]
1591    fn test_add_spec() {
1592        test_serde_json(
1593            r#"
1594{
1595    "action": "add-spec",
1596    "spec": {
1597        "fields": [
1598            {
1599                "source-id": 4,
1600                "name": "ts_day",
1601                "transform": "day"
1602            },
1603            {
1604                "source-id": 1,
1605                "name": "id_bucket",
1606                "transform": "bucket[16]"
1607            },
1608            {
1609                "source-id": 2,
1610                "name": "id_truncate",
1611                "transform": "truncate[4]"
1612            }
1613        ]
1614    }
1615}
1616        "#,
1617            TableUpdate::AddSpec {
1618                spec: UnboundPartitionSpec::builder()
1619                    .add_partition_field(4, "ts_day".to_string(), Transform::Day)
1620                    .unwrap()
1621                    .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
1622                    .unwrap()
1623                    .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
1624                    .unwrap()
1625                    .build(),
1626            },
1627        );
1628    }
1629
1630    #[test]
1631    fn test_set_default_spec() {
1632        test_serde_json(
1633            r#"
1634{
1635    "action": "set-default-spec",
1636    "spec-id": 1
1637}
1638        "#,
1639            TableUpdate::SetDefaultSpec { spec_id: 1 },
1640        )
1641    }
1642
1643    #[test]
1644    fn test_add_sort_order() {
1645        let json = r#"
1646{
1647    "action": "add-sort-order",
1648    "sort-order": {
1649        "order-id": 1,
1650        "fields": [
1651            {
1652                "transform": "identity",
1653                "source-id": 2,
1654                "direction": "asc",
1655                "null-order": "nulls-first"
1656            },
1657            {
1658                "transform": "bucket[4]",
1659                "source-id": 3,
1660                "direction": "desc",
1661                "null-order": "nulls-last"
1662            }
1663        ]
1664    }
1665}
1666        "#;
1667
1668        let update = TableUpdate::AddSortOrder {
1669            sort_order: SortOrder::builder()
1670                .with_order_id(1)
1671                .with_sort_field(
1672                    SortField::builder()
1673                        .source_id(2)
1674                        .direction(SortDirection::Ascending)
1675                        .null_order(NullOrder::First)
1676                        .transform(Transform::Identity)
1677                        .build(),
1678                )
1679                .with_sort_field(
1680                    SortField::builder()
1681                        .source_id(3)
1682                        .direction(SortDirection::Descending)
1683                        .null_order(NullOrder::Last)
1684                        .transform(Transform::Bucket(4))
1685                        .build(),
1686                )
1687                .build_unbound()
1688                .unwrap(),
1689        };
1690
1691        test_serde_json(json, update);
1692    }
1693
1694    #[test]
1695    fn test_set_default_order() {
1696        let json = r#"
1697{
1698    "action": "set-default-sort-order",
1699    "sort-order-id": 2
1700}
1701        "#;
1702        let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };
1703
1704        test_serde_json(json, update);
1705    }
1706
1707    #[test]
1708    fn test_add_snapshot() {
1709        let json = r#"
1710{
1711    "action": "add-snapshot",
1712    "snapshot": {
1713        "snapshot-id": 3055729675574597000,
1714        "parent-snapshot-id": 3051729675574597000,
1715        "timestamp-ms": 1555100955770,
1716        "sequence-number": 1,
1717        "summary": {
1718            "operation": "append"
1719        },
1720        "manifest-list": "s3://a/b/2.avro",
1721        "schema-id": 1
1722    }
1723}
1724        "#;
1725
1726        let update = TableUpdate::AddSnapshot {
1727            snapshot: Snapshot::builder()
1728                .with_snapshot_id(3055729675574597000)
1729                .with_parent_snapshot_id(Some(3051729675574597000))
1730                .with_timestamp_ms(1555100955770)
1731                .with_sequence_number(1)
1732                .with_manifest_list("s3://a/b/2.avro")
1733                .with_schema_id(1)
1734                .with_summary(Summary {
1735                    operation: Operation::Append,
1736                    additional_properties: HashMap::default(),
1737                })
1738                .build(),
1739        };
1740
1741        test_serde_json(json, update);
1742    }
1743
1744    #[test]
1745    fn test_add_snapshot_v1() {
1746        let json = r#"
1747{
1748    "action": "add-snapshot",
1749    "snapshot": {
1750        "snapshot-id": 3055729675574597000,
1751        "parent-snapshot-id": 3051729675574597000,
1752        "timestamp-ms": 1555100955770,
1753        "summary": {
1754            "operation": "append"
1755        },
1756        "manifest-list": "s3://a/b/2.avro"
1757    }
1758}
1759    "#;
1760
1761        let update = TableUpdate::AddSnapshot {
1762            snapshot: Snapshot::builder()
1763                .with_snapshot_id(3055729675574597000)
1764                .with_parent_snapshot_id(Some(3051729675574597000))
1765                .with_timestamp_ms(1555100955770)
1766                .with_sequence_number(0)
1767                .with_manifest_list("s3://a/b/2.avro")
1768                .with_summary(Summary {
1769                    operation: Operation::Append,
1770                    additional_properties: HashMap::default(),
1771                })
1772                .build(),
1773        };
1774
1775        let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
1776        assert_eq!(actual, update, "Parsed value is not equal to expected");
1777    }
1778
1779    #[test]
1780    fn test_add_snapshot_v3() {
1781        let json = serde_json::json!(
1782        {
1783            "action": "add-snapshot",
1784            "snapshot": {
1785                "snapshot-id": 3055729675574597000i64,
1786                "parent-snapshot-id": 3051729675574597000i64,
1787                "timestamp-ms": 1555100955770i64,
1788                "first-row-id":0,
1789                "added-rows":2,
1790                "key-id":"key123",
1791                "summary": {
1792                    "operation": "append"
1793                },
1794                "manifest-list": "s3://a/b/2.avro"
1795            }
1796        });
1797
1798        let update = TableUpdate::AddSnapshot {
1799            snapshot: Snapshot::builder()
1800                .with_snapshot_id(3055729675574597000)
1801                .with_parent_snapshot_id(Some(3051729675574597000))
1802                .with_timestamp_ms(1555100955770)
1803                .with_sequence_number(0)
1804                .with_manifest_list("s3://a/b/2.avro")
1805                .with_row_range(0, 2)
1806                .with_encryption_key_id(Some("key123".to_string()))
1807                .with_summary(Summary {
1808                    operation: Operation::Append,
1809                    additional_properties: HashMap::default(),
1810                })
1811                .build(),
1812        };
1813
1814        let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json");
1815        assert_eq!(actual, update, "Parsed value is not equal to expected");
1816        let restored: TableUpdate = serde_json::from_str(
1817            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1818        )
1819        .expect("Failed to parse from serialized json");
1820        assert_eq!(restored, update);
1821    }
1822
1823    #[test]
1824    fn test_remove_snapshots() {
1825        let json = r#"
1826{
1827    "action": "remove-snapshots",
1828    "snapshot-ids": [
1829        1,
1830        2
1831    ]
1832}
1833        "#;
1834
1835        let update = TableUpdate::RemoveSnapshots {
1836            snapshot_ids: vec![1, 2],
1837        };
1838        test_serde_json(json, update);
1839    }
1840
1841    #[test]
1842    fn test_remove_snapshot_ref() {
1843        let json = r#"
1844{
1845    "action": "remove-snapshot-ref",
1846    "ref-name": "snapshot-ref"
1847}
1848        "#;
1849
1850        let update = TableUpdate::RemoveSnapshotRef {
1851            ref_name: "snapshot-ref".to_string(),
1852        };
1853        test_serde_json(json, update);
1854    }
1855
1856    #[test]
1857    fn test_set_snapshot_ref_tag() {
1858        let json = r#"
1859{
1860    "action": "set-snapshot-ref",
1861    "type": "tag",
1862    "ref-name": "hank",
1863    "snapshot-id": 1,
1864    "max-ref-age-ms": 1
1865}
1866        "#;
1867
1868        let update = TableUpdate::SetSnapshotRef {
1869            ref_name: "hank".to_string(),
1870            reference: SnapshotReference {
1871                snapshot_id: 1,
1872                retention: SnapshotRetention::Tag {
1873                    max_ref_age_ms: Some(1),
1874                },
1875            },
1876        };
1877
1878        test_serde_json(json, update);
1879    }
1880
1881    #[test]
1882    fn test_set_snapshot_ref_branch() {
1883        let json = r#"
1884{
1885    "action": "set-snapshot-ref",
1886    "type": "branch",
1887    "ref-name": "hank",
1888    "snapshot-id": 1,
1889    "min-snapshots-to-keep": 2,
1890    "max-snapshot-age-ms": 3,
1891    "max-ref-age-ms": 4
1892}
1893        "#;
1894
1895        let update = TableUpdate::SetSnapshotRef {
1896            ref_name: "hank".to_string(),
1897            reference: SnapshotReference {
1898                snapshot_id: 1,
1899                retention: SnapshotRetention::Branch {
1900                    min_snapshots_to_keep: Some(2),
1901                    max_snapshot_age_ms: Some(3),
1902                    max_ref_age_ms: Some(4),
1903                },
1904            },
1905        };
1906
1907        test_serde_json(json, update);
1908    }
1909
1910    #[test]
1911    fn test_set_properties() {
1912        let json = r#"
1913{
1914    "action": "set-properties",
1915    "updates": {
1916        "prop1": "v1",
1917        "prop2": "v2"
1918    }
1919}
1920        "#;
1921
1922        let update = TableUpdate::SetProperties {
1923            updates: vec![
1924                ("prop1".to_string(), "v1".to_string()),
1925                ("prop2".to_string(), "v2".to_string()),
1926            ]
1927            .into_iter()
1928            .collect(),
1929        };
1930
1931        test_serde_json(json, update);
1932    }
1933
1934    #[test]
1935    fn test_remove_properties() {
1936        let json = r#"
1937{
1938    "action": "remove-properties",
1939    "removals": [
1940        "prop1",
1941        "prop2"
1942    ]
1943}
1944        "#;
1945
1946        let update = TableUpdate::RemoveProperties {
1947            removals: vec!["prop1".to_string(), "prop2".to_string()],
1948        };
1949
1950        test_serde_json(json, update);
1951    }
1952
1953    #[test]
1954    fn test_set_location() {
1955        let json = r#"
1956{
1957    "action": "set-location",
1958    "location": "s3://bucket/warehouse/tbl_location"
1959}
1960    "#;
1961
1962        let update = TableUpdate::SetLocation {
1963            location: "s3://bucket/warehouse/tbl_location".to_string(),
1964        };
1965
1966        test_serde_json(json, update);
1967    }
1968
1969    #[test]
1970    fn test_table_update_apply() {
1971        let table_creation = TableCreation::builder()
1972            .location("s3://db/table".to_string())
1973            .name("table".to_string())
1974            .properties(HashMap::new())
1975            .schema(Schema::builder().build().unwrap())
1976            .build();
1977        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
1978            .unwrap()
1979            .build()
1980            .unwrap()
1981            .metadata;
1982        let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
1983            table_metadata,
1984            Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
1985        );
1986
1987        let uuid = uuid::Uuid::new_v4();
1988        let update = TableUpdate::AssignUuid { uuid };
1989        let updated_metadata = update
1990            .apply(table_metadata_builder)
1991            .unwrap()
1992            .build()
1993            .unwrap()
1994            .metadata;
1995        assert_eq!(updated_metadata.uuid(), uuid);
1996    }
1997
1998    #[test]
1999    fn test_view_assign_uuid() {
2000        test_serde_json(
2001            r#"
2002{
2003    "action": "assign-uuid",
2004    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
2005}
2006        "#,
2007            ViewUpdate::AssignUuid {
2008                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
2009            },
2010        );
2011    }
2012
2013    #[test]
2014    fn test_view_upgrade_format_version() {
2015        test_serde_json(
2016            r#"
2017{
2018    "action": "upgrade-format-version",
2019    "format-version": 1
2020}
2021        "#,
2022            ViewUpdate::UpgradeFormatVersion {
2023                format_version: ViewFormatVersion::V1,
2024            },
2025        );
2026    }
2027
2028    #[test]
2029    fn test_view_add_schema() {
2030        let test_schema = Schema::builder()
2031            .with_schema_id(1)
2032            .with_identifier_field_ids(vec![2])
2033            .with_fields(vec![
2034                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
2035                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2036                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
2037            ])
2038            .build()
2039            .unwrap();
2040        test_serde_json(
2041            r#"
2042{
2043    "action": "add-schema",
2044    "schema": {
2045        "type": "struct",
2046        "schema-id": 1,
2047        "fields": [
2048            {
2049                "id": 1,
2050                "name": "foo",
2051                "required": false,
2052                "type": "string"
2053            },
2054            {
2055                "id": 2,
2056                "name": "bar",
2057                "required": true,
2058                "type": "int"
2059            },
2060            {
2061                "id": 3,
2062                "name": "baz",
2063                "required": false,
2064                "type": "boolean"
2065            }
2066        ],
2067        "identifier-field-ids": [
2068            2
2069        ]
2070    },
2071    "last-column-id": 3
2072}
2073        "#,
2074            ViewUpdate::AddSchema {
2075                schema: test_schema.clone(),
2076                last_column_id: Some(3),
2077            },
2078        );
2079    }
2080
2081    #[test]
2082    fn test_view_set_location() {
2083        test_serde_json(
2084            r#"
2085{
2086    "action": "set-location",
2087    "location": "s3://db/view"
2088}
2089        "#,
2090            ViewUpdate::SetLocation {
2091                location: "s3://db/view".to_string(),
2092            },
2093        );
2094    }
2095
2096    #[test]
2097    fn test_view_set_properties() {
2098        test_serde_json(
2099            r#"
2100{
2101    "action": "set-properties",
2102    "updates": {
2103        "prop1": "v1",
2104        "prop2": "v2"
2105    }
2106}
2107        "#,
2108            ViewUpdate::SetProperties {
2109                updates: vec![
2110                    ("prop1".to_string(), "v1".to_string()),
2111                    ("prop2".to_string(), "v2".to_string()),
2112                ]
2113                .into_iter()
2114                .collect(),
2115            },
2116        );
2117    }
2118
2119    #[test]
2120    fn test_view_remove_properties() {
2121        test_serde_json(
2122            r#"
2123{
2124    "action": "remove-properties",
2125    "removals": [
2126        "prop1",
2127        "prop2"
2128    ]
2129}
2130        "#,
2131            ViewUpdate::RemoveProperties {
2132                removals: vec!["prop1".to_string(), "prop2".to_string()],
2133            },
2134        );
2135    }
2136
2137    #[test]
2138    fn test_view_add_view_version() {
2139        test_serde_json(
2140            r#"
2141{
2142    "action": "add-view-version",
2143    "view-version": {
2144            "version-id" : 1,
2145            "timestamp-ms" : 1573518431292,
2146            "schema-id" : 1,
2147            "default-catalog" : "prod",
2148            "default-namespace" : [ "default" ],
2149            "summary" : {
2150              "engine-name" : "Spark"
2151            },
2152            "representations" : [ {
2153              "type" : "sql",
2154              "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
2155              "dialect" : "spark"
2156            } ]
2157    }
2158}
2159        "#,
2160            ViewUpdate::AddViewVersion {
2161                view_version: ViewVersion::builder()
2162                    .with_version_id(1)
2163                    .with_timestamp_ms(1573518431292)
2164                    .with_schema_id(1)
2165                    .with_default_catalog(Some("prod".to_string()))
2166                    .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
2167                    .with_summary(
2168                        vec![("engine-name".to_string(), "Spark".to_string())]
2169                            .into_iter()
2170                            .collect(),
2171                    )
2172                    .with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(SqlViewRepresentation {
2173                        sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
2174                        dialect: "spark".to_string(),
2175                    })]))
2176                    .build(),
2177            },
2178        );
2179    }
2180
2181    #[test]
2182    fn test_view_set_current_view_version() {
2183        test_serde_json(
2184            r#"
2185{
2186    "action": "set-current-view-version",
2187    "view-version-id": 1
2188}
2189        "#,
2190            ViewUpdate::SetCurrentViewVersion { view_version_id: 1 },
2191        );
2192    }
2193
2194    #[test]
2195    fn test_remove_partition_specs_update() {
2196        test_serde_json(
2197            r#"
2198{
2199    "action": "remove-partition-specs",
2200    "spec-ids": [1, 2]
2201}
2202        "#,
2203            TableUpdate::RemovePartitionSpecs {
2204                spec_ids: vec![1, 2],
2205            },
2206        );
2207    }
2208
2209    #[test]
2210    fn test_set_statistics_file() {
2211        test_serde_json(
2212            r#"
2213        {
2214                "action": "set-statistics",
2215                "snapshot-id": 1940541653261589030,
2216                "statistics": {
2217                        "snapshot-id": 1940541653261589030,
2218                        "statistics-path": "s3://bucket/warehouse/stats.puffin",
2219                        "file-size-in-bytes": 124,
2220                        "file-footer-size-in-bytes": 27,
2221                        "blob-metadata": [
2222                                {
2223                                        "type": "boring-type",
2224                                        "snapshot-id": 1940541653261589030,
2225                                        "sequence-number": 2,
2226                                        "fields": [
2227                                                1
2228                                        ],
2229                                        "properties": {
2230                                                "prop-key": "prop-value"
2231                                        }
2232                                }
2233                        ]
2234                }
2235        }
2236        "#,
2237            TableUpdate::SetStatistics {
2238                statistics: StatisticsFile {
2239                    snapshot_id: 1940541653261589030,
2240                    statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
2241                    file_size_in_bytes: 124,
2242                    file_footer_size_in_bytes: 27,
2243                    key_metadata: None,
2244                    blob_metadata: vec![BlobMetadata {
2245                        r#type: "boring-type".to_string(),
2246                        snapshot_id: 1940541653261589030,
2247                        sequence_number: 2,
2248                        fields: vec![1],
2249                        properties: vec![("prop-key".to_string(), "prop-value".to_string())]
2250                            .into_iter()
2251                            .collect(),
2252                    }],
2253                },
2254            },
2255        );
2256    }
2257
2258    #[test]
2259    fn test_remove_statistics_file() {
2260        test_serde_json(
2261            r#"
2262        {
2263                "action": "remove-statistics",
2264                "snapshot-id": 1940541653261589030
2265        }
2266        "#,
2267            TableUpdate::RemoveStatistics {
2268                snapshot_id: 1940541653261589030,
2269            },
2270        );
2271    }
2272
2273    #[test]
2274    fn test_set_partition_statistics_file() {
2275        test_serde_json(
2276            r#"
2277            {
2278                "action": "set-partition-statistics",
2279                "partition-statistics": {
2280                    "snapshot-id": 1940541653261589030,
2281                    "statistics-path": "s3://bucket/warehouse/stats1.parquet",
2282                    "file-size-in-bytes": 43
2283                }
2284            }
2285            "#,
2286            TableUpdate::SetPartitionStatistics {
2287                partition_statistics: PartitionStatisticsFile {
2288                    snapshot_id: 1940541653261589030,
2289                    statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
2290                    file_size_in_bytes: 43,
2291                },
2292            },
2293        )
2294    }
2295
2296    #[test]
2297    fn test_remove_partition_statistics_file() {
2298        test_serde_json(
2299            r#"
2300            {
2301                "action": "remove-partition-statistics",
2302                "snapshot-id": 1940541653261589030
2303            }
2304            "#,
2305            TableUpdate::RemovePartitionStatistics {
2306                snapshot_id: 1940541653261589030,
2307            },
2308        )
2309    }
2310
2311    #[test]
2312    fn test_remove_schema_update() {
2313        test_serde_json(
2314            r#"
2315                {
2316                    "action": "remove-schemas",
2317                    "schema-ids": [1, 2]
2318                }        
2319            "#,
2320            TableUpdate::RemoveSchemas {
2321                schema_ids: vec![1, 2],
2322            },
2323        );
2324    }
2325
2326    #[test]
2327    fn test_add_encryption_key() {
2328        let key_bytes = "key".as_bytes();
2329        let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
2330        test_serde_json(
2331            format!(
2332                r#"
2333                {{
2334                    "action": "add-encryption-key",
2335                    "encryption-key": {{
2336                        "key-id": "a",
2337                        "encrypted-key-metadata": "{encoded_key}",
2338                        "encrypted-by-id": "b"
2339                    }}
2340                }}        
2341            "#
2342            ),
2343            TableUpdate::AddEncryptionKey {
2344                encryption_key: EncryptedKey::builder()
2345                    .key_id("a")
2346                    .encrypted_key_metadata(key_bytes.to_vec())
2347                    .encrypted_by_id("b")
2348                    .build(),
2349            },
2350        );
2351    }
2352
2353    #[test]
2354    fn test_remove_encryption_key() {
2355        test_serde_json(
2356            r#"
2357                {
2358                    "action": "remove-encryption-key",
2359                    "key-id": "a"
2360                }        
2361            "#,
2362            TableUpdate::RemoveEncryptionKey {
2363                key_id: "a".to_string(),
2364            },
2365        );
2366    }
2367
2368    #[test]
2369    fn test_table_commit() {
2370        let table = {
2371            let file = File::open(format!(
2372                "{}/testdata/table_metadata/{}",
2373                env!("CARGO_MANIFEST_DIR"),
2374                "TableMetadataV2Valid.json"
2375            ))
2376            .unwrap();
2377            let reader = BufReader::new(file);
2378            let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2379
2380            Table::builder()
2381                .metadata(resp)
2382                .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
2383                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2384                .file_io(FileIO::new_with_memory())
2385                .build()
2386                .unwrap()
2387        };
2388
2389        let updates = vec![
2390            TableUpdate::SetLocation {
2391                location: "s3://bucket/test/new_location/data".to_string(),
2392            },
2393            TableUpdate::SetProperties {
2394                updates: vec![
2395                    ("prop1".to_string(), "v1".to_string()),
2396                    ("prop2".to_string(), "v2".to_string()),
2397                ]
2398                .into_iter()
2399                .collect(),
2400            },
2401        ];
2402
2403        let requirements = vec![TableRequirement::UuidMatch {
2404            uuid: table.metadata().table_uuid,
2405        }];
2406
2407        let table_commit = TableCommit::builder()
2408            .ident(table.identifier().to_owned())
2409            .updates(updates)
2410            .requirements(requirements)
2411            .build();
2412
2413        let updated_table = table_commit.apply(table).unwrap();
2414
2415        assert_eq!(
2416            updated_table.metadata().properties.get("prop1").unwrap(),
2417            "v1"
2418        );
2419        assert_eq!(
2420            updated_table.metadata().properties.get("prop2").unwrap(),
2421            "v2"
2422        );
2423
2424        // metadata version should be bumped
2425        assert!(
2426            updated_table
2427                .metadata_location()
2428                .unwrap()
2429                .starts_with("s3://bucket/test/location/metadata/00001-")
2430        );
2431
2432        assert_eq!(
2433            updated_table.metadata().location,
2434            "s3://bucket/test/new_location/data",
2435        );
2436    }
2437}