iceberg/spec/
partition.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
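/*!
 * Partitioning: bound and unbound partition fields and specs, partition keys,
 * and the builders that validate and create partition specs.
 */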
21use std::sync::Arc;
22
23use itertools::Itertools;
24use serde::{Deserialize, Serialize};
25use typed_builder::TypedBuilder;
26
27use super::transform::Transform;
28use super::{NestedField, Schema, SchemaRef, StructType};
29use crate::spec::Struct;
30use crate::{Error, ErrorKind, Result};
31
/// Initial "last assigned" partition field id of a [`PartitionSpecBuilder`].
///
/// Ids are handed out by incrementing this value, so sequential partition
/// field ids start at 1000 (see `has_sequential_ids`).
pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
/// Spec id used when no explicit spec id has been provided.
pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;
34
/// Partition fields capture the transform from table data to partition values.
///
/// In contrast to [`UnboundPartitionField`], the `field_id` is always present.
/// Fields are (de)serialized with kebab-case keys, e.g. `source-id` and `field-id`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionField {
    /// A source column id from the table's schema.
    pub source_id: i32,
    /// A partition field id that is used to identify a partition field and is unique within a partition spec.
    /// In v2 table metadata, it is unique across all partition specs.
    pub field_id: i32,
    /// A partition name.
    pub name: String,
    /// A transform that is applied to the source column to produce a partition value.
    pub transform: Transform,
}
49
50impl PartitionField {
51    /// To unbound partition field
52    pub fn into_unbound(self) -> UnboundPartitionField {
53        self.into()
54    }
55}
56
/// Shared, reference-counted ([`Arc`]) handle to a [`PartitionSpec`].
pub type PartitionSpecRef = Arc<PartitionSpec>;
/// Partition spec that defines how to produce a tuple of partition values from a record.
///
/// A [`PartitionSpec`] is originally obtained by binding an [`UnboundPartitionSpec`] to a schema and is
/// only guaranteed to be valid for that schema. The main difference between [`PartitionSpec`] and
/// [`UnboundPartitionSpec`] is that the former has field ids assigned,
/// while field ids are optional for [`UnboundPartitionSpec`].
///
/// Both members are private; use [`PartitionSpec::spec_id`] and [`PartitionSpec::fields`] to read them.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionSpec {
    /// Identifier for PartitionSpec
    spec_id: i32,
    /// Details of the partition spec: the partition fields, in order.
    fields: Vec<PartitionField>,
}
73
74impl PartitionSpec {
75    /// Create a new partition spec builder with the given schema.
76    pub fn builder(schema: impl Into<SchemaRef>) -> PartitionSpecBuilder {
77        PartitionSpecBuilder::new(schema)
78    }
79
80    /// Fields of the partition spec
81    pub fn fields(&self) -> &[PartitionField] {
82        &self.fields
83    }
84
85    /// Spec id of the partition spec
86    pub fn spec_id(&self) -> i32 {
87        self.spec_id
88    }
89
90    /// Get a new unpartitioned partition spec
91    pub fn unpartition_spec() -> Self {
92        Self {
93            spec_id: DEFAULT_PARTITION_SPEC_ID,
94            fields: vec![],
95        }
96    }
97
98    /// Returns if the partition spec is unpartitioned.
99    ///
100    /// A [`PartitionSpec`] is unpartitioned if it has no fields or all fields are [`Transform::Void`] transform.
101    pub fn is_unpartitioned(&self) -> bool {
102        self.fields.is_empty() || self.fields.iter().all(|f| f.transform == Transform::Void)
103    }
104
105    /// Returns the partition type of this partition spec.
106    pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
107        PartitionSpecBuilder::partition_type(&self.fields, schema)
108    }
109
110    /// Convert to unbound partition spec
111    pub fn into_unbound(self) -> UnboundPartitionSpec {
112        self.into()
113    }
114
115    /// Change the spec id of the partition spec
116    pub fn with_spec_id(self, spec_id: i32) -> Self {
117        Self { spec_id, ..self }
118    }
119
120    /// Check if this partition spec has sequential partition ids.
121    /// Sequential ids start from 1000 and increment by 1 for each field.
122    /// This is required for spec version 1
123    pub fn has_sequential_ids(&self) -> bool {
124        has_sequential_ids(self.fields.iter().map(|f| f.field_id))
125    }
126
127    /// Get the highest field id in the partition spec.
128    pub fn highest_field_id(&self) -> Option<i32> {
129        self.fields.iter().map(|f| f.field_id).max()
130    }
131
132    /// Check if this partition spec is compatible with another partition spec.
133    ///
134    /// Returns true if the partition spec is equal to the other spec with partition field ids ignored and
135    /// spec_id ignored. The following must be identical:
136    /// * The number of fields
137    /// * Field order
138    /// * Field names
139    /// * Source column ids
140    /// * Transforms
141    pub fn is_compatible_with(&self, other: &PartitionSpec) -> bool {
142        if self.fields.len() != other.fields.len() {
143            return false;
144        }
145
146        for (this_field, other_field) in self.fields.iter().zip(other.fields.iter()) {
147            if this_field.source_id != other_field.source_id
148                || this_field.name != other_field.name
149                || this_field.transform != other_field.transform
150            {
151                return false;
152            }
153        }
154
155        true
156    }
157
158    pub(crate) fn partition_to_path(&self, data: &Struct, schema: SchemaRef) -> String {
159        let partition_type = self.partition_type(&schema).unwrap();
160        let field_types = partition_type.fields();
161
162        self.fields
163            .iter()
164            .enumerate()
165            .map(|(i, field)| {
166                let value = data[i].as_ref();
167                format!(
168                    "{}={}",
169                    field.name,
170                    field
171                        .transform
172                        .to_human_string(&field_types[i].field_type, value)
173                )
174            })
175            .join("/")
176    }
177}
178
/// A partition key represents a specific partition in a table, containing the partition spec,
/// schema, and the actual partition values.
///
/// The values in `data` are looked up by partition field position when a path is generated
/// (see [`PartitionKey::to_path`]).
#[derive(Clone, Debug)]
pub struct PartitionKey {
    /// The partition spec that contains the partition fields.
    spec: PartitionSpec,
    /// The schema to which the partition spec is bound.
    schema: SchemaRef,
    /// Partition fields' values in struct.
    data: Struct,
}
190
191impl PartitionKey {
192    /// Creates a new partition key with the given spec, schema, and data.
193    pub fn new(spec: PartitionSpec, schema: SchemaRef, data: Struct) -> Self {
194        Self { spec, schema, data }
195    }
196
197    /// Generates a partition path based on the partition values.
198    pub fn to_path(&self) -> String {
199        self.spec.partition_to_path(&self.data, self.schema.clone())
200    }
201
202    /// Returns `true` if the partition key is absent (`None`)
203    /// or represents an unpartitioned spec.
204    pub fn is_effectively_none(partition_key: Option<&PartitionKey>) -> bool {
205        match partition_key {
206            None => true,
207            Some(pk) => pk.spec.is_unpartitioned(),
208        }
209    }
210}
211
/// Shared, reference-counted ([`Arc`]) handle to an [`UnboundPartitionSpec`].
pub type UnboundPartitionSpecRef = Arc<UnboundPartitionSpec>;
/// Unbound partition field can be built without a schema and later bound to a schema.
///
/// It mirrors [`PartitionField`], except that `field_id` is optional.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionField {
    /// A source column id from the table's schema.
    pub source_id: i32,
    /// A partition field id that is used to identify a partition field and is unique within a partition spec.
    /// In v2 table metadata, it is unique across all partition specs.
    ///
    /// When `None`, an id is assigned once the field is bound through [`PartitionSpecBuilder`].
    // Builder: the field defaults to `None`. NOTE(review): per typed-builder's `strip_option`
    // with `fallback`, `field_id(..)` should take a plain `i32` and `field_id_opt(..)` an
    // `Option<i32>` - confirm against the typed-builder version in use.
    #[builder(default, setter(strip_option(fallback = field_id_opt)))]
    pub field_id: Option<i32>,
    /// A partition name.
    pub name: String,
    /// A transform that is applied to the source column to produce a partition value.
    pub transform: Transform,
}
229
/// Unbound partition spec can be built without a schema and later bound to a schema.
/// They are used to transport schema information as part of the REST specification.
/// The main difference from [`PartitionSpec`] is that the spec id and the field ids are optional.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionSpec {
    /// Identifier for PartitionSpec; `None` if no id has been chosen yet.
    pub(crate) spec_id: Option<i32>,
    /// Details of the partition spec: the unbound partition fields, in order.
    pub(crate) fields: Vec<UnboundPartitionField>,
}
241
242impl UnboundPartitionSpec {
243    /// Create unbound partition spec builder
244    pub fn builder() -> UnboundPartitionSpecBuilder {
245        UnboundPartitionSpecBuilder::default()
246    }
247
248    /// Bind this unbound partition spec to a schema.
249    pub fn bind(self, schema: impl Into<SchemaRef>) -> Result<PartitionSpec> {
250        PartitionSpecBuilder::new_from_unbound(self, schema)?.build()
251    }
252
253    /// Spec id of the partition spec
254    pub fn spec_id(&self) -> Option<i32> {
255        self.spec_id
256    }
257
258    /// Fields of the partition spec
259    pub fn fields(&self) -> &[UnboundPartitionField] {
260        &self.fields
261    }
262
263    /// Change the spec id of the partition spec
264    pub fn with_spec_id(self, spec_id: i32) -> Self {
265        Self {
266            spec_id: Some(spec_id),
267            ..self
268        }
269    }
270}
271
272fn has_sequential_ids(field_ids: impl Iterator<Item = i32>) -> bool {
273    for (index, field_id) in field_ids.enumerate() {
274        let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
275            .checked_add(1)
276            .and_then(|id| id.checked_add(index as i64))
277            .unwrap_or(i64::MAX);
278
279        if field_id as i64 != expected_id {
280            return false;
281        }
282    }
283
284    true
285}
286
287impl From<PartitionField> for UnboundPartitionField {
288    fn from(field: PartitionField) -> Self {
289        UnboundPartitionField {
290            source_id: field.source_id,
291            field_id: Some(field.field_id),
292            name: field.name,
293            transform: field.transform,
294        }
295    }
296}
297
298impl From<PartitionSpec> for UnboundPartitionSpec {
299    fn from(spec: PartitionSpec) -> Self {
300        UnboundPartitionSpec {
301            spec_id: Some(spec.spec_id),
302            fields: spec.fields.into_iter().map(Into::into).collect(),
303        }
304    }
305}
306
/// Builder for [`UnboundPartitionSpec`].
///
/// No schema is involved: fields are only checked against the fields already added
/// (non-empty unique names, no redundant source/transform pairs, unique field ids).
#[derive(Debug, Default)]
pub struct UnboundPartitionSpecBuilder {
    /// Spec id of the spec being built, if set.
    spec_id: Option<i32>,
    /// Fields added so far, in insertion order.
    fields: Vec<UnboundPartitionField>,
}
313
314impl UnboundPartitionSpecBuilder {
315    /// Create a new partition spec builder with the given schema.
316    pub fn new() -> Self {
317        Self {
318            spec_id: None,
319            fields: vec![],
320        }
321    }
322
323    /// Set the spec id for the partition spec.
324    pub fn with_spec_id(mut self, spec_id: i32) -> Self {
325        self.spec_id = Some(spec_id);
326        self
327    }
328
329    /// Add a new partition field to the partition spec from an unbound partition field.
330    pub fn add_partition_field(
331        self,
332        source_id: i32,
333        target_name: impl ToString,
334        transformation: Transform,
335    ) -> Result<Self> {
336        let field = UnboundPartitionField {
337            source_id,
338            field_id: None,
339            name: target_name.to_string(),
340            transform: transformation,
341        };
342        self.add_partition_field_internal(field)
343    }
344
345    /// Add multiple partition fields to the partition spec.
346    pub fn add_partition_fields(
347        self,
348        fields: impl IntoIterator<Item = UnboundPartitionField>,
349    ) -> Result<Self> {
350        let mut builder = self;
351        for field in fields {
352            builder = builder.add_partition_field_internal(field)?;
353        }
354        Ok(builder)
355    }
356
357    fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> {
358        self.check_name_set_and_unique(&field.name)?;
359        self.check_for_redundant_partitions(field.source_id, &field.transform)?;
360        if let Some(partition_field_id) = field.field_id {
361            self.check_partition_id_unique(partition_field_id)?;
362        }
363        self.fields.push(field);
364        Ok(self)
365    }
366
367    /// Build the unbound partition spec.
368    pub fn build(self) -> UnboundPartitionSpec {
369        UnboundPartitionSpec {
370            spec_id: self.spec_id,
371            fields: self.fields,
372        }
373    }
374}
375
/// Create valid partition specs for a given schema.
///
/// Fields are validated against the schema as they are added; missing field ids are
/// assigned when the spec is built.
#[derive(Debug)]
pub struct PartitionSpecBuilder {
    /// Spec id of the spec being built; `DEFAULT_PARTITION_SPEC_ID` is used if unset.
    spec_id: Option<i32>,
    /// Highest field id considered taken; new ids are assigned above it.
    last_assigned_field_id: i32,
    /// Validated fields added so far, possibly still without field ids.
    fields: Vec<UnboundPartitionField>,
    /// Schema the fields are validated against.
    schema: SchemaRef,
}
384
385impl PartitionSpecBuilder {
386    /// Create a new partition spec builder with the given schema.
387    pub fn new(schema: impl Into<SchemaRef>) -> Self {
388        Self {
389            spec_id: None,
390            fields: vec![],
391            last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID,
392            schema: schema.into(),
393        }
394    }
395
396    /// Create a new partition spec builder from an existing unbound partition spec.
397    pub fn new_from_unbound(
398        unbound: UnboundPartitionSpec,
399        schema: impl Into<SchemaRef>,
400    ) -> Result<Self> {
401        let mut builder =
402            Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID));
403
404        for field in unbound.fields {
405            builder = builder.add_unbound_field(field)?;
406        }
407        Ok(builder)
408    }
409
410    /// Set the last assigned field id for the partition spec.
411    ///
412    /// Set this field when a new partition spec is created for an existing TableMetaData.
413    /// As `field_id` must be unique in V2 metadata, this should be set to
414    /// the highest field id used previously.
415    pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self {
416        self.last_assigned_field_id = last_assigned_field_id;
417        self
418    }
419
420    /// Set the spec id for the partition spec.
421    pub fn with_spec_id(mut self, spec_id: i32) -> Self {
422        self.spec_id = Some(spec_id);
423        self
424    }
425
426    /// Add a new partition field to the partition spec.
427    pub fn add_partition_field(
428        self,
429        source_name: impl AsRef<str>,
430        target_name: impl Into<String>,
431        transform: Transform,
432    ) -> Result<Self> {
433        let source_id = self
434            .schema
435            .field_by_name(source_name.as_ref())
436            .ok_or_else(|| {
437                Error::new(
438                    ErrorKind::DataInvalid,
439                    format!(
440                        "Cannot find source column with name: {} in schema",
441                        source_name.as_ref()
442                    ),
443                )
444            })?
445            .id;
446        let field = UnboundPartitionField {
447            source_id,
448            field_id: None,
449            name: target_name.into(),
450            transform,
451        };
452
453        self.add_unbound_field(field)
454    }
455
456    /// Add a new partition field to the partition spec.
457    ///
458    /// If partition field id is set, it is used as the field id.
459    /// Otherwise, a new `field_id` is assigned.
460    pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result<Self> {
461        self.check_name_set_and_unique(&field.name)?;
462        self.check_for_redundant_partitions(field.source_id, &field.transform)?;
463        Self::check_name_does_not_collide_with_schema(&field, &self.schema)?;
464        Self::check_transform_compatibility(&field, &self.schema)?;
465        if let Some(partition_field_id) = field.field_id {
466            self.check_partition_id_unique(partition_field_id)?;
467        }
468
469        // Non-fallible from here
470        self.fields.push(field);
471        Ok(self)
472    }
473
474    /// Wrapper around `with_unbound_fields` to add multiple partition fields.
475    pub fn add_unbound_fields(
476        self,
477        fields: impl IntoIterator<Item = UnboundPartitionField>,
478    ) -> Result<Self> {
479        let mut builder = self;
480        for field in fields {
481            builder = builder.add_unbound_field(field)?;
482        }
483        Ok(builder)
484    }
485
486    /// Build a bound partition spec with the given schema.
487    pub fn build(self) -> Result<PartitionSpec> {
488        let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?;
489        Ok(PartitionSpec {
490            spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID),
491            fields,
492        })
493    }
494
495    fn set_field_ids(
496        fields: Vec<UnboundPartitionField>,
497        last_assigned_field_id: i32,
498    ) -> Result<Vec<PartitionField>> {
499        let mut last_assigned_field_id = last_assigned_field_id;
500        // Already assigned partition ids. If we see one of these during iteration,
501        // we skip it.
502        let assigned_ids = fields
503            .iter()
504            .filter_map(|f| f.field_id)
505            .collect::<std::collections::HashSet<_>>();
506
507        fn _check_add_1(prev: i32) -> Result<i32> {
508            prev.checked_add(1).ok_or_else(|| {
509                Error::new(
510                    ErrorKind::DataInvalid,
511                    "Cannot assign more partition ids. Overflow.",
512                )
513            })
514        }
515
516        let mut bound_fields = Vec::with_capacity(fields.len());
517        for field in fields.into_iter() {
518            let partition_field_id = if let Some(partition_field_id) = field.field_id {
519                last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_field_id);
520                partition_field_id
521            } else {
522                last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
523                while assigned_ids.contains(&last_assigned_field_id) {
524                    last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
525                }
526                last_assigned_field_id
527            };
528
529            bound_fields.push(PartitionField {
530                source_id: field.source_id,
531                field_id: partition_field_id,
532                name: field.name,
533                transform: field.transform,
534            })
535        }
536
537        Ok(bound_fields)
538    }
539
540    /// Returns the partition type of this partition spec.
541    fn partition_type(fields: &Vec<PartitionField>, schema: &Schema) -> Result<StructType> {
542        let mut struct_fields = Vec::with_capacity(fields.len());
543        for partition_field in fields {
544            let field = schema
545                .field_by_id(partition_field.source_id)
546                .ok_or_else(|| {
547                    Error::new(
548                        // This should never occur as check_transform_compatibility
549                        // already ensures that the source field exists in the schema
550                        ErrorKind::Unexpected,
551                        format!(
552                            "No column with source column id {} in schema {:?}",
553                            partition_field.source_id, schema
554                        ),
555                    )
556                })?;
557            let res_type = partition_field.transform.result_type(&field.field_type)?;
558            let field =
559                NestedField::optional(partition_field.field_id, &partition_field.name, res_type)
560                    .into();
561            struct_fields.push(field);
562        }
563        Ok(StructType::new(struct_fields))
564    }
565
566    /// Ensure that the partition name is unique among columns in the schema.
567    /// Duplicate names are allowed if:
568    /// 1. The column is sourced from the column with the same name.
569    /// 2. AND the transformation is identity
570    fn check_name_does_not_collide_with_schema(
571        field: &UnboundPartitionField,
572        schema: &Schema,
573    ) -> Result<()> {
574        match schema.field_by_name(field.name.as_str()) {
575            Some(schema_collision) => {
576                if field.transform == Transform::Identity {
577                    if schema_collision.id == field.source_id {
578                        Ok(())
579                    } else {
580                        Err(Error::new(
581                            ErrorKind::DataInvalid,
582                            format!(
583                                "Cannot create identity partition sourced from different field in schema. Field name '{}' has id `{}` in schema but partition source id is `{}`",
584                                field.name, schema_collision.id, field.source_id
585                            ),
586                        ))
587                    }
588                } else {
589                    Err(Error::new(
590                        ErrorKind::DataInvalid,
591                        format!(
592                            "Cannot create partition with name: '{}' that conflicts with schema field and is not an identity transform.",
593                            field.name
594                        ),
595                    ))
596                }
597            }
598            None => Ok(()),
599        }
600    }
601
602    /// Ensure that the transformation of the field is compatible with type of the field
603    /// in the schema. Implicitly also checks if the source field exists in the schema.
604    fn check_transform_compatibility(field: &UnboundPartitionField, schema: &Schema) -> Result<()> {
605        let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| {
606            Error::new(
607                ErrorKind::DataInvalid,
608                format!(
609                    "Cannot find partition source field with id `{}` in schema",
610                    field.source_id
611                ),
612            )
613        })?;
614
615        if field.transform != Transform::Void {
616            if !schema_field.field_type.is_primitive() {
617                return Err(Error::new(
618                    ErrorKind::DataInvalid,
619                    format!(
620                        "Cannot partition by non-primitive source field: '{}'.",
621                        schema_field.field_type
622                    ),
623                ));
624            }
625
626            if field
627                .transform
628                .result_type(&schema_field.field_type)
629                .is_err()
630            {
631                return Err(Error::new(
632                    ErrorKind::DataInvalid,
633                    format!(
634                        "Invalid source type: '{}' for transform: '{}'.",
635                        schema_field.field_type,
636                        field.transform.dedup_name()
637                    ),
638                ));
639            }
640        }
641
642        Ok(())
643    }
644}
645
646/// Contains checks that are common to both PartitionSpecBuilder and UnboundPartitionSpecBuilder
647trait CorePartitionSpecValidator {
648    /// Ensure that the partition name is unique among the partition fields and is not empty.
649    fn check_name_set_and_unique(&self, name: &str) -> Result<()> {
650        if name.is_empty() {
651            return Err(Error::new(
652                ErrorKind::DataInvalid,
653                "Cannot use empty partition name",
654            ));
655        }
656
657        if self.fields().iter().any(|f| f.name == name) {
658            return Err(Error::new(
659                ErrorKind::DataInvalid,
660                format!("Cannot use partition name more than once: {}", name),
661            ));
662        }
663        Ok(())
664    }
665
666    /// For a single source-column transformations must be unique.
667    fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> {
668        let collision = self.fields().iter().find(|f| {
669            f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name()
670        });
671
672        if let Some(collision) = collision {
673            Err(Error::new(
674                ErrorKind::DataInvalid,
675                format!(
676                    "Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`",
677                    source_id,
678                    transform.dedup_name(),
679                    collision.name
680                ),
681            ))
682        } else {
683            Ok(())
684        }
685    }
686
687    /// Check field / partition_id unique within the partition spec if set
688    fn check_partition_id_unique(&self, field_id: i32) -> Result<()> {
689        if self.fields().iter().any(|f| f.field_id == Some(field_id)) {
690            return Err(Error::new(
691                ErrorKind::DataInvalid,
692                format!(
693                    "Cannot use field id more than once in one PartitionSpec: {}",
694                    field_id
695                ),
696            ));
697        }
698
699        Ok(())
700    }
701
702    fn fields(&self) -> &Vec<UnboundPartitionField>;
703}
704
// Gives the shared validation checks access to the fields added to the builder so far.
impl CorePartitionSpecValidator for PartitionSpecBuilder {
    /// Fields added to this builder so far.
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        &self.fields
    }
}
710
// Gives the shared validation checks access to the fields added to the builder so far.
impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder {
    /// Fields added to this builder so far.
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        &self.fields
    }
}
716
717#[cfg(test)]
718mod tests {
719    use super::*;
720    use crate::spec::{Literal, PrimitiveType, Type};
721
    /// A bound partition spec deserializes from kebab-case JSON with all three fields intact.
    #[test]
    fn test_partition_spec() {
        let spec = r#"
        {
        "spec-id": 1,
        "fields": [ {
            "source-id": 4,
            "field-id": 1000,
            "name": "ts_day",
            "transform": "day"
            }, {
            "source-id": 1,
            "field-id": 1001,
            "name": "id_bucket",
            "transform": "bucket[16]"
            }, {
            "source-id": 2,
            "field-id": 1002,
            "name": "id_truncate",
            "transform": "truncate[4]"
            } ]
        }
        "#;

        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
        // First field: day transform.
        assert_eq!(4, partition_spec.fields[0].source_id);
        assert_eq!(1000, partition_spec.fields[0].field_id);
        assert_eq!("ts_day", partition_spec.fields[0].name);
        assert_eq!(Transform::Day, partition_spec.fields[0].transform);

        // Second field: parameterized bucket transform.
        assert_eq!(1, partition_spec.fields[1].source_id);
        assert_eq!(1001, partition_spec.fields[1].field_id);
        assert_eq!("id_bucket", partition_spec.fields[1].name);
        assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);

        // Third field: parameterized truncate transform.
        assert_eq!(2, partition_spec.fields[2].source_id);
        assert_eq!(1002, partition_spec.fields[2].field_id);
        assert_eq!("id_truncate", partition_spec.fields[2].name);
        assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
    }
762
    /// `is_unpartitioned` is true for an empty spec and for an all-void spec, and false
    /// as soon as one field has a non-void transform.
    #[test]
    fn test_is_unpartitioned() {
        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                    .into(),
                NestedField::required(
                    2,
                    "name",
                    Type::Primitive(crate::spec::PrimitiveType::String),
                )
                .into(),
            ])
            .build()
            .unwrap();
        // Case 1: no partition fields at all.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(1)
            .build()
            .unwrap();
        assert!(
            partition_spec.is_unpartitioned(),
            "Empty partition spec should be unpartitioned"
        );

        // Case 2: one identity field next to a void field.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .add_unbound_fields(vec![
                UnboundPartitionField::builder()
                    .source_id(1)
                    .name("id".to_string())
                    .transform(Transform::Identity)
                    .build(),
                UnboundPartitionField::builder()
                    .source_id(2)
                    .name("name_string".to_string())
                    .transform(Transform::Void)
                    .build(),
            ])
            .unwrap()
            .with_spec_id(1)
            .build()
            .unwrap();
        assert!(
            !partition_spec.is_unpartitioned(),
            "Partition spec with one non void transform should not be unpartitioned"
        );

        // Case 3: only void fields.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(1)
            .add_unbound_fields(vec![
                UnboundPartitionField::builder()
                    .source_id(1)
                    .name("id_void".to_string())
                    .transform(Transform::Void)
                    .build(),
                UnboundPartitionField::builder()
                    .source_id(2)
                    .name("name_void".to_string())
                    .transform(Transform::Void)
                    .build(),
            ])
            .unwrap()
            .build()
            .unwrap();
        assert!(
            partition_spec.is_unpartitioned(),
            "Partition spec with all void field should be unpartitioned"
        );
    }
831
    /// An unbound partition spec deserializes both with and without `spec-id` / `field-id`.
    #[test]
    fn test_unbound_partition_spec() {
        // Fully specified: spec id and field ids are present.
        let spec = r#"
		{
		"spec-id": 1,
		"fields": [ {
			"source-id": 4,
			"field-id": 1000,
			"name": "ts_day",
			"transform": "day"
			}, {
			"source-id": 1,
			"field-id": 1001,
			"name": "id_bucket",
			"transform": "bucket[16]"
			}, {
			"source-id": 2,
			"field-id": 1002,
			"name": "id_truncate",
			"transform": "truncate[4]"
			} ]
		}
		"#;

        let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
        assert_eq!(Some(1), partition_spec.spec_id);

        assert_eq!(4, partition_spec.fields[0].source_id);
        assert_eq!(Some(1000), partition_spec.fields[0].field_id);
        assert_eq!("ts_day", partition_spec.fields[0].name);
        assert_eq!(Transform::Day, partition_spec.fields[0].transform);

        assert_eq!(1, partition_spec.fields[1].source_id);
        assert_eq!(Some(1001), partition_spec.fields[1].field_id);
        assert_eq!("id_bucket", partition_spec.fields[1].name);
        assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);

        assert_eq!(2, partition_spec.fields[2].source_id);
        assert_eq!(Some(1002), partition_spec.fields[2].field_id);
        assert_eq!("id_truncate", partition_spec.fields[2].name);
        assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);

        // Minimal: missing `spec-id` and `field-id` deserialize to `None`.
        let spec = r#"
		{
		"fields": [ {
			"source-id": 4,
			"name": "ts_day",
			"transform": "day"
			} ]
		}
		"#;
        let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
        assert_eq!(None, partition_spec.spec_id);

        assert_eq!(4, partition_spec.fields[0].source_id);
        assert_eq!(None, partition_spec.fields[0].field_id);
        assert_eq!("ts_day", partition_spec.fields[0].name);
        assert_eq!(Transform::Day, partition_spec.fields[0].transform);
    }
891
892    #[test]
893    fn test_new_unpartition() {
894        let schema = Schema::builder()
895            .with_fields(vec![
896                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
897                    .into(),
898                NestedField::required(
899                    2,
900                    "name",
901                    Type::Primitive(crate::spec::PrimitiveType::String),
902                )
903                .into(),
904            ])
905            .build()
906            .unwrap();
907        let partition_spec = PartitionSpec::builder(schema.clone())
908            .with_spec_id(0)
909            .build()
910            .unwrap();
911        let partition_type = partition_spec.partition_type(&schema).unwrap();
912        assert_eq!(0, partition_type.fields().len());
913
914        let unpartition_spec = PartitionSpec::unpartition_spec();
915        assert_eq!(partition_spec, unpartition_spec);
916    }
917
918    #[test]
919    fn test_partition_type() {
920        let spec = r#"
921            {
922            "spec-id": 1,
923            "fields": [ {
924                "source-id": 4,
925                "field-id": 1000,
926                "name": "ts_day",
927                "transform": "day"
928                }, {
929                "source-id": 1,
930                "field-id": 1001,
931                "name": "id_bucket",
932                "transform": "bucket[16]"
933                }, {
934                "source-id": 2,
935                "field-id": 1002,
936                "name": "id_truncate",
937                "transform": "truncate[4]"
938                } ]
939            }
940            "#;
941
942        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
943        let schema = Schema::builder()
944            .with_fields(vec![
945                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
946                    .into(),
947                NestedField::required(
948                    2,
949                    "name",
950                    Type::Primitive(crate::spec::PrimitiveType::String),
951                )
952                .into(),
953                NestedField::required(
954                    3,
955                    "ts",
956                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
957                )
958                .into(),
959                NestedField::required(
960                    4,
961                    "ts_day",
962                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
963                )
964                .into(),
965                NestedField::required(
966                    5,
967                    "id_bucket",
968                    Type::Primitive(crate::spec::PrimitiveType::Int),
969                )
970                .into(),
971                NestedField::required(
972                    6,
973                    "id_truncate",
974                    Type::Primitive(crate::spec::PrimitiveType::Int),
975                )
976                .into(),
977            ])
978            .build()
979            .unwrap();
980
981        let partition_type = partition_spec.partition_type(&schema).unwrap();
982        assert_eq!(3, partition_type.fields().len());
983        assert_eq!(
984            *partition_type.fields()[0],
985            NestedField::optional(
986                partition_spec.fields[0].field_id,
987                &partition_spec.fields[0].name,
988                Type::Primitive(crate::spec::PrimitiveType::Date)
989            )
990        );
991        assert_eq!(
992            *partition_type.fields()[1],
993            NestedField::optional(
994                partition_spec.fields[1].field_id,
995                &partition_spec.fields[1].name,
996                Type::Primitive(crate::spec::PrimitiveType::Int)
997            )
998        );
999        assert_eq!(
1000            *partition_type.fields()[2],
1001            NestedField::optional(
1002                partition_spec.fields[2].field_id,
1003                &partition_spec.fields[2].name,
1004                Type::Primitive(crate::spec::PrimitiveType::String)
1005            )
1006        );
1007    }
1008
1009    #[test]
1010    fn test_partition_empty() {
1011        let spec = r#"
1012            {
1013            "spec-id": 1,
1014            "fields": []
1015            }
1016            "#;
1017
1018        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1019        let schema = Schema::builder()
1020            .with_fields(vec![
1021                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1022                    .into(),
1023                NestedField::required(
1024                    2,
1025                    "name",
1026                    Type::Primitive(crate::spec::PrimitiveType::String),
1027                )
1028                .into(),
1029                NestedField::required(
1030                    3,
1031                    "ts",
1032                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1033                )
1034                .into(),
1035                NestedField::required(
1036                    4,
1037                    "ts_day",
1038                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1039                )
1040                .into(),
1041                NestedField::required(
1042                    5,
1043                    "id_bucket",
1044                    Type::Primitive(crate::spec::PrimitiveType::Int),
1045                )
1046                .into(),
1047                NestedField::required(
1048                    6,
1049                    "id_truncate",
1050                    Type::Primitive(crate::spec::PrimitiveType::Int),
1051                )
1052                .into(),
1053            ])
1054            .build()
1055            .unwrap();
1056
1057        let partition_type = partition_spec.partition_type(&schema).unwrap();
1058        assert_eq!(0, partition_type.fields().len());
1059    }
1060
1061    #[test]
1062    fn test_partition_error() {
1063        let spec = r#"
1064        {
1065        "spec-id": 1,
1066        "fields": [ {
1067            "source-id": 4,
1068            "field-id": 1000,
1069            "name": "ts_day",
1070            "transform": "day"
1071            }, {
1072            "source-id": 1,
1073            "field-id": 1001,
1074            "name": "id_bucket",
1075            "transform": "bucket[16]"
1076            }, {
1077            "source-id": 2,
1078            "field-id": 1002,
1079            "name": "id_truncate",
1080            "transform": "truncate[4]"
1081            } ]
1082        }
1083        "#;
1084
1085        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1086        let schema = Schema::builder()
1087            .with_fields(vec![
1088                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1089                    .into(),
1090                NestedField::required(
1091                    2,
1092                    "name",
1093                    Type::Primitive(crate::spec::PrimitiveType::String),
1094                )
1095                .into(),
1096            ])
1097            .build()
1098            .unwrap();
1099
1100        assert!(partition_spec.partition_type(&schema).is_err());
1101    }
1102
1103    #[test]
1104    fn test_builder_disallow_duplicate_names() {
1105        UnboundPartitionSpec::builder()
1106            .add_partition_field(1, "ts_day".to_string(), Transform::Day)
1107            .unwrap()
1108            .add_partition_field(2, "ts_day".to_string(), Transform::Day)
1109            .unwrap_err();
1110    }
1111
1112    #[test]
1113    fn test_builder_disallow_duplicate_field_ids() {
1114        let schema = Schema::builder()
1115            .with_fields(vec![
1116                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1117                    .into(),
1118                NestedField::required(
1119                    2,
1120                    "name",
1121                    Type::Primitive(crate::spec::PrimitiveType::String),
1122                )
1123                .into(),
1124            ])
1125            .build()
1126            .unwrap();
1127        PartitionSpec::builder(schema.clone())
1128            .add_unbound_field(UnboundPartitionField {
1129                source_id: 1,
1130                field_id: Some(1000),
1131                name: "id".to_string(),
1132                transform: Transform::Identity,
1133            })
1134            .unwrap()
1135            .add_unbound_field(UnboundPartitionField {
1136                source_id: 2,
1137                field_id: Some(1000),
1138                name: "id_bucket".to_string(),
1139                transform: Transform::Bucket(16),
1140            })
1141            .unwrap_err();
1142    }
1143
1144    #[test]
1145    fn test_builder_auto_assign_field_ids() {
1146        let schema = Schema::builder()
1147            .with_fields(vec![
1148                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1149                    .into(),
1150                NestedField::required(
1151                    2,
1152                    "name",
1153                    Type::Primitive(crate::spec::PrimitiveType::String),
1154                )
1155                .into(),
1156                NestedField::required(
1157                    3,
1158                    "ts",
1159                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1160                )
1161                .into(),
1162            ])
1163            .build()
1164            .unwrap();
1165        let spec = PartitionSpec::builder(schema.clone())
1166            .with_spec_id(1)
1167            .add_unbound_field(UnboundPartitionField {
1168                source_id: 1,
1169                name: "id".to_string(),
1170                transform: Transform::Identity,
1171                field_id: Some(1012),
1172            })
1173            .unwrap()
1174            .add_unbound_field(UnboundPartitionField {
1175                source_id: 2,
1176                name: "name_void".to_string(),
1177                transform: Transform::Void,
1178                field_id: None,
1179            })
1180            .unwrap()
1181            // Should keep its ID even if its lower
1182            .add_unbound_field(UnboundPartitionField {
1183                source_id: 3,
1184                name: "year".to_string(),
1185                transform: Transform::Year,
1186                field_id: Some(1),
1187            })
1188            .unwrap()
1189            .build()
1190            .unwrap();
1191
1192        assert_eq!(1012, spec.fields[0].field_id);
1193        assert_eq!(1013, spec.fields[1].field_id);
1194        assert_eq!(1, spec.fields[2].field_id);
1195    }
1196
1197    #[test]
1198    fn test_builder_valid_schema() {
1199        let schema = Schema::builder()
1200            .with_fields(vec![
1201                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1202                    .into(),
1203                NestedField::required(
1204                    2,
1205                    "name",
1206                    Type::Primitive(crate::spec::PrimitiveType::String),
1207                )
1208                .into(),
1209            ])
1210            .build()
1211            .unwrap();
1212
1213        PartitionSpec::builder(schema.clone())
1214            .with_spec_id(1)
1215            .build()
1216            .unwrap();
1217
1218        let spec = PartitionSpec::builder(schema.clone())
1219            .with_spec_id(1)
1220            .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16))
1221            .unwrap()
1222            .build()
1223            .unwrap();
1224
1225        assert_eq!(spec, PartitionSpec {
1226            spec_id: 1,
1227            fields: vec![PartitionField {
1228                source_id: 1,
1229                field_id: 1000,
1230                name: "id_bucket[16]".to_string(),
1231                transform: Transform::Bucket(16),
1232            }],
1233        });
1234        assert_eq!(
1235            spec.partition_type(&schema).unwrap(),
1236            StructType::new(vec![
1237                NestedField::optional(1000, "id_bucket[16]", Type::Primitive(PrimitiveType::Int))
1238                    .into()
1239            ])
1240        )
1241    }
1242
1243    #[test]
1244    fn test_collision_with_schema_name() {
1245        let schema = Schema::builder()
1246            .with_fields(vec![
1247                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1248                    .into(),
1249            ])
1250            .build()
1251            .unwrap();
1252
1253        PartitionSpec::builder(schema.clone())
1254            .with_spec_id(1)
1255            .build()
1256            .unwrap();
1257
1258        let err = PartitionSpec::builder(schema)
1259            .with_spec_id(1)
1260            .add_unbound_field(UnboundPartitionField {
1261                source_id: 1,
1262                field_id: None,
1263                name: "id".to_string(),
1264                transform: Transform::Bucket(16),
1265            })
1266            .unwrap_err();
1267        assert!(err.message().contains("conflicts with schema"))
1268    }
1269
1270    #[test]
1271    fn test_builder_collision_is_ok_for_identity_transforms() {
1272        let schema = Schema::builder()
1273            .with_fields(vec![
1274                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1275                    .into(),
1276                NestedField::required(
1277                    2,
1278                    "number",
1279                    Type::Primitive(crate::spec::PrimitiveType::Int),
1280                )
1281                .into(),
1282            ])
1283            .build()
1284            .unwrap();
1285
1286        PartitionSpec::builder(schema.clone())
1287            .with_spec_id(1)
1288            .build()
1289            .unwrap();
1290
1291        PartitionSpec::builder(schema.clone())
1292            .with_spec_id(1)
1293            .add_unbound_field(UnboundPartitionField {
1294                source_id: 1,
1295                field_id: None,
1296                name: "id".to_string(),
1297                transform: Transform::Identity,
1298            })
1299            .unwrap()
1300            .build()
1301            .unwrap();
1302
1303        // Not OK for different source id
1304        PartitionSpec::builder(schema)
1305            .with_spec_id(1)
1306            .add_unbound_field(UnboundPartitionField {
1307                source_id: 2,
1308                field_id: None,
1309                name: "id".to_string(),
1310                transform: Transform::Identity,
1311            })
1312            .unwrap_err();
1313    }
1314
1315    #[test]
1316    fn test_builder_all_source_ids_must_exist() {
1317        let schema = Schema::builder()
1318            .with_fields(vec![
1319                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1320                    .into(),
1321                NestedField::required(
1322                    2,
1323                    "name",
1324                    Type::Primitive(crate::spec::PrimitiveType::String),
1325                )
1326                .into(),
1327                NestedField::required(
1328                    3,
1329                    "ts",
1330                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1331                )
1332                .into(),
1333            ])
1334            .build()
1335            .unwrap();
1336
1337        // Valid
1338        PartitionSpec::builder(schema.clone())
1339            .with_spec_id(1)
1340            .add_unbound_fields(vec![
1341                UnboundPartitionField {
1342                    source_id: 1,
1343                    field_id: None,
1344                    name: "id_bucket".to_string(),
1345                    transform: Transform::Bucket(16),
1346                },
1347                UnboundPartitionField {
1348                    source_id: 2,
1349                    field_id: None,
1350                    name: "name".to_string(),
1351                    transform: Transform::Identity,
1352                },
1353            ])
1354            .unwrap()
1355            .build()
1356            .unwrap();
1357
1358        // Invalid
1359        PartitionSpec::builder(schema)
1360            .with_spec_id(1)
1361            .add_unbound_fields(vec![
1362                UnboundPartitionField {
1363                    source_id: 1,
1364                    field_id: None,
1365                    name: "id_bucket".to_string(),
1366                    transform: Transform::Bucket(16),
1367                },
1368                UnboundPartitionField {
1369                    source_id: 4,
1370                    field_id: None,
1371                    name: "name".to_string(),
1372                    transform: Transform::Identity,
1373                },
1374            ])
1375            .unwrap_err();
1376    }
1377
1378    #[test]
1379    fn test_builder_disallows_redundant() {
1380        let err = UnboundPartitionSpec::builder()
1381            .with_spec_id(1)
1382            .add_partition_field(1, "id_bucket[16]".to_string(), Transform::Bucket(16))
1383            .unwrap()
1384            .add_partition_field(
1385                1,
1386                "id_bucket_with_other_name".to_string(),
1387                Transform::Bucket(16),
1388            )
1389            .unwrap_err();
1390        assert!(err.message().contains("redundant partition"));
1391    }
1392
1393    #[test]
1394    fn test_builder_incompatible_transforms_disallowed() {
1395        let schema = Schema::builder()
1396            .with_fields(vec![
1397                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1398                    .into(),
1399            ])
1400            .build()
1401            .unwrap();
1402
1403        PartitionSpec::builder(schema)
1404            .with_spec_id(1)
1405            .add_unbound_field(UnboundPartitionField {
1406                source_id: 1,
1407                field_id: None,
1408                name: "id_year".to_string(),
1409                transform: Transform::Year,
1410            })
1411            .unwrap_err();
1412    }
1413
1414    #[test]
1415    fn test_build_unbound_specs_without_partition_id() {
1416        let spec = UnboundPartitionSpec::builder()
1417            .with_spec_id(1)
1418            .add_partition_fields(vec![UnboundPartitionField {
1419                source_id: 1,
1420                field_id: None,
1421                name: "id_bucket[16]".to_string(),
1422                transform: Transform::Bucket(16),
1423            }])
1424            .unwrap()
1425            .build();
1426
1427        assert_eq!(spec, UnboundPartitionSpec {
1428            spec_id: Some(1),
1429            fields: vec![UnboundPartitionField {
1430                source_id: 1,
1431                field_id: None,
1432                name: "id_bucket[16]".to_string(),
1433                transform: Transform::Bucket(16),
1434            }]
1435        });
1436    }
1437
1438    #[test]
1439    fn test_is_compatible_with() {
1440        let schema = Schema::builder()
1441            .with_fields(vec![
1442                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1443                    .into(),
1444                NestedField::required(
1445                    2,
1446                    "name",
1447                    Type::Primitive(crate::spec::PrimitiveType::String),
1448                )
1449                .into(),
1450            ])
1451            .build()
1452            .unwrap();
1453
1454        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1455            .with_spec_id(1)
1456            .add_unbound_field(UnboundPartitionField {
1457                source_id: 1,
1458                field_id: None,
1459                name: "id_bucket".to_string(),
1460                transform: Transform::Bucket(16),
1461            })
1462            .unwrap()
1463            .build()
1464            .unwrap();
1465
1466        let partition_spec_2 = PartitionSpec::builder(schema)
1467            .with_spec_id(1)
1468            .add_unbound_field(UnboundPartitionField {
1469                source_id: 1,
1470                field_id: None,
1471                name: "id_bucket".to_string(),
1472                transform: Transform::Bucket(16),
1473            })
1474            .unwrap()
1475            .build()
1476            .unwrap();
1477
1478        assert!(partition_spec_1.is_compatible_with(&partition_spec_2));
1479    }
1480
1481    #[test]
1482    fn test_not_compatible_with_transform_different() {
1483        let schema = Schema::builder()
1484            .with_fields(vec![
1485                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1486                    .into(),
1487            ])
1488            .build()
1489            .unwrap();
1490
1491        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1492            .with_spec_id(1)
1493            .add_unbound_field(UnboundPartitionField {
1494                source_id: 1,
1495                field_id: None,
1496                name: "id_bucket".to_string(),
1497                transform: Transform::Bucket(16),
1498            })
1499            .unwrap()
1500            .build()
1501            .unwrap();
1502
1503        let partition_spec_2 = PartitionSpec::builder(schema)
1504            .with_spec_id(1)
1505            .add_unbound_field(UnboundPartitionField {
1506                source_id: 1,
1507                field_id: None,
1508                name: "id_bucket".to_string(),
1509                transform: Transform::Bucket(32),
1510            })
1511            .unwrap()
1512            .build()
1513            .unwrap();
1514
1515        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
1516    }
1517
1518    #[test]
1519    fn test_not_compatible_with_source_id_different() {
1520        let schema = Schema::builder()
1521            .with_fields(vec![
1522                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1523                    .into(),
1524                NestedField::required(
1525                    2,
1526                    "name",
1527                    Type::Primitive(crate::spec::PrimitiveType::String),
1528                )
1529                .into(),
1530            ])
1531            .build()
1532            .unwrap();
1533
1534        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1535            .with_spec_id(1)
1536            .add_unbound_field(UnboundPartitionField {
1537                source_id: 1,
1538                field_id: None,
1539                name: "id_bucket".to_string(),
1540                transform: Transform::Bucket(16),
1541            })
1542            .unwrap()
1543            .build()
1544            .unwrap();
1545
1546        let partition_spec_2 = PartitionSpec::builder(schema)
1547            .with_spec_id(1)
1548            .add_unbound_field(UnboundPartitionField {
1549                source_id: 2,
1550                field_id: None,
1551                name: "id_bucket".to_string(),
1552                transform: Transform::Bucket(16),
1553            })
1554            .unwrap()
1555            .build()
1556            .unwrap();
1557
1558        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
1559    }
1560
1561    #[test]
1562    fn test_not_compatible_with_order_different() {
1563        let schema = Schema::builder()
1564            .with_fields(vec![
1565                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1566                    .into(),
1567                NestedField::required(
1568                    2,
1569                    "name",
1570                    Type::Primitive(crate::spec::PrimitiveType::String),
1571                )
1572                .into(),
1573            ])
1574            .build()
1575            .unwrap();
1576
1577        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1578            .with_spec_id(1)
1579            .add_unbound_field(UnboundPartitionField {
1580                source_id: 1,
1581                field_id: None,
1582                name: "id_bucket".to_string(),
1583                transform: Transform::Bucket(16),
1584            })
1585            .unwrap()
1586            .add_unbound_field(UnboundPartitionField {
1587                source_id: 2,
1588                field_id: None,
1589                name: "name".to_string(),
1590                transform: Transform::Identity,
1591            })
1592            .unwrap()
1593            .build()
1594            .unwrap();
1595
1596        let partition_spec_2 = PartitionSpec::builder(schema)
1597            .with_spec_id(1)
1598            .add_unbound_field(UnboundPartitionField {
1599                source_id: 2,
1600                field_id: None,
1601                name: "name".to_string(),
1602                transform: Transform::Identity,
1603            })
1604            .unwrap()
1605            .add_unbound_field(UnboundPartitionField {
1606                source_id: 1,
1607                field_id: None,
1608                name: "id_bucket".to_string(),
1609                transform: Transform::Bucket(16),
1610            })
1611            .unwrap()
1612            .build()
1613            .unwrap();
1614
1615        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
1616    }
1617
1618    #[test]
1619    fn test_highest_field_id_unpartitioned() {
1620        let spec = PartitionSpec::builder(Schema::builder().with_fields(vec![]).build().unwrap())
1621            .with_spec_id(1)
1622            .build()
1623            .unwrap();
1624
1625        assert!(spec.highest_field_id().is_none());
1626    }
1627
1628    #[test]
1629    fn test_highest_field_id() {
1630        let schema = Schema::builder()
1631            .with_fields(vec![
1632                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1633                    .into(),
1634                NestedField::required(
1635                    2,
1636                    "name",
1637                    Type::Primitive(crate::spec::PrimitiveType::String),
1638                )
1639                .into(),
1640            ])
1641            .build()
1642            .unwrap();
1643
1644        let spec = PartitionSpec::builder(schema)
1645            .with_spec_id(1)
1646            .add_unbound_field(UnboundPartitionField {
1647                source_id: 1,
1648                field_id: Some(1001),
1649                name: "id".to_string(),
1650                transform: Transform::Identity,
1651            })
1652            .unwrap()
1653            .add_unbound_field(UnboundPartitionField {
1654                source_id: 2,
1655                field_id: Some(1000),
1656                name: "name".to_string(),
1657                transform: Transform::Identity,
1658            })
1659            .unwrap()
1660            .build()
1661            .unwrap();
1662
1663        assert_eq!(Some(1001), spec.highest_field_id());
1664    }
1665
1666    #[test]
1667    fn test_has_sequential_ids() {
1668        let schema = Schema::builder()
1669            .with_fields(vec![
1670                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1671                    .into(),
1672                NestedField::required(
1673                    2,
1674                    "name",
1675                    Type::Primitive(crate::spec::PrimitiveType::String),
1676                )
1677                .into(),
1678            ])
1679            .build()
1680            .unwrap();
1681
1682        let spec = PartitionSpec::builder(schema)
1683            .with_spec_id(1)
1684            .add_unbound_field(UnboundPartitionField {
1685                source_id: 1,
1686                field_id: Some(1000),
1687                name: "id".to_string(),
1688                transform: Transform::Identity,
1689            })
1690            .unwrap()
1691            .add_unbound_field(UnboundPartitionField {
1692                source_id: 2,
1693                field_id: Some(1001),
1694                name: "name".to_string(),
1695                transform: Transform::Identity,
1696            })
1697            .unwrap()
1698            .build()
1699            .unwrap();
1700
1701        assert_eq!(1000, spec.fields[0].field_id);
1702        assert_eq!(1001, spec.fields[1].field_id);
1703        assert!(spec.has_sequential_ids());
1704    }
1705
1706    #[test]
1707    fn test_sequential_ids_must_start_at_1000() {
1708        let schema = Schema::builder()
1709            .with_fields(vec![
1710                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1711                    .into(),
1712                NestedField::required(
1713                    2,
1714                    "name",
1715                    Type::Primitive(crate::spec::PrimitiveType::String),
1716                )
1717                .into(),
1718            ])
1719            .build()
1720            .unwrap();
1721
1722        let spec = PartitionSpec::builder(schema)
1723            .with_spec_id(1)
1724            .add_unbound_field(UnboundPartitionField {
1725                source_id: 1,
1726                field_id: Some(999),
1727                name: "id".to_string(),
1728                transform: Transform::Identity,
1729            })
1730            .unwrap()
1731            .add_unbound_field(UnboundPartitionField {
1732                source_id: 2,
1733                field_id: Some(1000),
1734                name: "name".to_string(),
1735                transform: Transform::Identity,
1736            })
1737            .unwrap()
1738            .build()
1739            .unwrap();
1740
1741        assert_eq!(999, spec.fields[0].field_id);
1742        assert_eq!(1000, spec.fields[1].field_id);
1743        assert!(!spec.has_sequential_ids());
1744    }
1745
1746    #[test]
1747    fn test_sequential_ids_must_have_no_gaps() {
1748        let schema = Schema::builder()
1749            .with_fields(vec![
1750                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1751                    .into(),
1752                NestedField::required(
1753                    2,
1754                    "name",
1755                    Type::Primitive(crate::spec::PrimitiveType::String),
1756                )
1757                .into(),
1758            ])
1759            .build()
1760            .unwrap();
1761
1762        let spec = PartitionSpec::builder(schema)
1763            .with_spec_id(1)
1764            .add_unbound_field(UnboundPartitionField {
1765                source_id: 1,
1766                field_id: Some(1000),
1767                name: "id".to_string(),
1768                transform: Transform::Identity,
1769            })
1770            .unwrap()
1771            .add_unbound_field(UnboundPartitionField {
1772                source_id: 2,
1773                field_id: Some(1002),
1774                name: "name".to_string(),
1775                transform: Transform::Identity,
1776            })
1777            .unwrap()
1778            .build()
1779            .unwrap();
1780
1781        assert_eq!(1000, spec.fields[0].field_id);
1782        assert_eq!(1002, spec.fields[1].field_id);
1783        assert!(!spec.has_sequential_ids());
1784    }
1785
1786    #[test]
1787    fn test_partition_to_path() {
1788        let schema = Schema::builder()
1789            .with_fields(vec![
1790                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1791                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1792            ])
1793            .build()
1794            .unwrap();
1795
1796        let spec = PartitionSpec::builder(schema.clone())
1797            .add_partition_field("id", "id", Transform::Identity)
1798            .unwrap()
1799            .add_partition_field("name", "name", Transform::Identity)
1800            .unwrap()
1801            .build()
1802            .unwrap();
1803
1804        let data = Struct::from_iter([Some(Literal::int(42)), Some(Literal::string("alice"))]);
1805
1806        assert_eq!(
1807            spec.partition_to_path(&data, schema.into()),
1808            "id=42/name=alice"
1809        );
1810    }
1811}