1pub mod memory;
21mod metadata_location;
22
23use std::collections::HashMap;
24use std::fmt::{Debug, Display};
25use std::future::Future;
26use std::mem::take;
27use std::ops::Deref;
28use std::str::FromStr;
29use std::sync::Arc;
30
31use _serde::deserialize_snapshot;
32use async_trait::async_trait;
33pub use memory::MemoryCatalog;
34pub use metadata_location::*;
35#[cfg(test)]
36use mockall::automock;
37use serde_derive::{Deserialize, Serialize};
38use typed_builder::TypedBuilder;
39use uuid::Uuid;
40
41use crate::spec::{
42 FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot, SnapshotReference,
43 SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder, UnboundPartitionSpec,
44 ViewFormatVersion, ViewRepresentations, ViewVersion,
45};
46use crate::table::Table;
47use crate::{Error, ErrorKind, Result};
48
/// The catalog API: asynchronous operations on namespaces and tables.
///
/// Implementors must be `Debug + Sync + Send`. In test builds a mock
/// implementation is generated through `mockall`'s `automock`.
///
/// NOTE(review): this trait only declares signatures; the exact semantics of
/// each operation (error kinds, atomicity, ...) are defined by the concrete
/// implementations, which are not visible here.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// Lists namespaces. `parent` optionally identifies the namespace to list
    /// under; presumably `None` means the top level (confirm against the
    /// implementation in use).
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Creates the namespace identified by `namespace` with the given
    /// `properties` and returns the resulting [`Namespace`].
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Fetches the [`Namespace`] identified by `namespace`.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Reports whether the namespace identified by `namespace` exists.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Updates the namespace identified by `namespace` using `properties`.
    ///
    /// NOTE(review): whether `properties` replaces or is merged into the
    /// existing properties is implementation-defined — confirm per catalog.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drops the namespace identified by `namespace`.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// Lists the identifiers of the tables in `namespace`.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Creates a table in `namespace` as described by `creation` and returns it.
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Loads the table identified by `table`.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drops the table identified by `table`.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Reports whether the table identified by `table` exists.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Renames the table `src` to `dest`.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Registers a table under the identifier `table` from the metadata found
    /// at `metadata_location`, and returns it.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Applies `commit` (requirements plus updates, see [`TableCommit`]) and
    /// returns the resulting table.
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
112
/// Common interface for types that build a [`Catalog`].
///
/// Builders must be default-constructible (`Default`) and `Debug + Send + Sync`.
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// The catalog type this builder produces.
    type C: Catalog;
    /// Consumes the builder and asynchronously produces the catalog.
    ///
    /// `name` is the catalog name and `props` is a map of string properties;
    /// how the properties are interpreted is up to the implementor.
    /// The returned future is required to be `Send`.
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
124
/// Identifier of a namespace: an ordered list of name parts, outermost first.
///
/// The constructors in this file never produce an empty list (`from_vec`
/// rejects one). NOTE(review): the derived `Deserialize` performs no such
/// check, so an empty identifier can presumably still arrive via serde.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
132
133impl NamespaceIdent {
134 pub fn new(name: String) -> Self {
136 Self(vec![name])
137 }
138
139 pub fn from_vec(names: Vec<String>) -> Result<Self> {
141 if names.is_empty() {
142 return Err(Error::new(
143 ErrorKind::DataInvalid,
144 "Namespace identifier can't be empty!",
145 ));
146 }
147 Ok(Self(names))
148 }
149
150 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
152 Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
153 }
154
155 pub fn to_url_string(&self) -> String {
157 self.as_ref().join("\u{001f}")
158 }
159
160 pub fn inner(self) -> Vec<String> {
162 self.0
163 }
164
165 pub fn parent(&self) -> Option<Self> {
168 self.0.split_last().and_then(|(_, parent)| {
169 if parent.is_empty() {
170 None
171 } else {
172 Some(Self(parent.to_vec()))
173 }
174 })
175 }
176}
177
/// Borrows the identifier's parts as the underlying `Vec<String>`.
impl AsRef<Vec<String>> for NamespaceIdent {
    /// Returns a reference to the wrapped vector of name parts.
    fn as_ref(&self) -> &Vec<String> {
        &self.0
    }
}
183
184impl Deref for NamespaceIdent {
185 type Target = [String];
186
187 fn deref(&self) -> &Self::Target {
188 &self.0
189 }
190}
191
/// A namespace: its identifier together with a map of string properties.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    // Identifier of this namespace.
    name: NamespaceIdent,
    // Key/value properties attached to this namespace.
    properties: HashMap<String, String>,
}
198
199impl Namespace {
200 pub fn new(name: NamespaceIdent) -> Self {
202 Self::with_properties(name, HashMap::default())
203 }
204
205 pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
207 Self { name, properties }
208 }
209
210 pub fn name(&self) -> &NamespaceIdent {
212 &self.name
213 }
214
215 pub fn properties(&self) -> &HashMap<String, String> {
217 &self.properties
218 }
219}
220
221impl Display for NamespaceIdent {
222 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223 write!(f, "{}", self.0.join("."))
224 }
225}
226
/// Identifier of a table: the namespace it lives in plus the table name.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace the table belongs to.
    pub namespace: NamespaceIdent,
    /// Name of the table within its namespace.
    pub name: String,
}
235
236impl TableIdent {
237 pub fn new(namespace: NamespaceIdent, name: String) -> Self {
239 Self { namespace, name }
240 }
241
242 pub fn namespace(&self) -> &NamespaceIdent {
244 &self.namespace
245 }
246
247 pub fn name(&self) -> &str {
249 &self.name
250 }
251
252 pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
254 let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
255 let table_name = vec.pop().ok_or_else(|| {
256 Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
257 })?;
258 let namespace_ident = NamespaceIdent::from_vec(vec)?;
259
260 Ok(Self {
261 namespace: namespace_ident,
262 name: table_name,
263 })
264 }
265}
266
267impl Display for TableIdent {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 write!(f, "{}.{}", self.namespace, self.name)
270 }
271}
272
/// Parameters for creating a table, assembled through the generated
/// `TypedBuilder` builder.
///
/// `name` and `schema` have no builder default and so must be set; every
/// other field is marked `default`.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// Name of the table.
    pub name: String,
    /// Optional location of the table. The setter takes a plain `String`;
    /// the `location_opt` fallback setter takes the `Option` directly.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// Schema of the table.
    pub schema: Schema,
    /// Optional partition spec. The setter accepts anything convertible
    /// `into` an `UnboundPartitionSpec`; `partition_spec_opt` takes the
    /// `Option` directly.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// Optional sort order; `sort_order_opt` takes the `Option` directly.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// Table properties. The setter accepts any iterator of
    /// `(String, String)` pairs and collects it into the map.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
}
295
/// A commit against a single table: the requirements that must hold and the
/// updates to apply.
///
/// The builder's `build` method is `pub(crate)`, so only code inside this
/// crate can construct a `TableCommit`.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    // Identifier of the table being committed to.
    ident: TableIdent,
    // Requirements checked against the table metadata before updates run.
    requirements: Vec<TableRequirement>,
    // Updates applied, in order, to the table metadata.
    updates: Vec<TableUpdate>,
}
313
314impl TableCommit {
315 pub fn identifier(&self) -> &TableIdent {
317 &self.ident
318 }
319
320 pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
322 take(&mut self.requirements)
323 }
324
325 pub fn take_updates(&mut self) -> Vec<TableUpdate> {
327 take(&mut self.updates)
328 }
329
330 pub fn apply(self, table: Table) -> Result<Table> {
336 for requirement in self.requirements {
338 requirement.check(Some(table.metadata()))?;
339 }
340
341 let current_metadata_location = table.metadata_location_result()?;
343
344 let mut metadata_builder = table
346 .metadata()
347 .clone()
348 .into_builder(Some(current_metadata_location.to_string()));
349 for update in self.updates {
350 metadata_builder = update.apply(metadata_builder)?;
351 }
352
353 let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
355 .with_next_version()
356 .to_string();
357
358 Ok(table
359 .with_metadata(Arc::new(metadata_builder.build()?.metadata))
360 .with_metadata_location(new_metadata_location))
361 }
362}
363
/// An assertion about a table's state, verified by `TableRequirement::check`.
///
/// Serialized as an internally tagged enum: the `type` field carries the
/// variant's renamed tag (e.g. `assert-create`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not exist; `check` fails whenever metadata is present.
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table UUID must equal `uuid`.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Expected table UUID.
        uuid: Uuid,
    },
    /// The named ref must point at `snapshot_id`; when `snapshot_id` is
    /// `None`, the ref must not exist.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// Name of the ref (the error messages call it a "branch or tag").
        r#ref: String,
        /// Expected snapshot id of the ref, or `None` to require its absence.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The metadata's `last_column_id` must equal `last_assigned_field_id`.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// Expected last assigned field id.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The metadata's current schema id must equal `current_schema_id`.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Expected current schema id.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The metadata's `last_partition_id` must equal
    /// `last_assigned_partition_id`.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Expected last assigned partition id.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The metadata's default partition spec id must equal `default_spec_id`.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Expected default partition spec id.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The order id of the metadata's default sort order must equal
    /// `default_sort_order_id`.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Expected default sort order id.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
425
/// A single change to table metadata, applied through `TableUpdate::apply`.
///
/// Serialized as an internally tagged enum: the `action` field carries the
/// kebab-case variant name (e.g. `assign-uuid`). Variants annotated with
/// `rename_all = "kebab-case"` also use kebab-case field names.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrades the table to `format_version`.
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Format version to upgrade to.
        format_version: FormatVersion,
    },
    /// Assigns `uuid` to the table.
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// UUID to assign.
        uuid: Uuid,
    },
    /// Adds `schema` to the table.
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// Schema to add.
        schema: Schema,
    },
    /// Sets the current schema to the one identified by `schema_id`.
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Id of the schema to make current.
        schema_id: i32,
    },
    /// Adds the partition spec `spec` to the table.
    AddSpec {
        /// Unbound partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Sets the default partition spec to the one identified by `spec_id`.
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Id of the partition spec to make the default.
        spec_id: i32,
    },
    /// Adds `sort_order` to the table.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Sets the default sort order to the one identified by `sort_order_id`.
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Id of the sort order to make the default.
        sort_order_id: i64,
    },
    /// Adds `snapshot` to the table.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add; deserialized through the catalog-specific
        /// `deserialize_snapshot` helper rather than `Snapshot`'s own impl.
        #[serde(deserialize_with = "deserialize_snapshot")]
        snapshot: Snapshot,
    },
    /// Sets the ref named `ref_name` to `reference`.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of the ref to set.
        ref_name: String,
        /// Reference data; its fields are flattened into this object on the wire.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Removes the snapshots with the given ids.
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Ids of the snapshots to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Removes the ref named `ref_name`.
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of the ref to remove.
        ref_name: String,
    },
    /// Sets the table location.
    SetLocation {
        /// New location.
        location: String,
    },
    /// Sets table properties.
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Removes table properties.
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Removes the partition specs with the given ids.
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Ids of the partition specs to remove.
        spec_ids: Vec<i32>,
    },
    /// Sets a statistics file.
    ///
    /// Uses the custom `_serde_set_statistics` (de)serializer: a `snapshot-id`
    /// field is written next to `statistics`, and on input an explicit
    /// `snapshot-id` must match the statistics file's own snapshot id.
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// Statistics file to set.
        statistics: StatisticsFile,
    },
    /// Removes the statistics of the snapshot identified by `snapshot_id`.
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Snapshot whose statistics are removed.
        snapshot_id: i64,
    },
    /// Sets a partition statistics file.
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// Partition statistics file to set.
        partition_statistics: PartitionStatisticsFile,
    },
    /// Removes the partition statistics of the snapshot identified by
    /// `snapshot_id`.
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Snapshot whose partition statistics are removed.
        snapshot_id: i64,
    },
    /// Removes the schemas with the given ids.
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Ids of the schemas to remove.
        schema_ids: Vec<i32>,
    },
}
558
559impl TableUpdate {
560 pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
562 match self {
563 TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
564 TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
565 TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
566 TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
567 TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
568 TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
569 TableUpdate::SetDefaultSortOrder { sort_order_id } => {
570 builder.set_default_sort_order(sort_order_id)
571 }
572 TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
573 TableUpdate::SetSnapshotRef {
574 ref_name,
575 reference,
576 } => builder.set_ref(&ref_name, reference),
577 TableUpdate::RemoveSnapshots { snapshot_ids } => {
578 Ok(builder.remove_snapshots(&snapshot_ids))
579 }
580 TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
581 TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
582 TableUpdate::SetProperties { updates } => builder.set_properties(updates),
583 TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
584 TableUpdate::UpgradeFormatVersion { format_version } => {
585 builder.upgrade_format_version(format_version)
586 }
587 TableUpdate::RemovePartitionSpecs { spec_ids } => {
588 builder.remove_partition_specs(&spec_ids)
589 }
590 TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
591 TableUpdate::RemoveStatistics { snapshot_id } => {
592 Ok(builder.remove_statistics(snapshot_id))
593 }
594 TableUpdate::SetPartitionStatistics {
595 partition_statistics,
596 } => Ok(builder.set_partition_statistics(partition_statistics)),
597 TableUpdate::RemovePartitionStatistics { snapshot_id } => {
598 Ok(builder.remove_partition_statistics(snapshot_id))
599 }
600 TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
601 }
602 }
603}
604
605impl TableRequirement {
606 pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
611 if let Some(metadata) = metadata {
612 match self {
613 TableRequirement::NotExist => {
614 return Err(Error::new(
615 ErrorKind::CatalogCommitConflicts,
616 format!(
617 "Requirement failed: Table with id {} already exists",
618 metadata.uuid()
619 ),
620 )
621 .with_retryable(true));
622 }
623 TableRequirement::UuidMatch { uuid } => {
624 if &metadata.uuid() != uuid {
625 return Err(Error::new(
626 ErrorKind::CatalogCommitConflicts,
627 "Requirement failed: Table UUID does not match",
628 )
629 .with_context("expected", *uuid)
630 .with_context("found", metadata.uuid())
631 .with_retryable(true));
632 }
633 }
634 TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
635 if metadata.current_schema_id != *current_schema_id {
637 return Err(Error::new(
638 ErrorKind::CatalogCommitConflicts,
639 "Requirement failed: Current schema id does not match",
640 )
641 .with_context("expected", current_schema_id.to_string())
642 .with_context("found", metadata.current_schema_id.to_string())
643 .with_retryable(true));
644 }
645 }
646 TableRequirement::DefaultSortOrderIdMatch {
647 default_sort_order_id,
648 } => {
649 if metadata.default_sort_order().order_id != *default_sort_order_id {
650 return Err(Error::new(
651 ErrorKind::CatalogCommitConflicts,
652 "Requirement failed: Default sort order id does not match",
653 )
654 .with_context("expected", default_sort_order_id.to_string())
655 .with_context("found", metadata.default_sort_order().order_id.to_string())
656 .with_retryable(true));
657 }
658 }
659 TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
660 let snapshot_ref = metadata.snapshot_for_ref(r#ref);
661 if let Some(snapshot_id) = snapshot_id {
662 let snapshot_ref = snapshot_ref.ok_or(
663 Error::new(
664 ErrorKind::CatalogCommitConflicts,
665 format!("Requirement failed: Branch or tag `{}` not found", r#ref),
666 )
667 .with_retryable(true),
668 )?;
669 if snapshot_ref.snapshot_id() != *snapshot_id {
670 return Err(Error::new(
671 ErrorKind::CatalogCommitConflicts,
672 format!(
673 "Requirement failed: Branch or tag `{}`'s snapshot has changed",
674 r#ref
675 ),
676 )
677 .with_context("expected", snapshot_id.to_string())
678 .with_context("found", snapshot_ref.snapshot_id().to_string())
679 .with_retryable(true));
680 }
681 } else if snapshot_ref.is_some() {
682 return Err(Error::new(
684 ErrorKind::CatalogCommitConflicts,
685 format!(
686 "Requirement failed: Branch or tag `{}` already exists",
687 r#ref
688 ),
689 )
690 .with_retryable(true));
691 }
692 }
693 TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
694 if metadata.default_partition_spec_id() != *default_spec_id {
696 return Err(Error::new(
697 ErrorKind::CatalogCommitConflicts,
698 "Requirement failed: Default partition spec id does not match",
699 )
700 .with_context("expected", default_spec_id.to_string())
701 .with_context("found", metadata.default_partition_spec_id().to_string())
702 .with_retryable(true));
703 }
704 }
705 TableRequirement::LastAssignedPartitionIdMatch {
706 last_assigned_partition_id,
707 } => {
708 if metadata.last_partition_id != *last_assigned_partition_id {
709 return Err(Error::new(
710 ErrorKind::CatalogCommitConflicts,
711 "Requirement failed: Last assigned partition id does not match",
712 )
713 .with_context("expected", last_assigned_partition_id.to_string())
714 .with_context("found", metadata.last_partition_id.to_string())
715 .with_retryable(true));
716 }
717 }
718 TableRequirement::LastAssignedFieldIdMatch {
719 last_assigned_field_id,
720 } => {
721 if &metadata.last_column_id != last_assigned_field_id {
722 return Err(Error::new(
723 ErrorKind::CatalogCommitConflicts,
724 "Requirement failed: Last assigned field id does not match",
725 )
726 .with_context("expected", last_assigned_field_id.to_string())
727 .with_context("found", metadata.last_column_id.to_string())
728 .with_retryable(true));
729 }
730 }
731 };
732 } else {
733 match self {
734 TableRequirement::NotExist => {}
735 _ => {
736 return Err(Error::new(
737 ErrorKind::TableNotFound,
738 "Requirement failed: Table does not exist",
739 ));
740 }
741 }
742 }
743
744 Ok(())
745 }
746}
747
748pub(super) mod _serde {
749 use serde::{Deserialize as _, Deserializer};
750
751 use super::*;
752 use crate::spec::{SchemaId, Summary};
753
754 pub(super) fn deserialize_snapshot<'de, D>(
755 deserializer: D,
756 ) -> std::result::Result<Snapshot, D::Error>
757 where D: Deserializer<'de> {
758 let buf = CatalogSnapshot::deserialize(deserializer)?;
759 Ok(buf.into())
760 }
761
762 #[derive(Debug, Deserialize, PartialEq, Eq)]
763 #[serde(rename_all = "kebab-case")]
764 struct CatalogSnapshot {
768 snapshot_id: i64,
769 #[serde(skip_serializing_if = "Option::is_none")]
770 parent_snapshot_id: Option<i64>,
771 #[serde(default)]
772 sequence_number: i64,
773 timestamp_ms: i64,
774 manifest_list: String,
775 summary: Summary,
776 #[serde(skip_serializing_if = "Option::is_none")]
777 schema_id: Option<SchemaId>,
778 }
779
780 impl From<CatalogSnapshot> for Snapshot {
781 fn from(snapshot: CatalogSnapshot) -> Self {
782 let CatalogSnapshot {
783 snapshot_id,
784 parent_snapshot_id,
785 sequence_number,
786 timestamp_ms,
787 manifest_list,
788 schema_id,
789 summary,
790 } = snapshot;
791 let builder = Snapshot::builder()
792 .with_snapshot_id(snapshot_id)
793 .with_parent_snapshot_id(parent_snapshot_id)
794 .with_sequence_number(sequence_number)
795 .with_timestamp_ms(timestamp_ms)
796 .with_manifest_list(manifest_list)
797 .with_summary(summary);
798 if let Some(schema_id) = schema_id {
799 builder.with_schema_id(schema_id).build()
800 } else {
801 builder.build()
802 }
803 }
804 }
805}
806
/// Parameters for creating a view, assembled through the generated
/// `TypedBuilder` builder.
///
/// Fields marked `#[builder(default)]` may be omitted and then take their
/// type's default value; all other fields must be set.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// Name of the view.
    pub name: String,
    /// Location of the view.
    pub location: String,
    /// Representations of the view.
    pub representations: ViewRepresentations,
    /// Schema of the view.
    pub schema: Schema,
    /// View properties; empty when not set.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// Default namespace for the view.
    // NOTE(review): presumably used to resolve unqualified references inside
    // the view definition — confirm against the consumer of this struct.
    pub default_namespace: NamespaceIdent,
    /// Optional default catalog; `None` when not set.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// Summary key/value pairs; empty when not set.
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
831
/// A single change to view metadata.
///
/// Serialized as an internally tagged enum: the `action` field carries the
/// kebab-case variant name, and every variant uses kebab-case field names.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Assigns `uuid` to the view.
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// UUID to assign.
        uuid: Uuid,
    },
    /// Upgrades the view to `format_version`.
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Format version to upgrade to.
        format_version: ViewFormatVersion,
    },
    /// Adds `schema` to the view.
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// Schema to add.
        schema: Schema,
        /// Optional last column id accompanying the schema.
        last_column_id: Option<i32>,
    },
    /// Sets the view location.
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// New location.
        location: String,
    },
    /// Sets view properties.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to set.
        updates: HashMap<String, String>,
    },
    /// Removes view properties.
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Keys of the properties to remove.
        removals: Vec<String>,
    },
    /// Adds a view version.
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// View version to add.
        view_version: ViewVersion,
    },
    /// Sets the current view version.
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// Id of the view version to make current.
        view_version_id: i32,
    },
}
890
891mod _serde_set_statistics {
892 use serde::{Deserialize, Deserializer, Serialize, Serializer};
895
896 use super::*;
897
898 #[derive(Debug, Serialize, Deserialize)]
899 #[serde(rename_all = "kebab-case")]
900 struct SetStatistics {
901 snapshot_id: Option<i64>,
902 statistics: StatisticsFile,
903 }
904
905 pub fn serialize<S>(
906 value: &StatisticsFile,
907 serializer: S,
908 ) -> std::result::Result<S::Ok, S::Error>
909 where
910 S: Serializer,
911 {
912 SetStatistics {
913 snapshot_id: Some(value.snapshot_id),
914 statistics: value.clone(),
915 }
916 .serialize(serializer)
917 }
918
919 pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
920 where D: Deserializer<'de> {
921 let SetStatistics {
922 snapshot_id,
923 statistics,
924 } = SetStatistics::deserialize(deserializer)?;
925 if let Some(snapshot_id) = snapshot_id {
926 if snapshot_id != statistics.snapshot_id {
927 return Err(serde::de::Error::custom(format!(
928 "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
929 statistics.snapshot_id
930 )));
931 }
932 }
933
934 Ok(statistics)
935 }
936}
937
938#[cfg(test)]
939mod tests {
940 use std::collections::HashMap;
941 use std::fmt::Debug;
942 use std::fs::File;
943 use std::io::BufReader;
944
945 use serde::Serialize;
946 use serde::de::DeserializeOwned;
947 use uuid::uuid;
948
949 use super::ViewUpdate;
950 use crate::io::FileIOBuilder;
951 use crate::spec::{
952 BlobMetadata, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
953 PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
954 SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
955 StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
956 UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
957 ViewVersion,
958 };
959 use crate::table::Table;
960 use crate::{
961 NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
962 };
963
964 #[test]
965 fn test_parent_namespace() {
966 let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
967 let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
968 let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
969
970 assert_eq!(ns1.parent(), None);
971 assert_eq!(ns2.parent(), Some(ns1.clone()));
972 assert_eq!(ns3.parent(), Some(ns2.clone()));
973 }
974
975 #[test]
976 fn test_create_table_id() {
977 let table_id = TableIdent {
978 namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
979 name: "t1".to_string(),
980 };
981
982 assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
983 }
984
985 #[test]
986 fn test_table_creation_iterator_properties() {
987 let builder = TableCreation::builder()
988 .name("table".to_string())
989 .schema(Schema::builder().build().unwrap());
990
991 fn s(k: &str, v: &str) -> (String, String) {
992 (k.to_string(), v.to_string())
993 }
994
995 let table_creation = builder
996 .properties([s("key", "value"), s("foo", "bar")])
997 .build();
998
999 assert_eq!(
1000 HashMap::from([s("key", "value"), s("foo", "bar")]),
1001 table_creation.properties
1002 );
1003 }
1004
1005 fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1006 json: impl ToString,
1007 expected: T,
1008 ) {
1009 let json_str = json.to_string();
1010 let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1011 assert_eq!(actual, expected, "Parsed value is not equal to expected");
1012
1013 let restored: T = serde_json::from_str(
1014 &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1015 )
1016 .expect("Failed to parse from serialized json");
1017
1018 assert_eq!(
1019 restored, expected,
1020 "Parsed restored value is not equal to expected"
1021 );
1022 }
1023
1024 fn metadata() -> TableMetadata {
1025 let tbl_creation = TableCreation::builder()
1026 .name("table".to_string())
1027 .location("/path/to/table".to_string())
1028 .schema(Schema::builder().build().unwrap())
1029 .build();
1030
1031 TableMetadataBuilder::from_table_creation(tbl_creation)
1032 .unwrap()
1033 .assign_uuid(uuid::Uuid::nil())
1034 .build()
1035 .unwrap()
1036 .metadata
1037 }
1038
1039 #[test]
1040 fn test_check_requirement_not_exist() {
1041 let metadata = metadata();
1042 let requirement = TableRequirement::NotExist;
1043
1044 assert!(requirement.check(Some(&metadata)).is_err());
1045 assert!(requirement.check(None).is_ok());
1046 }
1047
1048 #[test]
1049 fn test_check_table_uuid() {
1050 let metadata = metadata();
1051
1052 let requirement = TableRequirement::UuidMatch {
1053 uuid: uuid::Uuid::now_v7(),
1054 };
1055 assert!(requirement.check(Some(&metadata)).is_err());
1056
1057 let requirement = TableRequirement::UuidMatch {
1058 uuid: uuid::Uuid::nil(),
1059 };
1060 assert!(requirement.check(Some(&metadata)).is_ok());
1061 }
1062
1063 #[test]
1064 fn test_check_ref_snapshot_id() {
1065 let metadata = metadata();
1066
1067 let requirement = TableRequirement::RefSnapshotIdMatch {
1069 r#ref: "my_branch".to_string(),
1070 snapshot_id: Some(1),
1071 };
1072 assert!(requirement.check(Some(&metadata)).is_err());
1073
1074 let requirement = TableRequirement::RefSnapshotIdMatch {
1076 r#ref: "my_branch".to_string(),
1077 snapshot_id: None,
1078 };
1079 assert!(requirement.check(Some(&metadata)).is_ok());
1080
1081 let record = r#"
1083 {
1084 "snapshot-id": 3051729675574597004,
1085 "sequence-number": 10,
1086 "timestamp-ms": 9992191116217,
1087 "summary": {
1088 "operation": "append"
1089 },
1090 "manifest-list": "s3://b/wh/.../s1.avro",
1091 "schema-id": 0
1092 }
1093 "#;
1094
1095 let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
1096 let builder = metadata.into_builder(None);
1097 let builder = TableUpdate::AddSnapshot {
1098 snapshot: snapshot.clone(),
1099 }
1100 .apply(builder)
1101 .unwrap();
1102 let metadata = TableUpdate::SetSnapshotRef {
1103 ref_name: MAIN_BRANCH.to_string(),
1104 reference: SnapshotReference {
1105 snapshot_id: snapshot.snapshot_id(),
1106 retention: SnapshotRetention::Branch {
1107 min_snapshots_to_keep: Some(10),
1108 max_snapshot_age_ms: None,
1109 max_ref_age_ms: None,
1110 },
1111 },
1112 }
1113 .apply(builder)
1114 .unwrap()
1115 .build()
1116 .unwrap()
1117 .metadata;
1118
1119 let requirement = TableRequirement::RefSnapshotIdMatch {
1121 r#ref: "main".to_string(),
1122 snapshot_id: Some(3051729675574597004),
1123 };
1124 assert!(requirement.check(Some(&metadata)).is_ok());
1125
1126 let requirement = TableRequirement::RefSnapshotIdMatch {
1128 r#ref: "main".to_string(),
1129 snapshot_id: Some(1),
1130 };
1131 assert!(requirement.check(Some(&metadata)).is_err());
1132 }
1133
1134 #[test]
1135 fn test_check_last_assigned_field_id() {
1136 let metadata = metadata();
1137
1138 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1139 last_assigned_field_id: 1,
1140 };
1141 assert!(requirement.check(Some(&metadata)).is_err());
1142
1143 let requirement = TableRequirement::LastAssignedFieldIdMatch {
1144 last_assigned_field_id: 0,
1145 };
1146 assert!(requirement.check(Some(&metadata)).is_ok());
1147 }
1148
1149 #[test]
1150 fn test_check_current_schema_id() {
1151 let metadata = metadata();
1152
1153 let requirement = TableRequirement::CurrentSchemaIdMatch {
1154 current_schema_id: 1,
1155 };
1156 assert!(requirement.check(Some(&metadata)).is_err());
1157
1158 let requirement = TableRequirement::CurrentSchemaIdMatch {
1159 current_schema_id: 0,
1160 };
1161 assert!(requirement.check(Some(&metadata)).is_ok());
1162 }
1163
1164 #[test]
1165 fn test_check_last_assigned_partition_id() {
1166 let metadata = metadata();
1167 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1168 last_assigned_partition_id: 0,
1169 };
1170 assert!(requirement.check(Some(&metadata)).is_err());
1171
1172 let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1173 last_assigned_partition_id: 999,
1174 };
1175 assert!(requirement.check(Some(&metadata)).is_ok());
1176 }
1177
1178 #[test]
1179 fn test_check_default_spec_id() {
1180 let metadata = metadata();
1181
1182 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1183 assert!(requirement.check(Some(&metadata)).is_err());
1184
1185 let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1186 assert!(requirement.check(Some(&metadata)).is_ok());
1187 }
1188
1189 #[test]
1190 fn test_check_default_sort_order_id() {
1191 let metadata = metadata();
1192
1193 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1194 default_sort_order_id: 1,
1195 };
1196 assert!(requirement.check(Some(&metadata)).is_err());
1197
1198 let requirement = TableRequirement::DefaultSortOrderIdMatch {
1199 default_sort_order_id: 0,
1200 };
1201 assert!(requirement.check(Some(&metadata)).is_ok());
1202 }
1203
1204 #[test]
1205 fn test_table_uuid() {
1206 test_serde_json(
1207 r#"
1208{
1209 "type": "assert-table-uuid",
1210 "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1211}
1212 "#,
1213 TableRequirement::UuidMatch {
1214 uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1215 },
1216 );
1217 }
1218
1219 #[test]
1220 fn test_assert_table_not_exists() {
1221 test_serde_json(
1222 r#"
1223{
1224 "type": "assert-create"
1225}
1226 "#,
1227 TableRequirement::NotExist,
1228 );
1229 }
1230
1231 #[test]
1232 fn test_assert_ref_snapshot_id() {
1233 test_serde_json(
1234 r#"
1235{
1236 "type": "assert-ref-snapshot-id",
1237 "ref": "snapshot-name",
1238 "snapshot-id": null
1239}
1240 "#,
1241 TableRequirement::RefSnapshotIdMatch {
1242 r#ref: "snapshot-name".to_string(),
1243 snapshot_id: None,
1244 },
1245 );
1246
1247 test_serde_json(
1248 r#"
1249{
1250 "type": "assert-ref-snapshot-id",
1251 "ref": "snapshot-name",
1252 "snapshot-id": 1
1253}
1254 "#,
1255 TableRequirement::RefSnapshotIdMatch {
1256 r#ref: "snapshot-name".to_string(),
1257 snapshot_id: Some(1),
1258 },
1259 );
1260 }
1261
1262 #[test]
1263 fn test_assert_last_assigned_field_id() {
1264 test_serde_json(
1265 r#"
1266{
1267 "type": "assert-last-assigned-field-id",
1268 "last-assigned-field-id": 12
1269}
1270 "#,
1271 TableRequirement::LastAssignedFieldIdMatch {
1272 last_assigned_field_id: 12,
1273 },
1274 );
1275 }
1276
1277 #[test]
1278 fn test_assert_current_schema_id() {
1279 test_serde_json(
1280 r#"
1281{
1282 "type": "assert-current-schema-id",
1283 "current-schema-id": 4
1284}
1285 "#,
1286 TableRequirement::CurrentSchemaIdMatch {
1287 current_schema_id: 4,
1288 },
1289 );
1290 }
1291
1292 #[test]
1293 fn test_assert_last_assigned_partition_id() {
1294 test_serde_json(
1295 r#"
1296{
1297 "type": "assert-last-assigned-partition-id",
1298 "last-assigned-partition-id": 1004
1299}
1300 "#,
1301 TableRequirement::LastAssignedPartitionIdMatch {
1302 last_assigned_partition_id: 1004,
1303 },
1304 );
1305 }
1306
1307 #[test]
1308 fn test_assert_default_spec_id() {
1309 test_serde_json(
1310 r#"
1311{
1312 "type": "assert-default-spec-id",
1313 "default-spec-id": 5
1314}
1315 "#,
1316 TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
1317 );
1318 }
1319
1320 #[test]
1321 fn test_assert_default_sort_order() {
1322 let json = r#"
1323{
1324 "type": "assert-default-sort-order-id",
1325 "default-sort-order-id": 10
1326}
1327 "#;
1328
1329 let update = TableRequirement::DefaultSortOrderIdMatch {
1330 default_sort_order_id: 10,
1331 };
1332
1333 test_serde_json(json, update);
1334 }
1335
1336 #[test]
1337 fn test_parse_assert_invalid() {
1338 assert!(
1339 serde_json::from_str::<TableRequirement>(
1340 r#"
1341{
1342 "default-sort-order-id": 10
1343}
1344"#
1345 )
1346 .is_err(),
1347 "Table requirements should not be parsed without type."
1348 );
1349 }
1350
/// Feeds an `assign-uuid` JSON document and the matching
/// `TableUpdate::AssignUuid` value to `test_serde_json`.
#[test]
fn test_assign_uuid() {
    let json = r#"
{
    "action": "assign-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
    "#;
    let expected = TableUpdate::AssignUuid {
        uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
    };

    test_serde_json(json, expected);
}
1365
/// Feeds an `upgrade-format-version` JSON document (version 2) and the matching
/// `TableUpdate::UpgradeFormatVersion` value to `test_serde_json`.
#[test]
fn test_upgrade_format_version() {
    let json = r#"
{
    "action": "upgrade-format-version",
    "format-version": 2
}
    "#;
    let expected = TableUpdate::UpgradeFormatVersion {
        format_version: FormatVersion::V2,
    };

    test_serde_json(json, expected);
}
1380
/// Feeds two `add-schema` JSON documents to `test_serde_json`, both paired with
/// the same expected `TableUpdate::AddSchema` value.
///
/// The first document carries an extra top-level `last-column-id` key that has no
/// counterpart in the expected value; the second document omits that key.
#[test]
fn test_add_schema() {
    let test_schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();

    // Document that includes the additional `last-column-id` key.
    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
        TableUpdate::AddSchema {
            schema: test_schema.clone(),
        },
    );

    // Document without `last-column-id`. This is the last use of `test_schema`,
    // so it is moved into the expected value instead of being cloned again.
    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    }
}
        "#,
        TableUpdate::AddSchema {
            schema: test_schema,
        },
    );
}
1470
/// Feeds a `set-current-schema` JSON document and the matching
/// `TableUpdate::SetCurrentSchema` value to `test_serde_json`.
#[test]
fn test_set_current_schema() {
    let json = r#"
{
    "action": "set-current-schema",
    "schema-id": 23
}
    "#;
    let expected = TableUpdate::SetCurrentSchema { schema_id: 23 };

    test_serde_json(json, expected);
}
1483
/// Feeds an `add-spec` JSON document with three partition fields (`day`,
/// `bucket[16]`, `truncate[4]`) and the matching `TableUpdate::AddSpec` value to
/// `test_serde_json`.
#[test]
fn test_add_spec() {
    let json = r#"
{
    "action": "add-spec",
    "spec": {
        "fields": [
            {
                "source-id": 4,
                "name": "ts_day",
                "transform": "day"
            },
            {
                "source-id": 1,
                "name": "id_bucket",
                "transform": "bucket[16]"
            },
            {
                "source-id": 2,
                "name": "id_truncate",
                "transform": "truncate[4]"
            }
        ]
    }
}
    "#;

    // Build the expected unbound spec field by field, in the same order as the JSON.
    let spec = UnboundPartitionSpec::builder()
        .add_partition_field(4, "ts_day".to_string(), Transform::Day)
        .unwrap()
        .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
        .unwrap()
        .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
        .unwrap()
        .build();

    test_serde_json(json, TableUpdate::AddSpec { spec });
}
1523
/// Feeds a `set-default-spec` JSON document and the matching
/// `TableUpdate::SetDefaultSpec` value to `test_serde_json`.
#[test]
fn test_set_default_spec() {
    let json = r#"
{
    "action": "set-default-spec",
    "spec-id": 1
}
    "#;
    let expected = TableUpdate::SetDefaultSpec { spec_id: 1 };

    test_serde_json(json, expected);
}
1536
/// Feeds an `add-sort-order` JSON document with two sort fields and the matching
/// `TableUpdate::AddSortOrder` value to `test_serde_json`.
#[test]
fn test_add_sort_order() {
    let payload = r#"
{
    "action": "add-sort-order",
    "sort-order": {
        "order-id": 1,
        "fields": [
            {
                "transform": "identity",
                "source-id": 2,
                "direction": "asc",
                "null-order": "nulls-first"
            },
            {
                "transform": "bucket[4]",
                "source-id": 3,
                "direction": "desc",
                "null-order": "nulls-last"
            }
        ]
    }
}
    "#;

    // First field: identity transform, ascending, nulls first.
    let identity_field = SortField::builder()
        .source_id(2)
        .direction(SortDirection::Ascending)
        .null_order(NullOrder::First)
        .transform(Transform::Identity)
        .build();
    // Second field: bucket[4] transform, descending, nulls last.
    let bucket_field = SortField::builder()
        .source_id(3)
        .direction(SortDirection::Descending)
        .null_order(NullOrder::Last)
        .transform(Transform::Bucket(4))
        .build();

    let sort_order = SortOrder::builder()
        .with_order_id(1)
        .with_sort_field(identity_field)
        .with_sort_field(bucket_field)
        .build_unbound()
        .unwrap();

    test_serde_json(payload, TableUpdate::AddSortOrder { sort_order });
}
1587
/// Feeds a `set-default-sort-order` JSON document and the matching
/// `TableUpdate::SetDefaultSortOrder` value to `test_serde_json`.
#[test]
fn test_set_default_order() {
    test_serde_json(
        r#"
{
    "action": "set-default-sort-order",
    "sort-order-id": 2
}
        "#,
        TableUpdate::SetDefaultSortOrder { sort_order_id: 2 },
    );
}
1600
/// Feeds an `add-snapshot` JSON document that carries `sequence-number` and
/// `schema-id`, together with the matching `TableUpdate::AddSnapshot` value, to
/// `test_serde_json`.
#[test]
fn test_add_snapshot() {
    let payload = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "sequence-number": 1,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro",
        "schema-id": 1
    }
}
    "#;

    // Summary with the `append` operation and no additional properties.
    let append_summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::default(),
    };
    let snapshot = Snapshot::builder()
        .with_snapshot_id(3055729675574597000)
        .with_parent_snapshot_id(Some(3051729675574597000))
        .with_timestamp_ms(1555100955770)
        .with_sequence_number(1)
        .with_manifest_list("s3://a/b/2.avro")
        .with_schema_id(1)
        .with_summary(append_summary)
        .build();

    test_serde_json(payload, TableUpdate::AddSnapshot { snapshot });
}
1637
/// Deserializes an `add-snapshot` JSON document that has neither
/// `sequence-number` nor `schema-id` and compares it with a snapshot built with
/// sequence number 0 and no schema id.
///
/// Unlike the neighbouring tests, this one only parses the JSON with
/// `serde_json::from_str`; it does not go through `test_serde_json`.
#[test]
fn test_add_snapshot_v1() {
    let payload = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro"
    }
}
    "#;

    let append_summary = Summary {
        operation: Operation::Append,
        additional_properties: HashMap::default(),
    };
    let snapshot = Snapshot::builder()
        .with_snapshot_id(3055729675574597000)
        .with_parent_snapshot_id(Some(3051729675574597000))
        .with_timestamp_ms(1555100955770)
        .with_sequence_number(0)
        .with_manifest_list("s3://a/b/2.avro")
        .with_summary(append_summary)
        .build();
    let expected = TableUpdate::AddSnapshot { snapshot };

    let parsed = serde_json::from_str::<TableUpdate>(payload).expect("Failed to parse from json");
    assert_eq!(parsed, expected, "Parsed value is not equal to expected");
}
1672
/// Feeds a `remove-snapshots` JSON document and the matching
/// `TableUpdate::RemoveSnapshots` value to `test_serde_json`.
#[test]
fn test_remove_snapshots() {
    test_serde_json(
        r#"
{
    "action": "remove-snapshots",
    "snapshot-ids": [
        1,
        2
    ]
}
        "#,
        TableUpdate::RemoveSnapshots {
            snapshot_ids: vec![1, 2],
        },
    );
}
1690
/// Feeds a `remove-snapshot-ref` JSON document and the matching
/// `TableUpdate::RemoveSnapshotRef` value to `test_serde_json`.
#[test]
fn test_remove_snapshot_ref() {
    test_serde_json(
        r#"
{
    "action": "remove-snapshot-ref",
    "ref-name": "snapshot-ref"
}
        "#,
        TableUpdate::RemoveSnapshotRef {
            ref_name: "snapshot-ref".to_string(),
        },
    );
}
1705
/// Feeds a `set-snapshot-ref` JSON document of type `tag` and the matching
/// `TableUpdate::SetSnapshotRef` value to `test_serde_json`.
#[test]
fn test_set_snapshot_ref_tag() {
    let payload = r#"
{
    "action": "set-snapshot-ref",
    "type": "tag",
    "ref-name": "hank",
    "snapshot-id": 1,
    "max-ref-age-ms": 1
}
    "#;

    let retention = SnapshotRetention::Tag {
        max_ref_age_ms: Some(1),
    };
    let reference = SnapshotReference {
        snapshot_id: 1,
        retention,
    };
    let expected = TableUpdate::SetSnapshotRef {
        ref_name: "hank".to_string(),
        reference,
    };

    test_serde_json(payload, expected);
}
1730
/// Feeds a `set-snapshot-ref` JSON document of type `branch` (with all three
/// retention settings present) and the matching `TableUpdate::SetSnapshotRef`
/// value to `test_serde_json`.
#[test]
fn test_set_snapshot_ref_branch() {
    let payload = r#"
{
    "action": "set-snapshot-ref",
    "type": "branch",
    "ref-name": "hank",
    "snapshot-id": 1,
    "min-snapshots-to-keep": 2,
    "max-snapshot-age-ms": 3,
    "max-ref-age-ms": 4
}
    "#;

    let retention = SnapshotRetention::Branch {
        min_snapshots_to_keep: Some(2),
        max_snapshot_age_ms: Some(3),
        max_ref_age_ms: Some(4),
    };
    let reference = SnapshotReference {
        snapshot_id: 1,
        retention,
    };
    let expected = TableUpdate::SetSnapshotRef {
        ref_name: "hank".to_string(),
        reference,
    };

    test_serde_json(payload, expected);
}
1759
/// Feeds a `set-properties` JSON document with two entries and the matching
/// `TableUpdate::SetProperties` value to `test_serde_json`.
#[test]
fn test_set_properties() {
    test_serde_json(
        r#"
{
    "action": "set-properties",
    "updates": {
        "prop1": "v1",
        "prop2": "v2"
    }
}
        "#,
        TableUpdate::SetProperties {
            updates: HashMap::from([
                ("prop1".to_string(), "v1".to_string()),
                ("prop2".to_string(), "v2".to_string()),
            ]),
        },
    );
}
1783
/// Feeds a `remove-properties` JSON document and the matching
/// `TableUpdate::RemoveProperties` value to `test_serde_json`.
#[test]
fn test_remove_properties() {
    test_serde_json(
        r#"
{
    "action": "remove-properties",
    "removals": [
        "prop1",
        "prop2"
    ]
}
        "#,
        TableUpdate::RemoveProperties {
            removals: vec!["prop1".to_string(), "prop2".to_string()],
        },
    );
}
1802
/// Feeds a `set-location` JSON document and the matching
/// `TableUpdate::SetLocation` value to `test_serde_json`.
#[test]
fn test_set_location() {
    test_serde_json(
        r#"
{
    "action": "set-location",
    "location": "s3://bucket/warehouse/tbl_location"
}
        "#,
        TableUpdate::SetLocation {
            location: "s3://bucket/warehouse/tbl_location".to_string(),
        },
    );
}
1818
/// Applies `TableUpdate::AssignUuid` to a metadata builder derived from a fresh
/// `TableCreation` and asserts that the rebuilt metadata reports the new UUID.
#[test]
fn test_table_update_apply() {
    // Start from metadata produced by a minimal table creation request.
    let creation = TableCreation::builder()
        .location("s3://db/table".to_string())
        .name("table".to_string())
        .properties(HashMap::new())
        .schema(Schema::builder().build().unwrap())
        .build();
    let base_metadata = TableMetadataBuilder::from_table_creation(creation)
        .unwrap()
        .build()
        .unwrap()
        .metadata;
    let builder = TableMetadataBuilder::new_from_metadata(
        base_metadata,
        Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
    );

    // Apply the update with a freshly generated UUID and rebuild.
    let new_uuid = uuid::Uuid::new_v4();
    let build_result = TableUpdate::AssignUuid { uuid: new_uuid }
        .apply(builder)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(build_result.metadata.uuid(), new_uuid);
}
1847
/// Feeds an `assign-uuid` JSON document and the matching
/// `ViewUpdate::AssignUuid` value to `test_serde_json`.
#[test]
fn test_view_assign_uuid() {
    let json = r#"
{
    "action": "assign-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
    "#;
    let expected = ViewUpdate::AssignUuid {
        uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
    };

    test_serde_json(json, expected);
}
1862
/// Feeds an `upgrade-format-version` JSON document (version 1) and the matching
/// `ViewUpdate::UpgradeFormatVersion` value to `test_serde_json`.
#[test]
fn test_view_upgrade_format_version() {
    let json = r#"
{
    "action": "upgrade-format-version",
    "format-version": 1
}
    "#;
    let expected = ViewUpdate::UpgradeFormatVersion {
        format_version: ViewFormatVersion::V1,
    };

    test_serde_json(json, expected);
}
1877
/// Feeds an `add-schema` JSON document that carries `last-column-id` and the
/// matching `ViewUpdate::AddSchema` value (with `last_column_id: Some(3)`) to
/// `test_serde_json`.
#[test]
fn test_view_add_schema() {
    let test_schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();

    test_serde_json(
        r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
        ViewUpdate::AddSchema {
            // `test_schema` is not used afterwards, so it is moved rather than cloned.
            schema: test_schema,
            last_column_id: Some(3),
        },
    );
}
1930
/// Feeds a `set-location` JSON document and the matching
/// `ViewUpdate::SetLocation` value to `test_serde_json`.
#[test]
fn test_view_set_location() {
    let json = r#"
{
    "action": "set-location",
    "location": "s3://db/view"
}
    "#;
    let expected = ViewUpdate::SetLocation {
        location: "s3://db/view".to_string(),
    };

    test_serde_json(json, expected);
}
1945
/// Feeds a `set-properties` JSON document with two entries and the matching
/// `ViewUpdate::SetProperties` value to `test_serde_json`.
#[test]
fn test_view_set_properties() {
    let json = r#"
{
    "action": "set-properties",
    "updates": {
        "prop1": "v1",
        "prop2": "v2"
    }
}
    "#;
    let expected = ViewUpdate::SetProperties {
        updates: HashMap::from([
            ("prop1".to_string(), "v1".to_string()),
            ("prop2".to_string(), "v2".to_string()),
        ]),
    };

    test_serde_json(json, expected);
}
1968
/// Feeds a `remove-properties` JSON document and the matching
/// `ViewUpdate::RemoveProperties` value to `test_serde_json`.
#[test]
fn test_view_remove_properties() {
    let json = r#"
{
    "action": "remove-properties",
    "removals": [
        "prop1",
        "prop2"
    ]
}
    "#;
    let expected = ViewUpdate::RemoveProperties {
        removals: vec!["prop1".to_string(), "prop2".to_string()],
    };

    test_serde_json(json, expected);
}
1986
/// Feeds an `add-view-version` JSON document with a single SQL representation
/// and the matching `ViewUpdate::AddViewVersion` value to `test_serde_json`.
#[test]
fn test_view_add_view_version() {
    let json = r#"
{
    "action": "add-view-version",
    "view-version": {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
              "engine-name" : "Spark"
            },
            "representations" : [ {
              "type" : "sql",
              "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
              "dialect" : "spark"
            } ]
    }
}
    "#;

    // The only representation: a Spark SQL statement identical to the JSON one.
    let sql_representation = ViewRepresentation::Sql(SqlViewRepresentation {
        sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
        dialect: "spark".to_string(),
    });

    let view_version = ViewVersion::builder()
        .with_version_id(1)
        .with_timestamp_ms(1573518431292)
        .with_schema_id(1)
        .with_default_catalog(Some("prod".to_string()))
        .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
        .with_summary(
            // Single-entry summary collected into whatever map type the builder expects.
            std::iter::once(("engine-name".to_string(), "Spark".to_string())).collect(),
        )
        .with_representations(ViewRepresentations(vec![sql_representation]))
        .build();

    test_serde_json(json, ViewUpdate::AddViewVersion { view_version });
}
2030
/// Feeds a `set-current-view-version` JSON document and the matching
/// `ViewUpdate::SetCurrentViewVersion` value to `test_serde_json`.
#[test]
fn test_view_set_current_view_version() {
    let json = r#"
{
    "action": "set-current-view-version",
    "view-version-id": 1
}
    "#;
    let expected = ViewUpdate::SetCurrentViewVersion { view_version_id: 1 };

    test_serde_json(json, expected);
}
2043
/// Feeds a `remove-partition-specs` JSON document and the matching
/// `TableUpdate::RemovePartitionSpecs` value to `test_serde_json`.
#[test]
fn test_remove_partition_specs_update() {
    let json = r#"
{
    "action": "remove-partition-specs",
    "spec-ids": [1, 2]
}
    "#;
    let expected = TableUpdate::RemovePartitionSpecs {
        spec_ids: vec![1, 2],
    };

    test_serde_json(json, expected);
}
2058
/// Feeds a `set-statistics` JSON document with one blob-metadata entry and the
/// matching `TableUpdate::SetStatistics` value to `test_serde_json`.
///
/// The JSON carries `snapshot-id` both at the top level and inside
/// `statistics`; the expected value only holds the nested `StatisticsFile`.
#[test]
fn test_set_statistics_file() {
    let json = r#"
        {
                "action": "set-statistics",
                "snapshot-id": 1940541653261589030,
                "statistics": {
                        "snapshot-id": 1940541653261589030,
                        "statistics-path": "s3://bucket/warehouse/stats.puffin",
                        "file-size-in-bytes": 124,
                        "file-footer-size-in-bytes": 27,
                        "blob-metadata": [
                                {
                                        "type": "boring-type",
                                        "snapshot-id": 1940541653261589030,
                                        "sequence-number": 2,
                                        "fields": [
                                                1
                                        ],
                                        "properties": {
                                                "prop-key": "prop-value"
                                        }
                                }
                        ]
                }
        }
        "#;

    // The single blob described under `blob-metadata`.
    let blob = BlobMetadata {
        r#type: "boring-type".to_string(),
        snapshot_id: 1940541653261589030,
        sequence_number: 2,
        fields: vec![1],
        properties: std::iter::once(("prop-key".to_string(), "prop-value".to_string()))
            .collect(),
    };
    let statistics = StatisticsFile {
        snapshot_id: 1940541653261589030,
        statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
        file_size_in_bytes: 124,
        file_footer_size_in_bytes: 27,
        key_metadata: None,
        blob_metadata: vec![blob],
    };

    test_serde_json(json, TableUpdate::SetStatistics { statistics });
}
2107
/// Feeds a `remove-statistics` JSON document and the matching
/// `TableUpdate::RemoveStatistics` value to `test_serde_json`.
#[test]
fn test_remove_statistics_file() {
    let json = r#"
        {
                "action": "remove-statistics",
                "snapshot-id": 1940541653261589030
        }
        "#;
    let expected = TableUpdate::RemoveStatistics {
        snapshot_id: 1940541653261589030,
    };

    test_serde_json(json, expected);
}
2122
/// Feeds a `set-partition-statistics` JSON document and the matching
/// `TableUpdate::SetPartitionStatistics` value to `test_serde_json`.
#[test]
fn test_set_partition_statistics_file() {
    let json = r#"
            {
                "action": "set-partition-statistics",
                "partition-statistics": {
                    "snapshot-id": 1940541653261589030,
                    "statistics-path": "s3://bucket/warehouse/stats1.parquet",
                    "file-size-in-bytes": 43
                }
            }
            "#;
    let partition_statistics = PartitionStatisticsFile {
        snapshot_id: 1940541653261589030,
        statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
        file_size_in_bytes: 43,
    };

    test_serde_json(
        json,
        TableUpdate::SetPartitionStatistics {
            partition_statistics,
        },
    );
}
2145
/// Feeds a `remove-partition-statistics` JSON document and the matching
/// `TableUpdate::RemovePartitionStatistics` value to `test_serde_json`.
#[test]
fn test_remove_partition_statistics_file() {
    let json = r#"
            {
                "action": "remove-partition-statistics",
                "snapshot-id": 1940541653261589030
            }
            "#;
    let expected = TableUpdate::RemovePartitionStatistics {
        snapshot_id: 1940541653261589030,
    };

    test_serde_json(json, expected);
}
2160
/// Feeds a `remove-schemas` JSON document and the matching
/// `TableUpdate::RemoveSchemas` value to `test_serde_json`.
#[test]
fn test_remove_schema_update() {
    let json = r#"
                {
                    "action": "remove-schemas",
                    "schema-ids": [1, 2]
                }
            "#;
    let expected = TableUpdate::RemoveSchemas {
        schema_ids: vec![1, 2],
    };

    test_serde_json(json, expected);
}
2175
/// Builds a `Table` from the `TableMetadataV2Valid.json` fixture, applies a
/// `TableCommit` containing a location change and two property updates, and
/// checks the properties, metadata location and table location of the result.
#[test]
fn test_table_commit() {
    // Load the fixture metadata from the crate's test data directory.
    let fixture_path = format!(
        "{}/testdata/table_metadata/{}",
        env!("CARGO_MANIFEST_DIR"),
        "TableMetadataV2Valid.json"
    );
    let fixture = BufReader::new(File::open(fixture_path).unwrap());
    let metadata = serde_json::from_reader::<_, TableMetadata>(fixture).unwrap();

    let table = Table::builder()
        .metadata(metadata)
        .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
        .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
        .file_io(FileIOBuilder::new("memory").build().unwrap())
        .build()
        .unwrap();

    // The commit: move the table location and set two properties, guarded by a
    // requirement on the table's current UUID.
    let set_location = TableUpdate::SetLocation {
        location: "s3://bucket/test/new_location/data".to_string(),
    };
    let set_properties = TableUpdate::SetProperties {
        updates: HashMap::from([
            ("prop1".to_string(), "v1".to_string()),
            ("prop2".to_string(), "v2".to_string()),
        ]),
    };
    let uuid_requirement = TableRequirement::UuidMatch {
        uuid: table.metadata().table_uuid,
    };

    let commit = TableCommit::builder()
        .ident(table.identifier().to_owned())
        .updates(vec![set_location, set_properties])
        .requirements(vec![uuid_requirement])
        .build();

    let committed = commit.apply(table).unwrap();

    // Both properties must be present on the resulting metadata.
    let properties = &committed.metadata().properties;
    assert_eq!(properties.get("prop1").unwrap(), "v1");
    assert_eq!(properties.get("prop2").unwrap(), "v2");

    // The new metadata file must live in the original metadata directory and
    // carry the `00001-` prefix.
    let new_metadata_location = committed.metadata_location().unwrap();
    assert!(new_metadata_location.starts_with("s3://bucket/test/location/metadata/00001-"));

    assert_eq!(
        committed.metadata().location,
        "s3://bucket/test/new_location/data",
    );
}
2245}