1use std::sync::Arc;
22
23use itertools::Itertools;
24use serde::{Deserialize, Serialize};
25use typed_builder::TypedBuilder;
26
27use super::transform::Transform;
28use super::{NestedField, Schema, SchemaRef, StructType};
29use crate::spec::Struct;
30use crate::{Error, ErrorKind, Result};
31
/// Starting value for the "last assigned" partition field id of a fresh builder.
/// Automatic id assignment increments before assigning, so the first
/// auto-assigned partition field id is one above this value.
pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
/// Spec id used when no explicit spec id has been provided.
pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;
34
/// A single partition field with an assigned field id.
///
/// Serialized with kebab-case keys (`source-id`, `field-id`, `name`, `transform`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionField {
    /// Id of the schema column this partition field is derived from.
    pub source_id: i32,
    /// Id of the partition field itself.
    pub field_id: i32,
    /// Name of the partition field.
    pub name: String,
    /// Transform applied to the source column to produce the partition value.
    pub transform: Transform,
}
49
50impl PartitionField {
51 pub fn into_unbound(self) -> UnboundPartitionField {
53 self.into()
54 }
55}
56
/// Shared, reference-counted handle to a [`PartitionSpec`].
pub type PartitionSpecRef = Arc<PartitionSpec>;
/// A partition spec: a spec id plus an ordered list of partition fields.
///
/// Both members are private; instances are produced by [`PartitionSpecBuilder`],
/// [`PartitionSpec::unpartition_spec`] or deserialization.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionSpec {
    /// Identifier of this spec.
    spec_id: i32,
    /// Partition fields in definition order.
    fields: Vec<PartitionField>,
}
73
74impl PartitionSpec {
75 pub fn builder(schema: impl Into<SchemaRef>) -> PartitionSpecBuilder {
77 PartitionSpecBuilder::new(schema)
78 }
79
80 pub fn fields(&self) -> &[PartitionField] {
82 &self.fields
83 }
84
85 pub fn spec_id(&self) -> i32 {
87 self.spec_id
88 }
89
90 pub fn unpartition_spec() -> Self {
92 Self {
93 spec_id: DEFAULT_PARTITION_SPEC_ID,
94 fields: vec![],
95 }
96 }
97
98 pub fn is_unpartitioned(&self) -> bool {
102 self.fields.is_empty() || self.fields.iter().all(|f| f.transform == Transform::Void)
103 }
104
105 pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
107 PartitionSpecBuilder::partition_type(&self.fields, schema)
108 }
109
110 pub fn into_unbound(self) -> UnboundPartitionSpec {
112 self.into()
113 }
114
115 pub fn with_spec_id(self, spec_id: i32) -> Self {
117 Self { spec_id, ..self }
118 }
119
120 pub fn has_sequential_ids(&self) -> bool {
124 has_sequential_ids(self.fields.iter().map(|f| f.field_id))
125 }
126
127 pub fn highest_field_id(&self) -> Option<i32> {
129 self.fields.iter().map(|f| f.field_id).max()
130 }
131
132 pub fn is_compatible_with(&self, other: &PartitionSpec) -> bool {
142 if self.fields.len() != other.fields.len() {
143 return false;
144 }
145
146 for (this_field, other_field) in self.fields.iter().zip(other.fields.iter()) {
147 if this_field.source_id != other_field.source_id
148 || this_field.name != other_field.name
149 || this_field.transform != other_field.transform
150 {
151 return false;
152 }
153 }
154
155 true
156 }
157
158 pub(crate) fn partition_to_path(&self, data: &Struct, schema: SchemaRef) -> String {
159 let partition_type = self.partition_type(&schema).unwrap();
160 let field_types = partition_type.fields();
161
162 self.fields
163 .iter()
164 .enumerate()
165 .map(|(i, field)| {
166 let value = data[i].as_ref();
167 format!(
168 "{}={}",
169 field.name,
170 field
171 .transform
172 .to_human_string(&field_types[i].field_type, value)
173 )
174 })
175 .join("/")
176 }
177}
178
/// A partition spec bundled with the schema and the partition values it applies to.
#[derive(Clone, Debug)]
pub struct PartitionKey {
    /// Spec whose fields describe `data`.
    spec: PartitionSpec,
    /// Schema used to resolve the source columns of the spec's fields.
    schema: SchemaRef,
    /// Partition values; indexed by the position of the matching field in `spec`.
    data: Struct,
}
190
191impl PartitionKey {
192 pub fn new(spec: PartitionSpec, schema: SchemaRef, data: Struct) -> Self {
194 Self { spec, schema, data }
195 }
196
197 pub fn to_path(&self) -> String {
199 self.spec.partition_to_path(&self.data, self.schema.clone())
200 }
201
202 pub fn is_effectively_none(partition_key: Option<&PartitionKey>) -> bool {
205 match partition_key {
206 None => true,
207 Some(pk) => pk.spec.is_unpartitioned(),
208 }
209 }
210}
211
/// Shared, reference-counted handle to an [`UnboundPartitionSpec`].
pub type UnboundPartitionSpecRef = Arc<UnboundPartitionSpec>;
/// A partition field whose field id may still be unassigned.
///
/// Serialized with kebab-case keys (`source-id`, `field-id`, `name`, `transform`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionField {
    /// Id of the schema column this partition field is derived from.
    pub source_id: i32,
    /// Id of the partition field; `None` means it is assigned when the spec is built.
    #[builder(default, setter(strip_option(fallback = field_id_opt)))]
    pub field_id: Option<i32>,
    /// Name of the partition field.
    pub name: String,
    /// Transform applied to the source column to produce the partition value.
    pub transform: Transform,
}
229
/// A partition spec that has not been bound to a schema: the spec id and the
/// field ids are optional.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionSpec {
    /// Identifier of this spec, if one has been chosen.
    pub(crate) spec_id: Option<i32>,
    /// Partition fields in definition order.
    pub(crate) fields: Vec<UnboundPartitionField>,
}
241
242impl UnboundPartitionSpec {
243 pub fn builder() -> UnboundPartitionSpecBuilder {
245 UnboundPartitionSpecBuilder::default()
246 }
247
248 pub fn bind(self, schema: impl Into<SchemaRef>) -> Result<PartitionSpec> {
250 PartitionSpecBuilder::new_from_unbound(self, schema)?.build()
251 }
252
253 pub fn spec_id(&self) -> Option<i32> {
255 self.spec_id
256 }
257
258 pub fn fields(&self) -> &[UnboundPartitionField] {
260 &self.fields
261 }
262
263 pub fn with_spec_id(self, spec_id: i32) -> Self {
265 Self {
266 spec_id: Some(spec_id),
267 ..self
268 }
269 }
270}
271
272fn has_sequential_ids(field_ids: impl Iterator<Item = i32>) -> bool {
273 for (index, field_id) in field_ids.enumerate() {
274 let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
275 .checked_add(1)
276 .and_then(|id| id.checked_add(index as i64))
277 .unwrap_or(i64::MAX);
278
279 if field_id as i64 != expected_id {
280 return false;
281 }
282 }
283
284 true
285}
286
287impl From<PartitionField> for UnboundPartitionField {
288 fn from(field: PartitionField) -> Self {
289 UnboundPartitionField {
290 source_id: field.source_id,
291 field_id: Some(field.field_id),
292 name: field.name,
293 transform: field.transform,
294 }
295 }
296}
297
298impl From<PartitionSpec> for UnboundPartitionSpec {
299 fn from(spec: PartitionSpec) -> Self {
300 UnboundPartitionSpec {
301 spec_id: Some(spec.spec_id),
302 fields: spec.fields.into_iter().map(Into::into).collect(),
303 }
304 }
305}
306
/// Builder for [`UnboundPartitionSpec`]; validates names, redundancy and field
/// ids without consulting a schema.
#[derive(Debug, Default)]
pub struct UnboundPartitionSpecBuilder {
    /// Spec id to use for the built spec, if any.
    spec_id: Option<i32>,
    /// Fields accepted so far, in insertion order.
    fields: Vec<UnboundPartitionField>,
}
313
314impl UnboundPartitionSpecBuilder {
315 pub fn new() -> Self {
317 Self {
318 spec_id: None,
319 fields: vec![],
320 }
321 }
322
323 pub fn with_spec_id(mut self, spec_id: i32) -> Self {
325 self.spec_id = Some(spec_id);
326 self
327 }
328
329 pub fn add_partition_field(
331 self,
332 source_id: i32,
333 target_name: impl ToString,
334 transformation: Transform,
335 ) -> Result<Self> {
336 let field = UnboundPartitionField {
337 source_id,
338 field_id: None,
339 name: target_name.to_string(),
340 transform: transformation,
341 };
342 self.add_partition_field_internal(field)
343 }
344
345 pub fn add_partition_fields(
347 self,
348 fields: impl IntoIterator<Item = UnboundPartitionField>,
349 ) -> Result<Self> {
350 let mut builder = self;
351 for field in fields {
352 builder = builder.add_partition_field_internal(field)?;
353 }
354 Ok(builder)
355 }
356
357 fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> {
358 self.check_name_set_and_unique(&field.name)?;
359 self.check_for_redundant_partitions(field.source_id, &field.transform)?;
360 if let Some(partition_field_id) = field.field_id {
361 self.check_partition_id_unique(partition_field_id)?;
362 }
363 self.fields.push(field);
364 Ok(self)
365 }
366
367 pub fn build(self) -> UnboundPartitionSpec {
369 UnboundPartitionSpec {
370 spec_id: self.spec_id,
371 fields: self.fields,
372 }
373 }
374}
375
/// Builder for [`PartitionSpec`]; validates every added field against a schema
/// and assigns missing field ids on `build`.
#[derive(Debug)]
pub struct PartitionSpecBuilder {
    /// Spec id to use for the built spec; the default spec id is used when `None`.
    spec_id: Option<i32>,
    /// Value automatic field id assignment continues from.
    last_assigned_field_id: i32,
    /// Fields accepted so far, in insertion order.
    fields: Vec<UnboundPartitionField>,
    /// Schema the fields are validated against.
    schema: SchemaRef,
}
384
385impl PartitionSpecBuilder {
386 pub fn new(schema: impl Into<SchemaRef>) -> Self {
388 Self {
389 spec_id: None,
390 fields: vec![],
391 last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID,
392 schema: schema.into(),
393 }
394 }
395
396 pub fn new_from_unbound(
398 unbound: UnboundPartitionSpec,
399 schema: impl Into<SchemaRef>,
400 ) -> Result<Self> {
401 let mut builder =
402 Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID));
403
404 for field in unbound.fields {
405 builder = builder.add_unbound_field(field)?;
406 }
407 Ok(builder)
408 }
409
410 pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self {
416 self.last_assigned_field_id = last_assigned_field_id;
417 self
418 }
419
420 pub fn with_spec_id(mut self, spec_id: i32) -> Self {
422 self.spec_id = Some(spec_id);
423 self
424 }
425
426 pub fn add_partition_field(
428 self,
429 source_name: impl AsRef<str>,
430 target_name: impl Into<String>,
431 transform: Transform,
432 ) -> Result<Self> {
433 let source_id = self
434 .schema
435 .field_by_name(source_name.as_ref())
436 .ok_or_else(|| {
437 Error::new(
438 ErrorKind::DataInvalid,
439 format!(
440 "Cannot find source column with name: {} in schema",
441 source_name.as_ref()
442 ),
443 )
444 })?
445 .id;
446 let field = UnboundPartitionField {
447 source_id,
448 field_id: None,
449 name: target_name.into(),
450 transform,
451 };
452
453 self.add_unbound_field(field)
454 }
455
456 pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result<Self> {
461 self.check_name_set_and_unique(&field.name)?;
462 self.check_for_redundant_partitions(field.source_id, &field.transform)?;
463 Self::check_name_does_not_collide_with_schema(&field, &self.schema)?;
464 Self::check_transform_compatibility(&field, &self.schema)?;
465 if let Some(partition_field_id) = field.field_id {
466 self.check_partition_id_unique(partition_field_id)?;
467 }
468
469 self.fields.push(field);
471 Ok(self)
472 }
473
474 pub fn add_unbound_fields(
476 self,
477 fields: impl IntoIterator<Item = UnboundPartitionField>,
478 ) -> Result<Self> {
479 let mut builder = self;
480 for field in fields {
481 builder = builder.add_unbound_field(field)?;
482 }
483 Ok(builder)
484 }
485
486 pub fn build(self) -> Result<PartitionSpec> {
488 let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?;
489 Ok(PartitionSpec {
490 spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID),
491 fields,
492 })
493 }
494
495 fn set_field_ids(
496 fields: Vec<UnboundPartitionField>,
497 last_assigned_field_id: i32,
498 ) -> Result<Vec<PartitionField>> {
499 let mut last_assigned_field_id = last_assigned_field_id;
500 let assigned_ids = fields
503 .iter()
504 .filter_map(|f| f.field_id)
505 .collect::<std::collections::HashSet<_>>();
506
507 fn _check_add_1(prev: i32) -> Result<i32> {
508 prev.checked_add(1).ok_or_else(|| {
509 Error::new(
510 ErrorKind::DataInvalid,
511 "Cannot assign more partition ids. Overflow.",
512 )
513 })
514 }
515
516 let mut bound_fields = Vec::with_capacity(fields.len());
517 for field in fields.into_iter() {
518 let partition_field_id = if let Some(partition_field_id) = field.field_id {
519 last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_field_id);
520 partition_field_id
521 } else {
522 last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
523 while assigned_ids.contains(&last_assigned_field_id) {
524 last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
525 }
526 last_assigned_field_id
527 };
528
529 bound_fields.push(PartitionField {
530 source_id: field.source_id,
531 field_id: partition_field_id,
532 name: field.name,
533 transform: field.transform,
534 })
535 }
536
537 Ok(bound_fields)
538 }
539
540 fn partition_type(fields: &Vec<PartitionField>, schema: &Schema) -> Result<StructType> {
542 let mut struct_fields = Vec::with_capacity(fields.len());
543 for partition_field in fields {
544 let field = schema
545 .field_by_id(partition_field.source_id)
546 .ok_or_else(|| {
547 Error::new(
548 ErrorKind::Unexpected,
551 format!(
552 "No column with source column id {} in schema {:?}",
553 partition_field.source_id, schema
554 ),
555 )
556 })?;
557 let res_type = partition_field.transform.result_type(&field.field_type)?;
558 let field =
559 NestedField::optional(partition_field.field_id, &partition_field.name, res_type)
560 .into();
561 struct_fields.push(field);
562 }
563 Ok(StructType::new(struct_fields))
564 }
565
566 fn check_name_does_not_collide_with_schema(
571 field: &UnboundPartitionField,
572 schema: &Schema,
573 ) -> Result<()> {
574 match schema.field_by_name(field.name.as_str()) {
575 Some(schema_collision) => {
576 if field.transform == Transform::Identity {
577 if schema_collision.id == field.source_id {
578 Ok(())
579 } else {
580 Err(Error::new(
581 ErrorKind::DataInvalid,
582 format!(
583 "Cannot create identity partition sourced from different field in schema. Field name '{}' has id `{}` in schema but partition source id is `{}`",
584 field.name, schema_collision.id, field.source_id
585 ),
586 ))
587 }
588 } else {
589 Err(Error::new(
590 ErrorKind::DataInvalid,
591 format!(
592 "Cannot create partition with name: '{}' that conflicts with schema field and is not an identity transform.",
593 field.name
594 ),
595 ))
596 }
597 }
598 None => Ok(()),
599 }
600 }
601
602 fn check_transform_compatibility(field: &UnboundPartitionField, schema: &Schema) -> Result<()> {
605 let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| {
606 Error::new(
607 ErrorKind::DataInvalid,
608 format!(
609 "Cannot find partition source field with id `{}` in schema",
610 field.source_id
611 ),
612 )
613 })?;
614
615 if field.transform != Transform::Void {
616 if !schema_field.field_type.is_primitive() {
617 return Err(Error::new(
618 ErrorKind::DataInvalid,
619 format!(
620 "Cannot partition by non-primitive source field: '{}'.",
621 schema_field.field_type
622 ),
623 ));
624 }
625
626 if field
627 .transform
628 .result_type(&schema_field.field_type)
629 .is_err()
630 {
631 return Err(Error::new(
632 ErrorKind::DataInvalid,
633 format!(
634 "Invalid source type: '{}' for transform: '{}'.",
635 schema_field.field_type,
636 field.transform.dedup_name()
637 ),
638 ));
639 }
640 }
641
642 Ok(())
643 }
644}
645
646trait CorePartitionSpecValidator {
648 fn check_name_set_and_unique(&self, name: &str) -> Result<()> {
650 if name.is_empty() {
651 return Err(Error::new(
652 ErrorKind::DataInvalid,
653 "Cannot use empty partition name",
654 ));
655 }
656
657 if self.fields().iter().any(|f| f.name == name) {
658 return Err(Error::new(
659 ErrorKind::DataInvalid,
660 format!("Cannot use partition name more than once: {}", name),
661 ));
662 }
663 Ok(())
664 }
665
666 fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> {
668 let collision = self.fields().iter().find(|f| {
669 f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name()
670 });
671
672 if let Some(collision) = collision {
673 Err(Error::new(
674 ErrorKind::DataInvalid,
675 format!(
676 "Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`",
677 source_id,
678 transform.dedup_name(),
679 collision.name
680 ),
681 ))
682 } else {
683 Ok(())
684 }
685 }
686
687 fn check_partition_id_unique(&self, field_id: i32) -> Result<()> {
689 if self.fields().iter().any(|f| f.field_id == Some(field_id)) {
690 return Err(Error::new(
691 ErrorKind::DataInvalid,
692 format!(
693 "Cannot use field id more than once in one PartitionSpec: {}",
694 field_id
695 ),
696 ));
697 }
698
699 Ok(())
700 }
701
702 fn fields(&self) -> &Vec<UnboundPartitionField>;
703}
704
/// Gives [`PartitionSpecBuilder`] the shared name, redundancy and field id checks.
impl CorePartitionSpecValidator for PartitionSpecBuilder {
    /// Returns the fields added to this builder so far.
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        &self.fields
    }
}
710
/// Gives [`UnboundPartitionSpecBuilder`] the shared name, redundancy and field id checks.
impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder {
    /// Returns the fields added to this builder so far.
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        &self.fields
    }
}
716
717#[cfg(test)]
718mod tests {
719 use super::*;
720 use crate::spec::{Literal, PrimitiveType, Type};
721
722 #[test]
723 fn test_partition_spec() {
724 let spec = r#"
725 {
726 "spec-id": 1,
727 "fields": [ {
728 "source-id": 4,
729 "field-id": 1000,
730 "name": "ts_day",
731 "transform": "day"
732 }, {
733 "source-id": 1,
734 "field-id": 1001,
735 "name": "id_bucket",
736 "transform": "bucket[16]"
737 }, {
738 "source-id": 2,
739 "field-id": 1002,
740 "name": "id_truncate",
741 "transform": "truncate[4]"
742 } ]
743 }
744 "#;
745
746 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
747 assert_eq!(4, partition_spec.fields[0].source_id);
748 assert_eq!(1000, partition_spec.fields[0].field_id);
749 assert_eq!("ts_day", partition_spec.fields[0].name);
750 assert_eq!(Transform::Day, partition_spec.fields[0].transform);
751
752 assert_eq!(1, partition_spec.fields[1].source_id);
753 assert_eq!(1001, partition_spec.fields[1].field_id);
754 assert_eq!("id_bucket", partition_spec.fields[1].name);
755 assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
756
757 assert_eq!(2, partition_spec.fields[2].source_id);
758 assert_eq!(1002, partition_spec.fields[2].field_id);
759 assert_eq!("id_truncate", partition_spec.fields[2].name);
760 assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
761 }
762
763 #[test]
764 fn test_is_unpartitioned() {
765 let schema = Schema::builder()
766 .with_fields(vec![
767 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
768 .into(),
769 NestedField::required(
770 2,
771 "name",
772 Type::Primitive(crate::spec::PrimitiveType::String),
773 )
774 .into(),
775 ])
776 .build()
777 .unwrap();
778 let partition_spec = PartitionSpec::builder(schema.clone())
779 .with_spec_id(1)
780 .build()
781 .unwrap();
782 assert!(
783 partition_spec.is_unpartitioned(),
784 "Empty partition spec should be unpartitioned"
785 );
786
787 let partition_spec = PartitionSpec::builder(schema.clone())
788 .add_unbound_fields(vec![
789 UnboundPartitionField::builder()
790 .source_id(1)
791 .name("id".to_string())
792 .transform(Transform::Identity)
793 .build(),
794 UnboundPartitionField::builder()
795 .source_id(2)
796 .name("name_string".to_string())
797 .transform(Transform::Void)
798 .build(),
799 ])
800 .unwrap()
801 .with_spec_id(1)
802 .build()
803 .unwrap();
804 assert!(
805 !partition_spec.is_unpartitioned(),
806 "Partition spec with one non void transform should not be unpartitioned"
807 );
808
809 let partition_spec = PartitionSpec::builder(schema.clone())
810 .with_spec_id(1)
811 .add_unbound_fields(vec![
812 UnboundPartitionField::builder()
813 .source_id(1)
814 .name("id_void".to_string())
815 .transform(Transform::Void)
816 .build(),
817 UnboundPartitionField::builder()
818 .source_id(2)
819 .name("name_void".to_string())
820 .transform(Transform::Void)
821 .build(),
822 ])
823 .unwrap()
824 .build()
825 .unwrap();
826 assert!(
827 partition_spec.is_unpartitioned(),
828 "Partition spec with all void field should be unpartitioned"
829 );
830 }
831
832 #[test]
833 fn test_unbound_partition_spec() {
834 let spec = r#"
835 {
836 "spec-id": 1,
837 "fields": [ {
838 "source-id": 4,
839 "field-id": 1000,
840 "name": "ts_day",
841 "transform": "day"
842 }, {
843 "source-id": 1,
844 "field-id": 1001,
845 "name": "id_bucket",
846 "transform": "bucket[16]"
847 }, {
848 "source-id": 2,
849 "field-id": 1002,
850 "name": "id_truncate",
851 "transform": "truncate[4]"
852 } ]
853 }
854 "#;
855
856 let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
857 assert_eq!(Some(1), partition_spec.spec_id);
858
859 assert_eq!(4, partition_spec.fields[0].source_id);
860 assert_eq!(Some(1000), partition_spec.fields[0].field_id);
861 assert_eq!("ts_day", partition_spec.fields[0].name);
862 assert_eq!(Transform::Day, partition_spec.fields[0].transform);
863
864 assert_eq!(1, partition_spec.fields[1].source_id);
865 assert_eq!(Some(1001), partition_spec.fields[1].field_id);
866 assert_eq!("id_bucket", partition_spec.fields[1].name);
867 assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
868
869 assert_eq!(2, partition_spec.fields[2].source_id);
870 assert_eq!(Some(1002), partition_spec.fields[2].field_id);
871 assert_eq!("id_truncate", partition_spec.fields[2].name);
872 assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
873
874 let spec = r#"
875 {
876 "fields": [ {
877 "source-id": 4,
878 "name": "ts_day",
879 "transform": "day"
880 } ]
881 }
882 "#;
883 let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
884 assert_eq!(None, partition_spec.spec_id);
885
886 assert_eq!(4, partition_spec.fields[0].source_id);
887 assert_eq!(None, partition_spec.fields[0].field_id);
888 assert_eq!("ts_day", partition_spec.fields[0].name);
889 assert_eq!(Transform::Day, partition_spec.fields[0].transform);
890 }
891
892 #[test]
893 fn test_new_unpartition() {
894 let schema = Schema::builder()
895 .with_fields(vec![
896 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
897 .into(),
898 NestedField::required(
899 2,
900 "name",
901 Type::Primitive(crate::spec::PrimitiveType::String),
902 )
903 .into(),
904 ])
905 .build()
906 .unwrap();
907 let partition_spec = PartitionSpec::builder(schema.clone())
908 .with_spec_id(0)
909 .build()
910 .unwrap();
911 let partition_type = partition_spec.partition_type(&schema).unwrap();
912 assert_eq!(0, partition_type.fields().len());
913
914 let unpartition_spec = PartitionSpec::unpartition_spec();
915 assert_eq!(partition_spec, unpartition_spec);
916 }
917
918 #[test]
919 fn test_partition_type() {
920 let spec = r#"
921 {
922 "spec-id": 1,
923 "fields": [ {
924 "source-id": 4,
925 "field-id": 1000,
926 "name": "ts_day",
927 "transform": "day"
928 }, {
929 "source-id": 1,
930 "field-id": 1001,
931 "name": "id_bucket",
932 "transform": "bucket[16]"
933 }, {
934 "source-id": 2,
935 "field-id": 1002,
936 "name": "id_truncate",
937 "transform": "truncate[4]"
938 } ]
939 }
940 "#;
941
942 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
943 let schema = Schema::builder()
944 .with_fields(vec![
945 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
946 .into(),
947 NestedField::required(
948 2,
949 "name",
950 Type::Primitive(crate::spec::PrimitiveType::String),
951 )
952 .into(),
953 NestedField::required(
954 3,
955 "ts",
956 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
957 )
958 .into(),
959 NestedField::required(
960 4,
961 "ts_day",
962 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
963 )
964 .into(),
965 NestedField::required(
966 5,
967 "id_bucket",
968 Type::Primitive(crate::spec::PrimitiveType::Int),
969 )
970 .into(),
971 NestedField::required(
972 6,
973 "id_truncate",
974 Type::Primitive(crate::spec::PrimitiveType::Int),
975 )
976 .into(),
977 ])
978 .build()
979 .unwrap();
980
981 let partition_type = partition_spec.partition_type(&schema).unwrap();
982 assert_eq!(3, partition_type.fields().len());
983 assert_eq!(
984 *partition_type.fields()[0],
985 NestedField::optional(
986 partition_spec.fields[0].field_id,
987 &partition_spec.fields[0].name,
988 Type::Primitive(crate::spec::PrimitiveType::Date)
989 )
990 );
991 assert_eq!(
992 *partition_type.fields()[1],
993 NestedField::optional(
994 partition_spec.fields[1].field_id,
995 &partition_spec.fields[1].name,
996 Type::Primitive(crate::spec::PrimitiveType::Int)
997 )
998 );
999 assert_eq!(
1000 *partition_type.fields()[2],
1001 NestedField::optional(
1002 partition_spec.fields[2].field_id,
1003 &partition_spec.fields[2].name,
1004 Type::Primitive(crate::spec::PrimitiveType::String)
1005 )
1006 );
1007 }
1008
1009 #[test]
1010 fn test_partition_empty() {
1011 let spec = r#"
1012 {
1013 "spec-id": 1,
1014 "fields": []
1015 }
1016 "#;
1017
1018 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1019 let schema = Schema::builder()
1020 .with_fields(vec![
1021 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1022 .into(),
1023 NestedField::required(
1024 2,
1025 "name",
1026 Type::Primitive(crate::spec::PrimitiveType::String),
1027 )
1028 .into(),
1029 NestedField::required(
1030 3,
1031 "ts",
1032 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1033 )
1034 .into(),
1035 NestedField::required(
1036 4,
1037 "ts_day",
1038 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1039 )
1040 .into(),
1041 NestedField::required(
1042 5,
1043 "id_bucket",
1044 Type::Primitive(crate::spec::PrimitiveType::Int),
1045 )
1046 .into(),
1047 NestedField::required(
1048 6,
1049 "id_truncate",
1050 Type::Primitive(crate::spec::PrimitiveType::Int),
1051 )
1052 .into(),
1053 ])
1054 .build()
1055 .unwrap();
1056
1057 let partition_type = partition_spec.partition_type(&schema).unwrap();
1058 assert_eq!(0, partition_type.fields().len());
1059 }
1060
1061 #[test]
1062 fn test_partition_error() {
1063 let spec = r#"
1064 {
1065 "spec-id": 1,
1066 "fields": [ {
1067 "source-id": 4,
1068 "field-id": 1000,
1069 "name": "ts_day",
1070 "transform": "day"
1071 }, {
1072 "source-id": 1,
1073 "field-id": 1001,
1074 "name": "id_bucket",
1075 "transform": "bucket[16]"
1076 }, {
1077 "source-id": 2,
1078 "field-id": 1002,
1079 "name": "id_truncate",
1080 "transform": "truncate[4]"
1081 } ]
1082 }
1083 "#;
1084
1085 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1086 let schema = Schema::builder()
1087 .with_fields(vec![
1088 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1089 .into(),
1090 NestedField::required(
1091 2,
1092 "name",
1093 Type::Primitive(crate::spec::PrimitiveType::String),
1094 )
1095 .into(),
1096 ])
1097 .build()
1098 .unwrap();
1099
1100 assert!(partition_spec.partition_type(&schema).is_err());
1101 }
1102
1103 #[test]
1104 fn test_builder_disallow_duplicate_names() {
1105 UnboundPartitionSpec::builder()
1106 .add_partition_field(1, "ts_day".to_string(), Transform::Day)
1107 .unwrap()
1108 .add_partition_field(2, "ts_day".to_string(), Transform::Day)
1109 .unwrap_err();
1110 }
1111
1112 #[test]
1113 fn test_builder_disallow_duplicate_field_ids() {
1114 let schema = Schema::builder()
1115 .with_fields(vec![
1116 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1117 .into(),
1118 NestedField::required(
1119 2,
1120 "name",
1121 Type::Primitive(crate::spec::PrimitiveType::String),
1122 )
1123 .into(),
1124 ])
1125 .build()
1126 .unwrap();
1127 PartitionSpec::builder(schema.clone())
1128 .add_unbound_field(UnboundPartitionField {
1129 source_id: 1,
1130 field_id: Some(1000),
1131 name: "id".to_string(),
1132 transform: Transform::Identity,
1133 })
1134 .unwrap()
1135 .add_unbound_field(UnboundPartitionField {
1136 source_id: 2,
1137 field_id: Some(1000),
1138 name: "id_bucket".to_string(),
1139 transform: Transform::Bucket(16),
1140 })
1141 .unwrap_err();
1142 }
1143
1144 #[test]
1145 fn test_builder_auto_assign_field_ids() {
1146 let schema = Schema::builder()
1147 .with_fields(vec![
1148 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1149 .into(),
1150 NestedField::required(
1151 2,
1152 "name",
1153 Type::Primitive(crate::spec::PrimitiveType::String),
1154 )
1155 .into(),
1156 NestedField::required(
1157 3,
1158 "ts",
1159 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1160 )
1161 .into(),
1162 ])
1163 .build()
1164 .unwrap();
1165 let spec = PartitionSpec::builder(schema.clone())
1166 .with_spec_id(1)
1167 .add_unbound_field(UnboundPartitionField {
1168 source_id: 1,
1169 name: "id".to_string(),
1170 transform: Transform::Identity,
1171 field_id: Some(1012),
1172 })
1173 .unwrap()
1174 .add_unbound_field(UnboundPartitionField {
1175 source_id: 2,
1176 name: "name_void".to_string(),
1177 transform: Transform::Void,
1178 field_id: None,
1179 })
1180 .unwrap()
1181 .add_unbound_field(UnboundPartitionField {
1183 source_id: 3,
1184 name: "year".to_string(),
1185 transform: Transform::Year,
1186 field_id: Some(1),
1187 })
1188 .unwrap()
1189 .build()
1190 .unwrap();
1191
1192 assert_eq!(1012, spec.fields[0].field_id);
1193 assert_eq!(1013, spec.fields[1].field_id);
1194 assert_eq!(1, spec.fields[2].field_id);
1195 }
1196
1197 #[test]
1198 fn test_builder_valid_schema() {
1199 let schema = Schema::builder()
1200 .with_fields(vec![
1201 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1202 .into(),
1203 NestedField::required(
1204 2,
1205 "name",
1206 Type::Primitive(crate::spec::PrimitiveType::String),
1207 )
1208 .into(),
1209 ])
1210 .build()
1211 .unwrap();
1212
1213 PartitionSpec::builder(schema.clone())
1214 .with_spec_id(1)
1215 .build()
1216 .unwrap();
1217
1218 let spec = PartitionSpec::builder(schema.clone())
1219 .with_spec_id(1)
1220 .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16))
1221 .unwrap()
1222 .build()
1223 .unwrap();
1224
1225 assert_eq!(spec, PartitionSpec {
1226 spec_id: 1,
1227 fields: vec![PartitionField {
1228 source_id: 1,
1229 field_id: 1000,
1230 name: "id_bucket[16]".to_string(),
1231 transform: Transform::Bucket(16),
1232 }],
1233 });
1234 assert_eq!(
1235 spec.partition_type(&schema).unwrap(),
1236 StructType::new(vec![
1237 NestedField::optional(1000, "id_bucket[16]", Type::Primitive(PrimitiveType::Int))
1238 .into()
1239 ])
1240 )
1241 }
1242
1243 #[test]
1244 fn test_collision_with_schema_name() {
1245 let schema = Schema::builder()
1246 .with_fields(vec![
1247 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1248 .into(),
1249 ])
1250 .build()
1251 .unwrap();
1252
1253 PartitionSpec::builder(schema.clone())
1254 .with_spec_id(1)
1255 .build()
1256 .unwrap();
1257
1258 let err = PartitionSpec::builder(schema)
1259 .with_spec_id(1)
1260 .add_unbound_field(UnboundPartitionField {
1261 source_id: 1,
1262 field_id: None,
1263 name: "id".to_string(),
1264 transform: Transform::Bucket(16),
1265 })
1266 .unwrap_err();
1267 assert!(err.message().contains("conflicts with schema"))
1268 }
1269
1270 #[test]
1271 fn test_builder_collision_is_ok_for_identity_transforms() {
1272 let schema = Schema::builder()
1273 .with_fields(vec![
1274 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1275 .into(),
1276 NestedField::required(
1277 2,
1278 "number",
1279 Type::Primitive(crate::spec::PrimitiveType::Int),
1280 )
1281 .into(),
1282 ])
1283 .build()
1284 .unwrap();
1285
1286 PartitionSpec::builder(schema.clone())
1287 .with_spec_id(1)
1288 .build()
1289 .unwrap();
1290
1291 PartitionSpec::builder(schema.clone())
1292 .with_spec_id(1)
1293 .add_unbound_field(UnboundPartitionField {
1294 source_id: 1,
1295 field_id: None,
1296 name: "id".to_string(),
1297 transform: Transform::Identity,
1298 })
1299 .unwrap()
1300 .build()
1301 .unwrap();
1302
1303 PartitionSpec::builder(schema)
1305 .with_spec_id(1)
1306 .add_unbound_field(UnboundPartitionField {
1307 source_id: 2,
1308 field_id: None,
1309 name: "id".to_string(),
1310 transform: Transform::Identity,
1311 })
1312 .unwrap_err();
1313 }
1314
/// Partition fields may only reference source ids that are present in the schema.
#[test]
fn test_builder_all_source_ids_must_exist() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
            NestedField::required(
                2,
                "name",
                Type::Primitive(crate::spec::PrimitiveType::String),
            )
            .into(),
            NestedField::required(
                3,
                "ts",
                Type::Primitive(crate::spec::PrimitiveType::Timestamp),
            )
            .into(),
        ])
        .build()
        .unwrap();

    // Both attempts share the same "id_bucket" field; only the source id
    // behind the "name" partition field differs between them.
    let fields_with_name_source = |name_source_id: i32| {
        vec![
            UnboundPartitionField {
                source_id: 1,
                field_id: None,
                name: "id_bucket".to_string(),
                transform: Transform::Bucket(16),
            },
            UnboundPartitionField {
                source_id: name_source_id,
                field_id: None,
                name: "name".to_string(),
                transform: Transform::Identity,
            },
        ]
    };

    // Source ids 1 and 2 both exist in the schema, so adding and building succeed.
    let valid_builder = PartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_fields(fields_with_name_source(2))
        .unwrap();
    valid_builder.build().unwrap();

    // The schema only declares ids 1, 2 and 3; source id 4 must be refused.
    PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_fields(fields_with_name_source(4))
        .unwrap_err();
}
1377
/// Adding the same transform on the same source id a second time is refused,
/// even when the second partition field carries a different name.
#[test]
fn test_builder_disallows_redundant() {
    // First bucket field on source id 1 is accepted.
    let builder_with_bucket = UnboundPartitionSpec::builder()
        .with_spec_id(1)
        .add_partition_field(1, "id_bucket[16]".to_string(), Transform::Bucket(16))
        .unwrap();

    // Same source id and transform under another name must fail.
    let redundant_err = builder_with_bucket
        .add_partition_field(
            1,
            "id_bucket_with_other_name".to_string(),
            Transform::Bucket(16),
        )
        .unwrap_err();

    assert!(redundant_err.message().contains("redundant partition"));
}
1392
/// A transform that does not fit the source column's type is refused by the builder.
#[test]
fn test_builder_incompatible_transforms_disallowed() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
        ])
        .build()
        .unwrap();

    // `Year` is requested on source id 1, which is the Int column `id`.
    let year_on_int = UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: "id_year".to_string(),
        transform: Transform::Year,
    };

    PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(year_on_int)
        .unwrap_err();
}
1413
/// An unbound spec can be built from fields that carry no partition field id,
/// and the resulting spec keeps `field_id` as `None`.
#[test]
fn test_build_unbound_specs_without_partition_id() {
    // Factory for the single field used both as builder input and in the expectation.
    let bucket_field = || UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: "id_bucket[16]".to_string(),
        transform: Transform::Bucket(16),
    };

    let built = UnboundPartitionSpec::builder()
        .with_spec_id(1)
        .add_partition_fields(vec![bucket_field()])
        .unwrap()
        .build();

    let expected = UnboundPartitionSpec {
        spec_id: Some(1),
        fields: vec![bucket_field()],
    };

    assert_eq!(built, expected);
}
1437
/// Two specs built from identical fields over the same schema are compatible.
#[test]
fn test_is_compatible_with() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
            NestedField::required(
                2,
                "name",
                Type::Primitive(crate::spec::PrimitiveType::String),
            )
            .into(),
        ])
        .build()
        .unwrap();

    // Builds a spec with one bucket(16) field on source id 1.
    let build_bucket_spec = |schema: Schema| {
        PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                source_id: 1,
                field_id: None,
                name: "id_bucket".to_string(),
                transform: Transform::Bucket(16),
            })
            .unwrap()
            .build()
            .unwrap()
    };

    let partition_spec_1 = build_bucket_spec(schema.clone());
    let partition_spec_2 = build_bucket_spec(schema);

    assert!(partition_spec_1.is_compatible_with(&partition_spec_2));
}
1480
/// Specs that differ only in the transform of a field are not compatible.
#[test]
fn test_not_compatible_with_transform_different() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
        ])
        .build()
        .unwrap();

    // Builds a single-field spec on source id 1; only the transform varies.
    let spec_with_transform = |schema: Schema, transform: Transform| {
        PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                source_id: 1,
                field_id: None,
                name: "id_bucket".to_string(),
                transform,
            })
            .unwrap()
            .build()
            .unwrap()
    };

    let partition_spec_1 = spec_with_transform(schema.clone(), Transform::Bucket(16));
    let partition_spec_2 = spec_with_transform(schema, Transform::Bucket(32));

    assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
}
1517
/// Specs that differ only in the source id of a field are not compatible.
#[test]
fn test_not_compatible_with_source_id_different() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
            NestedField::required(
                2,
                "name",
                Type::Primitive(crate::spec::PrimitiveType::String),
            )
            .into(),
        ])
        .build()
        .unwrap();

    // Builds a single bucket(16) field named "id_bucket"; only the source id varies.
    let spec_for_source = |schema: Schema, source_id: i32| {
        PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                source_id,
                field_id: None,
                name: "id_bucket".to_string(),
                transform: Transform::Bucket(16),
            })
            .unwrap()
            .build()
            .unwrap()
    };

    let partition_spec_1 = spec_for_source(schema.clone(), 1);
    let partition_spec_2 = spec_for_source(schema, 2);

    assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
}
1560
/// Specs holding the same fields in a different order are not compatible.
#[test]
fn test_not_compatible_with_order_different() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
            NestedField::required(
                2,
                "name",
                Type::Primitive(crate::spec::PrimitiveType::String),
            )
            .into(),
        ])
        .build()
        .unwrap();

    // Factories for the two fields; each spec adds both, in opposite order.
    let id_bucket = || UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: "id_bucket".to_string(),
        transform: Transform::Bucket(16),
    };
    let name_identity = || UnboundPartitionField {
        source_id: 2,
        field_id: None,
        name: "name".to_string(),
        transform: Transform::Identity,
    };

    // Order: id_bucket, then name.
    let partition_spec_1 = PartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_field(id_bucket())
        .unwrap()
        .add_unbound_field(name_identity())
        .unwrap()
        .build()
        .unwrap();

    // Order: name, then id_bucket.
    let partition_spec_2 = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(name_identity())
        .unwrap()
        .add_unbound_field(id_bucket())
        .unwrap()
        .build()
        .unwrap();

    assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
}
1617
/// A spec without any partition fields reports no highest field id.
#[test]
fn test_highest_field_id_unpartitioned() {
    let empty_schema = Schema::builder().with_fields(vec![]).build().unwrap();

    let unpartitioned = PartitionSpec::builder(empty_schema)
        .with_spec_id(1)
        .build()
        .unwrap();

    assert_eq!(unpartitioned.highest_field_id(), None);
}
1627
/// `highest_field_id` returns the largest field id, regardless of insertion order.
#[test]
fn test_highest_field_id() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
            NestedField::required(
                2,
                "name",
                Type::Primitive(crate::spec::PrimitiveType::String),
            )
            .into(),
        ])
        .build()
        .unwrap();

    // The larger id (1001) is deliberately added first.
    let id_field = UnboundPartitionField {
        source_id: 1,
        field_id: Some(1001),
        name: "id".to_string(),
        transform: Transform::Identity,
    };
    let name_field = UnboundPartitionField {
        source_id: 2,
        field_id: Some(1000),
        name: "name".to_string(),
        transform: Transform::Identity,
    };

    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(id_field)
        .unwrap()
        .add_unbound_field(name_field)
        .unwrap()
        .build()
        .unwrap();

    let highest = spec.highest_field_id();
    assert_eq!(Some(1001), highest);
}
1665
/// Field ids 1000 and 1001 are kept as given and count as sequential.
#[test]
fn test_has_sequential_ids() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
            NestedField::required(
                2,
                "name",
                Type::Primitive(crate::spec::PrimitiveType::String),
            )
            .into(),
        ])
        .build()
        .unwrap();

    // Identity-transformed field with an explicitly assigned field id.
    let identity_field = |source_id: i32, field_id: i32, name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: name.to_string(),
        transform: Transform::Identity,
    };

    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 1000, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1001, "name"))
        .unwrap()
        .build()
        .unwrap();

    let bound_fields = &spec.fields;
    assert_eq!(1000, bound_fields[0].field_id);
    assert_eq!(1001, bound_fields[1].field_id);
    assert!(spec.has_sequential_ids());
}
1705
/// Consecutive field ids that begin at 999 are not reported as sequential.
#[test]
fn test_sequential_ids_must_start_at_1000() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
            NestedField::required(
                2,
                "name",
                Type::Primitive(crate::spec::PrimitiveType::String),
            )
            .into(),
        ])
        .build()
        .unwrap();

    // Ids are consecutive (999, 1000) but start one below 1000.
    let id_field = UnboundPartitionField {
        source_id: 1,
        field_id: Some(999),
        name: "id".to_string(),
        transform: Transform::Identity,
    };
    let name_field = UnboundPartitionField {
        source_id: 2,
        field_id: Some(1000),
        name: "name".to_string(),
        transform: Transform::Identity,
    };

    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(id_field)
        .unwrap()
        .add_unbound_field(name_field)
        .unwrap()
        .build()
        .unwrap();

    let bound_fields = &spec.fields;
    assert_eq!(999, bound_fields[0].field_id);
    assert_eq!(1000, bound_fields[1].field_id);
    assert!(!spec.has_sequential_ids());
}
1745
/// Field ids with a gap (1000 followed by 1002) are not reported as sequential.
#[test]
fn test_sequential_ids_must_have_no_gaps() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
            NestedField::required(
                2,
                "name",
                Type::Primitive(crate::spec::PrimitiveType::String),
            )
            .into(),
        ])
        .build()
        .unwrap();

    // Identity-transformed field with an explicitly assigned field id.
    let identity_field = |source_id: i32, field_id: i32, name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: name.to_string(),
        transform: Transform::Identity,
    };

    // 1001 is skipped on purpose.
    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 1000, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1002, "name"))
        .unwrap()
        .build()
        .unwrap();

    let bound_fields = &spec.fields;
    assert_eq!(1000, bound_fields[0].field_id);
    assert_eq!(1002, bound_fields[1].field_id);
    assert!(!spec.has_sequential_ids());
}
1785
/// Identity partitions on `id` and `name` render as `id=42/name=alice`
/// for the partition values 42 and "alice".
#[test]
fn test_partition_to_path() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()
        .unwrap();

    // Partition fields are added by source column name, in path order.
    let builder = PartitionSpec::builder(schema.clone())
        .add_partition_field("id", "id", Transform::Identity)
        .unwrap()
        .add_partition_field("name", "name", Transform::Identity)
        .unwrap();
    let spec = builder.build().unwrap();

    // One value per partition field, in the same order as the fields above.
    let partition_values = [Some(Literal::int(42)), Some(Literal::string("alice"))];
    let data = Struct::from_iter(partition_values);

    let rendered = spec.partition_to_path(&data, schema.into());
    assert_eq!(rendered, "id=42/name=alice");
}
1811}