iceberg/catalog/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Catalog API for Apache Iceberg
19
20pub mod memory;
21mod metadata_location;
22
23use std::collections::HashMap;
24use std::fmt::{Debug, Display};
25use std::future::Future;
26use std::mem::take;
27use std::ops::Deref;
28use std::str::FromStr;
29use std::sync::Arc;
30
31use _serde::deserialize_snapshot;
32use async_trait::async_trait;
33pub use memory::MemoryCatalog;
34pub use metadata_location::*;
35#[cfg(test)]
36use mockall::automock;
37use serde_derive::{Deserialize, Serialize};
38use typed_builder::TypedBuilder;
39use uuid::Uuid;
40
41use crate::spec::{
42    FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot, SnapshotReference,
43    SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder, UnboundPartitionSpec,
44    ViewFormatVersion, ViewRepresentations, ViewVersion,
45};
46use crate::table::Table;
47use crate::{Error, ErrorKind, Result};
48
/// The catalog API for Iceberg Rust.
///
/// A catalog manages namespaces and the tables inside them. All methods are async (via
/// `async_trait`) and report failures through the crate's [`Result`] type. In test builds a
/// mock implementation is generated with `mockall::automock`.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// List namespaces inside the catalog.
    ///
    /// `parent` is an optional parent namespace. NOTE(review): presumably `Some` restricts the
    /// listing to that namespace's children; the exact semantics are implementation-defined.
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Create a new namespace inside the catalog with the given `properties`.
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Get a namespace's information (identifier and properties) from the catalog.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Check if a namespace exists in the catalog.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Update a namespace inside the catalog.
    ///
    /// # Behavior
    ///
    /// `properties` must be the full set of namespace properties, not a partial patch.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drop a namespace from the catalog, or return an error if it doesn't exist.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// List the tables in a namespace.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Create a new table inside the namespace, as described by `creation`.
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Load a table from the catalog.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drop a table from the catalog, or return an error if it doesn't exist.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Check if a table exists in the catalog.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Rename the table `src` to `dest` in the catalog.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Register an existing table, located by its `metadata_location`, in the catalog.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Update a table in the catalog by applying `commit`, returning the updated table.
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
112
/// Common interface for all catalog builders.
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// The catalog type that this builder creates.
    type C: Catalog;
    /// Create a new catalog instance.
    ///
    /// `name` is the name to give the catalog and `props` its configuration properties; which
    /// properties are recognized depends on the implementation. The returned future is `Send`.
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
124
/// NamespaceIdent represents the identifier of a namespace in the catalog.
///
/// The namespace identifier is a list of strings, where each string is a
/// component of the namespace. It's the catalog implementer's responsibility to
/// handle the namespace identifier correctly.
///
/// The constructors [`NamespaceIdent::from_vec`] and [`NamespaceIdent::from_strs`] reject an
/// empty component list.
// NOTE(review): the derived `Deserialize` does not go through `from_vec`, so a deserialized
// identifier can be empty — confirm whether callers rely on non-emptiness.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
132
133impl NamespaceIdent {
134    /// Create a new namespace identifier with only one level.
135    pub fn new(name: String) -> Self {
136        Self(vec![name])
137    }
138
139    /// Create a multi-level namespace identifier from vector.
140    pub fn from_vec(names: Vec<String>) -> Result<Self> {
141        if names.is_empty() {
142            return Err(Error::new(
143                ErrorKind::DataInvalid,
144                "Namespace identifier can't be empty!",
145            ));
146        }
147        Ok(Self(names))
148    }
149
150    /// Try to create namespace identifier from an iterator of string.
151    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
152        Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
153    }
154
155    /// Returns a string for used in url.
156    pub fn to_url_string(&self) -> String {
157        self.as_ref().join("\u{001f}")
158    }
159
160    /// Returns inner strings.
161    pub fn inner(self) -> Vec<String> {
162        self.0
163    }
164
165    /// Get the parent of this namespace.
166    /// Returns None if this namespace only has a single element and thus has no parent.
167    pub fn parent(&self) -> Option<Self> {
168        self.0.split_last().and_then(|(_, parent)| {
169            if parent.is_empty() {
170                None
171            } else {
172                Some(Self(parent.to_vec()))
173            }
174        })
175    }
176}
177
178impl AsRef<Vec<String>> for NamespaceIdent {
179    fn as_ref(&self) -> &Vec<String> {
180        &self.0
181    }
182}
183
184impl Deref for NamespaceIdent {
185    type Target = [String];
186
187    fn deref(&self) -> &Self::Target {
188        &self.0
189    }
190}
191
/// Namespace represents a namespace in the catalog.
///
/// Pairs a [`NamespaceIdent`] with its string-to-string properties.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    // Identifier of the namespace.
    name: NamespaceIdent,
    // Namespace properties as key/value pairs.
    properties: HashMap<String, String>,
}
198
199impl Namespace {
200    /// Create a new namespace.
201    pub fn new(name: NamespaceIdent) -> Self {
202        Self::with_properties(name, HashMap::default())
203    }
204
205    /// Create a new namespace with properties.
206    pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
207        Self { name, properties }
208    }
209
210    /// Get the name of the namespace.
211    pub fn name(&self) -> &NamespaceIdent {
212        &self.name
213    }
214
215    /// Get the properties of the namespace.
216    pub fn properties(&self) -> &HashMap<String, String> {
217        &self.properties
218    }
219}
220
221impl Display for NamespaceIdent {
222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223        write!(f, "{}", self.0.join("."))
224    }
225}
226
/// TableIdent represents the identifier of a table in the catalog.
///
/// Displayed as `<namespace>.<name>`, with the namespace components themselves joined by `.`.
// The derived `PartialOrd`/`Ord` compare `namespace` first and then `name`, following the
// field order below — keep that in mind before reordering the fields.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace of the table.
    pub namespace: NamespaceIdent,
    /// Table name.
    pub name: String,
}
235
236impl TableIdent {
237    /// Create a new table identifier.
238    pub fn new(namespace: NamespaceIdent, name: String) -> Self {
239        Self { namespace, name }
240    }
241
242    /// Get the namespace of the table.
243    pub fn namespace(&self) -> &NamespaceIdent {
244        &self.namespace
245    }
246
247    /// Get the name of the table.
248    pub fn name(&self) -> &str {
249        &self.name
250    }
251
252    /// Try to create table identifier from an iterator of string.
253    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
254        let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
255        let table_name = vec.pop().ok_or_else(|| {
256            Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
257        })?;
258        let namespace_ident = NamespaceIdent::from_vec(vec)?;
259
260        Ok(Self {
261            namespace: namespace_ident,
262            name: table_name,
263        })
264    }
265}
266
267impl Display for TableIdent {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        write!(f, "{}.{}", self.namespace, self.name)
270    }
271}
272
/// TableCreation represents the creation of a table in the catalog.
///
/// Construct it with `TableCreation::builder()`. `name` and `schema` are required; every other
/// field falls back to its default (`None` or an empty map) when not set.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// The name of the table.
    pub name: String,
    /// The location of the table.
    ///
    /// The builder's `location` setter takes a plain `String`; use `location_opt` to pass an
    /// `Option<String>` directly.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// The schema of the table.
    pub schema: Schema,
    /// The partition spec of the table, could be None.
    ///
    /// The builder's `partition_spec` setter converts its argument with `Into`; use
    /// `partition_spec_opt` to pass an optional value.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// The sort order of the table.
    ///
    /// Optional; use `sort_order_opt` on the builder to pass an `Option` directly.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// The properties of the table.
    ///
    /// The builder setter accepts any iterator of `(String, String)` pairs and collects it.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
}
295
/// TableCommit represents the commit of a table in the catalog.
///
/// The builder's `build` method is `pub(crate)`, so code outside this crate cannot finish
/// constructing a [`TableCommit`]; doing so directly is dangerous and error-prone.
/// Users are supposed to use [`crate::transaction::Transaction`] to update table.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    /// The table ident.
    ident: TableIdent,
    /// The requirements of the table.
    ///
    /// Commit will fail if the requirements are not met.
    requirements: Vec<TableRequirement>,
    /// The updates of the table, applied in order.
    updates: Vec<TableUpdate>,
}
313
314impl TableCommit {
315    /// Return the table identifier.
316    pub fn identifier(&self) -> &TableIdent {
317        &self.ident
318    }
319
320    /// Take all requirements.
321    pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
322        take(&mut self.requirements)
323    }
324
325    /// Take all updates.
326    pub fn take_updates(&mut self) -> Vec<TableUpdate> {
327        take(&mut self.updates)
328    }
329
330    /// Applies this [`TableCommit`] to the given [`Table`] as part of a catalog update.
331    /// Typically used by [`Catalog::update_table`] to validate requirements and apply metadata updates.
332    ///
333    /// Returns a new [`Table`] with updated metadata,
334    /// or an error if validation or application fails.
335    pub fn apply(self, table: Table) -> Result<Table> {
336        // check requirements
337        for requirement in self.requirements {
338            requirement.check(Some(table.metadata()))?;
339        }
340
341        // get current metadata location
342        let current_metadata_location = table.metadata_location_result()?;
343
344        // apply updates to metadata builder
345        let mut metadata_builder = table
346            .metadata()
347            .clone()
348            .into_builder(Some(current_metadata_location.to_string()));
349        for update in self.updates {
350            metadata_builder = update.apply(metadata_builder)?;
351        }
352
353        // Bump the version of metadata
354        let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
355            .with_next_version()
356            .to_string();
357
358        Ok(table
359            .with_metadata(Arc::new(metadata_builder.build()?.metadata))
360            .with_metadata_location(new_metadata_location))
361    }
362}
363
/// TableRequirement represents a requirement for a table in the catalog.
///
/// Requirements are evaluated with [`TableRequirement::check`] against the table's current
/// metadata before a commit's updates are applied. They serialize as an internally tagged
/// object whose `type` field carries the `assert-*` name given on each variant.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not already exist; used for create transactions.
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table UUID must match the requirement.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Uuid of original table.
        uuid: Uuid,
    },
    /// The table branch or tag identified by the requirement's `reference` must
    /// reference the requirement's `snapshot-id`.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// The reference of the table to assert (serialized as `ref`).
        r#ref: String,
        /// The snapshot id of the table to assert.
        /// If the id is `None`, the ref must not already exist.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The table's last assigned column id must match the requirement.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// The last assigned field id of the table to assert; compared against the
        /// metadata's `last_column_id`.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must match the requirement.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Current schema id of the table to assert.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The table's last assigned partition id must match the
    /// requirement.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Last assigned partition id of the table to assert; compared against the
        /// metadata's `last_partition_id`.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The table's default spec id must match the requirement.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Default spec id of the table to assert.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The table's default sort order id must match the requirement.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Default sort order id of the table to assert.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
425
/// TableUpdate represents an update to a table in the catalog.
///
/// Updates are applied to a [`TableMetadataBuilder`] with [`TableUpdate::apply`]. They
/// serialize as an internally tagged object whose `action` field holds the kebab-case
/// variant name (for example `add-schema`); most variants also kebab-case their fields.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
// NOTE(review): presumably allowed because variants such as `AddSchema`/`AddSnapshot` embed
// whole metadata objects; boxing them would change the public variant field types.
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrade table's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: FormatVersion,
    },
    /// Assign a new UUID to the table
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Add a new schema to the table
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
    },
    /// Set table's current schema
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Schema ID to set as current, or -1 to set last added schema
        schema_id: i32,
    },
    /// Add a new partition spec to the table
    AddSpec {
        /// The partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Set table's default spec
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Partition spec ID to set as the default, or -1 to set last added spec
        spec_id: i32,
    },
    /// Add sort order to table.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Set table's default sort order
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Sort order ID to set as the default, or -1 to set last added sort order
        sort_order_id: i64,
    },
    /// Add snapshot to table.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add.
        ///
        /// Deserialized from the catalog representation, in which `sequence-number` may be
        /// omitted.
        #[serde(deserialize_with = "deserialize_snapshot")]
        snapshot: Snapshot,
    },
    /// Set table's snapshot ref.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of snapshot reference to set.
        ref_name: String,
        /// Snapshot reference to set; its fields are flattened into the update object.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Remove table's snapshots
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Snapshot ids to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Remove snapshot reference
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of snapshot reference to remove.
        ref_name: String,
    },
    /// Update table's location
    SetLocation {
        /// New location for table.
        location: String,
    },
    /// Update table's properties
    SetProperties {
        /// Properties to update for table.
        updates: HashMap<String, String>,
    },
    /// Remove table's properties
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Remove partition specs
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Partition spec ids to remove.
        spec_ids: Vec<i32>,
    },
    /// Set statistics for a snapshot
    // Uses a dedicated serde module (`_serde_set_statistics`) instead of the derived format.
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// File containing the statistics
        statistics: StatisticsFile,
    },
    /// Remove statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Snapshot id to remove statistics for.
        snapshot_id: i64,
    },
    /// Set partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// File containing the partition statistics
        partition_statistics: PartitionStatisticsFile,
    },
    /// Remove partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Snapshot id to remove partition statistics for.
        snapshot_id: i64,
    },
    /// Remove schemas
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Schema IDs to remove.
        schema_ids: Vec<i32>,
    },
}
558
559impl TableUpdate {
560    /// Applies the update to the table metadata builder.
561    pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
562        match self {
563            TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
564            TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
565            TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
566            TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
567            TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
568            TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
569            TableUpdate::SetDefaultSortOrder { sort_order_id } => {
570                builder.set_default_sort_order(sort_order_id)
571            }
572            TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
573            TableUpdate::SetSnapshotRef {
574                ref_name,
575                reference,
576            } => builder.set_ref(&ref_name, reference),
577            TableUpdate::RemoveSnapshots { snapshot_ids } => {
578                Ok(builder.remove_snapshots(&snapshot_ids))
579            }
580            TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
581            TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
582            TableUpdate::SetProperties { updates } => builder.set_properties(updates),
583            TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
584            TableUpdate::UpgradeFormatVersion { format_version } => {
585                builder.upgrade_format_version(format_version)
586            }
587            TableUpdate::RemovePartitionSpecs { spec_ids } => {
588                builder.remove_partition_specs(&spec_ids)
589            }
590            TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
591            TableUpdate::RemoveStatistics { snapshot_id } => {
592                Ok(builder.remove_statistics(snapshot_id))
593            }
594            TableUpdate::SetPartitionStatistics {
595                partition_statistics,
596            } => Ok(builder.set_partition_statistics(partition_statistics)),
597            TableUpdate::RemovePartitionStatistics { snapshot_id } => {
598                Ok(builder.remove_partition_statistics(snapshot_id))
599            }
600            TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
601        }
602    }
603}
604
605impl TableRequirement {
606    /// Check that the requirement is met by the table metadata.
607    /// If the requirement is not met, an appropriate error is returned.
608    ///
609    /// Provide metadata as `None` if the table does not exist.
610    pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
611        if let Some(metadata) = metadata {
612            match self {
613                TableRequirement::NotExist => {
614                    return Err(Error::new(
615                        ErrorKind::CatalogCommitConflicts,
616                        format!(
617                            "Requirement failed: Table with id {} already exists",
618                            metadata.uuid()
619                        ),
620                    )
621                    .with_retryable(true));
622                }
623                TableRequirement::UuidMatch { uuid } => {
624                    if &metadata.uuid() != uuid {
625                        return Err(Error::new(
626                            ErrorKind::CatalogCommitConflicts,
627                            "Requirement failed: Table UUID does not match",
628                        )
629                        .with_context("expected", *uuid)
630                        .with_context("found", metadata.uuid())
631                        .with_retryable(true));
632                    }
633                }
634                TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
635                    // ToDo: Harmonize the types of current_schema_id
636                    if metadata.current_schema_id != *current_schema_id {
637                        return Err(Error::new(
638                            ErrorKind::CatalogCommitConflicts,
639                            "Requirement failed: Current schema id does not match",
640                        )
641                        .with_context("expected", current_schema_id.to_string())
642                        .with_context("found", metadata.current_schema_id.to_string())
643                        .with_retryable(true));
644                    }
645                }
646                TableRequirement::DefaultSortOrderIdMatch {
647                    default_sort_order_id,
648                } => {
649                    if metadata.default_sort_order().order_id != *default_sort_order_id {
650                        return Err(Error::new(
651                            ErrorKind::CatalogCommitConflicts,
652                            "Requirement failed: Default sort order id does not match",
653                        )
654                        .with_context("expected", default_sort_order_id.to_string())
655                        .with_context("found", metadata.default_sort_order().order_id.to_string())
656                        .with_retryable(true));
657                    }
658                }
659                TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
660                    let snapshot_ref = metadata.snapshot_for_ref(r#ref);
661                    if let Some(snapshot_id) = snapshot_id {
662                        let snapshot_ref = snapshot_ref.ok_or(
663                            Error::new(
664                                ErrorKind::CatalogCommitConflicts,
665                                format!("Requirement failed: Branch or tag `{}` not found", r#ref),
666                            )
667                            .with_retryable(true),
668                        )?;
669                        if snapshot_ref.snapshot_id() != *snapshot_id {
670                            return Err(Error::new(
671                                ErrorKind::CatalogCommitConflicts,
672                                format!(
673                                    "Requirement failed: Branch or tag `{}`'s snapshot has changed",
674                                    r#ref
675                                ),
676                            )
677                            .with_context("expected", snapshot_id.to_string())
678                            .with_context("found", snapshot_ref.snapshot_id().to_string())
679                            .with_retryable(true));
680                        }
681                    } else if snapshot_ref.is_some() {
682                        // a null snapshot ID means the ref should not exist already
683                        return Err(Error::new(
684                            ErrorKind::CatalogCommitConflicts,
685                            format!(
686                                "Requirement failed: Branch or tag `{}` already exists",
687                                r#ref
688                            ),
689                        )
690                        .with_retryable(true));
691                    }
692                }
693                TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
694                    // ToDo: Harmonize the types of default_spec_id
695                    if metadata.default_partition_spec_id() != *default_spec_id {
696                        return Err(Error::new(
697                            ErrorKind::CatalogCommitConflicts,
698                            "Requirement failed: Default partition spec id does not match",
699                        )
700                        .with_context("expected", default_spec_id.to_string())
701                        .with_context("found", metadata.default_partition_spec_id().to_string())
702                        .with_retryable(true));
703                    }
704                }
705                TableRequirement::LastAssignedPartitionIdMatch {
706                    last_assigned_partition_id,
707                } => {
708                    if metadata.last_partition_id != *last_assigned_partition_id {
709                        return Err(Error::new(
710                            ErrorKind::CatalogCommitConflicts,
711                            "Requirement failed: Last assigned partition id does not match",
712                        )
713                        .with_context("expected", last_assigned_partition_id.to_string())
714                        .with_context("found", metadata.last_partition_id.to_string())
715                        .with_retryable(true));
716                    }
717                }
718                TableRequirement::LastAssignedFieldIdMatch {
719                    last_assigned_field_id,
720                } => {
721                    if &metadata.last_column_id != last_assigned_field_id {
722                        return Err(Error::new(
723                            ErrorKind::CatalogCommitConflicts,
724                            "Requirement failed: Last assigned field id does not match",
725                        )
726                        .with_context("expected", last_assigned_field_id.to_string())
727                        .with_context("found", metadata.last_column_id.to_string())
728                        .with_retryable(true));
729                    }
730                }
731            };
732        } else {
733            match self {
734                TableRequirement::NotExist => {}
735                _ => {
736                    return Err(Error::new(
737                        ErrorKind::TableNotFound,
738                        "Requirement failed: Table does not exist",
739                    ));
740                }
741            }
742        }
743
744        Ok(())
745    }
746}
747
748pub(super) mod _serde {
749    use serde::{Deserialize as _, Deserializer};
750
751    use super::*;
752    use crate::spec::{SchemaId, Summary};
753
754    pub(super) fn deserialize_snapshot<'de, D>(
755        deserializer: D,
756    ) -> std::result::Result<Snapshot, D::Error>
757    where D: Deserializer<'de> {
758        let buf = CatalogSnapshot::deserialize(deserializer)?;
759        Ok(buf.into())
760    }
761
762    #[derive(Debug, Deserialize, PartialEq, Eq)]
763    #[serde(rename_all = "kebab-case")]
764    /// Defines the structure of a v2 snapshot for the catalog.
765    /// Main difference to SnapshotV2 is that sequence-number is optional
766    /// in the rest catalog spec to allow for backwards compatibility with v1.
767    struct CatalogSnapshot {
768        snapshot_id: i64,
769        #[serde(skip_serializing_if = "Option::is_none")]
770        parent_snapshot_id: Option<i64>,
771        #[serde(default)]
772        sequence_number: i64,
773        timestamp_ms: i64,
774        manifest_list: String,
775        summary: Summary,
776        #[serde(skip_serializing_if = "Option::is_none")]
777        schema_id: Option<SchemaId>,
778    }
779
780    impl From<CatalogSnapshot> for Snapshot {
781        fn from(snapshot: CatalogSnapshot) -> Self {
782            let CatalogSnapshot {
783                snapshot_id,
784                parent_snapshot_id,
785                sequence_number,
786                timestamp_ms,
787                manifest_list,
788                schema_id,
789                summary,
790            } = snapshot;
791            let builder = Snapshot::builder()
792                .with_snapshot_id(snapshot_id)
793                .with_parent_snapshot_id(parent_snapshot_id)
794                .with_sequence_number(sequence_number)
795                .with_timestamp_ms(timestamp_ms)
796                .with_manifest_list(manifest_list)
797                .with_summary(summary);
798            if let Some(schema_id) = schema_id {
799                builder.with_schema_id(schema_id).build()
800            } else {
801                builder.build()
802            }
803        }
804    }
805}
806
/// ViewCreation represents the creation of a view in the catalog.
///
/// Instances are assembled through the builder generated by `TypedBuilder`
/// (`ViewCreation::builder()`). Fields marked `#[builder(default)]`
/// (`properties`, `default_catalog`, `summary`) may be left unset, in which case
/// they take their `Default` value: an empty map or `None`.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// The name of the view.
    pub name: String,
    /// The view's base location; used to create metadata file locations.
    pub location: String,
    /// Representations for the view.
    pub representations: ViewRepresentations,
    /// The schema of the view.
    pub schema: Schema,
    /// The properties of the view. Defaults to an empty map.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// The default namespace to use when a reference in the SELECT is a single identifier.
    pub default_namespace: NamespaceIdent,
    /// Default catalog to use when a reference in the SELECT does not contain a catalog.
    /// Defaults to `None`.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// A string to string map of summary metadata about the version.
    /// Typical keys are "engine-name" and "engine-version". Defaults to an empty map.
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
831
/// ViewUpdate represents an update to a view in the catalog.
///
/// Serialized as a JSON object whose `action` field carries the kebab-case
/// variant name (e.g. `"assign-uuid"`, `"set-location"`); each variant's own
/// fields are renamed to kebab-case as well (e.g. `format_version` becomes
/// `"format-version"`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
// Variants such as `AddSchema` and `AddViewVersion` hold whole `Schema` /
// `ViewVersion` values and are much larger than the rest. Boxing those payloads
// would change the public variant field types, so the lint is allowed instead.
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Assign a new UUID to the view
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Upgrade view's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: ViewFormatVersion,
    },
    /// Add a new schema to the view
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
        /// The last column id of the view, if provided.
        last_column_id: Option<i32>,
    },
    /// Set view's location
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// New location for view.
        location: String,
    },
    /// Set view's properties
    ///
    /// Matching keys are updated, and non-matching keys are left unchanged.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to update for view.
        updates: HashMap<String, String>,
    },
    /// Remove view's properties
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Add a new version to the view
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// The view version to add.
        view_version: ViewVersion,
    },
    /// Set view's current version
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// View version id to set as current, or -1 to set last added version
        view_version_id: i32,
    },
}
890
891mod _serde_set_statistics {
892    // The rest spec requires an additional field `snapshot-id`
893    // that is redundant with the `snapshot_id` field in the statistics file.
894    use serde::{Deserialize, Deserializer, Serialize, Serializer};
895
896    use super::*;
897
898    #[derive(Debug, Serialize, Deserialize)]
899    #[serde(rename_all = "kebab-case")]
900    struct SetStatistics {
901        snapshot_id: Option<i64>,
902        statistics: StatisticsFile,
903    }
904
905    pub fn serialize<S>(
906        value: &StatisticsFile,
907        serializer: S,
908    ) -> std::result::Result<S::Ok, S::Error>
909    where
910        S: Serializer,
911    {
912        SetStatistics {
913            snapshot_id: Some(value.snapshot_id),
914            statistics: value.clone(),
915        }
916        .serialize(serializer)
917    }
918
919    pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
920    where D: Deserializer<'de> {
921        let SetStatistics {
922            snapshot_id,
923            statistics,
924        } = SetStatistics::deserialize(deserializer)?;
925        if let Some(snapshot_id) = snapshot_id {
926            if snapshot_id != statistics.snapshot_id {
927                return Err(serde::de::Error::custom(format!(
928                    "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
929                    statistics.snapshot_id
930                )));
931            }
932        }
933
934        Ok(statistics)
935    }
936}
937
938#[cfg(test)]
939mod tests {
940    use std::collections::HashMap;
941    use std::fmt::Debug;
942    use std::fs::File;
943    use std::io::BufReader;
944
945    use serde::Serialize;
946    use serde::de::DeserializeOwned;
947    use uuid::uuid;
948
949    use super::ViewUpdate;
950    use crate::io::FileIOBuilder;
951    use crate::spec::{
952        BlobMetadata, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
953        PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
954        SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
955        StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
956        UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
957        ViewVersion,
958    };
959    use crate::table::Table;
960    use crate::{
961        NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
962    };
963
964    #[test]
965    fn test_parent_namespace() {
966        let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
967        let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
968        let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
969
970        assert_eq!(ns1.parent(), None);
971        assert_eq!(ns2.parent(), Some(ns1.clone()));
972        assert_eq!(ns3.parent(), Some(ns2.clone()));
973    }
974
975    #[test]
976    fn test_create_table_id() {
977        let table_id = TableIdent {
978            namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
979            name: "t1".to_string(),
980        };
981
982        assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
983    }
984
985    #[test]
986    fn test_table_creation_iterator_properties() {
987        let builder = TableCreation::builder()
988            .name("table".to_string())
989            .schema(Schema::builder().build().unwrap());
990
991        fn s(k: &str, v: &str) -> (String, String) {
992            (k.to_string(), v.to_string())
993        }
994
995        let table_creation = builder
996            .properties([s("key", "value"), s("foo", "bar")])
997            .build();
998
999        assert_eq!(
1000            HashMap::from([s("key", "value"), s("foo", "bar")]),
1001            table_creation.properties
1002        );
1003    }
1004
1005    fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1006        json: impl ToString,
1007        expected: T,
1008    ) {
1009        let json_str = json.to_string();
1010        let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1011        assert_eq!(actual, expected, "Parsed value is not equal to expected");
1012
1013        let restored: T = serde_json::from_str(
1014            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1015        )
1016        .expect("Failed to parse from serialized json");
1017
1018        assert_eq!(
1019            restored, expected,
1020            "Parsed restored value is not equal to expected"
1021        );
1022    }
1023
1024    fn metadata() -> TableMetadata {
1025        let tbl_creation = TableCreation::builder()
1026            .name("table".to_string())
1027            .location("/path/to/table".to_string())
1028            .schema(Schema::builder().build().unwrap())
1029            .build();
1030
1031        TableMetadataBuilder::from_table_creation(tbl_creation)
1032            .unwrap()
1033            .assign_uuid(uuid::Uuid::nil())
1034            .build()
1035            .unwrap()
1036            .metadata
1037    }
1038
1039    #[test]
1040    fn test_check_requirement_not_exist() {
1041        let metadata = metadata();
1042        let requirement = TableRequirement::NotExist;
1043
1044        assert!(requirement.check(Some(&metadata)).is_err());
1045        assert!(requirement.check(None).is_ok());
1046    }
1047
1048    #[test]
1049    fn test_check_table_uuid() {
1050        let metadata = metadata();
1051
1052        let requirement = TableRequirement::UuidMatch {
1053            uuid: uuid::Uuid::now_v7(),
1054        };
1055        assert!(requirement.check(Some(&metadata)).is_err());
1056
1057        let requirement = TableRequirement::UuidMatch {
1058            uuid: uuid::Uuid::nil(),
1059        };
1060        assert!(requirement.check(Some(&metadata)).is_ok());
1061    }
1062
1063    #[test]
1064    fn test_check_ref_snapshot_id() {
1065        let metadata = metadata();
1066
1067        // Ref does not exist but should
1068        let requirement = TableRequirement::RefSnapshotIdMatch {
1069            r#ref: "my_branch".to_string(),
1070            snapshot_id: Some(1),
1071        };
1072        assert!(requirement.check(Some(&metadata)).is_err());
1073
1074        // Ref does not exist and should not
1075        let requirement = TableRequirement::RefSnapshotIdMatch {
1076            r#ref: "my_branch".to_string(),
1077            snapshot_id: None,
1078        };
1079        assert!(requirement.check(Some(&metadata)).is_ok());
1080
1081        // Add snapshot
1082        let record = r#"
1083        {
1084            "snapshot-id": 3051729675574597004,
1085            "sequence-number": 10,
1086            "timestamp-ms": 9992191116217,
1087            "summary": {
1088                "operation": "append"
1089            },
1090            "manifest-list": "s3://b/wh/.../s1.avro",
1091            "schema-id": 0
1092        }
1093        "#;
1094
1095        let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
1096        let builder = metadata.into_builder(None);
1097        let builder = TableUpdate::AddSnapshot {
1098            snapshot: snapshot.clone(),
1099        }
1100        .apply(builder)
1101        .unwrap();
1102        let metadata = TableUpdate::SetSnapshotRef {
1103            ref_name: MAIN_BRANCH.to_string(),
1104            reference: SnapshotReference {
1105                snapshot_id: snapshot.snapshot_id(),
1106                retention: SnapshotRetention::Branch {
1107                    min_snapshots_to_keep: Some(10),
1108                    max_snapshot_age_ms: None,
1109                    max_ref_age_ms: None,
1110                },
1111            },
1112        }
1113        .apply(builder)
1114        .unwrap()
1115        .build()
1116        .unwrap()
1117        .metadata;
1118
1119        // Ref exists and should match
1120        let requirement = TableRequirement::RefSnapshotIdMatch {
1121            r#ref: "main".to_string(),
1122            snapshot_id: Some(3051729675574597004),
1123        };
1124        assert!(requirement.check(Some(&metadata)).is_ok());
1125
1126        // Ref exists but does not match
1127        let requirement = TableRequirement::RefSnapshotIdMatch {
1128            r#ref: "main".to_string(),
1129            snapshot_id: Some(1),
1130        };
1131        assert!(requirement.check(Some(&metadata)).is_err());
1132    }
1133
1134    #[test]
1135    fn test_check_last_assigned_field_id() {
1136        let metadata = metadata();
1137
1138        let requirement = TableRequirement::LastAssignedFieldIdMatch {
1139            last_assigned_field_id: 1,
1140        };
1141        assert!(requirement.check(Some(&metadata)).is_err());
1142
1143        let requirement = TableRequirement::LastAssignedFieldIdMatch {
1144            last_assigned_field_id: 0,
1145        };
1146        assert!(requirement.check(Some(&metadata)).is_ok());
1147    }
1148
1149    #[test]
1150    fn test_check_current_schema_id() {
1151        let metadata = metadata();
1152
1153        let requirement = TableRequirement::CurrentSchemaIdMatch {
1154            current_schema_id: 1,
1155        };
1156        assert!(requirement.check(Some(&metadata)).is_err());
1157
1158        let requirement = TableRequirement::CurrentSchemaIdMatch {
1159            current_schema_id: 0,
1160        };
1161        assert!(requirement.check(Some(&metadata)).is_ok());
1162    }
1163
1164    #[test]
1165    fn test_check_last_assigned_partition_id() {
1166        let metadata = metadata();
1167        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1168            last_assigned_partition_id: 0,
1169        };
1170        assert!(requirement.check(Some(&metadata)).is_err());
1171
1172        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1173            last_assigned_partition_id: 999,
1174        };
1175        assert!(requirement.check(Some(&metadata)).is_ok());
1176    }
1177
1178    #[test]
1179    fn test_check_default_spec_id() {
1180        let metadata = metadata();
1181
1182        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1183        assert!(requirement.check(Some(&metadata)).is_err());
1184
1185        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1186        assert!(requirement.check(Some(&metadata)).is_ok());
1187    }
1188
1189    #[test]
1190    fn test_check_default_sort_order_id() {
1191        let metadata = metadata();
1192
1193        let requirement = TableRequirement::DefaultSortOrderIdMatch {
1194            default_sort_order_id: 1,
1195        };
1196        assert!(requirement.check(Some(&metadata)).is_err());
1197
1198        let requirement = TableRequirement::DefaultSortOrderIdMatch {
1199            default_sort_order_id: 0,
1200        };
1201        assert!(requirement.check(Some(&metadata)).is_ok());
1202    }
1203
1204    #[test]
1205    fn test_table_uuid() {
1206        test_serde_json(
1207            r#"
1208{
1209    "type": "assert-table-uuid",
1210    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1211}
1212        "#,
1213            TableRequirement::UuidMatch {
1214                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1215            },
1216        );
1217    }
1218
1219    #[test]
1220    fn test_assert_table_not_exists() {
1221        test_serde_json(
1222            r#"
1223{
1224    "type": "assert-create"
1225}
1226        "#,
1227            TableRequirement::NotExist,
1228        );
1229    }
1230
1231    #[test]
1232    fn test_assert_ref_snapshot_id() {
1233        test_serde_json(
1234            r#"
1235{
1236    "type": "assert-ref-snapshot-id",
1237    "ref": "snapshot-name",
1238    "snapshot-id": null
1239}
1240        "#,
1241            TableRequirement::RefSnapshotIdMatch {
1242                r#ref: "snapshot-name".to_string(),
1243                snapshot_id: None,
1244            },
1245        );
1246
1247        test_serde_json(
1248            r#"
1249{
1250    "type": "assert-ref-snapshot-id",
1251    "ref": "snapshot-name",
1252    "snapshot-id": 1
1253}
1254        "#,
1255            TableRequirement::RefSnapshotIdMatch {
1256                r#ref: "snapshot-name".to_string(),
1257                snapshot_id: Some(1),
1258            },
1259        );
1260    }
1261
1262    #[test]
1263    fn test_assert_last_assigned_field_id() {
1264        test_serde_json(
1265            r#"
1266{
1267    "type": "assert-last-assigned-field-id",
1268    "last-assigned-field-id": 12
1269}
1270        "#,
1271            TableRequirement::LastAssignedFieldIdMatch {
1272                last_assigned_field_id: 12,
1273            },
1274        );
1275    }
1276
1277    #[test]
1278    fn test_assert_current_schema_id() {
1279        test_serde_json(
1280            r#"
1281{
1282    "type": "assert-current-schema-id",
1283    "current-schema-id": 4
1284}
1285        "#,
1286            TableRequirement::CurrentSchemaIdMatch {
1287                current_schema_id: 4,
1288            },
1289        );
1290    }
1291
1292    #[test]
1293    fn test_assert_last_assigned_partition_id() {
1294        test_serde_json(
1295            r#"
1296{
1297    "type": "assert-last-assigned-partition-id",
1298    "last-assigned-partition-id": 1004
1299}
1300        "#,
1301            TableRequirement::LastAssignedPartitionIdMatch {
1302                last_assigned_partition_id: 1004,
1303            },
1304        );
1305    }
1306
1307    #[test]
1308    fn test_assert_default_spec_id() {
1309        test_serde_json(
1310            r#"
1311{
1312    "type": "assert-default-spec-id",
1313    "default-spec-id": 5
1314}
1315        "#,
1316            TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
1317        );
1318    }
1319
1320    #[test]
1321    fn test_assert_default_sort_order() {
1322        let json = r#"
1323{
1324    "type": "assert-default-sort-order-id",
1325    "default-sort-order-id": 10
1326}
1327        "#;
1328
1329        let update = TableRequirement::DefaultSortOrderIdMatch {
1330            default_sort_order_id: 10,
1331        };
1332
1333        test_serde_json(json, update);
1334    }
1335
1336    #[test]
1337    fn test_parse_assert_invalid() {
1338        assert!(
1339            serde_json::from_str::<TableRequirement>(
1340                r#"
1341{
1342    "default-sort-order-id": 10
1343}
1344"#
1345            )
1346            .is_err(),
1347            "Table requirements should not be parsed without type."
1348        );
1349    }
1350
1351    #[test]
1352    fn test_assign_uuid() {
1353        test_serde_json(
1354            r#"
1355{
1356    "action": "assign-uuid",
1357    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1358}
1359        "#,
1360            TableUpdate::AssignUuid {
1361                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1362            },
1363        );
1364    }
1365
1366    #[test]
1367    fn test_upgrade_format_version() {
1368        test_serde_json(
1369            r#"
1370{
1371    "action": "upgrade-format-version",
1372    "format-version": 2
1373}
1374        "#,
1375            TableUpdate::UpgradeFormatVersion {
1376                format_version: FormatVersion::V2,
1377            },
1378        );
1379    }
1380
1381    #[test]
1382    fn test_add_schema() {
1383        let test_schema = Schema::builder()
1384            .with_schema_id(1)
1385            .with_identifier_field_ids(vec![2])
1386            .with_fields(vec![
1387                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1388                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1389                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1390            ])
1391            .build()
1392            .unwrap();
1393        test_serde_json(
1394            r#"
1395{
1396    "action": "add-schema",
1397    "schema": {
1398        "type": "struct",
1399        "schema-id": 1,
1400        "fields": [
1401            {
1402                "id": 1,
1403                "name": "foo",
1404                "required": false,
1405                "type": "string"
1406            },
1407            {
1408                "id": 2,
1409                "name": "bar",
1410                "required": true,
1411                "type": "int"
1412            },
1413            {
1414                "id": 3,
1415                "name": "baz",
1416                "required": false,
1417                "type": "boolean"
1418            }
1419        ],
1420        "identifier-field-ids": [
1421            2
1422        ]
1423    },
1424    "last-column-id": 3
1425}
1426        "#,
1427            TableUpdate::AddSchema {
1428                schema: test_schema.clone(),
1429            },
1430        );
1431
1432        test_serde_json(
1433            r#"
1434{
1435    "action": "add-schema",
1436    "schema": {
1437        "type": "struct",
1438        "schema-id": 1,
1439        "fields": [
1440            {
1441                "id": 1,
1442                "name": "foo",
1443                "required": false,
1444                "type": "string"
1445            },
1446            {
1447                "id": 2,
1448                "name": "bar",
1449                "required": true,
1450                "type": "int"
1451            },
1452            {
1453                "id": 3,
1454                "name": "baz",
1455                "required": false,
1456                "type": "boolean"
1457            }
1458        ],
1459        "identifier-field-ids": [
1460            2
1461        ]
1462    }
1463}
1464        "#,
1465            TableUpdate::AddSchema {
1466                schema: test_schema.clone(),
1467            },
1468        );
1469    }
1470
1471    #[test]
1472    fn test_set_current_schema() {
1473        test_serde_json(
1474            r#"
1475{
1476   "action": "set-current-schema",
1477   "schema-id": 23
1478}
1479        "#,
1480            TableUpdate::SetCurrentSchema { schema_id: 23 },
1481        );
1482    }
1483
1484    #[test]
1485    fn test_add_spec() {
1486        test_serde_json(
1487            r#"
1488{
1489    "action": "add-spec",
1490    "spec": {
1491        "fields": [
1492            {
1493                "source-id": 4,
1494                "name": "ts_day",
1495                "transform": "day"
1496            },
1497            {
1498                "source-id": 1,
1499                "name": "id_bucket",
1500                "transform": "bucket[16]"
1501            },
1502            {
1503                "source-id": 2,
1504                "name": "id_truncate",
1505                "transform": "truncate[4]"
1506            }
1507        ]
1508    }
1509}
1510        "#,
1511            TableUpdate::AddSpec {
1512                spec: UnboundPartitionSpec::builder()
1513                    .add_partition_field(4, "ts_day".to_string(), Transform::Day)
1514                    .unwrap()
1515                    .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
1516                    .unwrap()
1517                    .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
1518                    .unwrap()
1519                    .build(),
1520            },
1521        );
1522    }
1523
1524    #[test]
1525    fn test_set_default_spec() {
1526        test_serde_json(
1527            r#"
1528{
1529    "action": "set-default-spec",
1530    "spec-id": 1
1531}
1532        "#,
1533            TableUpdate::SetDefaultSpec { spec_id: 1 },
1534        )
1535    }
1536
1537    #[test]
1538    fn test_add_sort_order() {
1539        let json = r#"
1540{
1541    "action": "add-sort-order",
1542    "sort-order": {
1543        "order-id": 1,
1544        "fields": [
1545            {
1546                "transform": "identity",
1547                "source-id": 2,
1548                "direction": "asc",
1549                "null-order": "nulls-first"
1550            },
1551            {
1552                "transform": "bucket[4]",
1553                "source-id": 3,
1554                "direction": "desc",
1555                "null-order": "nulls-last"
1556            }
1557        ]
1558    }
1559}
1560        "#;
1561
1562        let update = TableUpdate::AddSortOrder {
1563            sort_order: SortOrder::builder()
1564                .with_order_id(1)
1565                .with_sort_field(
1566                    SortField::builder()
1567                        .source_id(2)
1568                        .direction(SortDirection::Ascending)
1569                        .null_order(NullOrder::First)
1570                        .transform(Transform::Identity)
1571                        .build(),
1572                )
1573                .with_sort_field(
1574                    SortField::builder()
1575                        .source_id(3)
1576                        .direction(SortDirection::Descending)
1577                        .null_order(NullOrder::Last)
1578                        .transform(Transform::Bucket(4))
1579                        .build(),
1580                )
1581                .build_unbound()
1582                .unwrap(),
1583        };
1584
1585        test_serde_json(json, update);
1586    }
1587
1588    #[test]
1589    fn test_set_default_order() {
1590        let json = r#"
1591{
1592    "action": "set-default-sort-order",
1593    "sort-order-id": 2
1594}
1595        "#;
1596        let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };
1597
1598        test_serde_json(json, update);
1599    }
1600
1601    #[test]
1602    fn test_add_snapshot() {
1603        let json = r#"
1604{
1605    "action": "add-snapshot",
1606    "snapshot": {
1607        "snapshot-id": 3055729675574597000,
1608        "parent-snapshot-id": 3051729675574597000,
1609        "timestamp-ms": 1555100955770,
1610        "sequence-number": 1,
1611        "summary": {
1612            "operation": "append"
1613        },
1614        "manifest-list": "s3://a/b/2.avro",
1615        "schema-id": 1
1616    }
1617}
1618        "#;
1619
1620        let update = TableUpdate::AddSnapshot {
1621            snapshot: Snapshot::builder()
1622                .with_snapshot_id(3055729675574597000)
1623                .with_parent_snapshot_id(Some(3051729675574597000))
1624                .with_timestamp_ms(1555100955770)
1625                .with_sequence_number(1)
1626                .with_manifest_list("s3://a/b/2.avro")
1627                .with_schema_id(1)
1628                .with_summary(Summary {
1629                    operation: Operation::Append,
1630                    additional_properties: HashMap::default(),
1631                })
1632                .build(),
1633        };
1634
1635        test_serde_json(json, update);
1636    }
1637
1638    #[test]
1639    fn test_add_snapshot_v1() {
1640        let json = r#"
1641{
1642    "action": "add-snapshot",
1643    "snapshot": {
1644        "snapshot-id": 3055729675574597000,
1645        "parent-snapshot-id": 3051729675574597000,
1646        "timestamp-ms": 1555100955770,
1647        "summary": {
1648            "operation": "append"
1649        },
1650        "manifest-list": "s3://a/b/2.avro"
1651    }
1652}
1653    "#;
1654
1655        let update = TableUpdate::AddSnapshot {
1656            snapshot: Snapshot::builder()
1657                .with_snapshot_id(3055729675574597000)
1658                .with_parent_snapshot_id(Some(3051729675574597000))
1659                .with_timestamp_ms(1555100955770)
1660                .with_sequence_number(0)
1661                .with_manifest_list("s3://a/b/2.avro")
1662                .with_summary(Summary {
1663                    operation: Operation::Append,
1664                    additional_properties: HashMap::default(),
1665                })
1666                .build(),
1667        };
1668
1669        let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
1670        assert_eq!(actual, update, "Parsed value is not equal to expected");
1671    }
1672
1673    #[test]
1674    fn test_remove_snapshots() {
1675        let json = r#"
1676{
1677    "action": "remove-snapshots",
1678    "snapshot-ids": [
1679        1,
1680        2
1681    ]
1682}
1683        "#;
1684
1685        let update = TableUpdate::RemoveSnapshots {
1686            snapshot_ids: vec![1, 2],
1687        };
1688        test_serde_json(json, update);
1689    }
1690
1691    #[test]
1692    fn test_remove_snapshot_ref() {
1693        let json = r#"
1694{
1695    "action": "remove-snapshot-ref",
1696    "ref-name": "snapshot-ref"
1697}
1698        "#;
1699
1700        let update = TableUpdate::RemoveSnapshotRef {
1701            ref_name: "snapshot-ref".to_string(),
1702        };
1703        test_serde_json(json, update);
1704    }
1705
1706    #[test]
1707    fn test_set_snapshot_ref_tag() {
1708        let json = r#"
1709{
1710    "action": "set-snapshot-ref",
1711    "type": "tag",
1712    "ref-name": "hank",
1713    "snapshot-id": 1,
1714    "max-ref-age-ms": 1
1715}
1716        "#;
1717
1718        let update = TableUpdate::SetSnapshotRef {
1719            ref_name: "hank".to_string(),
1720            reference: SnapshotReference {
1721                snapshot_id: 1,
1722                retention: SnapshotRetention::Tag {
1723                    max_ref_age_ms: Some(1),
1724                },
1725            },
1726        };
1727
1728        test_serde_json(json, update);
1729    }
1730
1731    #[test]
1732    fn test_set_snapshot_ref_branch() {
1733        let json = r#"
1734{
1735    "action": "set-snapshot-ref",
1736    "type": "branch",
1737    "ref-name": "hank",
1738    "snapshot-id": 1,
1739    "min-snapshots-to-keep": 2,
1740    "max-snapshot-age-ms": 3,
1741    "max-ref-age-ms": 4
1742}
1743        "#;
1744
1745        let update = TableUpdate::SetSnapshotRef {
1746            ref_name: "hank".to_string(),
1747            reference: SnapshotReference {
1748                snapshot_id: 1,
1749                retention: SnapshotRetention::Branch {
1750                    min_snapshots_to_keep: Some(2),
1751                    max_snapshot_age_ms: Some(3),
1752                    max_ref_age_ms: Some(4),
1753                },
1754            },
1755        };
1756
1757        test_serde_json(json, update);
1758    }
1759
1760    #[test]
1761    fn test_set_properties() {
1762        let json = r#"
1763{
1764    "action": "set-properties",
1765    "updates": {
1766        "prop1": "v1",
1767        "prop2": "v2"
1768    }
1769}
1770        "#;
1771
1772        let update = TableUpdate::SetProperties {
1773            updates: vec![
1774                ("prop1".to_string(), "v1".to_string()),
1775                ("prop2".to_string(), "v2".to_string()),
1776            ]
1777            .into_iter()
1778            .collect(),
1779        };
1780
1781        test_serde_json(json, update);
1782    }
1783
1784    #[test]
1785    fn test_remove_properties() {
1786        let json = r#"
1787{
1788    "action": "remove-properties",
1789    "removals": [
1790        "prop1",
1791        "prop2"
1792    ]
1793}
1794        "#;
1795
1796        let update = TableUpdate::RemoveProperties {
1797            removals: vec!["prop1".to_string(), "prop2".to_string()],
1798        };
1799
1800        test_serde_json(json, update);
1801    }
1802
1803    #[test]
1804    fn test_set_location() {
1805        let json = r#"
1806{
1807    "action": "set-location",
1808    "location": "s3://bucket/warehouse/tbl_location"
1809}
1810    "#;
1811
1812        let update = TableUpdate::SetLocation {
1813            location: "s3://bucket/warehouse/tbl_location".to_string(),
1814        };
1815
1816        test_serde_json(json, update);
1817    }
1818
/// Applies `TableUpdate::AssignUuid` to a metadata builder seeded from freshly
/// created (empty-schema) table metadata, and asserts that the metadata it builds
/// reports the newly assigned UUID.
#[test]
fn test_table_update_apply() {
    let empty_schema = Schema::builder().build().unwrap();
    let creation = TableCreation::builder()
        .location("s3://db/table".to_string())
        .name("table".to_string())
        .properties(HashMap::new())
        .schema(empty_schema)
        .build();

    let initial_metadata = TableMetadataBuilder::from_table_creation(creation)
        .unwrap()
        .build()
        .unwrap()
        .metadata;
    let builder = TableMetadataBuilder::new_from_metadata(
        initial_metadata,
        Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
    );

    let new_uuid = uuid::Uuid::new_v4();
    let applied = TableUpdate::AssignUuid { uuid: new_uuid }
        .apply(builder)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(applied.metadata.uuid(), new_uuid);
}
1847
/// Serde check for the view `assign-uuid` update with a fixed UUID literal.
#[test]
fn test_view_assign_uuid() {
    let json = r#"
{
    "action": "assign-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#;
    let expected = ViewUpdate::AssignUuid {
        uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
    };

    test_serde_json(json, expected);
}
1862
/// Serde check for the view `upgrade-format-version` update targeting
/// `ViewFormatVersion::V1` (serialized as `1`).
#[test]
fn test_view_upgrade_format_version() {
    let json = r#"
{
    "action": "upgrade-format-version",
    "format-version": 1
}
        "#;
    let expected = ViewUpdate::UpgradeFormatVersion {
        format_version: ViewFormatVersion::V1,
    };

    test_serde_json(json, expected);
}
1877
/// Serde check for the view `add-schema` update.
///
/// The schema has id 1 and three fields (`foo` optional string, `bar` required int,
/// `baz` optional boolean). `bar` (id 2) is the identifier field, and
/// `last-column-id` is 3.
#[test]
fn test_view_add_schema() {
    let test_schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();
    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
        ViewUpdate::AddSchema {
            // `test_schema` is not used after this point, so move it rather than clone.
            schema: test_schema,
            last_column_id: Some(3),
        },
    );
}
1930
/// Serde check for the view `set-location` update.
#[test]
fn test_view_set_location() {
    let json = r#"
{
    "action": "set-location",
    "location": "s3://db/view"
}
        "#;
    let location = String::from("s3://db/view");

    test_serde_json(json, ViewUpdate::SetLocation { location });
}
1945
/// Serde check for the view `set-properties` update with two string properties.
#[test]
fn test_view_set_properties() {
    let json = r#"
{
    "action": "set-properties",
    "updates": {
        "prop1": "v1",
        "prop2": "v2"
    }
}
        "#;
    let expected = ViewUpdate::SetProperties {
        updates: [("prop1", "v1"), ("prop2", "v2")]
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect(),
    };

    test_serde_json(json, expected);
}
1968
/// Serde check for the view `remove-properties` update removing `prop1` and `prop2`.
#[test]
fn test_view_remove_properties() {
    let json = r#"
{
    "action": "remove-properties",
    "removals": [
        "prop1",
        "prop2"
    ]
}
        "#;
    let removals = vec![String::from("prop1"), String::from("prop2")];

    test_serde_json(json, ViewUpdate::RemoveProperties { removals });
}
1986
/// Serde check for the view `add-view-version` update.
///
/// The version carries a single Spark SQL representation, a default catalog
/// (`prod`), a default namespace (`default`) and an `engine-name` summary entry.
#[test]
fn test_view_add_view_version() {
    let json = r#"
{
    "action": "add-view-version",
    "view-version": {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
              "engine-name" : "Spark"
            },
            "representations" : [ {
              "type" : "sql",
              "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
              "dialect" : "spark"
            } ]
    }
}
        "#;

    let spark_sql = ViewRepresentation::Sql(SqlViewRepresentation {
        sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
        dialect: "spark".to_string(),
    });

    let view_version = ViewVersion::builder()
        .with_version_id(1)
        .with_timestamp_ms(1573518431292)
        .with_schema_id(1)
        .with_default_catalog(Some("prod".to_string()))
        .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
        .with_summary(
            vec![("engine-name".to_string(), "Spark".to_string())]
                .into_iter()
                .collect(),
        )
        .with_representations(ViewRepresentations(vec![spark_sql]))
        .build();

    test_serde_json(json, ViewUpdate::AddViewVersion { view_version });
}
2030
/// Serde check for the view `set-current-view-version` update selecting version 1.
#[test]
fn test_view_set_current_view_version() {
    let expected = ViewUpdate::SetCurrentViewVersion { view_version_id: 1 };
    let json = r#"
{
    "action": "set-current-view-version",
    "view-version-id": 1
}
        "#;

    test_serde_json(json, expected);
}
2043
/// Serde check for the `remove-partition-specs` table update removing spec ids 1 and 2.
#[test]
fn test_remove_partition_specs_update() {
    let json = r#"
{
    "action": "remove-partition-specs",
    "spec-ids": [1, 2]
}
        "#;
    let expected = TableUpdate::RemovePartitionSpecs {
        spec_ids: vec![1, 2],
    };

    test_serde_json(json, expected);
}
2058
/// Serde check for the `set-statistics` table update with a single puffin blob.
///
/// The JSON payload also carries a top-level `snapshot-id`, but the expected
/// `TableUpdate::SetStatistics` value here has only a `statistics` field.
/// NOTE(review): how serde treats that extra key is decided by the serde impl,
/// which is not visible in this chunk.
#[test]
fn test_set_statistics_file() {
    let blob = BlobMetadata {
        r#type: "boring-type".to_string(),
        snapshot_id: 1940541653261589030,
        sequence_number: 2,
        fields: vec![1],
        properties: std::iter::once(("prop-key".to_string(), "prop-value".to_string()))
            .collect(),
    };
    let statistics = StatisticsFile {
        snapshot_id: 1940541653261589030,
        statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
        file_size_in_bytes: 124,
        file_footer_size_in_bytes: 27,
        key_metadata: None,
        blob_metadata: vec![blob],
    };

    test_serde_json(
        r#"
        {
                "action": "set-statistics",
                "snapshot-id": 1940541653261589030,
                "statistics": {
                        "snapshot-id": 1940541653261589030,
                        "statistics-path": "s3://bucket/warehouse/stats.puffin",
                        "file-size-in-bytes": 124,
                        "file-footer-size-in-bytes": 27,
                        "blob-metadata": [
                                {
                                        "type": "boring-type",
                                        "snapshot-id": 1940541653261589030,
                                        "sequence-number": 2,
                                        "fields": [
                                                1
                                        ],
                                        "properties": {
                                                "prop-key": "prop-value"
                                        }
                                }
                        ]
                }
        }
        "#,
        TableUpdate::SetStatistics { statistics },
    );
}
2107
/// Serde check for the `remove-statistics` table update keyed by snapshot id.
#[test]
fn test_remove_statistics_file() {
    let json = r#"
        {
                "action": "remove-statistics",
                "snapshot-id": 1940541653261589030
        }
        "#;
    let snapshot_id = 1940541653261589030;

    test_serde_json(json, TableUpdate::RemoveStatistics { snapshot_id });
}
2122
/// Serde check for the `set-partition-statistics` table update with one Parquet
/// partition-statistics file (43 bytes).
#[test]
fn test_set_partition_statistics_file() {
    // Terminated with `;` like every other serde test in this module, rather than
    // left as a unit-valued tail expression.
    test_serde_json(
        r#"
            {
                "action": "set-partition-statistics",
                "partition-statistics": {
                    "snapshot-id": 1940541653261589030,
                    "statistics-path": "s3://bucket/warehouse/stats1.parquet",
                    "file-size-in-bytes": 43
                }
            }
            "#,
        TableUpdate::SetPartitionStatistics {
            partition_statistics: PartitionStatisticsFile {
                snapshot_id: 1940541653261589030,
                statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
                file_size_in_bytes: 43,
            },
        },
    );
}
2145
/// Serde check for the `remove-partition-statistics` table update keyed by snapshot id.
#[test]
fn test_remove_partition_statistics_file() {
    // Terminated with `;` for consistency with the other serde tests in this module.
    test_serde_json(
        r#"
            {
                "action": "remove-partition-statistics",
                "snapshot-id": 1940541653261589030
            }
            "#,
        TableUpdate::RemovePartitionStatistics {
            snapshot_id: 1940541653261589030,
        },
    );
}
2160
/// Serde check for the `remove-schemas` table update removing schema ids 1 and 2.
#[test]
fn test_remove_schema_update() {
    // The raw payload is kept byte-for-byte, including the trailing spaces after `}`.
    let json = r#"
                {
                    "action": "remove-schemas",
                    "schema-ids": [1, 2]
                }        
            "#;
    let expected = TableUpdate::RemoveSchemas {
        schema_ids: vec![1, 2],
    };

    test_serde_json(json, expected);
}
2175
/// End-to-end check of `TableCommit::apply`.
///
/// Loads `TableMetadataV2Valid.json`, then commits a `SetLocation` plus a
/// `SetProperties` update guarded by a `UuidMatch` requirement built from the
/// table's own UUID. Asserts that:
/// - the properties were set;
/// - the metadata file name moved from the `00000-` to the `00001-` prefix;
/// - the table location was updated.
#[test]
fn test_table_commit() {
    let metadata_path = format!(
        "{}/testdata/table_metadata/{}",
        env!("CARGO_MANIFEST_DIR"),
        "TableMetadataV2Valid.json"
    );
    let metadata: TableMetadata =
        serde_json::from_reader(BufReader::new(File::open(metadata_path).unwrap())).unwrap();

    let table = Table::builder()
        .metadata(metadata)
        .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
        .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
        .file_io(FileIOBuilder::new("memory").build().unwrap())
        .build()
        .unwrap();

    let requirements = vec![TableRequirement::UuidMatch {
        uuid: table.metadata().table_uuid,
    }];
    let updates = vec![
        TableUpdate::SetLocation {
            location: "s3://bucket/test/new_location/data".to_string(),
        },
        TableUpdate::SetProperties {
            updates: [("prop1", "v1"), ("prop2", "v2")]
                .into_iter()
                .map(|(key, value)| (key.to_string(), value.to_string()))
                .collect(),
        },
    ];

    let commit = TableCommit::builder()
        .ident(table.identifier().to_owned())
        .updates(updates)
        .requirements(requirements)
        .build();
    let committed = commit.apply(table).unwrap();

    assert_eq!(committed.metadata().properties.get("prop1").unwrap(), "v1");
    assert_eq!(committed.metadata().properties.get("prop2").unwrap(), "v2");

    // Applying the commit should produce the next metadata file version.
    let new_metadata_location = committed.metadata_location().unwrap();
    assert!(new_metadata_location.starts_with("s3://bucket/test/location/metadata/00001-"));

    assert_eq!(
        committed.metadata().location,
        "s3://bucket/test/new_location/data",
    );
}
2245}