iceberg/spec/
transform.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Transforms in iceberg.
19
20use std::cmp::Ordering;
21use std::fmt::{Display, Formatter};
22use std::str::FromStr;
23
24use fnv::FnvHashSet;
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26
27use super::{Datum, PrimitiveLiteral};
28use crate::ErrorKind;
29use crate::error::{Error, Result};
30use crate::expr::{
31    BinaryExpression, BoundPredicate, BoundReference, Predicate, PredicateOperator, Reference,
32    SetExpression, UnaryExpression,
33};
34use crate::spec::Literal;
35use crate::spec::datatypes::{PrimitiveType, Type};
36use crate::transform::{BoxedTransformFunction, create_transform_function};
37
/// A partition transform.
///
/// A transform serves two purposes: it maps source column values to
/// partition values, and it maps predicates on the source column to
/// predicates on the partition column.
///
/// Because partition predicates are derived from column predicates, logical
/// queries stay independent of the physical layout: the partitioning can
/// change and the correct partition filters are still derived. Users never
/// have to supply partition predicates in addition to logical predicates.
///
/// Every transform must return `null` for a `null` input value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Transform {
    /// The source value, unmodified.
    ///
    /// - Source type: any type.
    /// - Return type: the same as the source type.
    Identity,
    /// Hash of the value, mod `N`.
    ///
    /// Bucket transforms use a 32-bit hash of the source value: the 32-bit
    /// Murmur3 hash, x86 variant, seeded with 0.
    ///
    /// The transform is parameterized by the number of buckets, `N`. The
    /// result must be non-negative, which is achieved by discarding the sign
    /// bit of the hash before taking the remainder. In pseudo-code:
    ///
    /// ```text
    /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
    /// ```
    ///
    /// - Source type: `int`, `long`, `decimal`, `date`, `time`, `timestamp`,
    ///   `timestamptz`, `string`, `uuid`, `fixed`, `binary`
    ///   ([`Transform::result_type`] also accepts `timestamp_ns` and
    ///   `timestamptz_ns`).
    /// - Return type: `int`.
    Bucket(u32),
    /// The value truncated to width `W`.
    ///
    /// For `int` and `long`:
    ///
    /// - `v - (v % W)`
    /// - example: W=10: 1 → 0, -1 → -10
    /// - note: the remainder, `v % W`, must be positive.
    ///
    /// For `decimal`:
    ///
    /// - `scaled_W = decimal(W, scale(v))`, then `v - (v % scaled_W)`
    /// - example: W=50, s=2: 10.65 → 10.50
    ///
    /// For `string`:
    ///
    /// - Substring of length L: `v.substring(0, L)`
    /// - example: L=3: iceberg → ice
    /// - note: strings are truncated to a valid UTF-8 string with no more
    ///   than L code points.
    ///
    /// - Source type: `int`, `long`, `decimal`, `string`
    ///   ([`Transform::result_type`] also accepts `binary`).
    /// - Return type: the same as the source type.
    Truncate(u32),
    /// Extracts a date or timestamp year, as years from 1970.
    ///
    /// - Source type: `date`, `timestamp`, `timestamptz`
    ///   ([`Transform::result_type`] also accepts the nanosecond timestamp
    ///   types).
    /// - Return type: `int`.
    Year,
    /// Extracts a date or timestamp month, as months from 1970-01-01.
    ///
    /// - Source type: `date`, `timestamp`, `timestamptz`
    ///   ([`Transform::result_type`] also accepts the nanosecond timestamp
    ///   types).
    /// - Return type: `int`.
    Month,
    /// Extracts a date or timestamp day, as days from 1970-01-01.
    ///
    /// - Source type: `date`, `timestamp`, `timestamptz`
    ///   ([`Transform::result_type`] also accepts the nanosecond timestamp
    ///   types).
    /// - Return type: `int` (note that [`Transform::result_type`] reports
    ///   `date` for this transform).
    Day,
    /// Extracts a timestamp hour, as hours from 1970-01-01 00:00:00.
    ///
    /// - Source type: `timestamp`, `timestamptz`
    ///   ([`Transform::result_type`] also accepts the nanosecond timestamp
    ///   types).
    /// - Return type: `int`.
    Hour,
    /// Always produces `null`.
    ///
    /// The void transform may be used to replace the transform of an
    /// existing partition field so that the field is effectively dropped in
    /// v1 tables.
    ///
    /// - Source type: any type.
    /// - Return type: the source type.
    Void,
    /// A custom transform that is not recognized or not supported yet.
    Unknown,
}
136
137impl Transform {
138    /// Returns a human-readable String representation of a transformed value.
139    pub fn to_human_string(&self, field_type: &Type, value: Option<&Literal>) -> String {
140        if let Some(value) = value {
141            if let Some(value) = value.as_primitive_literal() {
142                let field_type = field_type.as_primitive_type().unwrap();
143                let datum = Datum::new(field_type.clone(), value);
144                match self {
145                    Self::Identity => datum.to_human_string(),
146                    Self::Void => "null".to_string(),
147                    _ => {
148                        todo!()
149                    }
150                }
151            } else {
152                "null".to_string()
153            }
154        } else {
155            "null".to_string()
156        }
157    }
158
159    /// Get the return type of transform given the input type.
160    /// Returns `None` if it can't be transformed.
161    pub fn result_type(&self, input_type: &Type) -> Result<Type> {
162        match self {
163            Transform::Identity => {
164                if matches!(input_type, Type::Primitive(_)) {
165                    Ok(input_type.clone())
166                } else {
167                    Err(Error::new(
168                        ErrorKind::DataInvalid,
169                        format!("{input_type} is not a valid input type of identity transform",),
170                    ))
171                }
172            }
173            Transform::Void => Ok(input_type.clone()),
174            Transform::Unknown => Ok(Type::Primitive(PrimitiveType::String)),
175            Transform::Bucket(_) => {
176                if let Type::Primitive(p) = input_type {
177                    match p {
178                        PrimitiveType::Int
179                        | PrimitiveType::Long
180                        | PrimitiveType::Decimal { .. }
181                        | PrimitiveType::Date
182                        | PrimitiveType::Time
183                        | PrimitiveType::Timestamp
184                        | PrimitiveType::Timestamptz
185                        | PrimitiveType::TimestampNs
186                        | PrimitiveType::TimestamptzNs
187                        | PrimitiveType::String
188                        | PrimitiveType::Uuid
189                        | PrimitiveType::Fixed(_)
190                        | PrimitiveType::Binary => Ok(Type::Primitive(PrimitiveType::Int)),
191                        _ => Err(Error::new(
192                            ErrorKind::DataInvalid,
193                            format!("{input_type} is not a valid input type of bucket transform",),
194                        )),
195                    }
196                } else {
197                    Err(Error::new(
198                        ErrorKind::DataInvalid,
199                        format!("{input_type} is not a valid input type of bucket transform",),
200                    ))
201                }
202            }
203            Transform::Truncate(_) => {
204                if let Type::Primitive(p) = input_type {
205                    match p {
206                        PrimitiveType::Int
207                        | PrimitiveType::Long
208                        | PrimitiveType::String
209                        | PrimitiveType::Binary
210                        | PrimitiveType::Decimal { .. } => Ok(input_type.clone()),
211                        _ => Err(Error::new(
212                            ErrorKind::DataInvalid,
213                            format!("{input_type} is not a valid input type of truncate transform",),
214                        )),
215                    }
216                } else {
217                    Err(Error::new(
218                        ErrorKind::DataInvalid,
219                        format!("{input_type} is not a valid input type of truncate transform",),
220                    ))
221                }
222            }
223            Transform::Year | Transform::Month => {
224                if let Type::Primitive(p) = input_type {
225                    match p {
226                        PrimitiveType::Timestamp
227                        | PrimitiveType::Timestamptz
228                        | PrimitiveType::TimestampNs
229                        | PrimitiveType::TimestamptzNs
230                        | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Int)),
231                        _ => Err(Error::new(
232                            ErrorKind::DataInvalid,
233                            format!("{input_type} is not a valid input type of {self} transform",),
234                        )),
235                    }
236                } else {
237                    Err(Error::new(
238                        ErrorKind::DataInvalid,
239                        format!("{input_type} is not a valid input type of {self} transform",),
240                    ))
241                }
242            }
243            Transform::Day => {
244                if let Type::Primitive(p) = input_type {
245                    match p {
246                        PrimitiveType::Timestamp
247                        | PrimitiveType::Timestamptz
248                        | PrimitiveType::TimestampNs
249                        | PrimitiveType::TimestamptzNs
250                        | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Date)),
251                        _ => Err(Error::new(
252                            ErrorKind::DataInvalid,
253                            format!("{input_type} is not a valid input type of {self} transform",),
254                        )),
255                    }
256                } else {
257                    Err(Error::new(
258                        ErrorKind::DataInvalid,
259                        format!("{input_type} is not a valid input type of {self} transform",),
260                    ))
261                }
262            }
263            Transform::Hour => {
264                if let Type::Primitive(p) = input_type {
265                    match p {
266                        PrimitiveType::Timestamp
267                        | PrimitiveType::Timestamptz
268                        | PrimitiveType::TimestampNs
269                        | PrimitiveType::TimestamptzNs => Ok(Type::Primitive(PrimitiveType::Int)),
270                        _ => Err(Error::new(
271                            ErrorKind::DataInvalid,
272                            format!("{input_type} is not a valid input type of {self} transform",),
273                        )),
274                    }
275                } else {
276                    Err(Error::new(
277                        ErrorKind::DataInvalid,
278                        format!("{input_type} is not a valid input type of {self} transform",),
279                    ))
280                }
281            }
282        }
283    }
284
285    /// Whether the transform preserves the order of values.
286    pub fn preserves_order(&self) -> bool {
287        !matches!(
288            self,
289            Transform::Void | Transform::Bucket(_) | Transform::Unknown
290        )
291    }
292
293    /// Return the unique transform name to check if similar transforms for the same source field
294    /// are added multiple times in partition spec builder.
295    pub fn dedup_name(&self) -> String {
296        match self {
297            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
298                "time".to_string()
299            }
300            _ => format!("{self}"),
301        }
302    }
303
304    /// Whether ordering by this transform's result satisfies the ordering of another transform's
305    /// result.
306    ///
307    /// For example, sorting by day(ts) will produce an ordering that is also by month(ts) or
308    ///  year(ts). However, sorting by day(ts) will not satisfy the order of hour(ts) or identity(ts).
309    pub fn satisfies_order_of(&self, other: &Self) -> bool {
310        match self {
311            Transform::Identity => other.preserves_order(),
312            Transform::Hour => matches!(
313                other,
314                Transform::Hour | Transform::Day | Transform::Month | Transform::Year
315            ),
316            Transform::Day => matches!(other, Transform::Day | Transform::Month | Transform::Year),
317            Transform::Month => matches!(other, Transform::Month | Transform::Year),
318            _ => self == other,
319        }
320    }
321
322    /// Strictly projects a given predicate according to the transformation
323    /// specified by the `Transform` instance.
324    ///
325    /// This method ensures that the projected predicate is strictly aligned
326    /// with the transformation logic, providing a more precise filtering
327    /// mechanism for transformed data.
328    ///
329    /// # Example
330    /// Suppose, we have row filter `a = 10`, and a partition spec
331    /// `bucket(a, 37) as bs`, if one row matches `a = 10`, then its partition
332    /// value should match `bucket(10, 37) as bs`, and we project `a = 10` to
333    /// `bs = bucket(10, 37)`
334    pub fn strict_project(
335        &self,
336        name: &str,
337        predicate: &BoundPredicate,
338    ) -> Result<Option<Predicate>> {
339        let func = create_transform_function(self)?;
340
341        match self {
342            Transform::Identity => match predicate {
343                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
344                BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
345                    expr.op(),
346                    Reference::new(name),
347                    expr.literal().to_owned(),
348                )))),
349                BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
350                    expr.op(),
351                    Reference::new(name),
352                    expr.literals().to_owned(),
353                )))),
354                _ => Ok(None),
355            },
356            Transform::Bucket(_) => match predicate {
357                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
358                BoundPredicate::Binary(expr) => {
359                    self.project_binary_expr(name, PredicateOperator::NotEq, expr, &func)
360                }
361                BoundPredicate::Set(expr) => {
362                    self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
363                }
364                _ => Ok(None),
365            },
366            Transform::Truncate(width) => match predicate {
367                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
368                BoundPredicate::Binary(expr) => {
369                    if matches!(
370                        expr.term().field().field_type.as_primitive_type(),
371                        Some(&PrimitiveType::Int)
372                            | Some(&PrimitiveType::Long)
373                            | Some(&PrimitiveType::Decimal { .. })
374                    ) {
375                        self.truncate_number_strict(name, expr, &func)
376                    } else if expr.op() == PredicateOperator::StartsWith {
377                        let len = match expr.literal().literal() {
378                            PrimitiveLiteral::String(s) => s.len(),
379                            PrimitiveLiteral::Binary(b) => b.len(),
380                            _ => {
381                                return Err(Error::new(
382                                    ErrorKind::DataInvalid,
383                                    format!(
384                                        "Expected a string or binary literal, got: {:?}",
385                                        expr.literal()
386                                    ),
387                                ));
388                            }
389                        };
390                        match len.cmp(&(*width as usize)) {
391                            Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
392                                PredicateOperator::StartsWith,
393                                Reference::new(name),
394                                expr.literal().to_owned(),
395                            )))),
396                            Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
397                                PredicateOperator::Eq,
398                                Reference::new(name),
399                                expr.literal().to_owned(),
400                            )))),
401                            Ordering::Greater => Ok(None),
402                        }
403                    } else if expr.op() == PredicateOperator::NotStartsWith {
404                        let len = match expr.literal().literal() {
405                            PrimitiveLiteral::String(s) => s.len(),
406                            PrimitiveLiteral::Binary(b) => b.len(),
407                            _ => {
408                                return Err(Error::new(
409                                    ErrorKind::DataInvalid,
410                                    format!(
411                                        "Expected a string or binary literal, got: {:?}",
412                                        expr.literal()
413                                    ),
414                                ));
415                            }
416                        };
417                        match len.cmp(&(*width as usize)) {
418                            Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
419                                PredicateOperator::NotStartsWith,
420                                Reference::new(name),
421                                expr.literal().to_owned(),
422                            )))),
423                            Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
424                                PredicateOperator::NotEq,
425                                Reference::new(name),
426                                expr.literal().to_owned(),
427                            )))),
428                            Ordering::Greater => {
429                                Ok(Some(Predicate::Binary(BinaryExpression::new(
430                                    expr.op(),
431                                    Reference::new(name),
432                                    func.transform_literal_result(expr.literal())?,
433                                ))))
434                            }
435                        }
436                    } else {
437                        self.truncate_array_strict(name, expr, &func)
438                    }
439                }
440                BoundPredicate::Set(expr) => {
441                    self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
442                }
443                _ => Ok(None),
444            },
445            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
446                match predicate {
447                    BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
448                    BoundPredicate::Binary(expr) => self.truncate_number_strict(name, expr, &func),
449                    BoundPredicate::Set(expr) => {
450                        self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
451                    }
452                    _ => Ok(None),
453                }
454            }
455            _ => Ok(None),
456        }
457    }
458
459    /// Projects a given predicate according to the transformation
460    /// specified by the `Transform` instance.
461    ///
462    /// This allows predicates to be effectively applied to data
463    /// that has undergone transformation, enabling efficient querying
464    /// and filtering based on the original, untransformed data.
465    ///
466    /// # Example
467    /// Suppose, we have row filter `a = 10`, and a partition spec
468    /// `bucket(a, 37) as bs`, if one row matches `a = 10`, then its partition
469    /// value should match `bucket(10, 37) as bs`, and we project `a = 10` to
470    /// `bs = bucket(10, 37)`
471    pub fn project(&self, name: &str, predicate: &BoundPredicate) -> Result<Option<Predicate>> {
472        let func = create_transform_function(self)?;
473
474        match self {
475            Transform::Identity => match predicate {
476                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
477                BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
478                    expr.op(),
479                    Reference::new(name),
480                    expr.literal().to_owned(),
481                )))),
482                BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
483                    expr.op(),
484                    Reference::new(name),
485                    expr.literals().to_owned(),
486                )))),
487                _ => Ok(None),
488            },
489            Transform::Bucket(_) => match predicate {
490                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
491                BoundPredicate::Binary(expr) => {
492                    self.project_binary_expr(name, PredicateOperator::Eq, expr, &func)
493                }
494                BoundPredicate::Set(expr) => {
495                    self.project_set_expr(expr, PredicateOperator::In, name, &func)
496                }
497                _ => Ok(None),
498            },
499            Transform::Truncate(width) => match predicate {
500                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
501                BoundPredicate::Binary(expr) => {
502                    self.project_binary_with_adjusted_boundary(name, expr, &func, Some(*width))
503                }
504                BoundPredicate::Set(expr) => {
505                    self.project_set_expr(expr, PredicateOperator::In, name, &func)
506                }
507                _ => Ok(None),
508            },
509            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
510                match predicate {
511                    BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
512                    BoundPredicate::Binary(expr) => {
513                        self.project_binary_with_adjusted_boundary(name, expr, &func, None)
514                    }
515                    BoundPredicate::Set(expr) => {
516                        self.project_set_expr(expr, PredicateOperator::In, name, &func)
517                    }
518                    _ => Ok(None),
519                }
520            }
521            _ => Ok(None),
522        }
523    }
524
525    /// Check if `Transform` is applicable on datum's `PrimitiveType`
526    fn can_transform(&self, datum: &Datum) -> bool {
527        let input_type = datum.data_type().clone();
528        self.result_type(&Type::Primitive(input_type)).is_ok()
529    }
530
531    /// Creates a unary predicate from a given operator and a reference name.
532    fn project_unary(op: PredicateOperator, name: &str) -> Result<Option<Predicate>> {
533        Ok(Some(Predicate::Unary(UnaryExpression::new(
534            op,
535            Reference::new(name),
536        ))))
537    }
538
539    /// Attempts to create a binary predicate based on a binary expression,
540    /// if applicable.
541    ///
542    /// This method evaluates a given binary expression and, if the operation
543    /// is the given operator and the literal can be transformed, constructs a
544    /// `Predicate::Binary`variant representing the binary operation.
545    fn project_binary_expr(
546        &self,
547        name: &str,
548        op: PredicateOperator,
549        expr: &BinaryExpression<BoundReference>,
550        func: &BoxedTransformFunction,
551    ) -> Result<Option<Predicate>> {
552        if expr.op() != op || !self.can_transform(expr.literal()) {
553            return Ok(None);
554        }
555
556        Ok(Some(Predicate::Binary(BinaryExpression::new(
557            expr.op(),
558            Reference::new(name),
559            func.transform_literal_result(expr.literal())?,
560        ))))
561    }
562
563    /// Projects a binary expression to a predicate with an adjusted boundary.
564    ///
565    /// Checks if the literal within the given binary expression is
566    /// transformable. If transformable, it proceeds to potentially adjust
567    /// the boundary of the expression based on the comparison operator (`op`).
568    /// The potential adjustments involve incrementing or decrementing the
569    /// literal value and changing the `PredicateOperator` itself to its
570    /// inclusive variant.
571    fn project_binary_with_adjusted_boundary(
572        &self,
573        name: &str,
574        expr: &BinaryExpression<BoundReference>,
575        func: &BoxedTransformFunction,
576        width: Option<u32>,
577    ) -> Result<Option<Predicate>> {
578        if !self.can_transform(expr.literal()) {
579            return Ok(None);
580        }
581
582        let op = &expr.op();
583        let datum = &expr.literal();
584
585        if let Some(boundary) = Self::adjust_boundary(op, datum)? {
586            let transformed_projection = func.transform_literal_result(&boundary)?;
587
588            let adjusted_projection =
589                self.adjust_time_projection(op, datum, &transformed_projection);
590
591            let adjusted_operator = Self::adjust_operator(op, datum, width);
592
593            if let Some(op) = adjusted_operator {
594                let predicate = match adjusted_projection {
595                    None => Predicate::Binary(BinaryExpression::new(
596                        op,
597                        Reference::new(name),
598                        transformed_projection,
599                    )),
600                    Some(AdjustedProjection::Single(d)) => {
601                        Predicate::Binary(BinaryExpression::new(op, Reference::new(name), d))
602                    }
603                    Some(AdjustedProjection::Set(d)) => Predicate::Set(SetExpression::new(
604                        PredicateOperator::In,
605                        Reference::new(name),
606                        d,
607                    )),
608                };
609                return Ok(Some(predicate));
610            }
611        };
612
613        Ok(None)
614    }
615
616    /// Projects a set expression to a predicate,
617    /// applying a transformation to each literal in the set.
618    fn project_set_expr(
619        &self,
620        expr: &SetExpression<BoundReference>,
621        op: PredicateOperator,
622        name: &str,
623        func: &BoxedTransformFunction,
624    ) -> Result<Option<Predicate>> {
625        if expr.op() != op || expr.literals().iter().any(|d| !self.can_transform(d)) {
626            return Ok(None);
627        }
628
629        let mut new_set = FnvHashSet::default();
630
631        for lit in expr.literals() {
632            let datum = func.transform_literal_result(lit)?;
633
634            if let Some(AdjustedProjection::Single(d)) =
635                self.adjust_time_projection(&op, lit, &datum)
636            {
637                new_set.insert(d);
638            };
639
640            new_set.insert(datum);
641        }
642
643        Ok(Some(Predicate::Set(SetExpression::new(
644            expr.op(),
645            Reference::new(name),
646            new_set,
647        ))))
648    }
649
650    /// Adjusts the boundary value for comparison operations
651    /// based on the specified `PredicateOperator` and `Datum`.
652    ///
653    /// This function modifies the boundary value for certain comparison
654    /// operators (`LessThan`, `GreaterThan`) by incrementing or decrementing
655    /// the literal value within the given `Datum`. For operators that do not
656    /// imply a boundary shift (`Eq`, `LessThanOrEq`, `GreaterThanOrEq`,
657    /// `StartsWith`, `NotStartsWith`), the original datum is returned
658    /// unmodified.
659    fn adjust_boundary(op: &PredicateOperator, datum: &Datum) -> Result<Option<Datum>> {
660        let adjusted_boundary = match op {
661            PredicateOperator::LessThan => match (datum.data_type(), datum.literal()) {
662                (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v - 1)),
663                (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v - 1)),
664                (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
665                    Some(Datum::decimal(v - 1)?)
666                }
667                (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v - 1)),
668                (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
669                    Some(Datum::timestamp_micros(v - 1))
670                }
671                _ => Some(datum.to_owned()),
672            },
673            PredicateOperator::GreaterThan => match (datum.data_type(), datum.literal()) {
674                (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v + 1)),
675                (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v + 1)),
676                (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
677                    Some(Datum::decimal(v + 1)?)
678                }
679                (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v + 1)),
680                (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
681                    Some(Datum::timestamp_micros(v + 1))
682                }
683                _ => Some(datum.to_owned()),
684            },
685            PredicateOperator::Eq
686            | PredicateOperator::LessThanOrEq
687            | PredicateOperator::GreaterThanOrEq
688            | PredicateOperator::StartsWith
689            | PredicateOperator::NotStartsWith => Some(datum.to_owned()),
690            _ => None,
691        };
692
693        Ok(adjusted_boundary)
694    }
695
696    /// Adjusts the comparison operator based on the specified datum and an
697    /// optional width constraint.
698    ///
699    /// This function modifies the comparison operator for `LessThan` and
700    /// `GreaterThan` cases to their inclusive counterparts (`LessThanOrEq`,
701    /// `GreaterThanOrEq`) unconditionally. For `StartsWith` and
702    /// `NotStartsWith` operators acting on string literals, the operator may
703    /// be adjusted to `Eq` or `NotEq` if the string length matches the
704    /// specified width, indicating a precise match rather than a prefix
705    /// condition.
706    fn adjust_operator(
707        op: &PredicateOperator,
708        datum: &Datum,
709        width: Option<u32>,
710    ) -> Option<PredicateOperator> {
711        match op {
712            PredicateOperator::LessThan => Some(PredicateOperator::LessThanOrEq),
713            PredicateOperator::GreaterThan => Some(PredicateOperator::GreaterThanOrEq),
714            PredicateOperator::StartsWith => match datum.literal() {
715                PrimitiveLiteral::String(s) => {
716                    if let Some(w) = width {
717                        if s.len() == w as usize {
718                            return Some(PredicateOperator::Eq);
719                        };
720                    };
721                    Some(*op)
722                }
723                _ => Some(*op),
724            },
725            PredicateOperator::NotStartsWith => match datum.literal() {
726                PrimitiveLiteral::String(s) => {
727                    if let Some(w) = width {
728                        let w = w as usize;
729
730                        if s.len() == w {
731                            return Some(PredicateOperator::NotEq);
732                        }
733
734                        if s.len() < w {
735                            return Some(*op);
736                        }
737
738                        return None;
739                    };
740                    Some(*op)
741                }
742                _ => Some(*op),
743            },
744            _ => Some(*op),
745        }
746    }
747
748    /// Adjust projection for temporal transforms, align with Java
749    /// implementation: https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java#L275
750    fn adjust_time_projection(
751        &self,
752        op: &PredicateOperator,
753        original: &Datum,
754        transformed: &Datum,
755    ) -> Option<AdjustedProjection> {
756        let should_adjust = match self {
757            Transform::Day => matches!(original.data_type(), PrimitiveType::Timestamp),
758            Transform::Year | Transform::Month => true,
759            _ => false,
760        };
761
762        if should_adjust {
763            if let &PrimitiveLiteral::Int(v) = transformed.literal() {
764                match op {
765                    PredicateOperator::LessThan
766                    | PredicateOperator::LessThanOrEq
767                    | PredicateOperator::In => {
768                        if v < 0 {
769                            // # TODO
770                            // An ugly hack to fix. Refine the increment and decrement logic later.
771                            match self {
772                                Transform::Day => {
773                                    return Some(AdjustedProjection::Single(Datum::date(v + 1)));
774                                }
775                                _ => {
776                                    return Some(AdjustedProjection::Single(Datum::int(v + 1)));
777                                }
778                            }
779                        };
780                    }
781                    PredicateOperator::Eq => {
782                        if v < 0 {
783                            let new_set = FnvHashSet::from_iter(vec![
784                                transformed.to_owned(),
785                                // # TODO
786                                // An ugly hack to fix. Refine the increment and decrement logic later.
787                                {
788                                    match self {
789                                        Transform::Day => Datum::date(v + 1),
790                                        _ => Datum::int(v + 1),
791                                    }
792                                },
793                            ]);
794                            return Some(AdjustedProjection::Set(new_set));
795                        }
796                    }
797                    _ => {
798                        return None;
799                    }
800                }
801            };
802        }
803        None
804    }
805
806    // Increment for Int, Long, Decimal, Date, Timestamp
807    // Ignore other types
808    #[inline]
809    fn try_increment_number(datum: &Datum) -> Result<Datum> {
810        match (datum.data_type(), datum.literal()) {
811            (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v + 1)),
812            (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v + 1)),
813            (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => Datum::decimal(v + 1),
814            (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v + 1)),
815            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
816                Ok(Datum::timestamp_micros(v + 1))
817            }
818            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
819                Ok(Datum::timestamp_nanos(v + 1))
820            }
821            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
822                Ok(Datum::timestamptz_micros(v + 1))
823            }
824            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
825                Ok(Datum::timestamptz_nanos(v + 1))
826            }
827            (PrimitiveType::Int, _)
828            | (PrimitiveType::Long, _)
829            | (PrimitiveType::Decimal { .. }, _)
830            | (PrimitiveType::Date, _)
831            | (PrimitiveType::Timestamp, _) => Err(Error::new(
832                ErrorKind::Unexpected,
833                format!(
834                    "Unsupported literal increment for type: {:?}",
835                    datum.data_type()
836                ),
837            )),
838            _ => Ok(datum.to_owned()),
839        }
840    }
841
842    // Decrement for Int, Long, Decimal, Date, Timestamp
843    // Ignore other types
844    #[inline]
845    fn try_decrement_number(datum: &Datum) -> Result<Datum> {
846        match (datum.data_type(), datum.literal()) {
847            (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v - 1)),
848            (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v - 1)),
849            (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => Datum::decimal(v - 1),
850            (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v - 1)),
851            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
852                Ok(Datum::timestamp_micros(v - 1))
853            }
854            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
855                Ok(Datum::timestamp_nanos(v - 1))
856            }
857            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
858                Ok(Datum::timestamptz_micros(v - 1))
859            }
860            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
861                Ok(Datum::timestamptz_nanos(v - 1))
862            }
863            (PrimitiveType::Int, _)
864            | (PrimitiveType::Long, _)
865            | (PrimitiveType::Decimal { .. }, _)
866            | (PrimitiveType::Date, _)
867            | (PrimitiveType::Timestamp, _) => Err(Error::new(
868                ErrorKind::Unexpected,
869                format!(
870                    "Unsupported literal decrement for type: {:?}",
871                    datum.data_type()
872                ),
873            )),
874            _ => Ok(datum.to_owned()),
875        }
876    }
877
878    fn truncate_number_strict(
879        &self,
880        name: &str,
881        expr: &BinaryExpression<BoundReference>,
882        func: &BoxedTransformFunction,
883    ) -> Result<Option<Predicate>> {
884        let boundary = expr.literal();
885
886        if !matches!(
887            boundary.data_type(),
888            &PrimitiveType::Int
889                | &PrimitiveType::Long
890                | &PrimitiveType::Decimal { .. }
891                | &PrimitiveType::Date
892                | &PrimitiveType::Timestamp
893                | &PrimitiveType::Timestamptz
894                | &PrimitiveType::TimestampNs
895                | &PrimitiveType::TimestamptzNs
896        ) {
897            return Err(Error::new(
898                ErrorKind::DataInvalid,
899                format!("Expected a numeric literal, got: {:?}", boundary),
900            ));
901        }
902
903        let predicate = match expr.op() {
904            PredicateOperator::LessThan => Some(Predicate::Binary(BinaryExpression::new(
905                PredicateOperator::LessThan,
906                Reference::new(name),
907                func.transform_literal_result(boundary)?,
908            ))),
909            PredicateOperator::LessThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
910                PredicateOperator::LessThan,
911                Reference::new(name),
912                func.transform_literal_result(&Self::try_increment_number(boundary)?)?,
913            ))),
914            PredicateOperator::GreaterThan => Some(Predicate::Binary(BinaryExpression::new(
915                PredicateOperator::GreaterThan,
916                Reference::new(name),
917                func.transform_literal_result(boundary)?,
918            ))),
919            PredicateOperator::GreaterThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
920                PredicateOperator::GreaterThan,
921                Reference::new(name),
922                func.transform_literal_result(&Self::try_decrement_number(boundary)?)?,
923            ))),
924            PredicateOperator::NotEq => Some(Predicate::Binary(BinaryExpression::new(
925                PredicateOperator::NotEq,
926                Reference::new(name),
927                func.transform_literal_result(boundary)?,
928            ))),
929            _ => None,
930        };
931
932        Ok(predicate)
933    }
934
935    fn truncate_array_strict(
936        &self,
937        name: &str,
938        expr: &BinaryExpression<BoundReference>,
939        func: &BoxedTransformFunction,
940    ) -> Result<Option<Predicate>> {
941        let boundary = expr.literal();
942
943        match expr.op() {
944            PredicateOperator::LessThan | PredicateOperator::LessThanOrEq => {
945                Ok(Some(Predicate::Binary(BinaryExpression::new(
946                    PredicateOperator::LessThan,
947                    Reference::new(name),
948                    func.transform_literal_result(boundary)?,
949                ))))
950            }
951            PredicateOperator::GreaterThan | PredicateOperator::GreaterThanOrEq => {
952                Ok(Some(Predicate::Binary(BinaryExpression::new(
953                    PredicateOperator::GreaterThan,
954                    Reference::new(name),
955                    func.transform_literal_result(boundary)?,
956                ))))
957            }
958            PredicateOperator::NotEq => Ok(Some(Predicate::Binary(BinaryExpression::new(
959                PredicateOperator::NotEq,
960                Reference::new(name),
961                func.transform_literal_result(boundary)?,
962            )))),
963            _ => Ok(None),
964        }
965    }
966}
967
968impl Display for Transform {
969    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
970        match self {
971            Transform::Identity => write!(f, "identity"),
972            Transform::Year => write!(f, "year"),
973            Transform::Month => write!(f, "month"),
974            Transform::Day => write!(f, "day"),
975            Transform::Hour => write!(f, "hour"),
976            Transform::Void => write!(f, "void"),
977            Transform::Bucket(length) => write!(f, "bucket[{length}]"),
978            Transform::Truncate(width) => write!(f, "truncate[{width}]"),
979            Transform::Unknown => write!(f, "unknown"),
980        }
981    }
982}
983
984impl FromStr for Transform {
985    type Err = Error;
986
987    fn from_str(s: &str) -> Result<Self> {
988        let t = match s {
989            "identity" => Transform::Identity,
990            "year" => Transform::Year,
991            "month" => Transform::Month,
992            "day" => Transform::Day,
993            "hour" => Transform::Hour,
994            "void" => Transform::Void,
995            "unknown" => Transform::Unknown,
996            v if v.starts_with("bucket") => {
997                let length = v
998                    .strip_prefix("bucket")
999                    .expect("transform must starts with `bucket`")
1000                    .trim_start_matches('[')
1001                    .trim_end_matches(']')
1002                    .parse()
1003                    .map_err(|err| {
1004                        Error::new(
1005                            ErrorKind::DataInvalid,
1006                            format!("transform bucket type {v:?} is invalid"),
1007                        )
1008                        .with_source(err)
1009                    })?;
1010
1011                Transform::Bucket(length)
1012            }
1013            v if v.starts_with("truncate") => {
1014                let width = v
1015                    .strip_prefix("truncate")
1016                    .expect("transform must starts with `truncate`")
1017                    .trim_start_matches('[')
1018                    .trim_end_matches(']')
1019                    .parse()
1020                    .map_err(|err| {
1021                        Error::new(
1022                            ErrorKind::DataInvalid,
1023                            format!("transform truncate type {v:?} is invalid"),
1024                        )
1025                        .with_source(err)
1026                    })?;
1027
1028                Transform::Truncate(width)
1029            }
1030            v => {
1031                return Err(Error::new(
1032                    ErrorKind::DataInvalid,
1033                    format!("transform {v:?} is invalid"),
1034                ));
1035            }
1036        };
1037
1038        Ok(t)
1039    }
1040}
1041
1042impl Serialize for Transform {
1043    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1044    where S: Serializer {
1045        serializer.serialize_str(format!("{self}").as_str())
1046    }
1047}
1048
1049impl<'de> Deserialize<'de> for Transform {
1050    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1051    where D: Deserializer<'de> {
1052        let s = String::deserialize(deserializer)?;
1053        s.parse().map_err(<D::Error as serde::de::Error>::custom)
1054    }
1055}
1056
/// An enum representing the result of the adjusted projection.
/// Either being a single adjusted datum or a set.
///
/// Values of this type are produced by `Transform::adjust_time_projection`.
#[derive(Debug)]
enum AdjustedProjection {
    /// A single replacement datum; produced for the `LessThan`,
    /// `LessThanOrEq` and `In` operators.
    Single(Datum),
    /// A set of candidate datums; produced for the `Eq` operator, holding the
    /// transformed value together with the value one greater.
    Set(FnvHashSet<Datum>),
}