1use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use uuid::Uuid;
22
23use super::{
24 DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
25 ONE_MINUTE_MS, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX,
26 PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT, PartitionSpec, PartitionSpecBuilder,
27 PartitionStatisticsFile, RESERVED_PROPERTIES, Schema, SchemaRef, Snapshot, SnapshotLog,
28 SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
29 TableMetadata, UNPARTITIONED_LAST_ASSIGNED_ID, UnboundPartitionSpec,
30};
31use crate::error::{Error, ErrorKind, Result};
32use crate::{TableCreation, TableUpdate};
33
/// Field id given to the first field when the schema of a new table has its field ids
/// re-assigned (see `TableMetadataBuilder::new`).
const FIRST_FIELD_ID: u32 = 1_u32;
35
36#[derive(Debug, Clone)]
49pub struct TableMetadataBuilder {
50 metadata: TableMetadata,
51 changes: Vec<TableUpdate>,
52 last_added_schema_id: Option<i32>,
53 last_added_spec_id: Option<i32>,
54 last_added_order_id: Option<i64>,
55 previous_history_entry: Option<MetadataLog>,
57 last_updated_ms: Option<i64>,
58}
59
60#[derive(Debug, Clone, PartialEq)]
61pub struct TableMetadataBuildResult {
63 pub metadata: TableMetadata,
65 pub changes: Vec<TableUpdate>,
67 pub expired_metadata_logs: Vec<MetadataLog>,
69}
70
71impl TableMetadataBuilder {
72 pub const LAST_ADDED: i32 = -1;
74
75 pub fn new(
80 schema: Schema,
81 spec: impl Into<UnboundPartitionSpec>,
82 sort_order: SortOrder,
83 location: String,
84 format_version: FormatVersion,
85 properties: HashMap<String, String>,
86 ) -> Result<Self> {
87 let (fresh_schema, fresh_spec, fresh_sort_order) =
89 Self::reassign_ids(schema, spec.into(), sort_order)?;
90 let schema_id = fresh_schema.schema_id();
91
92 let builder = Self {
93 metadata: TableMetadata {
94 format_version,
95 table_uuid: Uuid::now_v7(),
96 location: "".to_string(), last_sequence_number: 0,
98 last_updated_ms: 0, last_column_id: -1, current_schema_id: -1, schemas: HashMap::new(),
102 partition_specs: HashMap::new(),
103 default_spec: Arc::new(
104 PartitionSpec::unpartition_spec().with_spec_id(-1),
110 ), default_partition_type: StructType::new(vec![]),
112 last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
113 properties: HashMap::new(),
114 current_snapshot_id: None,
115 snapshots: HashMap::new(),
116 snapshot_log: vec![],
117 sort_orders: HashMap::new(),
118 metadata_log: vec![],
119 default_sort_order_id: -1, refs: HashMap::default(),
121 statistics: HashMap::new(),
122 partition_statistics: HashMap::new(),
123 encryption_keys: HashMap::new(),
124 },
125 last_updated_ms: None,
126 changes: vec![],
127 last_added_schema_id: Some(schema_id),
128 last_added_spec_id: None,
129 last_added_order_id: None,
130 previous_history_entry: None,
131 };
132
133 builder
134 .set_location(location)
135 .add_current_schema(fresh_schema)?
136 .add_default_partition_spec(fresh_spec.into_unbound())?
137 .add_default_sort_order(fresh_sort_order)?
138 .set_properties(properties)
139 }
140
141 #[must_use]
147 pub fn new_from_metadata(
148 previous: TableMetadata,
149 current_file_location: Option<String>,
150 ) -> Self {
151 Self {
152 previous_history_entry: current_file_location.map(|l| MetadataLog {
153 metadata_file: l,
154 timestamp_ms: previous.last_updated_ms,
155 }),
156 metadata: previous,
157 changes: Vec::default(),
158 last_added_schema_id: None,
159 last_added_spec_id: None,
160 last_added_order_id: None,
161 last_updated_ms: None,
162 }
163 }
164
165 pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
167 let TableCreation {
168 name: _,
169 location,
170 schema,
171 partition_spec,
172 sort_order,
173 properties,
174 } = table_creation;
175
176 let location = location.ok_or_else(|| {
177 Error::new(
178 ErrorKind::DataInvalid,
179 "Can't create table without location",
180 )
181 })?;
182 let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
183 spec_id: None,
184 fields: vec![],
185 });
186
187 Self::new(
188 schema,
189 partition_spec,
190 sort_order.unwrap_or(SortOrder::unsorted_order()),
191 location,
192 FormatVersion::V2,
193 properties,
194 )
195 }
196
197 pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
199 if self.metadata.table_uuid != uuid {
200 self.metadata.table_uuid = uuid;
201 self.changes.push(TableUpdate::AssignUuid { uuid });
202 }
203
204 self
205 }
206
207 pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> {
212 if format_version < self.metadata.format_version {
213 return Err(Error::new(
214 ErrorKind::DataInvalid,
215 format!(
216 "Cannot downgrade FormatVersion from {} to {}",
217 self.metadata.format_version, format_version
218 ),
219 ));
220 }
221
222 if format_version != self.metadata.format_version {
223 match format_version {
224 FormatVersion::V1 => {
225 }
227 FormatVersion::V2 => {
228 self.metadata.format_version = format_version;
229 self.changes
230 .push(TableUpdate::UpgradeFormatVersion { format_version });
231 }
232 }
233 }
234
235 Ok(self)
236 }
237
238 pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> {
247 let reserved_properties = properties
249 .keys()
250 .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
251 .map(ToString::to_string)
252 .collect::<Vec<_>>();
253
254 if !reserved_properties.is_empty() {
255 return Err(Error::new(
256 ErrorKind::DataInvalid,
257 format!(
258 "Table properties should not contain reserved properties, but got: [{}]",
259 reserved_properties.join(", ")
260 ),
261 ));
262 }
263
264 if properties.is_empty() {
265 return Ok(self);
266 }
267
268 self.metadata.properties.extend(properties.clone());
269 self.changes.push(TableUpdate::SetProperties {
270 updates: properties,
271 });
272
273 Ok(self)
274 }
275
276 pub fn remove_properties(mut self, properties: &[String]) -> Result<Self> {
282 let properties = properties.iter().cloned().collect::<HashSet<_>>();
284
285 let reserved_properties = properties
287 .iter()
288 .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
289 .map(ToString::to_string)
290 .collect::<Vec<_>>();
291
292 if !reserved_properties.is_empty() {
293 return Err(Error::new(
294 ErrorKind::DataInvalid,
295 format!(
296 "Table properties to remove contain reserved properties: [{}]",
297 reserved_properties.join(", ")
298 ),
299 ));
300 }
301
302 for property in &properties {
303 self.metadata.properties.remove(property);
304 }
305
306 if !properties.is_empty() {
307 self.changes.push(TableUpdate::RemoveProperties {
308 removals: properties.into_iter().collect(),
309 });
310 }
311
312 Ok(self)
313 }
314
315 pub fn set_location(mut self, location: String) -> Self {
317 let location = location.trim_end_matches('/').to_string();
318 if self.metadata.location != location {
319 self.changes.push(TableUpdate::SetLocation {
320 location: location.clone(),
321 });
322 self.metadata.location = location;
323 }
324
325 self
326 }
327
328 pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
334 if self
335 .metadata
336 .snapshots
337 .contains_key(&snapshot.snapshot_id())
338 {
339 return Err(Error::new(
340 ErrorKind::DataInvalid,
341 format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
342 ));
343 }
344
345 if self.metadata.format_version != FormatVersion::V1
346 && snapshot.sequence_number() <= self.metadata.last_sequence_number
347 && snapshot.parent_snapshot_id().is_some()
348 {
349 return Err(Error::new(
350 ErrorKind::DataInvalid,
351 format!(
352 "Cannot add snapshot with sequence number {} older than last sequence number {}",
353 snapshot.sequence_number(),
354 self.metadata.last_sequence_number
355 ),
356 ));
357 }
358
359 if let Some(last) = self.metadata.snapshot_log.last() {
360 if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
363 return Err(Error::new(
364 ErrorKind::DataInvalid,
365 format!(
366 "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
367 snapshot.timestamp_ms(),
368 last.timestamp_ms
369 ),
370 ));
371 }
372 }
373
374 let max_last_updated = self
375 .last_updated_ms
376 .unwrap_or_default()
377 .max(self.metadata.last_updated_ms);
378 if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
379 return Err(Error::new(
380 ErrorKind::DataInvalid,
381 format!(
382 "Invalid snapshot timestamp {}: before last updated timestamp {}",
383 snapshot.timestamp_ms(),
384 max_last_updated
385 ),
386 ));
387 }
388
389 self.changes.push(TableUpdate::AddSnapshot {
391 snapshot: snapshot.clone(),
392 });
393
394 self.last_updated_ms = Some(snapshot.timestamp_ms());
395 self.metadata.last_sequence_number = snapshot.sequence_number();
396 self.metadata
397 .snapshots
398 .insert(snapshot.snapshot_id(), snapshot.into());
399
400 Ok(self)
401 }
402
403 pub fn set_branch_snapshot(self, snapshot: Snapshot, branch: &str) -> Result<Self> {
409 let reference = self.metadata.refs.get(branch).cloned();
410
411 let reference = if let Some(mut reference) = reference {
412 if !reference.is_branch() {
413 return Err(Error::new(
414 ErrorKind::DataInvalid,
415 format!("Cannot append snapshot to non-branch reference '{branch}'",),
416 ));
417 }
418
419 reference.snapshot_id = snapshot.snapshot_id();
420 reference
421 } else {
422 SnapshotReference {
423 snapshot_id: snapshot.snapshot_id(),
424 retention: SnapshotRetention::Branch {
425 min_snapshots_to_keep: None,
426 max_snapshot_age_ms: None,
427 max_ref_age_ms: None,
428 },
429 }
430 };
431
432 self.add_snapshot(snapshot)?.set_ref(branch, reference)
433 }
434
435 pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
439 let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
440
441 self.metadata.snapshots.retain(|k, _| {
442 if snapshot_ids.contains(k) {
443 removed_snapshots.push(*k);
444 false
445 } else {
446 true
447 }
448 });
449
450 if !removed_snapshots.is_empty() {
451 self.changes.push(TableUpdate::RemoveSnapshots {
452 snapshot_ids: removed_snapshots,
453 });
454 }
455
456 self.metadata
458 .refs
459 .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
460
461 self
462 }
463
464 pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
469 if self
470 .metadata
471 .refs
472 .get(ref_name)
473 .is_some_and(|snap_ref| snap_ref.eq(&reference))
474 {
475 return Ok(self);
476 }
477
478 let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
479 return Err(Error::new(
480 ErrorKind::DataInvalid,
481 format!(
482 "Cannot set '{ref_name}' to unknown snapshot: '{}'",
483 reference.snapshot_id
484 ),
485 ));
486 };
487
488 let is_added_snapshot = self.changes.iter().any(|update| {
490 matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
491 });
492 if is_added_snapshot {
493 self.last_updated_ms = Some(snapshot.timestamp_ms());
494 }
495
496 if ref_name == MAIN_BRANCH {
498 self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
499 let timestamp_ms = if let Some(last_updated_ms) = self.last_updated_ms {
500 last_updated_ms
501 } else {
502 let last_updated_ms = chrono::Utc::now().timestamp_millis();
503 self.last_updated_ms = Some(last_updated_ms);
504 last_updated_ms
505 };
506
507 self.metadata.snapshot_log.push(SnapshotLog {
508 snapshot_id: snapshot.snapshot_id(),
509 timestamp_ms,
510 });
511 }
512
513 self.changes.push(TableUpdate::SetSnapshotRef {
514 ref_name: ref_name.to_string(),
515 reference: reference.clone(),
516 });
517 self.metadata.refs.insert(ref_name.to_string(), reference);
518
519 Ok(self)
520 }
521
522 pub fn remove_ref(mut self, ref_name: &str) -> Self {
526 if ref_name == MAIN_BRANCH {
527 self.metadata.current_snapshot_id = None;
528 self.metadata.snapshot_log.clear();
529 }
530
531 if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
532 self.changes.push(TableUpdate::RemoveSnapshotRef {
533 ref_name: ref_name.to_string(),
534 });
535 }
536
537 self
538 }
539
540 pub fn set_statistics(mut self, statistics: StatisticsFile) -> Self {
542 self.metadata
543 .statistics
544 .insert(statistics.snapshot_id, statistics.clone());
545 self.changes.push(TableUpdate::SetStatistics {
546 statistics: statistics.clone(),
547 });
548 self
549 }
550
551 pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
553 let previous = self.metadata.statistics.remove(&snapshot_id);
554 if previous.is_some() {
555 self.changes
556 .push(TableUpdate::RemoveStatistics { snapshot_id });
557 }
558 self
559 }
560
561 pub fn set_partition_statistics(
563 mut self,
564 partition_statistics_file: PartitionStatisticsFile,
565 ) -> Self {
566 self.metadata.partition_statistics.insert(
567 partition_statistics_file.snapshot_id,
568 partition_statistics_file.clone(),
569 );
570 self.changes.push(TableUpdate::SetPartitionStatistics {
571 partition_statistics: partition_statistics_file,
572 });
573 self
574 }
575
576 pub fn remove_partition_statistics(mut self, snapshot_id: i64) -> Self {
578 let previous = self.metadata.partition_statistics.remove(&snapshot_id);
579 if previous.is_some() {
580 self.changes
581 .push(TableUpdate::RemovePartitionStatistics { snapshot_id });
582 }
583 self
584 }
585
586 pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
593 self.validate_schema_field_names(&schema)?;
595
596 let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
597 let schema_found = self.metadata.schemas.contains_key(&new_schema_id);
598
599 if schema_found {
600 if self.last_added_schema_id != Some(new_schema_id) {
601 self.changes.push(TableUpdate::AddSchema {
602 schema: schema.clone(),
603 });
604 self.last_added_schema_id = Some(new_schema_id);
605 }
606
607 return Ok(self);
608 }
609
610 self.metadata.last_column_id =
613 std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());
614
615 let schema = match new_schema_id == schema.schema_id() {
617 true => schema,
618 false => schema.with_schema_id(new_schema_id),
619 };
620
621 self.metadata
622 .schemas
623 .insert(new_schema_id, schema.clone().into());
624
625 self.changes.push(TableUpdate::AddSchema { schema });
626
627 self.last_added_schema_id = Some(new_schema_id);
628
629 Ok(self)
630 }
631
632 pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
640 if schema_id == Self::LAST_ADDED {
641 schema_id = self.last_added_schema_id.ok_or_else(|| {
642 Error::new(
643 ErrorKind::DataInvalid,
644 "Cannot set current schema to last added schema: no schema has been added.",
645 )
646 })?;
647 };
648 let schema_id = schema_id; if schema_id == self.metadata.current_schema_id {
651 return Ok(self);
652 }
653
654 let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
655 Error::new(
656 ErrorKind::DataInvalid,
657 format!(
658 "Cannot set current schema to unknown schema with id: '{}'",
659 schema_id
660 ),
661 )
662 })?;
663
664 self.metadata.current_schema_id = schema_id;
670
671 if self.last_added_schema_id == Some(schema_id) {
672 self.changes.push(TableUpdate::SetCurrentSchema {
673 schema_id: Self::LAST_ADDED,
674 });
675 } else {
676 self.changes
677 .push(TableUpdate::SetCurrentSchema { schema_id });
678 }
679
680 Ok(self)
681 }
682
683 pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
685 self.add_schema(schema)?
686 .set_current_schema(Self::LAST_ADDED)
687 }
688
689 fn validate_schema_field_names(&self, schema: &Schema) -> Result<()> {
698 if self.metadata.schemas.is_empty() {
699 return Ok(());
700 }
701
702 for field_name in schema.field_id_to_name_map().values() {
703 let has_partition_conflict = self.metadata.partition_name_exists(field_name);
704 let is_new_field = !self.metadata.name_exists_in_any_schema(field_name);
705
706 if has_partition_conflict && is_new_field {
707 return Err(Error::new(
708 ErrorKind::DataInvalid,
709 format!(
710 "Cannot add schema field '{}' because it conflicts with existing partition field name. \
711 Schema evolution cannot introduce field names that match existing partition field names.",
712 field_name
713 ),
714 ));
715 }
716 }
717
718 Ok(())
719 }
720
721 fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> {
731 if self.metadata.schemas.is_empty() {
732 return Ok(());
733 }
734
735 let current_schema = self.get_current_schema()?;
736 for partition_field in unbound_spec.fields() {
737 let exists_in_any_schema = self
738 .metadata
739 .name_exists_in_any_schema(&partition_field.name);
740
741 if !exists_in_any_schema {
743 continue;
744 }
745
746 if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) {
748 let is_identity_transform =
749 partition_field.transform == crate::spec::Transform::Identity;
750 let has_matching_source_id = schema_field.id == partition_field.source_id;
751
752 if !is_identity_transform {
753 return Err(Error::new(
754 ErrorKind::DataInvalid,
755 format!(
756 "Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.",
757 partition_field.name
758 ),
759 ));
760 }
761
762 if !has_matching_source_id {
763 return Err(Error::new(
764 ErrorKind::DataInvalid,
765 format!(
766 "Cannot create identity partition sourced from different field in schema. \
767 Field name '{}' has id `{}` in schema but partition source id is `{}`",
768 partition_field.name, schema_field.id, partition_field.source_id
769 ),
770 ));
771 }
772 }
773 }
774
775 Ok(())
776 }
777
778 pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
789 let schema = self.get_current_schema()?.clone();
790
791 self.validate_partition_field_names(&unbound_spec)?;
793
794 let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
795 .with_last_assigned_field_id(self.metadata.last_partition_id)
796 .build()?;
797
798 let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
799 let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
800 let spec = spec.with_spec_id(new_spec_id);
801 let unbound_spec = unbound_spec.with_spec_id(new_spec_id);
802
803 if spec_found {
804 if self.last_added_spec_id != Some(new_spec_id) {
805 self.changes
806 .push(TableUpdate::AddSpec { spec: unbound_spec });
807 self.last_added_spec_id = Some(new_spec_id);
808 }
809
810 return Ok(self);
811 }
812
813 if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
814 return Err(Error::new(
815 ErrorKind::DataInvalid,
816 "Cannot add partition spec with non-sequential field ids to format version 1 table",
817 ));
818 }
819
820 let highest_field_id = spec
821 .highest_field_id()
822 .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
823 self.metadata
824 .partition_specs
825 .insert(new_spec_id, Arc::new(spec));
826 self.changes
827 .push(TableUpdate::AddSpec { spec: unbound_spec });
828
829 self.last_added_spec_id = Some(new_spec_id);
830 self.metadata.last_partition_id =
831 std::cmp::max(self.metadata.last_partition_id, highest_field_id);
832
833 Ok(self)
834 }
835
836 pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result<Self> {
842 if spec_id == Self::LAST_ADDED {
843 spec_id = self.last_added_spec_id.ok_or_else(|| {
844 Error::new(
845 ErrorKind::DataInvalid,
846 "Cannot set default partition spec to last added spec: no spec has been added.",
847 )
848 })?;
849 }
850
851 if self.metadata.default_spec.spec_id() == spec_id {
852 return Ok(self);
853 }
854
855 if !self.metadata.partition_specs.contains_key(&spec_id) {
856 return Err(Error::new(
857 ErrorKind::DataInvalid,
858 format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",),
859 ));
860 }
861
862 let schemaless_spec = self
863 .metadata
864 .partition_specs
865 .get(&spec_id)
866 .ok_or_else(|| {
867 Error::new(
868 ErrorKind::DataInvalid,
869 format!(
870 "Cannot set default partition spec to unknown spec with id: '{spec_id}'",
871 ),
872 )
873 })?
874 .clone();
875 let spec = Arc::unwrap_or_clone(schemaless_spec);
876 let spec_type = spec.partition_type(self.get_current_schema()?)?;
877 self.metadata.default_spec = Arc::new(spec);
878 self.metadata.default_partition_type = spec_type;
879
880 if self.last_added_spec_id == Some(spec_id) {
881 self.changes.push(TableUpdate::SetDefaultSpec {
882 spec_id: Self::LAST_ADDED,
883 });
884 } else {
885 self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
886 }
887
888 Ok(self)
889 }
890
891 pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
893 self.add_partition_spec(unbound_spec)?
894 .set_default_partition_spec(Self::LAST_ADDED)
895 }
896
897 pub fn remove_partition_specs(mut self, spec_ids: &[i32]) -> Result<Self> {
904 if spec_ids.contains(&self.metadata.default_spec.spec_id()) {
905 return Err(Error::new(
906 ErrorKind::DataInvalid,
907 "Cannot remove default partition spec",
908 ));
909 }
910
911 let mut removed_specs = Vec::with_capacity(spec_ids.len());
912 spec_ids.iter().for_each(|id| {
913 if self.metadata.partition_specs.remove(id).is_some() {
914 removed_specs.push(*id);
915 }
916 });
917
918 if !removed_specs.is_empty() {
919 self.changes.push(TableUpdate::RemovePartitionSpecs {
920 spec_ids: removed_specs,
921 });
922 }
923
924 Ok(self)
925 }
926
927 pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
938 let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
939 let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id);
940
941 if sort_order_found {
942 if self.last_added_order_id != Some(new_order_id) {
943 self.changes.push(TableUpdate::AddSortOrder {
944 sort_order: sort_order.clone().with_order_id(new_order_id),
945 });
946 self.last_added_order_id = Some(new_order_id);
947 }
948
949 return Ok(self);
950 }
951
952 let schema = self.get_current_schema()?.clone().as_ref().clone();
953 let sort_order = SortOrder::builder()
954 .with_order_id(new_order_id)
955 .with_fields(sort_order.fields)
956 .build(&schema)
957 .map_err(|e| {
958 Error::new(
959 ErrorKind::DataInvalid,
960 format!("Sort order to add is incompatible with current schema: {e}"),
961 )
962 .with_source(e)
963 })?;
964
965 self.last_added_order_id = Some(new_order_id);
966 self.metadata
967 .sort_orders
968 .insert(new_order_id, sort_order.clone().into());
969 self.changes.push(TableUpdate::AddSortOrder { sort_order });
970
971 Ok(self)
972 }
973
974 pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result<Self> {
980 if sort_order_id == Self::LAST_ADDED as i64 {
981 sort_order_id = self.last_added_order_id.ok_or_else(|| {
982 Error::new(
983 ErrorKind::DataInvalid,
984 "Cannot set default sort order to last added order: no order has been added.",
985 )
986 })?;
987 }
988
989 if self.metadata.default_sort_order_id == sort_order_id {
990 return Ok(self);
991 }
992
993 if !self.metadata.sort_orders.contains_key(&sort_order_id) {
994 return Err(Error::new(
995 ErrorKind::DataInvalid,
996 format!(
997 "Cannot set default sort order to unknown order with id: '{sort_order_id}'"
998 ),
999 ));
1000 }
1001
1002 self.metadata.default_sort_order_id = sort_order_id;
1003
1004 if self.last_added_order_id == Some(sort_order_id) {
1005 self.changes.push(TableUpdate::SetDefaultSortOrder {
1006 sort_order_id: Self::LAST_ADDED as i64,
1007 });
1008 } else {
1009 self.changes
1010 .push(TableUpdate::SetDefaultSortOrder { sort_order_id });
1011 }
1012
1013 Ok(self)
1014 }
1015
1016 fn add_default_sort_order(self, sort_order: SortOrder) -> Result<Self> {
1018 self.add_sort_order(sort_order)?
1019 .set_default_sort_order(Self::LAST_ADDED as i64)
1020 }
1021
1022 pub fn build(mut self) -> Result<TableMetadataBuildResult> {
1024 self.metadata.last_updated_ms = self
1025 .last_updated_ms
1026 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
1027
1028 let schema = self.get_current_schema()?.clone();
1032 let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?);
1033
1034 self.metadata.default_spec = Arc::new(
1035 Arc::unwrap_or_clone(self.metadata.default_spec)
1036 .into_unbound()
1037 .bind(schema.clone())?,
1038 );
1039 self.metadata.default_partition_type =
1040 self.metadata.default_spec.partition_type(&schema)?;
1041 SortOrder::builder()
1042 .with_fields(sort_order.fields)
1043 .build(&schema)?;
1044
1045 self.update_snapshot_log()?;
1046 self.metadata.try_normalize()?;
1047
1048 if let Some(hist_entry) = self.previous_history_entry.take() {
1049 self.metadata.metadata_log.push(hist_entry);
1050 }
1051 let expired_metadata_logs = self.expire_metadata_log();
1052
1053 Ok(TableMetadataBuildResult {
1054 metadata: self.metadata,
1055 changes: self.changes,
1056 expired_metadata_logs,
1057 })
1058 }
1059
1060 fn expire_metadata_log(&mut self) -> Vec<MetadataLog> {
1061 let max_size = self
1062 .metadata
1063 .properties
1064 .get(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
1065 .and_then(|v| v.parse::<usize>().ok())
1066 .unwrap_or(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
1067 .max(1);
1068
1069 if self.metadata.metadata_log.len() > max_size {
1070 self.metadata
1071 .metadata_log
1072 .drain(0..self.metadata.metadata_log.len() - max_size)
1073 .collect()
1074 } else {
1075 Vec::new()
1076 }
1077 }
1078
1079 fn update_snapshot_log(&mut self) -> Result<()> {
1080 let intermediate_snapshots = self.get_intermediate_snapshots();
1081 let has_removed_snapshots = self
1082 .changes
1083 .iter()
1084 .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. }));
1085
1086 if intermediate_snapshots.is_empty() && !has_removed_snapshots {
1087 return Ok(());
1088 }
1089
1090 let mut new_snapshot_log = Vec::new();
1091 for log_entry in &self.metadata.snapshot_log {
1092 let snapshot_id = log_entry.snapshot_id;
1093 if self.metadata.snapshots.contains_key(&snapshot_id) {
1094 if !intermediate_snapshots.contains(&snapshot_id) {
1095 new_snapshot_log.push(log_entry.clone());
1096 }
1097 } else if has_removed_snapshots {
1098 new_snapshot_log.clear();
1104 }
1105 }
1106
1107 if let Some(current_snapshot_id) = self.metadata.current_snapshot_id {
1108 let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id);
1109 if last_id != Some(current_snapshot_id) {
1110 return Err(Error::new(
1111 ErrorKind::DataInvalid,
1112 "Cannot set invalid snapshot log: latest entry is not the current snapshot",
1113 ));
1114 }
1115 };
1116
1117 self.metadata.snapshot_log = new_snapshot_log;
1118 Ok(())
1119 }
1120
1121 fn get_intermediate_snapshots(&self) -> HashSet<i64> {
1131 let added_snapshot_ids = self
1132 .changes
1133 .iter()
1134 .filter_map(|update| match update {
1135 TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
1136 _ => None,
1137 })
1138 .collect::<HashSet<_>>();
1139
1140 self.changes
1141 .iter()
1142 .filter_map(|update| match update {
1143 TableUpdate::SetSnapshotRef {
1144 ref_name,
1145 reference,
1146 } => {
1147 if added_snapshot_ids.contains(&reference.snapshot_id)
1148 && ref_name == MAIN_BRANCH
1149 && reference.snapshot_id
1150 != self
1151 .metadata
1152 .current_snapshot_id
1153 .unwrap_or(i64::from(Self::LAST_ADDED))
1154 {
1155 Some(reference.snapshot_id)
1156 } else {
1157 None
1158 }
1159 }
1160 _ => None,
1161 })
1162 .collect()
1163 }
1164
1165 fn reassign_ids(
1166 schema: Schema,
1167 spec: UnboundPartitionSpec,
1168 sort_order: SortOrder,
1169 ) -> Result<(Schema, PartitionSpec, SortOrder)> {
1170 let previous_id_to_name = schema.field_id_to_name_map().clone();
1172 let fresh_schema = schema
1173 .into_builder()
1174 .with_schema_id(DEFAULT_SCHEMA_ID)
1175 .with_reassigned_field_ids(FIRST_FIELD_ID)
1176 .build()?;
1177
1178 let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone());
1180 for field in spec.fields() {
1181 let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
1182 Error::new(
1183 ErrorKind::DataInvalid,
1184 format!(
1185 "Cannot find source column with id {} for partition column {} in schema.",
1186 field.source_id, field.name
1187 ),
1188 )
1189 })?;
1190 fresh_spec =
1191 fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
1192 }
1193 let fresh_spec = fresh_spec.build()?;
1194
1195 let mut fresh_order = SortOrder::builder();
1197 for mut field in sort_order.fields {
1198 let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
1199 Error::new(
1200 ErrorKind::DataInvalid,
1201 format!(
1202 "Cannot find source column with id {} for sort column in schema.",
1203 field.source_id
1204 ),
1205 )
1206 })?;
1207 let new_field_id = fresh_schema
1208 .field_by_name(source_field_name)
1209 .ok_or_else(|| {
1210 Error::new(
1211 ErrorKind::Unexpected,
1212 format!(
1213 "Cannot find source column with name {} for sort column in re-assigned schema.",
1214 source_field_name
1215 ),
1216 )
1217 })?.id;
1218 field.source_id = new_field_id;
1219 fresh_order.with_sort_field(field);
1220 }
1221 let fresh_sort_order = fresh_order.build(&fresh_schema)?;
1222
1223 Ok((fresh_schema, fresh_spec, fresh_sort_order))
1224 }
1225
1226 fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
1227 self.metadata
1228 .schemas
1229 .iter()
1230 .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
1231 .unwrap_or_else(|| self.get_highest_schema_id() + 1)
1232 }
1233
1234 fn get_highest_schema_id(&self) -> i32 {
1235 *self
1236 .metadata
1237 .schemas
1238 .keys()
1239 .max()
1240 .unwrap_or(&self.metadata.current_schema_id)
1241 }
1242
1243 fn get_current_schema(&self) -> Result<&SchemaRef> {
1244 self.metadata
1245 .schemas
1246 .get(&self.metadata.current_schema_id)
1247 .ok_or_else(|| {
1248 Error::new(
1249 ErrorKind::DataInvalid,
1250 format!(
1251 "Current schema with id '{}' not found in table metadata.",
1252 self.metadata.current_schema_id
1253 ),
1254 )
1255 })
1256 }
1257
1258 fn get_default_sort_order(&self) -> Result<SortOrderRef> {
1259 self.metadata
1260 .sort_orders
1261 .get(&self.metadata.default_sort_order_id)
1262 .cloned()
1263 .ok_or_else(|| {
1264 Error::new(
1265 ErrorKind::DataInvalid,
1266 format!(
1267 "Default sort order with id '{}' not found in table metadata.",
1268 self.metadata.default_sort_order_id
1269 ),
1270 )
1271 })
1272 }
1273
1274 fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 {
1276 self.metadata
1277 .partition_specs
1278 .iter()
1279 .find_map(|(id, old_spec)| new_spec.is_compatible_with(old_spec).then_some(*id))
1280 .unwrap_or_else(|| {
1281 self.get_highest_spec_id()
1282 .map(|id| id + 1)
1283 .unwrap_or(DEFAULT_PARTITION_SPEC_ID)
1284 })
1285 }
1286
1287 fn get_highest_spec_id(&self) -> Option<i32> {
1288 self.metadata.partition_specs.keys().max().copied()
1289 }
1290
1291 fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 {
1293 if new_sort_order.is_unsorted() {
1294 return SortOrder::unsorted_order().order_id;
1295 }
1296
1297 self.metadata
1298 .sort_orders
1299 .iter()
1300 .find_map(|(id, sort_order)| {
1301 sort_order.fields.eq(&new_sort_order.fields).then_some(*id)
1302 })
1303 .unwrap_or_else(|| {
1304 self.highest_sort_order_id()
1305 .unwrap_or(SortOrder::unsorted_order().order_id)
1306 + 1
1307 })
1308 }
1309
1310 fn highest_sort_order_id(&self) -> Option<i64> {
1311 self.metadata.sort_orders.keys().max().copied()
1312 }
1313
1314 pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
1317 if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
1318 return Err(Error::new(
1319 ErrorKind::DataInvalid,
1320 "Cannot remove current schema",
1321 ));
1322 }
1323
1324 if schema_id_to_remove.is_empty() {
1325 return Ok(self);
1326 }
1327
1328 let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
1329 self.metadata.schemas.retain(|id, _schema| {
1330 if schema_id_to_remove.contains(id) {
1331 removed_schemas.push(*id);
1332 false
1333 } else {
1334 true
1335 }
1336 });
1337
1338 self.changes.push(TableUpdate::RemoveSchemas {
1339 schema_ids: removed_schemas,
1340 });
1341
1342 Ok(self)
1343 }
1344}
1345
1346impl From<TableMetadataBuildResult> for TableMetadata {
1347 fn from(result: TableMetadataBuildResult) -> Self {
1348 result.metadata
1349 }
1350}
1351
1352#[cfg(test)]
1353mod tests {
1354 use std::fs::File;
1355 use std::io::BufReader;
1356 use std::thread::sleep;
1357
1358 use super::*;
1359 use crate::TableIdent;
1360 use crate::io::FileIOBuilder;
1361 use crate::spec::{
1362 BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema,
1363 SnapshotRetention, SortDirection, SortField, StructType, Summary, Transform, Type,
1364 UnboundPartitionField,
1365 };
1366 use crate::table::Table;
1367
1368 const TEST_LOCATION: &str = "s3://bucket/test/location";
1369 const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
1370
1371 fn schema() -> Schema {
1372 Schema::builder()
1373 .with_fields(vec![
1374 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1375 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1376 NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1377 ])
1378 .build()
1379 .unwrap()
1380 }
1381
1382 fn sort_order() -> SortOrder {
1383 let schema = schema();
1384 SortOrder::builder()
1385 .with_order_id(1)
1386 .with_sort_field(SortField {
1387 source_id: 3,
1388 transform: Transform::Bucket(4),
1389 direction: SortDirection::Descending,
1390 null_order: NullOrder::First,
1391 })
1392 .build(&schema)
1393 .unwrap()
1394 }
1395
1396 fn partition_spec() -> UnboundPartitionSpec {
1397 UnboundPartitionSpec::builder()
1398 .with_spec_id(0)
1399 .add_partition_field(2, "y", Transform::Identity)
1400 .unwrap()
1401 .build()
1402 }
1403
1404 fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder {
1405 TableMetadataBuilder::new(
1406 schema(),
1407 partition_spec(),
1408 sort_order(),
1409 TEST_LOCATION.to_string(),
1410 format_version,
1411 HashMap::new(),
1412 )
1413 .unwrap()
1414 .build()
1415 .unwrap()
1416 .metadata
1417 .into_builder(Some(
1418 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1419 ))
1420 }
1421
1422 #[test]
1423 fn test_minimal_build() {
1424 let metadata = TableMetadataBuilder::new(
1425 schema(),
1426 partition_spec(),
1427 sort_order(),
1428 TEST_LOCATION.to_string(),
1429 FormatVersion::V1,
1430 HashMap::new(),
1431 )
1432 .unwrap()
1433 .build()
1434 .unwrap()
1435 .metadata;
1436
1437 assert_eq!(metadata.format_version, FormatVersion::V1);
1438 assert_eq!(metadata.location, TEST_LOCATION);
1439 assert_eq!(metadata.current_schema_id, 0);
1440 assert_eq!(metadata.default_spec.spec_id(), 0);
1441 assert_eq!(metadata.default_sort_order_id, 1);
1442 assert_eq!(metadata.last_partition_id, 1000);
1443 assert_eq!(metadata.last_column_id, 3);
1444 assert_eq!(metadata.snapshots.len(), 0);
1445 assert_eq!(metadata.current_snapshot_id, None);
1446 assert_eq!(metadata.refs.len(), 0);
1447 assert_eq!(metadata.properties.len(), 0);
1448 assert_eq!(metadata.metadata_log.len(), 0);
1449 assert_eq!(metadata.last_sequence_number, 0);
1450 assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID);
1451
1452 let _ = serde_json::to_string(&metadata).unwrap();
1454
1455 let metadata = metadata
1457 .into_builder(Some(
1458 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1459 ))
1460 .upgrade_format_version(FormatVersion::V2)
1461 .unwrap()
1462 .build()
1463 .unwrap()
1464 .metadata;
1465
1466 assert_eq!(metadata.format_version, FormatVersion::V2);
1467 let _ = serde_json::to_string(&metadata).unwrap();
1468 }
1469
1470 #[test]
1471 fn test_build_unpartitioned_unsorted() {
1472 let schema = Schema::builder().build().unwrap();
1473 let metadata = TableMetadataBuilder::new(
1474 schema.clone(),
1475 PartitionSpec::unpartition_spec(),
1476 SortOrder::unsorted_order(),
1477 TEST_LOCATION.to_string(),
1478 FormatVersion::V2,
1479 HashMap::new(),
1480 )
1481 .unwrap()
1482 .build()
1483 .unwrap()
1484 .metadata;
1485
1486 assert_eq!(metadata.format_version, FormatVersion::V2);
1487 assert_eq!(metadata.location, TEST_LOCATION);
1488 assert_eq!(metadata.current_schema_id, 0);
1489 assert_eq!(metadata.default_spec.spec_id(), 0);
1490 assert_eq!(metadata.default_sort_order_id, 0);
1491 assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
1492 assert_eq!(metadata.last_column_id, 0);
1493 assert_eq!(metadata.snapshots.len(), 0);
1494 assert_eq!(metadata.current_snapshot_id, None);
1495 assert_eq!(metadata.refs.len(), 0);
1496 assert_eq!(metadata.properties.len(), 0);
1497 assert_eq!(metadata.metadata_log.len(), 0);
1498 assert_eq!(metadata.last_sequence_number, 0);
1499 }
1500
1501 #[test]
1502 fn test_reassigns_ids() {
1503 let schema = Schema::builder()
1504 .with_schema_id(10)
1505 .with_fields(vec![
1506 NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(),
1507 NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(),
1508 NestedField::required(
1509 13,
1510 "struct",
1511 Type::Struct(StructType::new(vec![
1512 NestedField::required(14, "nested", Type::Primitive(PrimitiveType::Long))
1513 .into(),
1514 ])),
1515 )
1516 .into(),
1517 NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(),
1518 ])
1519 .build()
1520 .unwrap();
1521 let spec = PartitionSpec::builder(schema.clone())
1522 .with_spec_id(20)
1523 .add_partition_field("a", "a", Transform::Identity)
1524 .unwrap()
1525 .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
1526 .unwrap()
1527 .build()
1528 .unwrap();
1529 let sort_order = SortOrder::builder()
1530 .with_fields(vec![SortField {
1531 source_id: 11,
1532 transform: Transform::Identity,
1533 direction: SortDirection::Ascending,
1534 null_order: NullOrder::First,
1535 }])
1536 .with_order_id(10)
1537 .build(&schema)
1538 .unwrap();
1539
1540 let (fresh_schema, fresh_spec, fresh_sort_order) =
1541 TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap();
1542
1543 let expected_schema = Schema::builder()
1544 .with_fields(vec![
1545 NestedField::required(1, "a", Type::Primitive(PrimitiveType::Long)).into(),
1546 NestedField::required(2, "b", Type::Primitive(PrimitiveType::Long)).into(),
1547 NestedField::required(
1548 3,
1549 "struct",
1550 Type::Struct(StructType::new(vec![
1551 NestedField::required(5, "nested", Type::Primitive(PrimitiveType::Long))
1552 .into(),
1553 ])),
1554 )
1555 .into(),
1556 NestedField::required(4, "c", Type::Primitive(PrimitiveType::Long)).into(),
1557 ])
1558 .build()
1559 .unwrap();
1560
1561 let expected_spec = PartitionSpec::builder(expected_schema.clone())
1562 .with_spec_id(0)
1563 .add_partition_field("a", "a", Transform::Identity)
1564 .unwrap()
1565 .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
1566 .unwrap()
1567 .build()
1568 .unwrap();
1569
1570 let expected_sort_order = SortOrder::builder()
1571 .with_fields(vec![SortField {
1572 source_id: 1,
1573 transform: Transform::Identity,
1574 direction: SortDirection::Ascending,
1575 null_order: NullOrder::First,
1576 }])
1577 .with_order_id(1)
1578 .build(&expected_schema)
1579 .unwrap();
1580
1581 assert_eq!(fresh_schema, expected_schema);
1582 assert_eq!(fresh_spec, expected_spec);
1583 assert_eq!(fresh_sort_order, expected_sort_order);
1584 }
1585
1586 #[test]
1587 fn test_ids_are_reassigned_for_new_metadata() {
1588 let schema = schema().into_builder().with_schema_id(10).build().unwrap();
1589
1590 let metadata = TableMetadataBuilder::new(
1591 schema,
1592 partition_spec(),
1593 sort_order(),
1594 TEST_LOCATION.to_string(),
1595 FormatVersion::V1,
1596 HashMap::new(),
1597 )
1598 .unwrap()
1599 .build()
1600 .unwrap()
1601 .metadata;
1602
1603 assert_eq!(metadata.current_schema_id, 0);
1604 assert_eq!(metadata.current_schema().schema_id(), 0);
1605 }
1606
1607 #[test]
1608 fn test_new_metadata_changes() {
1609 let changes = TableMetadataBuilder::new(
1610 schema(),
1611 partition_spec(),
1612 sort_order(),
1613 TEST_LOCATION.to_string(),
1614 FormatVersion::V1,
1615 HashMap::from_iter(vec![("property 1".to_string(), "value 1".to_string())]),
1616 )
1617 .unwrap()
1618 .build()
1619 .unwrap()
1620 .changes;
1621
1622 pretty_assertions::assert_eq!(changes, vec![
1623 TableUpdate::SetLocation {
1624 location: TEST_LOCATION.to_string()
1625 },
1626 TableUpdate::AddSchema { schema: schema() },
1627 TableUpdate::SetCurrentSchema { schema_id: -1 },
1628 TableUpdate::AddSpec {
1629 spec: PartitionSpec::builder(schema())
1632 .with_spec_id(0)
1633 .add_unbound_field(UnboundPartitionField {
1634 name: "y".to_string(),
1635 transform: Transform::Identity,
1636 source_id: 2,
1637 field_id: Some(1000)
1638 })
1639 .unwrap()
1640 .build()
1641 .unwrap()
1642 .into_unbound(),
1643 },
1644 TableUpdate::SetDefaultSpec { spec_id: -1 },
1645 TableUpdate::AddSortOrder {
1646 sort_order: sort_order(),
1647 },
1648 TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
1649 TableUpdate::SetProperties {
1650 updates: HashMap::from_iter(vec![(
1651 "property 1".to_string(),
1652 "value 1".to_string()
1653 )]),
1654 }
1655 ]);
1656 }
1657
1658 #[test]
1659 fn test_new_metadata_changes_unpartitioned_unsorted() {
1660 let schema = Schema::builder().build().unwrap();
1661 let changes = TableMetadataBuilder::new(
1662 schema.clone(),
1663 PartitionSpec::unpartition_spec().into_unbound(),
1664 SortOrder::unsorted_order(),
1665 TEST_LOCATION.to_string(),
1666 FormatVersion::V1,
1667 HashMap::new(),
1668 )
1669 .unwrap()
1670 .build()
1671 .unwrap()
1672 .changes;
1673
1674 pretty_assertions::assert_eq!(changes, vec![
1675 TableUpdate::SetLocation {
1676 location: TEST_LOCATION.to_string()
1677 },
1678 TableUpdate::AddSchema {
1679 schema: Schema::builder().build().unwrap(),
1680 },
1681 TableUpdate::SetCurrentSchema { schema_id: -1 },
1682 TableUpdate::AddSpec {
1683 spec: PartitionSpec::builder(schema)
1686 .with_spec_id(0)
1687 .build()
1688 .unwrap()
1689 .into_unbound(),
1690 },
1691 TableUpdate::SetDefaultSpec { spec_id: -1 },
1692 TableUpdate::AddSortOrder {
1693 sort_order: SortOrder::unsorted_order(),
1694 },
1695 TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
1696 ]);
1697 }
1698
1699 #[test]
1700 fn test_add_partition_spec() {
1701 let builder = builder_without_changes(FormatVersion::V2);
1702
1703 let added_spec = UnboundPartitionSpec::builder()
1704 .with_spec_id(10)
1705 .add_partition_fields(vec![
1706 UnboundPartitionField {
1707 name: "y".to_string(),
1709 transform: Transform::Identity,
1710 source_id: 2,
1711 field_id: Some(1000),
1712 },
1713 UnboundPartitionField {
1714 name: "z".to_string(),
1716 transform: Transform::Identity,
1717 source_id: 3,
1718 field_id: None,
1719 },
1720 ])
1721 .unwrap()
1722 .build();
1723
1724 let build_result = builder
1725 .add_partition_spec(added_spec.clone())
1726 .unwrap()
1727 .build()
1728 .unwrap();
1729
1730 let expected_change = added_spec.with_spec_id(1);
1732 let expected_spec = PartitionSpec::builder(schema())
1733 .with_spec_id(1)
1734 .add_unbound_field(UnboundPartitionField {
1735 name: "y".to_string(),
1736 transform: Transform::Identity,
1737 source_id: 2,
1738 field_id: Some(1000),
1739 })
1740 .unwrap()
1741 .add_unbound_field(UnboundPartitionField {
1742 name: "z".to_string(),
1743 transform: Transform::Identity,
1744 source_id: 3,
1745 field_id: Some(1001),
1746 })
1747 .unwrap()
1748 .build()
1749 .unwrap();
1750
1751 assert_eq!(build_result.changes.len(), 1);
1752 assert_eq!(
1753 build_result.metadata.partition_spec_by_id(1),
1754 Some(&Arc::new(expected_spec))
1755 );
1756 assert_eq!(build_result.metadata.default_spec.spec_id(), 0);
1757 assert_eq!(build_result.metadata.last_partition_id, 1001);
1758 pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
1759 spec: expected_change
1760 });
1761
1762 let build_result = build_result
1764 .metadata
1765 .into_builder(Some(
1766 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1767 ))
1768 .remove_partition_specs(&[1])
1769 .unwrap()
1770 .build()
1771 .unwrap();
1772
1773 assert_eq!(build_result.changes.len(), 1);
1774 assert_eq!(build_result.metadata.partition_specs.len(), 1);
1775 assert!(build_result.metadata.partition_spec_by_id(1).is_none());
1776 }
1777
1778 #[test]
1779 fn test_set_default_partition_spec() {
1780 let builder = builder_without_changes(FormatVersion::V2);
1781 let schema = builder.get_current_schema().unwrap().clone();
1782 let added_spec = UnboundPartitionSpec::builder()
1783 .with_spec_id(10)
1784 .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2))
1785 .unwrap()
1786 .build();
1787
1788 let build_result = builder
1789 .add_partition_spec(added_spec.clone())
1790 .unwrap()
1791 .set_default_partition_spec(-1)
1792 .unwrap()
1793 .build()
1794 .unwrap();
1795
1796 let expected_spec = PartitionSpec::builder(schema)
1797 .with_spec_id(1)
1798 .add_unbound_field(UnboundPartitionField {
1799 name: "y_bucket[2]".to_string(),
1800 transform: Transform::Bucket(2),
1801 source_id: 1,
1802 field_id: Some(1001),
1803 })
1804 .unwrap()
1805 .build()
1806 .unwrap();
1807
1808 assert_eq!(build_result.changes.len(), 2);
1809 assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec));
1810 assert_eq!(build_result.changes, vec![
1811 TableUpdate::AddSpec {
1812 spec: added_spec.with_spec_id(1)
1814 },
1815 TableUpdate::SetDefaultSpec { spec_id: -1 }
1816 ]);
1817 }
1818
1819 #[test]
1820 fn test_set_existing_default_partition_spec() {
1821 let builder = builder_without_changes(FormatVersion::V2);
1822 let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build();
1824 let build_result = builder
1825 .add_partition_spec(unbound_spec.clone())
1826 .unwrap()
1827 .set_default_partition_spec(-1)
1828 .unwrap()
1829 .build()
1830 .unwrap();
1831
1832 assert_eq!(build_result.changes.len(), 2);
1833 assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
1834 spec: unbound_spec.clone()
1835 });
1836 assert_eq!(build_result.changes[1], TableUpdate::SetDefaultSpec {
1837 spec_id: -1
1838 });
1839 assert_eq!(
1840 build_result.metadata.default_spec,
1841 Arc::new(
1842 unbound_spec
1843 .bind(build_result.metadata.current_schema().clone())
1844 .unwrap()
1845 )
1846 );
1847
1848 let build_result = build_result
1850 .metadata
1851 .into_builder(Some(
1852 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1853 ))
1854 .set_default_partition_spec(0)
1855 .unwrap()
1856 .build()
1857 .unwrap();
1858
1859 assert_eq!(build_result.changes.len(), 1);
1860 assert_eq!(build_result.changes[0], TableUpdate::SetDefaultSpec {
1861 spec_id: 0
1862 });
1863 assert_eq!(
1864 build_result.metadata.default_spec,
1865 Arc::new(
1866 partition_spec()
1867 .bind(build_result.metadata.current_schema().clone())
1868 .unwrap()
1869 )
1870 );
1871 }
1872
1873 #[test]
1874 fn test_add_sort_order() {
1875 let builder = builder_without_changes(FormatVersion::V2);
1876
1877 let added_sort_order = SortOrder::builder()
1878 .with_order_id(10)
1879 .with_fields(vec![SortField {
1880 source_id: 1,
1881 transform: Transform::Identity,
1882 direction: SortDirection::Ascending,
1883 null_order: NullOrder::First,
1884 }])
1885 .build(&schema())
1886 .unwrap();
1887
1888 let build_result = builder
1889 .add_sort_order(added_sort_order.clone())
1890 .unwrap()
1891 .build()
1892 .unwrap();
1893
1894 let expected_sort_order = added_sort_order.with_order_id(2);
1895
1896 assert_eq!(build_result.changes.len(), 1);
1897 assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2));
1898 pretty_assertions::assert_eq!(
1899 build_result.metadata.sort_order_by_id(2),
1900 Some(&Arc::new(expected_sort_order.clone()))
1901 );
1902 pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder {
1903 sort_order: expected_sort_order
1904 });
1905 }
1906
1907 #[test]
1908 fn test_add_compatible_schema() {
1909 let builder = builder_without_changes(FormatVersion::V2);
1910
1911 let added_schema = Schema::builder()
1912 .with_schema_id(1)
1913 .with_fields(vec![
1914 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1915 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1916 NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1917 NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
1918 ])
1919 .build()
1920 .unwrap();
1921
1922 let build_result = builder
1923 .add_current_schema(added_schema.clone())
1924 .unwrap()
1925 .build()
1926 .unwrap();
1927
1928 assert_eq!(build_result.changes.len(), 2);
1929 assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1));
1930 pretty_assertions::assert_eq!(
1931 build_result.metadata.schema_by_id(1),
1932 Some(&Arc::new(added_schema.clone()))
1933 );
1934 pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema {
1935 schema: added_schema
1936 });
1937 assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
1938 schema_id: -1
1939 });
1940 }
1941
1942 #[test]
1943 fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
1944 let builder = builder_without_changes(FormatVersion::V2);
1945
1946 let added_schema = Schema::builder()
1947 .with_schema_id(1)
1948 .with_fields(vec![
1949 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1950 NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1951 NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1952 NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
1953 ])
1954 .build()
1955 .unwrap();
1956
1957 let build_result = builder
1958 .add_schema(added_schema.clone())
1959 .unwrap()
1960 .set_current_schema(1)
1961 .unwrap()
1962 .build()
1963 .unwrap();
1964
1965 assert_eq!(build_result.changes.len(), 2);
1966 assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
1967 schema_id: -1
1968 });
1969 }
1970
1971 #[test]
1972 fn test_no_metadata_log_for_create_table() {
1973 let build_result = TableMetadataBuilder::new(
1974 schema(),
1975 partition_spec(),
1976 sort_order(),
1977 TEST_LOCATION.to_string(),
1978 FormatVersion::V2,
1979 HashMap::new(),
1980 )
1981 .unwrap()
1982 .build()
1983 .unwrap();
1984
1985 assert_eq!(build_result.metadata.metadata_log.len(), 0);
1986 }
1987
1988 #[test]
1989 fn test_no_metadata_log_entry_for_no_previous_location() {
1990 let metadata = builder_without_changes(FormatVersion::V2)
1992 .build()
1993 .unwrap()
1994 .metadata;
1995 assert_eq!(metadata.metadata_log.len(), 1);
1996
1997 let build_result = metadata
1998 .into_builder(None)
1999 .set_properties(HashMap::from_iter(vec![(
2000 "foo".to_string(),
2001 "bar".to_string(),
2002 )]))
2003 .unwrap()
2004 .build()
2005 .unwrap();
2006
2007 assert_eq!(build_result.metadata.metadata_log.len(), 1);
2008 }
2009
2010 #[test]
2011 fn test_from_metadata_generates_metadata_log() {
2012 let metadata_path = "s3://bucket/test/location/metadata/metadata1.json";
2013 let builder = TableMetadataBuilder::new(
2014 schema(),
2015 partition_spec(),
2016 sort_order(),
2017 TEST_LOCATION.to_string(),
2018 FormatVersion::V2,
2019 HashMap::new(),
2020 )
2021 .unwrap()
2022 .build()
2023 .unwrap()
2024 .metadata
2025 .into_builder(Some(metadata_path.to_string()));
2026
2027 let builder = builder
2028 .add_default_sort_order(SortOrder::unsorted_order())
2029 .unwrap();
2030
2031 let build_result = builder.build().unwrap();
2032
2033 assert_eq!(build_result.metadata.metadata_log.len(), 1);
2034 assert_eq!(
2035 build_result.metadata.metadata_log[0].metadata_file,
2036 metadata_path
2037 );
2038 }
2039
2040 #[test]
2041 fn test_set_ref() {
2042 let builder = builder_without_changes(FormatVersion::V2);
2043
2044 let snapshot = Snapshot::builder()
2045 .with_snapshot_id(1)
2046 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2047 .with_sequence_number(0)
2048 .with_schema_id(0)
2049 .with_manifest_list("/snap-1.avro")
2050 .with_summary(Summary {
2051 operation: Operation::Append,
2052 additional_properties: HashMap::from_iter(vec![
2053 (
2054 "spark.app.id".to_string(),
2055 "local-1662532784305".to_string(),
2056 ),
2057 ("added-data-files".to_string(), "4".to_string()),
2058 ("added-records".to_string(), "4".to_string()),
2059 ("added-files-size".to_string(), "6001".to_string()),
2060 ]),
2061 })
2062 .build();
2063
2064 let builder = builder.add_snapshot(snapshot.clone()).unwrap();
2065
2066 assert!(
2067 builder
2068 .clone()
2069 .set_ref(MAIN_BRANCH, SnapshotReference {
2070 snapshot_id: 10,
2071 retention: SnapshotRetention::Branch {
2072 min_snapshots_to_keep: Some(10),
2073 max_snapshot_age_ms: None,
2074 max_ref_age_ms: None,
2075 },
2076 })
2077 .unwrap_err()
2078 .to_string()
2079 .contains("Cannot set 'main' to unknown snapshot: '10'")
2080 );
2081
2082 let build_result = builder
2083 .set_ref(MAIN_BRANCH, SnapshotReference {
2084 snapshot_id: 1,
2085 retention: SnapshotRetention::Branch {
2086 min_snapshots_to_keep: Some(10),
2087 max_snapshot_age_ms: None,
2088 max_ref_age_ms: None,
2089 },
2090 })
2091 .unwrap()
2092 .build()
2093 .unwrap();
2094 assert_eq!(build_result.metadata.snapshots.len(), 1);
2095 assert_eq!(
2096 build_result.metadata.snapshot_by_id(1),
2097 Some(&Arc::new(snapshot.clone()))
2098 );
2099 assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog {
2100 snapshot_id: 1,
2101 timestamp_ms: snapshot.timestamp_ms()
2102 }])
2103 }
2104
2105 #[test]
2106 fn test_snapshot_log_skips_intermediates() {
2107 let builder = builder_without_changes(FormatVersion::V2);
2108
2109 let snapshot_1 = Snapshot::builder()
2110 .with_snapshot_id(1)
2111 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2112 .with_sequence_number(0)
2113 .with_schema_id(0)
2114 .with_manifest_list("/snap-1.avro")
2115 .with_summary(Summary {
2116 operation: Operation::Append,
2117 additional_properties: HashMap::from_iter(vec![
2118 (
2119 "spark.app.id".to_string(),
2120 "local-1662532784305".to_string(),
2121 ),
2122 ("added-data-files".to_string(), "4".to_string()),
2123 ("added-records".to_string(), "4".to_string()),
2124 ("added-files-size".to_string(), "6001".to_string()),
2125 ]),
2126 })
2127 .build();
2128
2129 let snapshot_2 = Snapshot::builder()
2130 .with_snapshot_id(2)
2131 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2132 .with_sequence_number(0)
2133 .with_schema_id(0)
2134 .with_manifest_list("/snap-1.avro")
2135 .with_summary(Summary {
2136 operation: Operation::Append,
2137 additional_properties: HashMap::from_iter(vec![
2138 (
2139 "spark.app.id".to_string(),
2140 "local-1662532784305".to_string(),
2141 ),
2142 ("added-data-files".to_string(), "4".to_string()),
2143 ("added-records".to_string(), "4".to_string()),
2144 ("added-files-size".to_string(), "6001".to_string()),
2145 ]),
2146 })
2147 .build();
2148
2149 let result = builder
2150 .add_snapshot(snapshot_1)
2151 .unwrap()
2152 .set_ref(MAIN_BRANCH, SnapshotReference {
2153 snapshot_id: 1,
2154 retention: SnapshotRetention::Branch {
2155 min_snapshots_to_keep: Some(10),
2156 max_snapshot_age_ms: None,
2157 max_ref_age_ms: None,
2158 },
2159 })
2160 .unwrap()
2161 .set_branch_snapshot(snapshot_2.clone(), MAIN_BRANCH)
2162 .unwrap()
2163 .build()
2164 .unwrap();
2165
2166 assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
2167 snapshot_id: 2,
2168 timestamp_ms: snapshot_2.timestamp_ms()
2169 }]);
2170 assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
2171 }
2172
2173 #[test]
2174 fn test_set_branch_snapshot_creates_branch_if_not_exists() {
2175 let builder = builder_without_changes(FormatVersion::V2);
2176
2177 let snapshot = Snapshot::builder()
2178 .with_snapshot_id(2)
2179 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2180 .with_sequence_number(0)
2181 .with_schema_id(0)
2182 .with_manifest_list("/snap-1.avro")
2183 .with_summary(Summary {
2184 operation: Operation::Append,
2185 additional_properties: HashMap::new(),
2186 })
2187 .build();
2188
2189 let build_result = builder
2190 .set_branch_snapshot(snapshot.clone(), "new_branch")
2191 .unwrap()
2192 .build()
2193 .unwrap();
2194
2195 let reference = SnapshotReference {
2196 snapshot_id: 2,
2197 retention: SnapshotRetention::Branch {
2198 min_snapshots_to_keep: None,
2199 max_snapshot_age_ms: None,
2200 max_ref_age_ms: None,
2201 },
2202 };
2203
2204 assert_eq!(build_result.metadata.refs.len(), 1);
2205 assert_eq!(
2206 build_result.metadata.refs.get("new_branch"),
2207 Some(&reference)
2208 );
2209 assert_eq!(build_result.changes, vec![
2210 TableUpdate::AddSnapshot { snapshot },
2211 TableUpdate::SetSnapshotRef {
2212 ref_name: "new_branch".to_string(),
2213 reference
2214 }
2215 ]);
2216 }
2217
2218 #[test]
2219 fn test_cannot_add_duplicate_snapshot_id() {
2220 let builder = builder_without_changes(FormatVersion::V2);
2221
2222 let snapshot = Snapshot::builder()
2223 .with_snapshot_id(2)
2224 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2225 .with_sequence_number(0)
2226 .with_schema_id(0)
2227 .with_manifest_list("/snap-1.avro")
2228 .with_summary(Summary {
2229 operation: Operation::Append,
2230 additional_properties: HashMap::from_iter(vec![
2231 (
2232 "spark.app.id".to_string(),
2233 "local-1662532784305".to_string(),
2234 ),
2235 ("added-data-files".to_string(), "4".to_string()),
2236 ("added-records".to_string(), "4".to_string()),
2237 ("added-files-size".to_string(), "6001".to_string()),
2238 ]),
2239 })
2240 .build();
2241
2242 let builder = builder.add_snapshot(snapshot.clone()).unwrap();
2243 builder.add_snapshot(snapshot).unwrap_err();
2244 }
2245
2246 #[test]
2247 fn test_add_incompatible_current_schema_fails() {
2248 let builder = builder_without_changes(FormatVersion::V2);
2249
2250 let added_schema = Schema::builder()
2251 .with_schema_id(1)
2252 .with_fields(vec![])
2253 .build()
2254 .unwrap();
2255
2256 let err = builder
2257 .add_current_schema(added_schema)
2258 .unwrap()
2259 .build()
2260 .unwrap_err();
2261
2262 assert!(
2263 err.to_string()
2264 .contains("Cannot find partition source field")
2265 );
2266 }
2267
2268 #[test]
2269 fn test_add_partition_spec_for_v1_requires_sequential_ids() {
2270 let builder = builder_without_changes(FormatVersion::V1);
2271
2272 let added_spec = UnboundPartitionSpec::builder()
2273 .with_spec_id(10)
2274 .add_partition_fields(vec![
2275 UnboundPartitionField {
2276 name: "y".to_string(),
2277 transform: Transform::Identity,
2278 source_id: 2,
2279 field_id: Some(1000),
2280 },
2281 UnboundPartitionField {
2282 name: "z".to_string(),
2283 transform: Transform::Identity,
2284 source_id: 3,
2285 field_id: Some(1002),
2286 },
2287 ])
2288 .unwrap()
2289 .build();
2290
2291 let err = builder.add_partition_spec(added_spec).unwrap_err();
2292 assert!(err.to_string().contains(
2293 "Cannot add partition spec with non-sequential field ids to format version 1 table"
2294 ));
2295 }
2296
2297 #[test]
2298 fn test_expire_metadata_log() {
2299 let builder = builder_without_changes(FormatVersion::V2);
2300 let metadata = builder
2301 .set_properties(HashMap::from_iter(vec![(
2302 PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
2303 "2".to_string(),
2304 )]))
2305 .unwrap()
2306 .build()
2307 .unwrap();
2308 assert_eq!(metadata.metadata.metadata_log.len(), 1);
2309 assert_eq!(metadata.expired_metadata_logs.len(), 0);
2310
2311 let metadata = metadata
2312 .metadata
2313 .into_builder(Some("path2".to_string()))
2314 .set_properties(HashMap::from_iter(vec![(
2315 "change_nr".to_string(),
2316 "1".to_string(),
2317 )]))
2318 .unwrap()
2319 .build()
2320 .unwrap();
2321
2322 assert_eq!(metadata.metadata.metadata_log.len(), 2);
2323 assert_eq!(metadata.expired_metadata_logs.len(), 0);
2324
2325 let metadata = metadata
2326 .metadata
2327 .into_builder(Some("path2".to_string()))
2328 .set_properties(HashMap::from_iter(vec![(
2329 "change_nr".to_string(),
2330 "2".to_string(),
2331 )]))
2332 .unwrap()
2333 .build()
2334 .unwrap();
2335 assert_eq!(metadata.metadata.metadata_log.len(), 2);
2336 assert_eq!(metadata.expired_metadata_logs.len(), 1);
2337 }
2338
2339 #[test]
2340 fn test_v2_sequence_number_cannot_decrease() {
2341 let builder = builder_without_changes(FormatVersion::V2);
2342
2343 let snapshot = Snapshot::builder()
2344 .with_snapshot_id(1)
2345 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2346 .with_sequence_number(1)
2347 .with_schema_id(0)
2348 .with_manifest_list("/snap-1")
2349 .with_summary(Summary {
2350 operation: Operation::Append,
2351 additional_properties: HashMap::new(),
2352 })
2353 .build();
2354
2355 let builder = builder
2356 .add_snapshot(snapshot.clone())
2357 .unwrap()
2358 .set_ref(MAIN_BRANCH, SnapshotReference {
2359 snapshot_id: 1,
2360 retention: SnapshotRetention::Branch {
2361 min_snapshots_to_keep: Some(10),
2362 max_snapshot_age_ms: None,
2363 max_ref_age_ms: None,
2364 },
2365 })
2366 .unwrap();
2367
2368 let snapshot = Snapshot::builder()
2369 .with_snapshot_id(2)
2370 .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2371 .with_sequence_number(0)
2372 .with_schema_id(0)
2373 .with_manifest_list("/snap-0")
2374 .with_parent_snapshot_id(Some(1))
2375 .with_summary(Summary {
2376 operation: Operation::Append,
2377 additional_properties: HashMap::new(),
2378 })
2379 .build();
2380
2381 let err = builder
2382 .set_branch_snapshot(snapshot, MAIN_BRANCH)
2383 .unwrap_err();
2384 assert!(
2385 err.to_string()
2386 .contains("Cannot add snapshot with sequence number")
2387 );
2388 }
2389
2390 #[test]
2391 fn test_default_spec_cannot_be_removed() {
2392 let builder = builder_without_changes(FormatVersion::V2);
2393
2394 builder.remove_partition_specs(&[0]).unwrap_err();
2395 }
2396
2397 #[test]
2398 fn test_statistics() {
2399 let builder = builder_without_changes(FormatVersion::V2);
2400
2401 let statistics = StatisticsFile {
2402 snapshot_id: 3055729675574597004,
2403 statistics_path: "s3://a/b/stats.puffin".to_string(),
2404 file_size_in_bytes: 413,
2405 file_footer_size_in_bytes: 42,
2406 key_metadata: None,
2407 blob_metadata: vec![BlobMetadata {
2408 snapshot_id: 3055729675574597004,
2409 sequence_number: 1,
2410 fields: vec![1],
2411 r#type: "ndv".to_string(),
2412 properties: HashMap::new(),
2413 }],
2414 };
2415 let build_result = builder.set_statistics(statistics.clone()).build().unwrap();
2416
2417 assert_eq!(
2418 build_result.metadata.statistics,
2419 HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
2420 );
2421 assert_eq!(build_result.changes, vec![TableUpdate::SetStatistics {
2422 statistics: statistics.clone()
2423 }]);
2424
2425 let builder = build_result.metadata.into_builder(None);
2427 let build_result = builder
2428 .remove_statistics(statistics.snapshot_id)
2429 .build()
2430 .unwrap();
2431
2432 assert_eq!(build_result.metadata.statistics.len(), 0);
2433 assert_eq!(build_result.changes, vec![TableUpdate::RemoveStatistics {
2434 snapshot_id: statistics.snapshot_id
2435 }]);
2436
2437 let builder = build_result.metadata.into_builder(None);
2439 let build_result = builder
2440 .remove_statistics(statistics.snapshot_id)
2441 .build()
2442 .unwrap();
2443 assert_eq!(build_result.metadata.statistics.len(), 0);
2444 assert_eq!(build_result.changes.len(), 0);
2445 }
2446
2447 #[test]
2448 fn test_add_partition_statistics() {
2449 let builder = builder_without_changes(FormatVersion::V2);
2450
2451 let statistics = PartitionStatisticsFile {
2452 snapshot_id: 3055729675574597004,
2453 statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
2454 file_size_in_bytes: 43,
2455 };
2456
2457 let build_result = builder
2458 .set_partition_statistics(statistics.clone())
2459 .build()
2460 .unwrap();
2461 assert_eq!(
2462 build_result.metadata.partition_statistics,
2463 HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
2464 );
2465 assert_eq!(build_result.changes, vec![
2466 TableUpdate::SetPartitionStatistics {
2467 partition_statistics: statistics.clone()
2468 }
2469 ]);
2470
2471 let builder = build_result.metadata.into_builder(None);
2473 let build_result = builder
2474 .remove_partition_statistics(statistics.snapshot_id)
2475 .build()
2476 .unwrap();
2477 assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2478 assert_eq!(build_result.changes, vec![
2479 TableUpdate::RemovePartitionStatistics {
2480 snapshot_id: statistics.snapshot_id
2481 }
2482 ]);
2483
2484 let builder = build_result.metadata.into_builder(None);
2486 let build_result = builder
2487 .remove_partition_statistics(statistics.snapshot_id)
2488 .build()
2489 .unwrap();
2490 assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2491 assert_eq!(build_result.changes.len(), 0);
2492 }
2493
2494 #[test]
2495 fn last_update_increased_for_property_only_update() {
2496 let builder = builder_without_changes(FormatVersion::V2);
2497
2498 let metadata = builder.build().unwrap().metadata;
2499 let last_updated_ms = metadata.last_updated_ms;
2500 sleep(std::time::Duration::from_millis(2));
2501
2502 let build_result = metadata
2503 .into_builder(Some(
2504 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2505 ))
2506 .set_properties(HashMap::from_iter(vec![(
2507 "foo".to_string(),
2508 "bar".to_string(),
2509 )]))
2510 .unwrap()
2511 .build()
2512 .unwrap();
2513
2514 assert!(
2515 build_result.metadata.last_updated_ms > last_updated_ms,
2516 "{} > {}",
2517 build_result.metadata.last_updated_ms,
2518 last_updated_ms
2519 );
2520 }
2521
2522 #[test]
2523 fn test_construct_default_main_branch() {
2524 let file = File::open(format!(
2526 "{}/testdata/table_metadata/{}",
2527 env!("CARGO_MANIFEST_DIR"),
2528 "TableMetadataV2Valid.json"
2529 ))
2530 .unwrap();
2531 let reader = BufReader::new(file);
2532 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2533
2534 let table = Table::builder()
2535 .metadata(resp)
2536 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2537 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2538 .file_io(FileIOBuilder::new("memory").build().unwrap())
2539 .build()
2540 .unwrap();
2541
2542 assert_eq!(
2543 table.metadata().refs.get(MAIN_BRANCH).unwrap().snapshot_id,
2544 table.metadata().current_snapshot_id().unwrap()
2545 );
2546 }
2547
2548 #[test]
2549 fn test_active_schema_cannot_be_removed() {
2550 let builder = builder_without_changes(FormatVersion::V2);
2551 builder.remove_schemas(&[0]).unwrap_err();
2552 }
2553
2554 #[test]
2555 fn test_remove_schemas() {
2556 let file = File::open(format!(
2557 "{}/testdata/table_metadata/{}",
2558 env!("CARGO_MANIFEST_DIR"),
2559 "TableMetadataV2Valid.json"
2560 ))
2561 .unwrap();
2562 let reader = BufReader::new(file);
2563 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2564
2565 let table = Table::builder()
2566 .metadata(resp)
2567 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2568 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2569 .file_io(FileIOBuilder::new("memory").build().unwrap())
2570 .build()
2571 .unwrap();
2572
2573 assert_eq!(2, table.metadata().schemas.len());
2574
2575 {
2576 let meta_data_builder = table.metadata().clone().into_builder(None);
2578 meta_data_builder.remove_schemas(&[1]).unwrap_err();
2579 }
2580
2581 let mut meta_data_builder = table.metadata().clone().into_builder(None);
2582 meta_data_builder = meta_data_builder.remove_schemas(&[0]).unwrap();
2583 let build_result = meta_data_builder.build().unwrap();
2584 assert_eq!(1, build_result.metadata.schemas.len());
2585 assert_eq!(1, build_result.metadata.current_schema_id);
2586 assert_eq!(1, build_result.metadata.current_schema().schema_id());
2587 assert_eq!(1, build_result.changes.len());
2588
2589 let remove_schema_ids =
2590 if let TableUpdate::RemoveSchemas { schema_ids } = &build_result.changes[0] {
2591 schema_ids
2592 } else {
2593 unreachable!("Expected RemoveSchema change")
2594 };
2595 assert_eq!(remove_schema_ids, &[0]);
2596 }
2597
2598 #[test]
2599 fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts() {
2600 let initial_schema = Schema::builder()
2601 .with_fields(vec![
2602 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2603 ])
2604 .build()
2605 .unwrap();
2606
2607 let partition_spec_with_bucket = UnboundPartitionSpec::builder()
2608 .with_spec_id(0)
2609 .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2610 .unwrap()
2611 .build();
2612
2613 let metadata = TableMetadataBuilder::new(
2614 initial_schema,
2615 partition_spec_with_bucket,
2616 SortOrder::unsorted_order(),
2617 TEST_LOCATION.to_string(),
2618 FormatVersion::V2,
2619 HashMap::new(),
2620 )
2621 .unwrap()
2622 .build()
2623 .unwrap()
2624 .metadata;
2625
2626 let partition_field_names: Vec<String> = metadata
2627 .default_partition_spec()
2628 .fields()
2629 .iter()
2630 .map(|f| f.name.clone())
2631 .collect();
2632 assert!(partition_field_names.contains(&"bucket_data".to_string()));
2633
2634 let evolved_schema = Schema::builder()
2635 .with_fields(vec![
2636 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2637 NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
2639 ])
2640 .build()
2641 .unwrap();
2642
2643 let builder = metadata.into_builder(Some(
2644 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2645 ));
2646
2647 let result = builder.add_current_schema(evolved_schema);
2649
2650 assert!(result.is_err());
2651 let error = result.unwrap_err();
2652 let error_message = error.message();
2653 assert!(error_message.contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2654 assert!(error_message.contains("Schema evolution cannot introduce field names that match existing partition field names"));
2655 }
2656
2657 #[test]
2658 fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build() {
2659 let initial_schema = Schema::builder()
2660 .with_fields(vec![
2661 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2662 ])
2663 .build()
2664 .unwrap();
2665
2666 let partition_spec = UnboundPartitionSpec::builder()
2667 .with_spec_id(0)
2668 .add_partition_field(1, "partition_col", Transform::Bucket(16))
2669 .unwrap()
2670 .build();
2671
2672 let metadata = TableMetadataBuilder::new(
2673 initial_schema,
2674 partition_spec,
2675 SortOrder::unsorted_order(),
2676 TEST_LOCATION.to_string(),
2677 FormatVersion::V2,
2678 HashMap::new(),
2679 )
2680 .unwrap()
2681 .build()
2682 .unwrap()
2683 .metadata;
2684
2685 let non_conflicting_schema = Schema::builder()
2686 .with_fields(vec![
2687 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2688 NestedField::required(2, "new_field", Type::Primitive(PrimitiveType::Int)).into(),
2689 ])
2690 .build()
2691 .unwrap();
2692
2693 let result = metadata
2695 .clone()
2696 .into_builder(Some("test_location".to_string()))
2697 .add_current_schema(non_conflicting_schema)
2698 .unwrap()
2699 .build();
2700
2701 assert!(result.is_ok());
2702 }
2703
2704 #[test]
2705 fn test_partition_spec_evolution_validates_schema_field_name_conflicts() {
2706 let initial_schema = Schema::builder()
2707 .with_fields(vec![
2708 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2709 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2710 .into(),
2711 ])
2712 .build()
2713 .unwrap();
2714
2715 let partition_spec = UnboundPartitionSpec::builder()
2716 .with_spec_id(0)
2717 .add_partition_field(1, "data_bucket", Transform::Bucket(16))
2718 .unwrap()
2719 .build();
2720
2721 let metadata = TableMetadataBuilder::new(
2722 initial_schema,
2723 partition_spec,
2724 SortOrder::unsorted_order(),
2725 TEST_LOCATION.to_string(),
2726 FormatVersion::V2,
2727 HashMap::new(),
2728 )
2729 .unwrap()
2730 .build()
2731 .unwrap()
2732 .metadata;
2733
2734 let builder = metadata.into_builder(Some(
2735 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2736 ));
2737
2738 let conflicting_partition_spec = UnboundPartitionSpec::builder()
2739 .with_spec_id(1)
2740 .add_partition_field(1, "existing_field", Transform::Bucket(8))
2741 .unwrap()
2742 .build();
2743
2744 let result = builder.add_partition_spec(conflicting_partition_spec);
2745
2746 assert!(result.is_err());
2747 let error = result.unwrap_err();
2748 let error_message = error.message();
2749 assert!(error_message.contains(
2751 "Cannot create partition with name 'existing_field' that conflicts with schema field"
2752 ));
2753 assert!(error_message.contains("and is not an identity transform"));
2754 }
2755
2756 #[test]
2757 fn test_schema_evolution_validates_against_all_historical_schemas() {
2758 let initial_schema = Schema::builder()
2760 .with_fields(vec![
2761 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2762 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2763 .into(),
2764 ])
2765 .build()
2766 .unwrap();
2767
2768 let partition_spec = UnboundPartitionSpec::builder()
2769 .with_spec_id(0)
2770 .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2771 .unwrap()
2772 .build();
2773
2774 let metadata = TableMetadataBuilder::new(
2775 initial_schema,
2776 partition_spec,
2777 SortOrder::unsorted_order(),
2778 TEST_LOCATION.to_string(),
2779 FormatVersion::V2,
2780 HashMap::new(),
2781 )
2782 .unwrap()
2783 .build()
2784 .unwrap()
2785 .metadata;
2786
2787 let second_schema = Schema::builder()
2789 .with_fields(vec![
2790 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2791 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2792 .into(),
2793 ])
2794 .build()
2795 .unwrap();
2796
2797 let metadata = metadata
2798 .into_builder(Some("test_location".to_string()))
2799 .add_current_schema(second_schema)
2800 .unwrap()
2801 .build()
2802 .unwrap()
2803 .metadata;
2804
2805 let third_schema = Schema::builder()
2809 .with_fields(vec![
2810 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2811 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2812 .into(),
2813 NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
2814 .into(),
2815 ])
2816 .build()
2817 .unwrap();
2818
2819 let builder = metadata
2820 .clone()
2821 .into_builder(Some("test_location".to_string()));
2822
2823 let result = builder.add_current_schema(third_schema);
2825 assert!(result.is_ok());
2826
2827 let conflicting_schema = Schema::builder()
2830 .with_fields(vec![
2831 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2832 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2833 .into(),
2834 NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
2835 .into(),
2836 NestedField::required(5, "bucket_data", Type::Primitive(PrimitiveType::String))
2837 .into(), ])
2839 .build()
2840 .unwrap();
2841
2842 let builder2 = metadata.into_builder(Some("test_location".to_string()));
2843 let result2 = builder2.add_current_schema(conflicting_schema);
2844
2845 assert!(result2.is_err());
2848 let error = result2.unwrap_err();
2849 assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2850 }
2851
2852 #[test]
2853 fn test_schema_evolution_allows_existing_partition_field_if_exists_in_historical_schema() {
2854 let initial_schema = Schema::builder()
2856 .with_fields(vec![
2857 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2858 NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
2859 .into(),
2860 ])
2861 .build()
2862 .unwrap();
2863
2864 let partition_spec = UnboundPartitionSpec::builder()
2865 .with_spec_id(0)
2866 .add_partition_field(2, "partition_data", Transform::Identity)
2867 .unwrap()
2868 .build();
2869
2870 let metadata = TableMetadataBuilder::new(
2871 initial_schema,
2872 partition_spec,
2873 SortOrder::unsorted_order(),
2874 TEST_LOCATION.to_string(),
2875 FormatVersion::V2,
2876 HashMap::new(),
2877 )
2878 .unwrap()
2879 .build()
2880 .unwrap()
2881 .metadata;
2882
2883 let evolved_schema = Schema::builder()
2885 .with_fields(vec![
2886 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2887 NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
2888 .into(),
2889 NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2890 .into(),
2891 ])
2892 .build()
2893 .unwrap();
2894
2895 let result = metadata
2897 .into_builder(Some("test_location".to_string()))
2898 .add_current_schema(evolved_schema);
2899
2900 assert!(result.is_ok());
2901 }
2902
2903 #[test]
2904 fn test_schema_evolution_prevents_new_field_conflicting_with_partition_field() {
2905 let initial_schema = Schema::builder()
2907 .with_fields(vec![
2908 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2909 ])
2910 .build()
2911 .unwrap();
2912
2913 let partition_spec = UnboundPartitionSpec::builder()
2914 .with_spec_id(0)
2915 .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2916 .unwrap()
2917 .build();
2918
2919 let metadata = TableMetadataBuilder::new(
2920 initial_schema,
2921 partition_spec,
2922 SortOrder::unsorted_order(),
2923 TEST_LOCATION.to_string(),
2924 FormatVersion::V2,
2925 HashMap::new(),
2926 )
2927 .unwrap()
2928 .build()
2929 .unwrap()
2930 .metadata;
2931
2932 let conflicting_schema = Schema::builder()
2934 .with_fields(vec![
2935 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2936 NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
2938 ])
2939 .build()
2940 .unwrap();
2941
2942 let builder = metadata.into_builder(Some("test_location".to_string()));
2943 let result = builder.add_current_schema(conflicting_schema);
2944
2945 assert!(result.is_err());
2948 let error = result.unwrap_err();
2949 assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2950 }
2951
2952 #[test]
2953 fn test_partition_spec_evolution_allows_non_conflicting_names() {
2954 let initial_schema = Schema::builder()
2955 .with_fields(vec![
2956 NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2957 NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2958 .into(),
2959 ])
2960 .build()
2961 .unwrap();
2962
2963 let partition_spec = UnboundPartitionSpec::builder()
2964 .with_spec_id(0)
2965 .add_partition_field(1, "data_bucket", Transform::Bucket(16))
2966 .unwrap()
2967 .build();
2968
2969 let metadata = TableMetadataBuilder::new(
2970 initial_schema,
2971 partition_spec,
2972 SortOrder::unsorted_order(),
2973 TEST_LOCATION.to_string(),
2974 FormatVersion::V2,
2975 HashMap::new(),
2976 )
2977 .unwrap()
2978 .build()
2979 .unwrap()
2980 .metadata;
2981
2982 let builder = metadata.into_builder(Some(
2983 "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2984 ));
2985
2986 let non_conflicting_partition_spec = UnboundPartitionSpec::builder()
2988 .with_spec_id(1)
2989 .add_partition_field(2, "new_partition_field", Transform::Bucket(8))
2990 .unwrap()
2991 .build();
2992
2993 let result = builder.add_partition_spec(non_conflicting_partition_spec);
2994
2995 assert!(result.is_ok());
2996 }
2997}