1pub mod memory;
21mod metadata_location;
22
23use std::collections::HashMap;
24use std::fmt::{Debug, Display};
25use std::future::Future;
26use std::mem::take;
27use std::ops::Deref;
28use std::str::FromStr;
29use std::sync::Arc;
30
31use _serde::{deserialize_snapshot, serialize_snapshot};
32use async_trait::async_trait;
33pub use memory::MemoryCatalog;
34pub use metadata_location::*;
35#[cfg(test)]
36use mockall::automock;
37use serde_derive::{Deserialize, Serialize};
38use typed_builder::TypedBuilder;
39use uuid::Uuid;
40
41use crate::io::StorageFactory;
42use crate::spec::{
43 EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
44 SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
45 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
46};
47use crate::table::Table;
48use crate::{Error, ErrorKind, Result};
49
/// Asynchronous catalog API: namespace and table management operations.
///
/// Implementors must be `Debug + Send + Sync`. In test builds a mock implementation is
/// generated via `mockall::automock`.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// Lists namespaces, optionally scoped to `parent`.
    ///
    /// NOTE(review): whether only direct children of `parent` are returned (vs. all
    /// descendants) is implementation-defined — confirm against implementations.
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Creates `namespace` with the given `properties` and returns the resulting [`Namespace`].
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Fetches the [`Namespace`] (identifier plus properties) for `namespace`.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Returns whether `namespace` exists in the catalog.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Updates the properties of `namespace`.
    ///
    /// NOTE(review): whether `properties` replaces or is merged into the existing
    /// properties is implementation-defined — confirm against implementations.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drops `namespace` from the catalog.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// Lists the identifiers of the tables inside `namespace`.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Creates a table in `namespace` described by `creation` and returns it.
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Loads the table identified by `table`.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drops the table identified by `table`.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Returns whether the table identified by `table` exists.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Renames the table `src` to `dest`.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Registers a table under `table` whose metadata lives at `metadata_location`
    /// (presumably an already-written metadata file — confirm against implementations).
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Commits `commit` (requirements plus updates) and returns the updated table.
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
113
/// Builder that configures and asynchronously loads a [`Catalog`] implementation.
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// The catalog type produced by [`CatalogBuilder::load`].
    type C: Catalog;

    /// Sets the storage factory the catalog should use, returning the updated builder.
    fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> Self;

    /// Consumes the builder and loads a catalog named `name`, configured by `props`.
    ///
    /// The returned future is `Send`, so it can be awaited on multi-threaded executors.
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
153
/// Hierarchical namespace identifier: a list of levels, outermost first.
///
/// The constructors (`new`, `from_vec`, `from_strs`) never produce an empty identifier.
/// NOTE(review): the derived `Deserialize` does not go through `from_vec`, so an empty
/// list can still be deserialized.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
161
162impl NamespaceIdent {
163 pub fn new(name: String) -> Self {
165 Self(vec![name])
166 }
167
168 pub fn from_vec(names: Vec<String>) -> Result<Self> {
170 if names.is_empty() {
171 return Err(Error::new(
172 ErrorKind::DataInvalid,
173 "Namespace identifier can't be empty!",
174 ));
175 }
176 Ok(Self(names))
177 }
178
179 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
181 Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
182 }
183
184 pub fn to_url_string(&self) -> String {
186 self.as_ref().join("\u{001f}")
187 }
188
189 pub fn inner(self) -> Vec<String> {
191 self.0
192 }
193
194 pub fn parent(&self) -> Option<Self> {
197 self.0.split_last().and_then(|(_, parent)| {
198 if parent.is_empty() {
199 None
200 } else {
201 Some(Self(parent.to_vec()))
202 }
203 })
204 }
205}
206
207impl AsRef<Vec<String>> for NamespaceIdent {
208 fn as_ref(&self) -> &Vec<String> {
209 &self.0
210 }
211}
212
213impl Deref for NamespaceIdent {
214 type Target = [String];
215
216 fn deref(&self) -> &Self::Target {
217 &self.0
218 }
219}
220
/// A namespace: its identifier plus a string-to-string property map.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    /// Identifier of the namespace.
    name: NamespaceIdent,
    /// Namespace properties.
    properties: HashMap<String, String>,
}
227
228impl Namespace {
229 pub fn new(name: NamespaceIdent) -> Self {
231 Self::with_properties(name, HashMap::default())
232 }
233
234 pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
236 Self { name, properties }
237 }
238
239 pub fn name(&self) -> &NamespaceIdent {
241 &self.name
242 }
243
244 pub fn properties(&self) -> &HashMap<String, String> {
246 &self.properties
247 }
248}
249
250impl Display for NamespaceIdent {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 write!(f, "{}", self.0.join("."))
253 }
254}
255
/// Fully qualified table identifier: the containing namespace plus the table name.
///
/// The derived ordering compares `namespace` first, then `name`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace that contains the table.
    pub namespace: NamespaceIdent,
    /// Table name within `namespace`.
    pub name: String,
}
264
265impl TableIdent {
266 pub fn new(namespace: NamespaceIdent, name: String) -> Self {
268 Self { namespace, name }
269 }
270
271 pub fn namespace(&self) -> &NamespaceIdent {
273 &self.namespace
274 }
275
276 pub fn name(&self) -> &str {
278 &self.name
279 }
280
281 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
283 let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
284 let table_name = vec.pop().ok_or_else(|| {
285 Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
286 })?;
287 let namespace_ident = NamespaceIdent::from_vec(vec)?;
288
289 Ok(Self {
290 namespace: namespace_ident,
291 name: table_name,
292 })
293 }
294}
295
296impl Display for TableIdent {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 write!(f, "{}.{}", self.namespace, self.name)
299 }
300}
301
/// Parameters for creating a table, as passed to [`Catalog::create_table`].
///
/// Construct with `TableCreation::builder()` (generated by `typed_builder`).
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// Table name.
    pub name: String,
    /// Optional table location. `location_opt` is the setter variant that takes an `Option`.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// Table schema.
    pub schema: Schema,
    /// Optional partition spec. The setter accepts anything convertible into
    /// `UnboundPartitionSpec`; `partition_spec_opt` takes an `Option`.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// Optional sort order. `sort_order_opt` takes an `Option`.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// Table properties. The setter accepts any iterable of `(String, String)` pairs.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
    /// Table format version; defaults to `FormatVersion::V2`.
    #[builder(default = FormatVersion::V2)]
    pub format_version: FormatVersion,
}
327
/// Requirements and updates to commit against a single table.
///
/// The generated builder's `build` method is `pub(crate)`, so commits can only be
/// assembled inside this crate.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    /// Identifier of the table being committed to.
    ident: TableIdent,
    /// Preconditions checked against the table's current metadata before updates are applied.
    requirements: Vec<TableRequirement>,
    /// Metadata changes, applied in order.
    updates: Vec<TableUpdate>,
}
345
346impl TableCommit {
347 pub fn identifier(&self) -> &TableIdent {
349 &self.ident
350 }
351
352 pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
354 take(&mut self.requirements)
355 }
356
357 pub fn take_updates(&mut self) -> Vec<TableUpdate> {
359 take(&mut self.updates)
360 }
361
362 pub fn apply(self, table: Table) -> Result<Table> {
368 for requirement in self.requirements {
370 requirement.check(Some(table.metadata()))?;
371 }
372
373 let current_metadata_location = table.metadata_location_result()?;
375
376 let mut metadata_builder = table
378 .metadata()
379 .clone()
380 .into_builder(Some(current_metadata_location.to_string()));
381 for update in self.updates {
382 metadata_builder = update.apply(metadata_builder)?;
383 }
384
385 let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
387 .with_next_version()
388 .to_string();
389
390 Ok(table
391 .with_metadata(Arc::new(metadata_builder.build()?.metadata))
392 .with_metadata_location(new_metadata_location))
393 }
394}
395
/// A precondition a table must satisfy before a commit's updates are applied.
///
/// Serialized as an internally tagged JSON object whose `type` field names the assertion.
/// [`TableRequirement::check`] evaluates it against a table's metadata.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not exist yet.
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table's UUID must equal `uuid`.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Expected table UUID.
        uuid: Uuid,
    },
    /// Constrains the snapshot a branch or tag points to.
    ///
    /// With `snapshot_id = Some(id)`, the ref must exist and point at `id`.
    /// With `None`, the ref must not exist.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// Name of the branch or tag.
        r#ref: String,
        /// Expected snapshot id, or `None` to require that the ref is absent.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The metadata's `last_column_id` must equal `last_assigned_field_id`.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// Expected last assigned field id.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The current schema id must equal `current_schema_id`.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Expected current schema id.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The metadata's `last_partition_id` must equal `last_assigned_partition_id`.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Expected last assigned partition field id.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The default partition spec id must equal `default_spec_id`.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Expected default partition spec id.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The default sort order id must equal `default_sort_order_id`.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Expected default sort order id.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
457
/// A single change to table metadata.
///
/// Serialized as an internally tagged JSON object. Its `action` field holds the kebab-case
/// variant name (e.g. `add-schema`). [`TableUpdate::apply`] maps each variant onto the
/// corresponding `TableMetadataBuilder` method.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
// Variant payloads differ greatly in size (e.g. `Schema`/`Snapshot` vs. a single id);
// NOTE(review): presumably left unboxed deliberately to keep the API simple — confirm.
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrade the table format version.
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format version.
        format_version: FormatVersion,
    },
    /// Assign a UUID to the table.
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// UUID to assign.
        uuid: Uuid,
    },
    /// Add a schema to the table.
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// Schema to add.
        schema: Schema,
    },
    /// Set the current schema by id.
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Id of the schema to make current.
        schema_id: i32,
    },
    /// Add a partition spec.
    AddSpec {
        /// Unbound partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Set the default partition spec by id.
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Id of the spec to make default.
        spec_id: i32,
    },
    /// Add a sort order.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Set the default sort order by id.
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Id of the sort order to make default.
        sort_order_id: i64,
    },
    /// Add a snapshot.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add. On the wire it goes through the `_serde::CatalogSnapshot` shape.
        #[serde(
            deserialize_with = "deserialize_snapshot",
            serialize_with = "serialize_snapshot"
        )]
        snapshot: Snapshot,
    },
    /// Create or move a branch/tag reference.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of the reference.
        ref_name: String,
        /// Reference target and retention; its fields are flattened into the update object.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Remove snapshots by id.
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Ids of the snapshots to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Remove a branch/tag reference.
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of the reference to remove.
        ref_name: String,
    },
    /// Set the table location.
    SetLocation {
        /// New table location.
        location: String,
    },
    /// Set (add or overwrite) table properties.
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Remove table properties by key.
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Remove partition specs by id.
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Ids of the specs to remove.
        spec_ids: Vec<i32>,
    },
    /// Set a statistics file.
    ///
    /// (De)serialized via `_serde_set_statistics`. That module also writes a `snapshot-id`
    /// field, and on read it rejects a `snapshot-id` that disagrees with the file's own
    /// snapshot id.
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// Statistics file to set.
        statistics: StatisticsFile,
    },
    /// Remove the statistics file associated with a snapshot.
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Snapshot id whose statistics are removed.
        snapshot_id: i64,
    },
    /// Set a partition statistics file.
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// Partition statistics file to set.
        partition_statistics: PartitionStatisticsFile,
    },
    /// Remove the partition statistics file associated with a snapshot.
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Snapshot id whose partition statistics are removed.
        snapshot_id: i64,
    },
    /// Remove schemas by id.
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Ids of the schemas to remove.
        schema_ids: Vec<i32>,
    },
    /// Add an encryption key.
    #[serde(rename_all = "kebab-case")]
    AddEncryptionKey {
        /// Encrypted key to add.
        encryption_key: EncryptedKey,
    },
    /// Remove an encryption key by id.
    #[serde(rename_all = "kebab-case")]
    RemoveEncryptionKey {
        /// Id of the key to remove.
        key_id: String,
    },
}
605
606impl TableUpdate {
607 pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
609 match self {
610 TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
611 TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
612 TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
613 TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
614 TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
615 TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
616 TableUpdate::SetDefaultSortOrder { sort_order_id } => {
617 builder.set_default_sort_order(sort_order_id)
618 }
619 TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
620 TableUpdate::SetSnapshotRef {
621 ref_name,
622 reference,
623 } => builder.set_ref(&ref_name, reference),
624 TableUpdate::RemoveSnapshots { snapshot_ids } => {
625 Ok(builder.remove_snapshots(&snapshot_ids))
626 }
627 TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
628 TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
629 TableUpdate::SetProperties { updates } => builder.set_properties(updates),
630 TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
631 TableUpdate::UpgradeFormatVersion { format_version } => {
632 builder.upgrade_format_version(format_version)
633 }
634 TableUpdate::RemovePartitionSpecs { spec_ids } => {
635 builder.remove_partition_specs(&spec_ids)
636 }
637 TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
638 TableUpdate::RemoveStatistics { snapshot_id } => {
639 Ok(builder.remove_statistics(snapshot_id))
640 }
641 TableUpdate::SetPartitionStatistics {
642 partition_statistics,
643 } => Ok(builder.set_partition_statistics(partition_statistics)),
644 TableUpdate::RemovePartitionStatistics { snapshot_id } => {
645 Ok(builder.remove_partition_statistics(snapshot_id))
646 }
647 TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
648 TableUpdate::AddEncryptionKey { encryption_key } => {
649 Ok(builder.add_encryption_key(encryption_key))
650 }
651 TableUpdate::RemoveEncryptionKey { key_id } => {
652 Ok(builder.remove_encryption_key(&key_id))
653 }
654 }
655 }
656}
657
658impl TableRequirement {
659 pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
664 if let Some(metadata) = metadata {
665 match self {
666 TableRequirement::NotExist => {
667 return Err(Error::new(
668 ErrorKind::CatalogCommitConflicts,
669 format!(
670 "Requirement failed: Table with id {} already exists",
671 metadata.uuid()
672 ),
673 )
674 .with_retryable(true));
675 }
676 TableRequirement::UuidMatch { uuid } => {
677 if &metadata.uuid() != uuid {
678 return Err(Error::new(
679 ErrorKind::CatalogCommitConflicts,
680 "Requirement failed: Table UUID does not match",
681 )
682 .with_context("expected", *uuid)
683 .with_context("found", metadata.uuid())
684 .with_retryable(true));
685 }
686 }
687 TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
688 if metadata.current_schema_id != *current_schema_id {
690 return Err(Error::new(
691 ErrorKind::CatalogCommitConflicts,
692 "Requirement failed: Current schema id does not match",
693 )
694 .with_context("expected", current_schema_id.to_string())
695 .with_context("found", metadata.current_schema_id.to_string())
696 .with_retryable(true));
697 }
698 }
699 TableRequirement::DefaultSortOrderIdMatch {
700 default_sort_order_id,
701 } => {
702 if metadata.default_sort_order().order_id != *default_sort_order_id {
703 return Err(Error::new(
704 ErrorKind::CatalogCommitConflicts,
705 "Requirement failed: Default sort order id does not match",
706 )
707 .with_context("expected", default_sort_order_id.to_string())
708 .with_context("found", metadata.default_sort_order().order_id.to_string())
709 .with_retryable(true));
710 }
711 }
712 TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
713 let snapshot_ref = metadata.snapshot_for_ref(r#ref);
714 if let Some(snapshot_id) = snapshot_id {
715 let snapshot_ref = snapshot_ref.ok_or(
716 Error::new(
717 ErrorKind::CatalogCommitConflicts,
718 format!("Requirement failed: Branch or tag `{ref}` not found"),
719 )
720 .with_retryable(true),
721 )?;
722 if snapshot_ref.snapshot_id() != *snapshot_id {
723 return Err(Error::new(
724 ErrorKind::CatalogCommitConflicts,
725 format!(
726 "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
727 ),
728 )
729 .with_context("expected", snapshot_id.to_string())
730 .with_context("found", snapshot_ref.snapshot_id().to_string())
731 .with_retryable(true));
732 }
733 } else if snapshot_ref.is_some() {
734 return Err(Error::new(
736 ErrorKind::CatalogCommitConflicts,
737 format!("Requirement failed: Branch or tag `{ref}` already exists"),
738 )
739 .with_retryable(true));
740 }
741 }
742 TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
743 if metadata.default_partition_spec_id() != *default_spec_id {
745 return Err(Error::new(
746 ErrorKind::CatalogCommitConflicts,
747 "Requirement failed: Default partition spec id does not match",
748 )
749 .with_context("expected", default_spec_id.to_string())
750 .with_context("found", metadata.default_partition_spec_id().to_string())
751 .with_retryable(true));
752 }
753 }
754 TableRequirement::LastAssignedPartitionIdMatch {
755 last_assigned_partition_id,
756 } => {
757 if metadata.last_partition_id != *last_assigned_partition_id {
758 return Err(Error::new(
759 ErrorKind::CatalogCommitConflicts,
760 "Requirement failed: Last assigned partition id does not match",
761 )
762 .with_context("expected", last_assigned_partition_id.to_string())
763 .with_context("found", metadata.last_partition_id.to_string())
764 .with_retryable(true));
765 }
766 }
767 TableRequirement::LastAssignedFieldIdMatch {
768 last_assigned_field_id,
769 } => {
770 if &metadata.last_column_id != last_assigned_field_id {
771 return Err(Error::new(
772 ErrorKind::CatalogCommitConflicts,
773 "Requirement failed: Last assigned field id does not match",
774 )
775 .with_context("expected", last_assigned_field_id.to_string())
776 .with_context("found", metadata.last_column_id.to_string())
777 .with_retryable(true));
778 }
779 }
780 };
781 } else {
782 match self {
783 TableRequirement::NotExist => {}
784 _ => {
785 return Err(Error::new(
786 ErrorKind::TableNotFound,
787 "Requirement failed: Table does not exist",
788 ));
789 }
790 }
791 }
792
793 Ok(())
794 }
795}
796
797pub(super) mod _serde {
798 use serde::{Deserialize as _, Deserializer, Serialize as _};
799
800 use super::*;
801 use crate::spec::{SchemaId, Summary};
802
803 pub(super) fn deserialize_snapshot<'de, D>(
804 deserializer: D,
805 ) -> std::result::Result<Snapshot, D::Error>
806 where D: Deserializer<'de> {
807 let buf = CatalogSnapshot::deserialize(deserializer)?;
808 Ok(buf.into())
809 }
810
811 pub(super) fn serialize_snapshot<S>(
812 snapshot: &Snapshot,
813 serializer: S,
814 ) -> std::result::Result<S::Ok, S::Error>
815 where
816 S: serde::Serializer,
817 {
818 let buf: CatalogSnapshot = snapshot.clone().into();
819 buf.serialize(serializer)
820 }
821
822 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
823 #[serde(rename_all = "kebab-case")]
824 struct CatalogSnapshot {
828 snapshot_id: i64,
829 #[serde(skip_serializing_if = "Option::is_none")]
830 parent_snapshot_id: Option<i64>,
831 #[serde(default)]
832 sequence_number: i64,
833 timestamp_ms: i64,
834 manifest_list: String,
835 summary: Summary,
836 #[serde(skip_serializing_if = "Option::is_none")]
837 schema_id: Option<SchemaId>,
838 #[serde(skip_serializing_if = "Option::is_none")]
839 first_row_id: Option<u64>,
840 #[serde(skip_serializing_if = "Option::is_none")]
841 added_rows: Option<u64>,
842 #[serde(skip_serializing_if = "Option::is_none")]
843 key_id: Option<String>,
844 }
845
846 impl From<CatalogSnapshot> for Snapshot {
847 fn from(snapshot: CatalogSnapshot) -> Self {
848 let CatalogSnapshot {
849 snapshot_id,
850 parent_snapshot_id,
851 sequence_number,
852 timestamp_ms,
853 manifest_list,
854 schema_id,
855 summary,
856 first_row_id,
857 added_rows,
858 key_id,
859 } = snapshot;
860 let builder = Snapshot::builder()
861 .with_snapshot_id(snapshot_id)
862 .with_parent_snapshot_id(parent_snapshot_id)
863 .with_sequence_number(sequence_number)
864 .with_timestamp_ms(timestamp_ms)
865 .with_manifest_list(manifest_list)
866 .with_summary(summary)
867 .with_encryption_key_id(key_id);
868 let row_range = first_row_id.zip(added_rows);
869 match (schema_id, row_range) {
870 (None, None) => builder.build(),
871 (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
872 (None, Some((first_row_id, last_row_id))) => {
873 builder.with_row_range(first_row_id, last_row_id).build()
874 }
875 (Some(schema_id), Some((first_row_id, last_row_id))) => builder
876 .with_schema_id(schema_id)
877 .with_row_range(first_row_id, last_row_id)
878 .build(),
879 }
880 }
881 }
882
883 impl From<Snapshot> for CatalogSnapshot {
884 fn from(snapshot: Snapshot) -> Self {
885 let first_row_id = snapshot.first_row_id();
886 let added_rows = snapshot.added_rows_count();
887 let Snapshot {
888 snapshot_id,
889 parent_snapshot_id,
890 sequence_number,
891 timestamp_ms,
892 manifest_list,
893 summary,
894 schema_id,
895 row_range: _,
896 encryption_key_id: key_id,
897 } = snapshot;
898 CatalogSnapshot {
899 snapshot_id,
900 parent_snapshot_id,
901 sequence_number,
902 timestamp_ms,
903 manifest_list,
904 summary,
905 schema_id,
906 first_row_id,
907 added_rows,
908 key_id,
909 }
910 }
911 }
912}
913
/// Parameters for creating a view. Construct with `ViewCreation::builder()`.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// View name.
    pub name: String,
    /// View location.
    pub location: String,
    /// Representations (e.g. SQL text) of the view.
    pub representations: ViewRepresentations,
    /// Schema of the view's output.
    pub schema: Schema,
    /// View properties; defaults to empty.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// Default namespace used when resolving the view's definition.
    pub default_namespace: NamespaceIdent,
    /// Optional default catalog used when resolving the view's definition; defaults to `None`.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// Summary entries for the view version; defaults to empty.
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
938
/// A single change to view metadata.
///
/// Serialized as an internally tagged JSON object whose `action` field holds the kebab-case
/// variant name (e.g. `add-view-version`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
// NOTE(review): payload sizes differ greatly between variants; presumably left unboxed on purpose.
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Assign a UUID to the view.
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// UUID to assign.
        uuid: Uuid,
    },
    /// Upgrade the view format version.
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target view format version.
        format_version: ViewFormatVersion,
    },
    /// Add a schema to the view.
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// Schema to add.
        schema: Schema,
        /// Optional last column id (serialized as `last-column-id`).
        last_column_id: Option<i32>,
    },
    /// Set the view location.
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// New view location.
        location: String,
    },
    /// Set (add or overwrite) view properties.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Remove view properties by key.
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Add a view version.
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// View version to add.
        view_version: ViewVersion,
    },
    /// Set the current view version by id.
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// Id of the view version to make current.
        view_version_id: i32,
    },
}
997
998mod _serde_set_statistics {
999 use serde::{Deserialize, Deserializer, Serialize, Serializer};
1002
1003 use super::*;
1004
1005 #[derive(Debug, Serialize, Deserialize)]
1006 #[serde(rename_all = "kebab-case")]
1007 struct SetStatistics {
1008 snapshot_id: Option<i64>,
1009 statistics: StatisticsFile,
1010 }
1011
1012 pub fn serialize<S>(
1013 value: &StatisticsFile,
1014 serializer: S,
1015 ) -> std::result::Result<S::Ok, S::Error>
1016 where
1017 S: Serializer,
1018 {
1019 SetStatistics {
1020 snapshot_id: Some(value.snapshot_id),
1021 statistics: value.clone(),
1022 }
1023 .serialize(serializer)
1024 }
1025
1026 pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
1027 where D: Deserializer<'de> {
1028 let SetStatistics {
1029 snapshot_id,
1030 statistics,
1031 } = SetStatistics::deserialize(deserializer)?;
1032 if let Some(snapshot_id) = snapshot_id
1033 && snapshot_id != statistics.snapshot_id
1034 {
1035 return Err(serde::de::Error::custom(format!(
1036 "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
1037 statistics.snapshot_id
1038 )));
1039 }
1040
1041 Ok(statistics)
1042 }
1043}
1044
1045#[cfg(test)]
1046mod tests {
1047 use std::collections::HashMap;
1048 use std::fmt::Debug;
1049 use std::fs::File;
1050 use std::io::BufReader;
1051
1052 use base64::Engine as _;
1053 use serde::Serialize;
1054 use serde::de::DeserializeOwned;
1055 use uuid::uuid;
1056
1057 use super::ViewUpdate;
1058 use crate::io::FileIO;
1059 use crate::spec::{
1060 BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1061 PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1062 SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1063 StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1064 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1065 ViewVersion,
1066 };
1067 use crate::table::Table;
1068 use crate::{
1069 NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1070 };
1071
1072 #[test]
1073 fn test_parent_namespace() {
1074 let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
1075 let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
1076 let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
1077
1078 assert_eq!(ns1.parent(), None);
1079 assert_eq!(ns2.parent(), Some(ns1.clone()));
1080 assert_eq!(ns3.parent(), Some(ns2.clone()));
1081 }
1082
1083 #[test]
1084 fn test_create_table_id() {
1085 let table_id = TableIdent {
1086 namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
1087 name: "t1".to_string(),
1088 };
1089
1090 assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
1091 }
1092
1093 #[test]
1094 fn test_table_creation_iterator_properties() {
1095 let builder = TableCreation::builder()
1096 .name("table".to_string())
1097 .schema(Schema::builder().build().unwrap());
1098
1099 fn s(k: &str, v: &str) -> (String, String) {
1100 (k.to_string(), v.to_string())
1101 }
1102
1103 let table_creation = builder
1104 .properties([s("key", "value"), s("foo", "bar")])
1105 .build();
1106
1107 assert_eq!(
1108 HashMap::from([s("key", "value"), s("foo", "bar")]),
1109 table_creation.properties
1110 );
1111 }
1112
1113 fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1114 json: impl ToString,
1115 expected: T,
1116 ) {
1117 let json_str = json.to_string();
1118 let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1119 assert_eq!(actual, expected, "Parsed value is not equal to expected");
1120
1121 let restored: T = serde_json::from_str(
1122 &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1123 )
1124 .expect("Failed to parse from serialized json");
1125
1126 assert_eq!(
1127 restored, expected,
1128 "Parsed restored value is not equal to expected"
1129 );
1130 }
1131
1132 fn metadata() -> TableMetadata {
1133 let tbl_creation = TableCreation::builder()
1134 .name("table".to_string())
1135 .location("/path/to/table".to_string())
1136 .schema(Schema::builder().build().unwrap())
1137 .build();
1138
1139 TableMetadataBuilder::from_table_creation(tbl_creation)
1140 .unwrap()
1141 .assign_uuid(uuid::Uuid::nil())
1142 .build()
1143 .unwrap()
1144 .metadata
1145 }
1146
1147 #[test]
1148 fn test_check_requirement_not_exist() {
1149 let metadata = metadata();
1150 let requirement = TableRequirement::NotExist;
1151
1152 assert!(requirement.check(Some(&metadata)).is_err());
1153 assert!(requirement.check(None).is_ok());
1154 }
1155
1156 #[test]
1157 fn test_check_table_uuid() {
1158 let metadata = metadata();
1159
1160 let requirement = TableRequirement::UuidMatch {
1161 uuid: uuid::Uuid::now_v7(),
1162 };
1163 assert!(requirement.check(Some(&metadata)).is_err());
1164
1165 let requirement = TableRequirement::UuidMatch {
1166 uuid: uuid::Uuid::nil(),
1167 };
1168 assert!(requirement.check(Some(&metadata)).is_ok());
1169 }
1170
1171 #[test]
1172 fn test_check_ref_snapshot_id() {
1173 let metadata = metadata();
1174
1175 let requirement = TableRequirement::RefSnapshotIdMatch {
1177 r#ref: "my_branch".to_string(),
1178 snapshot_id: Some(1),
1179 };
1180 assert!(requirement.check(Some(&metadata)).is_err());
1181
1182 let requirement = TableRequirement::RefSnapshotIdMatch {
1184 r#ref: "my_branch".to_string(),
1185 snapshot_id: None,
1186 };
1187 assert!(requirement.check(Some(&metadata)).is_ok());
1188
1189 let snapshot = Snapshot::builder()
1191 .with_snapshot_id(3051729675574597004)
1192 .with_sequence_number(10)
1193 .with_timestamp_ms(9992191116217)
1194 .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
1195 .with_schema_id(0)
1196 .with_summary(Summary {
1197 operation: Operation::Append,
1198 additional_properties: HashMap::new(),
1199 })
1200 .build();
1201
1202 let builder = metadata.into_builder(None);
1203 let builder = TableUpdate::AddSnapshot {
1204 snapshot: snapshot.clone(),
1205 }
1206 .apply(builder)
1207 .unwrap();
1208 let metadata = TableUpdate::SetSnapshotRef {
1209 ref_name: MAIN_BRANCH.to_string(),
1210 reference: SnapshotReference {
1211 snapshot_id: snapshot.snapshot_id(),
1212 retention: SnapshotRetention::Branch {
1213 min_snapshots_to_keep: Some(10),
1214 max_snapshot_age_ms: None,
1215 max_ref_age_ms: None,
1216 },
1217 },
1218 }
1219 .apply(builder)
1220 .unwrap()
1221 .build()
1222 .unwrap()
1223 .metadata;
1224
1225 let requirement = TableRequirement::RefSnapshotIdMatch {
1227 r#ref: "main".to_string(),
1228 snapshot_id: Some(3051729675574597004),
1229 };
1230 assert!(requirement.check(Some(&metadata)).is_ok());
1231
1232 let requirement = TableRequirement::RefSnapshotIdMatch {
1234 r#ref: "main".to_string(),
1235 snapshot_id: Some(1),
1236 };
1237 assert!(requirement.check(Some(&metadata)).is_err());
1238 }
1239
1240 #[test]
1241 fn test_check_last_assigned_field_id() {
1242 let metadata = metadata();
1243
1244 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1245 last_assigned_field_id: 1,
1246 };
1247 assert!(requirement.check(Some(&metadata)).is_err());
1248
1249 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1250 last_assigned_field_id: 0,
1251 };
1252 assert!(requirement.check(Some(&metadata)).is_ok());
1253 }
1254
1255 #[test]
1256 fn test_check_current_schema_id() {
1257 let metadata = metadata();
1258
1259 let requirement = TableRequirement::CurrentSchemaIdMatch {
1260 current_schema_id: 1,
1261 };
1262 assert!(requirement.check(Some(&metadata)).is_err());
1263
1264 let requirement = TableRequirement::CurrentSchemaIdMatch {
1265 current_schema_id: 0,
1266 };
1267 assert!(requirement.check(Some(&metadata)).is_ok());
1268 }
1269
1270 #[test]
1271 fn test_check_last_assigned_partition_id() {
1272 let metadata = metadata();
1273 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1274 last_assigned_partition_id: 0,
1275 };
1276 assert!(requirement.check(Some(&metadata)).is_err());
1277
1278 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1279 last_assigned_partition_id: 999,
1280 };
1281 assert!(requirement.check(Some(&metadata)).is_ok());
1282 }
1283
1284 #[test]
1285 fn test_check_default_spec_id() {
1286 let metadata = metadata();
1287
1288 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1289 assert!(requirement.check(Some(&metadata)).is_err());
1290
1291 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1292 assert!(requirement.check(Some(&metadata)).is_ok());
1293 }
1294
1295 #[test]
1296 fn test_check_default_sort_order_id() {
1297 let metadata = metadata();
1298
1299 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1300 default_sort_order_id: 1,
1301 };
1302 assert!(requirement.check(Some(&metadata)).is_err());
1303
1304 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1305 default_sort_order_id: 0,
1306 };
1307 assert!(requirement.check(Some(&metadata)).is_ok());
1308 }
1309
1310 #[test]
1311 fn test_table_uuid() {
1312 test_serde_json(
1313 r#"
1314{
1315 "type": "assert-table-uuid",
1316 "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1317}
1318 "#,
1319 TableRequirement::UuidMatch {
1320 uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1321 },
1322 );
1323 }
1324
1325 #[test]
1326 fn test_assert_table_not_exists() {
1327 test_serde_json(
1328 r#"
1329{
1330 "type": "assert-create"
1331}
1332 "#,
1333 TableRequirement::NotExist,
1334 );
1335 }
1336
1337 #[test]
1338 fn test_assert_ref_snapshot_id() {
1339 test_serde_json(
1340 r#"
1341{
1342 "type": "assert-ref-snapshot-id",
1343 "ref": "snapshot-name",
1344 "snapshot-id": null
1345}
1346 "#,
1347 TableRequirement::RefSnapshotIdMatch {
1348 r#ref: "snapshot-name".to_string(),
1349 snapshot_id: None,
1350 },
1351 );
1352
1353 test_serde_json(
1354 r#"
1355{
1356 "type": "assert-ref-snapshot-id",
1357 "ref": "snapshot-name",
1358 "snapshot-id": 1
1359}
1360 "#,
1361 TableRequirement::RefSnapshotIdMatch {
1362 r#ref: "snapshot-name".to_string(),
1363 snapshot_id: Some(1),
1364 },
1365 );
1366 }
1367
1368 #[test]
1369 fn test_assert_last_assigned_field_id() {
1370 test_serde_json(
1371 r#"
1372{
1373 "type": "assert-last-assigned-field-id",
1374 "last-assigned-field-id": 12
1375}
1376 "#,
1377 TableRequirement::LastAssignedFieldIdMatch {
1378 last_assigned_field_id: 12,
1379 },
1380 );
1381 }
1382
1383 #[test]
1384 fn test_assert_current_schema_id() {
1385 test_serde_json(
1386 r#"
1387{
1388 "type": "assert-current-schema-id",
1389 "current-schema-id": 4
1390}
1391 "#,
1392 TableRequirement::CurrentSchemaIdMatch {
1393 current_schema_id: 4,
1394 },
1395 );
1396 }
1397
1398 #[test]
1399 fn test_assert_last_assigned_partition_id() {
1400 test_serde_json(
1401 r#"
1402{
1403 "type": "assert-last-assigned-partition-id",
1404 "last-assigned-partition-id": 1004
1405}
1406 "#,
1407 TableRequirement::LastAssignedPartitionIdMatch {
1408 last_assigned_partition_id: 1004,
1409 },
1410 );
1411 }
1412
1413 #[test]
1414 fn test_assert_default_spec_id() {
1415 test_serde_json(
1416 r#"
1417{
1418 "type": "assert-default-spec-id",
1419 "default-spec-id": 5
1420}
1421 "#,
1422 TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
1423 );
1424 }
1425
1426 #[test]
1427 fn test_assert_default_sort_order() {
1428 let json = r#"
1429{
1430 "type": "assert-default-sort-order-id",
1431 "default-sort-order-id": 10
1432}
1433 "#;
1434
1435 let update = TableRequirement::DefaultSortOrderIdMatch {
1436 default_sort_order_id: 10,
1437 };
1438
1439 test_serde_json(json, update);
1440 }
1441
1442 #[test]
1443 fn test_parse_assert_invalid() {
1444 assert!(
1445 serde_json::from_str::<TableRequirement>(
1446 r#"
1447{
1448 "default-sort-order-id": 10
1449}
1450"#
1451 )
1452 .is_err(),
1453 "Table requirements should not be parsed without type."
1454 );
1455 }
1456
1457 #[test]
1458 fn test_assign_uuid() {
1459 test_serde_json(
1460 r#"
1461{
1462 "action": "assign-uuid",
1463 "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1464}
1465 "#,
1466 TableUpdate::AssignUuid {
1467 uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1468 },
1469 );
1470 }
1471
1472 #[test]
1473 fn test_upgrade_format_version() {
1474 test_serde_json(
1475 r#"
1476{
1477 "action": "upgrade-format-version",
1478 "format-version": 2
1479}
1480 "#,
1481 TableUpdate::UpgradeFormatVersion {
1482 format_version: FormatVersion::V2,
1483 },
1484 );
1485 }
1486
1487 #[test]
1488 fn test_add_schema() {
1489 let test_schema = Schema::builder()
1490 .with_schema_id(1)
1491 .with_identifier_field_ids(vec![2])
1492 .with_fields(vec![
1493 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1494 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1495 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1496 ])
1497 .build()
1498 .unwrap();
1499 test_serde_json(
1500 r#"
1501{
1502 "action": "add-schema",
1503 "schema": {
1504 "type": "struct",
1505 "schema-id": 1,
1506 "fields": [
1507 {
1508 "id": 1,
1509 "name": "foo",
1510 "required": false,
1511 "type": "string"
1512 },
1513 {
1514 "id": 2,
1515 "name": "bar",
1516 "required": true,
1517 "type": "int"
1518 },
1519 {
1520 "id": 3,
1521 "name": "baz",
1522 "required": false,
1523 "type": "boolean"
1524 }
1525 ],
1526 "identifier-field-ids": [
1527 2
1528 ]
1529 },
1530 "last-column-id": 3
1531}
1532 "#,
1533 TableUpdate::AddSchema {
1534 schema: test_schema.clone(),
1535 },
1536 );
1537
1538 test_serde_json(
1539 r#"
1540{
1541 "action": "add-schema",
1542 "schema": {
1543 "type": "struct",
1544 "schema-id": 1,
1545 "fields": [
1546 {
1547 "id": 1,
1548 "name": "foo",
1549 "required": false,
1550 "type": "string"
1551 },
1552 {
1553 "id": 2,
1554 "name": "bar",
1555 "required": true,
1556 "type": "int"
1557 },
1558 {
1559 "id": 3,
1560 "name": "baz",
1561 "required": false,
1562 "type": "boolean"
1563 }
1564 ],
1565 "identifier-field-ids": [
1566 2
1567 ]
1568 }
1569}
1570 "#,
1571 TableUpdate::AddSchema {
1572 schema: test_schema.clone(),
1573 },
1574 );
1575 }
1576
1577 #[test]
1578 fn test_set_current_schema() {
1579 test_serde_json(
1580 r#"
1581{
1582 "action": "set-current-schema",
1583 "schema-id": 23
1584}
1585 "#,
1586 TableUpdate::SetCurrentSchema { schema_id: 23 },
1587 );
1588 }
1589
1590 #[test]
1591 fn test_add_spec() {
1592 test_serde_json(
1593 r#"
1594{
1595 "action": "add-spec",
1596 "spec": {
1597 "fields": [
1598 {
1599 "source-id": 4,
1600 "name": "ts_day",
1601 "transform": "day"
1602 },
1603 {
1604 "source-id": 1,
1605 "name": "id_bucket",
1606 "transform": "bucket[16]"
1607 },
1608 {
1609 "source-id": 2,
1610 "name": "id_truncate",
1611 "transform": "truncate[4]"
1612 }
1613 ]
1614 }
1615}
1616 "#,
1617 TableUpdate::AddSpec {
1618 spec: UnboundPartitionSpec::builder()
1619 .add_partition_field(4, "ts_day".to_string(), Transform::Day)
1620 .unwrap()
1621 .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
1622 .unwrap()
1623 .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
1624 .unwrap()
1625 .build(),
1626 },
1627 );
1628 }
1629
1630 #[test]
1631 fn test_set_default_spec() {
1632 test_serde_json(
1633 r#"
1634{
1635 "action": "set-default-spec",
1636 "spec-id": 1
1637}
1638 "#,
1639 TableUpdate::SetDefaultSpec { spec_id: 1 },
1640 )
1641 }
1642
1643 #[test]
1644 fn test_add_sort_order() {
1645 let json = r#"
1646{
1647 "action": "add-sort-order",
1648 "sort-order": {
1649 "order-id": 1,
1650 "fields": [
1651 {
1652 "transform": "identity",
1653 "source-id": 2,
1654 "direction": "asc",
1655 "null-order": "nulls-first"
1656 },
1657 {
1658 "transform": "bucket[4]",
1659 "source-id": 3,
1660 "direction": "desc",
1661 "null-order": "nulls-last"
1662 }
1663 ]
1664 }
1665}
1666 "#;
1667
1668 let update = TableUpdate::AddSortOrder {
1669 sort_order: SortOrder::builder()
1670 .with_order_id(1)
1671 .with_sort_field(
1672 SortField::builder()
1673 .source_id(2)
1674 .direction(SortDirection::Ascending)
1675 .null_order(NullOrder::First)
1676 .transform(Transform::Identity)
1677 .build(),
1678 )
1679 .with_sort_field(
1680 SortField::builder()
1681 .source_id(3)
1682 .direction(SortDirection::Descending)
1683 .null_order(NullOrder::Last)
1684 .transform(Transform::Bucket(4))
1685 .build(),
1686 )
1687 .build_unbound()
1688 .unwrap(),
1689 };
1690
1691 test_serde_json(json, update);
1692 }
1693
1694 #[test]
1695 fn test_set_default_order() {
1696 let json = r#"
1697{
1698 "action": "set-default-sort-order",
1699 "sort-order-id": 2
1700}
1701 "#;
1702 let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };
1703
1704 test_serde_json(json, update);
1705 }
1706
1707 #[test]
1708 fn test_add_snapshot() {
1709 let json = r#"
1710{
1711 "action": "add-snapshot",
1712 "snapshot": {
1713 "snapshot-id": 3055729675574597000,
1714 "parent-snapshot-id": 3051729675574597000,
1715 "timestamp-ms": 1555100955770,
1716 "sequence-number": 1,
1717 "summary": {
1718 "operation": "append"
1719 },
1720 "manifest-list": "s3://a/b/2.avro",
1721 "schema-id": 1
1722 }
1723}
1724 "#;
1725
1726 let update = TableUpdate::AddSnapshot {
1727 snapshot: Snapshot::builder()
1728 .with_snapshot_id(3055729675574597000)
1729 .with_parent_snapshot_id(Some(3051729675574597000))
1730 .with_timestamp_ms(1555100955770)
1731 .with_sequence_number(1)
1732 .with_manifest_list("s3://a/b/2.avro")
1733 .with_schema_id(1)
1734 .with_summary(Summary {
1735 operation: Operation::Append,
1736 additional_properties: HashMap::default(),
1737 })
1738 .build(),
1739 };
1740
1741 test_serde_json(json, update);
1742 }
1743
1744 #[test]
1745 fn test_add_snapshot_v1() {
1746 let json = r#"
1747{
1748 "action": "add-snapshot",
1749 "snapshot": {
1750 "snapshot-id": 3055729675574597000,
1751 "parent-snapshot-id": 3051729675574597000,
1752 "timestamp-ms": 1555100955770,
1753 "summary": {
1754 "operation": "append"
1755 },
1756 "manifest-list": "s3://a/b/2.avro"
1757 }
1758}
1759 "#;
1760
1761 let update = TableUpdate::AddSnapshot {
1762 snapshot: Snapshot::builder()
1763 .with_snapshot_id(3055729675574597000)
1764 .with_parent_snapshot_id(Some(3051729675574597000))
1765 .with_timestamp_ms(1555100955770)
1766 .with_sequence_number(0)
1767 .with_manifest_list("s3://a/b/2.avro")
1768 .with_summary(Summary {
1769 operation: Operation::Append,
1770 additional_properties: HashMap::default(),
1771 })
1772 .build(),
1773 };
1774
1775 let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
1776 assert_eq!(actual, update, "Parsed value is not equal to expected");
1777 }
1778
1779 #[test]
1780 fn test_add_snapshot_v3() {
1781 let json = serde_json::json!(
1782 {
1783 "action": "add-snapshot",
1784 "snapshot": {
1785 "snapshot-id": 3055729675574597000i64,
1786 "parent-snapshot-id": 3051729675574597000i64,
1787 "timestamp-ms": 1555100955770i64,
1788 "first-row-id":0,
1789 "added-rows":2,
1790 "key-id":"key123",
1791 "summary": {
1792 "operation": "append"
1793 },
1794 "manifest-list": "s3://a/b/2.avro"
1795 }
1796 });
1797
1798 let update = TableUpdate::AddSnapshot {
1799 snapshot: Snapshot::builder()
1800 .with_snapshot_id(3055729675574597000)
1801 .with_parent_snapshot_id(Some(3051729675574597000))
1802 .with_timestamp_ms(1555100955770)
1803 .with_sequence_number(0)
1804 .with_manifest_list("s3://a/b/2.avro")
1805 .with_row_range(0, 2)
1806 .with_encryption_key_id(Some("key123".to_string()))
1807 .with_summary(Summary {
1808 operation: Operation::Append,
1809 additional_properties: HashMap::default(),
1810 })
1811 .build(),
1812 };
1813
1814 let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json");
1815 assert_eq!(actual, update, "Parsed value is not equal to expected");
1816 let restored: TableUpdate = serde_json::from_str(
1817 &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1818 )
1819 .expect("Failed to parse from serialized json");
1820 assert_eq!(restored, update);
1821 }
1822
1823 #[test]
1824 fn test_remove_snapshots() {
1825 let json = r#"
1826{
1827 "action": "remove-snapshots",
1828 "snapshot-ids": [
1829 1,
1830 2
1831 ]
1832}
1833 "#;
1834
1835 let update = TableUpdate::RemoveSnapshots {
1836 snapshot_ids: vec![1, 2],
1837 };
1838 test_serde_json(json, update);
1839 }
1840
1841 #[test]
1842 fn test_remove_snapshot_ref() {
1843 let json = r#"
1844{
1845 "action": "remove-snapshot-ref",
1846 "ref-name": "snapshot-ref"
1847}
1848 "#;
1849
1850 let update = TableUpdate::RemoveSnapshotRef {
1851 ref_name: "snapshot-ref".to_string(),
1852 };
1853 test_serde_json(json, update);
1854 }
1855
1856 #[test]
1857 fn test_set_snapshot_ref_tag() {
1858 let json = r#"
1859{
1860 "action": "set-snapshot-ref",
1861 "type": "tag",
1862 "ref-name": "hank",
1863 "snapshot-id": 1,
1864 "max-ref-age-ms": 1
1865}
1866 "#;
1867
1868 let update = TableUpdate::SetSnapshotRef {
1869 ref_name: "hank".to_string(),
1870 reference: SnapshotReference {
1871 snapshot_id: 1,
1872 retention: SnapshotRetention::Tag {
1873 max_ref_age_ms: Some(1),
1874 },
1875 },
1876 };
1877
1878 test_serde_json(json, update);
1879 }
1880
1881 #[test]
1882 fn test_set_snapshot_ref_branch() {
1883 let json = r#"
1884{
1885 "action": "set-snapshot-ref",
1886 "type": "branch",
1887 "ref-name": "hank",
1888 "snapshot-id": 1,
1889 "min-snapshots-to-keep": 2,
1890 "max-snapshot-age-ms": 3,
1891 "max-ref-age-ms": 4
1892}
1893 "#;
1894
1895 let update = TableUpdate::SetSnapshotRef {
1896 ref_name: "hank".to_string(),
1897 reference: SnapshotReference {
1898 snapshot_id: 1,
1899 retention: SnapshotRetention::Branch {
1900 min_snapshots_to_keep: Some(2),
1901 max_snapshot_age_ms: Some(3),
1902 max_ref_age_ms: Some(4),
1903 },
1904 },
1905 };
1906
1907 test_serde_json(json, update);
1908 }
1909
1910 #[test]
1911 fn test_set_properties() {
1912 let json = r#"
1913{
1914 "action": "set-properties",
1915 "updates": {
1916 "prop1": "v1",
1917 "prop2": "v2"
1918 }
1919}
1920 "#;
1921
1922 let update = TableUpdate::SetProperties {
1923 updates: vec![
1924 ("prop1".to_string(), "v1".to_string()),
1925 ("prop2".to_string(), "v2".to_string()),
1926 ]
1927 .into_iter()
1928 .collect(),
1929 };
1930
1931 test_serde_json(json, update);
1932 }
1933
1934 #[test]
1935 fn test_remove_properties() {
1936 let json = r#"
1937{
1938 "action": "remove-properties",
1939 "removals": [
1940 "prop1",
1941 "prop2"
1942 ]
1943}
1944 "#;
1945
1946 let update = TableUpdate::RemoveProperties {
1947 removals: vec!["prop1".to_string(), "prop2".to_string()],
1948 };
1949
1950 test_serde_json(json, update);
1951 }
1952
1953 #[test]
1954 fn test_set_location() {
1955 let json = r#"
1956{
1957 "action": "set-location",
1958 "location": "s3://bucket/warehouse/tbl_location"
1959}
1960 "#;
1961
1962 let update = TableUpdate::SetLocation {
1963 location: "s3://bucket/warehouse/tbl_location".to_string(),
1964 };
1965
1966 test_serde_json(json, update);
1967 }
1968
1969 #[test]
1970 fn test_table_update_apply() {
1971 let table_creation = TableCreation::builder()
1972 .location("s3://db/table".to_string())
1973 .name("table".to_string())
1974 .properties(HashMap::new())
1975 .schema(Schema::builder().build().unwrap())
1976 .build();
1977 let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
1978 .unwrap()
1979 .build()
1980 .unwrap()
1981 .metadata;
1982 let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
1983 table_metadata,
1984 Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
1985 );
1986
1987 let uuid = uuid::Uuid::new_v4();
1988 let update = TableUpdate::AssignUuid { uuid };
1989 let updated_metadata = update
1990 .apply(table_metadata_builder)
1991 .unwrap()
1992 .build()
1993 .unwrap()
1994 .metadata;
1995 assert_eq!(updated_metadata.uuid(), uuid);
1996 }
1997
1998 #[test]
1999 fn test_view_assign_uuid() {
2000 test_serde_json(
2001 r#"
2002{
2003 "action": "assign-uuid",
2004 "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
2005}
2006 "#,
2007 ViewUpdate::AssignUuid {
2008 uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
2009 },
2010 );
2011 }
2012
2013 #[test]
2014 fn test_view_upgrade_format_version() {
2015 test_serde_json(
2016 r#"
2017{
2018 "action": "upgrade-format-version",
2019 "format-version": 1
2020}
2021 "#,
2022 ViewUpdate::UpgradeFormatVersion {
2023 format_version: ViewFormatVersion::V1,
2024 },
2025 );
2026 }
2027
2028 #[test]
2029 fn test_view_add_schema() {
2030 let test_schema = Schema::builder()
2031 .with_schema_id(1)
2032 .with_identifier_field_ids(vec![2])
2033 .with_fields(vec![
2034 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
2035 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2036 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
2037 ])
2038 .build()
2039 .unwrap();
2040 test_serde_json(
2041 r#"
2042{
2043 "action": "add-schema",
2044 "schema": {
2045 "type": "struct",
2046 "schema-id": 1,
2047 "fields": [
2048 {
2049 "id": 1,
2050 "name": "foo",
2051 "required": false,
2052 "type": "string"
2053 },
2054 {
2055 "id": 2,
2056 "name": "bar",
2057 "required": true,
2058 "type": "int"
2059 },
2060 {
2061 "id": 3,
2062 "name": "baz",
2063 "required": false,
2064 "type": "boolean"
2065 }
2066 ],
2067 "identifier-field-ids": [
2068 2
2069 ]
2070 },
2071 "last-column-id": 3
2072}
2073 "#,
2074 ViewUpdate::AddSchema {
2075 schema: test_schema.clone(),
2076 last_column_id: Some(3),
2077 },
2078 );
2079 }
2080
2081 #[test]
2082 fn test_view_set_location() {
2083 test_serde_json(
2084 r#"
2085{
2086 "action": "set-location",
2087 "location": "s3://db/view"
2088}
2089 "#,
2090 ViewUpdate::SetLocation {
2091 location: "s3://db/view".to_string(),
2092 },
2093 );
2094 }
2095
2096 #[test]
2097 fn test_view_set_properties() {
2098 test_serde_json(
2099 r#"
2100{
2101 "action": "set-properties",
2102 "updates": {
2103 "prop1": "v1",
2104 "prop2": "v2"
2105 }
2106}
2107 "#,
2108 ViewUpdate::SetProperties {
2109 updates: vec![
2110 ("prop1".to_string(), "v1".to_string()),
2111 ("prop2".to_string(), "v2".to_string()),
2112 ]
2113 .into_iter()
2114 .collect(),
2115 },
2116 );
2117 }
2118
2119 #[test]
2120 fn test_view_remove_properties() {
2121 test_serde_json(
2122 r#"
2123{
2124 "action": "remove-properties",
2125 "removals": [
2126 "prop1",
2127 "prop2"
2128 ]
2129}
2130 "#,
2131 ViewUpdate::RemoveProperties {
2132 removals: vec!["prop1".to_string(), "prop2".to_string()],
2133 },
2134 );
2135 }
2136
2137 #[test]
2138 fn test_view_add_view_version() {
2139 test_serde_json(
2140 r#"
2141{
2142 "action": "add-view-version",
2143 "view-version": {
2144 "version-id" : 1,
2145 "timestamp-ms" : 1573518431292,
2146 "schema-id" : 1,
2147 "default-catalog" : "prod",
2148 "default-namespace" : [ "default" ],
2149 "summary" : {
2150 "engine-name" : "Spark"
2151 },
2152 "representations" : [ {
2153 "type" : "sql",
2154 "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
2155 "dialect" : "spark"
2156 } ]
2157 }
2158}
2159 "#,
2160 ViewUpdate::AddViewVersion {
2161 view_version: ViewVersion::builder()
2162 .with_version_id(1)
2163 .with_timestamp_ms(1573518431292)
2164 .with_schema_id(1)
2165 .with_default_catalog(Some("prod".to_string()))
2166 .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
2167 .with_summary(
2168 vec![("engine-name".to_string(), "Spark".to_string())]
2169 .into_iter()
2170 .collect(),
2171 )
2172 .with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(SqlViewRepresentation {
2173 sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
2174 dialect: "spark".to_string(),
2175 })]))
2176 .build(),
2177 },
2178 );
2179 }
2180
2181 #[test]
2182 fn test_view_set_current_view_version() {
2183 test_serde_json(
2184 r#"
2185{
2186 "action": "set-current-view-version",
2187 "view-version-id": 1
2188}
2189 "#,
2190 ViewUpdate::SetCurrentViewVersion { view_version_id: 1 },
2191 );
2192 }
2193
2194 #[test]
2195 fn test_remove_partition_specs_update() {
2196 test_serde_json(
2197 r#"
2198{
2199 "action": "remove-partition-specs",
2200 "spec-ids": [1, 2]
2201}
2202 "#,
2203 TableUpdate::RemovePartitionSpecs {
2204 spec_ids: vec![1, 2],
2205 },
2206 );
2207 }
2208
2209 #[test]
2210 fn test_set_statistics_file() {
2211 test_serde_json(
2212 r#"
2213 {
2214 "action": "set-statistics",
2215 "snapshot-id": 1940541653261589030,
2216 "statistics": {
2217 "snapshot-id": 1940541653261589030,
2218 "statistics-path": "s3://bucket/warehouse/stats.puffin",
2219 "file-size-in-bytes": 124,
2220 "file-footer-size-in-bytes": 27,
2221 "blob-metadata": [
2222 {
2223 "type": "boring-type",
2224 "snapshot-id": 1940541653261589030,
2225 "sequence-number": 2,
2226 "fields": [
2227 1
2228 ],
2229 "properties": {
2230 "prop-key": "prop-value"
2231 }
2232 }
2233 ]
2234 }
2235 }
2236 "#,
2237 TableUpdate::SetStatistics {
2238 statistics: StatisticsFile {
2239 snapshot_id: 1940541653261589030,
2240 statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
2241 file_size_in_bytes: 124,
2242 file_footer_size_in_bytes: 27,
2243 key_metadata: None,
2244 blob_metadata: vec![BlobMetadata {
2245 r#type: "boring-type".to_string(),
2246 snapshot_id: 1940541653261589030,
2247 sequence_number: 2,
2248 fields: vec![1],
2249 properties: vec![("prop-key".to_string(), "prop-value".to_string())]
2250 .into_iter()
2251 .collect(),
2252 }],
2253 },
2254 },
2255 );
2256 }
2257
2258 #[test]
2259 fn test_remove_statistics_file() {
2260 test_serde_json(
2261 r#"
2262 {
2263 "action": "remove-statistics",
2264 "snapshot-id": 1940541653261589030
2265 }
2266 "#,
2267 TableUpdate::RemoveStatistics {
2268 snapshot_id: 1940541653261589030,
2269 },
2270 );
2271 }
2272
2273 #[test]
2274 fn test_set_partition_statistics_file() {
2275 test_serde_json(
2276 r#"
2277 {
2278 "action": "set-partition-statistics",
2279 "partition-statistics": {
2280 "snapshot-id": 1940541653261589030,
2281 "statistics-path": "s3://bucket/warehouse/stats1.parquet",
2282 "file-size-in-bytes": 43
2283 }
2284 }
2285 "#,
2286 TableUpdate::SetPartitionStatistics {
2287 partition_statistics: PartitionStatisticsFile {
2288 snapshot_id: 1940541653261589030,
2289 statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
2290 file_size_in_bytes: 43,
2291 },
2292 },
2293 )
2294 }
2295
2296 #[test]
2297 fn test_remove_partition_statistics_file() {
2298 test_serde_json(
2299 r#"
2300 {
2301 "action": "remove-partition-statistics",
2302 "snapshot-id": 1940541653261589030
2303 }
2304 "#,
2305 TableUpdate::RemovePartitionStatistics {
2306 snapshot_id: 1940541653261589030,
2307 },
2308 )
2309 }
2310
2311 #[test]
2312 fn test_remove_schema_update() {
2313 test_serde_json(
2314 r#"
2315 {
2316 "action": "remove-schemas",
2317 "schema-ids": [1, 2]
2318 }
2319 "#,
2320 TableUpdate::RemoveSchemas {
2321 schema_ids: vec![1, 2],
2322 },
2323 );
2324 }
2325
2326 #[test]
2327 fn test_add_encryption_key() {
2328 let key_bytes = "key".as_bytes();
2329 let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
2330 test_serde_json(
2331 format!(
2332 r#"
2333 {{
2334 "action": "add-encryption-key",
2335 "encryption-key": {{
2336 "key-id": "a",
2337 "encrypted-key-metadata": "{encoded_key}",
2338 "encrypted-by-id": "b"
2339 }}
2340 }}
2341 "#
2342 ),
2343 TableUpdate::AddEncryptionKey {
2344 encryption_key: EncryptedKey::builder()
2345 .key_id("a")
2346 .encrypted_key_metadata(key_bytes.to_vec())
2347 .encrypted_by_id("b")
2348 .build(),
2349 },
2350 );
2351 }
2352
2353 #[test]
2354 fn test_remove_encryption_key() {
2355 test_serde_json(
2356 r#"
2357 {
2358 "action": "remove-encryption-key",
2359 "key-id": "a"
2360 }
2361 "#,
2362 TableUpdate::RemoveEncryptionKey {
2363 key_id: "a".to_string(),
2364 },
2365 );
2366 }
2367
2368 #[test]
2369 fn test_table_commit() {
2370 let table = {
2371 let file = File::open(format!(
2372 "{}/testdata/table_metadata/{}",
2373 env!("CARGO_MANIFEST_DIR"),
2374 "TableMetadataV2Valid.json"
2375 ))
2376 .unwrap();
2377 let reader = BufReader::new(file);
2378 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2379
2380 Table::builder()
2381 .metadata(resp)
2382 .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
2383 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2384 .file_io(FileIO::new_with_memory())
2385 .build()
2386 .unwrap()
2387 };
2388
2389 let updates = vec![
2390 TableUpdate::SetLocation {
2391 location: "s3://bucket/test/new_location/data".to_string(),
2392 },
2393 TableUpdate::SetProperties {
2394 updates: vec![
2395 ("prop1".to_string(), "v1".to_string()),
2396 ("prop2".to_string(), "v2".to_string()),
2397 ]
2398 .into_iter()
2399 .collect(),
2400 },
2401 ];
2402
2403 let requirements = vec![TableRequirement::UuidMatch {
2404 uuid: table.metadata().table_uuid,
2405 }];
2406
2407 let table_commit = TableCommit::builder()
2408 .ident(table.identifier().to_owned())
2409 .updates(updates)
2410 .requirements(requirements)
2411 .build();
2412
2413 let updated_table = table_commit.apply(table).unwrap();
2414
2415 assert_eq!(
2416 updated_table.metadata().properties.get("prop1").unwrap(),
2417 "v1"
2418 );
2419 assert_eq!(
2420 updated_table.metadata().properties.get("prop2").unwrap(),
2421 "v2"
2422 );
2423
2424 assert!(
2426 updated_table
2427 .metadata_location()
2428 .unwrap()
2429 .starts_with("s3://bucket/test/location/metadata/00001-")
2430 );
2431
2432 assert_eq!(
2433 updated_table.metadata().location,
2434 "s3://bucket/test/new_location/data",
2435 );
2436 }
2437}