1use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use uuid::Uuid;
22
23use super::{
24 DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
25 ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, PartitionStatisticsFile, Schema, SchemaRef,
26 Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef,
27 StatisticsFile, StructType, TableMetadata, TableProperties, UNPARTITIONED_LAST_ASSIGNED_ID,
28 UnboundPartitionSpec,
29};
30use crate::error::{Error, ErrorKind, Result};
31use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
32use crate::{TableCreation, TableUpdate};
33
/// Lowest field id assigned when schema field ids are reassigned for a new table.
const FIRST_FIELD_ID: u32 = 1;
35
/// Builder for [`TableMetadata`] that mutates metadata in place while
/// recording every mutation as a [`TableUpdate`] change-log entry.
#[derive(Debug, Clone)]
pub struct TableMetadataBuilder {
    // Metadata being built / evolved.
    metadata: TableMetadata,
    // Ordered list of updates applied since this builder was created.
    changes: Vec<TableUpdate>,
    // Id of the schema most recently added via `add_schema`, if any.
    last_added_schema_id: Option<i32>,
    // Id of the partition spec most recently added, if any.
    last_added_spec_id: Option<i32>,
    // Id of the sort order most recently added, if any.
    last_added_order_id: Option<i64>,
    // Log entry for the previous metadata file; appended to the log on `build`.
    previous_history_entry: Option<MetadataLog>,
    // Timestamp stamped onto the built metadata; defaults to "now" in `build`.
    last_updated_ms: Option<i64>,
}
59
/// Result of building new [`TableMetadata`].
#[derive(Debug, Clone, PartialEq)]
pub struct TableMetadataBuildResult {
    /// The metadata that was built.
    pub metadata: TableMetadata,
    /// The changes applied while building.
    pub changes: Vec<TableUpdate>,
    /// Metadata-log entries expired during the build.
    pub expired_metadata_logs: Vec<MetadataLog>,
}
70
impl TableMetadataBuilder {
    /// Sentinel id meaning "the schema/spec/sort order most recently added".
    pub const LAST_ADDED: i32 = -1;
    /// Create a builder for a brand-new table.
    ///
    /// Schema field ids, partition field ids and sort field ids are all
    /// reassigned to fresh sequential values before the metadata is assembled.
    pub fn new(
        schema: Schema,
        spec: impl Into<UnboundPartitionSpec>,
        sort_order: SortOrder,
        location: String,
        format_version: FormatVersion,
        properties: HashMap<String, String>,
    ) -> Result<Self> {
        // Re-number schema, partition and sort field ids from scratch.
        let (fresh_schema, fresh_spec, fresh_sort_order) =
            Self::reassign_ids(schema, spec.into(), sort_order)?;
        let schema_id = fresh_schema.schema_id();

        let builder = Self {
            metadata: TableMetadata {
                format_version,
                table_uuid: Uuid::now_v7(),
                // The placeholder values below are overwritten by the setter
                // chain at the end of this function.
                location: "".to_string(),
                last_sequence_number: 0,
                last_updated_ms: 0,
                last_column_id: -1,
                current_schema_id: -1,
                schemas: HashMap::new(),
                partition_specs: HashMap::new(),
                default_spec: Arc::new(
                    PartitionSpec::unpartition_spec().with_spec_id(-1),
                ),
                default_partition_type: StructType::new(vec![]),
                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
                properties: HashMap::new(),
                current_snapshot_id: None,
                snapshots: HashMap::new(),
                snapshot_log: vec![],
                sort_orders: HashMap::new(),
                metadata_log: vec![],
                default_sort_order_id: -1,
                refs: HashMap::default(),
                statistics: HashMap::new(),
                partition_statistics: HashMap::new(),
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            },
            last_updated_ms: None,
            changes: vec![],
            last_added_schema_id: Some(schema_id),
            last_added_spec_id: None,
            last_added_order_id: None,
            previous_history_entry: None,
        };

        // Populate the real values, recording each as a change.
        builder
            .set_location(location)
            .add_current_schema(fresh_schema)?
            .add_default_partition_spec(fresh_spec.into_unbound())?
            .add_default_sort_order(fresh_sort_order)?
            .set_properties(properties)
    }
141
    /// Create a builder from existing metadata, e.g. to evolve a table.
    ///
    /// `current_file_location` is the path the `previous` metadata was read
    /// from; when given, it becomes a metadata-log entry on `build`.
    #[must_use]
    pub fn new_from_metadata(
        previous: TableMetadata,
        current_file_location: Option<String>,
    ) -> Self {
        Self {
            previous_history_entry: current_file_location.map(|l| MetadataLog {
                metadata_file: l,
                timestamp_ms: previous.last_updated_ms,
            }),
            metadata: previous,
            changes: Vec::default(),
            last_added_schema_id: None,
            last_added_spec_id: None,
            last_added_order_id: None,
            last_updated_ms: None,
        }
    }
165
166 pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
168 let TableCreation {
169 name: _,
170 location,
171 schema,
172 partition_spec,
173 sort_order,
174 properties,
175 format_version,
176 } = table_creation;
177
178 let location = location.ok_or_else(|| {
179 Error::new(
180 ErrorKind::DataInvalid,
181 "Can't create table without location",
182 )
183 })?;
184 let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
185 spec_id: None,
186 fields: vec![],
187 });
188
189 Self::new(
190 schema,
191 partition_spec,
192 sort_order.unwrap_or(SortOrder::unsorted_order()),
193 location,
194 format_version,
195 properties,
196 )
197 }
198
199 pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
201 if self.metadata.table_uuid != uuid {
202 self.metadata.table_uuid = uuid;
203 self.changes.push(TableUpdate::AssignUuid { uuid });
204 }
205
206 self
207 }
208
209 pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> {
214 if format_version < self.metadata.format_version {
215 return Err(Error::new(
216 ErrorKind::DataInvalid,
217 format!(
218 "Cannot downgrade FormatVersion from {} to {}",
219 self.metadata.format_version, format_version
220 ),
221 ));
222 }
223
224 if format_version != self.metadata.format_version {
225 match format_version {
226 FormatVersion::V1 => {
227 }
229 FormatVersion::V2 => {
230 self.metadata.format_version = format_version;
231 self.changes
232 .push(TableUpdate::UpgradeFormatVersion { format_version });
233 }
234 FormatVersion::V3 => {
235 self.metadata.format_version = format_version;
236 self.changes
237 .push(TableUpdate::UpgradeFormatVersion { format_version });
238 }
239 }
240 }
241
242 Ok(self)
243 }
244
    /// Add key/value pairs to the table properties; existing keys are
    /// overwritten. An empty map is a no-op and records no change.
    ///
    /// # Errors
    /// Fails if any key is a reserved table property.
    pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> {
        let reserved_properties = properties
            .keys()
            .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
            .map(ToString::to_string)
            .collect::<Vec<_>>();

        if !reserved_properties.is_empty() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Table properties should not contain reserved properties, but got: [{}]",
                    reserved_properties.join(", ")
                ),
            ));
        }

        if properties.is_empty() {
            return Ok(self);
        }

        // One copy goes into the metadata, the original into the change log.
        self.metadata.properties.extend(properties.clone());
        self.changes.push(TableUpdate::SetProperties {
            updates: properties,
        });

        Ok(self)
    }
282
    /// Remove the given keys from the table properties. Keys that are not
    /// present are ignored; a change is recorded only for a non-empty set.
    ///
    /// # Errors
    /// Fails if any key is a reserved table property.
    pub fn remove_properties(mut self, properties: &[String]) -> Result<Self> {
        // De-duplicate the requested keys.
        let properties = properties.iter().cloned().collect::<HashSet<_>>();

        let reserved_properties = properties
            .iter()
            .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
            .map(ToString::to_string)
            .collect::<Vec<_>>();

        if !reserved_properties.is_empty() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Table properties to remove contain reserved properties: [{}]",
                    reserved_properties.join(", ")
                ),
            ));
        }

        for property in &properties {
            self.metadata.properties.remove(property);
        }

        if !properties.is_empty() {
            self.changes.push(TableUpdate::RemoveProperties {
                removals: properties.into_iter().collect(),
            });
        }

        Ok(self)
    }
321
322 pub fn set_location(mut self, location: String) -> Self {
324 let location = location.trim_end_matches('/').to_string();
325 if self.metadata.location != location {
326 self.changes.push(TableUpdate::SetLocation {
327 location: location.clone(),
328 });
329 self.metadata.location = location;
330 }
331
332 self
333 }
334
    /// Append a snapshot, validating id uniqueness, sequence-number ordering
    /// and timestamp monotonicity (with a one-minute backwards tolerance).
    ///
    /// For format versions with row lineage the snapshot must carry a row
    /// range, and the table's `next_row_id` is advanced by the added rows.
    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
        if self
            .metadata
            .snapshots
            .contains_key(&snapshot.snapshot_id())
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
            ));
        }

        // V2+ snapshots with a parent must advance the sequence number.
        if self.metadata.format_version != FormatVersion::V1
            && snapshot.sequence_number() <= self.metadata.last_sequence_number
            && snapshot.parent_snapshot_id().is_some()
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot add snapshot with sequence number {} older than last sequence number {}",
                    snapshot.sequence_number(),
                    self.metadata.last_sequence_number
                ),
            ));
        }

        // Timestamps may regress by at most one minute vs. the snapshot log.
        if let Some(last) = self.metadata.snapshot_log.last() {
            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
                        snapshot.timestamp_ms(),
                        last.timestamp_ms
                    ),
                ));
            }
        }

        // Same tolerance against the latest known last-updated timestamp.
        let max_last_updated = self
            .last_updated_ms
            .unwrap_or_default()
            .max(self.metadata.last_updated_ms);
        if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot timestamp {}: before last updated timestamp {}",
                    snapshot.timestamp_ms(),
                    max_last_updated
                ),
            ));
        }

        // Row-lineage bookkeeping: required from MIN_FORMAT_VERSION_ROW_LINEAGE on.
        let mut added_rows = None;
        if self.metadata.format_version >= MIN_FORMAT_VERSION_ROW_LINEAGE {
            if let Some((first_row_id, added_rows_count)) = snapshot.row_range() {
                if first_row_id < self.metadata.next_row_id {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot add a snapshot, first-row-id is behind table next-row-id: {first_row_id} < {}",
                            self.metadata.next_row_id
                        ),
                    ));
                }

                added_rows = Some(added_rows_count);
            } else {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot add a snapshot: first-row-id is null. first-row-id must be set for format version >= {MIN_FORMAT_VERSION_ROW_LINEAGE}",
                    ),
                ));
            }
        }

        if let Some(added_rows) = added_rows {
            // Guard against i64 overflow when advancing the row id watermark.
            self.metadata.next_row_id = self
                .metadata
                .next_row_id
                .checked_add(added_rows)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        "Cannot add snapshot: next-row-id overflowed when adding added-rows",
                    )
                })?;
        }

        self.changes.push(TableUpdate::AddSnapshot {
            snapshot: snapshot.clone(),
        });

        self.last_updated_ms = Some(snapshot.timestamp_ms());
        self.metadata.last_sequence_number = snapshot.sequence_number();
        self.metadata
            .snapshots
            .insert(snapshot.snapshot_id(), snapshot.into());

        Ok(self)
    }
449
    /// Append a snapshot to the given branch, creating the branch reference
    /// with unbounded retention when it does not yet exist.
    ///
    /// # Errors
    /// Fails if `branch` names an existing non-branch reference, or if
    /// [`Self::add_snapshot`] rejects the snapshot.
    pub fn set_branch_snapshot(self, snapshot: Snapshot, branch: &str) -> Result<Self> {
        let reference = self.metadata.refs.get(branch).cloned();

        let reference = if let Some(mut reference) = reference {
            if !reference.is_branch() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!("Cannot append snapshot to non-branch reference '{branch}'",),
                ));
            }

            // Re-point the existing branch at the new snapshot.
            reference.snapshot_id = snapshot.snapshot_id();
            reference
        } else {
            SnapshotReference {
                snapshot_id: snapshot.snapshot_id(),
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            }
        };

        self.add_snapshot(snapshot)?.set_ref(branch, reference)
    }
481
    /// Remove snapshots by id; unknown ids are ignored. Any ref that still
    /// points at a removed snapshot is dropped as well.
    pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
        let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());

        self.metadata.snapshots.retain(|k, _| {
            if snapshot_ids.contains(k) {
                removed_snapshots.push(*k);
                false
            } else {
                true
            }
        });

        if !removed_snapshots.is_empty() {
            self.changes.push(TableUpdate::RemoveSnapshots {
                snapshot_ids: removed_snapshots,
            });
        }

        // Drop refs that now dangle. (Borrows disjoint fields of `metadata`:
        // `refs` mutably, `snapshots` immutably.)
        self.metadata
            .refs
            .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));

        self
    }
510
    /// Point `ref_name` at an existing snapshot. Setting the main branch also
    /// updates the current snapshot id and appends to the snapshot log.
    /// Re-setting a ref to its current value is a no-op.
    ///
    /// # Errors
    /// Fails if the referenced snapshot is unknown.
    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
        if self
            .metadata
            .refs
            .get(ref_name)
            .is_some_and(|snap_ref| snap_ref.eq(&reference))
        {
            return Ok(self);
        }

        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
                    reference.snapshot_id
                ),
            ));
        };

        // If the snapshot was added in this round of changes, its timestamp
        // becomes the builder's last-updated time.
        let is_added_snapshot = self.changes.iter().any(|update| {
            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
        });
        if is_added_snapshot {
            self.last_updated_ms = Some(snapshot.timestamp_ms());
        }

        if ref_name == MAIN_BRANCH {
            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
            // Use the pending last-updated time, or stamp "now" if none.
            let timestamp_ms = if let Some(last_updated_ms) = self.last_updated_ms {
                last_updated_ms
            } else {
                let last_updated_ms = chrono::Utc::now().timestamp_millis();
                self.last_updated_ms = Some(last_updated_ms);
                last_updated_ms
            };

            self.metadata.snapshot_log.push(SnapshotLog {
                snapshot_id: snapshot.snapshot_id(),
                timestamp_ms,
            });
        }

        self.changes.push(TableUpdate::SetSnapshotRef {
            ref_name: ref_name.to_string(),
            reference: reference.clone(),
        });
        self.metadata.refs.insert(ref_name.to_string(), reference);

        Ok(self)
    }
568
569 pub fn remove_ref(mut self, ref_name: &str) -> Self {
573 if ref_name == MAIN_BRANCH {
574 self.metadata.current_snapshot_id = None;
575 }
576
577 if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
578 self.changes.push(TableUpdate::RemoveSnapshotRef {
579 ref_name: ref_name.to_string(),
580 });
581 }
582
583 self
584 }
585
586 pub fn set_statistics(mut self, statistics: StatisticsFile) -> Self {
588 self.metadata
589 .statistics
590 .insert(statistics.snapshot_id, statistics.clone());
591 self.changes.push(TableUpdate::SetStatistics {
592 statistics: statistics.clone(),
593 });
594 self
595 }
596
597 pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
599 let previous = self.metadata.statistics.remove(&snapshot_id);
600 if previous.is_some() {
601 self.changes
602 .push(TableUpdate::RemoveStatistics { snapshot_id });
603 }
604 self
605 }
606
607 pub fn set_partition_statistics(
609 mut self,
610 partition_statistics_file: PartitionStatisticsFile,
611 ) -> Self {
612 self.metadata.partition_statistics.insert(
613 partition_statistics_file.snapshot_id,
614 partition_statistics_file.clone(),
615 );
616 self.changes.push(TableUpdate::SetPartitionStatistics {
617 partition_statistics: partition_statistics_file,
618 });
619 self
620 }
621
622 pub fn remove_partition_statistics(mut self, snapshot_id: i64) -> Self {
624 let previous = self.metadata.partition_statistics.remove(&snapshot_id);
625 if previous.is_some() {
626 self.changes
627 .push(TableUpdate::RemovePartitionStatistics { snapshot_id });
628 }
629 self
630 }
631
    /// Add a schema, reusing the id of a structurally identical existing
    /// schema when possible; otherwise the next free schema id is assigned.
    pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
        // New field names must not collide with existing partition field names.
        self.validate_schema_field_names(&schema)?;

        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);

        if schema_found {
            // Known schema: only record the change if it wasn't the one
            // added most recently.
            if self.last_added_schema_id != Some(new_schema_id) {
                self.changes.push(TableUpdate::AddSchema {
                    schema: schema.clone(),
                });
                self.last_added_schema_id = Some(new_schema_id);
            }

            return Ok(self);
        }

        self.metadata.last_column_id =
            std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());

        // Rewrite the schema id only when it differs from the assigned one.
        let schema = match new_schema_id == schema.schema_id() {
            true => schema,
            false => schema.with_schema_id(new_schema_id),
        };

        self.metadata
            .schemas
            .insert(new_schema_id, schema.clone().into());

        self.changes.push(TableUpdate::AddSchema { schema });

        self.last_added_schema_id = Some(new_schema_id);

        Ok(self)
    }
677
678 pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
686 if schema_id == Self::LAST_ADDED {
687 schema_id = self.last_added_schema_id.ok_or_else(|| {
688 Error::new(
689 ErrorKind::DataInvalid,
690 "Cannot set current schema to last added schema: no schema has been added.",
691 )
692 })?;
693 };
694 let schema_id = schema_id; if schema_id == self.metadata.current_schema_id {
697 return Ok(self);
698 }
699
700 let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
701 Error::new(
702 ErrorKind::DataInvalid,
703 format!("Cannot set current schema to unknown schema with id: '{schema_id}'"),
704 )
705 })?;
706
707 self.metadata.current_schema_id = schema_id;
713
714 if self.last_added_schema_id == Some(schema_id) {
715 self.changes.push(TableUpdate::SetCurrentSchema {
716 schema_id: Self::LAST_ADDED,
717 });
718 } else {
719 self.changes
720 .push(TableUpdate::SetCurrentSchema { schema_id });
721 }
722
723 Ok(self)
724 }
725
    /// Add a schema and immediately make it the current schema.
    pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
        self.add_schema(schema)?
            .set_current_schema(Self::LAST_ADDED)
    }
731
    /// Reject newly introduced schema field names that collide with existing
    /// partition field names. Names that already exist in some schema are
    /// allowed; brand-new tables (no schemas yet) are skipped entirely.
    fn validate_schema_field_names(&self, schema: &Schema) -> Result<()> {
        if self.metadata.schemas.is_empty() {
            return Ok(());
        }

        for field_name in schema.field_id_to_name_map().values() {
            let has_partition_conflict = self.metadata.partition_name_exists(field_name);
            let is_new_field = !self.metadata.name_exists_in_any_schema(field_name);

            if has_partition_conflict && is_new_field {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot add schema field '{field_name}' because it conflicts with existing partition field name. \
                        Schema evolution cannot introduce field names that match existing partition field names."
                    ),
                ));
            }
        }

        Ok(())
    }
762
    /// Validate that partition field names do not improperly collide with
    /// schema field names: a name shared with a current-schema field is only
    /// allowed for an identity transform sourced from that same field.
    /// Skipped entirely for brand-new tables.
    fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> {
        if self.metadata.schemas.is_empty() {
            return Ok(());
        }

        let current_schema = self.get_current_schema()?;
        for partition_field in unbound_spec.fields() {
            let exists_in_any_schema = self
                .metadata
                .name_exists_in_any_schema(&partition_field.name);

            if !exists_in_any_schema {
                // Fresh name: no conflict possible.
                continue;
            }

            if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) {
                let is_identity_transform =
                    partition_field.transform == crate::spec::Transform::Identity;
                let has_matching_source_id = schema_field.id == partition_field.source_id;

                if !is_identity_transform {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.",
                            partition_field.name
                        ),
                    ));
                }

                if !has_matching_source_id {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot create identity partition sourced from different field in schema. \
                            Field name '{}' has id `{}` in schema but partition source id is `{}`",
                            partition_field.name, schema_field.id, partition_field.source_id
                        ),
                    ));
                }
            }
        }

        Ok(())
    }
819
    /// Add a partition spec, binding it against the current schema and
    /// reusing the id of a compatible existing spec when possible.
    ///
    /// # Errors
    /// Fails on name conflicts, binding failures, or non-sequential field ids
    /// in a format-version-1 table.
    pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        let schema = self.get_current_schema()?.clone();

        self.validate_partition_field_names(&unbound_spec)?;

        // Bind the spec, continuing field-id assignment from the table's
        // highest assigned partition field id.
        let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
            .with_last_assigned_field_id(self.metadata.last_partition_id)
            .build()?;

        let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
        let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
        let spec = spec.with_spec_id(new_spec_id);
        let unbound_spec = unbound_spec.with_spec_id(new_spec_id);

        if spec_found {
            // Known spec: only record the change if it wasn't the one added
            // most recently.
            if self.last_added_spec_id != Some(new_spec_id) {
                self.changes
                    .push(TableUpdate::AddSpec { spec: unbound_spec });
                self.last_added_spec_id = Some(new_spec_id);
            }

            return Ok(self);
        }

        // V1 requires partition field ids to be sequential.
        if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Cannot add partition spec with non-sequential field ids to format version 1 table",
            ));
        }

        let highest_field_id = spec
            .highest_field_id()
            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
        self.metadata
            .partition_specs
            .insert(new_spec_id, Arc::new(spec));
        self.changes
            .push(TableUpdate::AddSpec { spec: unbound_spec });

        self.last_added_spec_id = Some(new_spec_id);
        self.metadata.last_partition_id =
            std::cmp::max(self.metadata.last_partition_id, highest_field_id);

        Ok(self)
    }
877
878 pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result<Self> {
884 if spec_id == Self::LAST_ADDED {
885 spec_id = self.last_added_spec_id.ok_or_else(|| {
886 Error::new(
887 ErrorKind::DataInvalid,
888 "Cannot set default partition spec to last added spec: no spec has been added.",
889 )
890 })?;
891 }
892
893 if self.metadata.default_spec.spec_id() == spec_id {
894 return Ok(self);
895 }
896
897 if !self.metadata.partition_specs.contains_key(&spec_id) {
898 return Err(Error::new(
899 ErrorKind::DataInvalid,
900 format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",),
901 ));
902 }
903
904 let schemaless_spec = self
905 .metadata
906 .partition_specs
907 .get(&spec_id)
908 .ok_or_else(|| {
909 Error::new(
910 ErrorKind::DataInvalid,
911 format!(
912 "Cannot set default partition spec to unknown spec with id: '{spec_id}'",
913 ),
914 )
915 })?
916 .clone();
917 let spec = Arc::unwrap_or_clone(schemaless_spec);
918 let spec_type = spec.partition_type(self.get_current_schema()?)?;
919 self.metadata.default_spec = Arc::new(spec);
920 self.metadata.default_partition_type = spec_type;
921
922 if self.last_added_spec_id == Some(spec_id) {
923 self.changes.push(TableUpdate::SetDefaultSpec {
924 spec_id: Self::LAST_ADDED,
925 });
926 } else {
927 self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
928 }
929
930 Ok(self)
931 }
932
    /// Add a partition spec and immediately make it the default spec.
    pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        self.add_partition_spec(unbound_spec)?
            .set_default_partition_spec(Self::LAST_ADDED)
    }
938
939 pub fn remove_partition_specs(mut self, spec_ids: &[i32]) -> Result<Self> {
946 if spec_ids.contains(&self.metadata.default_spec.spec_id()) {
947 return Err(Error::new(
948 ErrorKind::DataInvalid,
949 "Cannot remove default partition spec",
950 ));
951 }
952
953 let mut removed_specs = Vec::with_capacity(spec_ids.len());
954 spec_ids.iter().for_each(|id| {
955 if self.metadata.partition_specs.remove(id).is_some() {
956 removed_specs.push(*id);
957 }
958 });
959
960 if !removed_specs.is_empty() {
961 self.changes.push(TableUpdate::RemovePartitionSpecs {
962 spec_ids: removed_specs,
963 });
964 }
965
966 Ok(self)
967 }
968
969 pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
980 let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
981 let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id);
982
983 if sort_order_found {
984 if self.last_added_order_id != Some(new_order_id) {
985 self.changes.push(TableUpdate::AddSortOrder {
986 sort_order: sort_order.clone().with_order_id(new_order_id),
987 });
988 self.last_added_order_id = Some(new_order_id);
989 }
990
991 return Ok(self);
992 }
993
994 let schema = self.get_current_schema()?.clone().as_ref().clone();
995 let sort_order = SortOrder::builder()
996 .with_order_id(new_order_id)
997 .with_fields(sort_order.fields)
998 .build(&schema)
999 .map_err(|e| {
1000 Error::new(
1001 ErrorKind::DataInvalid,
1002 format!("Sort order to add is incompatible with current schema: {e}"),
1003 )
1004 .with_source(e)
1005 })?;
1006
1007 self.last_added_order_id = Some(new_order_id);
1008 self.metadata
1009 .sort_orders
1010 .insert(new_order_id, sort_order.clone().into());
1011 self.changes.push(TableUpdate::AddSortOrder { sort_order });
1012
1013 Ok(self)
1014 }
1015
    /// Set the default sort order. `Self::LAST_ADDED as i64` selects the order
    /// most recently added through [`Self::add_sort_order`]. Setting the order
    /// that is already the default is a no-op.
    ///
    /// # Errors
    /// Fails when `LAST_ADDED` is given but nothing was added, or the id is
    /// unknown.
    pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result<Self> {
        if sort_order_id == Self::LAST_ADDED as i64 {
            sort_order_id = self.last_added_order_id.ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set default sort order to last added order: no order has been added.",
                )
            })?;
        }

        if self.metadata.default_sort_order_id == sort_order_id {
            return Ok(self);
        }

        if !self.metadata.sort_orders.contains_key(&sort_order_id) {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set default sort order to unknown order with id: '{sort_order_id}'"
                ),
            ));
        }

        self.metadata.default_sort_order_id = sort_order_id;

        // Record `LAST_ADDED` when this refers to the order added in the same
        // round of changes.
        if self.last_added_order_id == Some(sort_order_id) {
            self.changes.push(TableUpdate::SetDefaultSortOrder {
                sort_order_id: Self::LAST_ADDED as i64,
            });
        } else {
            self.changes
                .push(TableUpdate::SetDefaultSortOrder { sort_order_id });
        }

        Ok(self)
    }
1057
    /// Add a sort order and immediately make it the default.
    fn add_default_sort_order(self, sort_order: SortOrder) -> Result<Self> {
        self.add_sort_order(sort_order)?
            .set_default_sort_order(Self::LAST_ADDED as i64)
    }
1063
1064 pub fn add_encryption_key(mut self, key: EncryptedKey) -> Self {
1066 let key_id = key.key_id().to_string();
1067 if self.metadata.encryption_keys.contains_key(&key_id) {
1068 return self;
1070 }
1071
1072 self.metadata.encryption_keys.insert(key_id, key.clone());
1073 self.changes.push(TableUpdate::AddEncryptionKey {
1074 encryption_key: key,
1075 });
1076 self
1077 }
1078
1079 pub fn remove_encryption_key(mut self, key_id: &str) -> Self {
1081 if self.metadata.encryption_keys.remove(key_id).is_some() {
1082 self.changes.push(TableUpdate::RemoveEncryptionKey {
1083 key_id: key_id.to_string(),
1084 });
1085 }
1086 self
1087 }
1088
    /// Finalize the builder into a [`TableMetadataBuildResult`].
    ///
    /// Stamps the last-updated timestamp, re-binds the default spec against
    /// the current schema, re-validates the default sort order, rebuilds the
    /// snapshot log, normalizes the metadata and expires old metadata-log
    /// entries.
    pub fn build(mut self) -> Result<TableMetadataBuildResult> {
        self.metadata.last_updated_ms = self
            .last_updated_ms
            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());

        let schema = self.get_current_schema()?.clone();
        let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?);

        // Re-bind the default spec to the (possibly changed) current schema.
        self.metadata.default_spec = Arc::new(
            Arc::unwrap_or_clone(self.metadata.default_spec)
                .into_unbound()
                .bind(schema.clone())?,
        );
        self.metadata.default_partition_type =
            self.metadata.default_spec.partition_type(&schema)?;
        // Validate that the default sort order still binds to the schema.
        SortOrder::builder()
            .with_fields(sort_order.fields)
            .build(&schema)?;

        self.update_snapshot_log()?;
        self.metadata.try_normalize()?;

        if let Some(hist_entry) = self.previous_history_entry.take() {
            self.metadata.metadata_log.push(hist_entry);
        }
        let expired_metadata_logs = self.expire_metadata_log();

        Ok(TableMetadataBuildResult {
            metadata: self.metadata,
            changes: self.changes,
            expired_metadata_logs,
        })
    }
1126
1127 fn expire_metadata_log(&mut self) -> Vec<MetadataLog> {
1128 let max_size = self
1129 .metadata
1130 .properties
1131 .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
1132 .and_then(|v| v.parse::<usize>().ok())
1133 .unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
1134 .max(1);
1135
1136 if self.metadata.metadata_log.len() > max_size {
1137 self.metadata
1138 .metadata_log
1139 .drain(0..self.metadata.metadata_log.len() - max_size)
1140 .collect()
1141 } else {
1142 Vec::new()
1143 }
1144 }
1145
    /// Rebuild the snapshot log: drop intermediate snapshots and, when
    /// snapshots were removed, discard all history preceding a removed
    /// snapshot. No-op when neither condition applies.
    ///
    /// # Errors
    /// Fails if the rebuilt log does not end at the current snapshot.
    fn update_snapshot_log(&mut self) -> Result<()> {
        let intermediate_snapshots = self.get_intermediate_snapshots();
        let has_removed_snapshots = self
            .changes
            .iter()
            .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. }));

        if intermediate_snapshots.is_empty() && !has_removed_snapshots {
            return Ok(());
        }

        let mut new_snapshot_log = Vec::new();
        for log_entry in &self.metadata.snapshot_log {
            let snapshot_id = log_entry.snapshot_id;
            if self.metadata.snapshots.contains_key(&snapshot_id) {
                if !intermediate_snapshots.contains(&snapshot_id) {
                    new_snapshot_log.push(log_entry.clone());
                }
            } else if has_removed_snapshots {
                // An expired snapshot invalidates everything logged before it.
                new_snapshot_log.clear();
            }
        }

        if let Some(current_snapshot_id) = self.metadata.current_snapshot_id {
            let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id);
            if last_id != Some(current_snapshot_id) {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set invalid snapshot log: latest entry is not the current snapshot",
                ));
            }
        };

        self.metadata.snapshot_log = new_snapshot_log;
        Ok(())
    }
1187
    /// Ids of snapshots that were added and set as head of the main branch
    /// within this round of changes but are no longer the current snapshot —
    /// i.e. snapshots that were immediately superseded.
    fn get_intermediate_snapshots(&self) -> HashSet<i64> {
        let added_snapshot_ids = self
            .changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
                _ => None,
            })
            .collect::<HashSet<_>>();

        self.changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::SetSnapshotRef {
                    ref_name,
                    reference,
                } => {
                    // Intermediate = added in this round, pointed at main, and
                    // not the snapshot that ended up current.
                    if added_snapshot_ids.contains(&reference.snapshot_id)
                        && ref_name == MAIN_BRANCH
                        && reference.snapshot_id
                            != self
                                .metadata
                                .current_snapshot_id
                                .unwrap_or(i64::from(Self::LAST_ADDED))
                    {
                        Some(reference.snapshot_id)
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .collect()
    }
1231
    /// Reassign schema field ids starting from [`FIRST_FIELD_ID`] and rebuild
    /// the partition spec and sort order against the re-numbered schema,
    /// resolving their source fields by name.
    fn reassign_ids(
        schema: Schema,
        spec: UnboundPartitionSpec,
        sort_order: SortOrder,
    ) -> Result<(Schema, PartitionSpec, SortOrder)> {
        // Map from old field id to field name, used to re-resolve sources.
        let previous_id_to_name = schema.field_id_to_name_map().clone();
        let fresh_schema = schema
            .into_builder()
            .with_schema_id(DEFAULT_SCHEMA_ID)
            .with_reassigned_field_ids(FIRST_FIELD_ID)
            .build()?;

        // Re-create each partition field by name against the fresh schema.
        let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone());
        for field in spec.fields() {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for partition column {} in schema.",
                        field.source_id, field.name
                    ),
                )
            })?;
            fresh_spec =
                fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
        }
        let fresh_spec = fresh_spec.build()?;

        // Re-point each sort field at its source field's new id.
        let mut fresh_order = SortOrder::builder();
        for mut field in sort_order.fields {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for sort column in schema.",
                        field.source_id
                    ),
                )
            })?;
            let new_field_id = fresh_schema
                .field_by_name(source_field_name)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!(
                            "Cannot find source column with name {source_field_name} for sort column in re-assigned schema."
                        ),
                    )
                })?.id;
            field.source_id = new_field_id;
            fresh_order.with_sort_field(field);
        }
        let fresh_sort_order = fresh_order.build(&fresh_schema)?;

        Ok((fresh_schema, fresh_spec, fresh_sort_order))
    }
1291
1292 fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
1293 self.metadata
1294 .schemas
1295 .iter()
1296 .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
1297 .unwrap_or_else(|| self.get_highest_schema_id() + 1)
1298 }
1299
1300 fn get_highest_schema_id(&self) -> i32 {
1301 *self
1302 .metadata
1303 .schemas
1304 .keys()
1305 .max()
1306 .unwrap_or(&self.metadata.current_schema_id)
1307 }
1308
1309 fn get_current_schema(&self) -> Result<&SchemaRef> {
1310 self.metadata
1311 .schemas
1312 .get(&self.metadata.current_schema_id)
1313 .ok_or_else(|| {
1314 Error::new(
1315 ErrorKind::DataInvalid,
1316 format!(
1317 "Current schema with id '{}' not found in table metadata.",
1318 self.metadata.current_schema_id
1319 ),
1320 )
1321 })
1322 }
1323
1324 fn get_default_sort_order(&self) -> Result<SortOrderRef> {
1325 self.metadata
1326 .sort_orders
1327 .get(&self.metadata.default_sort_order_id)
1328 .cloned()
1329 .ok_or_else(|| {
1330 Error::new(
1331 ErrorKind::DataInvalid,
1332 format!(
1333 "Default sort order with id '{}' not found in table metadata.",
1334 self.metadata.default_sort_order_id
1335 ),
1336 )
1337 })
1338 }
1339
1340 fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 {
1342 self.metadata
1343 .partition_specs
1344 .iter()
1345 .find_map(|(id, old_spec)| new_spec.is_compatible_with(old_spec).then_some(*id))
1346 .unwrap_or_else(|| {
1347 self.get_highest_spec_id()
1348 .map(|id| id + 1)
1349 .unwrap_or(DEFAULT_PARTITION_SPEC_ID)
1350 })
1351 }
1352
1353 fn get_highest_spec_id(&self) -> Option<i32> {
1354 self.metadata.partition_specs.keys().max().copied()
1355 }
1356
1357 fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 {
1359 if new_sort_order.is_unsorted() {
1360 return SortOrder::unsorted_order().order_id;
1361 }
1362
1363 self.metadata
1364 .sort_orders
1365 .iter()
1366 .find_map(|(id, sort_order)| {
1367 sort_order.fields.eq(&new_sort_order.fields).then_some(*id)
1368 })
1369 .unwrap_or_else(|| {
1370 self.highest_sort_order_id()
1371 .unwrap_or(SortOrder::unsorted_order().order_id)
1372 + 1
1373 })
1374 }
1375
1376 fn highest_sort_order_id(&self) -> Option<i64> {
1377 self.metadata.sort_orders.keys().max().copied()
1378 }
1379
1380 pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
1383 if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
1384 return Err(Error::new(
1385 ErrorKind::DataInvalid,
1386 "Cannot remove current schema",
1387 ));
1388 }
1389
1390 if schema_id_to_remove.is_empty() {
1391 return Ok(self);
1392 }
1393
1394 let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
1395 self.metadata.schemas.retain(|id, _schema| {
1396 if schema_id_to_remove.contains(id) {
1397 removed_schemas.push(*id);
1398 false
1399 } else {
1400 true
1401 }
1402 });
1403
1404 self.changes.push(TableUpdate::RemoveSchemas {
1405 schema_ids: removed_schemas,
1406 });
1407
1408 Ok(self)
1409 }
1410}
1411
1412impl From<TableMetadataBuildResult> for TableMetadata {
1413 fn from(result: TableMetadataBuildResult) -> Self {
1414 result.metadata
1415 }
1416}
1417
1418#[cfg(test)]
1419mod tests {
1420 use std::fs::File;
1421 use std::io::BufReader;
1422 use std::thread::sleep;
1423
1424 use super::*;
1425 use crate::TableIdent;
1426 use crate::io::FileIOBuilder;
1427 use crate::spec::{
1428 BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema,
1429 SnapshotRetention, SortDirection, SortField, StructType, Summary, TableProperties,
1430 Transform, Type, UnboundPartitionField,
1431 };
1432 use crate::table::Table;
1433
1434 const TEST_LOCATION: &str = "s3://bucket/test/location";
1435 const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
1436
1437 fn schema() -> Schema {
1438 Schema::builder()
1439 .with_fields(vec![
1440 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1441 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1442 NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1443 ])
1444 .build()
1445 .unwrap()
1446 }
1447
1448 fn sort_order() -> SortOrder {
1449 let schema = schema();
1450 SortOrder::builder()
1451 .with_order_id(1)
1452 .with_sort_field(SortField {
1453 source_id: 3,
1454 transform: Transform::Bucket(4),
1455 direction: SortDirection::Descending,
1456 null_order: NullOrder::First,
1457 })
1458 .build(&schema)
1459 .unwrap()
1460 }
1461
1462 fn partition_spec() -> UnboundPartitionSpec {
1463 UnboundPartitionSpec::builder()
1464 .with_spec_id(0)
1465 .add_partition_field(2, "y", Transform::Identity)
1466 .unwrap()
1467 .build()
1468 }
1469
1470 fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder {
1471 TableMetadataBuilder::new(
1472 schema(),
1473 partition_spec(),
1474 sort_order(),
1475 TEST_LOCATION.to_string(),
1476 format_version,
1477 HashMap::new(),
1478 )
1479 .unwrap()
1480 .build()
1481 .unwrap()
1482 .metadata
1483 .into_builder(Some(
1484 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1485 ))
1486 }
1487
1488 #[test]
1489 fn test_minimal_build() {
1490 let metadata = TableMetadataBuilder::new(
1491 schema(),
1492 partition_spec(),
1493 sort_order(),
1494 TEST_LOCATION.to_string(),
1495 FormatVersion::V1,
1496 HashMap::new(),
1497 )
1498 .unwrap()
1499 .build()
1500 .unwrap()
1501 .metadata;
1502
1503 assert_eq!(metadata.format_version, FormatVersion::V1);
1504 assert_eq!(metadata.location, TEST_LOCATION);
1505 assert_eq!(metadata.current_schema_id, 0);
1506 assert_eq!(metadata.default_spec.spec_id(), 0);
1507 assert_eq!(metadata.default_sort_order_id, 1);
1508 assert_eq!(metadata.last_partition_id, 1000);
1509 assert_eq!(metadata.last_column_id, 3);
1510 assert_eq!(metadata.snapshots.len(), 0);
1511 assert_eq!(metadata.current_snapshot_id, None);
1512 assert_eq!(metadata.refs.len(), 0);
1513 assert_eq!(metadata.properties.len(), 0);
1514 assert_eq!(metadata.metadata_log.len(), 0);
1515 assert_eq!(metadata.last_sequence_number, 0);
1516 assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID);
1517
1518 let _ = serde_json::to_string(&metadata).unwrap();
1520
1521 let metadata = metadata
1523 .into_builder(Some(
1524 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1525 ))
1526 .upgrade_format_version(FormatVersion::V2)
1527 .unwrap()
1528 .build()
1529 .unwrap()
1530 .metadata;
1531
1532 assert_eq!(metadata.format_version, FormatVersion::V2);
1533 let _ = serde_json::to_string(&metadata).unwrap();
1534 }
1535
    // An unpartitioned, unsorted table must build with the reserved defaults:
    // spec id 0, sort order id 0, and `UNPARTITIONED_LAST_ASSIGNED_ID`.
    #[test]
    fn test_build_unpartitioned_unsorted() {
        let schema = Schema::builder().build().unwrap();
        let metadata = TableMetadataBuilder::new(
            schema.clone(),
            PartitionSpec::unpartition_spec(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V2);
        assert_eq!(metadata.location, TEST_LOCATION);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.default_spec.spec_id(), 0);
        assert_eq!(metadata.default_sort_order_id, 0);
        assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
        // Empty schema: no columns were ever assigned.
        assert_eq!(metadata.last_column_id, 0);
        assert_eq!(metadata.snapshots.len(), 0);
        assert_eq!(metadata.current_snapshot_id, None);
        assert_eq!(metadata.refs.len(), 0);
        assert_eq!(metadata.properties.len(), 0);
        assert_eq!(metadata.metadata_log.len(), 0);
        assert_eq!(metadata.last_sequence_number, 0);
    }
1566
    // Schemas, specs, and sort orders passed to `new` may carry arbitrary
    // ids; `reassign_ids` must renumber fields starting at 1 and reset the
    // spec/order ids to their defaults.
    #[test]
    fn test_reassigns_ids() {
        // Input uses deliberately non-canonical ids (schema 10, fields
        // 11..=15, spec 20, order 10).
        let schema = Schema::builder()
            .with_schema_id(10)
            .with_fields(vec![
                NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    13,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(14, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();
        let spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(20)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 11,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(10)
            .build(&schema)
            .unwrap();

        let (fresh_schema, fresh_spec, fresh_sort_order) =
            TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap();

        // Top-level fields claim ids 1-4 first; the nested field gets 5.
        let expected_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    3,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(5, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(4, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        // Spec id resets to 0, sources remapped to the fresh field ids.
        let expected_spec = PartitionSpec::builder(expected_schema.clone())
            .with_spec_id(0)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        // Sort order id resets to 1, source remapped to fresh field id 1.
        let expected_sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(1)
            .build(&expected_schema)
            .unwrap();

        assert_eq!(fresh_schema, expected_schema);
        assert_eq!(fresh_spec, expected_spec);
        assert_eq!(fresh_sort_order, expected_sort_order);
    }
1651
1652 #[test]
1653 fn test_ids_are_reassigned_for_new_metadata() {
1654 let schema = schema().into_builder().with_schema_id(10).build().unwrap();
1655
1656 let metadata = TableMetadataBuilder::new(
1657 schema,
1658 partition_spec(),
1659 sort_order(),
1660 TEST_LOCATION.to_string(),
1661 FormatVersion::V1,
1662 HashMap::new(),
1663 )
1664 .unwrap()
1665 .build()
1666 .unwrap()
1667 .metadata;
1668
1669 assert_eq!(metadata.current_schema_id, 0);
1670 assert_eq!(metadata.current_schema().schema_id(), 0);
1671 }
1672
    // Creating a table must record the full ordered change sequence, with the
    // "last added" sentinel (-1) used for schema / spec / sort-order ids.
    #[test]
    fn test_new_metadata_changes() {
        let changes = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::from_iter(vec![("property 1".to_string(), "value 1".to_string())]),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema { schema: schema() },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                // The recorded spec carries the materialized field id (1000).
                spec: PartitionSpec::builder(schema())
                    .with_spec_id(0)
                    .add_unbound_field(UnboundPartitionField {
                        name: "y".to_string(),
                        transform: Transform::Identity,
                        source_id: 2,
                        field_id: Some(1000)
                    })
                    .unwrap()
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: sort_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
            TableUpdate::SetProperties {
                updates: HashMap::from_iter(vec![(
                    "property 1".to_string(),
                    "value 1".to_string()
                )]),
            }
        ]);
    }
1723
    // With an empty schema, unpartitioned spec, unsorted order, and no
    // properties, the change list must omit `SetProperties` entirely.
    #[test]
    fn test_new_metadata_changes_unpartitioned_unsorted() {
        let schema = Schema::builder().build().unwrap();
        let changes = TableMetadataBuilder::new(
            schema.clone(),
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema {
                schema: Schema::builder().build().unwrap(),
            },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                spec: PartitionSpec::builder(schema)
                    .with_spec_id(0)
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: SortOrder::unsorted_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
        ]);
    }
1764
    // Adding a spec re-assigns its id to the next free one (1) and fills in
    // missing partition field ids sequentially after `last_partition_id`.
    #[test]
    fn test_add_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_fields(vec![
                UnboundPartitionField {
                    // Field id 1000 is already taken by the initial spec.
                    name: "y".to_string(),
                    transform: Transform::Identity,
                    source_id: 2,
                    field_id: Some(1000),
                },
                UnboundPartitionField {
                    // No field id supplied - one must be assigned.
                    name: "z".to_string(),
                    transform: Transform::Identity,
                    source_id: 3,
                    field_id: None,
                },
            ])
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .build()
            .unwrap();

        // Requested spec id 10 is ignored; the next free id (1) is used.
        let expected_change = added_spec.with_spec_id(1);
        let expected_spec = PartitionSpec::builder(schema())
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y".to_string(),
                transform: Transform::Identity,
                source_id: 2,
                field_id: Some(1000),
            })
            .unwrap()
            .add_unbound_field(UnboundPartitionField {
                name: "z".to_string(),
                transform: Transform::Identity,
                source_id: 3,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(
            build_result.metadata.partition_spec_by_id(1),
            Some(&Arc::new(expected_spec))
        );
        assert_eq!(build_result.metadata.default_spec.spec_id(), 0);
        assert_eq!(build_result.metadata.last_partition_id, 1001);
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: expected_change
        });

        // A non-default spec can be removed again.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .remove_partition_specs(&[1])
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.partition_specs.len(), 1);
        assert!(build_result.metadata.partition_spec_by_id(1).is_none());
    }
1843
    // `set_default_partition_spec(-1)` must resolve to the spec added within
    // the same round of changes.
    #[test]
    fn test_set_default_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);
        let schema = builder.get_current_schema().unwrap().clone();
        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2))
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        // Field id 1001 follows the previous spec's last assigned id (1000).
        let expected_spec = PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y_bucket[2]".to_string(),
                transform: Transform::Bucket(2),
                source_id: 1,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec));
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSpec {
                // The recorded change carries the actually-assigned spec id.
                spec: added_spec.with_spec_id(1)
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 }
        ]);
    }
1884
    // Making an added spec the default records `-1`; re-pointing the default
    // at an existing spec later records the explicit id.
    #[test]
    fn test_set_existing_default_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);
        // Add a new spec and make it the default in the same round.
        let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build();
        let build_result = builder
            .add_partition_spec(unbound_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: unbound_spec.clone()
        });
        assert_eq!(build_result.changes[1], TableUpdate::SetDefaultSpec {
            spec_id: -1
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                unbound_spec
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );

        // Point the default back at the original spec (id 0).
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .set_default_partition_spec(0)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.changes[0], TableUpdate::SetDefaultSpec {
            spec_id: 0
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                partition_spec()
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );
    }
1938
    // A newly added sort order receives the next free order id (2); the id
    // supplied by the caller (10) is ignored.
    #[test]
    fn test_add_sort_order() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_sort_order = SortOrder::builder()
            .with_order_id(10)
            .with_fields(vec![SortField {
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .build(&schema())
            .unwrap();

        let build_result = builder
            .add_sort_order(added_sort_order.clone())
            .unwrap()
            .build()
            .unwrap();

        let expected_sort_order = added_sort_order.with_order_id(2);

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2));
        pretty_assertions::assert_eq!(
            build_result.metadata.sort_order_by_id(2),
            Some(&Arc::new(expected_sort_order.clone()))
        );
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder {
            sort_order: expected_sort_order
        });
    }
1972
    // Adding a schema that extends the current one keeps the requested id (1)
    // and records both `AddSchema` and `SetCurrentSchema` changes.
    #[test]
    fn test_add_compatible_schema() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let build_result = builder
            .add_current_schema(added_schema.clone())
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1));
        pretty_assertions::assert_eq!(
            build_result.metadata.schema_by_id(1),
            Some(&Arc::new(added_schema.clone()))
        );
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema {
            schema: added_schema
        });
        // `-1` means "the schema added in this round of changes".
        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
            schema_id: -1
        });
    }
2007
    // Even when `set_current_schema` is called with the concrete id, the
    // recorded change must use -1 if the schema was added in the same round.
    #[test]
    fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let build_result = builder
            .add_schema(added_schema.clone())
            .unwrap()
            .set_current_schema(1)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
            schema_id: -1
        });
    }
2036
2037 #[test]
2038 fn test_no_metadata_log_for_create_table() {
2039 let build_result = TableMetadataBuilder::new(
2040 schema(),
2041 partition_spec(),
2042 sort_order(),
2043 TEST_LOCATION.to_string(),
2044 FormatVersion::V2,
2045 HashMap::new(),
2046 )
2047 .unwrap()
2048 .build()
2049 .unwrap();
2050
2051 assert_eq!(build_result.metadata.metadata_log.len(), 0);
2052 }
2053
    // Rebuilding with `into_builder(None)` must not append a metadata log
    // entry, because no previous metadata file location is known.
    #[test]
    fn test_no_metadata_log_entry_for_no_previous_location() {
        // First build produces one log entry via `builder_without_changes`.
        let metadata = builder_without_changes(FormatVersion::V2)
            .build()
            .unwrap()
            .metadata;
        assert_eq!(metadata.metadata_log.len(), 1);

        let build_result = metadata
            .into_builder(None)
            .set_properties(HashMap::from_iter(vec![(
                "foo".to_string(),
                "bar".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();

        // Log length unchanged: no entry added for the `None` location.
        assert_eq!(build_result.metadata.metadata_log.len(), 1);
    }
2075
    // `into_builder(Some(path))` must record `path` as the previous metadata
    // file in the next build's metadata log.
    #[test]
    fn test_from_metadata_generates_metadata_log() {
        let metadata_path = "s3://bucket/test/location/metadata/metadata1.json";
        let builder = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata
        .into_builder(Some(metadata_path.to_string()));

        // Any change suffices to trigger a new build.
        let builder = builder
            .add_default_sort_order(SortOrder::unsorted_order())
            .unwrap();

        let build_result = builder.build().unwrap();

        assert_eq!(build_result.metadata.metadata_log.len(), 1);
        assert_eq!(
            build_result.metadata.metadata_log[0].metadata_file,
            metadata_path
        );
    }
2105
    // `set_ref` must reject unknown snapshot ids and, for `main`, update the
    // snapshot log when pointed at an existing snapshot.
    #[test]
    fn test_set_ref() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let builder = builder.add_snapshot(snapshot.clone()).unwrap();

        // Pointing `main` at a snapshot id that was never added must fail.
        assert!(
            builder
                .clone()
                .set_ref(MAIN_BRANCH, SnapshotReference {
                    snapshot_id: 10,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: Some(10),
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                })
                .unwrap_err()
                .to_string()
                .contains("Cannot set 'main' to unknown snapshot: '10'")
        );

        let build_result = builder
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.snapshots.len(), 1);
        assert_eq!(
            build_result.metadata.snapshot_by_id(1),
            Some(&Arc::new(snapshot.clone()))
        );
        // Setting `main` appends a snapshot-log entry for the new snapshot.
        assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 1,
            timestamp_ms: snapshot.timestamp_ms()
        }])
    }
2170
    // When the same build advances `main` twice, only the final snapshot may
    // appear in the snapshot log; intermediates are skipped.
    #[test]
    fn test_snapshot_log_skips_intermediates() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot_1 = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let snapshot_2 = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        // main -> 1, then main -> 2, all within one build.
        let result = builder
            .add_snapshot(snapshot_1)
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .set_branch_snapshot(snapshot_2.clone(), MAIN_BRANCH)
            .unwrap()
            .build()
            .unwrap();

        // Only the final snapshot (2) is logged.
        assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 2,
            timestamp_ms: snapshot_2.timestamp_ms()
        }]);
        assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
    }
2238
    // Removing the `main` ref clears `current_snapshot_id` but must preserve
    // the historical snapshot log.
    #[test]
    fn test_remove_main_ref_keeps_snapshot_log() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let result = builder
            .add_snapshot(snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap();

        // Precondition: snapshot log populated, current snapshot set.
        assert_eq!(result.metadata.snapshot_log.len(), 1);
        assert_eq!(result.metadata.snapshot_log[0].snapshot_id, 1);
        assert_eq!(result.metadata.current_snapshot_id, Some(1));

        // Remove the main branch ref.
        let result_after_remove = result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata2.json".to_string(),
            ))
            .remove_ref(MAIN_BRANCH)
            .build()
            .unwrap();

        // Log retained; only the current-snapshot pointer is cleared.
        assert_eq!(result_after_remove.metadata.snapshot_log.len(), 1);
        assert_eq!(result_after_remove.metadata.snapshot_log[0].snapshot_id, 1);
        assert_eq!(result_after_remove.metadata.current_snapshot_id, None);
        assert_eq!(result_after_remove.changes.len(), 1);
        assert_eq!(
            result_after_remove.changes[0],
            TableUpdate::RemoveSnapshotRef {
                ref_name: MAIN_BRANCH.to_string()
            }
        );
    }
2305
    // `set_branch_snapshot` on an unknown branch must create the branch with
    // default retention and record both changes.
    #[test]
    fn test_set_branch_snapshot_creates_branch_if_not_exists() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let build_result = builder
            .set_branch_snapshot(snapshot.clone(), "new_branch")
            .unwrap()
            .build()
            .unwrap();

        // Default branch retention: everything unset.
        let reference = SnapshotReference {
            snapshot_id: 2,
            retention: SnapshotRetention::Branch {
                min_snapshots_to_keep: None,
                max_snapshot_age_ms: None,
                max_ref_age_ms: None,
            },
        };

        assert_eq!(build_result.metadata.refs.len(), 1);
        assert_eq!(
            build_result.metadata.refs.get("new_branch"),
            Some(&reference)
        );
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSnapshot { snapshot },
            TableUpdate::SetSnapshotRef {
                ref_name: "new_branch".to_string(),
                reference
            }
        ]);
    }
2350
    // Adding the same snapshot id twice must be rejected.
    #[test]
    fn test_cannot_add_duplicate_snapshot_id() {
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        // First add succeeds, second add with the same id must error.
        let builder = builder.add_snapshot(snapshot.clone()).unwrap();
        builder.add_snapshot(snapshot).unwrap_err();
    }
2378
2379 #[test]
2380 fn test_add_incompatible_current_schema_fails() {
2381 let builder = builder_without_changes(FormatVersion::V2);
2382
2383 let added_schema = Schema::builder()
2384 .with_schema_id(1)
2385 .with_fields(vec![])
2386 .build()
2387 .unwrap();
2388
2389 let err = builder
2390 .add_current_schema(added_schema)
2391 .unwrap()
2392 .build()
2393 .unwrap_err();
2394
2395 assert!(
2396 err.to_string()
2397 .contains("Cannot find partition source field")
2398 );
2399 }
2400
    // V1 tables require partition field ids to be assigned sequentially;
    // a gap (1000 then 1002) must be rejected.
    #[test]
    fn test_add_partition_spec_for_v1_requires_sequential_ids() {
        let builder = builder_without_changes(FormatVersion::V1);

        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_fields(vec![
                UnboundPartitionField {
                    name: "y".to_string(),
                    transform: Transform::Identity,
                    source_id: 2,
                    field_id: Some(1000),
                },
                UnboundPartitionField {
                    // 1002 skips 1001 - illegal in a V1 table.
                    name: "z".to_string(),
                    transform: Transform::Identity,
                    source_id: 3,
                    field_id: Some(1002),
                },
            ])
            .unwrap()
            .build();

        let err = builder.add_partition_spec(added_spec).unwrap_err();
        assert!(err.to_string().contains(
            "Cannot add partition spec with non-sequential field ids to format version 1 table"
        ));
    }
2429
    // With `metadata.previous-versions-max = 2`, the third build must expire
    // the oldest metadata-log entry.
    #[test]
    fn test_expire_metadata_log() {
        let builder = builder_without_changes(FormatVersion::V2);
        let metadata = builder
            .set_properties(HashMap::from_iter(vec![(
                TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
                "2".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(metadata.metadata.metadata_log.len(), 1);
        assert_eq!(metadata.expired_metadata_logs.len(), 0);

        // Second build: log grows to the max (2), nothing expires yet.
        let metadata = metadata
            .metadata
            .into_builder(Some("path2".to_string()))
            .set_properties(HashMap::from_iter(vec![(
                "change_nr".to_string(),
                "1".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(metadata.metadata.metadata_log.len(), 2);
        assert_eq!(metadata.expired_metadata_logs.len(), 0);

        // Third build: log stays at 2 and the oldest entry is expired.
        let metadata = metadata
            .metadata
            .into_builder(Some("path2".to_string()))
            .set_properties(HashMap::from_iter(vec![(
                "change_nr".to_string(),
                "2".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(metadata.metadata.metadata_log.len(), 2);
        assert_eq!(metadata.expired_metadata_logs.len(), 1);
    }
2471
    #[test]
    fn test_v2_sequence_number_cannot_decrease() {
        // For V2 tables, a snapshot set on a branch must not carry a sequence
        // number lower than its parent's.
        let builder = builder_without_changes(FormatVersion::V2);

        // Parent snapshot with sequence number 1 becomes the head of `main`.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("/snap-1")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let builder = builder
            .add_snapshot(snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap();

        // Child snapshot regresses to sequence number 0 — must be rejected.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-0")
            .with_parent_snapshot_id(Some(1))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let err = builder
            .set_branch_snapshot(snapshot, MAIN_BRANCH)
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("Cannot add snapshot with sequence number")
        );
    }
2522
2523 #[test]
2524 fn test_default_spec_cannot_be_removed() {
2525 let builder = builder_without_changes(FormatVersion::V2);
2526
2527 builder.remove_partition_specs(&[0]).unwrap_err();
2528 }
2529
    #[test]
    fn test_statistics() {
        // Exercises set/remove of table statistics files: setting records a
        // SetStatistics change, removing records RemoveStatistics, and removing
        // a non-existent entry is a silent no-op (no change emitted).
        let builder = builder_without_changes(FormatVersion::V2);

        let statistics = StatisticsFile {
            snapshot_id: 3055729675574597004,
            statistics_path: "s3://a/b/stats.puffin".to_string(),
            file_size_in_bytes: 413,
            file_footer_size_in_bytes: 42,
            key_metadata: None,
            blob_metadata: vec![BlobMetadata {
                snapshot_id: 3055729675574597004,
                sequence_number: 1,
                fields: vec![1],
                r#type: "ndv".to_string(),
                properties: HashMap::new(),
            }],
        };
        let build_result = builder.set_statistics(statistics.clone()).build().unwrap();

        // The statistics map is keyed by snapshot id.
        assert_eq!(
            build_result.metadata.statistics,
            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
        );
        assert_eq!(build_result.changes, vec![TableUpdate::SetStatistics {
            statistics: statistics.clone()
        }]);

        // Removing an existing entry emits a RemoveStatistics change.
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_statistics(statistics.snapshot_id)
            .build()
            .unwrap();

        assert_eq!(build_result.metadata.statistics.len(), 0);
        assert_eq!(build_result.changes, vec![TableUpdate::RemoveStatistics {
            snapshot_id: statistics.snapshot_id
        }]);

        // Removing it again is a no-op: no change is recorded.
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.statistics.len(), 0);
        assert_eq!(build_result.changes.len(), 0);
    }
2579
    #[test]
    fn test_add_partition_statistics() {
        // Same lifecycle as table statistics, but for partition statistics:
        // set emits SetPartitionStatistics, remove emits
        // RemovePartitionStatistics, and a second remove is a no-op.
        let builder = builder_without_changes(FormatVersion::V2);

        let statistics = PartitionStatisticsFile {
            snapshot_id: 3055729675574597004,
            statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
            file_size_in_bytes: 43,
        };

        let build_result = builder
            .set_partition_statistics(statistics.clone())
            .build()
            .unwrap();
        // Keyed by snapshot id, like regular statistics.
        assert_eq!(
            build_result.metadata.partition_statistics,
            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
        );
        assert_eq!(build_result.changes, vec![
            TableUpdate::SetPartitionStatistics {
                partition_statistics: statistics.clone()
            }
        ]);

        // Removing an existing entry records the change.
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_partition_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
        assert_eq!(build_result.changes, vec![
            TableUpdate::RemovePartitionStatistics {
                snapshot_id: statistics.snapshot_id
            }
        ]);

        // Removing a missing entry produces no change.
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_partition_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
        assert_eq!(build_result.changes.len(), 0);
    }
2626
2627 #[test]
2628 fn last_update_increased_for_property_only_update() {
2629 let builder = builder_without_changes(FormatVersion::V2);
2630
2631 let metadata = builder.build().unwrap().metadata;
2632 let last_updated_ms = metadata.last_updated_ms;
2633 sleep(std::time::Duration::from_millis(2));
2634
2635 let build_result = metadata
2636 .into_builder(Some(
2637 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2638 ))
2639 .set_properties(HashMap::from_iter(vec![(
2640 "foo".to_string(),
2641 "bar".to_string(),
2642 )]))
2643 .unwrap()
2644 .build()
2645 .unwrap();
2646
2647 assert!(
2648 build_result.metadata.last_updated_ms > last_updated_ms,
2649 "{} > {}",
2650 build_result.metadata.last_updated_ms,
2651 last_updated_ms
2652 );
2653 }
2654
2655 #[test]
2656 fn test_construct_default_main_branch() {
2657 let file = File::open(format!(
2659 "{}/testdata/table_metadata/{}",
2660 env!("CARGO_MANIFEST_DIR"),
2661 "TableMetadataV2Valid.json"
2662 ))
2663 .unwrap();
2664 let reader = BufReader::new(file);
2665 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2666
2667 let table = Table::builder()
2668 .metadata(resp)
2669 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2670 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2671 .file_io(FileIOBuilder::new("memory").build().unwrap())
2672 .build()
2673 .unwrap();
2674
2675 assert_eq!(
2676 table.metadata().refs.get(MAIN_BRANCH).unwrap().snapshot_id,
2677 table.metadata().current_snapshot_id().unwrap()
2678 );
2679 }
2680
2681 #[test]
2682 fn test_active_schema_cannot_be_removed() {
2683 let builder = builder_without_changes(FormatVersion::V2);
2684 builder.remove_schemas(&[0]).unwrap_err();
2685 }
2686
    #[test]
    fn test_remove_schemas() {
        // Removing a non-current schema succeeds and emits RemoveSchemas;
        // removing the current schema fails.
        let file = File::open(format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            "TableMetadataV2Valid.json"
        ))
        .unwrap();
        let reader = BufReader::new(file);
        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();

        let table = Table::builder()
            .metadata(resp)
            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIOBuilder::new("memory").build().unwrap())
            .build()
            .unwrap();

        // Fixture ships with two schemas; schema 1 is the current one.
        assert_eq!(2, table.metadata().schemas.len());

        {
            // Removing the current schema (id 1) must fail.
            let meta_data_builder = table.metadata().clone().into_builder(None);
            meta_data_builder.remove_schemas(&[1]).unwrap_err();
        }

        // Removing the inactive schema (id 0) succeeds.
        let mut meta_data_builder = table.metadata().clone().into_builder(None);
        meta_data_builder = meta_data_builder.remove_schemas(&[0]).unwrap();
        let build_result = meta_data_builder.build().unwrap();
        assert_eq!(1, build_result.metadata.schemas.len());
        assert_eq!(1, build_result.metadata.current_schema_id);
        assert_eq!(1, build_result.metadata.current_schema().schema_id());
        assert_eq!(1, build_result.changes.len());

        // The single recorded change carries exactly the removed schema id.
        let remove_schema_ids =
            if let TableUpdate::RemoveSchemas { schema_ids } = &build_result.changes[0] {
                schema_ids
            } else {
                unreachable!("Expected RemoveSchema change")
            };
        assert_eq!(remove_schema_ids, &[0]);
    }
2730
2731 #[test]
2732 fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts() {
2733 let initial_schema = Schema::builder()
2734 .with_fields(vec![
2735 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2736 ])
2737 .build()
2738 .unwrap();
2739
2740 let partition_spec_with_bucket = UnboundPartitionSpec::builder()
2741 .with_spec_id(0)
2742 .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2743 .unwrap()
2744 .build();
2745
2746 let metadata = TableMetadataBuilder::new(
2747 initial_schema,
2748 partition_spec_with_bucket,
2749 SortOrder::unsorted_order(),
2750 TEST_LOCATION.to_string(),
2751 FormatVersion::V2,
2752 HashMap::new(),
2753 )
2754 .unwrap()
2755 .build()
2756 .unwrap()
2757 .metadata;
2758
2759 let partition_field_names: Vec<String> = metadata
2760 .default_partition_spec()
2761 .fields()
2762 .iter()
2763 .map(|f| f.name.clone())
2764 .collect();
2765 assert!(partition_field_names.contains(&"bucket_data".to_string()));
2766
2767 let evolved_schema = Schema::builder()
2768 .with_fields(vec![
2769 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2770 NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
2772 ])
2773 .build()
2774 .unwrap();
2775
2776 let builder = metadata.into_builder(Some(
2777 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2778 ));
2779
2780 let result = builder.add_current_schema(evolved_schema);
2782
2783 assert!(result.is_err());
2784 let error = result.unwrap_err();
2785 let error_message = error.message();
2786 assert!(error_message.contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2787 assert!(error_message.contains("Schema evolution cannot introduce field names that match existing partition field names"));
2788 }
2789
2790 #[test]
2791 fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build() {
2792 let initial_schema = Schema::builder()
2793 .with_fields(vec![
2794 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2795 ])
2796 .build()
2797 .unwrap();
2798
2799 let partition_spec = UnboundPartitionSpec::builder()
2800 .with_spec_id(0)
2801 .add_partition_field(1, "partition_col", Transform::Bucket(16))
2802 .unwrap()
2803 .build();
2804
2805 let metadata = TableMetadataBuilder::new(
2806 initial_schema,
2807 partition_spec,
2808 SortOrder::unsorted_order(),
2809 TEST_LOCATION.to_string(),
2810 FormatVersion::V2,
2811 HashMap::new(),
2812 )
2813 .unwrap()
2814 .build()
2815 .unwrap()
2816 .metadata;
2817
2818 let non_conflicting_schema = Schema::builder()
2819 .with_fields(vec![
2820 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2821 NestedField::required(2, "new_field", Type::Primitive(PrimitiveType::Int)).into(),
2822 ])
2823 .build()
2824 .unwrap();
2825
2826 let result = metadata
2828 .clone()
2829 .into_builder(Some("test_location".to_string()))
2830 .add_current_schema(non_conflicting_schema)
2831 .unwrap()
2832 .build();
2833
2834 assert!(result.is_ok());
2835 }
2836
2837 #[test]
2838 fn test_partition_spec_evolution_validates_schema_field_name_conflicts() {
2839 let initial_schema = Schema::builder()
2840 .with_fields(vec![
2841 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2842 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2843 .into(),
2844 ])
2845 .build()
2846 .unwrap();
2847
2848 let partition_spec = UnboundPartitionSpec::builder()
2849 .with_spec_id(0)
2850 .add_partition_field(1, "data_bucket", Transform::Bucket(16))
2851 .unwrap()
2852 .build();
2853
2854 let metadata = TableMetadataBuilder::new(
2855 initial_schema,
2856 partition_spec,
2857 SortOrder::unsorted_order(),
2858 TEST_LOCATION.to_string(),
2859 FormatVersion::V2,
2860 HashMap::new(),
2861 )
2862 .unwrap()
2863 .build()
2864 .unwrap()
2865 .metadata;
2866
2867 let builder = metadata.into_builder(Some(
2868 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2869 ));
2870
2871 let conflicting_partition_spec = UnboundPartitionSpec::builder()
2872 .with_spec_id(1)
2873 .add_partition_field(1, "existing_field", Transform::Bucket(8))
2874 .unwrap()
2875 .build();
2876
2877 let result = builder.add_partition_spec(conflicting_partition_spec);
2878
2879 assert!(result.is_err());
2880 let error = result.unwrap_err();
2881 let error_message = error.message();
2882 assert!(error_message.contains(
2884 "Cannot create partition with name 'existing_field' that conflicts with schema field"
2885 ));
2886 assert!(error_message.contains("and is not an identity transform"));
2887 }
2888
2889 #[test]
2890 fn test_schema_evolution_validates_against_all_historical_schemas() {
2891 let initial_schema = Schema::builder()
2893 .with_fields(vec![
2894 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2895 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2896 .into(),
2897 ])
2898 .build()
2899 .unwrap();
2900
2901 let partition_spec = UnboundPartitionSpec::builder()
2902 .with_spec_id(0)
2903 .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2904 .unwrap()
2905 .build();
2906
2907 let metadata = TableMetadataBuilder::new(
2908 initial_schema,
2909 partition_spec,
2910 SortOrder::unsorted_order(),
2911 TEST_LOCATION.to_string(),
2912 FormatVersion::V2,
2913 HashMap::new(),
2914 )
2915 .unwrap()
2916 .build()
2917 .unwrap()
2918 .metadata;
2919
2920 let second_schema = Schema::builder()
2922 .with_fields(vec![
2923 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2924 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2925 .into(),
2926 ])
2927 .build()
2928 .unwrap();
2929
2930 let metadata = metadata
2931 .into_builder(Some("test_location".to_string()))
2932 .add_current_schema(second_schema)
2933 .unwrap()
2934 .build()
2935 .unwrap()
2936 .metadata;
2937
2938 let third_schema = Schema::builder()
2942 .with_fields(vec![
2943 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2944 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2945 .into(),
2946 NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
2947 .into(),
2948 ])
2949 .build()
2950 .unwrap();
2951
2952 let builder = metadata
2953 .clone()
2954 .into_builder(Some("test_location".to_string()));
2955
2956 let result = builder.add_current_schema(third_schema);
2958 assert!(result.is_ok());
2959
2960 let conflicting_schema = Schema::builder()
2963 .with_fields(vec![
2964 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2965 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2966 .into(),
2967 NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
2968 .into(),
2969 NestedField::required(5, "bucket_data", Type::Primitive(PrimitiveType::String))
2970 .into(), ])
2972 .build()
2973 .unwrap();
2974
2975 let builder2 = metadata.into_builder(Some("test_location".to_string()));
2976 let result2 = builder2.add_current_schema(conflicting_schema);
2977
2978 assert!(result2.is_err());
2981 let error = result2.unwrap_err();
2982 assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2983 }
2984
2985 #[test]
2986 fn test_schema_evolution_allows_existing_partition_field_if_exists_in_historical_schema() {
2987 let initial_schema = Schema::builder()
2989 .with_fields(vec![
2990 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2991 NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
2992 .into(),
2993 ])
2994 .build()
2995 .unwrap();
2996
2997 let partition_spec = UnboundPartitionSpec::builder()
2998 .with_spec_id(0)
2999 .add_partition_field(2, "partition_data", Transform::Identity)
3000 .unwrap()
3001 .build();
3002
3003 let metadata = TableMetadataBuilder::new(
3004 initial_schema,
3005 partition_spec,
3006 SortOrder::unsorted_order(),
3007 TEST_LOCATION.to_string(),
3008 FormatVersion::V2,
3009 HashMap::new(),
3010 )
3011 .unwrap()
3012 .build()
3013 .unwrap()
3014 .metadata;
3015
3016 let evolved_schema = Schema::builder()
3018 .with_fields(vec![
3019 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3020 NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
3021 .into(),
3022 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
3023 .into(),
3024 ])
3025 .build()
3026 .unwrap();
3027
3028 let result = metadata
3030 .into_builder(Some("test_location".to_string()))
3031 .add_current_schema(evolved_schema);
3032
3033 assert!(result.is_ok());
3034 }
3035
3036 #[test]
3037 fn test_schema_evolution_prevents_new_field_conflicting_with_partition_field() {
3038 let initial_schema = Schema::builder()
3040 .with_fields(vec![
3041 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3042 ])
3043 .build()
3044 .unwrap();
3045
3046 let partition_spec = UnboundPartitionSpec::builder()
3047 .with_spec_id(0)
3048 .add_partition_field(1, "bucket_data", Transform::Bucket(16))
3049 .unwrap()
3050 .build();
3051
3052 let metadata = TableMetadataBuilder::new(
3053 initial_schema,
3054 partition_spec,
3055 SortOrder::unsorted_order(),
3056 TEST_LOCATION.to_string(),
3057 FormatVersion::V2,
3058 HashMap::new(),
3059 )
3060 .unwrap()
3061 .build()
3062 .unwrap()
3063 .metadata;
3064
3065 let conflicting_schema = Schema::builder()
3067 .with_fields(vec![
3068 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3069 NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
3071 ])
3072 .build()
3073 .unwrap();
3074
3075 let builder = metadata.into_builder(Some("test_location".to_string()));
3076 let result = builder.add_current_schema(conflicting_schema);
3077
3078 assert!(result.is_err());
3081 let error = result.unwrap_err();
3082 assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
3083 }
3084
3085 #[test]
3086 fn test_partition_spec_evolution_allows_non_conflicting_names() {
3087 let initial_schema = Schema::builder()
3088 .with_fields(vec![
3089 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3090 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
3091 .into(),
3092 ])
3093 .build()
3094 .unwrap();
3095
3096 let partition_spec = UnboundPartitionSpec::builder()
3097 .with_spec_id(0)
3098 .add_partition_field(1, "data_bucket", Transform::Bucket(16))
3099 .unwrap()
3100 .build();
3101
3102 let metadata = TableMetadataBuilder::new(
3103 initial_schema,
3104 partition_spec,
3105 SortOrder::unsorted_order(),
3106 TEST_LOCATION.to_string(),
3107 FormatVersion::V2,
3108 HashMap::new(),
3109 )
3110 .unwrap()
3111 .build()
3112 .unwrap()
3113 .metadata;
3114
3115 let builder = metadata.into_builder(Some(
3116 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3117 ));
3118
3119 let non_conflicting_partition_spec = UnboundPartitionSpec::builder()
3121 .with_spec_id(1)
3122 .add_partition_field(2, "new_partition_field", Transform::Bucket(8))
3123 .unwrap()
3124 .build();
3125
3126 let result = builder.add_partition_spec(non_conflicting_partition_spec);
3127
3128 assert!(result.is_ok());
3129 }
3130
    #[test]
    fn test_row_lineage_addition() {
        // V3 row lineage: each appended snapshot claims a range of row ids and
        // the table's next-row-id advances by the snapshot's added row count.
        let new_rows = 30;
        let base = builder_without_changes(FormatVersion::V3)
            .build()
            .unwrap()
            .metadata;
        let add_rows = Snapshot::builder()
            .with_snapshot_id(0)
            .with_timestamp_ms(base.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            // Claims rows [0, 30): first-row-id = table next-row-id.
            .with_row_range(base.next_row_id(), new_rows)
            .build();

        let first_addition = base
            .into_builder(None)
            .add_snapshot(add_rows.clone())
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        assert_eq!(first_addition.next_row_id(), new_rows);

        // Second snapshot continues directly after the first range.
        let add_more_rows = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(first_addition.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(Some(0))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(first_addition.next_row_id(), new_rows)
            .build();

        let second_addition = first_addition
            .into_builder(None)
            .add_snapshot(add_more_rows)
            .unwrap()
            .build()
            .unwrap()
            .metadata;
        assert_eq!(second_addition.next_row_id(), new_rows * 2);
    }
3185
    #[test]
    fn test_row_lineage_invalid_snapshot() {
        // A snapshot whose first-row-id lies before the table's next-row-id
        // would overlap an already-assigned row range and must be rejected.
        let new_rows = 30;
        let base = builder_without_changes(FormatVersion::V3)
            .build()
            .unwrap()
            .metadata;

        // Valid first snapshot claims rows [0, 30).
        let add_rows = Snapshot::builder()
            .with_snapshot_id(0)
            .with_timestamp_ms(base.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(base.next_row_id(), new_rows)
            .build();

        let added = base
            .into_builder(None)
            .add_snapshot(add_rows)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        let invalid_new_rows = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(added.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(Some(0))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            // Starts at 29, one row before next-row-id (30) — overlapping.
            .with_row_range(added.next_row_id() - 1, 10)
            .build();

        let err = added
            .into_builder(None)
            .add_snapshot(invalid_new_rows)
            .unwrap_err();
        assert!(
            err.to_string().contains(
                "Cannot add a snapshot, first-row-id is behind table next-row-id: 29 < 30"
            )
        );
    }
3242
    #[test]
    fn test_row_lineage_append_branch() {
        // Row ids are assigned from a single table-wide counter regardless of
        // which branch a snapshot lands on: branch (30 rows), main (28 rows),
        // then branch again (21 rows) receive consecutive, non-overlapping
        // first-row-ids.
        let branch = "some_branch";

        let base = builder_without_changes(FormatVersion::V3)
            .build()
            .unwrap()
            .metadata;

        assert_eq!(base.next_row_id(), 0);

        // 30 rows appended to the side branch, claiming [0, 30).
        let branch_snapshot_1 = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(base.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(base.next_row_id(), 30)
            .build();

        let table_after_branch_1 = base
            .into_builder(None)
            .set_branch_snapshot(branch_snapshot_1.clone(), branch)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // A side-branch append does not set a current (main) snapshot.
        assert!(table_after_branch_1.current_snapshot().is_none());

        let branch_ref = table_after_branch_1.refs.get(branch).unwrap();
        let branch_snap_1 = table_after_branch_1
            .snapshots
            .get(&branch_ref.snapshot_id)
            .unwrap();
        assert_eq!(branch_snap_1.first_row_id(), Some(0));

        // The shared counter advanced even though main is untouched.
        assert_eq!(table_after_branch_1.next_row_id(), 30);

        // 28 rows appended to main, claiming [30, 58).
        let main_snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(table_after_branch_1.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("bar")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(table_after_branch_1.next_row_id(), 28)
            .build();

        let table_after_main = table_after_branch_1
            .into_builder(None)
            .add_snapshot(main_snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: main_snapshot.snapshot_id(),
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        let current_snapshot = table_after_main.current_snapshot().unwrap();
        assert_eq!(current_snapshot.first_row_id(), Some(30));

        assert_eq!(table_after_main.next_row_id(), 58);

        // 21 more rows on the side branch, claiming [58, 79).
        let branch_snapshot_2 = Snapshot::builder()
            .with_snapshot_id(3)
            .with_timestamp_ms(table_after_main.last_updated_ms + 1)
            .with_sequence_number(2)
            .with_schema_id(0)
            .with_manifest_list("baz")
            .with_parent_snapshot_id(Some(branch_snapshot_1.snapshot_id()))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(table_after_main.next_row_id(), 21)
            .build();

        let table_after_branch_2 = table_after_main
            .into_builder(None)
            .set_branch_snapshot(branch_snapshot_2.clone(), branch)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        let branch_ref_2 = table_after_branch_2.refs.get(branch).unwrap();
        let branch_snap_2 = table_after_branch_2
            .snapshots
            .get(&branch_ref_2.snapshot_id)
            .unwrap();
        assert_eq!(branch_snap_2.first_row_id(), Some(58));

        assert_eq!(table_after_branch_2.next_row_id(), 79);
    }
3369
3370 #[test]
3371 fn test_encryption_keys() {
3372 let builder = builder_without_changes(FormatVersion::V2);
3373
3374 let encryption_key_1 = EncryptedKey::builder()
3376 .key_id("key-1")
3377 .encrypted_key_metadata(vec![1, 2, 3, 4])
3378 .encrypted_by_id("encryption-service-1")
3379 .properties(HashMap::from_iter(vec![(
3380 "algorithm".to_string(),
3381 "AES-256".to_string(),
3382 )]))
3383 .build();
3384
3385 let encryption_key_2 = EncryptedKey::builder()
3386 .key_id("key-2")
3387 .encrypted_key_metadata(vec![5, 6, 7, 8])
3388 .encrypted_by_id("encryption-service-2")
3389 .properties(HashMap::new())
3390 .build();
3391
3392 let build_result = builder
3394 .add_encryption_key(encryption_key_1.clone())
3395 .build()
3396 .unwrap();
3397
3398 assert_eq!(build_result.changes.len(), 1);
3399 assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3400 assert_eq!(
3401 build_result.metadata.encryption_key("key-1"),
3402 Some(&encryption_key_1)
3403 );
3404 assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
3405 encryption_key: encryption_key_1.clone()
3406 });
3407
3408 let build_result = build_result
3410 .metadata
3411 .into_builder(Some(
3412 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3413 ))
3414 .add_encryption_key(encryption_key_2.clone())
3415 .build()
3416 .unwrap();
3417
3418 assert_eq!(build_result.changes.len(), 1);
3419 assert_eq!(build_result.metadata.encryption_keys.len(), 2);
3420 assert_eq!(
3421 build_result.metadata.encryption_key("key-1"),
3422 Some(&encryption_key_1)
3423 );
3424 assert_eq!(
3425 build_result.metadata.encryption_key("key-2"),
3426 Some(&encryption_key_2)
3427 );
3428 assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
3429 encryption_key: encryption_key_2.clone()
3430 });
3431
3432 let build_result = build_result
3434 .metadata
3435 .into_builder(Some(
3436 "s3://bucket/test/location/metadata/metadata2.json".to_string(),
3437 ))
3438 .add_encryption_key(encryption_key_1.clone())
3439 .build()
3440 .unwrap();
3441
3442 assert_eq!(build_result.changes.len(), 0);
3443 assert_eq!(build_result.metadata.encryption_keys.len(), 2);
3444
3445 let build_result = build_result
3447 .metadata
3448 .into_builder(Some(
3449 "s3://bucket/test/location/metadata/metadata3.json".to_string(),
3450 ))
3451 .remove_encryption_key("key-1")
3452 .build()
3453 .unwrap();
3454
3455 assert_eq!(build_result.changes.len(), 1);
3456 assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3457 assert_eq!(build_result.metadata.encryption_key("key-1"), None);
3458 assert_eq!(
3459 build_result.metadata.encryption_key("key-2"),
3460 Some(&encryption_key_2)
3461 );
3462 assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
3463 key_id: "key-1".to_string()
3464 });
3465
3466 let build_result = build_result
3468 .metadata
3469 .into_builder(Some(
3470 "s3://bucket/test/location/metadata/metadata4.json".to_string(),
3471 ))
3472 .remove_encryption_key("non-existent-key")
3473 .build()
3474 .unwrap();
3475
3476 assert_eq!(build_result.changes.len(), 0);
3477 assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3478
3479 let keys = build_result
3481 .metadata
3482 .encryption_keys_iter()
3483 .collect::<Vec<_>>();
3484 assert_eq!(keys.len(), 1);
3485 assert_eq!(keys[0], &encryption_key_2);
3486
3487 let build_result = build_result
3489 .metadata
3490 .into_builder(Some(
3491 "s3://bucket/test/location/metadata/metadata5.json".to_string(),
3492 ))
3493 .remove_encryption_key("key-2")
3494 .build()
3495 .unwrap();
3496
3497 assert_eq!(build_result.changes.len(), 1);
3498 assert_eq!(build_result.metadata.encryption_keys.len(), 0);
3499 assert_eq!(build_result.metadata.encryption_key("key-2"), None);
3500 assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
3501 key_id: "key-2".to_string()
3502 });
3503
3504 let keys = build_result.metadata.encryption_keys_iter();
3506 assert_eq!(keys.len(), 0);
3507 }
3508}