iceberg/spec/
transform.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Transforms in iceberg.
19
20use std::cmp::Ordering;
21use std::fmt::{Display, Formatter};
22use std::str::FromStr;
23
24use fnv::FnvHashSet;
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26
27use super::{Datum, PrimitiveLiteral};
28use crate::ErrorKind;
29use crate::error::{Error, Result};
30use crate::expr::{
31    BinaryExpression, BoundPredicate, BoundReference, Predicate, PredicateOperator, Reference,
32    SetExpression, UnaryExpression,
33};
34use crate::spec::Literal;
35use crate::spec::datatypes::{PrimitiveType, Type};
36use crate::transform::{BoxedTransformFunction, create_transform_function};
37
/// A partition transform.
///
/// A transform maps a source column value to a partition value, and is also
/// used to rewrite predicates on the source column into predicates on the
/// partition column.
///
/// Because partition predicates are derived from column predicates on the
/// table data, logical queries stay independent of physical storage: the
/// partitioning may change, and the matching partition filters are still
/// derived from the same column predicates. Users therefore never have to
/// supply partition predicates alongside their logical predicates.
///
/// Every transform must return `null` for a `null` input value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Transform {
    /// The source value, unmodified.
    ///
    /// - Source type: any primitive type.
    /// - Return type: the source type.
    Identity,
    /// Hash of the value, mod `N`.
    ///
    /// Bucket transforms use a 32-bit hash of the source value: the 32-bit
    /// Murmur3 hash, x86 variant, seeded with 0.
    ///
    /// The transform is parameterized by the number of buckets, `N`. The
    /// result must be non-negative, which is achieved by discarding the sign
    /// bit of the hash before taking the remainder. In pseudo-code:
    ///
    /// ```text
    /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
    /// ```
    ///
    /// - Source type: `int`, `long`, `decimal`, `date`, `time`, `timestamp`,
    ///   `timestamptz`, `timestamp_ns`, `timestamptz_ns`, `string`, `uuid`,
    ///   `fixed`, `binary`.
    /// - Return type: `int`.
    Bucket(u32),
    /// The value truncated to width `W`.
    ///
    /// For `int` and `long`:
    ///
    /// - `v - (v % W)`, where the remainder `v % W` must be positive
    /// - example: W=10: 1 → 0, -1 → -10
    ///
    /// For `decimal`:
    ///
    /// - `scaled_W = decimal(W, scale(v))`, then `v - (v % scaled_W)`
    /// - example: W=50, s=2: 10.65 → 10.50
    ///
    /// For `string`:
    ///
    /// - Substring of length L: `v.substring(0, L)`
    /// - example: L=3: iceberg → ice
    /// - note: strings are truncated to a valid UTF-8 string with no more
    ///   than L code points.
    ///
    /// - Source type: `int`, `long`, `decimal`, `string`, `binary`.
    /// - Return type: the source type.
    Truncate(u32),
    /// The year of a date or timestamp, as years from 1970.
    ///
    /// - Source type: `date`, `timestamp`, `timestamptz`, `timestamp_ns`,
    ///   `timestamptz_ns`.
    /// - Return type: `int`.
    Year,
    /// The month of a date or timestamp, as months from 1970-01-01.
    ///
    /// - Source type: `date`, `timestamp`, `timestamptz`, `timestamp_ns`,
    ///   `timestamptz_ns`.
    /// - Return type: `int`.
    Month,
    /// The day of a date or timestamp, as days from 1970-01-01.
    ///
    /// - Source type: `date`, `timestamp`, `timestamptz`, `timestamp_ns`,
    ///   `timestamptz_ns`.
    /// - Return type: `date` (this is what `result_type` reports).
    Day,
    /// The hour of a timestamp, as hours from 1970-01-01 00:00:00.
    ///
    /// - Source type: `timestamp`, `timestamptz`, `timestamp_ns`,
    ///   `timestamptz_ns`.
    /// - Return type: `int`.
    Hour,
    /// Always produces `null`.
    ///
    /// The void transform may replace the transform of an existing partition
    /// field so that the field is effectively dropped in v1 tables.
    ///
    /// - Source type: any type.
    /// - Return type: the source type.
    Void,
    /// A custom transform that is not recognized or not supported yet.
    Unknown,
}
136
137impl Transform {
138    /// Returns a human-readable String representation of a transformed value.
139    pub fn to_human_string(&self, field_type: &Type, value: Option<&Literal>) -> String {
140        let Some(value) = value else {
141            return "null".to_string();
142        };
143
144        if let Some(value) = value.as_primitive_literal() {
145            let field_type = field_type.as_primitive_type().unwrap();
146            let datum = Datum::new(field_type.clone(), value);
147
148            match self {
149                Self::Void => "null".to_string(),
150                _ => datum.to_human_string(),
151            }
152        } else {
153            "null".to_string()
154        }
155    }
156
157    /// Get the return type of transform given the input type.
158    /// Returns `None` if it can't be transformed.
159    pub fn result_type(&self, input_type: &Type) -> Result<Type> {
160        match self {
161            Transform::Identity => {
162                if matches!(input_type, Type::Primitive(_)) {
163                    Ok(input_type.clone())
164                } else {
165                    Err(Error::new(
166                        ErrorKind::DataInvalid,
167                        format!("{input_type} is not a valid input type of identity transform",),
168                    ))
169                }
170            }
171            Transform::Void => Ok(input_type.clone()),
172            Transform::Unknown => Ok(Type::Primitive(PrimitiveType::String)),
173            Transform::Bucket(_) => {
174                if let Type::Primitive(p) = input_type {
175                    match p {
176                        PrimitiveType::Int
177                        | PrimitiveType::Long
178                        | PrimitiveType::Decimal { .. }
179                        | PrimitiveType::Date
180                        | PrimitiveType::Time
181                        | PrimitiveType::Timestamp
182                        | PrimitiveType::Timestamptz
183                        | PrimitiveType::TimestampNs
184                        | PrimitiveType::TimestamptzNs
185                        | PrimitiveType::String
186                        | PrimitiveType::Uuid
187                        | PrimitiveType::Fixed(_)
188                        | PrimitiveType::Binary => Ok(Type::Primitive(PrimitiveType::Int)),
189                        _ => Err(Error::new(
190                            ErrorKind::DataInvalid,
191                            format!("{input_type} is not a valid input type of bucket transform",),
192                        )),
193                    }
194                } else {
195                    Err(Error::new(
196                        ErrorKind::DataInvalid,
197                        format!("{input_type} is not a valid input type of bucket transform",),
198                    ))
199                }
200            }
201            Transform::Truncate(_) => {
202                if let Type::Primitive(p) = input_type {
203                    match p {
204                        PrimitiveType::Int
205                        | PrimitiveType::Long
206                        | PrimitiveType::String
207                        | PrimitiveType::Binary
208                        | PrimitiveType::Decimal { .. } => Ok(input_type.clone()),
209                        _ => Err(Error::new(
210                            ErrorKind::DataInvalid,
211                            format!("{input_type} is not a valid input type of truncate transform",),
212                        )),
213                    }
214                } else {
215                    Err(Error::new(
216                        ErrorKind::DataInvalid,
217                        format!("{input_type} is not a valid input type of truncate transform",),
218                    ))
219                }
220            }
221            Transform::Year | Transform::Month => {
222                if let Type::Primitive(p) = input_type {
223                    match p {
224                        PrimitiveType::Timestamp
225                        | PrimitiveType::Timestamptz
226                        | PrimitiveType::TimestampNs
227                        | PrimitiveType::TimestamptzNs
228                        | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Int)),
229                        _ => Err(Error::new(
230                            ErrorKind::DataInvalid,
231                            format!("{input_type} is not a valid input type of {self} transform",),
232                        )),
233                    }
234                } else {
235                    Err(Error::new(
236                        ErrorKind::DataInvalid,
237                        format!("{input_type} is not a valid input type of {self} transform",),
238                    ))
239                }
240            }
241            Transform::Day => {
242                if let Type::Primitive(p) = input_type {
243                    match p {
244                        PrimitiveType::Timestamp
245                        | PrimitiveType::Timestamptz
246                        | PrimitiveType::TimestampNs
247                        | PrimitiveType::TimestamptzNs
248                        | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Date)),
249                        _ => Err(Error::new(
250                            ErrorKind::DataInvalid,
251                            format!("{input_type} is not a valid input type of {self} transform",),
252                        )),
253                    }
254                } else {
255                    Err(Error::new(
256                        ErrorKind::DataInvalid,
257                        format!("{input_type} is not a valid input type of {self} transform",),
258                    ))
259                }
260            }
261            Transform::Hour => {
262                if let Type::Primitive(p) = input_type {
263                    match p {
264                        PrimitiveType::Timestamp
265                        | PrimitiveType::Timestamptz
266                        | PrimitiveType::TimestampNs
267                        | PrimitiveType::TimestamptzNs => Ok(Type::Primitive(PrimitiveType::Int)),
268                        _ => Err(Error::new(
269                            ErrorKind::DataInvalid,
270                            format!("{input_type} is not a valid input type of {self} transform",),
271                        )),
272                    }
273                } else {
274                    Err(Error::new(
275                        ErrorKind::DataInvalid,
276                        format!("{input_type} is not a valid input type of {self} transform",),
277                    ))
278                }
279            }
280        }
281    }
282
283    /// Whether the transform preserves the order of values.
284    pub fn preserves_order(&self) -> bool {
285        !matches!(
286            self,
287            Transform::Void | Transform::Bucket(_) | Transform::Unknown
288        )
289    }
290
291    /// Return the unique transform name to check if similar transforms for the same source field
292    /// are added multiple times in partition spec builder.
293    pub fn dedup_name(&self) -> String {
294        match self {
295            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
296                "time".to_string()
297            }
298            _ => format!("{self}"),
299        }
300    }
301
302    /// Whether ordering by this transform's result satisfies the ordering of another transform's
303    /// result.
304    ///
305    /// For example, sorting by day(ts) will produce an ordering that is also by month(ts) or
306    ///  year(ts). However, sorting by day(ts) will not satisfy the order of hour(ts) or identity(ts).
307    pub fn satisfies_order_of(&self, other: &Self) -> bool {
308        match self {
309            Transform::Identity => other.preserves_order(),
310            Transform::Hour => matches!(
311                other,
312                Transform::Hour | Transform::Day | Transform::Month | Transform::Year
313            ),
314            Transform::Day => matches!(other, Transform::Day | Transform::Month | Transform::Year),
315            Transform::Month => matches!(other, Transform::Month | Transform::Year),
316            _ => self == other,
317        }
318    }
319
320    /// Strictly projects a given predicate according to the transformation
321    /// specified by the `Transform` instance.
322    ///
323    /// This method ensures that the projected predicate is strictly aligned
324    /// with the transformation logic, providing a more precise filtering
325    /// mechanism for transformed data.
326    ///
327    /// # Example
328    /// Suppose, we have row filter `a = 10`, and a partition spec
329    /// `bucket(a, 37) as bs`, if one row matches `a = 10`, then its partition
330    /// value should match `bucket(10, 37) as bs`, and we project `a = 10` to
331    /// `bs = bucket(10, 37)`
332    pub fn strict_project(
333        &self,
334        name: &str,
335        predicate: &BoundPredicate,
336    ) -> Result<Option<Predicate>> {
337        let func = create_transform_function(self)?;
338
339        match self {
340            Transform::Identity => match predicate {
341                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
342                BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
343                    expr.op(),
344                    Reference::new(name),
345                    expr.literal().to_owned(),
346                )))),
347                BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
348                    expr.op(),
349                    Reference::new(name),
350                    expr.literals().to_owned(),
351                )))),
352                _ => Ok(None),
353            },
354            Transform::Bucket(_) => match predicate {
355                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
356                BoundPredicate::Binary(expr) => {
357                    self.project_binary_expr(name, PredicateOperator::NotEq, expr, &func)
358                }
359                BoundPredicate::Set(expr) => {
360                    self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
361                }
362                _ => Ok(None),
363            },
364            Transform::Truncate(width) => match predicate {
365                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
366                BoundPredicate::Binary(expr) => {
367                    if matches!(
368                        expr.term().field().field_type.as_primitive_type(),
369                        Some(&PrimitiveType::Int)
370                            | Some(&PrimitiveType::Long)
371                            | Some(&PrimitiveType::Decimal { .. })
372                    ) {
373                        self.truncate_number_strict(name, expr, &func)
374                    } else if expr.op() == PredicateOperator::StartsWith {
375                        let len = match expr.literal().literal() {
376                            PrimitiveLiteral::String(s) => s.len(),
377                            PrimitiveLiteral::Binary(b) => b.len(),
378                            _ => {
379                                return Err(Error::new(
380                                    ErrorKind::DataInvalid,
381                                    format!(
382                                        "Expected a string or binary literal, got: {:?}",
383                                        expr.literal()
384                                    ),
385                                ));
386                            }
387                        };
388                        match len.cmp(&(*width as usize)) {
389                            Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
390                                PredicateOperator::StartsWith,
391                                Reference::new(name),
392                                expr.literal().to_owned(),
393                            )))),
394                            Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
395                                PredicateOperator::Eq,
396                                Reference::new(name),
397                                expr.literal().to_owned(),
398                            )))),
399                            Ordering::Greater => Ok(None),
400                        }
401                    } else if expr.op() == PredicateOperator::NotStartsWith {
402                        let len = match expr.literal().literal() {
403                            PrimitiveLiteral::String(s) => s.len(),
404                            PrimitiveLiteral::Binary(b) => b.len(),
405                            _ => {
406                                return Err(Error::new(
407                                    ErrorKind::DataInvalid,
408                                    format!(
409                                        "Expected a string or binary literal, got: {:?}",
410                                        expr.literal()
411                                    ),
412                                ));
413                            }
414                        };
415                        match len.cmp(&(*width as usize)) {
416                            Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
417                                PredicateOperator::NotStartsWith,
418                                Reference::new(name),
419                                expr.literal().to_owned(),
420                            )))),
421                            Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
422                                PredicateOperator::NotEq,
423                                Reference::new(name),
424                                expr.literal().to_owned(),
425                            )))),
426                            Ordering::Greater => {
427                                Ok(Some(Predicate::Binary(BinaryExpression::new(
428                                    expr.op(),
429                                    Reference::new(name),
430                                    func.transform_literal_result(expr.literal())?,
431                                ))))
432                            }
433                        }
434                    } else {
435                        self.truncate_array_strict(name, expr, &func)
436                    }
437                }
438                BoundPredicate::Set(expr) => {
439                    self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
440                }
441                _ => Ok(None),
442            },
443            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
444                match predicate {
445                    BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
446                    BoundPredicate::Binary(expr) => self.truncate_number_strict(name, expr, &func),
447                    BoundPredicate::Set(expr) => {
448                        self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
449                    }
450                    _ => Ok(None),
451                }
452            }
453            _ => Ok(None),
454        }
455    }
456
457    /// Projects a given predicate according to the transformation
458    /// specified by the `Transform` instance.
459    ///
460    /// This allows predicates to be effectively applied to data
461    /// that has undergone transformation, enabling efficient querying
462    /// and filtering based on the original, untransformed data.
463    ///
464    /// # Example
465    /// Suppose, we have row filter `a = 10`, and a partition spec
466    /// `bucket(a, 37) as bs`, if one row matches `a = 10`, then its partition
467    /// value should match `bucket(10, 37) as bs`, and we project `a = 10` to
468    /// `bs = bucket(10, 37)`
469    pub fn project(&self, name: &str, predicate: &BoundPredicate) -> Result<Option<Predicate>> {
470        let func = create_transform_function(self)?;
471
472        match self {
473            Transform::Identity => match predicate {
474                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
475                BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
476                    expr.op(),
477                    Reference::new(name),
478                    expr.literal().to_owned(),
479                )))),
480                BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
481                    expr.op(),
482                    Reference::new(name),
483                    expr.literals().to_owned(),
484                )))),
485                _ => Ok(None),
486            },
487            Transform::Bucket(_) => match predicate {
488                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
489                BoundPredicate::Binary(expr) => {
490                    self.project_binary_expr(name, PredicateOperator::Eq, expr, &func)
491                }
492                BoundPredicate::Set(expr) => {
493                    self.project_set_expr(expr, PredicateOperator::In, name, &func)
494                }
495                _ => Ok(None),
496            },
497            Transform::Truncate(width) => match predicate {
498                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
499                BoundPredicate::Binary(expr) => {
500                    self.project_binary_with_adjusted_boundary(name, expr, &func, Some(*width))
501                }
502                BoundPredicate::Set(expr) => {
503                    self.project_set_expr(expr, PredicateOperator::In, name, &func)
504                }
505                _ => Ok(None),
506            },
507            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
508                match predicate {
509                    BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
510                    BoundPredicate::Binary(expr) => {
511                        self.project_binary_with_adjusted_boundary(name, expr, &func, None)
512                    }
513                    BoundPredicate::Set(expr) => {
514                        self.project_set_expr(expr, PredicateOperator::In, name, &func)
515                    }
516                    _ => Ok(None),
517                }
518            }
519            _ => Ok(None),
520        }
521    }
522
523    /// Check if `Transform` is applicable on datum's `PrimitiveType`
524    fn can_transform(&self, datum: &Datum) -> bool {
525        let input_type = datum.data_type().clone();
526        self.result_type(&Type::Primitive(input_type)).is_ok()
527    }
528
529    /// Creates a unary predicate from a given operator and a reference name.
530    fn project_unary(op: PredicateOperator, name: &str) -> Result<Option<Predicate>> {
531        Ok(Some(Predicate::Unary(UnaryExpression::new(
532            op,
533            Reference::new(name),
534        ))))
535    }
536
537    /// Attempts to create a binary predicate based on a binary expression,
538    /// if applicable.
539    ///
540    /// This method evaluates a given binary expression and, if the operation
541    /// is the given operator and the literal can be transformed, constructs a
542    /// `Predicate::Binary`variant representing the binary operation.
543    fn project_binary_expr(
544        &self,
545        name: &str,
546        op: PredicateOperator,
547        expr: &BinaryExpression<BoundReference>,
548        func: &BoxedTransformFunction,
549    ) -> Result<Option<Predicate>> {
550        if expr.op() != op || !self.can_transform(expr.literal()) {
551            return Ok(None);
552        }
553
554        Ok(Some(Predicate::Binary(BinaryExpression::new(
555            expr.op(),
556            Reference::new(name),
557            func.transform_literal_result(expr.literal())?,
558        ))))
559    }
560
561    /// Projects a binary expression to a predicate with an adjusted boundary.
562    ///
563    /// Checks if the literal within the given binary expression is
564    /// transformable. If transformable, it proceeds to potentially adjust
565    /// the boundary of the expression based on the comparison operator (`op`).
566    /// The potential adjustments involve incrementing or decrementing the
567    /// literal value and changing the `PredicateOperator` itself to its
568    /// inclusive variant.
569    fn project_binary_with_adjusted_boundary(
570        &self,
571        name: &str,
572        expr: &BinaryExpression<BoundReference>,
573        func: &BoxedTransformFunction,
574        width: Option<u32>,
575    ) -> Result<Option<Predicate>> {
576        if !self.can_transform(expr.literal()) {
577            return Ok(None);
578        }
579
580        let op = &expr.op();
581        let datum = &expr.literal();
582
583        if let Some(boundary) = Self::adjust_boundary(op, datum)? {
584            let transformed_projection = func.transform_literal_result(&boundary)?;
585
586            let adjusted_projection =
587                self.adjust_time_projection(op, datum, &transformed_projection);
588
589            let adjusted_operator = Self::adjust_operator(op, datum, width);
590
591            if let Some(op) = adjusted_operator {
592                let predicate = match adjusted_projection {
593                    None => Predicate::Binary(BinaryExpression::new(
594                        op,
595                        Reference::new(name),
596                        transformed_projection,
597                    )),
598                    Some(AdjustedProjection::Single(d)) => {
599                        Predicate::Binary(BinaryExpression::new(op, Reference::new(name), d))
600                    }
601                    Some(AdjustedProjection::Set(d)) => Predicate::Set(SetExpression::new(
602                        PredicateOperator::In,
603                        Reference::new(name),
604                        d,
605                    )),
606                };
607                return Ok(Some(predicate));
608            }
609        };
610
611        Ok(None)
612    }
613
614    /// Projects a set expression to a predicate,
615    /// applying a transformation to each literal in the set.
616    fn project_set_expr(
617        &self,
618        expr: &SetExpression<BoundReference>,
619        op: PredicateOperator,
620        name: &str,
621        func: &BoxedTransformFunction,
622    ) -> Result<Option<Predicate>> {
623        if expr.op() != op || expr.literals().iter().any(|d| !self.can_transform(d)) {
624            return Ok(None);
625        }
626
627        let mut new_set = FnvHashSet::default();
628
629        for lit in expr.literals() {
630            let datum = func.transform_literal_result(lit)?;
631
632            if let Some(AdjustedProjection::Single(d)) =
633                self.adjust_time_projection(&op, lit, &datum)
634            {
635                new_set.insert(d);
636            };
637
638            new_set.insert(datum);
639        }
640
641        Ok(Some(Predicate::Set(SetExpression::new(
642            expr.op(),
643            Reference::new(name),
644            new_set,
645        ))))
646    }
647
648    /// Adjusts the boundary value for comparison operations
649    /// based on the specified `PredicateOperator` and `Datum`.
650    ///
651    /// This function modifies the boundary value for certain comparison
652    /// operators (`LessThan`, `GreaterThan`) by incrementing or decrementing
653    /// the literal value within the given `Datum`. For operators that do not
654    /// imply a boundary shift (`Eq`, `LessThanOrEq`, `GreaterThanOrEq`,
655    /// `StartsWith`, `NotStartsWith`), the original datum is returned
656    /// unmodified.
657    fn adjust_boundary(op: &PredicateOperator, datum: &Datum) -> Result<Option<Datum>> {
658        let adjusted_boundary = match op {
659            PredicateOperator::LessThan => match (datum.data_type(), datum.literal()) {
660                (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v - 1)),
661                (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v - 1)),
662                (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
663                    Some(Datum::decimal(v - 1)?)
664                }
665                (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v - 1)),
666                (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
667                    Some(Datum::timestamp_micros(v - 1))
668                }
669                _ => Some(datum.to_owned()),
670            },
671            PredicateOperator::GreaterThan => match (datum.data_type(), datum.literal()) {
672                (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v + 1)),
673                (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v + 1)),
674                (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
675                    Some(Datum::decimal(v + 1)?)
676                }
677                (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v + 1)),
678                (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
679                    Some(Datum::timestamp_micros(v + 1))
680                }
681                _ => Some(datum.to_owned()),
682            },
683            PredicateOperator::Eq
684            | PredicateOperator::LessThanOrEq
685            | PredicateOperator::GreaterThanOrEq
686            | PredicateOperator::StartsWith
687            | PredicateOperator::NotStartsWith => Some(datum.to_owned()),
688            _ => None,
689        };
690
691        Ok(adjusted_boundary)
692    }
693
694    /// Adjusts the comparison operator based on the specified datum and an
695    /// optional width constraint.
696    ///
697    /// This function modifies the comparison operator for `LessThan` and
698    /// `GreaterThan` cases to their inclusive counterparts (`LessThanOrEq`,
699    /// `GreaterThanOrEq`) unconditionally. For `StartsWith` and
700    /// `NotStartsWith` operators acting on string literals, the operator may
701    /// be adjusted to `Eq` or `NotEq` if the string length matches the
702    /// specified width, indicating a precise match rather than a prefix
703    /// condition.
704    fn adjust_operator(
705        op: &PredicateOperator,
706        datum: &Datum,
707        width: Option<u32>,
708    ) -> Option<PredicateOperator> {
709        match op {
710            PredicateOperator::LessThan => Some(PredicateOperator::LessThanOrEq),
711            PredicateOperator::GreaterThan => Some(PredicateOperator::GreaterThanOrEq),
712            PredicateOperator::StartsWith => match datum.literal() {
713                PrimitiveLiteral::String(s) => {
714                    if let Some(w) = width
715                        && s.len() == w as usize
716                    {
717                        return Some(PredicateOperator::Eq);
718                    };
719                    Some(*op)
720                }
721                _ => Some(*op),
722            },
723            PredicateOperator::NotStartsWith => match datum.literal() {
724                PrimitiveLiteral::String(s) => {
725                    if let Some(w) = width {
726                        let w = w as usize;
727
728                        if s.len() == w {
729                            return Some(PredicateOperator::NotEq);
730                        }
731
732                        if s.len() < w {
733                            return Some(*op);
734                        }
735
736                        return None;
737                    };
738                    Some(*op)
739                }
740                _ => Some(*op),
741            },
742            _ => Some(*op),
743        }
744    }
745
746    /// Adjust projection for temporal transforms, align with Java
747    /// implementation: https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java#L275
748    fn adjust_time_projection(
749        &self,
750        op: &PredicateOperator,
751        original: &Datum,
752        transformed: &Datum,
753    ) -> Option<AdjustedProjection> {
754        let should_adjust = match self {
755            Transform::Day => matches!(original.data_type(), PrimitiveType::Timestamp),
756            Transform::Year | Transform::Month => true,
757            _ => false,
758        };
759
760        if should_adjust && let &PrimitiveLiteral::Int(v) = transformed.literal() {
761            match op {
762                PredicateOperator::LessThan
763                | PredicateOperator::LessThanOrEq
764                | PredicateOperator::In => {
765                    if v < 0 {
766                        // # TODO
767                        // An ugly hack to fix. Refine the increment and decrement logic later.
768                        match self {
769                            Transform::Day => {
770                                return Some(AdjustedProjection::Single(Datum::date(v + 1)));
771                            }
772                            _ => {
773                                return Some(AdjustedProjection::Single(Datum::int(v + 1)));
774                            }
775                        }
776                    };
777                }
778                PredicateOperator::Eq => {
779                    if v < 0 {
780                        let new_set = FnvHashSet::from_iter(vec![
781                            transformed.to_owned(),
782                            // # TODO
783                            // An ugly hack to fix. Refine the increment and decrement logic later.
784                            {
785                                match self {
786                                    Transform::Day => Datum::date(v + 1),
787                                    _ => Datum::int(v + 1),
788                                }
789                            },
790                        ]);
791                        return Some(AdjustedProjection::Set(new_set));
792                    }
793                }
794                _ => {
795                    return None;
796                }
797            }
798        };
799        None
800    }
801
802    // Increment for Int, Long, Decimal, Date, Timestamp
803    // Ignore other types
804    #[inline]
805    fn try_increment_number(datum: &Datum) -> Result<Datum> {
806        match (datum.data_type(), datum.literal()) {
807            (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v + 1)),
808            (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v + 1)),
809            (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => Datum::decimal(v + 1),
810            (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v + 1)),
811            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
812                Ok(Datum::timestamp_micros(v + 1))
813            }
814            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
815                Ok(Datum::timestamp_nanos(v + 1))
816            }
817            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
818                Ok(Datum::timestamptz_micros(v + 1))
819            }
820            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
821                Ok(Datum::timestamptz_nanos(v + 1))
822            }
823            (PrimitiveType::Int, _)
824            | (PrimitiveType::Long, _)
825            | (PrimitiveType::Decimal { .. }, _)
826            | (PrimitiveType::Date, _)
827            | (PrimitiveType::Timestamp, _) => Err(Error::new(
828                ErrorKind::Unexpected,
829                format!(
830                    "Unsupported literal increment for type: {:?}",
831                    datum.data_type()
832                ),
833            )),
834            _ => Ok(datum.to_owned()),
835        }
836    }
837
838    // Decrement for Int, Long, Decimal, Date, Timestamp
839    // Ignore other types
840    #[inline]
841    fn try_decrement_number(datum: &Datum) -> Result<Datum> {
842        match (datum.data_type(), datum.literal()) {
843            (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v - 1)),
844            (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v - 1)),
845            (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => Datum::decimal(v - 1),
846            (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v - 1)),
847            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
848                Ok(Datum::timestamp_micros(v - 1))
849            }
850            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
851                Ok(Datum::timestamp_nanos(v - 1))
852            }
853            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
854                Ok(Datum::timestamptz_micros(v - 1))
855            }
856            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
857                Ok(Datum::timestamptz_nanos(v - 1))
858            }
859            (PrimitiveType::Int, _)
860            | (PrimitiveType::Long, _)
861            | (PrimitiveType::Decimal { .. }, _)
862            | (PrimitiveType::Date, _)
863            | (PrimitiveType::Timestamp, _) => Err(Error::new(
864                ErrorKind::Unexpected,
865                format!(
866                    "Unsupported literal decrement for type: {:?}",
867                    datum.data_type()
868                ),
869            )),
870            _ => Ok(datum.to_owned()),
871        }
872    }
873
874    fn truncate_number_strict(
875        &self,
876        name: &str,
877        expr: &BinaryExpression<BoundReference>,
878        func: &BoxedTransformFunction,
879    ) -> Result<Option<Predicate>> {
880        let boundary = expr.literal();
881
882        if !matches!(
883            boundary.data_type(),
884            &PrimitiveType::Int
885                | &PrimitiveType::Long
886                | &PrimitiveType::Decimal { .. }
887                | &PrimitiveType::Date
888                | &PrimitiveType::Timestamp
889                | &PrimitiveType::Timestamptz
890                | &PrimitiveType::TimestampNs
891                | &PrimitiveType::TimestamptzNs
892        ) {
893            return Err(Error::new(
894                ErrorKind::DataInvalid,
895                format!("Expected a numeric literal, got: {boundary:?}"),
896            ));
897        }
898
899        let predicate = match expr.op() {
900            PredicateOperator::LessThan => Some(Predicate::Binary(BinaryExpression::new(
901                PredicateOperator::LessThan,
902                Reference::new(name),
903                func.transform_literal_result(boundary)?,
904            ))),
905            PredicateOperator::LessThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
906                PredicateOperator::LessThan,
907                Reference::new(name),
908                func.transform_literal_result(&Self::try_increment_number(boundary)?)?,
909            ))),
910            PredicateOperator::GreaterThan => Some(Predicate::Binary(BinaryExpression::new(
911                PredicateOperator::GreaterThan,
912                Reference::new(name),
913                func.transform_literal_result(boundary)?,
914            ))),
915            PredicateOperator::GreaterThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
916                PredicateOperator::GreaterThan,
917                Reference::new(name),
918                func.transform_literal_result(&Self::try_decrement_number(boundary)?)?,
919            ))),
920            PredicateOperator::NotEq => Some(Predicate::Binary(BinaryExpression::new(
921                PredicateOperator::NotEq,
922                Reference::new(name),
923                func.transform_literal_result(boundary)?,
924            ))),
925            _ => None,
926        };
927
928        Ok(predicate)
929    }
930
931    fn truncate_array_strict(
932        &self,
933        name: &str,
934        expr: &BinaryExpression<BoundReference>,
935        func: &BoxedTransformFunction,
936    ) -> Result<Option<Predicate>> {
937        let boundary = expr.literal();
938
939        match expr.op() {
940            PredicateOperator::LessThan | PredicateOperator::LessThanOrEq => {
941                Ok(Some(Predicate::Binary(BinaryExpression::new(
942                    PredicateOperator::LessThan,
943                    Reference::new(name),
944                    func.transform_literal_result(boundary)?,
945                ))))
946            }
947            PredicateOperator::GreaterThan | PredicateOperator::GreaterThanOrEq => {
948                Ok(Some(Predicate::Binary(BinaryExpression::new(
949                    PredicateOperator::GreaterThan,
950                    Reference::new(name),
951                    func.transform_literal_result(boundary)?,
952                ))))
953            }
954            PredicateOperator::NotEq => Ok(Some(Predicate::Binary(BinaryExpression::new(
955                PredicateOperator::NotEq,
956                Reference::new(name),
957                func.transform_literal_result(boundary)?,
958            )))),
959            _ => Ok(None),
960        }
961    }
962}
963
964impl Display for Transform {
965    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
966        match self {
967            Transform::Identity => write!(f, "identity"),
968            Transform::Year => write!(f, "year"),
969            Transform::Month => write!(f, "month"),
970            Transform::Day => write!(f, "day"),
971            Transform::Hour => write!(f, "hour"),
972            Transform::Void => write!(f, "void"),
973            Transform::Bucket(length) => write!(f, "bucket[{length}]"),
974            Transform::Truncate(width) => write!(f, "truncate[{width}]"),
975            Transform::Unknown => write!(f, "unknown"),
976        }
977    }
978}
979
980impl FromStr for Transform {
981    type Err = Error;
982
983    fn from_str(s: &str) -> Result<Self> {
984        let t = match s {
985            "identity" => Transform::Identity,
986            "year" => Transform::Year,
987            "month" => Transform::Month,
988            "day" => Transform::Day,
989            "hour" => Transform::Hour,
990            "void" => Transform::Void,
991            "unknown" => Transform::Unknown,
992            v if v.starts_with("bucket") => {
993                let length = v
994                    .strip_prefix("bucket")
995                    .expect("transform must starts with `bucket`")
996                    .trim_start_matches('[')
997                    .trim_end_matches(']')
998                    .parse()
999                    .map_err(|err| {
1000                        Error::new(
1001                            ErrorKind::DataInvalid,
1002                            format!("transform bucket type {v:?} is invalid"),
1003                        )
1004                        .with_source(err)
1005                    })?;
1006
1007                Transform::Bucket(length)
1008            }
1009            v if v.starts_with("truncate") => {
1010                let width = v
1011                    .strip_prefix("truncate")
1012                    .expect("transform must starts with `truncate`")
1013                    .trim_start_matches('[')
1014                    .trim_end_matches(']')
1015                    .parse()
1016                    .map_err(|err| {
1017                        Error::new(
1018                            ErrorKind::DataInvalid,
1019                            format!("transform truncate type {v:?} is invalid"),
1020                        )
1021                        .with_source(err)
1022                    })?;
1023
1024                Transform::Truncate(width)
1025            }
1026            v => {
1027                return Err(Error::new(
1028                    ErrorKind::DataInvalid,
1029                    format!("transform {v:?} is invalid"),
1030                ));
1031            }
1032        };
1033
1034        Ok(t)
1035    }
1036}
1037
1038impl Serialize for Transform {
1039    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1040    where S: Serializer {
1041        serializer.serialize_str(format!("{self}").as_str())
1042    }
1043}
1044
1045impl<'de> Deserialize<'de> for Transform {
1046    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1047    where D: Deserializer<'de> {
1048        let s = String::deserialize(deserializer)?;
1049        s.parse().map_err(<D::Error as serde::de::Error>::custom)
1050    }
1051}
1052
/// An enum representing the result of the adjusted projection.
/// Either being a single adjusted datum or a set.
#[derive(Debug)]
enum AdjustedProjection {
    /// A single adjusted datum; produced by `adjust_time_projection` for
    /// `LessThan`, `LessThanOrEq` and `In` when the transformed value is
    /// negative.
    Single(Datum),
    /// A set of candidate datums; produced by `adjust_time_projection` for
    /// `Eq` when the transformed value is negative, holding both the
    /// transformed datum and the datum for that value plus one.
    Set(FnvHashSet<Datum>),
}